]> arthur.barton.de Git - netatalk.git/blobdiff - libevent/bufferevent_async.c
Writing metadata xattr on directories with sticky bit set, FR#94
[netatalk.git] / libevent / bufferevent_async.c
index b7284fda6cb2dbeab9fcfcdb5b68a14445e3a02b..f34ca5f1152fec8ec725488f99b9c0480b5d321c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009-2010 Niels Provos and Nick Mathewson
+ * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
  *
  * All rights reserved.
  *
 #include "util-internal.h"
 #include "iocp-internal.h"
 
+#ifndef SO_UPDATE_CONNECT_CONTEXT
+/* Mingw is sometimes missing this */
+#define SO_UPDATE_CONNECT_CONTEXT 0x7010
+#endif
+
 /* prototypes */
 static int be_async_enable(struct bufferevent *, short);
 static int be_async_disable(struct bufferevent *, short);
@@ -75,8 +80,8 @@ struct bufferevent_async {
        struct event_overlapped connect_overlapped;
        struct event_overlapped read_overlapped;
        struct event_overlapped write_overlapped;
-       unsigned read_in_progress : 1;
-       unsigned write_in_progress : 1;
+       size_t read_in_progress;
+       size_t write_in_progress;
        unsigned ok : 1;
        unsigned read_added : 1;
        unsigned write_added : 1;
@@ -183,7 +188,7 @@ bev_async_consider_writing(struct bufferevent_async *beva)
 
        /* Don't write if there's a write in progress, or we do not
         * want to write, or when there's nothing left to write. */
-       if (beva->write_in_progress)
+       if (beva->write_in_progress || beva->bev.connecting)
                return;
        if (!beva->ok || !(bev->enabled&EV_WRITE) ||
            !evbuffer_get_length(bev->output)) {
@@ -193,7 +198,6 @@ bev_async_consider_writing(struct bufferevent_async *beva)
 
        at_most = evbuffer_get_length(bev->output);
 
-       /* XXXX This over-commits. */
        /* This is safe so long as bufferevent_get_write_max never returns
         * more than INT_MAX.  That's true for now. XXXX */
        limit = (int)_bufferevent_get_write_max(&beva->bev);
@@ -213,7 +217,8 @@ bev_async_consider_writing(struct bufferevent_async *beva)
                beva->ok = 0;
                _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
        } else {
-               beva->write_in_progress = 1;
+               beva->write_in_progress = at_most;
+               _bufferevent_decrement_write_buckets(&beva->bev, at_most);
                bev_async_add_write(beva);
        }
 }
@@ -229,7 +234,7 @@ bev_async_consider_reading(struct bufferevent_async *beva)
 
        /* Don't read if there is a read in progress, or we do not
         * want to read. */
-       if (beva->read_in_progress)
+       if (beva->read_in_progress || beva->bev.connecting)
                return;
        if (!beva->ok || !(bev->enabled&EV_READ)) {
                bev_async_del_read(beva);
@@ -263,10 +268,11 @@ bev_async_consider_reading(struct bufferevent_async *beva)
        bufferevent_incref(bev);
        if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
                beva->ok = 0;
-               bufferevent_decref(bev);
                _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
+               bufferevent_decref(bev);
        } else {
-               beva->read_in_progress = 1;
+               beva->read_in_progress = at_most;
+               _bufferevent_decrement_read_buckets(&beva->bev, at_most);
                bev_async_add_read(beva);
        }
 
@@ -319,7 +325,11 @@ be_async_enable(struct bufferevent *buf, short what)
        if (!bev_async->ok)
                return -1;
 
-       /* NOTE: This interferes with non-blocking connect */
+       if (bev_async->bev.connecting) {
+               /* Don't launch anything during connection attempts. */
+               return 0;
+       }
+
        if (what & EV_READ)
                BEV_RESET_GENERIC_READ_TIMEOUT(buf);
        if (what & EV_WRITE)
@@ -369,8 +379,10 @@ be_async_destruct(struct bufferevent *bev)
        bev_async_del_write(bev_async);
 
        fd = _evbuffer_overlapped_get_fd(bev->input);
-       if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
+       if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) {
+               /* XXXX possible double-close */
                evutil_closesocket(fd);
+       }
        /* delete this in case non-blocking connect was used */
        if (event_initialized(&bev->ev_write)) {
                event_del(&bev->ev_write);
@@ -403,11 +415,15 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
 {
        struct bufferevent_async *bev_a = upcast_connect(eo);
        struct bufferevent *bev = &bev_a->bev.bev;
+       evutil_socket_t sock;
 
        BEV_LOCK(bev);
 
        EVUTIL_ASSERT(bev_a->bev.connecting);
        bev_a->bev.connecting = 0;
+       sock = _evbuffer_overlapped_get_fd(bev_a->bev.bev.input);
+       /* XXXX Handle error? */
+       setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
 
        if (ok)
                bufferevent_async_set_connected(bev);
@@ -429,12 +445,15 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
        struct bufferevent_async *bev_a = upcast_read(eo);
        struct bufferevent *bev = &bev_a->bev.bev;
        short what = BEV_EVENT_READING;
-
+       ev_ssize_t amount_unread;
        BEV_LOCK(bev);
        EVUTIL_ASSERT(bev_a->read_in_progress);
 
+       amount_unread = bev_a->read_in_progress - nbytes;
        evbuffer_commit_read(bev->input, nbytes);
        bev_a->read_in_progress = 0;
+       if (amount_unread)
+               _bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread);
 
        if (!ok)
                bev_async_set_wsa_error(bev, eo);
@@ -442,8 +461,6 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
        if (bev_a->ok) {
                if (ok && nbytes) {
                        BEV_RESET_GENERIC_READ_TIMEOUT(bev);
-                       _bufferevent_decrement_read_buckets(&bev_a->bev,
-                                       nbytes);
                        if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
                                _bufferevent_run_readcb(bev);
                        bev_async_consider_reading(bev_a);
@@ -468,20 +485,26 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
        struct bufferevent_async *bev_a = upcast_write(eo);
        struct bufferevent *bev = &bev_a->bev.bev;
        short what = BEV_EVENT_WRITING;
+       ev_ssize_t amount_unwritten;
 
        BEV_LOCK(bev);
        EVUTIL_ASSERT(bev_a->write_in_progress);
+
+       amount_unwritten = bev_a->write_in_progress - nbytes;
        evbuffer_commit_write(bev->output, nbytes);
        bev_a->write_in_progress = 0;
 
+       if (amount_unwritten)
+               _bufferevent_decrement_write_buckets(&bev_a->bev,
+                                                    -amount_unwritten);
+
+
        if (!ok)
                bev_async_set_wsa_error(bev, eo);
 
        if (bev_a->ok) {
                if (ok && nbytes) {
                        BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
-                       _bufferevent_decrement_write_buckets(&bev_a->bev,
-                                       nbytes);
                        if (evbuffer_get_length(bev->output) <=
                            bev->wm_write.low)
                                _bufferevent_run_writecb(bev);
@@ -648,8 +671,20 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
                _evbuffer_overlapped_set_fd(bev->output, data->fd);
                return 0;
        }
+       case BEV_CTRL_CANCEL_ALL: {
+               struct bufferevent_async *bev_a = upcast(bev);
+               evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input);
+               if (fd != (evutil_socket_t)INVALID_SOCKET &&
+                   (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
+                       closesocket(fd);
+               }
+               bev_a->ok = 0;
+               return 0;
+       }
        case BEV_CTRL_GET_UNDERLYING:
        default:
                return -1;
        }
 }
+
+