| /* | 
 |  * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu> | 
 |  * All rights reserved. | 
 |  * | 
 |  * Redistribution and use in source and binary forms, with or without | 
 |  * modification, are permitted provided that the following conditions | 
 |  * are met: | 
 |  * 1. Redistributions of source code must retain the above copyright | 
 |  *    notice, this list of conditions and the following disclaimer. | 
 |  * 2. Redistributions in binary form must reproduce the above copyright | 
 |  *    notice, this list of conditions and the following disclaimer in the | 
 |  *    documentation and/or other materials provided with the distribution. | 
 |  * 3. The name of the author may not be used to endorse or promote products | 
 |  *    derived from this software without specific prior written permission. | 
 |  * | 
 |  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR | 
 |  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES | 
 |  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. | 
 |  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, | 
 |  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT | 
 |  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | 
 |  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY | 
 |  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | 
 |  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF | 
 |  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | 
 |  */ | 
 |  | 
 | #include <sys/types.h> | 
 |  | 
 | #ifdef HAVE_CONFIG_H | 
 | #include "config.h" | 
 | #endif | 
 |  | 
 | #ifdef HAVE_SYS_TIME_H | 
 | #include <sys/time.h> | 
 | #endif | 
 |  | 
 | #include <errno.h> | 
 | #include <stdio.h> | 
 | #include <stdlib.h> | 
 | #include <string.h> | 
 | #ifdef HAVE_STDARG_H | 
 | #include <stdarg.h> | 
 | #endif | 
 |  | 
 | #ifdef WIN32 | 
 | #include <winsock2.h> | 
 | #endif | 
 |  | 
 | #include "evutil.h" | 
 | #include "event.h" | 
 |  | 
 | /* prototypes */ | 
 |  | 
 | void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *); | 
 |  | 
 | static int | 
 | bufferevent_add(struct event *ev, int timeout) | 
 | { | 
 | 	struct timeval tv, *ptv = NULL; | 
 |  | 
 | 	if (timeout) { | 
 | 		evutil_timerclear(&tv); | 
 | 		tv.tv_sec = timeout; | 
 | 		ptv = &tv; | 
 | 	} | 
 |  | 
 | 	return (event_add(ev, ptv)); | 
 | } | 
 |  | 
 | /*  | 
 |  * This callback is executed when the size of the input buffer changes. | 
 |  * We use it to apply back pressure on the reading side. | 
 |  */ | 
 |  | 
 | void | 
 | bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, | 
 |     void *arg) { | 
 | 	struct bufferevent *bufev = arg; | 
 | 	/*  | 
 | 	 * If we are below the watermark then reschedule reading if it's | 
 | 	 * still enabled. | 
 | 	 */ | 
 | 	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { | 
 | 		evbuffer_setcb(buf, NULL, NULL); | 
 |  | 
 | 		if (bufev->enabled & EV_READ) | 
 | 			bufferevent_add(&bufev->ev_read, bufev->timeout_read); | 
 | 	} | 
 | } | 
 |  | 
 | static void | 
 | bufferevent_readcb(int fd, short event, void *arg) | 
 | { | 
 | 	struct bufferevent *bufev = arg; | 
 | 	int res = 0; | 
 | 	short what = EVBUFFER_READ; | 
 | 	size_t len; | 
 | 	int howmuch = -1; | 
 |  | 
 | 	if (event == EV_TIMEOUT) { | 
 | 		what |= EVBUFFER_TIMEOUT; | 
 | 		goto error; | 
 | 	} | 
 |  | 
 | 	/* | 
 | 	 * If we have a high watermark configured then we don't want to | 
 | 	 * read more data than would make us reach the watermark. | 
 | 	 */ | 
 | 	if (bufev->wm_read.high != 0) { | 
 | 		howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input); | 
 | 		/* we might have lowered the watermark, stop reading */ | 
 | 		if (howmuch <= 0) { | 
 | 			struct evbuffer *buf = bufev->input; | 
 | 			event_del(&bufev->ev_read); | 
 | 			evbuffer_setcb(buf, | 
 | 			    bufferevent_read_pressure_cb, bufev); | 
 | 			return; | 
 | 		} | 
 | 	} | 
 |  | 
 | 	res = evbuffer_read(bufev->input, fd, howmuch); | 
 | 	if (res == -1) { | 
 | 		if (errno == EAGAIN || errno == EINTR) | 
 | 			goto reschedule; | 
 | 		/* error case */ | 
 | 		what |= EVBUFFER_ERROR; | 
 | 	} else if (res == 0) { | 
 | 		/* eof case */ | 
 | 		what |= EVBUFFER_EOF; | 
 | 	} | 
 |  | 
 | 	if (res <= 0) | 
 | 		goto error; | 
 |  | 
 | 	bufferevent_add(&bufev->ev_read, bufev->timeout_read); | 
 |  | 
 | 	/* See if this callbacks meets the water marks */ | 
 | 	len = EVBUFFER_LENGTH(bufev->input); | 
 | 	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) | 
 | 		return; | 
 | 	if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) { | 
 | 		struct evbuffer *buf = bufev->input; | 
 | 		event_del(&bufev->ev_read); | 
 |  | 
 | 		/* Now schedule a callback for us when the buffer changes */ | 
 | 		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); | 
 | 	} | 
 |  | 
 | 	/* Invoke the user callback - must always be called last */ | 
 | 	if (bufev->readcb != NULL) | 
 | 		(*bufev->readcb)(bufev, bufev->cbarg); | 
 | 	return; | 
 |  | 
 |  reschedule: | 
 | 	bufferevent_add(&bufev->ev_read, bufev->timeout_read); | 
 | 	return; | 
 |  | 
 |  error: | 
 | 	(*bufev->errorcb)(bufev, what, bufev->cbarg); | 
 | } | 
 |  | 
 | static void | 
 | bufferevent_writecb(int fd, short event, void *arg) | 
 | { | 
 | 	struct bufferevent *bufev = arg; | 
 | 	int res = 0; | 
 | 	short what = EVBUFFER_WRITE; | 
 |  | 
 | 	if (event == EV_TIMEOUT) { | 
 | 		what |= EVBUFFER_TIMEOUT; | 
 | 		goto error; | 
 | 	} | 
 |  | 
 | 	if (EVBUFFER_LENGTH(bufev->output)) { | 
 | 	    res = evbuffer_write(bufev->output, fd); | 
 | 	    if (res == -1) { | 
 | #ifndef WIN32 | 
 | /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not | 
 |  *set errno. thus this error checking is not portable*/ | 
 | 		    if (errno == EAGAIN || | 
 | 			errno == EINTR || | 
 | 			errno == EINPROGRESS) | 
 | 			    goto reschedule; | 
 | 		    /* error case */ | 
 | 		    what |= EVBUFFER_ERROR; | 
 |  | 
 | #else | 
 | 				goto reschedule; | 
 | #endif | 
 |  | 
 | 	    } else if (res == 0) { | 
 | 		    /* eof case */ | 
 | 		    what |= EVBUFFER_EOF; | 
 | 	    } | 
 | 	    if (res <= 0) | 
 | 		    goto error; | 
 | 	} | 
 |  | 
 | 	if (EVBUFFER_LENGTH(bufev->output) != 0) | 
 | 		bufferevent_add(&bufev->ev_write, bufev->timeout_write); | 
 |  | 
 | 	/* | 
 | 	 * Invoke the user callback if our buffer is drained or below the | 
 | 	 * low watermark. | 
 | 	 */ | 
 | 	if (bufev->writecb != NULL && | 
 | 	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) | 
 | 		(*bufev->writecb)(bufev, bufev->cbarg); | 
 |  | 
 | 	return; | 
 |  | 
 |  reschedule: | 
 | 	if (EVBUFFER_LENGTH(bufev->output) != 0) | 
 | 		bufferevent_add(&bufev->ev_write, bufev->timeout_write); | 
 | 	return; | 
 |  | 
 |  error: | 
 | 	(*bufev->errorcb)(bufev, what, bufev->cbarg); | 
 | } | 
 |  | 
 | /* | 
 |  * Create a new buffered event object. | 
 |  * | 
 |  * The read callback is invoked whenever we read new data. | 
 |  * The write callback is invoked whenever the output buffer is drained. | 
 |  * The error callback is invoked on a write/read error or on EOF. | 
 |  * | 
 |  * Both read and write callbacks maybe NULL.  The error callback is not | 
 |  * allowed to be NULL and have to be provided always. | 
 |  */ | 
 |  | 
 | struct bufferevent * | 
 | bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, | 
 |     everrorcb errorcb, void *cbarg) | 
 | { | 
 | 	struct bufferevent *bufev; | 
 |  | 
 | 	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL) | 
 | 		return (NULL); | 
 |  | 
 | 	if ((bufev->input = evbuffer_new()) == NULL) { | 
 | 		free(bufev); | 
 | 		return (NULL); | 
 | 	} | 
 |  | 
 | 	if ((bufev->output = evbuffer_new()) == NULL) { | 
 | 		evbuffer_free(bufev->input); | 
 | 		free(bufev); | 
 | 		return (NULL); | 
 | 	} | 
 |  | 
 | 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); | 
 | 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); | 
 |  | 
 | 	bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg); | 
 |  | 
 | 	/* | 
 | 	 * Set to EV_WRITE so that using bufferevent_write is going to | 
 | 	 * trigger a callback.  Reading needs to be explicitly enabled | 
 | 	 * because otherwise no data will be available. | 
 | 	 */ | 
 | 	bufev->enabled = EV_WRITE; | 
 |  | 
 | 	return (bufev); | 
 | } | 
 |  | 
 | void | 
 | bufferevent_setcb(struct bufferevent *bufev, | 
 |     evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg) | 
 | { | 
 | 	bufev->readcb = readcb; | 
 | 	bufev->writecb = writecb; | 
 | 	bufev->errorcb = errorcb; | 
 |  | 
 | 	bufev->cbarg = cbarg; | 
 | } | 
 |  | 
 | void | 
 | bufferevent_setfd(struct bufferevent *bufev, int fd) | 
 | { | 
 | 	event_del(&bufev->ev_read); | 
 | 	event_del(&bufev->ev_write); | 
 |  | 
 | 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); | 
 | 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); | 
 | 	if (bufev->ev_base != NULL) { | 
 | 		event_base_set(bufev->ev_base, &bufev->ev_read); | 
 | 		event_base_set(bufev->ev_base, &bufev->ev_write); | 
 | 	} | 
 |  | 
 | 	/* might have to manually trigger event registration */ | 
 | } | 
 |  | 
 | int | 
 | bufferevent_priority_set(struct bufferevent *bufev, int priority) | 
 | { | 
 | 	if (event_priority_set(&bufev->ev_read, priority) == -1) | 
 | 		return (-1); | 
 | 	if (event_priority_set(&bufev->ev_write, priority) == -1) | 
 | 		return (-1); | 
 |  | 
 | 	return (0); | 
 | } | 
 |  | 
 | /* Closing the file descriptor is the responsibility of the caller */ | 
 |  | 
 | void | 
 | bufferevent_free(struct bufferevent *bufev) | 
 | { | 
 | 	event_del(&bufev->ev_read); | 
 | 	event_del(&bufev->ev_write); | 
 |  | 
 | 	evbuffer_free(bufev->input); | 
 | 	evbuffer_free(bufev->output); | 
 |  | 
 | 	free(bufev); | 
 | } | 
 |  | 
 | /* | 
 |  * Returns 0 on success; | 
 |  *        -1 on failure. | 
 |  */ | 
 |  | 
 | int | 
 | bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) | 
 | { | 
 | 	int res; | 
 |  | 
 | 	res = evbuffer_add(bufev->output, data, size); | 
 |  | 
 | 	if (res == -1) | 
 | 		return (res); | 
 |  | 
 | 	/* If everything is okay, we need to schedule a write */ | 
 | 	if (size > 0 && (bufev->enabled & EV_WRITE)) | 
 | 		bufferevent_add(&bufev->ev_write, bufev->timeout_write); | 
 |  | 
 | 	return (res); | 
 | } | 
 |  | 
 | int | 
 | bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) | 
 | { | 
 | 	int res; | 
 |  | 
 | 	res = bufferevent_write(bufev, buf->buffer, buf->off); | 
 | 	if (res != -1) | 
 | 		evbuffer_drain(buf, buf->off); | 
 |  | 
 | 	return (res); | 
 | } | 
 |  | 
 | size_t | 
 | bufferevent_read(struct bufferevent *bufev, void *data, size_t size) | 
 | { | 
 | 	struct evbuffer *buf = bufev->input; | 
 |  | 
 | 	if (buf->off < size) | 
 | 		size = buf->off; | 
 |  | 
 | 	/* Copy the available data to the user buffer */ | 
 | 	memcpy(data, buf->buffer, size); | 
 |  | 
 | 	if (size) | 
 | 		evbuffer_drain(buf, size); | 
 |  | 
 | 	return (size); | 
 | } | 
 |  | 
 | int | 
 | bufferevent_enable(struct bufferevent *bufev, short event) | 
 | { | 
 | 	if (event & EV_READ) { | 
 | 		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1) | 
 | 			return (-1); | 
 | 	} | 
 | 	if (event & EV_WRITE) { | 
 | 		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1) | 
 | 			return (-1); | 
 | 	} | 
 |  | 
 | 	bufev->enabled |= event; | 
 | 	return (0); | 
 | } | 
 |  | 
 | int | 
 | bufferevent_disable(struct bufferevent *bufev, short event) | 
 | { | 
 | 	if (event & EV_READ) { | 
 | 		if (event_del(&bufev->ev_read) == -1) | 
 | 			return (-1); | 
 | 	} | 
 | 	if (event & EV_WRITE) { | 
 | 		if (event_del(&bufev->ev_write) == -1) | 
 | 			return (-1); | 
 | 	} | 
 |  | 
 | 	bufev->enabled &= ~event; | 
 | 	return (0); | 
 | } | 
 |  | 
 | /* | 
 |  * Sets the read and write timeout for a buffered event. | 
 |  */ | 
 |  | 
 | void | 
 | bufferevent_settimeout(struct bufferevent *bufev, | 
 |     int timeout_read, int timeout_write) { | 
 | 	bufev->timeout_read = timeout_read; | 
 | 	bufev->timeout_write = timeout_write; | 
 |  | 
 | 	if (event_pending(&bufev->ev_read, EV_READ, NULL)) | 
 | 		bufferevent_add(&bufev->ev_read, timeout_read); | 
 | 	if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) | 
 | 		bufferevent_add(&bufev->ev_write, timeout_write); | 
 | } | 
 |  | 
 | /* | 
 |  * Sets the water marks | 
 |  */ | 
 |  | 
 | void | 
 | bufferevent_setwatermark(struct bufferevent *bufev, short events, | 
 |     size_t lowmark, size_t highmark) | 
 | { | 
 | 	if (events & EV_READ) { | 
 | 		bufev->wm_read.low = lowmark; | 
 | 		bufev->wm_read.high = highmark; | 
 | 	} | 
 |  | 
 | 	if (events & EV_WRITE) { | 
 | 		bufev->wm_write.low = lowmark; | 
 | 		bufev->wm_write.high = highmark; | 
 | 	} | 
 |  | 
 | 	/* If the watermarks changed then see if we should call read again */ | 
 | 	bufferevent_read_pressure_cb(bufev->input, | 
 | 	    0, EVBUFFER_LENGTH(bufev->input), bufev); | 
 | } | 
 |  | 
 | int | 
 | bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) | 
 | { | 
 | 	int res; | 
 |  | 
 | 	bufev->ev_base = base; | 
 |  | 
 | 	res = event_base_set(base, &bufev->ev_read); | 
 | 	if (res == -1) | 
 | 		return (res); | 
 |  | 
 | 	res = event_base_set(base, &bufev->ev_write); | 
 | 	return (res); | 
 | } |