]> arthur.barton.de Git - netdata.git/commitdiff
allow metrics streaming to work in parallel with local database; propagate O/S type...
authorCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Wed, 22 Feb 2017 22:09:46 +0000 (00:09 +0200)
committerCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Wed, 22 Feb 2017 22:09:46 +0000 (00:09 +0200)
17 files changed:
src/appconfig.c
src/backends.c
src/common.h
src/health.c
src/main.c
src/registry_init.c
src/rrd.h
src/rrd2json.c
src/rrdhost.c
src/rrdpush.c
src/rrdpush.h
src/rrdset.c
src/socket.c
src/socket.h
src/web_buffer.c
src/web_client.c
src/web_client.h

index bc7ba2b2cdf85357a9e821594e569a3910d92e9e..3a71b099ee6478d1b05089f6cca2d9ed59c38392 100644 (file)
@@ -503,11 +503,13 @@ void appconfig_generate(struct config *root, BUFFER *wb, int only_changed)
 
         appconfig_wrlock(root);
         for(co = root->sections; co ; co = co->next) {
-            if(!strcmp(co->name, "global") ||
-                    !strcmp(co->name, "plugins")  ||
-                    !strcmp(co->name, "registry") ||
-                    !strcmp(co->name, "health")   ||
-                    !strcmp(co->name, "backend"))
+            if(!strcmp(co->name, "global")
+               || !strcmp(co->name, "plugins")
+               || !strcmp(co->name, "registry")
+               || !strcmp(co->name, "health")
+               || !strcmp(co->name, "backend")
+               || !strcmp(co->name, "stream")
+                    )
                 pri = 0;
             else if(!strncmp(co->name, "plugin:", 7)) pri = 1;
             else pri = 2;
index 300f6ae31492091aa728443ca58329cf0a5e4e83..5f725e402c18791b91773049d82a13b0a6808491 100644 (file)
@@ -147,7 +147,7 @@ void *backends_main(void *ptr) {
     // ------------------------------------------------------------------------
     // collect configuration options
 
-    if(central_netdata_to_push_data) {
+    if(rrdpush_exclusive) {
         info("Backend is disabled - use the central netdata");
         goto cleanup;
     }
@@ -399,7 +399,7 @@ void *backends_main(void *ptr) {
             usec_t start_ut = now_monotonic_usec();
             size_t reconnects = 0;
 
-            sock = connect_to_one_of(destination, default_port, &timeout, &reconnects);
+            sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
 
             chart_backend_reconnects += reconnects;
             chart_backend_latency += now_monotonic_usec() - start_ut;
index 240ab0d097b456d4b1f24861f0dff5a2eba2f38c..8692e5edb5bdd035933a3a67c6f3001ebf723de1 100644 (file)
@@ -226,7 +226,6 @@ extern char *netdata_configured_cache_dir;
 extern char *netdata_configured_varlib_dir;
 extern char *netdata_configured_home_dir;
 extern char *netdata_configured_host_prefix;
-extern char *central_netdata_to_push_data;
 
 extern void netdata_fix_chart_id(char *s);
 extern void netdata_fix_chart_name(char *s);
index 494904c097434dbd821b55641e4c797b75402222..0fdedfb4c32b0ca256383f9f9636772aa49d8a4f 100644 (file)
@@ -15,7 +15,7 @@ inline char *health_config_dir(void) {
 void health_init(void) {
     debug(D_HEALTH, "Health configuration initializing");
 
-    if(!central_netdata_to_push_data) {
+    if(!rrdpush_exclusive) {
         if(!(default_health_enabled = config_get_boolean("health", "enabled", 1))) {
             debug(D_HEALTH, "Health is disabled.");
             return;
index 3c9c16d563d61bd352479ef8d1b8ea6778ced3de..48bb6b90b2787daf1979b44cc763e0e80d6d4470 100644 (file)
@@ -2,8 +2,6 @@
 
 extern void *cgroups_main(void *ptr);
 
-char *central_netdata_to_push_data = NULL;
-
 void netdata_cleanup_and_exit(int ret) {
     netdata_exit = 1;
 
@@ -64,15 +62,16 @@ struct netdata_static_thread static_threads[] = {
 void web_server_threading_selection(void) {
     int multi_threaded = 0;
     int single_threaded = 0;
-    int central_thread = 0;
+    int rrdpush_thread = 1;
+    int backends_thread = 1;
 
-    if(!central_netdata_to_push_data) {
-        multi_threaded = config_get_boolean("global", "multi threaded web server", 1);
-        single_threaded = !multi_threaded;
+    if(rrdpush_exclusive) {
+        backends_thread = 0;
+        info("Web servers and backends thread are disabled - use the central netdata.");
     }
     else {
-        central_thread = 1;
-        info("Web servers are disabled - use the central netdata.");
+        multi_threaded = config_get_boolean("global", "multi threaded web server", 1);
+        single_threaded = !multi_threaded;
     }
 
     int i;
@@ -84,10 +83,13 @@ void web_server_threading_selection(void) {
             static_threads[i].enabled = single_threaded;
 
         if(static_threads[i].start_routine == central_netdata_push_thread)
-            static_threads[i].enabled = central_thread;
+            static_threads[i].enabled = rrdpush_thread;
+
+        if(static_threads[i].start_routine == backends_main)
+            static_threads[i].enabled = backends_thread;
     }
 
-    if(central_netdata_to_push_data)
+    if(rrdpush_exclusive)
         return;
 
     web_client_timeout = (int) config_get_number("global", "disconnect idle web clients after seconds", DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS);
@@ -694,15 +696,13 @@ int main(int argc, char **argv) {
         // --------------------------------------------------------------------
         // find we need to send data to a central netdata
 
-        central_netdata_to_push_data = config_get("global", "central netdata to send all data", "");
-        if(central_netdata_to_push_data && !*central_netdata_to_push_data)
-            central_netdata_to_push_data = NULL;
+        rrdpush_init();
 
 
         // --------------------------------------------------------------------
         // get default memory mode for the database
 
-        if(central_netdata_to_push_data) {
+        if(rrdpush_exclusive) {
             default_rrd_memory_mode = RRD_MEMORY_MODE_RAM;
             config_set("global", "memory mode", rrd_memory_mode_name(default_rrd_memory_mode));
         }
@@ -713,7 +713,7 @@ int main(int argc, char **argv) {
         // --------------------------------------------------------------------
         // get default database size
 
-        if(central_netdata_to_push_data) {
+        if(rrdpush_exclusive) {
             default_rrd_history_entries = 10;
             config_set_number("global", "history", default_rrd_history_entries);
         }
@@ -844,7 +844,7 @@ int main(int argc, char **argv) {
         // --------------------------------------------------------------------
         // create the listening sockets
 
-        if(!check_config && !central_netdata_to_push_data) {
+        if(!check_config && !rrdpush_exclusive) {
             char filename[FILENAME_MAX + 1];
             snprintfz(filename, FILENAME_MAX, "%s/aggregated_hosts.conf", netdata_configured_config_dir);
             appconfig_load(&stream_config, filename, 0);
index 601f9a79ddb75476672392f6ef27403fe54e72ac..267284e771bb385e3639101cc00ee00ed01c047d 100644 (file)
@@ -4,7 +4,7 @@ int registry_init(void) {
     char filename[FILENAME_MAX + 1];
 
     // registry enabled?
-    if(!central_netdata_to_push_data) {
+    if(!rrdpush_exclusive) {
         registry.enabled = config_get_boolean("registry", "enabled", 0);
     }
     else {
index c531c4f9f74eca6501809f5b6e862d679b2b4cfa..0e7a1efa17719fdbca5376d78c797cca624b0185 100644 (file)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -367,6 +367,10 @@ struct rrdhost {
     // are created or renamed, that match them
     RRDCALCTEMPLATE *templates;
 
+    char *os;                                       // the O/S type of the host
+    volatile size_t use_counter;                    // when remote hosts are streaming to this
+                                                    // host, this is the counter of connected clients
+
     // health / alarm settings
     char *health_default_exec;
     char *health_default_recipient;
@@ -406,7 +410,7 @@ extern pthread_rwlock_t rrd_rwlock;
 extern void rrd_init(char *hostname);
 
 extern RRDHOST *rrdhost_find(const char *guid, uint32_t hash);
-extern RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled);
+extern RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled);
 
 #ifdef NETDATA_INTERNAL_CHECKS
 extern void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
index 8186fab0236051a83f848ce98979598f331b2fc8..d066bcb7d22488e8b20229ecf96576d427e93edc 100644 (file)
@@ -92,7 +92,7 @@ void rrd_stats_api_v1_charts(RRDHOST *host, BUFFER *wb)
         ",\n\t\"charts\": {"
         , host->hostname
         , program_version
-        , os_type
+        , host->os
         , host->rrd_update_every
         , host->rrd_history_entries
         );
index 965008524c071614a2ddd5be5480eda2c2238567..50ddabe8c96de9b23da3855e51ac1065335c84e5 100644 (file)
@@ -43,6 +43,11 @@ static inline void rrdhost_init_hostname(RRDHOST *host, const char *hostname) {
     host->hash_hostname = simple_hash(host->hostname);
 }
 
+static inline void rrdhost_init_os(RRDHOST *host, const char *os) {
+    freez(host->os);
+    host->os = strdupz(os?os:"unknown");
+}
+
 static inline void rrdhost_init_machine_guid(RRDHOST *host, const char *machine_guid) {
     strncpy(host->machine_guid, machine_guid, GUID_LEN);
     host->machine_guid[GUID_LEN] = '\0';
@@ -55,6 +60,7 @@ static inline void rrdhost_init_machine_guid(RRDHOST *host, const char *machine_
 
 RRDHOST *rrdhost_create(const char *hostname,
         const char *guid,
+        const char *os,
         int update_every,
         int entries,
         RRD_MEMORY_MODE memory_mode,
@@ -73,6 +79,7 @@ RRDHOST *rrdhost_create(const char *hostname,
 
     rrdhost_init_hostname(host, hostname);
     rrdhost_init_machine_guid(host, guid);
+    rrdhost_init_os(host, os);
 
     avl_init_lock(&(host->rrdset_root_index), rrdset_compare);
     avl_init_lock(&(host->rrdset_root_index_name), rrdset_compare_name);
@@ -176,12 +183,12 @@ RRDHOST *rrdhost_create(const char *hostname,
     return host;
 }
 
-RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled) {
+RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid, const char *os, int update_every, int history, RRD_MEMORY_MODE mode, int health_enabled) {
     debug(D_RRDHOST, "Searching for host '%s' with guid '%s'", hostname, guid);
 
     RRDHOST *host = rrdhost_find(guid, 0);
     if(!host) {
-        host = rrdhost_create(hostname, guid, update_every, history, mode, health_enabled);
+        host = rrdhost_create(hostname, guid, os, update_every, history, mode, health_enabled);
     }
     else {
         host->health_enabled = health_enabled;
@@ -214,6 +221,7 @@ void rrd_init(char *hostname) {
 
     localhost = rrdhost_create(hostname,
             registry_get_this_machine_guid(),
+            os_type,
             default_rrd_update_every,
             default_rrd_history_entries,
             default_rrd_memory_mode,
@@ -304,6 +312,7 @@ void rrdhost_free(RRDHOST *host) {
     // ------------------------------------------------------------------------
     // free it
 
+    freez(host->os);
     freez(host->cache_dir);
     freez(host->varlib_dir);
     freez(host->health_default_exec);
index 8a5e462da9ee89edd3d8811db306e2381e300b02..e5c18ef124650fe0e95b4b6a95eef00ce7a09ad3 100644 (file)
@@ -1,36 +1,55 @@
 #include "common.h"
 
+int rrdpush_enabled = 0;
+int rrdpush_exclusive = 1;
+
+static char *central_netdata = NULL;
+static char *api_key = NULL;
+
+#define CONNECTED_TO_SIZE 100
+
+// data collection happens from multiple threads
+// each of these threads calls rrdset_done()
+// which in turn calls rrdset_done_push()
+// which uses this pipe to notify the streaming thread
+// that there are more data ready to be sent
 #define PIPE_READ 0
 #define PIPE_WRITE 1
+int rrdpush_pipe[2] = { -1, -1 };
 
-int rrdpush_pipe[2];
-
+// a buffer used to store data to be sent.
+// the format is the same as external plugins.
 static BUFFER *rrdpush_buffer = NULL;
+
+// locking to get exclusive access to shared resources
+// (rrdpush_pipe[PIPE_WRITE], rrdpush_buffer
 static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+// if the streaming thread is connected to a central netdata
+// this is set to 1, otherwise 0.
 static volatile int rrdpush_connected = 0;
 
-static inline void rrdpush_lock() {
-    pthread_mutex_lock(&rrdpush_mutex);
-}
+// to have the remote netdata re-sync the charts
+// to its current clock, we send for this many
+// iterations a BEGIN line without microseconds
+// this is for the first iterations of each chart
+static unsigned int remote_clock_resync_iterations = 60;
 
-static inline void rrdpush_unlock() {
-    pthread_mutex_unlock(&rrdpush_mutex);
-}
+#define rrdpush_lock() pthread_mutex_lock(&rrdpush_mutex)
+#define rrdpush_unlock() pthread_mutex_unlock(&rrdpush_mutex)
 
+// checks if the current chart definition has been sent
 static inline int need_to_send_chart_definition(RRDSET *st) {
     RRDDIM *rd;
     rrddim_foreach_read(rd, st)
         if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
             return 1;
 
-
-    // fprintf(stderr, "NOT Sending CHART '%s' '%s'\n", st->id, st->name);
     return 0;
 }
 
+// sends the current chart definition
 static inline void send_chart_definition(RRDSET *st) {
-    // fprintf(stderr, "Sending CHART '%s' '%s'\n", st->id, st->name);
-
     buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
                 , st->id
                 , st->name
@@ -58,8 +77,9 @@ static inline void send_chart_definition(RRDSET *st) {
     }
 }
 
+// sends the current chart dimensions
 static inline void send_chart_metrics(RRDSET *st) {
-    buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > 60)?st->usec_since_last_update:0);
+    buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0);
 
     RRDDIM *rd;
     rrddim_foreach_read(rd, st) {
@@ -73,6 +93,8 @@ static inline void send_chart_metrics(RRDSET *st) {
     buffer_strcat(rrdpush_buffer, "END\n");
 }
 
+// resets all the chart, so that their definitions
+// will be resent to the central netdata
 static void reset_all_charts(void) {
     rrd_rdlock();
 
@@ -106,7 +128,6 @@ void rrdset_done_push(RRDSET *st) {
     if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
         return;
 
-
     rrdpush_lock();
 
     if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
@@ -144,6 +165,20 @@ static inline void rrdpush_flush(void) {
     rrdpush_unlock();
 }
 
+int rrdpush_init() {
+    rrdpush_enabled = config_get_boolean("stream", "enabled", rrdpush_enabled);
+    rrdpush_exclusive = config_get_boolean("stream", "exclusive", rrdpush_exclusive);
+    central_netdata = config_get("stream", "stream metrics to", "");
+    api_key = config_get("stream", "api key", "");
+
+    if(!rrdpush_enabled || !central_netdata || !*central_netdata || !api_key || !*api_key) {
+        rrdpush_enabled = 0;
+        rrdpush_exclusive = 0;
+    }
+
+    return rrdpush_enabled;
+}
+
 void *central_netdata_push_thread(void *ptr) {
     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
 
@@ -155,24 +190,31 @@ void *central_netdata_push_thread(void *ptr) {
     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
         error("STREAM: cannot set pthread cancel state to ENABLE.");
 
+    int timeout = (int)config_get_number("stream", "timeout seconds", 60);
+    int default_port = (int)config_get_number("stream", "default port", 19999);
+    size_t max_size = (size_t)config_get_number("stream", "buffer size bytes", 1024 * 1024);
+    unsigned int reconnect_delay = (unsigned int)config_get_number("stream", "reconnect delay seconds", 5);
+    remote_clock_resync_iterations = (unsigned int)config_get_number("stream", "initial clock resync iterations", remote_clock_resync_iterations);
+    int sock = -1;
 
-    rrdpush_buffer = buffer_create(1);
-
-    if(pipe(rrdpush_pipe) == -1)
-        fatal("STREAM: cannot create required pipe.");
-
-    struct timeval tv = {
-            .tv_sec = 60,
-            .tv_usec = 0
-    };
+    if(!rrdpush_enabled || !central_netdata || !*central_netdata || !api_key || !*api_key)
+        goto cleanup;
 
+    // initialize rrdpush globals
+    rrdpush_buffer = buffer_create(1);
     rrdpush_connected = 0;
+    if(pipe(rrdpush_pipe) == -1) fatal("STREAM: cannot create required pipe.");
+
+    // initialize local variables
     size_t begin = 0;
-    size_t max_size = 1024 * 1024;
     size_t reconnects_counter = 0;
     size_t sent_bytes = 0;
     size_t sent_connection = 0;
-    int sock = -1;
+
+    struct timeval tv = {
+            .tv_sec = timeout,
+            .tv_usec = 0
+    };
 
     struct pollfd fds[2], *ifd, *ofd;
     nfds_t fdmax;
@@ -180,6 +222,8 @@ void *central_netdata_push_thread(void *ptr) {
     ifd = &fds[0];
     ofd = &fds[1];
 
+    char connected_to[CONNECTED_TO_SIZE + 1];
+
     for(;;) {
         if(netdata_exit) break;
 
@@ -188,55 +232,57 @@ void *central_netdata_push_thread(void *ptr) {
             // they will be lost, so there is no point to do it
             rrdpush_connected = 0;
 
-            info("STREAM: connecting to central netdata at: %s", central_netdata_to_push_data);
-            sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
+            info("STREAM: connecting to central netdata at: %s", central_netdata);
+            sock = connect_to_one_of(central_netdata, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
 
             if(unlikely(sock == -1)) {
-                error("STREAM: failed to connect to central netdata at: %s", central_netdata_to_push_data);
-                sleep(5);
+                error("STREAM: failed to connect to central netdata at: %s", central_netdata);
+                sleep(reconnect_delay);
                 continue;
             }
 
-            info("STREAM: initializing communication to central netdata at: %s", central_netdata_to_push_data);
+            info("STREAM: initializing communication to central netdata at: %s", connected_to);
 
             char http[1000 + 1];
-            snprintfz(http, 1000, "GET /stream?key=%s&hostname=%s&machine_guid=%s&update_every=%d HTTP/1.1\r\n"
+            snprintfz(http, 1000,
+                    "STREAM key=%s&hostname=%s&machine_guid=%s&os=%s&update_every=%d HTTP/1.1\r\n"
                     "User-Agent: netdata-push-service/%s\r\n"
                     "Accept: */*\r\n\r\n"
-                      , config_get("global", "central netdata api key", "")
+                      , api_key
                       , localhost->hostname
                       , localhost->machine_guid
+                      , localhost->os
                       , default_rrd_update_every
                       , program_version
             );
 
-            if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
+            if(send_timeout(sock, http, strlen(http), 0, timeout) == -1) {
                 close(sock);
                 sock = -1;
-                error("STREAM: failed to send http header to netdata at: %s", central_netdata_to_push_data);
-                sleep(5);
+                error("STREAM: failed to send http header to netdata at: %s", connected_to);
+                sleep(reconnect_delay);
                 continue;
             }
 
-            info("STREAM: Waiting for STREAM from central netdata at: %s", central_netdata_to_push_data);
+            info("STREAM: Waiting for STREAM from central netdata at: %s", connected_to);
 
-            if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
+            if(recv_timeout(sock, http, 1000, 0, timeout) == -1) {
                 close(sock);
                 sock = -1;
-                error("STREAM: failed to receive STREAM from netdata at: %s", central_netdata_to_push_data);
-                sleep(5);
+                error("STREAM: failed to receive STREAM from netdata at: %s", connected_to);
+                sleep(reconnect_delay);
                 continue;
             }
 
             if(strncmp(http, "STREAM", 6)) {
                 close(sock);
                 sock = -1;
-                error("STREAM: netdata servers at  %s, did not send STREAM", central_netdata_to_push_data);
-                sleep(5);
+                error("STREAM: server at %s, did not send STREAM", connected_to);
+                sleep(reconnect_delay);
                 continue;
             }
 
-            info("STREAM: Established STREAM with central netdata at: %s - sending metrics...", central_netdata_to_push_data);
+            info("STREAM: Established communication with central netdata at: %s - sending metrics...", connected_to);
 
             if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
                 error("STREAM: cannot set non-blocking mode for socket.");
@@ -264,7 +310,7 @@ void *central_netdata_push_thread(void *ptr) {
         }
 
         if(netdata_exit) break;
-        int retval = poll(fds, fdmax, 60 * 1000);
+        int retval = poll(fds, fdmax, timeout * 1000);
         if(netdata_exit) break;
 
         if(unlikely(retval == -1)) {
@@ -298,7 +344,7 @@ void *central_netdata_push_thread(void *ptr) {
             ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
             if(ret == -1) {
                 if(errno != EAGAIN && errno != EINTR) {
-                    error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
+                    error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", connected_to, sent_connection);
                     close(sock);
                     sock = -1;
                 }
@@ -326,10 +372,23 @@ void *central_netdata_push_thread(void *ptr) {
         }
     }
 
+cleanup:
     debug(D_WEB_CLIENT, "STREAM: central netdata push thread exits.");
-    if(sock != -1) {
-        close(sock);
-    }
+
+    // make sure the data collection threads do not write data
+    rrdpush_connected = 0;
+
+    // close the pipe
+    if(rrdpush_pipe[PIPE_READ] != -1)  close(rrdpush_pipe[PIPE_READ]);
+    if(rrdpush_pipe[PIPE_WRITE] != -1) close(rrdpush_pipe[PIPE_WRITE]);
+
+    // close the socket
+    if(sock != -1) close(sock);
+
+    rrdpush_lock();
+    buffer_free(rrdpush_buffer);
+    rrdpush_buffer = NULL;
+    rrdpush_unlock();
 
     static_thread->enabled = 0;
     pthread_exit(NULL);
index ee1a96ead7ef38be611f5917ab4a0779080e510c..d34e8c0332441aea93b8ceb5fd8c6aa9d7f7c1be 100644 (file)
@@ -1,6 +1,10 @@
 #ifndef NETDATA_RRDPUSH_H
 #define NETDATA_RRDPUSH_H
 
+extern int rrdpush_enabled;
+extern int rrdpush_exclusive;
+
+extern int rrdpush_init();
 extern void rrdset_done_push(RRDSET *st);
 extern void *central_netdata_push_thread(void *ptr);
 
index da84186f6ce9fc3622cc87b41cff427e5d91e070..f88ce81804b1ceec2536ec36febb6419323fb6d0 100644 (file)
@@ -196,7 +196,7 @@ void rrdset_reset(RRDSET *st) {
 // RRDSET - helpers for rrdset_create()
 
 inline long align_entries_to_pagesize(long entries) {
-    if(central_netdata_to_push_data)
+    if(rrdpush_exclusive)
         return entries;
 
     if(entries < 5) entries = 5;
@@ -609,16 +609,16 @@ static inline void rrdset_done_push_int(RRDSET *st) {
         rrdset_update_last_collected_time(st);
     }
 
-    rrdset_done_push(st);
-
     st->counter++;
     st->counter_done++;
+
+    rrdset_done_push(st);
 }
 
 void rrdset_done(RRDSET *st) {
     if(unlikely(netdata_exit)) return;
 
-    if(unlikely(central_netdata_to_push_data)) {
+    if(unlikely(rrdpush_exclusive)) {
         rrdset_done_push_int(st);
         return;
     }
@@ -1207,5 +1207,8 @@ void rrdset_done(RRDSET *st) {
 
     if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
         error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
+
+    if(unlikely(rrdpush_enabled))
+        rrdset_done_push_int(st);
 }
 
index 6906a2882d9f79468743ef3ee2484db486e2bfdb..400c1ef4e2456020c5fb36794dc7c8cc449522ab 100644 (file)
@@ -180,7 +180,7 @@ int connect_to(const char *definition, int default_port, struct timeval *timeout
     return fd;
 }
 
-int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter) {
+int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size) {
     int sock = -1;
 
     const char *s = destination;
@@ -200,7 +200,13 @@ int connect_to_one_of(const char *destination, int default_port, struct timeval
         strncpyz(buf, s, e - s);
         if(reconnects_counter) *reconnects_counter += 1;
         sock = connect_to(buf, default_port, timeout);
-        if(sock != -1) break;
+        if(sock != -1) {
+            if(connected_to && connected_to_size) {
+                strncpy(connected_to, buf, connected_to_size);
+                connected_to[connected_to_size - 1] = '\0';
+            }
+            break;
+        }
         s = e;
     }
 
index 2dea4418ce100b0680799b7e076b0a80170933d7..89c154a6186147d66072cf55c81838e842edcab3 100644 (file)
@@ -6,7 +6,7 @@
 #define NETDATA_SOCKET_H
 
 extern int connect_to(const char *definition, int default_port, struct timeval *timeout);
-extern int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter);
+extern int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size);
 
 extern ssize_t recv_timeout(int sockfd, void *buf, size_t len, int flags, int timeout);
 extern ssize_t send_timeout(int sockfd, void *buf, size_t len, int flags, int timeout);
index 6203db0f73e539066dd9aad65eb854a14ffd5bb6..9f9ceda63da26a5f3d9720c863494dabe46755b8 100644 (file)
@@ -359,8 +359,9 @@ BUFFER *buffer_create(size_t size)
     return(b);
 }
 
-void buffer_free(BUFFER *b)
-{
+void buffer_free(BUFFER *b) {
+    if(unlikely(!b)) return;
+
     buffer_overflow_check(b);
 
     debug(D_WEB_BUFFER, "Freeing web buffer of size %zu.", b->size);
index 2aa98ed1a5e90c8fa06002ae9df6c59d384fe373..7c5474edc57549f57d0aa7685f83d1812a8f58a3 100644 (file)
@@ -45,8 +45,7 @@ static inline int web_client_uncrock_socket(struct web_client *w) {
     return 0;
 }
 
-struct web_client *web_client_create(int listener)
-{
+struct web_client *web_client_create(int listener) {
     struct web_client *w;
 
     w = callocz(1, sizeof(struct web_client));
@@ -223,9 +222,9 @@ struct web_client *web_client_free(struct web_client *w) {
 
     if(w->prev) w->prev->next = w->next;
     if(w->next) w->next->prev = w->prev;
-    if(w->response.header_output) buffer_free(w->response.header_output);
-    if(w->response.header) buffer_free(w->response.header);
-    if(w->response.data) buffer_free(w->response.data);
+    buffer_free(w->response.header_output);
+    buffer_free(w->response.header);
+    buffer_free(w->response.data);
     if(w->ifd != -1) close(w->ifd);
     if(w->ofd != -1 && w->ofd != w->ifd) close(w->ofd);
     freez(w);
@@ -1066,8 +1065,7 @@ int web_client_api_request_v1_badge(RRDHOST *host, struct web_client *w, char *u
     }
 
 cleanup:
-    if(dimensions)
-        buffer_free(dimensions);
+    buffer_free(dimensions);
     return ret;
 }
 
@@ -1242,7 +1240,7 @@ int web_client_api_request_v1_data(RRDHOST *host, struct web_client *w, char *ur
         buffer_strcat(w->response.data, ");");
 
 cleanup:
-    if(dimensions) buffer_free(dimensions);
+    buffer_free(dimensions);
     return ret;
 }
 
@@ -1675,7 +1673,7 @@ int validate_stream_api_key(const char *key) {
 }
 
 int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
-    char *key = NULL, *hostname = NULL, *machine_guid = NULL;
+    char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL;
     int update_every = default_rrd_update_every;
     int history = default_rrd_history_entries;
     RRD_MEMORY_MODE mode = default_rrd_memory_mode;
@@ -1697,6 +1695,8 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
             machine_guid = value;
         else if(!strcmp(name, "update_every"))
             update_every = (int)strtoul(value, NULL, 0);
+        else if(!strcmp(name, "os"))
+            os = value;
     }
 
     if(!key || !*key) {
@@ -1734,7 +1734,6 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
         return 404;
     }
 
-    // update_every = (int)appconfig_get_number(&stream_config, key, "default update every", update_every);
     update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
     if(update_every < 0) update_every = 1;
 
@@ -1748,7 +1747,10 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
     health_enabled = appconfig_get_boolean_ondemand(&stream_config, key, "health enabled by default", health_enabled);
     health_enabled = appconfig_get_boolean_ondemand(&stream_config, machine_guid, "health enabled", health_enabled);
 
-    host = rrdhost_find_or_create(hostname, machine_guid, update_every, history, mode, health_enabled?1:0);
+    if(strcmp(machine_guid, "localhost"))
+        host = localhost;
+    else
+        host = rrdhost_find_or_create(hostname, machine_guid, os, update_every, history, mode, health_enabled?1:0);
 
     info("STREAM request from client '%s:%s' for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s",
             w->client_ip, w->client_port,
@@ -1806,20 +1808,24 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
         return 500;
     }
 
+    rrdhost_wrlock(host);
+    host->use_counter++;
+    rrdhost_unlock(host);
+
     // call the plugins.d processor to receive the metrics
     info("STREAM [%s]:%s: connecting client to plugins.d on host '%s' with machine GUID '%s'.", w->client_ip, w->client_port, host->hostname, host->machine_guid);
     size_t count = pluginsd_process(host, &cd, fp, 1);
     error("STREAM [%s]:%s: client disconnected (host '%s', machine GUID '%s').", w->client_ip, w->client_port, host->hostname, host->machine_guid);
-    if(health_enabled == CONFIG_BOOLEAN_AUTO)
+
+    rrdhost_wrlock(host);
+    host->use_counter--;
+    if(!host->use_counter && health_enabled == CONFIG_BOOLEAN_AUTO)
         host->health_enabled = 0;
+    rrdhost_unlock(host);
 
-    // close all sockets, to let the socket worker we are done
+    // cleanup
     fclose(fp);
     w->ifd = -1;
-    if(w->ofd != -1 && w->ofd != w->ifd) {
-        close(w->ofd);
-        w->ofd = -1;
-    }
 
     // this will not send anything
     // the socket is closed
@@ -2032,6 +2038,10 @@ static inline HTTP_VALIDATION http_request_validate(struct web_client *w) {
         encoded_url = s = &s[8];
         w->mode = WEB_CLIENT_MODE_OPTIONS;
     }
+    else if(!strncmp(s, "STREAM ", 8)) {
+        encoded_url = s = &s[8];
+        w->mode = WEB_CLIENT_MODE_STREAM;
+    }
     else {
         w->wait_receive = 0;
         return HTTP_VALIDATION_NOT_SUPPORTED;
@@ -2292,8 +2302,7 @@ static inline int web_client_process_url(RRDHOST *host, struct web_client *w, ch
             hash_graph = 0,
             hash_list = 0,
             hash_all_json = 0,
-            hash_host = 0,
-            hash_stream = 0;
+            hash_host = 0;
 
 #ifdef NETDATA_INTERNAL_CHECKS
     static uint32_t hash_exit = 0, hash_debug = 0, hash_mirror = 0;
@@ -2308,7 +2317,6 @@ static inline int web_client_process_url(RRDHOST *host, struct web_client *w, ch
         hash_list = simple_hash("list");
         hash_all_json = simple_hash("all.json");
         hash_host = simple_hash("host");
-        hash_stream = simple_hash("stream");
 #ifdef NETDATA_INTERNAL_CHECKS
         hash_exit = simple_hash("exit");
         hash_debug = simple_hash("debug");
@@ -2329,10 +2337,6 @@ static inline int web_client_process_url(RRDHOST *host, struct web_client *w, ch
             debug(D_WEB_CLIENT_ACCESS, "%llu: host switch request ...", w->id);
             return web_client_switch_host(host, w, url);
         }
-        else if(unlikely(hash == hash_stream && strcmp(tok, "stream") == 0)) {
-            debug(D_WEB_CLIENT_ACCESS, "%llu: stream request ...", w->id);
-            return web_client_stream_request(host, w, url);
-        }
         else if(unlikely(hash == hash_netdata_conf && strcmp(tok, "netdata.conf") == 0)) {
             debug(D_WEB_CLIENT_ACCESS, "%llu: Sending netdata.conf ...", w->id);
             w->response.data->contenttype = CT_TEXT_PLAIN;
@@ -2483,6 +2487,10 @@ void web_client_process_request(struct web_client *w) {
                 buffer_strcat(w->response.data, "OK");
                 w->response.code = 200;
             }
+            else if(unlikely(w->mode == WEB_CLIENT_MODE_STREAM)) {
+                w->response.code = web_client_stream_request(localhost, w, w->decoded_url);
+                return;
+            }
             else
                 w->response.code = web_client_process_url(localhost, w, w->decoded_url);
             break;
@@ -2528,6 +2536,10 @@ void web_client_process_request(struct web_client *w) {
     else w->wait_send = 0;
 
     switch(w->mode) {
+        case WEB_CLIENT_MODE_STREAM:
+            debug(D_WEB_CLIENT, "%llu: STREAM done.", w->id);
+            break;
+
         case WEB_CLIENT_MODE_OPTIONS:
             debug(D_WEB_CLIENT, "%llu: Done preparing the OPTIONS response. Sending data (%zu bytes) to client.", w->id, w->response.data->len);
             break;
@@ -2559,7 +2571,7 @@ void web_client_process_request(struct web_client *w) {
             break;
 
         default:
-            fatal("%llu: Unknown client mode %d.", w->id, w->mode);
+            fatal("%llu: Unknown client mode %u.", w->id, w->mode);
             break;
     }
 }
@@ -2982,7 +2994,8 @@ void *web_client_main(void *ptr)
 
                 // if the sockets are closed, may have transferred this client
                 // to plugins.d
-                if(w->ifd == -1 && w->ofd == -1) break;
+                if(unlikely(w->mode == WEB_CLIENT_MODE_STREAM))
+                    break;
             }
         }
 
index fccb294f7b1570e8f23de613e60904572b0dbac1..3ba2435a35f35c472f0bb99477f1d00951d2a14f 100644 (file)
@@ -13,9 +13,12 @@ extern int web_enable_gzip,
 extern int respect_web_browser_do_not_track_policy;
 extern char *web_x_frame_options;
 
-#define WEB_CLIENT_MODE_NORMAL      0
-#define WEB_CLIENT_MODE_FILECOPY    1
-#define WEB_CLIENT_MODE_OPTIONS     2
+typedef enum web_client_mode {
+    WEB_CLIENT_MODE_NORMAL      = 0,
+    WEB_CLIENT_MODE_FILECOPY    = 1,
+    WEB_CLIENT_MODE_OPTIONS     = 2,
+    WEB_CLIENT_MODE_STREAM      = 3
+} WEB_CLIENT_MODE;
 
 #define URL_MAX 8192
 #define ZLIB_CHUNK  16384
@@ -55,14 +58,14 @@ struct web_client {
 
     uint8_t keepalive:1;                // if set to 1, the web client will be re-used
 
-    uint8_t mode:3;                     // the operational mode of the client
-
     uint8_t wait_receive:1;             // 1 = we are waiting more input data
     uint8_t wait_send:1;                // 1 = we have data to send to the client
 
     uint8_t donottrack:1;               // 1 = we should not set cookies on this client
     uint8_t tracking_required:1;        // 1 = if the request requires cookies
 
+    WEB_CLIENT_MODE mode;               // the operational mode of the client
+
     int tcp_cork;                       // 1 = we have a cork on the socket
 
     int ifd;