]> arthur.barton.de Git - netdata.git/commitdiff
added single threaded web server
authorCosta Tsaousis <costa@tsaousis.gr>
Sat, 28 May 2016 12:57:13 +0000 (15:57 +0300)
committerCosta Tsaousis <costa@tsaousis.gr>
Sun, 29 May 2016 10:24:08 +0000 (13:24 +0300)
src/main.c
src/web_client.c
src/web_client.h
src/web_server.c
src/web_server.h

index d6d2579b28c280ff350619f45664ae8aa4093d71..b988f0dece7ce50529eaf4720b0bd8b461a73775 100644 (file)
@@ -70,26 +70,38 @@ struct netdata_static_thread {
 
        void (*init_routine) (void);
        void *(*start_routine) (void *);
-};
-
-struct netdata_static_thread static_threads[] = {
-       {"tc",                  "plugins",      "tc",                   1, NULL, NULL,  tc_main},
-       {"idlejitter",  "plugins",      "idlejitter",   1, NULL, NULL,  cpuidlejitter_main},
-       {"proc",                "plugins",      "proc",                 1, NULL, NULL,  proc_main},
-       {"cgroups",             "plugins",      "cgroups",              1, NULL, NULL,  cgroups_main},
-
+} static_threads[] = {
 #ifdef INTERNAL_PLUGIN_NFACCT
-       // nfacct requires root access
+// nfacct requires root access
        // so, we build it as an external plugin with setuid to root
-       {"nfacct",              "plugins",      "nfacct",               1, NULL, NULL,  nfacct_main},
+       {"nfacct",              "plugins",  "nfacct",     1, NULL, NULL, nfacct_main},
 #endif
 
-       {"plugins.d",   NULL,           NULL,                   1, NULL, NULL,  pluginsd_main},
-       {"check",               "plugins",      "checks",               0, NULL, NULL,  checks_main},
-       {"web",                 NULL,           NULL,                   1, NULL, NULL,  socket_listen_main},
-       {NULL,                  NULL,           NULL,                   0, NULL, NULL,  NULL}
+       {"tc",                 "plugins",   "tc",         1, NULL, NULL, tc_main},
+       {"idlejitter",         "plugins",   "idlejitter", 1, NULL, NULL, cpuidlejitter_main},
+       {"proc",               "plugins",   "proc",       1, NULL, NULL, proc_main},
+       {"cgroups",            "plugins",   "cgroups",    1, NULL, NULL, cgroups_main},
+       {"plugins.d",           NULL,       NULL,         1, NULL, NULL, pluginsd_main},
+       {"check",               "plugins",  "checks",     0, NULL, NULL, checks_main},
+       {"web",                 NULL,       NULL,         1, NULL, NULL, socket_listen_main_multi_threaded},
+       {"web-single-threaded", NULL,       NULL,         0, NULL, NULL, socket_listen_main_single_threaded},
+       {NULL,                  NULL,       NULL,         0, NULL, NULL, NULL}
 };
 
+void web_server_threading_selection(void) {
+       int threaded = config_get_boolean("global", "multi threaded web server", 1);
+
+       int i;
+       for(i = 0; static_threads[i].name ; i++) {
+               if(static_threads[i].start_routine == socket_listen_main_multi_threaded)
+                       static_threads[i].enabled = threaded?1:0;
+
+               if(static_threads[i].start_routine == socket_listen_main_single_threaded)
+                       static_threads[i].enabled = threaded?0:1;
+       }
+}
+
+
 int killpid(pid_t pid, int sig)
 {
        int ret = -1;
@@ -591,28 +603,7 @@ int main(int argc, char **argv)
 
                // --------------------------------------------------------------------
 
-               listen_backlog = (int) config_get_number("global", "http port listen backlog", LISTEN_BACKLOG);
-
-               listen_port = (int) config_get_number("global", "port", LISTEN_PORT);
-               if(listen_port < 1 || listen_port > 65535) {
-                       info("Invalid listen port %d given. Defaulting to %d.", listen_port, LISTEN_PORT);
-                       listen_port = LISTEN_PORT;
-               }
-               else debug(D_OPTIONS, "Listen port set to %d.", listen_port);
-
-               int ip = 0;
-               char *ipv = config_get("global", "ip version", "any");
-               if(!strcmp(ipv, "any") || !strcmp(ipv, "both") || !strcmp(ipv, "all")) ip = 0;
-               else if(!strcmp(ipv, "ipv4") || !strcmp(ipv, "IPV4") || !strcmp(ipv, "IPv4") || !strcmp(ipv, "4")) ip = 4;
-               else if(!strcmp(ipv, "ipv6") || !strcmp(ipv, "IPV6") || !strcmp(ipv, "IPv6") || !strcmp(ipv, "6")) ip = 6;
-               else info("Cannot understand ip version '%s'. Assuming 'any'.", ipv);
-
-               if(ip == 0 || ip == 6) listen_fd = create_listen_socket6(config_get("global", "bind socket to IP", "*"), listen_port, listen_backlog);
-               if(listen_fd < 0) {
-                       listen_fd = create_listen_socket4(config_get("global", "bind socket to IP", "*"), listen_port, listen_backlog);
-                       if(listen_fd >= 0 && ip != 4) info("Managed to open an IPv4 socket on port %d.", listen_port);
-               }
-
+               listen_fd = create_listen_socket();
                if(listen_fd < 0) fatal("Cannot listen socket.");
        }
 
@@ -656,6 +647,8 @@ int main(int argc, char **argv)
        // ------------------------------------------------------------------------
        // spawn the threads
 
+       web_server_threading_selection();
+
        for (i = 0; static_threads[i].name != NULL ; i++) {
                struct netdata_static_thread *st = &static_threads[i];
 
index 4e262821a9292fa1673b81183ec7c9c8d8d5ada0..78f6269776d93ff379ed1dd6cf171f7c818b0b30 100644 (file)
@@ -159,16 +159,16 @@ void web_client_reset(struct web_client *w)
        w->stats_received_bytes = 0;
        w->stats_sent_bytes = 0;
 
-       long sent = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
+       size_t sent = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
 
 #ifdef NETDATA_WITH_ZLIB
-       if(likely(w->response.zoutput)) sent = (long)w->response.zstream.total_out;
+       if(likely(w->response.zoutput)) sent = (size_t)w->response.zstream.total_out;
 #endif
 
-       long size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
+       size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
 
        if(likely(w->last_url[0]))
-               log_access("%llu: (sent/all = %ld/%ld bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %s: %d '%s'",
+               log_access("%llu: (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %s: %d '%s'",
                        w->id,
                        sent, size, -((size>0)?((float)(size-sent)/(float)size * 100.0):0.0),
                        (float)usecdiff(&w->tv_ready, &w->tv_in) / 1000.0,
@@ -180,9 +180,11 @@ void web_client_reset(struct web_client *w)
                );
 
        if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY)) {
-               debug(D_WEB_CLIENT, "%llu: Closing filecopy input file.", w->id);
-               close(w->ifd);
-               w->ifd = w->ofd;
+               if(w->ifd != w->ofd) {
+                       debug(D_WEB_CLIENT, "%llu: Closing filecopy input file descriptor %d.", w->id, w->ifd);
+                       close(w->ifd);
+                       w->ifd = w->ofd;
+               }
        }
 
        w->last_url[0] = '\0';
@@ -192,6 +194,7 @@ void web_client_reset(struct web_client *w)
        w->origin[1] = '\0';
 
        w->mode = WEB_CLIENT_MODE_NORMAL;
+
        w->keepalive = 0;
        w->decoded_url[0] = '\0';
 
@@ -226,19 +229,17 @@ void web_client_reset(struct web_client *w)
 struct web_client *web_client_free(struct web_client *w)
 {
        struct web_client *n = w->next;
+       if(w == web_clients) web_clients = n;
 
        debug(D_WEB_CLIENT_ACCESS, "%llu: Closing web client from %s port %s.", w->id, w->client_ip, w->client_port);
 
        if(w->prev)     w->prev->next = w->next;
        if(w->next) w->next->prev = w->prev;
-
-       if(w == web_clients) web_clients = w->next;
-
        if(w->response.header_output) buffer_free(w->response.header_output);
        if(w->response.header) buffer_free(w->response.header);
        if(w->response.data) buffer_free(w->response.data);
-       close(w->ifd);
-       if(w->ofd != w->ifd) close(w->ofd);
+       if(w->ifd != -1) close(w->ifd);
+       if(w->ofd != -1 && w->ofd != w->ifd) close(w->ofd);
        free(w);
 
        global_statistics.connected_clients--;
@@ -425,7 +426,7 @@ int mysendfile(struct web_client *w, char *filename)
 
 #ifdef NETDATA_WITH_ZLIB
 void web_client_enable_deflate(struct web_client *w, int gzip) {
-       if(w->response.zinitialized == 1) {
+       if(w->response.zinitialized) {
                error("%llu: Compression has already be initialized for this client.", w->id);
                return;
        }
@@ -1715,11 +1716,11 @@ void web_client_process(struct web_client *w) {
 
        buffer_strcat(w->response.header_output, "\r\n");
 
-       // disable TCP_NODELAY, to buffer the header
+/*     // disable TCP_NODELAY, to buffer the header
        int flag = 0;
        if(setsockopt(w->ofd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0)
                error("%llu: failed to disable TCP_NODELAY on socket.", w->id);
-
+*/
        // sent the HTTP header
        debug(D_WEB_DATA, "%llu: Sending response HTTP header of size %d: '%s'"
                        , w->id
@@ -1728,18 +1729,23 @@ void web_client_process(struct web_client *w) {
                        );
 
        bytes = send(w->ofd, buffer_tostring(w->response.header_output), buffer_strlen(w->response.header_output), 0);
-       if(bytes != (ssize_t) buffer_strlen(w->response.header_output))
-               error("%llu: HTTP Header failed to be sent (I sent %d bytes but the system sent %d bytes)."
-                               , w->id
-                               , buffer_strlen(w->response.header_output)
-                               , bytes);
+       if(bytes != (ssize_t) buffer_strlen(w->response.header_output)) {
+               if(bytes > 0)
+                       w->stats_sent_bytes += bytes;
+
+               error("%llu: HTTP Header failed to be sent (I sent %d bytes but the system sent %d bytes). Closing web client.", w->id,
+                         buffer_strlen(w->response.header_output), bytes);
+
+               WEB_CLIENT_IS_DEAD(w);
+               return;
+       }
        else 
                w->stats_sent_bytes += bytes;
 
-       // enable TCP_NODELAY, to send all data immediately at the next send()
+/*     // enable TCP_NODELAY, to send all data immediately at the next send()
        flag = 1;
        if(setsockopt(w->ofd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) != 0) error("%llu: failed to enable TCP_NODELAY on socket.", w->id);
-
+*/
        // enable sending immediately if we have data
        if(w->response.data->len) w->wait_send = 1;
        else w->wait_send = 0;
@@ -1780,50 +1786,78 @@ void web_client_process(struct web_client *w) {
        }
 }
 
-long web_client_send_chunk_header(struct web_client *w, long len)
+ssize_t web_client_send_chunk_header(struct web_client *w, size_t len)
 {
        debug(D_DEFLATE, "%llu: OPEN CHUNK of %d bytes (hex: %x).", w->id, len, len);
        char buf[1024];
        sprintf(buf, "%lX\r\n", len);
-       ssize_t bytes = send(w->ofd, buf, strlen(buf), MSG_DONTWAIT);
+       
+       ssize_t bytes = send(w->ofd, buf, strlen(buf), 0);
+       if(bytes > 0) {
+               debug(D_DEFLATE, "%llu: Sent chunk header %d bytes.", w->id, bytes);
+               w->stats_sent_bytes += bytes;
+       }
 
-       if(bytes > 0) debug(D_DEFLATE, "%llu: Sent chunk header %d bytes.", w->id, bytes);
-       else if(bytes == 0) debug(D_DEFLATE, "%llu: Did not send chunk header to the client.", w->id);
-       else debug(D_DEFLATE, "%llu: Failed to send chunk header to client.", w->id);
+       else if(bytes == 0) {
+               error("%llu: Did not send chunk header to the client.", w->id);
+               WEB_CLIENT_IS_DEAD(w);
+       }
+       else {
+               error("%llu: Failed to send chunk header to client.", w->id);
+               WEB_CLIENT_IS_DEAD(w);
+       }
 
        return bytes;
 }
 
-long web_client_send_chunk_close(struct web_client *w)
+ssize_t web_client_send_chunk_close(struct web_client *w)
 {
        //debug(D_DEFLATE, "%llu: CLOSE CHUNK.", w->id);
 
-       ssize_t bytes = send(w->ofd, "\r\n", 2, MSG_DONTWAIT);
+       ssize_t bytes = send(w->ofd, "\r\n", 2, 0);
+       if(bytes > 0) {
+               debug(D_DEFLATE, "%llu: Sent chunk suffix %d bytes.", w->id, bytes);
+               w->stats_sent_bytes += bytes;
+       }
 
-       if(bytes > 0) debug(D_DEFLATE, "%llu: Sent chunk suffix %d bytes.", w->id, bytes);
-       else if(bytes == 0) debug(D_DEFLATE, "%llu: Did not send chunk suffix to the client.", w->id);
-       else debug(D_DEFLATE, "%llu: Failed to send chunk suffix to client.", w->id);
+       else if(bytes == 0) {
+               error("%llu: Did not send chunk suffix to the client.", w->id);
+               WEB_CLIENT_IS_DEAD(w);
+       }
+       else {
+               error("%llu: Failed to send chunk suffix to client.", w->id);
+               WEB_CLIENT_IS_DEAD(w);
+       }
 
        return bytes;
 }
 
-long web_client_send_chunk_finalize(struct web_client *w)
+ssize_t web_client_send_chunk_finalize(struct web_client *w)
 {
        //debug(D_DEFLATE, "%llu: FINALIZE CHUNK.", w->id);
 
-       ssize_t bytes = send(w->ofd, "\r\n0\r\n\r\n", 7, MSG_DONTWAIT);
+       ssize_t bytes = send(w->ofd, "\r\n0\r\n\r\n", 7, 0);
+       if(bytes > 0) {
+               debug(D_DEFLATE, "%llu: Sent chunk suffix %d bytes.", w->id, bytes);
+               w->stats_sent_bytes += bytes;
+       }
 
-       if(bytes > 0) debug(D_DEFLATE, "%llu: Sent chunk suffix %d bytes.", w->id, bytes);
-       else if(bytes == 0) debug(D_DEFLATE, "%llu: Did not send chunk suffix to the client.", w->id);
-       else debug(D_DEFLATE, "%llu: Failed to send chunk suffix to client.", w->id);
+       else if(bytes == 0) {
+               error("%llu: Did not send chunk suffix to the client.", w->id);
+               WEB_CLIENT_IS_DEAD(w);
+       }
+       else {
+               error("%llu: Failed to send chunk suffix to client.", w->id);
+               WEB_CLIENT_IS_DEAD(w);
+       }
 
        return bytes;
 }
 
 #ifdef NETDATA_WITH_ZLIB
-long web_client_send_deflate(struct web_client *w)
+ssize_t web_client_send_deflate(struct web_client *w)
 {
-       long len = 0, t = 0;
+       ssize_t len = 0, t = 0;
 
        // when using compression,
        // w->response.sent is the amount of bytes passed through compression
@@ -1836,21 +1870,19 @@ long web_client_send_deflate(struct web_client *w)
                debug(D_WEB_CLIENT, "%llu: Out of output data.", w->id);
 
                // finalize the chunk
-               if(w->response.sent != 0)
-                       t += web_client_send_chunk_finalize(w);
-
-               // there can be two cases for this
-               // A. we have done everything
-               // B. we temporarily have nothing to send, waiting for the buffer to be filled by ifd
+               if(w->response.sent != 0) {
+                       t = web_client_send_chunk_finalize(w);
+                       if(t < 0) return t;
+               }
 
-               if(w->mode == WEB_CLIENT_MODE_FILECOPY && w->wait_receive && w->ifd != w->ofd && w->response.rlen && w->response.rlen > w->response.data->len) {
+               if(w->mode == WEB_CLIENT_MODE_FILECOPY && w->wait_receive && w->response.rlen && w->response.rlen > w->response.data->len) {
                        // we have to wait, more data will come
                        debug(D_WEB_CLIENT, "%llu: Waiting for more data to become available.", w->id);
                        w->wait_send = 0;
-                       return(0);
+                       return t;
                }
 
-               if(w->keepalive == 0) {
+               if(unlikely(w->keepalive == 0)) {
                        debug(D_WEB_CLIENT, "%llu: Closing (keep-alive is not enabled). %ld bytes sent.", w->id, w->response.sent);
                        errno = 0;
                        return(-1);
@@ -1858,24 +1890,23 @@ long web_client_send_deflate(struct web_client *w)
 
                // reset the client
                web_client_reset(w);
-               debug(D_WEB_CLIENT, "%llu: Done sending all data on socket. Waiting for next request on the same socket.", w->id);
-               return(0);
+               debug(D_WEB_CLIENT, "%llu: Done sending all data on socket.", w->id);
+               return t;
        }
 
        if(w->response.zhave == w->response.zsent) {
                // compress more input data
 
                // close the previous open chunk
-               if(w->response.sent != 0) t += web_client_send_chunk_close(w);
+               if(w->response.sent != 0) {
+                       t = web_client_send_chunk_close(w);
+                       if(t < 0) return t;
+               }
 
                debug(D_DEFLATE, "%llu: Compressing %d new bytes starting from %d (and %d left behind).", w->id, (w->response.data->len - w->response.sent), w->response.sent, w->response.zstream.avail_in);
 
                // give the compressor all the data not passed through the compressor yet
                if(w->response.data->len > w->response.sent) {
-#ifdef NETDATA_INTERNAL_CHECKS
-                       if((long)w->response.sent - (long)w->response.zstream.avail_in < 0)
-                               error("internal error: avail_in is corrupted.");
-#endif
                        w->response.zstream.next_in = (Bytef *)&w->response.data->buffer[w->response.sent - w->response.zstream.avail_in];
                        w->response.zstream.avail_in += (uInt) (w->response.data->len - w->response.sent);
                }
@@ -1911,31 +1942,39 @@ long web_client_send_deflate(struct web_client *w)
                debug(D_DEFLATE, "%llu: Compression produced %d bytes.", w->id, w->response.zhave);
 
                // open a new chunk
-               t += web_client_send_chunk_header(w, w->response.zhave);
+               ssize_t t2 = web_client_send_chunk_header(w, w->response.zhave);
+               if(t2 < 0) return t2;
+               t += t2;
        }
        
        debug(D_WEB_CLIENT, "%llu: Sending %d bytes of data (+%d of chunk header).", w->id, w->response.zhave - w->response.zsent, t);
 
        len = send(w->ofd, &w->response.zbuffer[w->response.zsent], (size_t) (w->response.zhave - w->response.zsent), MSG_DONTWAIT);
        if(len > 0) {
+               w->stats_sent_bytes += len;
                w->response.zsent += len;
-               if(t > 0) len += t;
+               len += t;
                debug(D_WEB_CLIENT, "%llu: Sent %d bytes.", w->id, len);
        }
-       else if(len == 0) debug(D_WEB_CLIENT, "%llu: Did not send any bytes to the client (zhave = %ld, zsent = %ld, need to send = %ld).", w->id, w->response.zhave, w->response.zsent, w->response.zhave - w->response.zsent);
-       else debug(D_WEB_CLIENT, "%llu: Failed to send data to client. Reason: %s", w->id, strerror(errno));
+       else if(len == 0) {
+               error("%llu: Did not send any bytes to the client (zhave = %ld, zsent = %ld, need to send = %ld).", w->id, w->response.zhave, w->response.zsent, w->response.zhave - w->response.zsent);
+               WEB_CLIENT_IS_DEAD(w);
+       }
+       else {
+               error("%llu: Failed to send data to client.", w->id);
+               WEB_CLIENT_IS_DEAD(w);
+       }
 
        return(len);
 }
 #endif // NETDATA_WITH_ZLIB
 
-long web_client_send(struct web_client *w)
-{
+ssize_t web_client_send(struct web_client *w) {
 #ifdef NETDATA_WITH_ZLIB
        if(likely(w->response.zoutput)) return web_client_send_deflate(w);
 #endif // NETDATA_WITH_ZLIB
 
-       long bytes;
+       ssize_t bytes;
 
        if(unlikely(w->response.data->len - w->response.sent == 0)) {
                // there is nothing to send
@@ -1946,7 +1985,7 @@ long web_client_send(struct web_client *w)
                // A. we have done everything
                // B. we temporarily have nothing to send, waiting for the buffer to be filled by ifd
 
-               if(w->mode == WEB_CLIENT_MODE_FILECOPY && w->wait_receive && w->ifd != w->ofd && w->response.rlen && w->response.rlen > w->response.data->len) {
+               if(w->mode == WEB_CLIENT_MODE_FILECOPY && w->wait_receive && w->response.rlen && w->response.rlen > w->response.data->len) {
                        // we have to wait, more data will come
                        debug(D_WEB_CLIENT, "%llu: Waiting for more data to become available.", w->id);
                        w->wait_send = 0;
@@ -1966,22 +2005,29 @@ long web_client_send(struct web_client *w)
 
        bytes = send(w->ofd, &w->response.data->buffer[w->response.sent], w->response.data->len - w->response.sent, MSG_DONTWAIT);
        if(likely(bytes > 0)) {
+               w->stats_sent_bytes += bytes;
                w->response.sent += bytes;
                debug(D_WEB_CLIENT, "%llu: Sent %d bytes.", w->id, bytes);
        }
-       else if(likely(bytes == 0)) debug(D_WEB_CLIENT, "%llu: Did not send any bytes to the client.", w->id);
-       else debug(D_WEB_CLIENT, "%llu: Failed to send data to client.", w->id);
+       else if(likely(bytes == 0)) {
+               error("%llu: Did not send any bytes to the client.", w->id);
+               WEB_CLIENT_IS_DEAD(w);
+       }
+       else {
+               error("%llu: Failed to send data to client.", w->id);
+               WEB_CLIENT_IS_DEAD(w);
+       }
 
        return(bytes);
 }
 
-long web_client_receive(struct web_client *w)
+ssize_t web_client_receive(struct web_client *w)
 {
        // do we have any space for more data?
        buffer_need_bytes(w->response.data, WEB_REQUEST_LENGTH);
 
-       long left = w->response.data->size - w->response.data->len;
-       long bytes;
+       ssize_t left = w->response.data->size - w->response.data->len;
+       ssize_t bytes;
 
        if(unlikely(w->mode == WEB_CLIENT_MODE_FILECOPY))
                bytes = read(w->ifd, &w->response.data->buffer[w->response.data->len], (size_t) (left - 1));
@@ -1989,6 +2035,9 @@ long web_client_receive(struct web_client *w)
                bytes = recv(w->ifd, &w->response.data->buffer[w->response.data->len], (size_t) (left - 1), MSG_DONTWAIT);
 
        if(likely(bytes > 0)) {
+               if(w->mode != WEB_CLIENT_MODE_FILECOPY)
+                       w->stats_received_bytes += bytes;
+
                size_t old = w->response.data->len;
                w->response.data->len += bytes;
                w->response.data->buffer[w->response.data->len] = '\0';
@@ -1998,7 +2047,9 @@ long web_client_receive(struct web_client *w)
 
                if(w->mode == WEB_CLIENT_MODE_FILECOPY) {
                        w->wait_send = 1;
-                       if(w->response.rlen && w->response.data->len >= w->response.rlen) w->wait_receive = 0;
+
+                       if(w->response.rlen && w->response.data->len >= w->response.rlen)
+                               w->wait_receive = 0;
                }
        }
        else if(likely(bytes == 0)) {
@@ -2012,13 +2063,20 @@ long web_client_receive(struct web_client *w)
                        // we are copying data from ifd to ofd
                        // let it finish copying...
                        w->wait_receive = 0;
-                       debug(D_WEB_CLIENT, "%llu: Disabling input.", w->id);
+
+                       debug(D_WEB_CLIENT, "%llu: Read the whole file.", w->id);
+                       if(w->ifd != w->ofd) close(w->ifd);
+                       w->ifd = w->ofd;
                }
                else {
-                       bytes = -1;
-                       errno = 0;
+                       error("%llu: failed to receive data.", w->id);
+                       WEB_CLIENT_IS_DEAD(w);
                }
        }
+       else {
+               error("%llu: receive data failed.", w->id);
+               WEB_CLIENT_IS_DEAD(w);
+       }
 
        return(bytes);
 }
@@ -2040,7 +2098,6 @@ void *web_client_main(void *ptr)
        if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
                error("Cannot set pthread cancel state to ENABLE.");
 
-       struct timeval tv;
        struct web_client *w = ptr;
        int retval;
        fd_set ifds, ofds, efds;
@@ -2049,6 +2106,15 @@ void *web_client_main(void *ptr)
        log_access("%llu: %s port %s connected on thread task id %d", w->id, w->client_ip, w->client_port, gettid());
 
        for(;;) {
+               if(unlikely(w->dead)) {
+                       error("%llu: client is dead.");
+                       break;
+               }
+               else if(unlikely(!w->wait_receive && !w->wait_send)) {
+                       error("%llu: client is not set for neither receiving nor sending data.");
+                       break;
+               }
+
                FD_ZERO (&ifds);
                FD_ZERO (&ofds);
                FD_ZERO (&efds);
@@ -2068,15 +2134,13 @@ void *web_client_main(void *ptr)
                        if(w->ofd > fdmax) fdmax = w->ofd;
                }
 
-               tv.tv_sec = web_client_timeout;
-               tv.tv_usec = 0;
-
                debug(D_WEB_CLIENT, "%llu: Waiting socket async I/O for %s %s", w->id, w->wait_receive?"INPUT":"", w->wait_send?"OUTPUT":"");
+               struct timeval tv = { .tv_sec = web_client_timeout, .tv_usec = 0 };
                retval = select(fdmax+1, &ifds, &ofds, &efds, &tv);
 
                if(retval == -1) {
-                       debug(D_WEB_CLIENT_ACCESS, "%llu: LISTENER: select() failed.", w->id);
-                       continue;
+                       error("%llu: LISTENER: select() failed (input fd = %d, output fd = %d). Closing client.", w->id, w->ifd, w->ofd);
+                       break;
                }
                else if(!retval) {
                        // timeout
@@ -2095,25 +2159,21 @@ void *web_client_main(void *ptr)
                }
 
                if(w->wait_send && FD_ISSET(w->ofd, &ofds)) {
-                       long bytes;
+                       ssize_t bytes;
                        if((bytes = web_client_send(w)) < 0) {
                                debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id);
                                errno = 0;
                                break;
                        }
-                       else
-                               w->stats_sent_bytes += bytes;
                }
 
                if(w->wait_receive && FD_ISSET(w->ifd, &ifds)) {
-                       long bytes;
+                       ssize_t bytes;
                        if((bytes = web_client_receive(w)) < 0) {
                                debug(D_WEB_CLIENT, "%llu: Cannot receive data from client. Closing client.", w->id);
                                errno = 0;
                                break;
                        }
-                       else
-                               w->stats_received_bytes += bytes;
 
                        if(w->mode == WEB_CLIENT_MODE_NORMAL) {
                                debug(D_WEB_CLIENT, "%llu: Attempting to process received data (%ld bytes).", w->id, bytes);
@@ -2129,6 +2189,7 @@ void *web_client_main(void *ptr)
 
        web_client_reset(w);
        w->obsolete = 1;
+       pthread_exit(NULL);
 
        return NULL;
 }
index da3707d20a3910c01c92e794b874fdc9144c19e8..f2dc554a3ecf72ab6e928f6490248e4c97a69b94 100644 (file)
@@ -37,16 +37,16 @@ struct response {
 
        int code;                                               // the HTTP response code
 
-       size_t rlen;                                    // if non-zero, the excepted size of ifd (input)
+       size_t rlen;                                    // if non-zero, the excepted size of ifd (input of firecopy)
        size_t sent;                                    // current data length sent to output
 
        int zoutput;                                    // if set to 1, web_client_send() will send compressed data
 #ifdef NETDATA_WITH_ZLIB
        z_stream zstream;                               // zlib stream for sending compressed output to client
        Bytef zbuffer[ZLIB_CHUNK];              // temporary buffer for storing compressed output
-       long zsent;                                             // the compressed bytes we have sent to the client
-       long zhave;                                             // the compressed bytes that we have to send
-       int zinitialized;
+       size_t zsent;                                   // the compressed bytes we have sent to the client
+       size_t zhave;                                   // the compressed bytes that we have received from zlib
+       int zinitialized:1;
 #endif
 
 };
@@ -54,6 +54,23 @@ struct response {
 struct web_client {
        unsigned long long id;
 
+       uint8_t obsolete:1;                                     // if set to 1, the listener will remove this client
+                                                                               // after setting this to 1, you should not touch
+                                                                               // this web_client
+
+       uint8_t dead:1;                                         // if set to 1, this client is dead
+
+       uint8_t keepalive:1;                            // if set to 1, the web client will be re-used
+       uint8_t enable_gzip:1;                          // if set to 1, the response will be compressed
+
+       uint8_t mode:3;                                         // the operational mode of the client
+
+       uint8_t wait_receive:1;                         // 1 = we are waiting more input data
+       uint8_t wait_send:1;                            // 1 = we have data to send to the client
+
+       int ifd;
+       int ofd;
+
        char client_ip[NI_MAXHOST+1];
        char client_port[NI_MAXSERV+1];
 
@@ -66,30 +83,20 @@ struct web_client {
        char cookie2[COOKIE_MAX+1];
        char origin[ORIGIN_MAX+1];
 
-       int mode;
-       int keepalive;
-       int enable_gzip;
-
        struct sockaddr_storage clientaddr;
-
-       pthread_t thread;                               // the thread servicing this client
-       int obsolete;                                   // if set to 1, the listener will remove this client
-
-       int ifd;
-       int ofd;
-
        struct response response;
 
-       int wait_receive;
-       int wait_send;
+       size_t stats_received_bytes;
+       size_t stats_sent_bytes;
 
-       unsigned long stats_received_bytes;
-       unsigned long stats_sent_bytes;
+       pthread_t thread;                               // the thread servicing this client
 
        struct web_client *prev;
        struct web_client *next;
 };
 
+#define WEB_CLIENT_IS_DEAD(w) (w)->dead=1
+
 extern struct web_client *web_clients;
 
 extern uid_t web_files_uid(void);
@@ -97,6 +104,10 @@ extern uid_t web_files_gid(void);
 
 extern struct web_client *web_client_create(int listener);
 extern struct web_client *web_client_free(struct web_client *w);
+extern ssize_t web_client_send(struct web_client *w);
+extern ssize_t web_client_receive(struct web_client *w);
+extern void web_client_process(struct web_client *w);
+extern void web_client_reset(struct web_client *w);
 
 extern void *web_client_main(void *ptr);
 
index 0da72b5be3a7a4a909d30dbeffb12621d5f8563d..b736dae5eeb732b16087b067dfd6b6a07e46d93f 100644 (file)
@@ -86,7 +86,7 @@ int create_listen_socket4(const char *ip, int port, int listen_backlog)
 
        if(is_ip_anything(ip)) {
                name.sin_addr.s_addr = htonl(INADDR_ANY);
-               info("Listening on any IPs (IPv4).");
+               // info("Listening on any IPs (IPv4).");
        }
        else {
                int ret = inet_pton(AF_INET, ip, (void *)&name.sin_addr.s_addr);
@@ -95,7 +95,7 @@ int create_listen_socket4(const char *ip, int port, int listen_backlog)
                        close(sock);
                        return -1;
                }
-               info("Listening on IP '%s' (IPv4).", ip);
+               // info("Listening on IP '%s' (IPv4).", ip);
        }
 
        if(bind (sock, (struct sockaddr *) &name, sizeof (name)) < 0) {
@@ -137,7 +137,7 @@ int create_listen_socket6(const char *ip, int port, int listen_backlog)
 
        if(is_ip_anything(ip)) {
                name.sin6_addr = in6addr_any;
-               info("Listening on all IPs (IPv6 and IPv4)");
+               // info("Listening on all IPs (IPv6 and IPv4)");
        }
        else {
                int ret = inet_pton(AF_INET6, ip, (void *)&name.sin6_addr.s6_addr);
@@ -146,7 +146,7 @@ int create_listen_socket6(const char *ip, int port, int listen_backlog)
                        close(sock);
                        return -1;
                }
-               info("Listening on IP '%s' (IPv6)", ip);
+               // info("Listening on IP '%s' (IPv6)", ip);
        }
 
        name.sin6_scope_id = 0;
@@ -168,6 +168,32 @@ int create_listen_socket6(const char *ip, int port, int listen_backlog)
 }
 
 
+int create_listen_socket(void) {
+       listen_backlog = (int) config_get_number("global", "http port listen backlog", LISTEN_BACKLOG);
+
+       listen_port = (int) config_get_number("global", "port", LISTEN_PORT);
+       if(listen_port < 1 || listen_port > 65535) {
+               error("Invalid listen port %d given. Defaulting to %d.", listen_port, LISTEN_PORT);
+               listen_port = LISTEN_PORT;
+       }
+       else debug(D_OPTIONS, "Listen port set to %d.", listen_port);
+
+       int ip = 0;
+       char *ipv = config_get("global", "ip version", "any");
+       if(!strcmp(ipv, "any") || !strcmp(ipv, "both") || !strcmp(ipv, "all")) ip = 0;
+       else if(!strcmp(ipv, "ipv4") || !strcmp(ipv, "IPV4") || !strcmp(ipv, "IPv4") || !strcmp(ipv, "4")) ip = 4;
+       else if(!strcmp(ipv, "ipv6") || !strcmp(ipv, "IPV6") || !strcmp(ipv, "IPv6") || !strcmp(ipv, "6")) ip = 6;
+       else error("Cannot understand ip version '%s'. Assuming 'any'.", ipv);
+
+       if(ip == 0 || ip == 6) listen_fd = create_listen_socket6(config_get("global", "bind socket to IP", "*"), listen_port, listen_backlog);
+       if(listen_fd < 0) {
+               listen_fd = create_listen_socket4(config_get("global", "bind socket to IP", "*"), listen_port, listen_backlog);
+               // if(listen_fd >= 0 && ip != 4) info("Managed to open an IPv4 socket on port %d.", listen_port);
+       }
+
+       return listen_fd;
+}
+
 // --------------------------------------------------------------------------------------
 // the main socket listener
 
@@ -176,15 +202,14 @@ int create_listen_socket6(const char *ip, int port, int listen_backlog)
 // 3. spawns a new pthread to serve the client (this is optimal for keep-alive clients)
 // 4. cleans up old web_clients that their pthreads have been exited
 
-void *socket_listen_main(void *ptr)
-{
+void *socket_listen_main_multi_threaded(void *ptr) {
        if(ptr) { ; }
 
-       info("WEB SERVER thread created with task id %d", gettid());
+       info("Multi-threaded WEB SERVER thread created with task id %d", gettid());
 
        struct web_client *w;
        struct timeval tv;
-       int retval;
+       int retval, failures = 0;
 
        if(ptr) { ; }
 
@@ -199,32 +224,40 @@ void *socket_listen_main(void *ptr)
 
        if(listen_fd < 0) fatal("LISTENER: Listen socket is not ready.");
 
-       fd_set ifds, ofds, efds;
-       int fdmax = listen_fd;
-
+       fd_set ifds;
        FD_ZERO (&ifds);
-       FD_ZERO (&ofds);
-       FD_ZERO (&efds);
 
        for(;;) {
                tv.tv_sec = 0;
                tv.tv_usec = 200000;
 
-               if(listen_fd >= 0) {
+               if(likely(listen_fd >= 0))
                        FD_SET(listen_fd, &ifds);
-                       FD_SET(listen_fd, &efds);
-               }
 
                // debug(D_WEB_CLIENT, "LISTENER: Waiting...");
-               retval = select(fdmax+1, &ifds, &ofds, &efds, &tv);
+               retval = select(listen_fd + 1, &ifds, NULL, NULL, &tv);
 
-               if(retval == -1) {
+               if(unlikely(retval == -1)) {
                        error("LISTENER: select() failed.");
+                       failures++;
+
+                       if(failures > 10) {
+                               error("LISTENER: our listen port %d seems dead. Re-opening it.", listen_fd);
+                               close(listen_fd);
+                               listen_fd = -1;
+                               sleep(5);
+                               create_listen_socket();
+                               if(listen_fd < 0)
+                                       fatal("Cannot listen for web clients (connected clients %llu).", global_statistics.connected_clients);
+
+                               failures = 0;
+                       }
+
                        continue;
                }
-               else if(retval) {
+               else if(likely(retval)) {
                        // check for new incoming connections
-                       if(FD_ISSET(listen_fd, &ifds)) {
+                       if(likely(FD_ISSET(listen_fd, &ifds))) {
                                w = web_client_create(listen_fd);
                                if(unlikely(!w)) {
                                        // no need for error log - web_client_create already logged the error
@@ -247,16 +280,20 @@ void *socket_listen_main(void *ptr)
                //      debug(D_WEB_CLIENT, "LISTENER: select() timeout.");
                //}
 
+               failures = 0;
+
                // cleanup unused clients
-               for(w = web_clients; w ; w = w?w->next:NULL) {
-                       if(w->obsolete) {
+               for (w = web_clients; w; ) {
+                       if (w->obsolete) {
                                debug(D_WEB_CLIENT, "%llu: Removing client.", w->id);
-                               // pthread_join(w->thread,  NULL);
+                               // pthread_cancel(w->thread);
+                               // pthread_join(w->thread, NULL);
                                w = web_client_free(w);
 #ifdef NETDATA_INTERNAL_CHECKS
                                log_allocations();
 #endif
                        }
+                       else w = w->next;
                }
        }
 
@@ -268,3 +305,190 @@ void *socket_listen_main(void *ptr)
        return NULL;
 }
 
+void *socket_listen_main_single_threaded(void *ptr) {
+       if(ptr) { ; }
+
+       info("Single threaded WEB SERVER thread created with task id %d", gettid());
+
+       struct web_client *w;
+       int retval, failures = 0;
+
+       if(ptr) { ; }
+
+       if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
+               error("Cannot set pthread cancel type to DEFERRED.");
+
+       if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
+               error("Cannot set pthread cancel state to ENABLE.");
+
+       web_client_timeout = (int) config_get_number("global", "disconnect idle web clients after seconds", DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS);
+       web_enable_gzip = config_get_boolean("global", "enable web responses gzip compression", web_enable_gzip);
+
+       if(listen_fd < 0) fatal("LISTENER: Listen socket is not ready.");
+
+       fd_set ifds, ofds, efds;
+       int fdmax = listen_fd;
+
+       for(;;) {
+               int has_obsolete = 0;
+               FD_ZERO (&ifds);
+               FD_ZERO (&ofds);
+               FD_ZERO (&efds);
+
+               if(listen_fd >= 0) {
+                       // debug(D_WEB_CLIENT_ACCESS, "LISTENER: adding listen socket %d to ifds, efds", listen_fd);
+                       FD_SET(listen_fd, &ifds);
+                       FD_SET(listen_fd, &efds);
+               }
+
+               for(w = web_clients; w ; w = w->next) {
+                       if(unlikely(w->dead)) {
+                               error("%llu: client is dead.");
+                               w->obsolete = 1;
+                       }
+                       else if(unlikely(!w->wait_receive && !w->wait_send)) {
+                               error("%llu: client is not set for neither receiving nor sending data.");
+                               w->obsolete = 1;
+                       }
+
+                       if(unlikely(w->obsolete)) {
+                               has_obsolete++;
+                               continue;
+                       }
+
+                       // debug(D_WEB_CLIENT_ACCESS, "%llu: adding input socket %d to efds", w->id, w->ifd);
+                       FD_SET(w->ifd, &efds);
+                       if(w->ifd > fdmax) fdmax = w->ifd;
+
+                       if(w->ifd != w->ofd) {
+                               // debug(D_WEB_CLIENT_ACCESS, "%llu: adding output socket %d to efds", w->id, w->ofd);
+                               FD_SET(w->ofd, &efds);
+                               if(w->ofd > fdmax) fdmax = w->ofd;
+                       }
+
+                       if (w->wait_receive) {
+                               // debug(D_WEB_CLIENT_ACCESS, "%llu: adding input socket %d to ifds", w->id, w->ifd);
+                               FD_SET(w->ifd, &ifds);
+                               if(w->ifd > fdmax) fdmax = w->ifd;
+                       }
+
+                       if (w->wait_send) {
+                               // debug(D_WEB_CLIENT_ACCESS, "%llu: adding output socket %d to ofds", w->id, w->ofd);
+                               FD_SET(w->ofd, &ofds);
+                               if(w->ofd > fdmax) fdmax = w->ofd;
+                       }
+               }
+
+               // cleanup unused clients
+               if(unlikely(has_obsolete)) {
+                       for (w = web_clients; w; ) {
+                               if (w->obsolete) {
+                                       debug(D_WEB_CLIENT, "%llu: Removing client.", w->id);
+                                       w = web_client_free(w);
+#ifdef NETDATA_INTERNAL_CHECKS
+                                       log_allocations();
+#endif
+                               }
+                               else w = w->next;
+                       }
+               }
+
+               debug(D_WEB_CLIENT_ACCESS, "LISTENER: Waiting...");
+               struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
+               errno = 0;
+               retval = select(fdmax+1, &ifds, &ofds, &efds, &tv);
+
+               if(retval == -1) {
+                       error("LISTENER: select() failed.");
+
+                       if(errno != EAGAIN) {
+                               // debug(D_WEB_CLIENT_ACCESS, "LISTENER: select() failed.");
+                               error("REMOVING ALL %lu WEB CLIENTS !", global_statistics.connected_clients);
+                               while (web_clients) web_client_free(web_clients);
+                       }
+
+                       failures++;
+                       if(failures > 10) {
+                               error("LISTENER: our listen port %d seems dead. Re-opening it.", listen_fd);
+                               close(listen_fd);
+                               listen_fd = -1;
+                               sleep(5);
+                               create_listen_socket();
+                               if(listen_fd < 0)
+                                       fatal("Cannot listen for web clients (connected clients %llu).", global_statistics.connected_clients);
+
+                               failures = 0;
+                       }
+
+                       continue;
+               }
+               else if(retval) {
+                       for(w = web_clients; w ; w = w->next) {
+                               if (unlikely(w->obsolete)) continue;
+
+                               if (unlikely(FD_ISSET(w->ifd, &efds))) {
+                                       debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on input socket.", w->id);
+                                       web_client_reset(w);
+                                       w->obsolete = 1;
+                                       continue;
+                               }
+
+                               if (unlikely(FD_ISSET(w->ofd, &efds))) {
+                                       debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on output socket.", w->id);
+                                       web_client_reset(w);
+                                       w->obsolete = 1;
+                                       continue;
+                               }
+
+                               if (unlikely(w->wait_receive && FD_ISSET(w->ifd, &ifds))) {
+                                       long bytes;
+                                       if (unlikely((bytes = web_client_receive(w)) < 0)) {
+                                               debug(D_WEB_CLIENT, "%llu: Cannot receive data from client. Closing client.", w->id);
+                                               errno = 0;
+                                               web_client_reset(w);
+                                               w->obsolete = 1;
+                                               continue;
+                                       }
+
+                                       if (w->mode == WEB_CLIENT_MODE_NORMAL) {
+                                               debug(D_WEB_CLIENT, "%llu: Processing received data (%ld bytes).", w->id, bytes);
+                                               // info("%llu: Attempting to process received data (%ld bytes).", w->id, bytes);
+                                               web_client_process(w);
+                                       }
+                                       else {
+                                               debug(D_WEB_CLIENT, "%llu: NO Processing for received data (%ld bytes).", w->id, bytes);
+                                       }
+                               }
+
+                               if (unlikely(w->wait_send && FD_ISSET(w->ofd, &ofds))) {
+                                       ssize_t bytes;
+                                       if (unlikely((bytes = web_client_send(w)) < 0)) {
+                                               debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id);
+                                               errno = 0;
+                                               web_client_reset(w);
+                                               w->obsolete = 1;
+                                               continue;
+                                       }
+                               }
+                       }
+
+                       // check for new incoming connections
+                       if(FD_ISSET(listen_fd, &ifds)) {
+                               debug(D_WEB_CLIENT_ACCESS, "LISTENER: new connection.");
+                               web_client_create(listen_fd);
+                       }
+               }
+               else {
+                       debug(D_WEB_CLIENT_ACCESS, "LISTENER: timeout.");
+               }
+
+               failures = 0;
+       }
+
+       error("LISTENER: exit!");
+
+       if(listen_fd >= 0) close(listen_fd);
+       exit(2);
+
+       return NULL;
+}
index 39f3bad918ab49f251b039499b325540163888dc..68d16748e2e16c8d35fa2a912bba436d666e02ab 100644 (file)
@@ -15,6 +15,8 @@ extern int listen_port;
 
 extern int create_listen_socket4(const char *ip, int port, int listen_backlog);
 extern int create_listen_socket6(const char *ip, int port, int listen_backlog);
-extern void *socket_listen_main(void *ptr);
+extern void *socket_listen_main_multi_threaded(void *ptr);
+extern void *socket_listen_main_single_threaded(void *ptr);
+extern int create_listen_socket(void);
 
 #endif /* NETDATA_WEB_SERVER_H */