X-Git-Url: https://arthur.barton.de/cgi-bin/gitweb.cgi?p=netatalk.git;a=blobdiff_plain;f=libevent%2Fbufferevent_async.c;h=f34ca5f1152fec8ec725488f99b9c0480b5d321c;hp=76a161623bf708850ebca374a43b0d6d061a9c07;hb=3a84db87064922ad10ac10cc1d6833380e575995;hpb=15c1fc2f2328736dd428ec3be37c893d8ee2e065 diff --git a/libevent/bufferevent_async.c b/libevent/bufferevent_async.c index 76a16162..f34ca5f1 100644 --- a/libevent/bufferevent_async.c +++ b/libevent/bufferevent_async.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009-2010 Niels Provos and Nick Mathewson + * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson * * All rights reserved. * @@ -80,8 +80,8 @@ struct bufferevent_async { struct event_overlapped connect_overlapped; struct event_overlapped read_overlapped; struct event_overlapped write_overlapped; - unsigned read_in_progress : 1; - unsigned write_in_progress : 1; + size_t read_in_progress; + size_t write_in_progress; unsigned ok : 1; unsigned read_added : 1; unsigned write_added : 1; @@ -188,7 +188,7 @@ bev_async_consider_writing(struct bufferevent_async *beva) /* Don't write if there's a write in progress, or we do not * want to write, or when there's nothing left to write. */ - if (beva->write_in_progress) + if (beva->write_in_progress || beva->bev.connecting) return; if (!beva->ok || !(bev->enabled&EV_WRITE) || !evbuffer_get_length(bev->output)) { @@ -198,7 +198,6 @@ bev_async_consider_writing(struct bufferevent_async *beva) at_most = evbuffer_get_length(bev->output); - /* XXXX This over-commits. */ /* This is safe so long as bufferevent_get_write_max never returns * more than INT_MAX. That's true for now. XXXX */ limit = (int)_bufferevent_get_write_max(&beva->bev); @@ -218,7 +217,8 @@ bev_async_consider_writing(struct bufferevent_async *beva) beva->ok = 0; _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); } else { - beva->write_in_progress = 1; + beva->write_in_progress = at_most; + _bufferevent_decrement_write_buckets(&beva->bev, at_most); bev_async_add_write(beva); } } @@ -234,7 +234,7 @@ bev_async_consider_reading(struct bufferevent_async *beva) /* Don't read if there is a read in progress, or we do not * want to read. */ - if (beva->read_in_progress) + if (beva->read_in_progress || beva->bev.connecting) return; if (!beva->ok || !(bev->enabled&EV_READ)) { bev_async_del_read(beva); @@ -268,10 +268,11 @@ bev_async_consider_reading(struct bufferevent_async *beva) bufferevent_incref(bev); if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) { beva->ok = 0; - bufferevent_decref(bev); _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); + bufferevent_decref(bev); } else { - beva->read_in_progress = 1; + beva->read_in_progress = at_most; + _bufferevent_decrement_read_buckets(&beva->bev, at_most); bev_async_add_read(beva); } @@ -324,7 +325,11 @@ be_async_enable(struct bufferevent *buf, short what) if (!bev_async->ok) return -1; - /* NOTE: This interferes with non-blocking connect */ + if (bev_async->bev.connecting) { + /* Don't launch anything during connection attempts. */ + return 0; + } + if (what & EV_READ) BEV_RESET_GENERIC_READ_TIMEOUT(buf); if (what & EV_WRITE) @@ -374,8 +379,10 @@ be_async_destruct(struct bufferevent *bev) bev_async_del_write(bev_async); fd = _evbuffer_overlapped_get_fd(bev->input); - if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) + if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) { + /* XXXX possible double-close */ evutil_closesocket(fd); + } /* delete this in case non-blocking connect was used */ if (event_initialized(&bev->ev_write)) { event_del(&bev->ev_write); @@ -438,12 +445,15 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, struct bufferevent_async *bev_a = upcast_read(eo); struct bufferevent *bev = &bev_a->bev.bev; short what = BEV_EVENT_READING; - + ev_ssize_t amount_unread; BEV_LOCK(bev); EVUTIL_ASSERT(bev_a->read_in_progress); + amount_unread = bev_a->read_in_progress - nbytes; evbuffer_commit_read(bev->input, nbytes); bev_a->read_in_progress = 0; + if (amount_unread) + _bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread); if (!ok) bev_async_set_wsa_error(bev, eo); @@ -451,8 +461,6 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key, if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); - _bufferevent_decrement_read_buckets(&bev_a->bev, - nbytes); if (evbuffer_get_length(bev->input) >= bev->wm_read.low) _bufferevent_run_readcb(bev); bev_async_consider_reading(bev_a); @@ -477,20 +485,26 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key, struct bufferevent_async *bev_a = upcast_write(eo); struct bufferevent *bev = &bev_a->bev.bev; short what = BEV_EVENT_WRITING; + ev_ssize_t amount_unwritten; BEV_LOCK(bev); EVUTIL_ASSERT(bev_a->write_in_progress); + + amount_unwritten = bev_a->write_in_progress - nbytes; evbuffer_commit_write(bev->output, nbytes); bev_a->write_in_progress = 0; + if (amount_unwritten) + _bufferevent_decrement_write_buckets(&bev_a->bev, + -amount_unwritten); + + if (!ok) bev_async_set_wsa_error(bev, eo); if (bev_a->ok) { if (ok && nbytes) { BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - _bufferevent_decrement_write_buckets(&bev_a->bev, - nbytes); if (evbuffer_get_length(bev->output) <= bev->wm_write.low) _bufferevent_run_writecb(bev); @@ -657,8 +671,20 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, _evbuffer_overlapped_set_fd(bev->output, data->fd); return 0; } + case BEV_CTRL_CANCEL_ALL: { + struct bufferevent_async *bev_a = upcast(bev); + evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input); + if (fd != (evutil_socket_t)INVALID_SOCKET && + (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { + closesocket(fd); + } + bev_a->ok = 0; + return 0; + } case BEV_CTRL_GET_UNDERLYING: default: return -1; } } + +