2 * Copyright (c) 2009-2010 Niels Provos and Nick Mathewson
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. The name of the author may not be used to endorse or promote products
15 * derived from this software without specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 #include "event2/event-config.h"
31 #ifdef _EVENT_HAVE_SYS_TIME_H
39 #ifdef _EVENT_HAVE_STDARG_H
42 #ifdef _EVENT_HAVE_UNISTD_H
51 #include <sys/queue.h>
53 #include "event2/util.h"
54 #include "event2/bufferevent.h"
55 #include "event2/buffer.h"
56 #include "event2/bufferevent_struct.h"
57 #include "event2/event.h"
58 #include "event2/util.h"
59 #include "event-internal.h"
60 #include "log-internal.h"
61 #include "mm-internal.h"
62 #include "bufferevent-internal.h"
63 #include "util-internal.h"
64 #include "iocp-internal.h"
/* Forward declarations for the method-table entries of the IOCP ("async")
 * bufferevent backend; wired into bufferevent_ops_async below. */
67 static int be_async_enable(struct bufferevent *, short);
68 static int be_async_disable(struct bufferevent *, short);
69 static void be_async_destruct(struct bufferevent *);
70 static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
71 static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
/* State for a bufferevent backed by Windows overlapped I/O.  One
 * event_overlapped per outstanding operation kind (connect/read/write).
 * NOTE(review): some members are not visible in this extraction -- code
 * below also reads an "ok" flag declared elsewhere in this struct. */
73 struct bufferevent_async {
74 struct bufferevent_private bev; /* Common bufferevent state; embedded first so upcasts work. */
75 struct event_overlapped connect_overlapped; /* Pending ConnectEx state. */
76 struct event_overlapped read_overlapped; /* Pending overlapped read state. */
77 struct event_overlapped write_overlapped; /* Pending overlapped write state. */
78 unsigned read_in_progress : 1; /* Nonzero while an overlapped read is outstanding. */
79 unsigned write_in_progress : 1; /* Nonzero while an overlapped write is outstanding. */
81 unsigned read_added : 1; /* We hold a virtual event on the base for reading. */
82 unsigned write_added : 1; /* We hold a virtual event on the base for writing. */
/* Method table binding the generic bufferevent interface to the
 * be_async_* implementations in this file.
 * NOTE(review): most initializer entries are missing from this
 * extraction; only the offset and timeout-adjust members are visible. */
85 const struct bufferevent_ops bufferevent_ops_async = {
87 evutil_offsetof(struct bufferevent_async, bev.bev),
91 _bufferevent_generic_adj_timeouts,
/* Recover the bufferevent_async that contains 'bev'.  The be_ops check
 * rejects bufferevents of any other backend (the early-return path is
 * not visible in this extraction). */
96 static inline struct bufferevent_async *
97 upcast(struct bufferevent *bev)
99 struct bufferevent_async *bev_a;
100 if (bev->be_ops != &bufferevent_ops_async)
102 bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
/* Recover the bufferevent_async that owns this connect_overlapped.
 * Asserts the result really is an async bufferevent. */
106 static inline struct bufferevent_async *
107 upcast_connect(struct event_overlapped *eo)
109 struct bufferevent_async *bev_a;
110 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
111 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
/* Recover the bufferevent_async that owns this read_overlapped. */
115 static inline struct bufferevent_async *
116 upcast_read(struct event_overlapped *eo)
118 struct bufferevent_async *bev_a;
119 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
120 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
/* Recover the bufferevent_async that owns this write_overlapped. */
124 static inline struct bufferevent_async *
125 upcast_write(struct event_overlapped *eo)
127 struct bufferevent_async *bev_a;
128 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
129 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
/* Release the virtual event that keeps the base looping while a write is
 * wanted; no-op if we never added one. */
134 bev_async_del_write(struct bufferevent_async *beva)
136 struct bufferevent *bev = &beva->bev.bev;
138 if (beva->write_added) {
139 beva->write_added = 0;
140 event_base_del_virtual(bev->ev_base);
/* Release the virtual event that keeps the base looping while a read is
 * wanted; no-op if we never added one. */
145 bev_async_del_read(struct bufferevent_async *beva)
147 struct bufferevent *bev = &beva->bev.bev;
149 if (beva->read_added) {
150 beva->read_added = 0;
151 event_base_del_virtual(bev->ev_base);
/* Add a virtual event so the base keeps running while we want to write;
 * idempotent via the write_added flag. */
156 bev_async_add_write(struct bufferevent_async *beva)
158 struct bufferevent *bev = &beva->bev.bev;
160 if (!beva->write_added) {
161 beva->write_added = 1;
162 event_base_add_virtual(bev->ev_base);
/* Add a virtual event so the base keeps running while we want to read;
 * idempotent via the read_added flag. */
167 bev_async_add_read(struct bufferevent_async *beva)
169 struct bufferevent *bev = &beva->bev.bev;
171 if (!beva->read_added) {
172 beva->read_added = 1;
173 event_base_add_virtual(bev->ev_base);
/* Launch an overlapped write on bev->output if one is wanted and none is
 * already outstanding.  Skips out when the bufferevent is not ok, writing
 * is disabled, the output buffer is empty, or writes are suspended; clamps
 * the request to the rate-limit budget from _bufferevent_get_write_max().
 * NOTE(review): several lines (returns, 'at_most' clamping branch) are not
 * visible in this extraction. */
178 bev_async_consider_writing(struct bufferevent_async *beva)
182 struct bufferevent *bev = &beva->bev.bev;
184 /* Don't write if there's a write in progress, or we do not
185 * want to write, or when there's nothing left to write. */
186 if (beva->write_in_progress)
188 if (!beva->ok || !(bev->enabled&EV_WRITE) ||
189 !evbuffer_get_length(bev->output)) {
190 bev_async_del_write(beva);
194 at_most = evbuffer_get_length(bev->output);
196 /* XXXX This over-commits. */
197 /* This is safe so long as bufferevent_get_write_max never returns
198 * more than INT_MAX. That's true for now. XXXX */
199 limit = (int)_bufferevent_get_write_max(&beva->bev);
200 if (at_most >= (size_t)limit && limit >= 0)
203 if (beva->bev.write_suspended) {
204 bev_async_del_write(beva);
208 /* XXXX doesn't respect low-water mark very well. */
209 bufferevent_incref(bev);
210 if (evbuffer_launch_write(bev->output, at_most,
211 &beva->write_overlapped)) {
212 bufferevent_decref(bev);
214 _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
216 beva->write_in_progress = 1;
217 bev_async_add_write(beva);
/* Launch an overlapped read into bev->input if one is wanted and none is
 * already outstanding.  Skips out when not ok, reading disabled, the
 * input buffer is at/above its high watermark, or reads are suspended;
 * clamps the request to the rate-limit budget from
 * _bufferevent_get_read_max().  A zero high watermark falls back to a
 * 16384-byte chunk (see FIXME below).
 * NOTE(review): several lines (returns, the read_high==0 branch) are not
 * visible in this extraction. */
222 bev_async_consider_reading(struct bufferevent_async *beva)
228 struct bufferevent *bev = &beva->bev.bev;
230 /* Don't read if there is a read in progress, or we do not
232 if (beva->read_in_progress)
234 if (!beva->ok || !(bev->enabled&EV_READ)) {
235 bev_async_del_read(beva);
239 /* Don't read if we're full */
240 cur_size = evbuffer_get_length(bev->input);
241 read_high = bev->wm_read.high;
243 if (cur_size >= read_high) {
244 bev_async_del_read(beva);
247 at_most = read_high - cur_size;
249 at_most = 16384; /* FIXME totally magic. */
252 /* XXXX This over-commits. */
253 /* XXXX see also note above on cast on _bufferevent_get_write_max() */
254 limit = (int)_bufferevent_get_read_max(&beva->bev);
255 if (at_most >= (size_t)limit && limit >= 0)
258 if (beva->bev.read_suspended) {
259 bev_async_del_read(beva);
263 bufferevent_incref(bev);
264 if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
266 bufferevent_decref(bev);
267 _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
269 beva->read_in_progress = 1;
270 bev_async_add_read(beva);
/* evbuffer callback on the output buffer: when data is appended, see
 * whether we should launch a new overlapped write.  Takes the bufferevent
 * lock (with a reference) around the check. */
277 be_async_outbuf_callback(struct evbuffer *buf,
278 const struct evbuffer_cb_info *cbinfo,
281 struct bufferevent *bev = arg;
282 struct bufferevent_async *bev_async = upcast(bev);
284 /* If we added data to the outbuf and were not writing before,
285 * we may want to write now. */
287 _bufferevent_incref_and_lock(bev);
290 bev_async_consider_writing(bev_async);
292 _bufferevent_decref_and_unlock(bev);
/* evbuffer callback on the input buffer: when data is drained, see
 * whether we should launch a new overlapped read.  Takes the bufferevent
 * lock (with a reference) around the check. */
296 be_async_inbuf_callback(struct evbuffer *buf,
297 const struct evbuffer_cb_info *cbinfo,
300 struct bufferevent *bev = arg;
301 struct bufferevent_async *bev_async = upcast(bev);
303 /* If we drained data from the inbuf and were not reading before,
304 * we may want to read now */
306 _bufferevent_incref_and_lock(bev);
308 if (cbinfo->n_deleted)
309 bev_async_consider_reading(bev_async);
311 _bufferevent_decref_and_unlock(bev);
/* ops->enable: reset the generic read/write timeouts and, for each newly
 * enabled direction, consider launching an overlapped operation.
 * NOTE(review): the guards around the timeout resets and the consider_*
 * calls are not visible in this extraction. */
315 be_async_enable(struct bufferevent *buf, short what)
317 struct bufferevent_async *bev_async = upcast(buf);
322 /* NOTE: This interferes with non-blocking connect */
324 BEV_RESET_GENERIC_READ_TIMEOUT(buf);
326 BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
328 /* If we newly enable reading or writing, and we aren't reading or
329 writing already, consider launching a new read or write. */
332 bev_async_consider_reading(bev_async);
334 bev_async_consider_writing(bev_async);
/* ops->disable: drop the generic timeout and the virtual event for each
 * disabled direction.  Does NOT cancel an in-flight overlapped operation
 * (see the XXXX note). */
339 be_async_disable(struct bufferevent *bev, short what)
341 struct bufferevent_async *bev_async = upcast(bev);
342 /* XXXX If we disable reading or writing, we may want to consider
343 * canceling any in-progress read or write operation, though it might
346 if (what & EV_READ) {
347 BEV_DEL_GENERIC_READ_TIMEOUT(bev);
348 bev_async_del_read(bev_async);
350 if (what & EV_WRITE) {
351 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
352 bev_async_del_write(bev_async);
/* ops->destruct: tear down the async bufferevent.  Requires that no
 * overlapped read/write is still in flight, drops any virtual events,
 * closes the socket when BEV_OPT_CLOSE_ON_FREE was set, and removes the
 * ev_write event plus the generic timeout callbacks used by non-blocking
 * connect. */
359 be_async_destruct(struct bufferevent *bev)
361 struct bufferevent_async *bev_async = upcast(bev);
362 struct bufferevent_private *bev_p = BEV_UPCAST(bev);
365 EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
366 !upcast(bev)->read_in_progress);
368 bev_async_del_read(bev_async);
369 bev_async_del_write(bev_async);
371 fd = _evbuffer_overlapped_get_fd(bev->input);
372 if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
373 evutil_closesocket(fd);
374 /* delete this in case non-blocking connect was used */
375 if (event_initialized(&bev->ev_write)) {
376 event_del(&bev->ev_write);
377 _bufferevent_del_generic_timeout_cbs(bev);
381 /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
382 * we use WSAGetOverlappedResult to translate.  Called on a failed
383 * completion so a later WSAGetLastError() reports the real error. */
384 bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
389 fd = _evbuffer_overlapped_get_fd(bev->input);
390 WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
/* ops->flush: no-op for the async backend.
 * NOTE(review): the body is not visible in this extraction. */
394 be_async_flush(struct bufferevent *bev, short what,
395 enum bufferevent_flush_mode mode)
/* IOCP completion callback for ConnectEx: clear the connecting flag,
 * record success (or translate the WSA error on failure), report
 * CONNECTED/ERROR to the user, then drop the virtual event and the
 * reference taken by bufferevent_async_connect(). */
401 connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
402 ev_ssize_t nbytes, int ok)
404 struct bufferevent_async *bev_a = upcast_connect(eo);
405 struct bufferevent *bev = &bev_a->bev.bev;
409 EVUTIL_ASSERT(bev_a->bev.connecting);
410 bev_a->bev.connecting = 0;
413 bufferevent_async_set_connected(bev);
415 bev_async_set_wsa_error(bev, eo);
417 _bufferevent_run_eventcb(bev,
418 ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
420 event_base_del_virtual(bev->ev_base);
422 _bufferevent_decref_and_unlock(bev);
/* IOCP completion callback for an overlapped read: commit the received
 * bytes into bev->input, clear read_in_progress, then on success reset
 * the read timeout, charge the rate-limit bucket, fire the read callback
 * if the low watermark is met, and consider another read.  On failure or
 * EOF (nbytes == 0), report ERROR/EOF via the event callback.  Releases
 * the reference taken when the read was launched. */
426 read_complete(struct event_overlapped *eo, ev_uintptr_t key,
427 ev_ssize_t nbytes, int ok)
429 struct bufferevent_async *bev_a = upcast_read(eo);
430 struct bufferevent *bev = &bev_a->bev.bev;
431 short what = BEV_EVENT_READING;
434 EVUTIL_ASSERT(bev_a->read_in_progress);
436 evbuffer_commit_read(bev->input, nbytes);
437 bev_a->read_in_progress = 0;
440 bev_async_set_wsa_error(bev, eo);
444 BEV_RESET_GENERIC_READ_TIMEOUT(bev);
445 _bufferevent_decrement_read_buckets(&bev_a->bev,
447 if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
448 _bufferevent_run_readcb(bev);
449 bev_async_consider_reading(bev_a);
451 what |= BEV_EVENT_ERROR;
453 _bufferevent_run_eventcb(bev, what);
454 } else if (!nbytes) {
455 what |= BEV_EVENT_EOF;
457 _bufferevent_run_eventcb(bev, what);
461 _bufferevent_decref_and_unlock(bev);
/* IOCP completion callback for an overlapped write: commit the sent
 * bytes out of bev->output, clear write_in_progress, then on success
 * reset the write timeout, charge the rate-limit bucket, fire the write
 * callback once the buffer drains to the low watermark, and consider
 * another write.  On failure or a zero-byte completion, report
 * ERROR/EOF.  Releases the reference taken when the write was
 * launched. */
465 write_complete(struct event_overlapped *eo, ev_uintptr_t key,
466 ev_ssize_t nbytes, int ok)
468 struct bufferevent_async *bev_a = upcast_write(eo);
469 struct bufferevent *bev = &bev_a->bev.bev;
470 short what = BEV_EVENT_WRITING;
473 EVUTIL_ASSERT(bev_a->write_in_progress);
474 evbuffer_commit_write(bev->output, nbytes);
475 bev_a->write_in_progress = 0;
478 bev_async_set_wsa_error(bev, eo);
482 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
483 _bufferevent_decrement_write_buckets(&bev_a->bev,
485 if (evbuffer_get_length(bev->output) <=
487 _bufferevent_run_writecb(bev);
488 bev_async_consider_writing(bev_a);
490 what |= BEV_EVENT_ERROR;
492 _bufferevent_run_eventcb(bev, what);
493 } else if (!nbytes) {
494 what |= BEV_EVENT_EOF;
496 _bufferevent_run_eventcb(bev, what);
500 _bufferevent_decref_and_unlock(bev);
/* Constructor: build an IOCP-backed bufferevent for 'fd'.  Forces
 * BEV_OPT_THREADSAFE, associates the socket with the base's IOCP port
 * (tolerating "already associated"), allocates overlapped input/output
 * evbuffers, installs the buffer callbacks and completion routines, and
 * sets up the generic timeout callbacks.  Returns NULL on any failure
 * (error-path lines are not all visible in this extraction). */
504 bufferevent_async_new(struct event_base *base,
505 evutil_socket_t fd, int options)
507 struct bufferevent_async *bev_a;
508 struct bufferevent *bev;
509 struct event_iocp_port *iocp;
511 options |= BEV_OPT_THREADSAFE;
513 if (!(iocp = event_base_get_iocp(base)))
516 if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) {
517 int err = GetLastError();
518 /* We may have already associated this fd with a port.
519 * Let's hope it's this port, and that the error code
520 * for doing this never changes. */
521 if (err != ERROR_INVALID_PARAMETER)
525 if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
528 bev = &bev_a->bev.bev;
529 if (!(bev->input = evbuffer_overlapped_new(fd))) {
533 if (!(bev->output = evbuffer_overlapped_new(fd))) {
534 evbuffer_free(bev->input);
539 if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async,
543 evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
544 evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
546 event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
547 event_overlapped_init(&bev_a->read_overlapped, read_complete);
548 event_overlapped_init(&bev_a->write_overlapped, write_complete);
552 _bufferevent_init_generic_timeout_cbs(bev);
556 bufferevent_free(&bev_a->bev.bev);
/* Mark the bufferevent as connected after a successful ConnectEx:
 * (re)install the generic timeout callbacks, then re-run enable so
 * pending reads/writes can be launched. */
561 bufferevent_async_set_connected(struct bufferevent *bev)
563 struct bufferevent_async *bev_async = upcast(bev);
565 _bufferevent_init_generic_timeout_cbs(bev);
566 /* Now's a good time to consider reading/writing */
567 be_async_enable(bev, bev->enabled);
/* Predicate: can this bufferevent use ConnectEx-based async connect?
 * Requires the async backend, an IOCP port on the base, and the
 * ConnectEx extension function being available. */
571 bufferevent_async_can_connect(struct bufferevent *bev)
573 const struct win32_extension_fns *ext =
574 event_get_win32_extension_fns();
576 if (BEV_IS_ASYNC(bev) &&
577 event_base_get_iocp(bev->ev_base) &&
578 ext && ext->ConnectEx)
/* Start a non-blocking connect via ConnectEx.  First binds the socket to
 * a wildcard address of the same family (ConnectEx requires a bound
 * socket; WSAEINVAL from bind means "already bound" and is tolerated),
 * then issues ConnectEx with the connect_overlapped.  Holds a reference
 * and a virtual event for the duration; both are dropped on immediate
 * failure (success/failure return lines are not all visible in this
 * extraction). */
585 bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
586 const struct sockaddr *sa, int socklen)
589 struct bufferevent_async *bev_async = upcast(bev);
590 struct sockaddr_storage ss;
591 const struct win32_extension_fns *ext =
592 event_get_win32_extension_fns();
594 EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
596 /* ConnectEx() requires that the socket be bound to an address
597 * with bind() before using, otherwise it will fail. We attempt
598 * to issue a bind() here, taking into account that the error
599 * code is set to WSAEINVAL when the socket is already bound. */
600 memset(&ss, 0, sizeof(ss));
601 if (sa->sa_family == AF_INET) {
602 struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
603 sin->sin_family = AF_INET;
604 sin->sin_addr.s_addr = INADDR_ANY;
605 } else if (sa->sa_family == AF_INET6) {
606 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
607 sin6->sin6_family = AF_INET6;
608 sin6->sin6_addr = in6addr_any;
610 /* Well, the user will have to bind() */
613 if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
614 WSAGetLastError() != WSAEINVAL)
617 event_base_add_virtual(bev->ev_base);
618 bufferevent_incref(bev);
619 rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
620 &bev_async->connect_overlapped.overlapped);
621 if (rc || WSAGetLastError() == ERROR_IO_PENDING)
624 event_base_del_virtual(bev->ev_base);
625 bufferevent_decref(bev);
631 be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
632 union bufferevent_ctrl_data *data)
635 case BEV_CTRL_GET_FD:
636 data->fd = _evbuffer_overlapped_get_fd(bev->input);
638 case BEV_CTRL_SET_FD: {
639 struct event_iocp_port *iocp;
641 if (data->fd == _evbuffer_overlapped_get_fd(bev->input))
643 if (!(iocp = event_base_get_iocp(bev->ev_base)))
645 if (event_iocp_port_associate(iocp, data->fd, 1) < 0)
647 _evbuffer_overlapped_set_fd(bev->input, data->fd);
648 _evbuffer_overlapped_set_fd(bev->output, data->fd);
651 case BEV_CTRL_GET_UNDERLYING: