|  | // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include "base/message_loop/message_pump_libevent.h" | 
|  |  | 
|  | #include <errno.h> | 
|  | #include <unistd.h> | 
|  |  | 
|  | #include <utility> | 
|  |  | 
|  | #include "base/auto_reset.h" | 
|  | #include "base/compiler_specific.h" | 
|  | #include "base/files/file_util.h" | 
|  | #include "base/logging.h" | 
|  | #include "base/posix/eintr_wrapper.h" | 
|  | #include "base/third_party/libevent/event.h" | 
|  | #include "base/time/time.h" | 
|  | #include "base/trace_event/trace_event.h" | 
|  | #include "build_config.h" | 
|  |  | 
|  | #if defined(OS_MACOSX) | 
|  | #include "base/mac/scoped_nsautorelease_pool.h" | 
|  | #endif | 
|  |  | 
|  | // Lifecycle of struct event | 
|  | // Libevent uses two main data structures: | 
|  | // struct event_base (of which there is one per message pump), and | 
|  | // struct event (of which there is roughly one per socket). | 
|  | // The socket's struct event is created in | 
|  | // MessagePumpLibevent::WatchFileDescriptor(), | 
|  | // is owned by the FdWatchController, and is destroyed in | 
|  | // StopWatchingFileDescriptor(). | 
|  | // It is moved into and out of lists in struct event_base by | 
|  | // the libevent functions event_add() and event_del(). | 
|  | // | 
|  | // TODO(dkegel): | 
|  | // At the moment bad things happen if a FdWatchController | 
|  | // is active after its MessagePumpLibevent has been destroyed. | 
|  | // See MessageLoopTest.FdWatchControllerOutlivesMessageLoop | 
|  | // Not clear yet whether that situation occurs in practice, | 
|  | // but if it does, we need to fix it. | 
|  |  | 
|  | namespace base { | 
|  |  | 
|  | MessagePumpLibevent::FdWatchController::FdWatchController( | 
|  | const Location& from_here) | 
|  | : FdWatchControllerInterface(from_here) {} | 
|  |  | 
|  | MessagePumpLibevent::FdWatchController::~FdWatchController() { | 
|  | if (event_) { | 
|  | StopWatchingFileDescriptor(); | 
|  | } | 
|  | if (was_destroyed_) { | 
|  | DCHECK(!*was_destroyed_); | 
|  | *was_destroyed_ = true; | 
|  | } | 
|  | } | 
|  |  | 
|  | bool MessagePumpLibevent::FdWatchController::StopWatchingFileDescriptor() { | 
|  | std::unique_ptr<event> e = ReleaseEvent(); | 
|  | if (!e) | 
|  | return true; | 
|  |  | 
|  | // event_del() is a no-op if the event isn't active. | 
|  | int rv = event_del(e.get()); | 
|  | pump_ = nullptr; | 
|  | watcher_ = nullptr; | 
|  | return (rv == 0); | 
|  | } | 
|  |  | 
|  | void MessagePumpLibevent::FdWatchController::Init(std::unique_ptr<event> e) { | 
|  | DCHECK(e); | 
|  | DCHECK(!event_); | 
|  |  | 
|  | event_ = std::move(e); | 
|  | } | 
|  |  | 
|  | std::unique_ptr<event> MessagePumpLibevent::FdWatchController::ReleaseEvent() { | 
|  | return std::move(event_); | 
|  | } | 
|  |  | 
|  | void MessagePumpLibevent::FdWatchController::OnFileCanReadWithoutBlocking( | 
|  | int fd, | 
|  | MessagePumpLibevent* pump) { | 
|  | // Since OnFileCanWriteWithoutBlocking() gets called first, it can stop | 
|  | // watching the file descriptor. | 
|  | if (!watcher_) | 
|  | return; | 
|  | watcher_->OnFileCanReadWithoutBlocking(fd); | 
|  | } | 
|  |  | 
|  | void MessagePumpLibevent::FdWatchController::OnFileCanWriteWithoutBlocking( | 
|  | int fd, | 
|  | MessagePumpLibevent* pump) { | 
|  | DCHECK(watcher_); | 
|  | watcher_->OnFileCanWriteWithoutBlocking(fd); | 
|  | } | 
|  |  | 
|  | MessagePumpLibevent::MessagePumpLibevent() | 
|  | : keep_running_(true), | 
|  | in_run_(false), | 
|  | processed_io_events_(false), | 
|  | event_base_(event_base_new()), | 
|  | wakeup_pipe_in_(-1), | 
|  | wakeup_pipe_out_(-1) { | 
|  | if (!Init()) | 
|  | NOTREACHED(); | 
|  | } | 
|  |  | 
|  | MessagePumpLibevent::~MessagePumpLibevent() { | 
|  | DCHECK(wakeup_event_); | 
|  | DCHECK(event_base_); | 
|  | event_del(wakeup_event_); | 
|  | delete wakeup_event_; | 
|  | if (wakeup_pipe_in_ >= 0) { | 
|  | if (IGNORE_EINTR(close(wakeup_pipe_in_)) < 0) | 
|  | DPLOG(ERROR) << "close"; | 
|  | } | 
|  | if (wakeup_pipe_out_ >= 0) { | 
|  | if (IGNORE_EINTR(close(wakeup_pipe_out_)) < 0) | 
|  | DPLOG(ERROR) << "close"; | 
|  | } | 
|  | event_base_free(event_base_); | 
|  | } | 
|  |  | 
|  | bool MessagePumpLibevent::WatchFileDescriptor(int fd, | 
|  | bool persistent, | 
|  | int mode, | 
|  | FdWatchController* controller, | 
|  | FdWatcher* delegate) { | 
|  | DCHECK_GE(fd, 0); | 
|  | DCHECK(controller); | 
|  | DCHECK(delegate); | 
|  | DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE); | 
|  | // WatchFileDescriptor should be called on the pump thread. It is not | 
|  | // threadsafe, and your watcher may never be registered. | 
|  | DCHECK(watch_file_descriptor_caller_checker_.CalledOnValidThread()); | 
|  |  | 
|  | int event_mask = persistent ? EV_PERSIST : 0; | 
|  | if (mode & WATCH_READ) { | 
|  | event_mask |= EV_READ; | 
|  | } | 
|  | if (mode & WATCH_WRITE) { | 
|  | event_mask |= EV_WRITE; | 
|  | } | 
|  |  | 
|  | std::unique_ptr<event> evt(controller->ReleaseEvent()); | 
|  | if (!evt) { | 
|  | // Ownership is transferred to the controller. | 
|  | evt.reset(new event); | 
|  | } else { | 
|  | // Make sure we don't pick up any funky internal libevent masks. | 
|  | int old_interest_mask = evt->ev_events & (EV_READ | EV_WRITE | EV_PERSIST); | 
|  |  | 
|  | // Combine old/new event masks. | 
|  | event_mask |= old_interest_mask; | 
|  |  | 
|  | // Must disarm the event before we can reuse it. | 
|  | event_del(evt.get()); | 
|  |  | 
|  | // It's illegal to use this function to listen on 2 separate fds with the | 
|  | // same |controller|. | 
|  | if (EVENT_FD(evt.get()) != fd) { | 
|  | NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd; | 
|  | return false; | 
|  | } | 
|  | } | 
|  |  | 
|  | // Set current interest mask and message pump for this event. | 
|  | event_set(evt.get(), fd, event_mask, OnLibeventNotification, controller); | 
|  |  | 
|  | // Tell libevent which message pump this socket will belong to when we add it. | 
|  | if (event_base_set(event_base_, evt.get())) { | 
|  | DPLOG(ERROR) << "event_base_set(fd=" << EVENT_FD(evt.get()) << ")"; | 
|  | return false; | 
|  | } | 
|  |  | 
|  | // Add this socket to the list of monitored sockets. | 
|  | if (event_add(evt.get(), nullptr)) { | 
|  | DPLOG(ERROR) << "event_add failed(fd=" << EVENT_FD(evt.get()) << ")"; | 
|  | return false; | 
|  | } | 
|  |  | 
|  | controller->Init(std::move(evt)); | 
|  | controller->set_watcher(delegate); | 
|  | controller->set_pump(this); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | // Tell libevent to break out of inner loop. | 
|  | static void timer_callback(int fd, short events, void* context) { | 
|  | event_base_loopbreak((struct event_base*)context); | 
|  | } | 
|  |  | 
|  | // Reentrant! | 
|  | void MessagePumpLibevent::Run(Delegate* delegate) { | 
|  | AutoReset<bool> auto_reset_keep_running(&keep_running_, true); | 
|  | AutoReset<bool> auto_reset_in_run(&in_run_, true); | 
|  |  | 
|  | // event_base_loopexit() + EVLOOP_ONCE is leaky, see http://crbug.com/25641. | 
|  | // Instead, make our own timer and reuse it on each call to event_base_loop(). | 
|  | std::unique_ptr<event> timer_event(new event); | 
|  |  | 
|  | for (;;) { | 
|  | #if defined(OS_MACOSX) | 
|  | mac::ScopedNSAutoreleasePool autorelease_pool; | 
|  | #endif | 
|  |  | 
|  | bool did_work = delegate->DoWork(); | 
|  | if (!keep_running_) | 
|  | break; | 
|  |  | 
|  | event_base_loop(event_base_, EVLOOP_NONBLOCK); | 
|  | did_work |= processed_io_events_; | 
|  | processed_io_events_ = false; | 
|  | if (!keep_running_) | 
|  | break; | 
|  |  | 
|  | did_work |= delegate->DoDelayedWork(&delayed_work_time_); | 
|  | if (!keep_running_) | 
|  | break; | 
|  |  | 
|  | if (did_work) | 
|  | continue; | 
|  |  | 
|  | did_work = delegate->DoIdleWork(); | 
|  | if (!keep_running_) | 
|  | break; | 
|  |  | 
|  | if (did_work) | 
|  | continue; | 
|  |  | 
|  | // EVLOOP_ONCE tells libevent to only block once, | 
|  | // but to service all pending events when it wakes up. | 
|  | if (delayed_work_time_.is_null()) { | 
|  | event_base_loop(event_base_, EVLOOP_ONCE); | 
|  | } else { | 
|  | TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); | 
|  | if (delay > TimeDelta()) { | 
|  | struct timeval poll_tv; | 
|  | poll_tv.tv_sec = delay.InSeconds(); | 
|  | poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond; | 
|  | event_set(timer_event.get(), -1, 0, timer_callback, event_base_); | 
|  | event_base_set(event_base_, timer_event.get()); | 
|  | event_add(timer_event.get(), &poll_tv); | 
|  | event_base_loop(event_base_, EVLOOP_ONCE); | 
|  | event_del(timer_event.get()); | 
|  | } else { | 
|  | // It looks like delayed_work_time_ indicates a time in the past, so we | 
|  | // need to call DoDelayedWork now. | 
|  | delayed_work_time_ = TimeTicks(); | 
|  | } | 
|  | } | 
|  |  | 
|  | if (!keep_running_) | 
|  | break; | 
|  | } | 
|  | } | 
|  |  | 
|  | void MessagePumpLibevent::Quit() { | 
|  | DCHECK(in_run_) << "Quit was called outside of Run!"; | 
|  | // Tell both libevent and Run that they should break out of their loops. | 
|  | keep_running_ = false; | 
|  | ScheduleWork(); | 
|  | } | 
|  |  | 
|  | void MessagePumpLibevent::ScheduleWork() { | 
|  | // Tell libevent (in a threadsafe way) that it should break out of its loop. | 
|  | char buf = 0; | 
|  | int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1)); | 
|  | DCHECK(nwrite == 1 || errno == EAGAIN) | 
|  | << "[nwrite:" << nwrite << "] [errno:" << errno << "]"; | 
|  | } | 
|  |  | 
|  | void MessagePumpLibevent::ScheduleDelayedWork( | 
|  | const TimeTicks& delayed_work_time) { | 
|  | // We know that we can't be blocked on Wait right now since this method can | 
|  | // only be called on the same thread as Run, so we only need to update our | 
|  | // record of how long to sleep when we do sleep. | 
|  | delayed_work_time_ = delayed_work_time; | 
|  | } | 
|  |  | 
|  | bool MessagePumpLibevent::Init() { | 
|  | int fds[2]; | 
|  | if (!CreateLocalNonBlockingPipe(fds)) { | 
|  | DPLOG(ERROR) << "pipe creation failed"; | 
|  | return false; | 
|  | } | 
|  | wakeup_pipe_out_ = fds[0]; | 
|  | wakeup_pipe_in_ = fds[1]; | 
|  |  | 
|  | wakeup_event_ = new event; | 
|  | event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST, | 
|  | OnWakeup, this); | 
|  | event_base_set(event_base_, wakeup_event_); | 
|  |  | 
|  | if (event_add(wakeup_event_, nullptr)) | 
|  | return false; | 
|  | return true; | 
|  | } | 
|  |  | 
|  | // static | 
|  | void MessagePumpLibevent::OnLibeventNotification(int fd, | 
|  | short flags, | 
|  | void* context) { | 
|  | FdWatchController* controller = static_cast<FdWatchController*>(context); | 
|  | DCHECK(controller); | 
|  | TRACE_EVENT2("toplevel", "MessagePumpLibevent::OnLibeventNotification", | 
|  | "src_file", controller->created_from_location().file_name(), | 
|  | "src_func", controller->created_from_location().function_name()); | 
|  | TRACE_HEAP_PROFILER_API_SCOPED_TASK_EXECUTION heap_profiler_scope( | 
|  | controller->created_from_location().file_name()); | 
|  |  | 
|  | MessagePumpLibevent* pump = controller->pump(); | 
|  | pump->processed_io_events_ = true; | 
|  |  | 
|  | if ((flags & (EV_READ | EV_WRITE)) == (EV_READ | EV_WRITE)) { | 
|  | // Both callbacks will be called. It is necessary to check that |controller| | 
|  | // is not destroyed. | 
|  | bool controller_was_destroyed = false; | 
|  | controller->was_destroyed_ = &controller_was_destroyed; | 
|  | controller->OnFileCanWriteWithoutBlocking(fd, pump); | 
|  | if (!controller_was_destroyed) | 
|  | controller->OnFileCanReadWithoutBlocking(fd, pump); | 
|  | if (!controller_was_destroyed) | 
|  | controller->was_destroyed_ = nullptr; | 
|  | } else if (flags & EV_WRITE) { | 
|  | controller->OnFileCanWriteWithoutBlocking(fd, pump); | 
|  | } else if (flags & EV_READ) { | 
|  | controller->OnFileCanReadWithoutBlocking(fd, pump); | 
|  | } | 
|  | } | 
|  |  | 
|  | // Called if a byte is received on the wakeup pipe. | 
|  | // static | 
|  | void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) { | 
|  | MessagePumpLibevent* that = static_cast<MessagePumpLibevent*>(context); | 
|  | DCHECK(that->wakeup_pipe_out_ == socket); | 
|  |  | 
|  | // Remove and discard the wakeup byte. | 
|  | char buf; | 
|  | int nread = HANDLE_EINTR(read(socket, &buf, 1)); | 
|  | DCHECK_EQ(nread, 1); | 
|  | that->processed_io_events_ = true; | 
|  | // Tell libevent to break out of inner loop. | 
|  | event_base_loopbreak(that->event_base_); | 
|  | } | 
|  |  | 
|  | }  // namespace base |