Implement simple WorkerPool, remove use of base::PostTask Change-Id: I90fd52e9882d3e5be32173977d61678389e03513 Reviewed-on: https://gn-review.googlesource.com/1580 Commit-Queue: Scott Graham <scottmg@chromium.org> Reviewed-by: Brett Wilson <brettw@chromium.org>
diff --git a/build/full_test.py b/build/full_test.py index b84686b..2095ddd 100755 --- a/build/full_test.py +++ b/build/full_test.py
@@ -41,7 +41,7 @@ subprocess.check_call([sys.executable, os.path.join('build', 'gen.py')]) subprocess.check_call(['ninja', '-C', 'out']) - #subprocess.check_call([os.path.join('out', 'gn_unittests')]) + subprocess.check_call([os.path.join('out', 'gn_unittests')]) orig_dir = os.getcwd() in_chrome_tree_gn = sys.argv[2]
diff --git a/build/gen.py b/build/gen.py index 8840652..86affb5 100755 --- a/build/gen.py +++ b/build/gen.py
@@ -333,6 +333,7 @@ ], 'tool': 'cxx', 'include_dirs': []}, 'gn_lib': {'sources': [ 'src/exe_path.cc', + 'src/worker_pool.cc', 'tools/gn/action_target_generator.cc', 'tools/gn/action_values.cc', 'tools/gn/analyzer.cc',
diff --git a/src/worker_pool.cc b/src/worker_pool.cc new file mode 100644 index 0000000..e204084 --- /dev/null +++ b/src/worker_pool.cc
@@ -0,0 +1,98 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "worker_pool.h" + +#include "base/command_line.h" +#include "base/strings/string_number_conversions.h" +#include "base/sys_info.h" +#include "tools/gn/switches.h" + +namespace { + +int GetThreadCount() { + std::string thread_count = + base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII( + switches::kThreads); + + // See if an override was specified on the command line. + int result; + if (!thread_count.empty() && base::StringToInt(thread_count, &result) && + result >= 1) { + return result; + } + + // Base the default number of worker threads on number of cores in the + // system. When building large projects, the speed can be limited by how fast + // the main thread can dispatch work and connect the dependency graph. If + // there are too many worker threads, the main thread can be starved and it + // will run slower overall. + // + // One less worker thread than the number of physical CPUs seems to be a + // good value, both theoretically and experimentally. But always use at + // least some workers to prevent us from being too sensitive to I/O latency + // on low-end systems. + // + // The minimum thread count is based on measuring the optimal threads for the + // Chrome build on a several-year-old 4-core MacBook. + // Almost all CPUs now are hyperthreaded. + int num_cores = base::SysInfo::NumberOfProcessors() / 2; + return std::max(num_cores - 1, 8); +} + +} // namespace + +WorkerPool::WorkerPool() : WorkerPool(GetThreadCount()) {} + +WorkerPool::WorkerPool(size_t thread_count) + : should_stop_processing_(false) { + threads_.reserve(thread_count); + for (size_t i = 0; i < thread_count; ++i) + threads_.emplace_back([this]() { Worker(); }); +} + +WorkerPool::~WorkerPool() { + { + std::unique_lock<std::mutex> queue_lock(queue_mutex_); + should_stop_processing_ = true; + } + + pool_notifier_.notify_all(); + + for (auto& task_thread : threads_) { + task_thread.join(); + } +} + +void WorkerPool::PostTask(std::function<void()> work) { + { + std::unique_lock<std::mutex> queue_lock(queue_mutex_); + CHECK(!should_stop_processing_); + task_queue_.emplace(std::move(work)); + } + + pool_notifier_.notify_one(); +} + +void WorkerPool::Worker() { + for (;;) { + std::function<void()> task; + + { + std::unique_lock<std::mutex> queue_lock(queue_mutex_); + + pool_notifier_.wait(queue_lock, [this]() { + return (!task_queue_.empty()) || should_stop_processing_; + }); + + if (should_stop_processing_ && task_queue_.empty()) + return; + + task = std::move(task_queue_.front()); + task_queue_.pop(); + } + + task(); + } +}
diff --git a/src/worker_pool.h b/src/worker_pool.h new file mode 100644 index 0000000..a95a175 --- /dev/null +++ b/src/worker_pool.h
@@ -0,0 +1,38 @@ +// Copyright 2018 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef WORKER_POOL_H_ +#define WORKER_POOL_H_ + +#include "base/logging.h" +#include "base/macros.h" + +#include <condition_variable> +#include <functional> +#include <future> +#include <mutex> +#include <queue> +#include <thread> + +class WorkerPool { + public: + WorkerPool(); + WorkerPool(size_t thread_count); + ~WorkerPool(); + + void PostTask(std::function<void()> work); + + private: + void Worker(); + + std::vector<std::thread> threads_; + std::queue<std::function<void()>> task_queue_; + std::mutex queue_mutex_; + std::condition_variable_any pool_notifier_; + bool should_stop_processing_; + + DISALLOW_COPY_AND_ASSIGN(WorkerPool); +}; + +#endif // WORKER_POOL_H_
diff --git a/tools/gn/command_gen.cc b/tools/gn/command_gen.cc index 44bf922..46c6ca9 100644 --- a/tools/gn/command_gen.cc +++ b/tools/gn/command_gen.cc
@@ -72,8 +72,8 @@ const Item* item = record->item(); const Target* target = item->AsTarget(); if (target) { - g_scheduler->ScheduleWork(base::Bind(&BackgroundDoWrite, - write_info, target)); + g_scheduler->ScheduleWork( + std::bind(&BackgroundDoWrite, write_info, target)); } }
diff --git a/tools/gn/gn_main.cc b/tools/gn/gn_main.cc index dac41f3..a9f684e 100644 --- a/tools/gn/gn_main.cc +++ b/tools/gn/gn_main.cc
@@ -8,10 +8,7 @@ #include "base/at_exit.h" #include "base/command_line.h" #include "base/message_loop/message_loop.h" -#include "base/strings/string_number_conversions.h" #include "base/strings/utf_string_conversions.h" -#include "base/sys_info.h" -#include "base/task_scheduler/task_scheduler.h" #include "build_config.h" #include "tools/gn/commands.h" #include "tools/gn/err.h" @@ -41,54 +38,6 @@ #endif } -int GetThreadCount() { - std::string thread_count = - base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII( - switches::kThreads); - - // See if an override was specified on the command line. - int result; - if (!thread_count.empty() && base::StringToInt(thread_count, &result) && - result >= 1) { - return result; - } - - // Base the default number of worker threads on number of cores in the - // system. When building large projects, the speed can be limited by how fast - // the main thread can dispatch work and connect the dependency graph. If - // there are too many worker threads, the main thread can be starved and it - // will run slower overall. - // - // One less worker thread than the number of physical CPUs seems to be a - // good value, both theoretically and experimentally. But always use at - // least some workers to prevent us from being too sensitive to I/O latency - // on low-end systems. - // - // The minimum thread count is based on measuring the optimal threads for the - // Chrome build on a several-year-old 4-core MacBook. - // Almost all CPUs now are hyperthreaded. - int num_cores = base::SysInfo::NumberOfProcessors() / 2; - return std::max(num_cores - 1, 8); -} - -void StartTaskScheduler() { - constexpr base::TimeDelta kSuggestedReclaimTime = - base::TimeDelta::FromSeconds(30); - - constexpr int kBackgroundMaxThreads = 1; - constexpr int kBackgroundBlockingMaxThreads = 2; - const int kForegroundMaxThreads = - std::max(1, base::SysInfo::NumberOfProcessors()); - const int foreground_blocking_max_threads = GetThreadCount(); - - base::TaskScheduler::Create("gn"); - base::TaskScheduler::GetInstance()->Start( - {{kBackgroundMaxThreads, kSuggestedReclaimTime}, - {kBackgroundBlockingMaxThreads, kSuggestedReclaimTime}, - {kForegroundMaxThreads, kSuggestedReclaimTime}, - {foreground_blocking_max_threads, kSuggestedReclaimTime}}); -} - } // namespace int main(int argc, char** argv) { @@ -127,9 +76,7 @@ int retval; if (found_command != command_map.end()) { base::MessageLoop message_loop; - StartTaskScheduler(); retval = found_command->second.runner(args); - base::TaskScheduler::GetInstance()->Shutdown(); } else { Err(Location(), "Command \"" + command + "\" unknown.").PrintToStdout(); OutputString(
diff --git a/tools/gn/header_checker.cc b/tools/gn/header_checker.cc index 57d2642..499b16e 100644 --- a/tools/gn/header_checker.cc +++ b/tools/gn/header_checker.cc
@@ -10,7 +10,6 @@ #include "base/containers/queue.h" #include "base/files/file_util.h" #include "base/strings/string_util.h" -#include "base/task_scheduler/post_task.h" #include "tools/gn/build_settings.h" #include "tools/gn/builder.h" #include "tools/gn/c_include_iterator.h" @@ -22,6 +21,7 @@ #include "tools/gn/source_file_type.h" #include "tools/gn/target.h" #include "tools/gn/trace.h" +#include "worker_pool.h" namespace { @@ -149,6 +149,8 @@ } void HeaderChecker::RunCheckOverFiles(const FileMap& files, bool force_check) { + WorkerPool pool; + for (const auto& file : files) { // Only check C-like source files (RC files also have includes). SourceFileType type = GetSourceFileType(file.first); @@ -168,9 +170,19 @@ for (const auto& vect_i : file.second) { if (vect_i.target->check_includes()) { task_count_.Increment(); - base::PostTaskWithTraits(FROM_HERE, {base::MayBlock()}, - base::BindOnce(&HeaderChecker::DoWork, this, - vect_i.target, file.first)); + pool.PostTask([ this, target = vect_i.target, f = file.first ] { + Err err; + if (!CheckFile(target, f, &err)) { + base::AutoLock lock(lock_); + errors_.push_back(err); + } + + if (!task_count_.Decrement()) { + // Signal |task_count_cv_| when |task_count_| becomes zero. + base::AutoLock auto_lock(lock_); + task_count_cv_.Signal(); + } + }); } } } @@ -181,20 +193,6 @@ task_count_cv_.Wait(); } -void HeaderChecker::DoWork(const Target* target, const SourceFile& file) { - Err err; - if (!CheckFile(target, file, &err)) { - base::AutoLock lock(lock_); - errors_.push_back(err); - } - - if (!task_count_.Decrement()) { - // Signal |task_count_cv_| when |task_count_| becomes zero. - base::AutoLock auto_lock(lock_); - task_count_cv_.Signal(); - } -} - // static void HeaderChecker::AddTargetToFileMap(const Target* target, FileMap* dest) { // Files in the sources have this public bit by default.
diff --git a/tools/gn/header_checker.h b/tools/gn/header_checker.h index b1d0f79..2117b88 100644 --- a/tools/gn/header_checker.h +++ b/tools/gn/header_checker.h
@@ -101,8 +101,6 @@ // will be populate on failure. void RunCheckOverFiles(const FileMap& flies, bool force_check); - void DoWork(const Target* target, const SourceFile& file); - // Adds the sources and public files from the given target to the given map. static void AddTargetToFileMap(const Target* target, FileMap* dest);
diff --git a/tools/gn/input_file_manager.cc b/tools/gn/input_file_manager.cc index 6fe3cae..e2ae6e8 100644 --- a/tools/gn/input_file_manager.cc +++ b/tools/gn/input_file_manager.cc
@@ -103,7 +103,7 @@ // Try not to schedule callbacks while holding the lock. All cases that don't // want to schedule should return early. Otherwise, this will be scheduled // after we leave the lock. - base::Closure schedule_this; + std::function<void()> schedule_this; { base::AutoLock lock(lock_); @@ -113,12 +113,8 @@ std::unique_ptr<InputFileData> data = std::make_unique<InputFileData>(file_name); data->scheduled_callbacks.push_back(callback); - schedule_this = base::Bind(&InputFileManager::BackgroundLoadFile, - this, - origin, - build_settings, - file_name, - &data->file); + schedule_this = std::bind(&InputFileManager::BackgroundLoadFile, this, + origin, build_settings, file_name, &data->file); input_files_[file_name] = std::move(data); } else { @@ -138,8 +134,8 @@ if (data->loaded) { // Can just directly issue the callback on the background thread. - schedule_this = base::Bind(&InvokeFileLoadCallback, callback, - data->parsed_root.get()); + schedule_this = std::bind(&InvokeFileLoadCallback, callback, + data->parsed_root.get()); } else { // Load is pending on this file, schedule the invoke. data->scheduled_callbacks.push_back(callback);
diff --git a/tools/gn/scheduler.cc b/tools/gn/scheduler.cc index 0feb4a1..dd43ce7 100644 --- a/tools/gn/scheduler.cc +++ b/tools/gn/scheduler.cc
@@ -8,11 +8,14 @@ #include "base/bind.h" #include "base/single_thread_task_runner.h" -#include "base/task_scheduler/post_task.h" #include "base/threading/thread_task_runner_handle.h" #include "tools/gn/standard_out.h" #include "tools/gn/target.h" +namespace { + +} // namespace + Scheduler* g_scheduler = nullptr; Scheduler::Scheduler() @@ -20,6 +23,7 @@ input_file_manager_(new InputFileManager), verbose_logging_(false), pool_work_count_cv_(&pool_work_count_lock_), + worker_pool_(), is_failed_(false), suppress_output_for_testing_(false), has_been_shutdown_(false) { @@ -78,13 +82,17 @@ } } -void Scheduler::ScheduleWork(base::OnceClosure work) { +void Scheduler::ScheduleWork(std::function<void()> work) { IncrementWorkCount(); pool_work_count_.Increment(); - base::PostTaskWithTraits( - FROM_HERE, {base::MayBlock()}, - base::BindOnce(&Scheduler::DoWork, base::Unretained(this), - std::move(work))); + worker_pool_.PostTask([ this, work = std::move(work) ] { + work(); + DecrementWorkCount(); + if (!pool_work_count_.Decrement()) { + base::AutoLock auto_lock(pool_work_count_lock_); + pool_work_count_cv_.Signal(); + } + }); } void Scheduler::AddGenDependency(const base::FilePath& file) { @@ -184,15 +192,6 @@ runner_.Quit(); } -void Scheduler::DoWork(base::OnceClosure closure) { - std::move(closure).Run(); - DecrementWorkCount(); - if (!pool_work_count_.Decrement()) { - base::AutoLock auto_lock(pool_work_count_lock_); - pool_work_count_cv_.Signal(); - } -} - void Scheduler::OnComplete() { // Should be called on the main thread. DCHECK(task_runner()->BelongsToCurrentThread());
diff --git a/tools/gn/scheduler.h b/tools/gn/scheduler.h index 447551e..e37be30 100644 --- a/tools/gn/scheduler.h +++ b/tools/gn/scheduler.h
@@ -18,6 +18,7 @@ #include "tools/gn/label.h" #include "tools/gn/source_file.h" #include "tools/gn/token.h" +#include "worker_pool.h" class Target; @@ -44,7 +45,7 @@ void Log(const std::string& verb, const std::string& msg); void FailWithError(const Err& err); - void ScheduleWork(base::OnceClosure work); + void ScheduleWork(std::function<void()> work); void Shutdown(); @@ -98,8 +99,6 @@ void DoTargetFileWrite(const Target* target); - void DoWork(base::OnceClosure closure); - void OnComplete(); // Waits for tasks scheduled via ScheduleWork() to complete their execution. @@ -126,6 +125,8 @@ // Condition variable signaled when |pool_work_count_| reaches zero. base::ConditionVariable pool_work_count_cv_; + WorkerPool worker_pool_; + mutable base::Lock lock_; bool is_failed_;