blob: 67eca5b8b6ae0bb22bc2539e0f11f4d02ff6e2f4 [file] [log] [blame]
Scott Graham66962112018-06-08 12:42:08 -07001// Copyright 2017 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "base/synchronization/waitable_event.h"
6
7#include <dispatch/dispatch.h>
8#include <mach/mach.h>
9#include <sys/event.h>
10
Scott Grahamc86d7032018-06-12 22:49:46 -070011#include "base/callback.h"
Scott Graham66962112018-06-08 12:42:08 -070012#include "base/files/scoped_file.h"
13#include "base/mac/dispatch_source_mach.h"
14#include "base/mac/mac_util.h"
15#include "base/mac/mach_logging.h"
16#include "base/mac/scoped_dispatch_object.h"
17#include "base/posix/eintr_wrapper.h"
Scott Graham894986a2018-06-14 14:15:50 -070018#include "base/time/time.h"
Scott Grahamab123de2018-06-08 15:53:07 -070019#include "build_config.h"
Scott Graham66962112018-06-08 12:42:08 -070020
21namespace base {
22
23WaitableEvent::WaitableEvent(ResetPolicy reset_policy,
24 InitialState initial_state)
25 : policy_(reset_policy) {
26 mach_port_options_t options{};
27 options.flags = MPO_INSERT_SEND_RIGHT;
28 options.mpl.mpl_qlimit = 1;
29
30 mach_port_t name;
31 kern_return_t kr = mach_port_construct(mach_task_self(), &options, 0, &name);
32 MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_construct";
33
34 receive_right_ = new ReceiveRight(name, UseSlowWatchList(policy_));
35 send_right_.reset(name);
36
37 if (initial_state == InitialState::SIGNALED)
38 Signal();
39}
40
// Defaulted: there is no explicit teardown here; the members (including
// |receive_right_| and |send_right_|) are destroyed by their own destructors.
WaitableEvent::~WaitableEvent() = default;
42
43void WaitableEvent::Reset() {
44 PeekPort(receive_right_->Name(), true);
45}
46
47void WaitableEvent::Signal() {
48 // If using the slow watch-list, copy the watchers to a local. After
49 // mach_msg(), the event object may be deleted by an awoken thread.
50 const bool use_slow_path = UseSlowWatchList(policy_);
51 ReceiveRight* receive_right = nullptr; // Manually reference counted.
52 std::unique_ptr<std::list<OnceClosure>> watch_list;
53 if (use_slow_path) {
54 // To avoid a race condition of a WaitableEventWatcher getting added
55 // while another thread is in this method, hold the watch-list lock for
56 // the duration of mach_msg(). This requires ref-counting the
57 // |receive_right_| object that contains it, in case the event is deleted
58 // by a waiting thread after mach_msg().
59 receive_right = receive_right_.get();
60 receive_right->AddRef();
61
62 ReceiveRight::WatchList* slow_watch_list = receive_right->SlowWatchList();
63 slow_watch_list->lock.Acquire();
64
65 if (!slow_watch_list->list.empty()) {
66 watch_list.reset(new std::list<OnceClosure>());
67 std::swap(*watch_list, slow_watch_list->list);
68 }
69 }
70
71 mach_msg_empty_send_t msg{};
72 msg.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
73 msg.header.msgh_size = sizeof(&msg);
74 msg.header.msgh_remote_port = send_right_.get();
75 // If the event is already signaled, this will time out because the queue
76 // has a length of one.
77 kern_return_t kr =
78 mach_msg(&msg.header, MACH_SEND_MSG | MACH_SEND_TIMEOUT, sizeof(msg), 0,
79 MACH_PORT_NULL, 0, MACH_PORT_NULL);
80 MACH_CHECK(kr == KERN_SUCCESS || kr == MACH_SEND_TIMED_OUT, kr) << "mach_msg";
81
82 if (use_slow_path) {
83 // If a WaitableEventWatcher were to start watching when the event is
84 // signaled, it runs the callback immediately without adding it to the
85 // list. Therefore the watch list can only be non-empty if the event is
86 // newly signaled.
87 if (watch_list.get()) {
88 MACH_CHECK(kr == KERN_SUCCESS, kr);
89 for (auto& watcher : *watch_list) {
90 std::move(watcher).Run();
91 }
92 }
93
94 receive_right->SlowWatchList()->lock.Release();
95 receive_right->Release();
96 }
97}
98
99bool WaitableEvent::IsSignaled() {
100 return PeekPort(receive_right_->Name(), policy_ == ResetPolicy::AUTOMATIC);
101}
102
103void WaitableEvent::Wait() {
104 bool result = TimedWaitUntil(TimeTicks::Max());
105 DCHECK(result) << "TimedWait() should never fail with infinite timeout";
106}
107
108bool WaitableEvent::TimedWait(const TimeDelta& wait_delta) {
109 return TimedWaitUntil(TimeTicks::Now() + wait_delta);
110}
111
112bool WaitableEvent::TimedWaitUntil(const TimeTicks& end_time) {
Scott Graham66962112018-06-08 12:42:08 -0700113 TimeDelta wait_time = end_time - TimeTicks::Now();
114 if (wait_time < TimeDelta()) {
115 // A negative delta would be treated by the system as indefinite, but
116 // it needs to be treated as a poll instead.
117 wait_time = TimeDelta();
118 }
119
120 mach_msg_empty_rcv_t msg{};
121 msg.header.msgh_local_port = receive_right_->Name();
122
123 mach_msg_option_t options = MACH_RCV_MSG;
124
125 mach_msg_timeout_t timeout = 0;
126 if (!end_time.is_max()) {
127 options |= MACH_RCV_TIMEOUT;
128 timeout = wait_time.InMillisecondsRoundedUp();
129 }
130
131 mach_msg_size_t rcv_size = sizeof(msg);
132 if (policy_ == ResetPolicy::MANUAL) {
133 // To avoid dequeing the message, receive with a size of 0 and set
134 // MACH_RCV_LARGE to keep the message in the queue.
135 options |= MACH_RCV_LARGE;
136 rcv_size = 0;
137 }
138
139 kern_return_t kr = mach_msg(&msg.header, options, 0, rcv_size,
140 receive_right_->Name(), timeout, MACH_PORT_NULL);
141 if (kr == KERN_SUCCESS) {
142 return true;
143 } else if (rcv_size == 0 && kr == MACH_RCV_TOO_LARGE) {
144 return true;
145 } else {
146 MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
147 return false;
148 }
149}
150
151// static
152bool WaitableEvent::UseSlowWatchList(ResetPolicy policy) {
153#if defined(OS_IOS)
154 const bool use_slow_path = false;
155#else
156 static bool use_slow_path = !mac::IsAtLeastOS10_12();
157#endif
158 return policy == ResetPolicy::MANUAL && use_slow_path;
159}
160
161// static
162size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, size_t count) {
Scott Graham66962112018-06-08 12:42:08 -0700163 DCHECK(count) << "Cannot wait on no events";
Scott Graham66962112018-06-08 12:42:08 -0700164
165 // On macOS 10.11+, using Mach port sets may cause system instability, per
166 // https://crbug.com/756102. On macOS 10.12+, a kqueue can be used
167 // instead to work around that. On macOS 10.9 and 10.10, kqueue only works
168 // for port sets, so port sets are just used directly. On macOS 10.11,
169 // libdispatch sources are used. Therefore, there are three different
170 // primitives that can be used to implement WaitMany. Which one to use is
171 // selected at run-time by OS version checks.
172 enum WaitManyPrimitive {
173 KQUEUE,
174 DISPATCH,
175 PORT_SET,
176 };
177#if defined(OS_IOS)
178 const WaitManyPrimitive kPrimitive = PORT_SET;
179#else
180 const WaitManyPrimitive kPrimitive =
181 mac::IsAtLeastOS10_12() ? KQUEUE
182 : (mac::IsOS10_11() ? DISPATCH : PORT_SET);
183#endif
184 if (kPrimitive == KQUEUE) {
185 std::vector<kevent64_s> events(count);
186 for (size_t i = 0; i < count; ++i) {
187 EV_SET64(&events[i], raw_waitables[i]->receive_right_->Name(),
188 EVFILT_MACHPORT, EV_ADD, 0, 0, i, 0, 0);
189 }
190
191 std::vector<kevent64_s> out_events(count);
192
193 ScopedFD wait_many(kqueue());
194 PCHECK(wait_many.is_valid()) << "kqueue";
195
196 int rv = HANDLE_EINTR(kevent64(wait_many.get(), events.data(), count,
197 out_events.data(), count, 0, nullptr));
198 PCHECK(rv > 0) << "kevent64";
199
200 size_t triggered = -1;
201 for (size_t i = 0; i < static_cast<size_t>(rv); ++i) {
202 // WaitMany should return the lowest index in |raw_waitables| that was
203 // triggered.
204 size_t index = static_cast<size_t>(out_events[i].udata);
205 triggered = std::min(triggered, index);
206 }
207
208 if (raw_waitables[triggered]->policy_ == ResetPolicy::AUTOMATIC) {
209 // The message needs to be dequeued to reset the event.
210 PeekPort(raw_waitables[triggered]->receive_right_->Name(), true);
211 }
212
213 return triggered;
214 } else if (kPrimitive == DISPATCH) {
215 // Each item in |raw_waitables| will be watched using a dispatch souce
216 // scheduled on the serial |queue|. The first one to be invoked will
217 // signal the |semaphore| that this method will wait on.
218 ScopedDispatchObject<dispatch_queue_t> queue(dispatch_queue_create(
219 "org.chromium.base.WaitableEvent.WaitMany", DISPATCH_QUEUE_SERIAL));
220 ScopedDispatchObject<dispatch_semaphore_t> semaphore(
221 dispatch_semaphore_create(0));
222
223 // Block capture references. |signaled| will identify the index in
224 // |raw_waitables| whose source was invoked.
225 dispatch_semaphore_t semaphore_ref = semaphore.get();
226 const size_t kUnsignaled = -1;
227 __block size_t signaled = kUnsignaled;
228
229 // Create a MACH_RECV dispatch source for each event. These must be
230 // destroyed before the |queue| and |semaphore|.
231 std::vector<std::unique_ptr<DispatchSourceMach>> sources;
232 for (size_t i = 0; i < count; ++i) {
233 const bool auto_reset =
234 raw_waitables[i]->policy_ == WaitableEvent::ResetPolicy::AUTOMATIC;
235 // The block will copy a reference to |right|.
236 scoped_refptr<WaitableEvent::ReceiveRight> right =
237 raw_waitables[i]->receive_right_;
238 auto source =
239 std::make_unique<DispatchSourceMach>(queue, right->Name(), ^{
240 // After the semaphore is signaled, another event be signaled and
241 // the source may have its block put on the |queue|. WaitMany
242 // should only report (and auto-reset) one event, so the first
243 // event to signal is reported.
244 if (signaled == kUnsignaled) {
245 signaled = i;
246 if (auto_reset) {
247 PeekPort(right->Name(), true);
248 }
249 dispatch_semaphore_signal(semaphore_ref);
250 }
251 });
252 source->Resume();
253 sources.push_back(std::move(source));
254 }
255
256 dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
257 DCHECK_NE(signaled, kUnsignaled);
258 return signaled;
259 } else {
260 DCHECK_EQ(kPrimitive, PORT_SET);
261
262 kern_return_t kr;
263
264 mac::ScopedMachPortSet port_set;
265 {
266 mach_port_t name;
267 kr =
268 mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &name);
269 MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
270 port_set.reset(name);
271 }
272
273 for (size_t i = 0; i < count; ++i) {
274 kr = mach_port_insert_member(mach_task_self(),
275 raw_waitables[i]->receive_right_->Name(),
276 port_set.get());
277 MACH_CHECK(kr == KERN_SUCCESS, kr) << "index " << i;
278 }
279
280 mach_msg_empty_rcv_t msg{};
281 // Wait on the port set. Only specify space enough for the header, to
282 // identify which port in the set is signaled. Otherwise, receiving from the
283 // port set may dequeue a message for a manual-reset event object, which
284 // would cause it to be reset.
285 kr = mach_msg(&msg.header,
286 MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY, 0,
287 sizeof(msg.header), port_set.get(), 0, MACH_PORT_NULL);
288 MACH_CHECK(kr == MACH_RCV_TOO_LARGE, kr) << "mach_msg";
289
290 for (size_t i = 0; i < count; ++i) {
291 WaitableEvent* event = raw_waitables[i];
292 if (msg.header.msgh_local_port == event->receive_right_->Name()) {
293 if (event->policy_ == ResetPolicy::AUTOMATIC) {
294 // The message needs to be dequeued to reset the event.
295 PeekPort(msg.header.msgh_local_port, true);
296 }
297 return i;
298 }
299 }
300
301 NOTREACHED();
302 return 0;
303 }
304}
305
306// static
307bool WaitableEvent::PeekPort(mach_port_t port, bool dequeue) {
308 if (dequeue) {
309 mach_msg_empty_rcv_t msg{};
310 msg.header.msgh_local_port = port;
311 kern_return_t kr = mach_msg(&msg.header, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0,
312 sizeof(msg), port, 0, MACH_PORT_NULL);
313 if (kr == KERN_SUCCESS) {
314 return true;
315 } else {
316 MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
317 return false;
318 }
319 } else {
320 mach_port_seqno_t seqno = 0;
321 mach_msg_size_t size;
322 mach_msg_id_t id;
323 mach_msg_trailer_t trailer;
324 mach_msg_type_number_t trailer_size = sizeof(trailer);
325 kern_return_t kr = mach_port_peek(
326 mach_task_self(), port, MACH_RCV_TRAILER_TYPE(MACH_RCV_TRAILER_NULL),
327 &seqno, &size, &id, reinterpret_cast<mach_msg_trailer_info_t>(&trailer),
328 &trailer_size);
329 if (kr == KERN_SUCCESS) {
330 return true;
331 } else {
332 MACH_CHECK(kr == KERN_FAILURE, kr) << "mach_port_peek";
333 return false;
334 }
335 }
336}
337
// Stores the port |name| in |right_| and, only when |create_slow_watch_list|
// is true, allocates the WatchList used by the slow path; otherwise
// |slow_watch_list_| is left null.
// NOTE(review): presumably |right_| owns the receive right and releases it on
// destruction - confirm against the member declaration in the header.
WaitableEvent::ReceiveRight::ReceiveRight(mach_port_t name,
                                          bool create_slow_watch_list)
    : right_(name),
      slow_watch_list_(create_slow_watch_list ? new WatchList() : nullptr) {}
342
// Defaulted: |right_| and |slow_watch_list_| are torn down by their own
// destructors.
WaitableEvent::ReceiveRight::~ReceiveRight() = default;
344
// WatchList's constructor and destructor are defaulted but defined here,
// out of line, rather than in the class definition.
WaitableEvent::ReceiveRight::WatchList::WatchList() = default;

WaitableEvent::ReceiveRight::WatchList::~WatchList() = default;
348
349} // namespace base