Implement simple WorkerPool, remove use of base::PostTask
Change-Id: I90fd52e9882d3e5be32173977d61678389e03513
Reviewed-on: https://gn-review.googlesource.com/1580
Commit-Queue: Scott Graham <scottmg@chromium.org>
Reviewed-by: Brett Wilson <brettw@chromium.org>
diff --git a/build/full_test.py b/build/full_test.py
index b84686b..2095ddd 100755
--- a/build/full_test.py
+++ b/build/full_test.py
@@ -41,7 +41,7 @@
subprocess.check_call([sys.executable, os.path.join('build', 'gen.py')])
subprocess.check_call(['ninja', '-C', 'out'])
- #subprocess.check_call([os.path.join('out', 'gn_unittests')])
+ subprocess.check_call([os.path.join('out', 'gn_unittests')])
orig_dir = os.getcwd()
in_chrome_tree_gn = sys.argv[2]
diff --git a/build/gen.py b/build/gen.py
index 8840652..86affb5 100755
--- a/build/gen.py
+++ b/build/gen.py
@@ -333,6 +333,7 @@
], 'tool': 'cxx', 'include_dirs': []},
'gn_lib': {'sources': [
'src/exe_path.cc',
+ 'src/worker_pool.cc',
'tools/gn/action_target_generator.cc',
'tools/gn/action_values.cc',
'tools/gn/analyzer.cc',
diff --git a/src/worker_pool.cc b/src/worker_pool.cc
new file mode 100644
index 0000000..e204084
--- /dev/null
+++ b/src/worker_pool.cc
@@ -0,0 +1,98 @@
+// Copyright 2018 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "worker_pool.h"
+
+#include "base/command_line.h"
+#include "base/strings/string_number_conversions.h"
+#include "base/sys_info.h"
+#include "tools/gn/switches.h"
+
+namespace {
+
+int GetThreadCount() {
+ std::string thread_count =
+ base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
+ switches::kThreads);
+
+ // See if an override was specified on the command line.
+ int result;
+ if (!thread_count.empty() && base::StringToInt(thread_count, &result) &&
+ result >= 1) {
+ return result;
+ }
+
+ // Base the default number of worker threads on number of cores in the
+ // system. When building large projects, the speed can be limited by how fast
+ // the main thread can dispatch work and connect the dependency graph. If
+ // there are too many worker threads, the main thread can be starved and it
+ // will run slower overall.
+ //
+ // One less worker thread than the number of physical CPUs seems to be a
+ // good value, both theoretically and experimentally. But always use at
+ // least some workers to prevent us from being too sensitive to I/O latency
+ // on low-end systems.
+ //
+ // The minimum thread count is based on measuring the optimal threads for the
+ // Chrome build on a several-year-old 4-core MacBook.
+ // Almost all CPUs now are hyperthreaded.
+ int num_cores = base::SysInfo::NumberOfProcessors() / 2;
+ return std::max(num_cores - 1, 8);
+}
+
+} // namespace
+
+WorkerPool::WorkerPool() : WorkerPool(GetThreadCount()) {}
+
+WorkerPool::WorkerPool(size_t thread_count)
+ : should_stop_processing_(false) {
+ threads_.reserve(thread_count);
+ for (size_t i = 0; i < thread_count; ++i)
+ threads_.emplace_back([this]() { Worker(); });
+}
+
+WorkerPool::~WorkerPool() {
+ {
+ std::unique_lock<std::mutex> queue_lock(queue_mutex_);
+ should_stop_processing_ = true;
+ }
+
+ pool_notifier_.notify_all();
+
+ for (auto& task_thread : threads_) {
+ task_thread.join();
+ }
+}
+
+void WorkerPool::PostTask(std::function<void()> work) {
+ {
+ std::unique_lock<std::mutex> queue_lock(queue_mutex_);
+ CHECK(!should_stop_processing_);
+ task_queue_.emplace(std::move(work));
+ }
+
+ pool_notifier_.notify_one();
+}
+
+void WorkerPool::Worker() {
+ for (;;) {
+ std::function<void()> task;
+
+ {
+ std::unique_lock<std::mutex> queue_lock(queue_mutex_);
+
+ pool_notifier_.wait(queue_lock, [this]() {
+ return (!task_queue_.empty()) || should_stop_processing_;
+ });
+
+ if (should_stop_processing_ && task_queue_.empty())
+ return;
+
+ task = std::move(task_queue_.front());
+ task_queue_.pop();
+ }
+
+ task();
+ }
+}
diff --git a/src/worker_pool.h b/src/worker_pool.h
new file mode 100644
index 0000000..a95a175
--- /dev/null
+++ b/src/worker_pool.h
@@ -0,0 +1,38 @@
+// Copyright 2018 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef WORKER_POOL_H_
+#define WORKER_POOL_H_
+
+#include "base/logging.h"
+#include "base/macros.h"
+
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <queue>
+#include <thread>
+
+class WorkerPool {
+ public:
+ WorkerPool();
+ WorkerPool(size_t thread_count);
+ ~WorkerPool();
+
+ void PostTask(std::function<void()> work);
+
+ private:
+ void Worker();
+
+ std::vector<std::thread> threads_;
+ std::queue<std::function<void()>> task_queue_;
+ std::mutex queue_mutex_;
+ std::condition_variable_any pool_notifier_;
+ bool should_stop_processing_;
+
+ DISALLOW_COPY_AND_ASSIGN(WorkerPool);
+};
+
+#endif // WORKER_POOL_H_
diff --git a/tools/gn/command_gen.cc b/tools/gn/command_gen.cc
index 44bf922..46c6ca9 100644
--- a/tools/gn/command_gen.cc
+++ b/tools/gn/command_gen.cc
@@ -72,8 +72,8 @@
const Item* item = record->item();
const Target* target = item->AsTarget();
if (target) {
- g_scheduler->ScheduleWork(base::Bind(&BackgroundDoWrite,
- write_info, target));
+ g_scheduler->ScheduleWork(
+ std::bind(&BackgroundDoWrite, write_info, target));
}
}
diff --git a/tools/gn/gn_main.cc b/tools/gn/gn_main.cc
index dac41f3..a9f684e 100644
--- a/tools/gn/gn_main.cc
+++ b/tools/gn/gn_main.cc
@@ -8,10 +8,7 @@
#include "base/at_exit.h"
#include "base/command_line.h"
#include "base/message_loop/message_loop.h"
-#include "base/strings/string_number_conversions.h"
#include "base/strings/utf_string_conversions.h"
-#include "base/sys_info.h"
-#include "base/task_scheduler/task_scheduler.h"
#include "build_config.h"
#include "tools/gn/commands.h"
#include "tools/gn/err.h"
@@ -41,54 +38,6 @@
#endif
}
-int GetThreadCount() {
- std::string thread_count =
- base::CommandLine::ForCurrentProcess()->GetSwitchValueASCII(
- switches::kThreads);
-
- // See if an override was specified on the command line.
- int result;
- if (!thread_count.empty() && base::StringToInt(thread_count, &result) &&
- result >= 1) {
- return result;
- }
-
- // Base the default number of worker threads on number of cores in the
- // system. When building large projects, the speed can be limited by how fast
- // the main thread can dispatch work and connect the dependency graph. If
- // there are too many worker threads, the main thread can be starved and it
- // will run slower overall.
- //
- // One less worker thread than the number of physical CPUs seems to be a
- // good value, both theoretically and experimentally. But always use at
- // least some workers to prevent us from being too sensitive to I/O latency
- // on low-end systems.
- //
- // The minimum thread count is based on measuring the optimal threads for the
- // Chrome build on a several-year-old 4-core MacBook.
- // Almost all CPUs now are hyperthreaded.
- int num_cores = base::SysInfo::NumberOfProcessors() / 2;
- return std::max(num_cores - 1, 8);
-}
-
-void StartTaskScheduler() {
- constexpr base::TimeDelta kSuggestedReclaimTime =
- base::TimeDelta::FromSeconds(30);
-
- constexpr int kBackgroundMaxThreads = 1;
- constexpr int kBackgroundBlockingMaxThreads = 2;
- const int kForegroundMaxThreads =
- std::max(1, base::SysInfo::NumberOfProcessors());
- const int foreground_blocking_max_threads = GetThreadCount();
-
- base::TaskScheduler::Create("gn");
- base::TaskScheduler::GetInstance()->Start(
- {{kBackgroundMaxThreads, kSuggestedReclaimTime},
- {kBackgroundBlockingMaxThreads, kSuggestedReclaimTime},
- {kForegroundMaxThreads, kSuggestedReclaimTime},
- {foreground_blocking_max_threads, kSuggestedReclaimTime}});
-}
-
} // namespace
int main(int argc, char** argv) {
@@ -127,9 +76,7 @@
int retval;
if (found_command != command_map.end()) {
base::MessageLoop message_loop;
- StartTaskScheduler();
retval = found_command->second.runner(args);
- base::TaskScheduler::GetInstance()->Shutdown();
} else {
Err(Location(), "Command \"" + command + "\" unknown.").PrintToStdout();
OutputString(
diff --git a/tools/gn/header_checker.cc b/tools/gn/header_checker.cc
index 57d2642..499b16e 100644
--- a/tools/gn/header_checker.cc
+++ b/tools/gn/header_checker.cc
@@ -10,7 +10,6 @@
#include "base/containers/queue.h"
#include "base/files/file_util.h"
#include "base/strings/string_util.h"
-#include "base/task_scheduler/post_task.h"
#include "tools/gn/build_settings.h"
#include "tools/gn/builder.h"
#include "tools/gn/c_include_iterator.h"
@@ -22,6 +21,7 @@
#include "tools/gn/source_file_type.h"
#include "tools/gn/target.h"
#include "tools/gn/trace.h"
+#include "worker_pool.h"
namespace {
@@ -149,6 +149,8 @@
}
void HeaderChecker::RunCheckOverFiles(const FileMap& files, bool force_check) {
+ WorkerPool pool;
+
for (const auto& file : files) {
// Only check C-like source files (RC files also have includes).
SourceFileType type = GetSourceFileType(file.first);
@@ -168,9 +170,19 @@
for (const auto& vect_i : file.second) {
if (vect_i.target->check_includes()) {
task_count_.Increment();
- base::PostTaskWithTraits(FROM_HERE, {base::MayBlock()},
- base::BindOnce(&HeaderChecker::DoWork, this,
- vect_i.target, file.first));
+ pool.PostTask([ this, target = vect_i.target, f = file.first ] {
+ Err err;
+ if (!CheckFile(target, f, &err)) {
+ base::AutoLock lock(lock_);
+ errors_.push_back(err);
+ }
+
+ if (!task_count_.Decrement()) {
+ // Signal |task_count_cv_| when |task_count_| becomes zero.
+ base::AutoLock auto_lock(lock_);
+ task_count_cv_.Signal();
+ }
+ });
}
}
}
@@ -181,20 +193,6 @@
task_count_cv_.Wait();
}
-void HeaderChecker::DoWork(const Target* target, const SourceFile& file) {
- Err err;
- if (!CheckFile(target, file, &err)) {
- base::AutoLock lock(lock_);
- errors_.push_back(err);
- }
-
- if (!task_count_.Decrement()) {
- // Signal |task_count_cv_| when |task_count_| becomes zero.
- base::AutoLock auto_lock(lock_);
- task_count_cv_.Signal();
- }
-}
-
// static
void HeaderChecker::AddTargetToFileMap(const Target* target, FileMap* dest) {
// Files in the sources have this public bit by default.
diff --git a/tools/gn/header_checker.h b/tools/gn/header_checker.h
index b1d0f79..2117b88 100644
--- a/tools/gn/header_checker.h
+++ b/tools/gn/header_checker.h
@@ -101,8 +101,6 @@
// will be populate on failure.
void RunCheckOverFiles(const FileMap& flies, bool force_check);
- void DoWork(const Target* target, const SourceFile& file);
-
// Adds the sources and public files from the given target to the given map.
static void AddTargetToFileMap(const Target* target, FileMap* dest);
diff --git a/tools/gn/input_file_manager.cc b/tools/gn/input_file_manager.cc
index 6fe3cae..e2ae6e8 100644
--- a/tools/gn/input_file_manager.cc
+++ b/tools/gn/input_file_manager.cc
@@ -103,7 +103,7 @@
// Try not to schedule callbacks while holding the lock. All cases that don't
// want to schedule should return early. Otherwise, this will be scheduled
// after we leave the lock.
- base::Closure schedule_this;
+ std::function<void()> schedule_this;
{
base::AutoLock lock(lock_);
@@ -113,12 +113,8 @@
std::unique_ptr<InputFileData> data =
std::make_unique<InputFileData>(file_name);
data->scheduled_callbacks.push_back(callback);
- schedule_this = base::Bind(&InputFileManager::BackgroundLoadFile,
- this,
- origin,
- build_settings,
- file_name,
- &data->file);
+ schedule_this = std::bind(&InputFileManager::BackgroundLoadFile, this,
+ origin, build_settings, file_name, &data->file);
input_files_[file_name] = std::move(data);
} else {
@@ -138,8 +134,8 @@
if (data->loaded) {
// Can just directly issue the callback on the background thread.
- schedule_this = base::Bind(&InvokeFileLoadCallback, callback,
- data->parsed_root.get());
+ schedule_this = std::bind(&InvokeFileLoadCallback, callback,
+ data->parsed_root.get());
} else {
// Load is pending on this file, schedule the invoke.
data->scheduled_callbacks.push_back(callback);
diff --git a/tools/gn/scheduler.cc b/tools/gn/scheduler.cc
index 0feb4a1..dd43ce7 100644
--- a/tools/gn/scheduler.cc
+++ b/tools/gn/scheduler.cc
@@ -8,11 +8,14 @@
#include "base/bind.h"
#include "base/single_thread_task_runner.h"
-#include "base/task_scheduler/post_task.h"
#include "base/threading/thread_task_runner_handle.h"
#include "tools/gn/standard_out.h"
#include "tools/gn/target.h"
+namespace {
+
+} // namespace
+
Scheduler* g_scheduler = nullptr;
Scheduler::Scheduler()
@@ -20,6 +23,7 @@
input_file_manager_(new InputFileManager),
verbose_logging_(false),
pool_work_count_cv_(&pool_work_count_lock_),
+ worker_pool_(),
is_failed_(false),
suppress_output_for_testing_(false),
has_been_shutdown_(false) {
@@ -78,13 +82,17 @@
}
}
-void Scheduler::ScheduleWork(base::OnceClosure work) {
+void Scheduler::ScheduleWork(std::function<void()> work) {
IncrementWorkCount();
pool_work_count_.Increment();
- base::PostTaskWithTraits(
- FROM_HERE, {base::MayBlock()},
- base::BindOnce(&Scheduler::DoWork, base::Unretained(this),
- std::move(work)));
+ worker_pool_.PostTask([ this, work = std::move(work) ] {
+ work();
+ DecrementWorkCount();
+ if (!pool_work_count_.Decrement()) {
+ base::AutoLock auto_lock(pool_work_count_lock_);
+ pool_work_count_cv_.Signal();
+ }
+ });
}
void Scheduler::AddGenDependency(const base::FilePath& file) {
@@ -184,15 +192,6 @@
runner_.Quit();
}
-void Scheduler::DoWork(base::OnceClosure closure) {
- std::move(closure).Run();
- DecrementWorkCount();
- if (!pool_work_count_.Decrement()) {
- base::AutoLock auto_lock(pool_work_count_lock_);
- pool_work_count_cv_.Signal();
- }
-}
-
void Scheduler::OnComplete() {
// Should be called on the main thread.
DCHECK(task_runner()->BelongsToCurrentThread());
diff --git a/tools/gn/scheduler.h b/tools/gn/scheduler.h
index 447551e..e37be30 100644
--- a/tools/gn/scheduler.h
+++ b/tools/gn/scheduler.h
@@ -18,6 +18,7 @@
#include "tools/gn/label.h"
#include "tools/gn/source_file.h"
#include "tools/gn/token.h"
+#include "worker_pool.h"
class Target;
@@ -44,7 +45,7 @@
void Log(const std::string& verb, const std::string& msg);
void FailWithError(const Err& err);
- void ScheduleWork(base::OnceClosure work);
+ void ScheduleWork(std::function<void()> work);
void Shutdown();
@@ -98,8 +99,6 @@
void DoTargetFileWrite(const Target* target);
- void DoWork(base::OnceClosure closure);
-
void OnComplete();
// Waits for tasks scheduled via ScheduleWork() to complete their execution.
@@ -126,6 +125,8 @@
// Condition variable signaled when |pool_work_count_| reaches zero.
base::ConditionVariable pool_work_count_cv_;
+ WorkerPool worker_pool_;
+
mutable base::Lock lock_;
bool is_failed_;