]> arthur.barton.de Git - netdata.git/blobdiff - src/rrdpush.c
switch from HTTP to stream data
[netdata.git] / src / rrdpush.c
index ec780b2b3d95a764156827d4dba453b63304f69c..c68d0e4f343063df0f07de98639e24cea2d52db9 100644 (file)
@@ -147,47 +147,115 @@ void *central_netdata_push_thread(void *ptr) {
     size_t sent_bytes = 0;
     size_t sent_connection = 0;
     int sock = -1;
-    char buffer[1];
+    char buffer[1000 + 1];
+
+    struct pollfd fds[2], *ifd, *ofd;
+
+    ifd = &fds[0];
+    ofd = &fds[1];
+
+    ifd->fd = rrdpush_pipe[PIPE_READ];
+    ifd->events = POLLIN;
+    ofd->events = POLLOUT;
+
+    nfds_t fdmax = 2;
 
     for(;;) {
+        if(netdata_exit) break;
+
         if(unlikely(sock == -1)) {
             info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
             sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
 
-            if(unlikely(sock != -1)) {
-                info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
+            if(unlikely(sock == -1)) {
+                error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+                sleep(5);
+                continue;
+            }
+
+            info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
 
-                if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
-                    error("PUSH: cannot set non-blocking mode for socket.");
+            char http[1000 + 1];
+            snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version);
+            if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
+                close(sock);
+                sock = -1;
+                error("PUSH: failed to send http header to netdata at: %s", central_netdata_to_push_data);
+                sleep(5);
+                continue;
             }
-            else
-                error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+
+            if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
+                close(sock);
+                sock = -1;
+                error("PUSH: failed to receive OK from netdata at: %s", central_netdata_to_push_data);
+                sleep(5);
+                continue;
+            }
+
+            if(strncmp(http, "STREAM", 6)) {
+                close(sock);
+                sock = -1;
+                error("PUSH: netdata servers at  %s, did not send STREAM", central_netdata_to_push_data);
+                sleep(5);
+                continue;
+            }
+
+            if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
+                error("PUSH: cannot set non-blocking mode for socket.");
 
             rrdpush_lock();
             if(buffer_strlen(rrdpush_buffer))
                 error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
 
             buffer_flush(rrdpush_buffer);
-            buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), VERSION);
             reset_all_charts();
+            last_host = NULL;
             rrdpush_unlock();
             sent_connection = 0;
         }
 
-        if(read(rrdpush_pipe[PIPE_READ], buffer, 1) == -1) {
-            error("PUSH: Cannot read from internal pipe.");
-            sleep(1);
+        ifd->revents = 0;
+        ofd->revents = 0;
+        ofd->fd = sock;
+
+        if(begin < buffer_strlen(rrdpush_buffer))
+            ofd->events = POLLOUT;
+        else
+            ofd->events = 0;
+
+        if(netdata_exit) break;
+        int retval = poll(fds, fdmax, 60 * 1000);
+        if(netdata_exit) break;
+
+        if(unlikely(retval == -1)) {
+            if(errno == EAGAIN || errno == EINTR)
+                continue;
+
+            error("PUSH: Failed to poll().");
+            close(sock);
+            sock = -1;
+            break;
+        }
+        else if(unlikely(!retval)) {
+            // timeout
+            continue;
+        }
+
+        if(ifd->revents & POLLIN) {
+            if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
+                error("PUSH: Cannot read from internal pipe.");
         }
 
-        if(likely(sock != -1 && begin < rrdpush_buffer->len)) {
+        if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
             // fprintf(stderr, "PUSH BEGIN\n");
-            // fwrite(&rrdpush_buffer->buffer[begin], 1, rrdpush_buffer->len - begin, stderr);
+            // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
             // fprintf(stderr, "\nPUSH END\n");
 
             rrdpush_lock();
-            ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len - begin, MSG_DONTWAIT);
+            ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
             if(ret == -1) {
-                if(errno != EAGAIN) {
+                if(errno != EAGAIN && errno != EINTR) {
                     error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
                     close(sock);
                     sock = -1;
@@ -197,7 +265,7 @@ void *central_netdata_push_thread(void *ptr) {
                 sent_connection += ret;
                 sent_bytes += ret;
                 begin += ret;
-                if(begin == rrdpush_buffer->len) {
+                if(begin == buffer_strlen(rrdpush_buffer)) {
                     buffer_flush(rrdpush_buffer);
                     begin = 0;
                 }