// blob: d331d3c5d838b1b78818f20d035bfd1f23b26bc0 [file] [log] [blame]
// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_
#define BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_
#include <stddef.h>
#include <memory>
#include <string>
#include <vector>
#include "base/base_export.h"
#include "base/containers/stack.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ref_counted.h"
#include "base/strings/string_piece.h"
#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/waitable_event.h"
#include "base/task_runner.h"
#include "base/task_scheduler/priority_queue.h"
#include "base/task_scheduler/scheduler_lock.h"
#include "base/task_scheduler/scheduler_worker.h"
#include "base/task_scheduler/scheduler_worker_pool.h"
#include "base/task_scheduler/scheduler_worker_stack.h"
#include "base/task_scheduler/sequence.h"
#include "base/task_scheduler/task.h"
#include "base/task_scheduler/tracked_ref.h"
#include "base/time/time.h"
#include "build_config.h"
namespace base {
class SchedulerWorkerObserver;
class SchedulerWorkerPoolParams;
namespace internal {
class DelayedTaskManager;
class TaskTracker;
// A pool of workers that run Tasks.
//
// The pool doesn't create threads until Start() is called. Tasks can be posted
// at any time but will not run until after Start() is called.
//
// This class is thread-safe.
class BASE_EXPORT SchedulerWorkerPoolImpl : public SchedulerWorkerPool {
public:
// Environment that can be requested for the pool's workers through the
// |worker_environment| argument of Start().
enum class WorkerEnvironment {
// No special worker environment required.
NONE,
#if defined(OS_WIN)
// Initialize a COM MTA on the worker.
COM_MTA,
#endif // defined(OS_WIN)
};
// Constructs a pool without workers.
//
// |histogram_label| is used to label the pool's histograms ("TaskScheduler."
// + histogram_name + "." + |histogram_label| + extra suffixes); it must not
// be empty. |pool_label| is used to label the pool's threads; it must not be
// empty. |priority_hint| is the preferred thread priority; the actual thread
// priority depends on shutdown state and platform capabilities.
// |task_tracker| keeps track of tasks. |delayed_task_manager| handles tasks
// posted with a delay.
SchedulerWorkerPoolImpl(StringPiece histogram_label,
StringPiece pool_label,
ThreadPriority priority_hint,
TrackedRef<TaskTracker> task_tracker,
DelayedTaskManager* delayed_task_manager);
// Creates workers following the |params| specification, allowing existing and
// future tasks to run. Uses |service_thread_task_runner| to monitor for
// blocked threads in the pool. If specified, |scheduler_worker_observer| will
// be notified when a worker enters and exits its main function. It must not
// be destroyed before JoinForTesting() has returned (must never be destroyed
// in production). |worker_environment| specifies any requested environment to
// execute the tasks. Can only be called once. CHECKs on failure.
void Start(const SchedulerWorkerPoolParams& params,
scoped_refptr<TaskRunner> service_thread_task_runner,
SchedulerWorkerObserver* scheduler_worker_observer,
WorkerEnvironment worker_environment);
// Destroying a SchedulerWorkerPoolImpl is not allowed in production; it is
// always leaked. In tests, it can only be destroyed after JoinForTesting()
// has returned.
~SchedulerWorkerPoolImpl() override;
// SchedulerWorkerPool:
void JoinForTesting() override;
// Returns the maximum number of non-blocked tasks that can run concurrently
// in this pool.
//
// TODO(fdoray): Remove this method. https://crbug.com/687264
int GetMaxConcurrentNonBlockedTasksDeprecated() const;
// Waits until at least |n| workers are idle. Workers are disallowed from
// cleaning up during this call. However, tests using a custom
// |suggested_reclaim_time_| need to be careful to invoke this swiftly after
// unblocking the waited-upon workers: if a worker is already detached by the
// time this is invoked, it will never make it onto the idle stack and this
// call will hang.
void WaitForWorkersIdleForTesting(size_t n);
// Waits until all workers are idle.
void WaitForAllWorkersIdleForTesting();
// Waits until |n| workers have cleaned up. Tests that use this must:
// - Invoke WaitForWorkersCleanedUpForTesting(n) well before any workers
// have had time to clean up.
// - Have a long enough |suggested_reclaim_time_| to strengthen the above.
// - Only invoke this once (currently doesn't support waiting for multiple
// cleanup phases in the same test).
void WaitForWorkersCleanedUpForTesting(size_t n);
// Returns the number of workers in this worker pool.
size_t NumberOfWorkersForTesting() const;
// Returns |worker_capacity_|.
size_t GetWorkerCapacityForTesting() const;
// Returns the number of workers that are idle (i.e. not running tasks).
size_t NumberOfIdleWorkersForTesting() const;
// Sets the MayBlock waiting threshold to TimeDelta::Max().
void MaximizeMayBlockThresholdForTesting();
private:
class SchedulerWorkerDelegateImpl;
// Friend tests so that they can access |kBlockedWorkersPollPeriod| and
// MayBlockThreshold().
friend class TaskSchedulerWorkerPoolBlockingTest;
friend class TaskSchedulerWorkerPoolMayBlockTest;
// The period between calls to AdjustWorkerCapacity() when the pool is at
// capacity. This value was set unscientifically based on intuition and may be
// adjusted in the future.
static constexpr TimeDelta kBlockedWorkersPollPeriod =
TimeDelta::FromMilliseconds(50);
// SchedulerWorkerPool:
void OnCanScheduleSequence(scoped_refptr<Sequence> sequence) override;
// Waits until at least |n| workers are idle. |lock_| must be held to call
// this function.
void WaitForWorkersIdleLockRequiredForTesting(size_t n);
// Wakes up the last worker from this worker pool to go idle, if any.
void WakeUpOneWorker();
// Same as WakeUpOneWorker(), except that it asserts that |lock_| is already
// held instead of acquiring it, and returns true if worker wakeups are
// permitted.
bool WakeUpOneWorkerLockRequired();
// Adds a worker, if needed, to maintain one idle worker, |worker_capacity_|
// permitting.
void MaintainAtLeastOneIdleWorkerLockRequired();
// Adds |worker| to |idle_workers_stack_|.
void AddToIdleWorkersStackLockRequired(SchedulerWorker* worker);
// Peeks from |idle_workers_stack_|.
const SchedulerWorker* PeekAtIdleWorkersStackLockRequired() const;
// Removes |worker| from |idle_workers_stack_|.
void RemoveFromIdleWorkersStackLockRequired(SchedulerWorker* worker);
// Returns true if worker cleanup is permitted.
bool CanWorkerCleanupForTestingLockRequired();
// Tries to add a new SchedulerWorker to the pool. Returns the new
// SchedulerWorker on success, nullptr otherwise. Cannot be called before
// Start(). Must be called under the protection of |lock_|.
SchedulerWorker* CreateRegisterAndStartSchedulerWorkerLockRequired();
// Returns the number of workers in the pool that should not run tasks due to
// the pool being over worker capacity.
size_t NumberOfExcessWorkersLockRequired() const;
// Examines the list of SchedulerWorkers and increments |worker_capacity_| for
// each worker that has been within the scope of a MAY_BLOCK
// ScopedBlockingCall for more than MayBlockThreshold().
void AdjustWorkerCapacity();
// Returns the threshold after which the worker capacity is increased to
// compensate for a worker that is within a MAY_BLOCK ScopedBlockingCall.
TimeDelta MayBlockThreshold() const;
// Starts calling AdjustWorkerCapacity() periodically on
// |service_thread_task_runner_| if not already requested.
void PostAdjustWorkerCapacityTaskIfNeeded();
// Calls AdjustWorkerCapacity() and schedules it again as necessary. May only
// be called from the service thread.
void AdjustWorkerCapacityTaskFunction();
// Returns true if AdjustWorkerCapacity() should periodically be called on
// |service_thread_task_runner_|.
bool ShouldPeriodicallyAdjustWorkerCapacityLockRequired();
// NOTE(review): undocumented in the original header. Presumably these
// decrement / increment |worker_capacity_| and, following the
// "LockRequired" naming used by the other methods of this class, expect
// |lock_| to be held by the caller. Confirm against the implementation.
void DecrementWorkerCapacityLockRequired();
void IncrementWorkerCapacityLockRequired();
// NOTE(review): presumably the |pool_label| and |priority_hint| passed to the
// constructor (see its comment for their meaning) -- confirm in the
// implementation. Both are const, so they cannot change after construction.
const std::string pool_label_;
const ThreadPriority priority_hint_;
// PriorityQueue from which all threads of this worker pool get work.
PriorityQueue shared_priority_queue_;
// Suggested reclaim time for workers. Initialized by Start(). Never modified
// afterwards (i.e. can be read without synchronization after Start()).
TimeDelta suggested_reclaim_time_;
// NOTE(review): has no initializer in this header; presumably set from the
// SchedulerWorkerPoolParams given to Start() -- confirm in the implementation.
SchedulerBackwardCompatibility backward_compatibility_;
// Synchronizes accesses to |workers_|, |worker_capacity_|,
// |num_pending_may_block_workers_|, |idle_workers_stack_|,
// |idle_workers_stack_cv_for_testing_|, |num_wake_ups_before_start_|,
// |cleanup_timestamps_|, |polling_worker_capacity_|,
// |worker_cleanup_disallowed_for_testing_|,
// |num_workers_cleaned_up_for_testing_|,
// |SchedulerWorkerDelegateImpl::is_on_idle_workers_stack_|,
// |SchedulerWorkerDelegateImpl::incremented_worker_capacity_since_blocked_|
// and |SchedulerWorkerDelegateImpl::may_block_start_time_|. Has
// |shared_priority_queue_|'s lock as its predecessor so that a worker can be
// pushed to |idle_workers_stack_| within the scope of a Transaction (more
// details in GetWork()).
mutable SchedulerLock lock_;
// All workers owned by this worker pool.
std::vector<scoped_refptr<SchedulerWorker>> workers_;
// Workers can be added as needed up until there are |worker_capacity_|
// workers.
size_t worker_capacity_ = 0;
// Initial value of |worker_capacity_| as set in Start().
size_t initial_worker_capacity_ = 0;
// Number of workers that are within the scope of a MAY_BLOCK
// ScopedBlockingCall but haven't caused a worker capacity increase yet.
int num_pending_may_block_workers_ = 0;
// Environment to be initialized per worker.
WorkerEnvironment worker_environment_ = WorkerEnvironment::NONE;
// Stack of idle workers. Initially, all workers are on this stack. A worker
// is removed from the stack before its WakeUp() function is called and when
// it receives work from GetWork() (a worker calls GetWork() when its sleep
// timeout expires, even if its WakeUp() method hasn't been called). A worker
// is pushed on this stack when it receives nullptr from GetWork().
SchedulerWorkerStack idle_workers_stack_;
// Signaled when a worker is added to the idle workers stack.
std::unique_ptr<ConditionVariable> idle_workers_stack_cv_for_testing_;
// Number of wake ups that occurred before Start(). Never modified after
// Start() (i.e. can be read without synchronization after Start()).
int num_wake_ups_before_start_ = 0;
// Stack that contains the timestamps of when workers get cleaned up.
// Timestamps get popped off the stack as new workers are added.
base::stack<TimeTicks, std::vector<TimeTicks>> cleanup_timestamps_;
// Whether we are currently polling for necessary adjustments to
// |worker_capacity_|.
bool polling_worker_capacity_ = false;
// Indicates to the delegates that workers are not permitted to cleanup.
bool worker_cleanup_disallowed_for_testing_ = false;
// Counts the number of workers cleaned up since Start(). Tests with a custom
// |suggested_reclaim_time_| can wait on a specific number of workers being
// cleaned up via WaitForWorkersCleanedUpForTesting().
size_t num_workers_cleaned_up_for_testing_ = 0;
// Signaled, if non-null, when |num_workers_cleaned_up_for_testing_| is
// incremented.
std::unique_ptr<ConditionVariable> num_workers_cleaned_up_for_testing_cv_;
// Used for testing and makes MayBlockThreshold() return the maximum
// TimeDelta.
AtomicFlag maximum_blocked_threshold_for_testing_;
#if DCHECK_IS_ON()
// Set at the start of JoinForTesting().
AtomicFlag join_for_testing_started_;
#endif
// Task runner on which the periodic AdjustWorkerCapacity() calls are made
// (see PostAdjustWorkerCapacityTaskIfNeeded()). NOTE(review): presumably the
// |service_thread_task_runner| passed to Start() -- confirm in the
// implementation.
scoped_refptr<TaskRunner> service_thread_task_runner_;
// Optional observer notified when a worker enters and exits its main
// function. Set in Start() and never modified afterwards.
SchedulerWorkerObserver* scheduler_worker_observer_ = nullptr;
// Ensures recently cleaned up workers (ref.
// SchedulerWorkerDelegateImpl::CleanupLockRequired()) had time to exit as
// they have a raw reference to |this| (and to TaskTracker) which can
// otherwise result in racy use-after-frees per no longer being part of
// |workers_| and hence not being explicitly joined in JoinForTesting() :
// https://crbug.com/810464.
TrackedRefFactory<SchedulerWorkerPoolImpl> tracked_ref_factory_;
DISALLOW_COPY_AND_ASSIGN(SchedulerWorkerPoolImpl);
};
} // namespace internal
} // namespace base
#endif // BASE_TASK_SCHEDULER_SCHEDULER_WORKER_POOL_IMPL_H_