/*
- * Copyright (c) 2009-2010 Niels Provos and Nick Mathewson
+ * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
*
* All rights reserved.
*
struct event_overlapped connect_overlapped;
struct event_overlapped read_overlapped;
struct event_overlapped write_overlapped;
- unsigned read_in_progress : 1;
- unsigned write_in_progress : 1;
+ size_t read_in_progress;
+ size_t write_in_progress;
unsigned ok : 1;
unsigned read_added : 1;
unsigned write_added : 1;
/* Don't write if there's a write in progress, or we do not
* want to write, or when there's nothing left to write. */
- if (beva->write_in_progress)
+ if (beva->write_in_progress || beva->bev.connecting)
return;
if (!beva->ok || !(bev->enabled&EV_WRITE) ||
!evbuffer_get_length(bev->output)) {
at_most = evbuffer_get_length(bev->output);
- /* XXXX This over-commits. */
/* This is safe so long as bufferevent_get_write_max never returns
* more than INT_MAX. That's true for now. XXXX */
limit = (int)_bufferevent_get_write_max(&beva->bev);
beva->ok = 0;
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
} else {
- beva->write_in_progress = 1;
+ beva->write_in_progress = at_most;
+ _bufferevent_decrement_write_buckets(&beva->bev, at_most);
bev_async_add_write(beva);
}
}
/* Don't read if there is a read in progress, or we do not
* want to read. */
- if (beva->read_in_progress)
+ if (beva->read_in_progress || beva->bev.connecting)
return;
if (!beva->ok || !(bev->enabled&EV_READ)) {
bev_async_del_read(beva);
bufferevent_incref(bev);
if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
beva->ok = 0;
- bufferevent_decref(bev);
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
+ bufferevent_decref(bev);
} else {
- beva->read_in_progress = 1;
+ beva->read_in_progress = at_most;
+ _bufferevent_decrement_read_buckets(&beva->bev, at_most);
bev_async_add_read(beva);
}
if (!bev_async->ok)
return -1;
- /* NOTE: This interferes with non-blocking connect */
+ if (bev_async->bev.connecting) {
+ /* Don't launch anything during connection attempts. */
+ return 0;
+ }
+
if (what & EV_READ)
BEV_RESET_GENERIC_READ_TIMEOUT(buf);
if (what & EV_WRITE)
bev_async_del_write(bev_async);
fd = _evbuffer_overlapped_get_fd(bev->input);
- if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
+ if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) {
+ /* XXXX possible double-close */
evutil_closesocket(fd);
+ }
/* delete this in case non-blocking connect was used */
if (event_initialized(&bev->ev_write)) {
event_del(&bev->ev_write);
struct bufferevent_async *bev_a = upcast_read(eo);
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_READING;
-
+ ev_ssize_t amount_unread;
BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->read_in_progress);
+ amount_unread = bev_a->read_in_progress - nbytes;
evbuffer_commit_read(bev->input, nbytes);
bev_a->read_in_progress = 0;
+ if (amount_unread)
+ _bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread);
if (!ok)
bev_async_set_wsa_error(bev, eo);
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
- _bufferevent_decrement_read_buckets(&bev_a->bev,
- nbytes);
if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
_bufferevent_run_readcb(bev);
bev_async_consider_reading(bev_a);
struct bufferevent_async *bev_a = upcast_write(eo);
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_WRITING;
+ ev_ssize_t amount_unwritten;
BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->write_in_progress);
+
+ amount_unwritten = bev_a->write_in_progress - nbytes;
evbuffer_commit_write(bev->output, nbytes);
bev_a->write_in_progress = 0;
+ if (amount_unwritten)
+ _bufferevent_decrement_write_buckets(&bev_a->bev,
+ -amount_unwritten);
+
+
if (!ok)
bev_async_set_wsa_error(bev, eo);
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
- _bufferevent_decrement_write_buckets(&bev_a->bev,
- nbytes);
if (evbuffer_get_length(bev->output) <=
bev->wm_write.low)
_bufferevent_run_writecb(bev);
_evbuffer_overlapped_set_fd(bev->output, data->fd);
return 0;
}
+ case BEV_CTRL_CANCEL_ALL: {
+ struct bufferevent_async *bev_a = upcast(bev);
+ evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input);
+ if (fd != (evutil_socket_t)INVALID_SOCKET &&
+ (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
+ closesocket(fd);
+ }
+ bev_a->ok = 0;
+ return 0;
+ }
case BEV_CTRL_GET_UNDERLYING:
default:
return -1;
}
}
+
+