// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/synchronization/waitable_event.h"

#include <dispatch/dispatch.h>
#include <mach/mach.h>
#include <sys/event.h>

#include "base/callback.h"
#include "base/files/scoped_file.h"
#include "base/mac/dispatch_source_mach.h"
#include "base/mac/mac_util.h"
#include "base/mac/mach_logging.h"
#include "base/mac/scoped_dispatch_object.h"
#include "base/posix/eintr_wrapper.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/thread_restrictions.h"
#include "build/build_config.h"

namespace base {

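// This implementation uses a Mach port as the underlying primitive. The
// port's message queue has a limit of one, so a single queued empty message
// acts as the "signaled" flag: Signal() enqueues the message, waiting
// receives it (or, for manual-reset events, only peeks at it), and Reset()
// dequeues it.
//
// A minimal usage sketch (illustrative only; InitialState::NOT_SIGNALED is
// assumed from the declaration in base/synchronization/waitable_event.h):
//
//   WaitableEvent event(WaitableEvent::ResetPolicy::MANUAL,
//                       WaitableEvent::InitialState::NOT_SIGNALED);
//   event.Signal();             // Marks the event signaled.
//   CHECK(event.IsSignaled());  // Manual-reset: the signal persists.
//   event.Reset();              // Back to the unsignaled state.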
WaitableEvent::WaitableEvent(ResetPolicy reset_policy,
                             InitialState initial_state)
    : policy_(reset_policy) {
  mach_port_options_t options{};
  options.flags = MPO_INSERT_SEND_RIGHT;
  options.mpl.mpl_qlimit = 1;

  mach_port_t name;
  kern_return_t kr = mach_port_construct(mach_task_self(), &options, 0, &name);
  MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_construct";

  receive_right_ = new ReceiveRight(name, UseSlowWatchList(policy_));
  send_right_.reset(name);

  if (initial_state == InitialState::SIGNALED)
    Signal();
}

WaitableEvent::~WaitableEvent() = default;

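// Discard the queued message, if any, returning the event to the unsignaled
// state.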
void WaitableEvent::Reset() {
  PeekPort(receive_right_->Name(), true);
}

void WaitableEvent::Signal() {
  // If using the slow watch-list, copy the watchers to a local. After
  // mach_msg(), the event object may be deleted by an awoken thread.
  const bool use_slow_path = UseSlowWatchList(policy_);
  ReceiveRight* receive_right = nullptr;  // Manually reference counted.
  std::unique_ptr<std::list<OnceClosure>> watch_list;
  if (use_slow_path) {
    // To avoid a race condition of a WaitableEventWatcher getting added
    // while another thread is in this method, hold the watch-list lock for
    // the duration of mach_msg(). This requires ref-counting the
    // |receive_right_| object that contains it, in case the event is deleted
    // by a waiting thread after mach_msg().
    receive_right = receive_right_.get();
    receive_right->AddRef();

    ReceiveRight::WatchList* slow_watch_list = receive_right->SlowWatchList();
    slow_watch_list->lock.Acquire();

    if (!slow_watch_list->list.empty()) {
      watch_list.reset(new std::list<OnceClosure>());
      std::swap(*watch_list, slow_watch_list->list);
    }
  }

  mach_msg_empty_send_t msg{};
  msg.header.msgh_bits = MACH_MSGH_BITS_REMOTE(MACH_MSG_TYPE_COPY_SEND);
  msg.header.msgh_size = sizeof(msg);
  msg.header.msgh_remote_port = send_right_.get();
  // If the event is already signaled, this will time out because the queue
  // has a length of one.
  kern_return_t kr =
      mach_msg(&msg.header, MACH_SEND_MSG | MACH_SEND_TIMEOUT, sizeof(msg), 0,
               MACH_PORT_NULL, 0, MACH_PORT_NULL);
  MACH_CHECK(kr == KERN_SUCCESS || kr == MACH_SEND_TIMED_OUT, kr) << "mach_msg";

  if (use_slow_path) {
    // If a WaitableEventWatcher were to start watching when the event is
    // signaled, it runs the callback immediately without adding it to the
    // list. Therefore the watch list can only be non-empty if the event is
    // newly signaled.
    if (watch_list.get()) {
      MACH_CHECK(kr == KERN_SUCCESS, kr);
      for (auto& watcher : *watch_list) {
        std::move(watcher).Run();
      }
    }

    receive_right->SlowWatchList()->lock.Release();
    receive_right->Release();
  }
}

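// Checks for a queued message without blocking. For an automatic-reset event,
// a positive result also dequeues the message, consuming the signal.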
bool WaitableEvent::IsSignaled() {
  return PeekPort(receive_right_->Name(), policy_ == ResetPolicy::AUTOMATIC);
}

void WaitableEvent::Wait() {
  bool result = TimedWaitUntil(TimeTicks::Max());
  DCHECK(result) << "TimedWait() should never fail with infinite timeout";
}

bool WaitableEvent::TimedWait(const TimeDelta& wait_delta) {
  return TimedWaitUntil(TimeTicks::Now() + wait_delta);
}

bool WaitableEvent::TimedWaitUntil(const TimeTicks& end_time) {
  internal::AssertBaseSyncPrimitivesAllowed();
  ScopedBlockingCall scoped_blocking_call(BlockingType::MAY_BLOCK);

  TimeDelta wait_time = end_time - TimeTicks::Now();
  if (wait_time < TimeDelta()) {
    // A negative delta would be treated by the system as indefinite, but
    // it needs to be treated as a poll instead.
    wait_time = TimeDelta();
  }

  mach_msg_empty_rcv_t msg{};
  msg.header.msgh_local_port = receive_right_->Name();

  mach_msg_option_t options = MACH_RCV_MSG;

  mach_msg_timeout_t timeout = 0;
  if (!end_time.is_max()) {
    options |= MACH_RCV_TIMEOUT;
    timeout = wait_time.InMillisecondsRoundedUp();
  }

  mach_msg_size_t rcv_size = sizeof(msg);
  if (policy_ == ResetPolicy::MANUAL) {
    // To avoid dequeuing the message, receive with a size of 0 and set
    // MACH_RCV_LARGE to keep the message in the queue.
    options |= MACH_RCV_LARGE;
    rcv_size = 0;
  }

  kern_return_t kr = mach_msg(&msg.header, options, 0, rcv_size,
                              receive_right_->Name(), timeout, MACH_PORT_NULL);
  if (kr == KERN_SUCCESS) {
    return true;
  } else if (rcv_size == 0 && kr == MACH_RCV_TOO_LARGE) {
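    // A zero-size receive returning MACH_RCV_TOO_LARGE means a message is
    // queued, so the event is signaled; for manual-reset events the message
    // is deliberately left in place.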
    return true;
  } else {
    MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
    return false;
  }
}

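// Only manual-reset events on macOS versions earlier than 10.12 use the
// lock-protected watch list stored in ReceiveRight; on iOS and on 10.12+ the
// slow path is never taken.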
// static
bool WaitableEvent::UseSlowWatchList(ResetPolicy policy) {
#if defined(OS_IOS)
  const bool use_slow_path = false;
#else
  static bool use_slow_path = !mac::IsAtLeastOS10_12();
#endif
  return policy == ResetPolicy::MANUAL && use_slow_path;
}

// static
size_t WaitableEvent::WaitMany(WaitableEvent** raw_waitables, size_t count) {
  internal::AssertBaseSyncPrimitivesAllowed();
  DCHECK(count) << "Cannot wait on no events";
  ScopedBlockingCall scoped_blocking_call(BlockingType::MAY_BLOCK);

  // On macOS 10.11+, using Mach port sets may cause system instability, per
  // https://crbug.com/756102. On macOS 10.12+, a kqueue can be used
  // instead to work around that. On macOS 10.9 and 10.10, kqueue only works
  // for port sets, so port sets are just used directly. On macOS 10.11,
  // libdispatch sources are used. Therefore, there are three different
  // primitives that can be used to implement WaitMany. Which one to use is
  // selected at run-time by OS version checks.
  enum WaitManyPrimitive {
    KQUEUE,
    DISPATCH,
    PORT_SET,
  };
#if defined(OS_IOS)
  const WaitManyPrimitive kPrimitive = PORT_SET;
#else
  const WaitManyPrimitive kPrimitive =
      mac::IsAtLeastOS10_12() ? KQUEUE
                              : (mac::IsOS10_11() ? DISPATCH : PORT_SET);
#endif
  if (kPrimitive == KQUEUE) {
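    // Register one EVFILT_MACHPORT filter per event's receive right. |udata|
    // carries the index into |raw_waitables| so the triggered event can be
    // identified below.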
    std::vector<kevent64_s> events(count);
    for (size_t i = 0; i < count; ++i) {
      EV_SET64(&events[i], raw_waitables[i]->receive_right_->Name(),
               EVFILT_MACHPORT, EV_ADD, 0, 0, i, 0, 0);
    }

    std::vector<kevent64_s> out_events(count);

    ScopedFD wait_many(kqueue());
    PCHECK(wait_many.is_valid()) << "kqueue";

    int rv = HANDLE_EINTR(kevent64(wait_many.get(), events.data(), count,
                                   out_events.data(), count, 0, nullptr));
    PCHECK(rv > 0) << "kevent64";

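    // |triggered| starts at SIZE_MAX (-1 wraps around), so std::min() below
    // selects the lowest signaled index.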
    size_t triggered = -1;
    for (size_t i = 0; i < static_cast<size_t>(rv); ++i) {
      // WaitMany should return the lowest index in |raw_waitables| that was
      // triggered.
      size_t index = static_cast<size_t>(out_events[i].udata);
      triggered = std::min(triggered, index);
    }

    if (raw_waitables[triggered]->policy_ == ResetPolicy::AUTOMATIC) {
      // The message needs to be dequeued to reset the event.
      PeekPort(raw_waitables[triggered]->receive_right_->Name(), true);
    }

    return triggered;
  } else if (kPrimitive == DISPATCH) {
    // Each item in |raw_waitables| will be watched using a dispatch source
    // scheduled on the serial |queue|. The first one to be invoked will
    // signal the |semaphore| that this method will wait on.
    ScopedDispatchObject<dispatch_queue_t> queue(dispatch_queue_create(
        "org.chromium.base.WaitableEvent.WaitMany", DISPATCH_QUEUE_SERIAL));
    ScopedDispatchObject<dispatch_semaphore_t> semaphore(
        dispatch_semaphore_create(0));

    // Block capture references. |signaled| will identify the index in
    // |raw_waitables| whose source was invoked.
    dispatch_semaphore_t semaphore_ref = semaphore.get();
    const size_t kUnsignaled = -1;
    __block size_t signaled = kUnsignaled;

    // Create a MACH_RECV dispatch source for each event. These must be
    // destroyed before the |queue| and |semaphore|.
    std::vector<std::unique_ptr<DispatchSourceMach>> sources;
    for (size_t i = 0; i < count; ++i) {
      const bool auto_reset =
          raw_waitables[i]->policy_ == WaitableEvent::ResetPolicy::AUTOMATIC;
      // The block will copy a reference to |right|.
      scoped_refptr<WaitableEvent::ReceiveRight> right =
          raw_waitables[i]->receive_right_;
      auto source =
          std::make_unique<DispatchSourceMach>(queue, right->Name(), ^{
            // After the semaphore is signaled, another event may be signaled
            // and the source may have its block put on the |queue|. WaitMany
            // should only report (and auto-reset) one event, so the first
            // event to signal is reported.
            if (signaled == kUnsignaled) {
              signaled = i;
              if (auto_reset) {
                PeekPort(right->Name(), true);
              }
              dispatch_semaphore_signal(semaphore_ref);
            }
          });
      source->Resume();
      sources.push_back(std::move(source));
    }

    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
    DCHECK_NE(signaled, kUnsignaled);
    return signaled;
  } else {
    DCHECK_EQ(kPrimitive, PORT_SET);

    kern_return_t kr;

    mac::ScopedMachPortSet port_set;
    {
      mach_port_t name;
      kr =
          mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET, &name);
      MACH_CHECK(kr == KERN_SUCCESS, kr) << "mach_port_allocate";
      port_set.reset(name);
    }

    for (size_t i = 0; i < count; ++i) {
      kr = mach_port_insert_member(mach_task_self(),
                                   raw_waitables[i]->receive_right_->Name(),
                                   port_set.get());
      MACH_CHECK(kr == KERN_SUCCESS, kr) << "index " << i;
    }

    mach_msg_empty_rcv_t msg{};
    // Wait on the port set. Only specify space enough for the header, to
    // identify which port in the set is signaled. Otherwise, receiving from
    // the port set may dequeue a message for a manual-reset event object,
    // which would cause it to be reset.
    kr = mach_msg(&msg.header,
                  MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY, 0,
                  sizeof(msg.header), port_set.get(), 0, MACH_PORT_NULL);
    MACH_CHECK(kr == MACH_RCV_TOO_LARGE, kr) << "mach_msg";

    for (size_t i = 0; i < count; ++i) {
      WaitableEvent* event = raw_waitables[i];
      if (msg.header.msgh_local_port == event->receive_right_->Name()) {
        if (event->policy_ == ResetPolicy::AUTOMATIC) {
          // The message needs to be dequeued to reset the event.
          PeekPort(msg.header.msgh_local_port, true);
        }
        return i;
      }
    }

    NOTREACHED();
    return 0;
  }
}

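// Returns true if a message is queued on |port|, i.e. the event is signaled.
// When |dequeue| is true, a queued message is also received, which resets the
// event; otherwise mach_port_peek() leaves the queue untouched.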
// static
bool WaitableEvent::PeekPort(mach_port_t port, bool dequeue) {
  if (dequeue) {
    mach_msg_empty_rcv_t msg{};
    msg.header.msgh_local_port = port;
    kern_return_t kr = mach_msg(&msg.header, MACH_RCV_MSG | MACH_RCV_TIMEOUT, 0,
                                sizeof(msg), port, 0, MACH_PORT_NULL);
    if (kr == KERN_SUCCESS) {
      return true;
    } else {
      MACH_CHECK(kr == MACH_RCV_TIMED_OUT, kr) << "mach_msg";
      return false;
    }
  } else {
    mach_port_seqno_t seqno = 0;
    mach_msg_size_t size;
    mach_msg_id_t id;
    mach_msg_trailer_t trailer;
    mach_msg_type_number_t trailer_size = sizeof(trailer);
    kern_return_t kr = mach_port_peek(
        mach_task_self(), port, MACH_RCV_TRAILER_TYPE(MACH_RCV_TRAILER_NULL),
        &seqno, &size, &id, reinterpret_cast<mach_msg_trailer_info_t>(&trailer),
        &trailer_size);
    if (kr == KERN_SUCCESS) {
      return true;
    } else {
      MACH_CHECK(kr == KERN_FAILURE, kr) << "mach_port_peek";
      return false;
    }
  }
}

WaitableEvent::ReceiveRight::ReceiveRight(mach_port_t name,
                                          bool create_slow_watch_list)
    : right_(name),
      slow_watch_list_(create_slow_watch_list ? new WatchList() : nullptr) {}

WaitableEvent::ReceiveRight::~ReceiveRight() = default;

WaitableEvent::ReceiveRight::WatchList::WatchList() = default;

WaitableEvent::ReceiveRight::WatchList::~WatchList() = default;

}  // namespace base