arthur.barton.de Git - netdata.git/commitdiff
switch from HTTP to stream data
author Costa Tsaousis (ktsaou) <costa@tsaousis.gr>
Mon, 20 Feb 2017 23:47:38 +0000 (01:47 +0200)
committer Costa Tsaousis (ktsaou) <costa@tsaousis.gr>
Tue, 21 Feb 2017 23:00:26 +0000 (01:00 +0200)
src/rrdpush.c
src/socket.c
src/socket.h
src/web_client.c

index ec780b2b3d95a764156827d4dba453b63304f69c..c68d0e4f343063df0f07de98639e24cea2d52db9 100644 (file)
@@ -147,47 +147,115 @@ void *central_netdata_push_thread(void *ptr) {
     size_t sent_bytes = 0;
     size_t sent_connection = 0;
     int sock = -1;
-    char buffer[1];
+    char buffer[1000 + 1];
+
+    struct pollfd fds[2], *ifd, *ofd;
+
+    ifd = &fds[0];
+    ofd = &fds[1];
+
+    ifd->fd = rrdpush_pipe[PIPE_READ];
+    ifd->events = POLLIN;
+    ofd->events = POLLOUT;
+
+    nfds_t fdmax = 2;
 
     for(;;) {
+        if(netdata_exit) break;
+
         if(unlikely(sock == -1)) {
             info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
             sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
 
-            if(unlikely(sock != -1)) {
-                info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
+            if(unlikely(sock == -1)) {
+                error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+                sleep(5);
+                continue;
+            }
+
+            info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
 
-                if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
-                    error("PUSH: cannot set non-blocking mode for socket.");
+            char http[1000 + 1];
+            snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version);
+            if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
+                close(sock);
+                sock = -1;
+                error("PUSH: failed to send http header to netdata at: %s", central_netdata_to_push_data);
+                sleep(5);
+                continue;
             }
-            else
-                error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+
+            if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
+                close(sock);
+                sock = -1;
+                error("PUSH: failed to receive OK from netdata at: %s", central_netdata_to_push_data);
+                sleep(5);
+                continue;
+            }
+
+            if(strncmp(http, "STREAM", 6)) {
+                close(sock);
+                sock = -1;
+                error("PUSH: netdata servers at  %s, did not send STREAM", central_netdata_to_push_data);
+                sleep(5);
+                continue;
+            }
+
+            if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
+                error("PUSH: cannot set non-blocking mode for socket.");
 
             rrdpush_lock();
             if(buffer_strlen(rrdpush_buffer))
                 error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
 
             buffer_flush(rrdpush_buffer);
-            buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), VERSION);
             reset_all_charts();
+            last_host = NULL;
             rrdpush_unlock();
             sent_connection = 0;
         }
 
-        if(read(rrdpush_pipe[PIPE_READ], buffer, 1) == -1) {
-            error("PUSH: Cannot read from internal pipe.");
-            sleep(1);
+        ifd->revents = 0;
+        ofd->revents = 0;
+        ofd->fd = sock;
+
+        if(begin < buffer_strlen(rrdpush_buffer))
+            ofd->events = POLLOUT;
+        else
+            ofd->events = 0;
+
+        if(netdata_exit) break;
+        int retval = poll(fds, fdmax, 60 * 1000);
+        if(netdata_exit) break;
+
+        if(unlikely(retval == -1)) {
+            if(errno == EAGAIN || errno == EINTR)
+                continue;
+
+            error("PUSH: Failed to poll().");
+            close(sock);
+            sock = -1;
+            break;
+        }
+        else if(unlikely(!retval)) {
+            // timeout
+            continue;
+        }
+
+        if(ifd->revents & POLLIN) {
+            if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
+                error("PUSH: Cannot read from internal pipe.");
         }
 
-        if(likely(sock != -1 && begin < rrdpush_buffer->len)) {
+        if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
             // fprintf(stderr, "PUSH BEGIN\n");
-            // fwrite(&rrdpush_buffer->buffer[begin], 1, rrdpush_buffer->len - begin, stderr);
+            // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
             // fprintf(stderr, "\nPUSH END\n");
 
             rrdpush_lock();
-            ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len - begin, MSG_DONTWAIT);
+            ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
             if(ret == -1) {
-                if(errno != EAGAIN) {
+                if(errno != EAGAIN && errno != EINTR) {
                     error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
                     close(sock);
                     sock = -1;
@@ -197,7 +265,7 @@ void *central_netdata_push_thread(void *ptr) {
                 sent_connection += ret;
                 sent_bytes += ret;
                 begin += ret;
-                if(begin == rrdpush_buffer->len) {
+                if(begin == buffer_strlen(rrdpush_buffer)) {
                     buffer_flush(rrdpush_buffer);
                     begin = 0;
                 }
index e2b3e5814bf1387f8999db982954a6b72d210e0c..6906a2882d9f79468743ef3ee2484db486e2bfdb 100644 (file)
@@ -206,3 +206,65 @@ int connect_to_one_of(const char *destination, int default_port, struct timeval
 
     return sock;
 }
+
+// Wait up to `timeout` seconds for sockfd to become readable, then recv() once.
+// Returns recv()'s result, 0 if the wait timed out, or -1 if poll() failed.
+ssize_t recv_timeout(int sockfd, void *buf, size_t len, int flags, int timeout) {
+    for(;;) {
+        struct pollfd fd = {
+                .fd = sockfd,
+                .events = POLLIN,
+                .revents = 0
+        };
+
+        errno = 0;
+        int retval = poll(&fd, 1, timeout * 1000);
+        if(retval == -1) {
+            // interrupted before any event was reported: poll again
+            if(errno == EINTR || errno == EAGAIN)
+                continue;
+            return -1;
+        }
+
+        if(!retval) {
+            // timeout
+            return 0;
+        }
+
+        // poll() reports in revents, not events; on error/hangup let recv() report it
+        if(fd.revents & (POLLIN | POLLERR | POLLHUP | POLLNVAL)) break;
+    }
+
+    return recv(sockfd, buf, len, flags);
+}
+
+// Wait up to `timeout` seconds for sockfd to become writable, then send() once.
+// Returns send()'s result, 0 if the wait timed out, or -1 if poll() failed.
+ssize_t send_timeout(int sockfd, void *buf, size_t len, int flags, int timeout) {
+    for(;;) {
+        struct pollfd fd = {
+                .fd = sockfd,
+                .events = POLLOUT,
+                .revents = 0
+        };
+
+        errno = 0;
+        int retval = poll(&fd, 1, timeout * 1000);
+        if(retval == -1) {
+            // interrupted before any event was reported: poll again
+            if(errno == EINTR || errno == EAGAIN)
+                continue;
+            return -1;
+        }
+
+        if(!retval) {
+            // timeout
+            return 0;
+        }
+
+        // poll() reports in revents, not events; on error/hangup let send() report it
+        if(fd.revents & (POLLOUT | POLLERR | POLLHUP | POLLNVAL)) break;
+    }
+
+    return send(sockfd, buf, len, flags);
+}
index 6dffde34c1f868893192054855d3191c7fa06169..2dea4418ce100b0680799b7e076b0a80170933d7 100644 (file)
@@ -8,4 +8,7 @@
 extern int connect_to(const char *definition, int default_port, struct timeval *timeout);
 extern int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter);
 
+extern ssize_t recv_timeout(int sockfd, void *buf, size_t len, int flags, int timeout); // timeout in seconds; returns 0 if it expires, -1 if poll() fails, else recv()'s result
+extern ssize_t send_timeout(int sockfd, void *buf, size_t len, int flags, int timeout); // timeout in seconds; returns 0 if it expires, -1 if poll() fails, else send()'s result
+
 #endif //NETDATA_SOCKET_H
index 61901ed8d1e97efd9e9245a2d60acf190fd1a43d..db578f9c2421244335dec776ca5fca4a7fa27056 100644 (file)
@@ -1719,10 +1719,26 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
     snprintfz(cd.fullfilename, FILENAME_MAX,     "%s:%s", w->client_ip, w->client_port);
     snprintfz(cd.cmd,          PLUGINSD_CMD_MAX, "%s:%s", w->client_ip, w->client_port);
 
+    if(send_timeout(w->ifd, "STREAM", 6, 0, 60) != 6) {
+        error("Cannot send STREAM to netdata at %s:%s", w->client_ip, w->client_port);
+        buffer_flush(w->response.data);
+        buffer_sprintf(w->response.data, "STREAM failed to reply back with STREAM");
+        return 400;
+    }
+
     // remove the non-blocking flag from the socket
     if(fcntl(w->ifd, F_SETFL, fcntl(w->ifd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
         error("STREAM from '%s:%s': cannot remove the non-blocking flag from socket %d", w->client_ip, w->client_port, w->ifd);
 
+    /*
+    char buffer[1000 + 1];
+    ssize_t len;
+    while((len = read(w->ifd, buffer, 1000)) != -1) {
+        buffer[len] = '\0';
+        fprintf(stderr, "BEGIN READ %zu bytes\n%s\nEND READ\n", (size_t)len, buffer);
+    }
+    */
+
     // convert the socket to a FILE *
     FILE *fp = fdopen(w->ifd, "r");
     if(!fp) {