| // Copyright 2016 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_ |
| #define BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_ |
| |
| #include <stddef.h> |
| |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include "base/base_export.h" |
| #include "base/containers/stack.h" |
| #include "base/logging.h" |
| #include "base/macros.h" |
| #include "base/memory/ref_counted.h" |
| #include "base/strings/string_piece.h" |
| #include "base/synchronization/atomic_flag.h" |
| #include "base/synchronization/condition_variable.h" |
| #include "base/synchronization/waitable_event.h" |
| #include "base/task_runner.h" |
| #include "base/task_scheduler/priority_queue.h" |
| #include "base/task_scheduler/scheduler_lock.h" |
| #include "base/task_scheduler/scheduler_worker.h" |
| #include "base/task_scheduler/scheduler_worker_pool.h" |
| #include "base/task_scheduler/scheduler_worker_stack.h" |
| #include "base/task_scheduler/sequence.h" |
| #include "base/task_scheduler/task.h" |
| #include "base/task_scheduler/tracked_ref.h" |
| #include "base/time/time.h" |
| #include "build_config.h" |
| |
| namespace base { |
| |
| class SchedulerWorkerObserver; |
| class SchedulerWorkerPoolParams; |
| |
| namespace internal { |
| |
| class DelayedTaskManager; |
| class TaskTracker; |
| |
| // A pool of workers that run Tasks. |
| // |
| // The pool doesn't create threads until Start() is called. Tasks can be posted |
| // at any time but will not run until after Start() is called. |
| // |
| // This class is thread-safe. |
| class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool { |
| public: |
| enum class WorkerEnvironment { |
| // No special worker environment required. |
| NONE, |
| #if defined(OS_WIN) |
| // Initialize a COM MTA on the worker. |
| COM_MTA, |
| #endif // defined(OS_WIN) |
| }; |
| |
| // Constructs a pool without workers. |
| // |
| // |histogram_label| is used to label the pool's histograms ("TaskScheduler." |
| // + histogram_name + "." + |histogram_label| + extra suffixes), it must not |
| // be empty. |pool_label| is used to label the pool's threads, it must not be |
| // empty. |priority_hint| is the preferred thread priority; the actual thread |
| // priority depends on shutdown state and platform capabilities. |
| // |task_tracker| keeps track of tasks. |delayed_task_manager| handles tasks |
| // posted with a delay. |
| SchedulerWorkerPoolImpl(StringPiece histogram_label, |
| StringPiece pool_label, |
| ThreadPriority priority_hint, |
| TrackedRef<TaskTracker> task_tracker, |
| DelayedTaskManager* delayed_task_manager); |
| |
| // Creates workers following the |params| specification, allowing existing and |
| // future tasks to run. Uses |service_thread_task_runner| to monitor for |
| // blocked threads in the pool. If specified, |scheduler_worker_observer| will |
| // be notified when a worker enters and exits its main function. It must not |
| // be destroyed before JoinForTesting() has returned (must never be destroyed |
| // in production). |worker_environment| specifies any requested environment to |
| // execute the tasks. Can only be called once. CHECKs on failure. |
| void Start(const SchedulerWorkerPoolParams& params, |
| scoped_refptr<TaskRunner> service_thread_task_runner, |
| SchedulerWorkerObserver* scheduler_worker_observer, |
| WorkerEnvironment worker_environment); |
| |
| // Destroying a SchedulerWorkerPoolImpl returned by Create() is not allowed in |
| // production; it is always leaked. In tests, it can only be destroyed after |
| // JoinForTesting() has returned. |
| ~SchedulerWorkerPoolImpl() override; |
| |
| // SchedulerWorkerPool: |
| void JoinForTesting() override; |
| |
| // Returns the maximum number of non-blocked tasks that can run concurrently |
| // in this pool. |
| // |
| // TODO(fdoray): Remove this method. https://crbug.com/687264 |
| int GetMaxConcurrentNonBlockedTasksDeprecated() const; |
| |
| // Waits until at least |n| workers are idle. Note that while workers are |
| // disallowed from cleaning up during this call: tests using a custom |
| // |suggested_reclaim_time_| need to be careful to invoke this swiftly after |
| // unblocking the waited upon workers as: if a worker is already detached by |
| // the time this is invoked, it will never make it onto the idle stack and |
| // this call will hang. |
| void WaitForWorkersIdleForTesting(size_t n); |
| |
| // Waits until all workers are idle. |
| void WaitForAllWorkersIdleForTesting(); |
| |
| // Waits until |n| workers have cleaned up. Tests that use this must: |
| // - Invoke WaitForWorkersCleanedUpForTesting(n) well before any workers |
| // have had time to clean up. |
| // - Have a long enough |suggested_reclaim_time_| to strengthen the above. |
| // - Only invoke this once (currently doesn't support waiting for multiple |
| // cleanup phases in the same test). |
| void WaitForWorkersCleanedUpForTesting(size_t n); |
| |
| // Returns the number of workers in this worker pool. |
| size_t NumberOfWorkersForTesting() const; |
| |
| // Returns |worker_capacity_|. |
| size_t GetWorkerCapacityForTesting() const; |
| |
| // Returns the number of workers that are idle (i.e. not running tasks). |
| size_t NumberOfIdleWorkersForTesting() const; |
| |
| // Sets the MayBlock waiting threshold to TimeDelta::Max(). |
| void MaximizeMayBlockThresholdForTesting(); |
| |
| private: |
| class SchedulerWorkerDelegateImpl; |
| |
| // Friend tests so that they can access |kBlockedWorkersPollPeriod| and |
| // BlockedThreshold(). |
| friend class TaskSchedulerWorkerPoolBlockingTest; |
| friend class TaskSchedulerWorkerPoolMayBlockTest; |
| |
| // The period between calls to AdjustWorkerCapacity() when the pool is at |
| // capacity. This value was set unscientifically based on intuition and may be |
| // adjusted in the future. |
| static constexpr TimeDelta kBlockedWorkersPollPeriod = |
| TimeDelta::FromMilliseconds(50); |
| |
| // SchedulerWorkerPool: |
| void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override; |
| |
| // Waits until at least |n| workers are idle. |lock_| must be held to call |
| // this function. |
| void WaitForWorkersIdleLockRequiredForTesting(size_t n); |
| |
| // Wakes up the last worker from this worker pool to go idle, if any. |
| void WakeUpOneWorker(); |
| |
| // Performs the same action as WakeUpOneWorker() except asserts |lock_| is |
| // acquired rather than acquires it and returns true if worker wakeups are |
| // permitted. |
| bool WakeUpOneWorkerLockRequired(); |
| |
| // Adds a worker, if needed, to maintain one idle worker, |worker_capacity_| |
| // permitting. |
| void MaintainAtLeastOneIdleWorkerLockRequired(); |
| |
| // Adds |worker| to |idle_workers_stack_|. |
| void AddToIdleWorkersStackLockRequired(SchedulerWorker* worker); |
| |
| // Peeks from |idle_workers_stack_|. |
| const SchedulerWorker* PeekAtIdleWorkersStackLockRequired() const; |
| |
| // Removes |worker| from |idle_workers_stack_|. |
| void RemoveFromIdleWorkersStackLockRequired(SchedulerWorker* worker); |
| |
| // Returns true if worker cleanup is permitted. |
| bool CanWorkerCleanupForTestingLockRequired(); |
| |
| // Tries to add a new SchedulerWorker to the pool. Returns the new |
| // SchedulerWorker on success, nullptr otherwise. Cannot be called before |
| // Start(). Must be called under the protection of |lock_|. |
| SchedulerWorker* CreateRegisterAndStartSchedulerWorkerLockRequired(); |
| |
| // Returns the number of workers in the pool that should not run tasks due to |
| // the pool being over worker capacity. |
| size_t NumberOfExcessWorkersLockRequired() const; |
| |
| // Examines the list of SchedulerWorkers and increments |worker_capacity_| for |
| // each worker that has been within the scope of a MAY_BLOCK |
| // ScopedBlockingCall for more than BlockedThreshold(). |
| void AdjustWorkerCapacity(); |
| |
| // Returns the threshold after which the worker capacity is increased to |
| // compensate for a worker that is within a MAY_BLOCK ScopedBlockingCall. |
| TimeDelta MayBlockThreshold() const; |
| |
| // Starts calling AdjustWorkerCapacity() periodically on |
| // |service_thread_task_runner_| if not already requested. |
| void PostAdjustWorkerCapacityTaskIfNeeded(); |
| |
| // Calls AdjustWorkerCapacity() and schedules it again as necessary. May only |
| // be called from the service thread. |
| void AdjustWorkerCapacityTaskFunction(); |
| |
| // Returns true if AdjustWorkerCapacity() should periodically be called on |
| // |service_thread_task_runner_|. |
| bool ShouldPeriodicallyAdjustWorkerCapacityLockRequired(); |
| |
| void DecrementWorkerCapacityLockRequired(); |
| void IncrementWorkerCapacityLockRequired(); |
| |
| const std::string pool_label_; |
| const ThreadPriority priority_hint_; |
| |
| // PriorityQueue from which all threads of this worker pool get work. |
| PriorityQueue shared_priority_queue_; |
| |
| // Suggested reclaim time for workers. Initialized by Start(). Never modified |
| // afterwards (i.e. can be read without synchronization after Start()). |
| TimeDelta suggested_reclaim_time_; |
| |
| SchedulerBackwardCompatibility backward_compatibility_; |
| |
| // Synchronizes accesses to |workers_|, |worker_capacity_|, |
| // |num_pending_may_block_workers_|, |idle_workers_stack_|, |
| // |idle_workers_stack_cv_for_testing_|, |num_wake_ups_before_start_|, |
| // |cleanup_timestamps_|, |polling_worker_capacity_|, |
| // |worker_cleanup_disallowed_for_testing_|, |
| // |num_workers_cleaned_up_for_testing_|, |
| // |SchedulerWorkerDelegateImpl::is_on_idle_workers_stack_|, |
| // |SchedulerWorkerDelegateImpl::incremented_worker_capacity_since_blocked_| |
| // and |SchedulerWorkerDelegateImpl::may_block_start_time_|. Has |
| // |shared_priority_queue_|'s lock as its predecessor so that a worker can be |
| // pushed to |idle_workers_stack_| within the scope of a Transaction (more |
| // details in GetWork()). |
| mutable SchedulerLock lock_; |
| |
| // All workers owned by this worker pool. |
| std::vector<scoped_refptr<SchedulerWorker>> workers_; |
| |
| // Workers can be added as needed up until there are |worker_capacity_| |
| // workers. |
| size_t worker_capacity_ = 0; |
| |
| // Initial value of |worker_capacity_| as set in Start(). |
| size_t initial_worker_capacity_ = 0; |
| |
| // Number workers that are within the scope of a MAY_BLOCK ScopedBlockingCall |
| // but haven't caused a worker capacity increase yet. |
| int num_pending_may_block_workers_ = 0; |
| |
| // Environment to be initialized per worker. |
| WorkerEnvironment worker_environment_ = WorkerEnvironment::NONE; |
| |
| // Stack of idle workers. Initially, all workers are on this stack. A worker |
| // is removed from the stack before its WakeUp() function is called and when |
| // it receives work from GetWork() (a worker calls GetWork() when its sleep |
| // timeout expires, even if its WakeUp() method hasn't been called). A worker |
| // is pushed on this stack when it receives nullptr from GetWork(). |
| SchedulerWorkerStack idle_workers_stack_; |
| |
| // Signaled when a worker is added to the idle workers stack. |
| std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_; |
| |
| // Number of wake ups that occurred before Start(). Never modified after |
| // Start() (i.e. can be read without synchronization after Start()). |
| int num_wake_ups_before_start_ = 0; |
| |
| // Stack that contains the timestamps of when workers get cleaned up. |
| // Timestamps get popped off the stack as new workers are added. |
| base::stack<TimeTicks, std::vector<TimeTicks>> cleanup_timestamps_; |
| |
| // Whether we are currently polling for necessary adjustments to |
| // |worker_capacity_|. |
| bool polling_worker_capacity_ = false; |
| |
| // Indicates to the delegates that workers are not permitted to cleanup. |
| bool worker_cleanup_disallowed_for_testing_ = false; |
| |
| // Counts the number of workers cleaned up since Start(). Tests with a custom |
| // |suggested_reclaim_time_| can wait on a specific number of workers being |
| // cleaned up via WaitForWorkersCleanedUpForTesting(). |
| size_t num_workers_cleaned_up_for_testing_ = 0; |
| |
| // Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is |
| // incremented. |
| std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_; |
| |
| // Used for testing and makes MayBlockThreshold() return the maximum |
| // TimeDelta. |
| AtomicFlag maximum_blocked_threshold_for_testing_; |
| |
| #if DCHECK_IS_ON() |
| // Set at the start of JoinForTesting(). |
| AtomicFlag join_for_testing_started_; |
| #endif |
| |
| scoped_refptr<TaskRunner> service_thread_task_runner_; |
| |
| // Optional observer notified when a worker enters and exits its main |
| // function. Set in Start() and never modified afterwards. |
| SchedulerWorkerObserver* scheduler_worker_observer_ = nullptr; |
| |
| // Ensures recently cleaned up workers (ref. |
| // SchedulerWorkerDelegateImpl::CleanupLockRequired()) had time to exit as |
| // they have a raw reference to |this| (and to TaskTracker) which can |
| // otherwise result in racy use-after-frees per no longer being part of |
| // |workers_| and hence not being explicitly joined in JoinForTesting() : |
| // https://crbug.com/810464. |
| TrackedRefFactory<SchedulerWorkerPoolImpl> tracked_ref_factory_; |
| |
| DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerPoolImpl); |
| }; |
| |
| } // namespace internal |
| } // namespace base |
| |
| #endif // BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_ |