|  | // Copyright 2017 The Chromium Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include "base/task_scheduler/scheduler_worker_pool.h" | 
|  |  | 
|  | #include "base/bind.h" | 
|  | #include "base/bind_helpers.h" | 
|  | #include "base/lazy_instance.h" | 
|  | #include "base/task_scheduler/delayed_task_manager.h" | 
|  | #include "base/task_scheduler/task_tracker.h" | 
|  | #include "base/threading/thread_local.h" | 
|  |  | 
|  | namespace base { | 
|  | namespace internal { | 
|  |  | 
|  | namespace { | 
|  |  | 
// Count of SchedulerWorkerPool instances currently alive in this process.
//
// Threading: the counter is only incremented while SchedulerWorkerPool
// instances are being brought up (on the main thread, before any task is
// posted) and only decremented while those same instances are being brought
// down (which only happens when unit tests tear down the task environment,
// never in production). It is therefore effectively const for as long as worker
// threads are up, which is why it doesn't need to be atomic.
//
// Purpose: it detects tasks posted from the main thread after the task
// environment was brought down in unit tests, so that TaskRunners bound to a
// SchedulerWorkerPool can return false from PostTask and thereby tell such
// callers to complete the necessary work synchronously.
//
// Note: |!g_active_pools_count| is generally equivalent to
// |!TaskScheduler::GetInstance()|, but it has the advantage of also being valid
// in task_scheduler unit tests that don't instantiate a full TaskScheduler.
int g_active_pools_count = 0;
|  |  | 
|  | // SchedulerWorkerPool that owns the current thread, if any. | 
|  | LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky | 
|  | tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; | 
|  |  | 
|  | const SchedulerWorkerPool* GetCurrentWorkerPool() { | 
|  | return tls_current_worker_pool.Get().Get(); | 
|  | } | 
|  |  | 
|  | }  // namespace | 
|  |  | 
|  | // A task runner that runs tasks in parallel. | 
|  | class SchedulerParallelTaskRunner : public TaskRunner { | 
|  | public: | 
|  | // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so | 
|  | // long as |worker_pool| is alive. | 
|  | // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 
|  | SchedulerParallelTaskRunner(const TaskTraits& traits, | 
|  | SchedulerWorkerPool* worker_pool) | 
|  | : traits_(traits), worker_pool_(worker_pool) { | 
|  | DCHECK(worker_pool_); | 
|  | } | 
|  |  | 
|  | // TaskRunner: | 
|  | bool PostDelayedTask(const Location& from_here, | 
|  | OnceClosure closure, | 
|  | TimeDelta delay) override { | 
|  | if (!g_active_pools_count) | 
|  | return false; | 
|  |  | 
|  | // Post the task as part of a one-off single-task Sequence. | 
|  | return worker_pool_->PostTaskWithSequence( | 
|  | Task(from_here, std::move(closure), traits_, delay), | 
|  | MakeRefCounted<Sequence>()); | 
|  | } | 
|  |  | 
|  | bool RunsTasksInCurrentSequence() const override { | 
|  | return GetCurrentWorkerPool() == worker_pool_; | 
|  | } | 
|  |  | 
|  | private: | 
|  | ~SchedulerParallelTaskRunner() override = default; | 
|  |  | 
|  | const TaskTraits traits_; | 
|  | SchedulerWorkerPool* const worker_pool_; | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); | 
|  | }; | 
|  |  | 
|  | // A task runner that runs tasks in sequence. | 
|  | class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 
|  | public: | 
|  | // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks | 
|  | // so long as |worker_pool| is alive. | 
|  | // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 
|  | SchedulerSequencedTaskRunner(const TaskTraits& traits, | 
|  | SchedulerWorkerPool* worker_pool) | 
|  | : traits_(traits), worker_pool_(worker_pool) { | 
|  | DCHECK(worker_pool_); | 
|  | } | 
|  |  | 
|  | // SequencedTaskRunner: | 
|  | bool PostDelayedTask(const Location& from_here, | 
|  | OnceClosure closure, | 
|  | TimeDelta delay) override { | 
|  | if (!g_active_pools_count) | 
|  | return false; | 
|  |  | 
|  | Task task(from_here, std::move(closure), traits_, delay); | 
|  | task.sequenced_task_runner_ref = this; | 
|  |  | 
|  | // Post the task as part of |sequence_|. | 
|  | return worker_pool_->PostTaskWithSequence(std::move(task), sequence_); | 
|  | } | 
|  |  | 
|  | bool PostNonNestableDelayedTask(const Location& from_here, | 
|  | OnceClosure closure, | 
|  | base::TimeDelta delay) override { | 
|  | // Tasks are never nested within the task scheduler. | 
|  | return PostDelayedTask(from_here, std::move(closure), delay); | 
|  | } | 
|  |  | 
|  | bool RunsTasksInCurrentSequence() const override { | 
|  | return sequence_->token() == SequenceToken::GetForCurrentThread(); | 
|  | } | 
|  |  | 
|  | private: | 
|  | ~SchedulerSequencedTaskRunner() override = default; | 
|  |  | 
|  | // Sequence for all Tasks posted through this TaskRunner. | 
|  | const scoped_refptr<Sequence> sequence_ = MakeRefCounted<Sequence>(); | 
|  |  | 
|  | const TaskTraits traits_; | 
|  | SchedulerWorkerPool* const worker_pool_; | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | 
|  | }; | 
|  |  | 
|  | scoped_refptr<TaskRunner> SchedulerWorkerPool::CreateTaskRunnerWithTraits( | 
|  | const TaskTraits& traits) { | 
|  | return MakeRefCounted<SchedulerParallelTaskRunner>(traits, this); | 
|  | } | 
|  |  | 
|  | scoped_refptr<SequencedTaskRunner> | 
|  | SchedulerWorkerPool::CreateSequencedTaskRunnerWithTraits( | 
|  | const TaskTraits& traits) { | 
|  | return MakeRefCounted<SchedulerSequencedTaskRunner>(traits, this); | 
|  | } | 
|  |  | 
|  | bool SchedulerWorkerPool::PostTaskWithSequence( | 
|  | Task task, | 
|  | scoped_refptr<Sequence> sequence) { | 
|  | DCHECK(task.task); | 
|  | DCHECK(sequence); | 
|  |  | 
|  | if (!task_tracker_->WillPostTask(task)) | 
|  | return false; | 
|  |  | 
|  | if (task.delayed_run_time.is_null()) { | 
|  | PostTaskWithSequenceNow(std::move(task), std::move(sequence)); | 
|  | } else { | 
|  | // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 | 
|  | // for details. | 
|  | CHECK(task.task); | 
|  | delayed_task_manager_->AddDelayedTask( | 
|  | std::move(task), BindOnce( | 
|  | [](scoped_refptr<Sequence> sequence, | 
|  | SchedulerWorkerPool* worker_pool, Task task) { | 
|  | worker_pool->PostTaskWithSequenceNow( | 
|  | std::move(task), std::move(sequence)); | 
|  | }, | 
|  | std::move(sequence), Unretained(this))); | 
|  | } | 
|  |  | 
|  | return true; | 
|  | } | 
|  |  | 
|  | SchedulerWorkerPool::SchedulerWorkerPool( | 
|  | TrackedRef<TaskTracker> task_tracker, | 
|  | DelayedTaskManager* delayed_task_manager) | 
|  | : task_tracker_(std::move(task_tracker)), | 
|  | delayed_task_manager_(delayed_task_manager) { | 
|  | DCHECK(task_tracker_); | 
|  | DCHECK(delayed_task_manager_); | 
|  | ++g_active_pools_count; | 
|  | } | 
|  |  | 
|  | SchedulerWorkerPool::~SchedulerWorkerPool() { | 
|  | --g_active_pools_count; | 
|  | DCHECK_GE(g_active_pools_count, 0); | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPool::BindToCurrentThread() { | 
|  | DCHECK(!GetCurrentWorkerPool()); | 
|  | tls_current_worker_pool.Get().Set(this); | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPool::UnbindFromCurrentThread() { | 
|  | DCHECK(GetCurrentWorkerPool()); | 
|  | tls_current_worker_pool.Get().Set(nullptr); | 
|  | } | 
|  |  | 
|  | void SchedulerWorkerPool::PostTaskWithSequenceNow( | 
|  | Task task, | 
|  | scoped_refptr<Sequence> sequence) { | 
|  | DCHECK(task.task); | 
|  | DCHECK(sequence); | 
|  |  | 
|  | // Confirm that |task| is ready to run (its delayed run time is either null or | 
|  | // in the past). | 
|  | DCHECK_LE(task.delayed_run_time, TimeTicks::Now()); | 
|  |  | 
|  | const bool sequence_was_empty = sequence->PushTask(std::move(task)); | 
|  | if (sequence_was_empty) { | 
|  | // Try to schedule |sequence| if it was empty before |task| was inserted | 
|  | // into it. Otherwise, one of these must be true: | 
|  | // - |sequence| is already scheduled, or, | 
|  | // - The pool is running a Task from |sequence|. The pool is expected to | 
|  | //   reschedule |sequence| once it's done running the Task. | 
|  | sequence = task_tracker_->WillScheduleSequence(std::move(sequence), this); | 
|  | if (sequence) | 
|  | OnCanScheduleSequence(std::move(sequence)); | 
|  | } | 
|  | } | 
|  |  | 
|  | }  // namespace internal | 
|  | }  // namespace base |