|  | // Copyright 2017 The Chromium Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include "base/task_scheduler/scheduler_worker_pool.h" | 
|  |  | 
|  | #include <memory> | 
|  |  | 
|  | #include "base/bind.h" | 
|  | #include "base/bind_helpers.h" | 
|  | #include "base/location.h" | 
|  | #include "base/memory/ref_counted.h" | 
|  | #include "base/task_runner.h" | 
|  | #include "base/task_scheduler/delayed_task_manager.h" | 
|  | #include "base/task_scheduler/scheduler_worker_pool_impl.h" | 
|  | #include "base/task_scheduler/scheduler_worker_pool_params.h" | 
|  | #include "base/task_scheduler/task_tracker.h" | 
|  | #include "base/task_scheduler/task_traits.h" | 
|  | #include "base/task_scheduler/test_task_factory.h" | 
|  | #include "base/task_scheduler/test_utils.h" | 
|  | #include "base/test/test_timeouts.h" | 
|  | #include "base/threading/platform_thread.h" | 
|  | #include "base/threading/simple_thread.h" | 
|  | #include "base/threading/thread.h" | 
|  | #include "build/build_config.h" | 
|  | #include "testing/gtest/include/gtest/gtest.h" | 
|  |  | 
|  | #if defined(OS_WIN) | 
|  | #include "base/task_scheduler/platform_native_worker_pool_win.h" | 
|  | #endif | 
|  |  | 
|  | namespace base { | 
|  | namespace internal { | 
|  |  | 
|  | namespace { | 
|  |  | 
// Worker count handed to SchedulerWorkerPoolParams when the generic pool is
// started.
constexpr size_t kNumWorkersInWorkerPool = 4;
// How many ThreadPostingTasks threads the posting tests spawn.
constexpr size_t kNumThreadsPostingTasks = 4;
// How many tasks each ThreadPostingTasks thread posts.
constexpr size_t kNumTasksPostedPerThread = 150;
|  |  | 
// Selects which SchedulerWorkerPool implementation the fixture instantiates.
enum class PoolType {
  // The fixture creates a SchedulerWorkerPoolImpl.
  GENERIC,
#if defined(OS_WIN)
  // The fixture creates a PlatformNativeWorkerPoolWin (Windows builds only).
  WINDOWS,
#endif
};
|  |  | 
|  | struct PoolExecutionType { | 
|  | PoolType pool_type; | 
|  | test::ExecutionMode execution_mode; | 
|  | }; | 
|  |  | 
|  | using PostNestedTask = test::TestTaskFactory::PostNestedTask; | 
|  |  | 
|  | class ThreadPostingTasks : public SimpleThread { | 
|  | public: | 
|  | // Constructs a thread that posts |num_tasks_posted_per_thread| tasks to | 
|  | // |worker_pool| through an |execution_mode| task runner. If | 
|  | // |post_nested_task| is YES, each task posted by this thread posts another | 
|  | // task when it runs. | 
|  | ThreadPostingTasks(SchedulerWorkerPool* worker_pool, | 
|  | test::ExecutionMode execution_mode, | 
|  | PostNestedTask post_nested_task) | 
|  | : SimpleThread("ThreadPostingTasks"), | 
|  | worker_pool_(worker_pool), | 
|  | post_nested_task_(post_nested_task), | 
|  | factory_(test::CreateTaskRunnerWithExecutionMode(worker_pool, | 
|  | execution_mode), | 
|  | execution_mode) { | 
|  | DCHECK(worker_pool_); | 
|  | } | 
|  |  | 
|  | const test::TestTaskFactory* factory() const { return &factory_; } | 
|  |  | 
|  | private: | 
|  | void Run() override { | 
|  | EXPECT_FALSE(factory_.task_runner()->RunsTasksInCurrentSequence()); | 
|  |  | 
|  | for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) | 
|  | EXPECT_TRUE(factory_.PostTask(post_nested_task_, Closure())); | 
|  | } | 
|  |  | 
|  | SchedulerWorkerPool* const worker_pool_; | 
|  | const scoped_refptr<TaskRunner> task_runner_; | 
|  | const PostNestedTask post_nested_task_; | 
|  | test::TestTaskFactory factory_; | 
|  |  | 
|  | DISALLOW_COPY_AND_ASSIGN(ThreadPostingTasks); | 
|  | }; | 
|  |  | 
|  | class TaskSchedulerWorkerPoolTest | 
|  | : public testing::TestWithParam<PoolExecutionType> { | 
|  | protected: | 
|  | TaskSchedulerWorkerPoolTest() | 
|  | : service_thread_("TaskSchedulerServiceThread") {} | 
|  |  | 
|  | void SetUp() override { | 
|  | service_thread_.Start(); | 
|  | delayed_task_manager_.Start(service_thread_.task_runner()); | 
|  | CreateWorkerPool(); | 
|  | } | 
|  |  | 
|  | void TearDown() override { | 
|  | service_thread_.Stop(); | 
|  | if (worker_pool_) | 
|  | worker_pool_->JoinForTesting(); | 
|  | } | 
|  |  | 
|  | void CreateWorkerPool() { | 
|  | ASSERT_FALSE(worker_pool_); | 
|  | switch (GetParam().pool_type) { | 
|  | case PoolType::GENERIC: | 
|  | worker_pool_ = std::make_unique<SchedulerWorkerPoolImpl>( | 
|  | "TestWorkerPool", "A", ThreadPriority::NORMAL, | 
|  | task_tracker_.GetTrackedRef(), &delayed_task_manager_); | 
|  | break; | 
|  | #if defined(OS_WIN) | 
|  | case PoolType::WINDOWS: | 
|  | worker_pool_ = std::make_unique<PlatformNativeWorkerPoolWin>( | 
|  | task_tracker_.GetTrackedRef(), &delayed_task_manager_); | 
|  | break; | 
|  | #endif | 
|  | } | 
|  | ASSERT_TRUE(worker_pool_); | 
|  | } | 
|  |  | 
|  | void StartWorkerPool() { | 
|  | ASSERT_TRUE(worker_pool_); | 
|  | switch (GetParam().pool_type) { | 
|  | case PoolType::GENERIC: { | 
|  | SchedulerWorkerPoolImpl* scheduler_worker_pool_impl = | 
|  | static_cast<SchedulerWorkerPoolImpl*>(worker_pool_.get()); | 
|  | scheduler_worker_pool_impl->Start( | 
|  | SchedulerWorkerPoolParams(kNumWorkersInWorkerPool, | 
|  | TimeDelta::Max()), | 
|  | service_thread_.task_runner(), nullptr, | 
|  | SchedulerWorkerPoolImpl::WorkerEnvironment::NONE); | 
|  | break; | 
|  | } | 
|  | #if defined(OS_WIN) | 
|  | case PoolType::WINDOWS: { | 
|  | PlatformNativeWorkerPoolWin* scheduler_worker_pool_windows_impl = | 
|  | static_cast<PlatformNativeWorkerPoolWin*>(worker_pool_.get()); | 
|  | scheduler_worker_pool_windows_impl->Start(); | 
|  | break; | 
|  | } | 
|  | #endif | 
|  | } | 
|  | } | 
|  |  | 
|  | Thread service_thread_; | 
|  | TaskTracker task_tracker_ = {"Test"}; | 
|  | DelayedTaskManager delayed_task_manager_; | 
|  |  | 
|  | std::unique_ptr<SchedulerWorkerPool> worker_pool_; | 
|  |  | 
|  | private: | 
|  | DISALLOW_COPY_AND_ASSIGN(TaskSchedulerWorkerPoolTest); | 
|  | }; | 
|  |  | 
|  | void ShouldNotRun() { | 
|  | ADD_FAILURE() << "Ran a task that shouldn't run."; | 
|  | } | 
|  |  | 
|  | }  // namespace | 
|  |  | 
|  | TEST_P(TaskSchedulerWorkerPoolTest, PostTasks) { | 
|  | StartWorkerPool(); | 
|  | // Create threads to post tasks. | 
|  | std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | 
|  | for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) { | 
|  | threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>( | 
|  | worker_pool_.get(), GetParam().execution_mode, PostNestedTask::NO)); | 
|  | threads_posting_tasks.back()->Start(); | 
|  | } | 
|  |  | 
|  | // Wait for all tasks to run. | 
|  | for (const auto& thread_posting_tasks : threads_posting_tasks) { | 
|  | thread_posting_tasks->Join(); | 
|  | thread_posting_tasks->factory()->WaitForAllTasksToRun(); | 
|  | } | 
|  |  | 
|  | // Flush the task tracker to be sure that no task accesses its TestTaskFactory | 
|  | // after |thread_posting_tasks| is destroyed. | 
|  | task_tracker_.FlushForTesting(); | 
|  | } | 
|  |  | 
|  | TEST_P(TaskSchedulerWorkerPoolTest, NestedPostTasks) { | 
|  | StartWorkerPool(); | 
|  | // Create threads to post tasks. Each task posted by these threads will post | 
|  | // another task when it runs. | 
|  | std::vector<std::unique_ptr<ThreadPostingTasks>> threads_posting_tasks; | 
|  | for (size_t i = 0; i < kNumThreadsPostingTasks; ++i) { | 
|  | threads_posting_tasks.push_back(std::make_unique<ThreadPostingTasks>( | 
|  | worker_pool_.get(), GetParam().execution_mode, PostNestedTask::YES)); | 
|  | threads_posting_tasks.back()->Start(); | 
|  | } | 
|  |  | 
|  | // Wait for all tasks to run. | 
|  | for (const auto& thread_posting_tasks : threads_posting_tasks) { | 
|  | thread_posting_tasks->Join(); | 
|  | thread_posting_tasks->factory()->WaitForAllTasksToRun(); | 
|  | } | 
|  |  | 
|  | // Flush the task tracker to be sure that no task accesses its TestTaskFactory | 
|  | // after |thread_posting_tasks| is destroyed. | 
|  | task_tracker_.FlushForTesting(); | 
|  | } | 
|  |  | 
|  | // Verify that a Task can't be posted after shutdown. | 
|  | TEST_P(TaskSchedulerWorkerPoolTest, PostTaskAfterShutdown) { | 
|  | StartWorkerPool(); | 
|  | auto task_runner = test::CreateTaskRunnerWithExecutionMode( | 
|  | worker_pool_.get(), GetParam().execution_mode); | 
|  | task_tracker_.Shutdown(); | 
|  | EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun))); | 
|  | } | 
|  |  | 
|  | // Verify that posting tasks after the pool was destroyed fails but doesn't | 
|  | // crash. | 
|  | TEST_P(TaskSchedulerWorkerPoolTest, PostAfterDestroy) { | 
|  | StartWorkerPool(); | 
|  | auto task_runner = test::CreateTaskRunnerWithExecutionMode( | 
|  | worker_pool_.get(), GetParam().execution_mode); | 
|  | EXPECT_TRUE(task_runner->PostTask(FROM_HERE, DoNothing())); | 
|  | task_tracker_.Shutdown(); | 
|  | worker_pool_->JoinForTesting(); | 
|  | worker_pool_.reset(); | 
|  | EXPECT_FALSE(task_runner->PostTask(FROM_HERE, BindOnce(&ShouldNotRun))); | 
|  | } | 
|  |  | 
|  | // Verify that a Task runs shortly after its delay expires. | 
|  | TEST_P(TaskSchedulerWorkerPoolTest, PostDelayedTask) { | 
|  | StartWorkerPool(); | 
|  |  | 
|  | WaitableEvent task_ran(WaitableEvent::ResetPolicy::AUTOMATIC, | 
|  | WaitableEvent::InitialState::NOT_SIGNALED); | 
|  |  | 
|  | auto task_runner = test::CreateTaskRunnerWithExecutionMode( | 
|  | worker_pool_.get(), GetParam().execution_mode); | 
|  |  | 
|  | // Wait until the task runner is up and running to make sure the test below is | 
|  | // solely timing the delayed task, not bringing up a physical thread. | 
|  | task_runner->PostTask( | 
|  | FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_ran))); | 
|  | task_ran.Wait(); | 
|  | ASSERT_TRUE(!task_ran.IsSignaled()); | 
|  |  | 
|  | // Post a task with a short delay. | 
|  | TimeTicks start_time = TimeTicks::Now(); | 
|  | EXPECT_TRUE(task_runner->PostDelayedTask( | 
|  | FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_ran)), | 
|  | TestTimeouts::tiny_timeout())); | 
|  |  | 
|  | // Wait until the task runs. | 
|  | task_ran.Wait(); | 
|  |  | 
|  | // Expect the task to run after its delay expires, but no more than 250 | 
|  | // ms after that. | 
|  | const TimeDelta actual_delay = TimeTicks::Now() - start_time; | 
|  | EXPECT_GE(actual_delay, TestTimeouts::tiny_timeout()); | 
|  | EXPECT_LT(actual_delay, | 
|  | TimeDelta::FromMilliseconds(250) + TestTimeouts::tiny_timeout()); | 
|  | } | 
|  |  | 
|  | // Verify that the RunsTasksInCurrentSequence() method of a SEQUENCED TaskRunner | 
|  | // returns false when called from a task that isn't part of the sequence. Note: | 
|  | // Tests that use TestTaskFactory already verify that | 
|  | // RunsTasksInCurrentSequence() returns true when appropriate so this method | 
|  | // complements it to get full coverage of that method. | 
|  | TEST_P(TaskSchedulerWorkerPoolTest, SequencedRunsTasksInCurrentSequence) { | 
|  | StartWorkerPool(); | 
|  | auto task_runner = test::CreateTaskRunnerWithExecutionMode( | 
|  | worker_pool_.get(), GetParam().execution_mode); | 
|  | auto sequenced_task_runner = | 
|  | worker_pool_->CreateSequencedTaskRunnerWithTraits(TaskTraits()); | 
|  |  | 
|  | WaitableEvent task_ran(WaitableEvent::ResetPolicy::MANUAL, | 
|  | WaitableEvent::InitialState::NOT_SIGNALED); | 
|  | task_runner->PostTask( | 
|  | FROM_HERE, | 
|  | BindOnce( | 
|  | [](scoped_refptr<TaskRunner> sequenced_task_runner, | 
|  | WaitableEvent* task_ran) { | 
|  | EXPECT_FALSE(sequenced_task_runner->RunsTasksInCurrentSequence()); | 
|  | task_ran->Signal(); | 
|  | }, | 
|  | sequenced_task_runner, Unretained(&task_ran))); | 
|  | task_ran.Wait(); | 
|  | } | 
|  |  | 
|  | // Verify that tasks posted before Start run after Start. | 
|  | TEST_P(TaskSchedulerWorkerPoolTest, PostBeforeStart) { | 
|  | WaitableEvent task_1_running(WaitableEvent::ResetPolicy::MANUAL, | 
|  | WaitableEvent::InitialState::NOT_SIGNALED); | 
|  | WaitableEvent task_2_running(WaitableEvent::ResetPolicy::MANUAL, | 
|  | WaitableEvent::InitialState::NOT_SIGNALED); | 
|  |  | 
|  | scoped_refptr<TaskRunner> task_runner = | 
|  | worker_pool_->CreateTaskRunnerWithTraits({WithBaseSyncPrimitives()}); | 
|  |  | 
|  | task_runner->PostTask( | 
|  | FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_1_running))); | 
|  | task_runner->PostTask( | 
|  | FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&task_2_running))); | 
|  |  | 
|  | // Workers should not be created and tasks should not run before the pool is | 
|  | // started. The sleep is to give time for the tasks to potentially run. | 
|  | PlatformThread::Sleep(TestTimeouts::tiny_timeout()); | 
|  | EXPECT_FALSE(task_1_running.IsSignaled()); | 
|  | EXPECT_FALSE(task_2_running.IsSignaled()); | 
|  |  | 
|  | StartWorkerPool(); | 
|  |  | 
|  | // Tasks should run shortly after the pool is started. | 
|  | task_1_running.Wait(); | 
|  | task_2_running.Wait(); | 
|  |  | 
|  | task_tracker_.FlushForTesting(); | 
|  | } | 
|  |  | 
|  | INSTANTIATE_TEST_CASE_P(GenericParallel, | 
|  | TaskSchedulerWorkerPoolTest, | 
|  | ::testing::Values(PoolExecutionType{ | 
|  | PoolType::GENERIC, test::ExecutionMode::PARALLEL})); | 
|  | INSTANTIATE_TEST_CASE_P(GenericSequenced, | 
|  | TaskSchedulerWorkerPoolTest, | 
|  | ::testing::Values(PoolExecutionType{ | 
|  | PoolType::GENERIC, | 
|  | test::ExecutionMode::SEQUENCED})); | 
|  |  | 
#if defined(OS_WIN)
// On Windows, also run every test in the fixture against the native pool,
// once with PARALLEL and once with SEQUENCED task runners.
INSTANTIATE_TEST_CASE_P(
    WinParallel,
    TaskSchedulerWorkerPoolTest,
    ::testing::Values(PoolExecutionType{PoolType::WINDOWS,
                                        test::ExecutionMode::PARALLEL}));
INSTANTIATE_TEST_CASE_P(
    WinSequenced,
    TaskSchedulerWorkerPoolTest,
    ::testing::Values(PoolExecutionType{PoolType::WINDOWS,
                                        test::ExecutionMode::SEQUENCED}));
#endif
|  |  | 
|  | }  // namespace internal | 
|  | }  // namespace base |