| // Copyright 2017 The Chromium Authors. All rights reserved. | 
 | // Use of this source code is governed by a BSD-style license that can be | 
 | // found in the LICENSE file. | 
 |  | 
 | #include "base/task_scheduler/scheduler_worker_pool.h" | 
 |  | 
 | #include "base/bind.h" | 
 | #include "base/bind_helpers.h" | 
 | #include "base/lazy_instance.h" | 
 | #include "base/task_scheduler/delayed_task_manager.h" | 
 | #include "base/task_scheduler/task_tracker.h" | 
 | #include "base/threading/thread_local.h" | 
 |  | 
 | namespace base { | 
 | namespace internal { | 
 |  | 
 | namespace { | 
 |  | 
 | // The number of SchedulerWorkerPool that are alive in this process. This | 
 | // variable should only be incremented when the SchedulerWorkerPool instances | 
 | // are brought up (on the main thread; before any tasks are posted) and | 
 | // decremented when the same instances are brought down (i.e., only when unit | 
 | // tests tear down the task environment and never in production). This makes the | 
 | // variable const while worker threads are up and as such it doesn't need to be | 
 | // atomic. It is used to tell when a task is posted from the main thread after | 
 | // the task environment was brought down in unit tests so that | 
 | // SchedulerWorkerPool bound TaskRunners can return false on PostTask, letting | 
 | // such callers know they should complete necessary work synchronously. Note: | 
 | // |!g_active_pools_count| is generally equivalent to | 
 | // |!TaskScheduler::GetInstance()| but has the advantage of being valid in | 
 | // task_scheduler unit tests that don't instantiate a full TaskScheduler. | 
 | int g_active_pools_count = 0; | 
 |  | 
 | // SchedulerWorkerPool that owns the current thread, if any. | 
 | LazyInstance<ThreadLocalPointer<const SchedulerWorkerPool>>::Leaky | 
 |     tls_current_worker_pool = LAZY_INSTANCE_INITIALIZER; | 
 |  | 
 | const SchedulerWorkerPool* GetCurrentWorkerPool() { | 
 |   return tls_current_worker_pool.Get().Get(); | 
 | } | 
 |  | 
 | }  // namespace | 
 |  | 
 | // A task runner that runs tasks in parallel. | 
 | class SchedulerParallelTaskRunner : public TaskRunner { | 
 |  public: | 
 |   // Constructs a SchedulerParallelTaskRunner which can be used to post tasks so | 
 |   // long as |worker_pool| is alive. | 
 |   // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 
 |   SchedulerParallelTaskRunner(const TaskTraits& traits, | 
 |                               SchedulerWorkerPool* worker_pool) | 
 |       : traits_(traits), worker_pool_(worker_pool) { | 
 |     DCHECK(worker_pool_); | 
 |   } | 
 |  | 
 |   // TaskRunner: | 
 |   bool PostDelayedTask(const Location& from_here, | 
 |                        OnceClosure closure, | 
 |                        TimeDelta delay) override { | 
 |     if (!g_active_pools_count) | 
 |       return false; | 
 |  | 
 |     // Post the task as part of a one-off single-task Sequence. | 
 |     return worker_pool_->PostTaskWithSequence( | 
 |         Task(from_here, std::move(closure), traits_, delay), | 
 |         MakeRefCounted<Sequence>()); | 
 |   } | 
 |  | 
 |   bool RunsTasksInCurrentSequence() const override { | 
 |     return GetCurrentWorkerPool() == worker_pool_; | 
 |   } | 
 |  | 
 |  private: | 
 |   ~SchedulerParallelTaskRunner() override = default; | 
 |  | 
 |   const TaskTraits traits_; | 
 |   SchedulerWorkerPool* const worker_pool_; | 
 |  | 
 |   DISALLOW_COPY_AND_ASSIGN(SchedulerParallelTaskRunner); | 
 | }; | 
 |  | 
 | // A task runner that runs tasks in sequence. | 
 | class SchedulerSequencedTaskRunner : public SequencedTaskRunner { | 
 |  public: | 
 |   // Constructs a SchedulerSequencedTaskRunner which can be used to post tasks | 
 |   // so long as |worker_pool| is alive. | 
 |   // TODO(robliao): Find a concrete way to manage |worker_pool|'s memory. | 
 |   SchedulerSequencedTaskRunner(const TaskTraits& traits, | 
 |                                SchedulerWorkerPool* worker_pool) | 
 |       : traits_(traits), worker_pool_(worker_pool) { | 
 |     DCHECK(worker_pool_); | 
 |   } | 
 |  | 
 |   // SequencedTaskRunner: | 
 |   bool PostDelayedTask(const Location& from_here, | 
 |                        OnceClosure closure, | 
 |                        TimeDelta delay) override { | 
 |     if (!g_active_pools_count) | 
 |       return false; | 
 |  | 
 |     Task task(from_here, std::move(closure), traits_, delay); | 
 |     task.sequenced_task_runner_ref = this; | 
 |  | 
 |     // Post the task as part of |sequence_|. | 
 |     return worker_pool_->PostTaskWithSequence(std::move(task), sequence_); | 
 |   } | 
 |  | 
 |   bool PostNonNestableDelayedTask(const Location& from_here, | 
 |                                   OnceClosure closure, | 
 |                                   base::TimeDelta delay) override { | 
 |     // Tasks are never nested within the task scheduler. | 
 |     return PostDelayedTask(from_here, std::move(closure), delay); | 
 |   } | 
 |  | 
 |   bool RunsTasksInCurrentSequence() const override { | 
 |     return sequence_->token() == SequenceToken::GetForCurrentThread(); | 
 |   } | 
 |  | 
 |  private: | 
 |   ~SchedulerSequencedTaskRunner() override = default; | 
 |  | 
 |   // Sequence for all Tasks posted through this TaskRunner. | 
 |   const scoped_refptr<Sequence> sequence_ = MakeRefCounted<Sequence>(); | 
 |  | 
 |   const TaskTraits traits_; | 
 |   SchedulerWorkerPool* const worker_pool_; | 
 |  | 
 |   DISALLOW_COPY_AND_ASSIGN(SchedulerSequencedTaskRunner); | 
 | }; | 
 |  | 
 | scoped_refptr<TaskRunner> SchedulerWorkerPool::CreateTaskRunnerWithTraits( | 
 |     const TaskTraits& traits) { | 
 |   return MakeRefCounted<SchedulerParallelTaskRunner>(traits, this); | 
 | } | 
 |  | 
 | scoped_refptr<SequencedTaskRunner> | 
 | SchedulerWorkerPool::CreateSequencedTaskRunnerWithTraits( | 
 |     const TaskTraits& traits) { | 
 |   return MakeRefCounted<SchedulerSequencedTaskRunner>(traits, this); | 
 | } | 
 |  | 
 | bool SchedulerWorkerPool::PostTaskWithSequence( | 
 |     Task task, | 
 |     scoped_refptr<Sequence> sequence) { | 
 |   DCHECK(task.task); | 
 |   DCHECK(sequence); | 
 |  | 
 |   if (!task_tracker_->WillPostTask(task)) | 
 |     return false; | 
 |  | 
 |   if (task.delayed_run_time.is_null()) { | 
 |     PostTaskWithSequenceNow(std::move(task), std::move(sequence)); | 
 |   } else { | 
 |     // Use CHECK instead of DCHECK to crash earlier. See http://crbug.com/711167 | 
 |     // for details. | 
 |     CHECK(task.task); | 
 |     delayed_task_manager_->AddDelayedTask( | 
 |         std::move(task), BindOnce( | 
 |                              [](scoped_refptr<Sequence> sequence, | 
 |                                 SchedulerWorkerPool* worker_pool, Task task) { | 
 |                                worker_pool->PostTaskWithSequenceNow( | 
 |                                    std::move(task), std::move(sequence)); | 
 |                              }, | 
 |                              std::move(sequence), Unretained(this))); | 
 |   } | 
 |  | 
 |   return true; | 
 | } | 
 |  | 
 | SchedulerWorkerPool::SchedulerWorkerPool( | 
 |     TrackedRef<TaskTracker> task_tracker, | 
 |     DelayedTaskManager* delayed_task_manager) | 
 |     : task_tracker_(std::move(task_tracker)), | 
 |       delayed_task_manager_(delayed_task_manager) { | 
 |   DCHECK(task_tracker_); | 
 |   DCHECK(delayed_task_manager_); | 
 |   ++g_active_pools_count; | 
 | } | 
 |  | 
 | SchedulerWorkerPool::~SchedulerWorkerPool() { | 
 |   --g_active_pools_count; | 
 |   DCHECK_GE(g_active_pools_count, 0); | 
 | } | 
 |  | 
 | void SchedulerWorkerPool::BindToCurrentThread() { | 
 |   DCHECK(!GetCurrentWorkerPool()); | 
 |   tls_current_worker_pool.Get().Set(this); | 
 | } | 
 |  | 
 | void SchedulerWorkerPool::UnbindFromCurrentThread() { | 
 |   DCHECK(GetCurrentWorkerPool()); | 
 |   tls_current_worker_pool.Get().Set(nullptr); | 
 | } | 
 |  | 
 | void SchedulerWorkerPool::PostTaskWithSequenceNow( | 
 |     Task task, | 
 |     scoped_refptr<Sequence> sequence) { | 
 |   DCHECK(task.task); | 
 |   DCHECK(sequence); | 
 |  | 
 |   // Confirm that |task| is ready to run (its delayed run time is either null or | 
 |   // in the past). | 
 |   DCHECK_LE(task.delayed_run_time, TimeTicks::Now()); | 
 |  | 
 |   const bool sequence_was_empty = sequence->PushTask(std::move(task)); | 
 |   if (sequence_was_empty) { | 
 |     // Try to schedule |sequence| if it was empty before |task| was inserted | 
 |     // into it. Otherwise, one of these must be true: | 
 |     // - |sequence| is already scheduled, or, | 
 |     // - The pool is running a Task from |sequence|. The pool is expected to | 
 |     //   reschedule |sequence| once it's done running the Task. | 
 |     sequence = task_tracker_->WillScheduleSequence(std::move(sequence), this); | 
 |     if (sequence) | 
 |       OnCanScheduleSequence(std::move(sequence)); | 
 |   } | 
 | } | 
 |  | 
 | }  // namespace internal | 
 | }  // namespace base |