]> arthur.barton.de Git - netdata.git/commitdiff
multithreaded web server using poll()
authorCosta Tsaousis <costa@tsaousis.gr>
Sun, 29 May 2016 14:50:49 +0000 (17:50 +0300)
committerCosta Tsaousis <costa@tsaousis.gr>
Sun, 29 May 2016 14:50:49 +0000 (17:50 +0300)
src/web_client.c
src/web_server.c

index 368a24b5c6bd846ed866e09066a43d56804e5409..4477b8a4d31d47c2480ef23e46ec02626eaae15b 100644 (file)
@@ -16,6 +16,7 @@
 #include <pwd.h>
 #include <grp.h>
 #include <ctype.h>
+#include <poll.h>
 
 #include "common.h"
 #include "log.h"
@@ -90,7 +91,8 @@ struct web_client *web_client_create(int listener)
                                strcpy(w->client_ip, &w->client_ip[7]);
                                debug(D_WEB_CLIENT_ACCESS, "%llu: New IPv4 web client from %s port %s on socket %d.", w->id, w->client_ip, w->client_port, w->ifd);
                        }
-                       debug(D_WEB_CLIENT_ACCESS, "%llu: New IPv6 web client from %s port %s on socket %d.", w->id, w->client_ip, w->client_port, w->ifd);
+                       else
+                               debug(D_WEB_CLIENT_ACCESS, "%llu: New IPv6 web client from %s port %s on socket %d.", w->id, w->client_ip, w->client_port, w->ifd);
                        break;
 
                default:
@@ -2103,9 +2105,8 @@ void *web_client_main(void *ptr)
                error("Cannot set pthread cancel state to ENABLE.");
 
        struct web_client *w = ptr;
-       int retval;
-       fd_set ifds, ofds, efds;
-       int fdmax = 0;
+       struct pollfd fds[2], *ifd, *ofd;
+       int retval, fdmax = 0, timeout;
 
        log_access("%llu: %s port %s connected on thread task id %d", w->id, w->client_ip, w->client_port, gettid());
 
@@ -2119,62 +2120,73 @@ void *web_client_main(void *ptr)
                        break;
                }
 
-               FD_ZERO (&ifds);
-               FD_ZERO (&ofds);
-               FD_ZERO (&efds);
-
-               if(w->ifd < 0 || w->ifd >= FD_SETSIZE || w->ofd < 0 || w->ofd >= FD_SETSIZE) {
-                       error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd < FD_SETSIZE (%d)", w->id, w->ifd, w->ofd, FD_SETSIZE);
+               if(unlikely(w->ifd < 0 || w->ofd < 0)) {
+                       error("%llu: invalid file descriptor, ifd = %d, ofd = %d (required 0 <= fd", w->id, w->ifd, w->ofd);
                        break;
                }
 
-               FD_SET(w->ifd, &efds);
+               if(w->ifd == w->ofd) {
+                       fds[0].fd = w->ifd;
+                       fds[0].events = 0;
+                       fds[0].revents = 0;
 
-               if(w->ifd != w->ofd)
-                       FD_SET(w->ofd, &efds);
+                       if(w->wait_receive) fds[0].events += POLLIN;
+                       if(w->wait_send)    fds[0].events += POLLOUT;
 
-               if (w->wait_receive) {
-                       FD_SET(w->ifd, &ifds);
-                       if(w->ifd > fdmax) fdmax = w->ifd;
-               }
+                       fds[1].fd = -1;
+                       fds[1].events = 0;
+                       fds[1].revents = 0;
 
-               if (w->wait_send) {
-                       FD_SET(w->ofd, &ofds);
-                       if(w->ofd > fdmax) fdmax = w->ofd;
-               }
+                       ifd = ofd = &fds[0];
 
-               debug(D_WEB_CLIENT, "%llu: Waiting socket async I/O for %s %s", w->id, w->wait_receive?"INPUT":"", w->wait_send?"OUTPUT":"");
-               struct timeval tv = { .tv_sec = web_client_timeout, .tv_usec = 0 };
-               retval = select(fdmax+1, &ifds, &ofds, &efds, &tv);
-
-               if(retval == -1) {
-                       error("%llu: LISTENER: select() failed (input fd = %d, output fd = %d). Closing client.", w->id, w->ifd, w->ofd);
-                       break;
+                       fdmax = 1;
                }
-               else if(!retval) {
-                       // timeout
-                       debug(D_WEB_CLIENT_ACCESS, "%llu: LISTENER: timeout.", w->id);
-                       break;
+               else {
+                       fds[0].fd = w->ifd;
+                       fds[0].events = 0;
+                       fds[0].revents = 0;
+                       if(w->wait_receive) fds[0].events += POLLIN;
+                       ifd = &fds[0];
+
+                       fds[1].fd = w->ofd;
+                       fds[1].events = 0;
+                       fds[1].revents = 0;
+                       if(w->wait_send)    fds[1].events += POLLOUT;
+                       ofd = &fds[1];
+
+                       fdmax = 2;
                }
 
-               if(FD_ISSET(w->ifd, &efds)) {
-                       debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on input socket.", w->id);
+               debug(D_WEB_CLIENT, "%llu: Waiting socket async I/O for %s %s", w->id, w->wait_receive?"INPUT":"", w->wait_send?"OUTPUT":"");
+               errno = 0;
+               timeout = web_client_timeout * 1000;
+               retval = poll(fds, fdmax, timeout);
+
+               if(unlikely(retval == -1)) {
+                       if(errno == EAGAIN || errno == EINTR) {
+                               debug(D_WEB_CLIENT, "%llu: EAGAIN received.", w->id);
+                               continue;
+                       }
+
+                       error("%llu: LISTENER: poll() failed (input fd = %d, output fd = %d). Closing client.", w->id, w->ifd, w->ofd);
                        break;
                }
-
-               if(FD_ISSET(w->ofd, &efds)) {
-                       debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on output socket.", w->id);
+               else if(unlikely(!retval)) {
+                       debug(D_WEB_CLIENT, "%llu: Timeout while waiting socket async I/O for %s %s", w->id, w->wait_receive?"INPUT":"", w->wait_send?"OUTPUT":"");
                        break;
                }
 
-               if(w->wait_send && FD_ISSET(w->ofd, &ofds)) {
+               int used = 0;
+               if(w->wait_send && ofd->revents & POLLOUT) {
+                       used++;
                        if(web_client_send(w) < 0) {
                                debug(D_WEB_CLIENT, "%llu: Cannot send data to client. Closing client.", w->id);
                                break;
                        }
                }
 
-               if(w->wait_receive && FD_ISSET(w->ifd, &ifds)) {
+               if(w->wait_receive && (ifd->revents & POLLIN || ifd->revents & POLLPRI)) {
+                       used++;
                        if(web_client_receive(w) < 0) {
                                debug(D_WEB_CLIENT, "%llu: Cannot receive data from client. Closing client.", w->id);
                                break;
@@ -2185,13 +2197,16 @@ void *web_client_main(void *ptr)
                                web_client_process(w);
                        }
                }
+
+               if(unlikely(!used)) {
+                       debug(D_WEB_CLIENT_ACCESS, "%llu: Received error on socket.", w->id);
+                       break;
+               }
        }
 
        log_access("%llu: %s port %s disconnected from thread task id %d", w->id, w->client_ip, w->client_port, gettid());
        debug(D_WEB_CLIENT, "%llu: done...", w->id);
 
-       web_client_reset(w);
-
        // close the sockets/files now
        // to free file descriptors
        if(w->ifd == w->ofd) {
@@ -2199,7 +2214,7 @@ void *web_client_main(void *ptr)
        }
        else {
                if(w->ifd != -1) close(w->ifd);
-               if(w->ofd != -1) close(w->ifd);
+               if(w->ofd != -1) close(w->ofd);
        }
        w->ifd = -1;
        w->ofd = -1;
index 2c788df3d6978ed38b7b8d8ea827fef8e1782950..fef2c3fb29224d67bb065bfd209f386ba1e596e3 100644 (file)
@@ -13,6 +13,7 @@
 #include <fcntl.h>
 #include <netinet/tcp.h>
 #include <malloc.h>
+#include <poll.h>
 
 #include "common.h"
 #include "log.h"
@@ -205,40 +206,32 @@ int create_listen_socket(void) {
 #define CLEANUP_EVERY_EVENTS 1000
 
 void *socket_listen_main_multi_threaded(void *ptr) {
-       if(ptr) { ; }
+       (void)ptr;
 
        web_server_mode = WEB_SERVER_MODE_MULTI_THREADED;
        info("Multi-threaded WEB SERVER thread created with task id %d", gettid());
 
        struct web_client *w;
-       struct timeval tv;
        int retval, failures = 0, counter = 0;
 
-       if(ptr) { ; }
-
        if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
                error("Cannot set pthread cancel type to DEFERRED.");
 
        if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
                error("Cannot set pthread cancel state to ENABLE.");
 
-       if(listen_fd < 0 || listen_fd >= FD_SETSIZE) fatal("LISTENER: Listen socket %d is not ready, or invalid.", listen_fd);
-
-       fd_set ifds;
-       FD_ZERO (&ifds);
+       if(listen_fd < 0)
+               fatal("LISTENER: Listen socket %d is not ready, or invalid.", listen_fd);
 
        for(;;) {
-               tv.tv_sec = 0;
-               tv.tv_usec = 100000;
-
-               if(likely(listen_fd >= 0))
-                       FD_SET(listen_fd, &ifds);
+               struct pollfd fd = { .fd = listen_fd, .events = POLLIN, .revents = 0 };
+               int timeout = 10 * 1000;
 
                // debug(D_WEB_CLIENT, "LISTENER: Waiting...");
-               retval = select(listen_fd + 1, &ifds, NULL, NULL, &tv);
+               retval = poll(&fd, 1, timeout);
 
                if(unlikely(retval == -1)) {
-                       error("LISTENER: select() failed.");
+                       error("LISTENER: poll() failed.");
                        failures++;
 
                        if(failures > 10) {
@@ -257,7 +250,7 @@ void *socket_listen_main_multi_threaded(void *ptr) {
                }
                else if(likely(retval)) {
                        // check for new incoming connections
-                       if(likely(FD_ISSET(listen_fd, &ifds))) {
+                       if(fd.revents & POLLIN || fd.revents & POLLPRI) {
                                w = web_client_create(listen_fd);
                                if(unlikely(!w)) {
                                        // no need for error log - web_client_create already logged the error
@@ -273,8 +266,11 @@ void *socket_listen_main_multi_threaded(void *ptr) {
                                        w->obsolete = 1;
                                }
                        }
-                       else debug(D_WEB_CLIENT, "LISTENER: select() didn't do anything.");
-
+                       else {
+                               failures++;
+                               debug(D_WEB_CLIENT, "LISTENER: select() didn't do anything.");
+                               continue;
+                       }
                }
                else {
                        debug(D_WEB_CLIENT, "LISTENER: select() timeout.");