]> arthur.barton.de Git - netdata.git/commitdiff
allow each netdata host to have its own thread for streaming metrics
authorCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Thu, 23 Feb 2017 23:47:00 +0000 (01:47 +0200)
committerCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Thu, 23 Feb 2017 23:47:00 +0000 (01:47 +0200)
src/backends.c
src/health.c
src/main.c
src/registry_init.c
src/rrd.h
src/rrdhost.c
src/rrdpush.c
src/rrdpush.h
src/rrdset.c

index 5f725e402c18791b91773049d82a13b0a6808491..c5b57dbe7ef291022d2e7d86ae558eaa96fae672 100644 (file)
@@ -147,11 +147,6 @@ void *backends_main(void *ptr) {
     // ------------------------------------------------------------------------
     // collect configuration options
 
-    if(rrdpush_exclusive) {
-        info("Backend is disabled - use the central netdata");
-        goto cleanup;
-    }
-
     struct timeval timeout = {
             .tv_sec = 0,
             .tv_usec = 0
@@ -312,6 +307,9 @@ void *backends_main(void *ptr) {
         rrd_rdlock();
         RRDHOST *host;
         rrdhost_foreach_read(host) {
+            if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE)
+                continue;
+
             rrdhost_rdlock(host);
 
             RRDSET *st;
index 9e02b5449b7b45d1bf78223028350b15a4d125ca..2ebbe0658d322bf92f5667d04a3a272f1642261b 100644 (file)
@@ -15,16 +15,9 @@ inline char *health_config_dir(void) {
 void health_init(void) {
     debug(D_HEALTH, "Health configuration initializing");
 
-    if(!rrdpush_exclusive) {
-        if(!(default_health_enabled = config_get_boolean("health", "enabled", 1))) {
-            debug(D_HEALTH, "Health is disabled.");
-            return;
-        }
-    }
-    else {
-        info("Health is disabled - setup alarms at the central netdata.");
-        config_set_boolean("health", "enabled", 0);
-        default_health_enabled = 0;
+    if(!(default_health_enabled = config_get_boolean("health", "enabled", 1))) {
+        debug(D_HEALTH, "Health is disabled.");
+        return;
     }
 }
 
@@ -32,6 +25,9 @@ void health_init(void) {
 // re-load health configuration
 
 void health_reload_host(RRDHOST *host) {
+    if(unlikely(!host->health_enabled))
+        return;
+
     char *path = health_config_dir();
 
     // free all running alarms
@@ -363,6 +359,9 @@ void *health_main(void *ptr) {
 
         RRDHOST *host;
         rrdhost_foreach_read(host) {
+            if(unlikely(!host->health_enabled))
+                continue;
+
             if(unlikely(apply_hibernation_delay)) {
 
                 info("Postponing alarm checks for %ld seconds, on host '%s', due to boottime discrepancy (realtime dt: %ld, boottime dt: %ld)."
index d8381adba604868a8ec2fcdbb9dc79463d3a27b3..77ffe1de2808c3f398be7c3ebbf3256c209e3d02 100644 (file)
@@ -55,19 +55,16 @@ struct netdata_static_thread static_threads[] = {
     {"plugins.d",           NULL,       NULL,         1, NULL, NULL, pluginsd_main},
     {"web",                 NULL,       NULL,         1, NULL, NULL, socket_listen_main_multi_threaded},
     {"web-single-threaded", NULL,       NULL,         0, NULL, NULL, socket_listen_main_single_threaded},
-    {"central-netdata-push",NULL,       NULL,         0, NULL, NULL, rrdpush_sender_thread},
+    {"push-metrics",        NULL,       NULL,         0, NULL, NULL, rrdpush_sender_thread},
     {NULL,                  NULL,       NULL,         0, NULL, NULL, NULL}
 };
 
 void web_server_threading_selection(void) {
     int multi_threaded = 0;
     int single_threaded = 0;
-    int rrdpush_thread = 1;
-    int backends_thread = 1;
 
-    if(rrdpush_exclusive) {
-        backends_thread = 0;
-        info("Web servers and backends thread are disabled - use the central netdata.");
+    if(default_rrdpush_exclusive) {
+        info("Web server is disabled - use the remote netdata.");
     }
     else {
         multi_threaded = config_get_boolean("global", "multi threaded web server", 1);
@@ -81,15 +78,9 @@ void web_server_threading_selection(void) {
 
         if(static_threads[i].start_routine == socket_listen_main_single_threaded)
             static_threads[i].enabled = single_threaded;
-
-        if(static_threads[i].start_routine == rrdpush_sender_thread)
-            static_threads[i].enabled = rrdpush_thread;
-
-        if(static_threads[i].start_routine == backends_main)
-            static_threads[i].enabled = backends_thread;
     }
 
-    if(rrdpush_exclusive)
+    if(default_rrdpush_exclusive)
         return;
 
     web_client_timeout = (int) config_get_number("global", "disconnect idle web clients after seconds", DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS);
@@ -694,7 +685,7 @@ int main(int argc, char **argv) {
 
 
         // --------------------------------------------------------------------
-        // find we need to send data to a central netdata
+        // find if we need to send data to another netdata
 
         rrdpush_init();
 
@@ -702,7 +693,7 @@ int main(int argc, char **argv) {
         // --------------------------------------------------------------------
         // get default memory mode for the database
 
-        if(rrdpush_exclusive) {
+        if(default_rrdpush_exclusive) {
             default_rrd_memory_mode = RRD_MEMORY_MODE_NONE;
             config_set("global", "memory mode", rrd_memory_mode_name(default_rrd_memory_mode));
         }
@@ -713,14 +704,14 @@ int main(int argc, char **argv) {
         // --------------------------------------------------------------------
         // get default database size
 
-        if(rrdpush_exclusive) {
+        if(default_rrdpush_exclusive) {
             default_rrd_history_entries = 10;
             config_set_number("global", "history", default_rrd_history_entries);
         }
         else
-            default_rrd_history_entries = (int) config_get_number("global", "history", align_entries_to_pagesize(RRD_DEFAULT_HISTORY_ENTRIES));
+            default_rrd_history_entries = (int) config_get_number("global", "history", align_entries_to_pagesize(default_rrd_memory_mode, RRD_DEFAULT_HISTORY_ENTRIES));
 
-        long h = align_entries_to_pagesize(default_rrd_history_entries);
+        long h = align_entries_to_pagesize(default_rrd_memory_mode, default_rrd_history_entries);
         if(h != default_rrd_history_entries) {
             config_set_number("global", "history", h);
             default_rrd_history_entries = (int)h;
@@ -844,11 +835,16 @@ int main(int argc, char **argv) {
         // --------------------------------------------------------------------
         // create the listening sockets
 
-        if(!check_config && !rrdpush_exclusive) {
+        if(!check_config && !default_rrdpush_exclusive)
+            create_listen_sockets();
+
+
+        // --------------------------------------------------------------------
+        // load the aggregated host configuration file
+        {
             char filename[FILENAME_MAX + 1];
             snprintfz(filename, FILENAME_MAX, "%s/aggregated_hosts.conf", netdata_configured_config_dir);
             appconfig_load(&stream_config, filename, 0);
-            create_listen_sockets();
         }
     }
 
index 267284e771bb385e3639101cc00ee00ed01c047d..2d8eb5709ee47817af0ffa173f67f7fbad2c367f 100644 (file)
@@ -4,7 +4,7 @@ int registry_init(void) {
     char filename[FILENAME_MAX + 1];
 
     // registry enabled?
-    if(!rrdpush_exclusive) {
+    if(!default_rrdpush_exclusive) {
         registry.enabled = config_get_boolean("registry", "enabled", 0);
     }
     else {
index 46ee8e00f9cb1ac8b2b00524f9c4525b2be398f2..cf141cc5f0ad84fafb7f079918c65a6c9d0fc582 100644 (file)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -341,6 +341,17 @@ struct rrdhost {
     int rrd_update_every;                           // the update frequency of the host
     int rrd_history_entries;                        // the number of history entries for the host's charts
 
+    int rrdpush_enabled;                            // 1 when this host sends metrics to another netdata
+    int rrdpush_exclusive;                          // 1 when this host is exclusively sending metrics without a database
+    volatile int rrdpush_connected;                 // 1 when the sender is ready to push metrics
+    volatile int rrdpush_spawn;                     // 1 when the sender thread has been spawned
+    volatile int rrdpush_error_shown;               // 1 when we have logged a communication error
+    int rrdpush_socket;                             // the fd of the socket to the remote host, or -1
+    pthread_t rrdpush_thread;                       // the sender thread
+    pthread_mutex_t rrdpush_mutex;                  // exclusive access to rrdpush_buffer
+    int rrdpush_pipe[2];                            // collector to sender thread communication
+    BUFFER *rrdpush_buffer;                         // collector fills it, sender sends them
+
     int health_enabled;                             // 1 when this host has health enabled
     time_t health_delay_up_to;                      // a timestamp to delay alarms processing up to
     RRD_MEMORY_MODE rrd_memory_mode;                // the memory more for the charts of this host
@@ -526,7 +537,7 @@ extern int rrddim_unhide(RRDSET *st, const char *id);
 extern collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value);
 extern collected_number rrddim_set(RRDSET *st, const char *id, collected_number value);
 
-extern long align_entries_to_pagesize(long entries);
+extern long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries);
 
 
 // ----------------------------------------------------------------------------
index 50ddabe8c96de9b23da3855e51ac1065335c84e5..fffa6758e487cfe6ad6f7dcb8fa6e4aecfcb77a6 100644 (file)
@@ -73,18 +73,25 @@ RRDHOST *rrdhost_create(const char *hostname,
     host->rrd_update_every    = update_every;
     host->rrd_history_entries = entries;
     host->rrd_memory_mode     = memory_mode;
-    host->health_enabled      = health_enabled;
+    host->health_enabled      = (memory_mode == RRD_MEMORY_MODE_NONE)? 0 : health_enabled;
+    host->rrdpush_enabled     = default_rrdpush_enabled;
+    host->rrdpush_exclusive   = default_rrdpush_exclusive;
 
-    pthread_rwlock_init(&(host->rrdhost_rwlock), NULL);
+    host->rrdpush_pipe[0] = -1;
+    host->rrdpush_pipe[1] = -1;
+    host->rrdpush_socket = -1;
+
+    pthread_mutex_init(&host->rrdpush_mutex, NULL);
+    pthread_rwlock_init(&host->rrdhost_rwlock, NULL);
 
     rrdhost_init_hostname(host, hostname);
     rrdhost_init_machine_guid(host, guid);
     rrdhost_init_os(host, os);
 
-    avl_init_lock(&(host->rrdset_root_index), rrdset_compare);
+    avl_init_lock(&(host->rrdset_root_index),      rrdset_compare);
     avl_init_lock(&(host->rrdset_root_index_name), rrdset_compare_name);
-    avl_init_lock(&(host->rrdfamily_root_index), rrdfamily_compare);
-    avl_init_lock(&(host->variables_root_index), rrdvar_compare);
+    avl_init_lock(&(host->rrdfamily_root_index),   rrdfamily_compare);
+    avl_init_lock(&(host->variables_root_index),   rrdvar_compare);
 
     // ------------------------------------------------------------------------
     // initialize health variables
@@ -110,12 +117,9 @@ RRDHOST *rrdhost_create(const char *hostname,
     if(!localhost) {
         // this is localhost
 
-        host->cache_dir = strdupz(netdata_configured_cache_dir);
+        host->cache_dir  = strdupz(netdata_configured_cache_dir);
         host->varlib_dir = strdupz(netdata_configured_varlib_dir);
 
-        snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir);
-        host->health_log_filename = strdupz(config_get("health", "health db file", filename));
-
     }
     else {
         // this is not localhost - append our GUID to localhost path
@@ -136,18 +140,18 @@ RRDHOST *rrdhost_create(const char *hostname,
             int r = mkdir(host->varlib_dir, 0775);
             if(r != 0 && errno != EEXIST)
                 error("Host '%s': cannot create directory '%s'", host->hostname, host->varlib_dir);
-        }
 
-        snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir);
-        int r = mkdir(filename, 0775);
-        if(r != 0 && errno != EEXIST)
-            error("Host '%s': cannot create directory '%s'", host->hostname, filename);
-
-        snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir);
-        host->health_log_filename = strdupz(filename);
+            snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir);
+            r = mkdir(filename, 0775);
+            if(r != 0 && errno != EEXIST)
+                error("Host '%s': cannot create directory '%s'", host->hostname, filename);
+        }
 
     }
 
+    snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir);
+    host->health_log_filename = strdupz(config_get("health", "health db file", filename));
+
     snprintfz(filename, FILENAME_MAX, "%s/alarm-notify.sh", netdata_configured_plugins_dir);
     host->health_default_exec = strdupz(config_get("health", "script to execute on alarm", filename));
     host->health_default_recipient = strdup("root");
@@ -156,12 +160,14 @@ RRDHOST *rrdhost_create(const char *hostname,
     // ------------------------------------------------------------------------
     // load health configuration
 
-    health_alarm_log_load(host);
-    health_alarm_log_open(host);
+    if(host->health_enabled) {
+        health_alarm_log_load(host);
+        health_alarm_log_open(host);
 
-    rrdhost_wrlock(host);
-    health_readdir(host, health_config_dir());
-    rrdhost_unlock(host);
+        rrdhost_wrlock(host);
+        health_readdir(host, health_config_dir());
+        rrdhost_unlock(host);
+    }
 
 
     // ------------------------------------------------------------------------
@@ -312,6 +318,11 @@ void rrdhost_free(RRDHOST *host) {
     // ------------------------------------------------------------------------
     // free it
 
+    if(host->rrdpush_spawn) {
+        pthread_cancel(host->rrdpush_thread);
+        rrdpush_sender_cleanup(host);
+    }
+
     freez(host->os);
     freez(host->cache_dir);
     freez(host->varlib_dir);
index d66624921729685f7bab774a5c03409d1b2c5319..9bcd232ab5382bc798c7d421f21d93def53c400b 100644 (file)
@@ -1,7 +1,7 @@
 #include "common.h"
 
-int rrdpush_enabled = 0;
-int rrdpush_exclusive = 1;
+int default_rrdpush_enabled   = 0;
+int default_rrdpush_exclusive = 1;
 
 static char *remote_netdata_config = NULL;
 static char *api_key = NULL;
@@ -15,19 +15,6 @@ static char *api_key = NULL;
 // that there are more data ready to be sent
 #define PIPE_READ 0
 #define PIPE_WRITE 1
-int rrdpush_pipe[2] = { -1, -1 };
-
-// a buffer used to store data to be sent.
-// the format is the same as external plugins.
-static BUFFER *rrdpush_buffer = NULL;
-
-// locking to get exclusive access to shared resources
-// (rrdpush_pipe[PIPE_WRITE], rrdpush_buffer
-static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-// if the streaming thread is connected to a remote netdata
-// this is set to 1, otherwise 0.
-static volatile int rrdpush_connected = 0;
 
 // to have the remote netdata re-sync the charts
 // to its current clock, we send for this many
@@ -35,8 +22,8 @@ static volatile int rrdpush_connected = 0;
 // this is for the first iterations of each chart
 static unsigned int remote_clock_resync_iterations = 60;
 
-#define rrdpush_lock() pthread_mutex_lock(&rrdpush_mutex)
-#define rrdpush_unlock() pthread_mutex_unlock(&rrdpush_mutex)
+#define rrdpush_lock(host) pthread_mutex_lock(&((host)->rrdpush_mutex))
+#define rrdpush_unlock(host) pthread_mutex_unlock(&((host)->rrdpush_mutex))
 
 // checks if the current chart definition has been sent
 static inline int need_to_send_chart_definition(RRDSET *st) {
@@ -50,7 +37,7 @@ static inline int need_to_send_chart_definition(RRDSET *st) {
 
 // sends the current chart definition
 static inline void send_chart_definition(RRDSET *st) {
-    buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
+    buffer_sprintf(st->rrdhost->rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
                 , st->id
                 , st->name
                 , st->title
@@ -64,7 +51,7 @@ static inline void send_chart_definition(RRDSET *st) {
 
     RRDDIM *rd;
     rrddim_foreach_read(rd, st) {
-        buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
+        buffer_sprintf(st->rrdhost->rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
                        , rd->id
                        , rd->name
                        , rrd_algorithm_name(rd->algorithm)
@@ -79,69 +66,69 @@ static inline void send_chart_definition(RRDSET *st) {
 
 // sends the current chart dimensions
 static inline void send_chart_metrics(RRDSET *st) {
-    buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0);
+    buffer_sprintf(st->rrdhost->rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0);
 
     RRDDIM *rd;
     rrddim_foreach_read(rd, st) {
         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
-            buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
+            buffer_sprintf(st->rrdhost->rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
                        , rd->id
                        , rd->collected_value
         );
     }
 
-    buffer_strcat(rrdpush_buffer, "END\n");
+    buffer_strcat(st->rrdhost->rrdpush_buffer, "END\n");
 }
 
 // resets all the chart, so that their definitions
 // will be resent to the central netdata
-static void reset_all_charts(void) {
-    rrd_rdlock();
-
-    RRDHOST *host;
-    rrdhost_foreach_read(host) {
-        rrdhost_rdlock(host);
+static void reset_all_charts(RRDHOST *host) {
+    rrdhost_rdlock(host);
 
-        RRDSET *st;
-        rrdset_foreach_read(st, host) {
+    RRDSET *st;
+    rrdset_foreach_read(st, host) {
 
-            // make it re-align the current time
-            // on the remote host
-            st->counter_done = 0;
+        // make it re-align the current time
+        // on the remote host
+        st->counter_done = 0;
 
-            rrdset_rdlock(st);
+        rrdset_rdlock(st);
 
-            RRDDIM *rd;
-            rrddim_foreach_read(rd, st)
-                rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
+        RRDDIM *rd;
+        rrddim_foreach_read(rd, st)
+            rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
 
-            rrdset_unlock(st);
-        }
-        rrdhost_unlock(host);
+        rrdset_unlock(st);
     }
-    rrd_unlock();
+
+    rrdhost_unlock(host);
 }
 
+void rrdpush_sender_thread_spawn(RRDHOST *host);
+
 void rrdset_done_push(RRDSET *st) {
-    static int error_shown = 0;
+    RRDHOST *host = st->rrdhost;
 
     if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
         return;
 
-    rrdpush_lock();
+    rrdpush_lock(host);
 
-    if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
-        if(unlikely(!error_shown))
+    if(unlikely(host->rrdpush_enabled && !host->rrdpush_spawn))
+        rrdpush_sender_thread_spawn(host);
+
+    if(unlikely(!host->rrdpush_buffer || !host->rrdpush_connected)) {
+        if(unlikely(!host->rrdpush_error_shown))
             error("STREAM [send]: not ready - discarding collected metrics.");
 
-        error_shown = 1;
+        host->rrdpush_error_shown = 1;
 
-        rrdpush_unlock();
+        rrdpush_unlock(host);
         return;
     }
-    else if(unlikely(error_shown)) {
+    else if(unlikely(host->rrdpush_error_shown)) {
         error("STREAM [send]: ready - sending metrics...");
-        error_shown = 0;
+        host->rrdpush_error_shown = 0;
     }
 
     rrdset_rdlock(st);
@@ -152,38 +139,74 @@ void rrdset_done_push(RRDSET *st) {
     rrdset_unlock(st);
 
     // signal the sender there are more data
-    if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
+    if(write(host->rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
         error("STREAM [send]: cannot write to internal pipe");
 
-    rrdpush_unlock();
+    rrdpush_unlock(host);
 }
 
-static inline void rrdpush_flush(void) {
-    rrdpush_lock();
-    if(buffer_strlen(rrdpush_buffer))
-        error("STREAM [send]: discarding %zu bytes of metrics already in the buffer.", buffer_strlen(rrdpush_buffer));
+static inline void rrdpush_flush(RRDHOST *host) {
+    rrdpush_lock(host);
+    if(buffer_strlen(host->rrdpush_buffer))
+        error("STREAM [send]: discarding %zu bytes of metrics already in the buffer.", buffer_strlen(host->rrdpush_buffer));
 
-    buffer_flush(rrdpush_buffer);
-    reset_all_charts();
-    rrdpush_unlock();
+    buffer_flush(host->rrdpush_buffer);
+    reset_all_charts(host);
+    rrdpush_unlock(host);
 }
 
 int rrdpush_init() {
-    rrdpush_enabled = config_get_boolean("stream", "enabled", rrdpush_enabled);
-    rrdpush_exclusive = config_get_boolean("stream", "exclusive", rrdpush_exclusive);
+    default_rrdpush_enabled = config_get_boolean("stream", "enabled", default_rrdpush_enabled);
+    default_rrdpush_exclusive = config_get_boolean("stream", "exclusive", default_rrdpush_exclusive);
     remote_netdata_config = config_get("stream", "stream metrics to", "");
     api_key = config_get("stream", "api key", "");
 
-    if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) {
-        rrdpush_enabled = 0;
-        rrdpush_exclusive = 0;
+    if(!default_rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) {
+        default_rrdpush_enabled = 0;
+        default_rrdpush_exclusive = 0;
     }
 
-    return rrdpush_enabled;
+    return default_rrdpush_enabled;
+}
+
+// Sender-thread variant of rrdpush_lock(): cancellation of the calling
+// thread is disabled first, then the per-host rrdpush mutex is taken.
+// A failure to change the cancel state is logged; the mutex is taken anyway.
+static inline void rrdpush_sender_lock(RRDHOST *host) {
+    int cancel_rc = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+
+    if(cancel_rc != 0)
+        error("STREAM [send]: cannot set pthread cancel state to DISABLE.");
+
+    rrdpush_lock(host);
+}
+
+// Counterpart of rrdpush_sender_lock(): releases the per-host rrdpush mutex
+// and then re-enables cancellation of the calling thread.
+// The mutex is released first (reverse order of acquisition), so the thread
+// is never cancelable while it still owns the mutex.
+static inline void rrdpush_sender_unlock(RRDHOST *host) {
+    rrdpush_unlock(host);
+
+    if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
+        error("STREAM [send]: cannot set pthread cancel state to ENABLE.");
+}
+
+// Releases everything the sender of this host owns: the socket to the remote
+// netdata, the collector-to-sender pipe and the send buffer. It also marks the
+// host as not connected, not spawned and not pushing. All of this is done
+// while holding the host's rrdpush mutex.
+// Called by the sender thread on exit and by rrdhost_free().
+void rrdpush_sender_cleanup(RRDHOST *host) {
+    rrdpush_lock(host);
+
+    // make sure the data collection threads do not write data
+    host->rrdpush_connected = 0;
+
+    // close the socket and forget the descriptor, so that a stale fd
+    // is never closed twice or mistaken for a live connection
+    if(host->rrdpush_socket != -1) {
+        close(host->rrdpush_socket);
+        host->rrdpush_socket = -1;
+    }
+
+    // close the pipe
+    if(host->rrdpush_pipe[PIPE_READ] != -1)  close(host->rrdpush_pipe[PIPE_READ]);
+    if(host->rrdpush_pipe[PIPE_WRITE] != -1) close(host->rrdpush_pipe[PIPE_WRITE]);
+    host->rrdpush_pipe[PIPE_READ] = -1;
+    host->rrdpush_pipe[PIPE_WRITE] = -1;
+
+    // release the send buffer
+    buffer_free(host->rrdpush_buffer);
+    host->rrdpush_buffer = NULL;
+
+    host->rrdpush_spawn = 0;
+    host->rrdpush_enabled = 0;
+
+    rrdpush_unlock(host);
+}
 
 void *rrdpush_sender_thread(void *ptr) {
-    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+    RRDHOST *host = (RRDHOST *)ptr;
 
     info("STREAM [send]: thread created (task id %d)", gettid());
 
@@ -198,16 +221,15 @@ void *rrdpush_sender_thread(void *ptr) {
     size_t max_size = (size_t)config_get_number("stream", "buffer size bytes", 1024 * 1024);
     unsigned int reconnect_delay = (unsigned int)config_get_number("stream", "reconnect delay seconds", 5);
     remote_clock_resync_iterations = (unsigned int)config_get_number("stream", "initial clock resync iterations", remote_clock_resync_iterations);
-    int sock = -1;
     char connected_to[CONNECTED_TO_SIZE + 1] = "";
 
-    if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key)
+    if(!host->rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key)
         goto cleanup;
 
     // initialize rrdpush globals
-    rrdpush_buffer = buffer_create(1);
-    rrdpush_connected = 0;
-    if(pipe(rrdpush_pipe) == -1) fatal("STREAM [send]: cannot create required pipe.");
+    host->rrdpush_buffer = buffer_create(1);
+    host->rrdpush_connected = 0;
+    if(pipe(host->rrdpush_pipe) == -1) fatal("STREAM [send]: cannot create required pipe.");
 
     // initialize local variables
     size_t begin = 0;
@@ -226,18 +248,17 @@ void *rrdpush_sender_thread(void *ptr) {
     ifd = &fds[0];
     ofd = &fds[1];
 
-    for(;;) {
-        if(netdata_exit) break;
+    for(; host->rrdpush_enabled && !netdata_exit ;) {
 
-        if(unlikely(sock == -1)) {
+        if(unlikely(host->rrdpush_socket == -1)) {
             // stop appending data into rrdpush_buffer
             // they will be lost, so there is no point to do it
-            rrdpush_connected = 0;
+            host->rrdpush_connected = 0;
 
             info("STREAM [send to %s]: connecting...", remote_netdata_config);
-            sock = connect_to_one_of(remote_netdata_config, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
+            host->rrdpush_socket = connect_to_one_of(remote_netdata_config, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
 
-            if(unlikely(sock == -1)) {
+            if(unlikely(host->rrdpush_socket == -1)) {
                 error("STREAM [send to %s]: failed to connect", remote_netdata_config);
                 sleep(reconnect_delay);
                 continue;
@@ -258,9 +279,9 @@ void *rrdpush_sender_thread(void *ptr) {
                       , program_version
             );
 
-            if(send_timeout(sock, http, strlen(http), 0, timeout) == -1) {
-                close(sock);
-                sock = -1;
+            if(send_timeout(host->rrdpush_socket, http, strlen(http), 0, timeout) == -1) {
+                close(host->rrdpush_socket);
+                host->rrdpush_socket = -1;
                 error("STREAM [send to %s]: failed to send http header to netdata", connected_to);
                 sleep(reconnect_delay);
                 continue;
@@ -268,17 +289,17 @@ void *rrdpush_sender_thread(void *ptr) {
 
             info("STREAM [send to %s]: waiting response from remote netdata...", connected_to);
 
-            if(recv_timeout(sock, http, 1000, 0, timeout) == -1) {
-                close(sock);
-                sock = -1;
+            if(recv_timeout(host->rrdpush_socket, http, 1000, 0, timeout) == -1) {
+                close(host->rrdpush_socket);
+                host->rrdpush_socket = -1;
                 error("STREAM [send to %s]: failed to initialize communication", connected_to);
                 sleep(reconnect_delay);
                 continue;
             }
 
             if(strncmp(http, "STREAM", 6)) {
-                close(sock);
-                sock = -1;
+                close(host->rrdpush_socket);
+                host->rrdpush_socket = -1;
                 error("STREAM [send to %s]: server is not replying properly.", connected_to);
                 sleep(reconnect_delay);
                 continue;
@@ -286,23 +307,23 @@ void *rrdpush_sender_thread(void *ptr) {
 
             info("STREAM [send to %s]: established communication - sending metrics...", connected_to);
 
-            if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
+            if(fcntl(host->rrdpush_socket, F_SETFL, O_NONBLOCK) < 0)
                 error("STREAM [send to %s]: cannot set non-blocking mode for socket.", connected_to);
 
-            rrdpush_flush();
+            rrdpush_flush(host);
             sent_connection = 0;
 
             // allow appending data into rrdpush_buffer
-            rrdpush_connected = 1;
+            host->rrdpush_connected = 1;
         }
 
-        ifd->fd = rrdpush_pipe[PIPE_READ];
+        ifd->fd = host->rrdpush_pipe[PIPE_READ];
         ifd->events = POLLIN;
         ifd->revents = 0;
 
-        ofd->fd = sock;
+        ofd->fd = host->rrdpush_socket;
         ofd->revents = 0;
-        if(begin < buffer_strlen(rrdpush_buffer)) {
+        if(begin < buffer_strlen(host->rrdpush_buffer)) {
             ofd->events = POLLOUT;
             fdmax = 2;
         }
@@ -320,8 +341,8 @@ void *rrdpush_sender_thread(void *ptr) {
                 continue;
 
             error("STREAM [send to %s]: failed to poll().", connected_to);
-            close(sock);
-            sock = -1;
+            close(host->rrdpush_socket);
+            host->rrdpush_socket = -1;
             break;
         }
         else if(unlikely(!retval)) {
@@ -331,39 +352,39 @@ void *rrdpush_sender_thread(void *ptr) {
 
         if(ifd->revents & POLLIN) {
             char buffer[1000 + 1];
-            if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
+            if(read(host->rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
                 error("STREAM [send to %s]: cannot read from internal pipe.", connected_to);
         }
 
-        if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
-            rrdpush_lock();
-            ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
+        if(ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) {
+            rrdpush_sender_lock(host);
+            ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT);
             if(ret == -1) {
                 if(errno != EAGAIN && errno != EINTR) {
                     error("STREAM [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", connected_to, sent_connection);
-                    close(sock);
-                    sock = -1;
+                    close(host->rrdpush_socket);
+                    host->rrdpush_socket = -1;
                 }
             }
             else {
                 sent_connection += ret;
                 sent_bytes += ret;
                 begin += ret;
-                if(begin == buffer_strlen(rrdpush_buffer)) {
-                    buffer_flush(rrdpush_buffer);
+                if(begin == buffer_strlen(host->rrdpush_buffer)) {
+                    buffer_flush(host->rrdpush_buffer);
                     begin = 0;
                 }
             }
-            rrdpush_unlock();
+            rrdpush_sender_unlock(host);
         }
 
         // protection from overflow
-        if(rrdpush_buffer->len > max_size) {
+        if(host->rrdpush_buffer->len > max_size) {
             errno = 0;
-            error("STREAM [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", connected_to, rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
-            if(sock != -1) {
-                close(sock);
-                sock = -1;
+            error("STREAM [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", connected_to, host->rrdpush_buffer->len, host->rrdpush_buffer->len - begin, sent_bytes, sent_connection);
+            if(host->rrdpush_socket != -1) {
+                close(host->rrdpush_socket);
+                host->rrdpush_socket = -1;
             }
         }
     }
@@ -371,22 +392,8 @@ void *rrdpush_sender_thread(void *ptr) {
 cleanup:
     debug(D_WEB_CLIENT, "STREAM [send]: sending thread exits.");
 
-    // make sure the data collection threads do not write data
-    rrdpush_connected = 0;
+    rrdpush_sender_cleanup(host);
 
-    // close the pipe
-    if(rrdpush_pipe[PIPE_READ] != -1)  close(rrdpush_pipe[PIPE_READ]);
-    if(rrdpush_pipe[PIPE_WRITE] != -1) close(rrdpush_pipe[PIPE_WRITE]);
-
-    // close the socket
-    if(sock != -1) close(sock);
-
-    rrdpush_lock();
-    buffer_free(rrdpush_buffer);
-    rrdpush_buffer = NULL;
-    rrdpush_unlock();
-
-    static_thread->enabled = 0;
     pthread_exit(NULL);
     return NULL;
 }
@@ -531,6 +538,16 @@ static inline int rrdpush_receive_validate_api_key(const char *key) {
     return appconfig_get_boolean(&stream_config, key, "enabled", 0);
 }
 
+// Starts the detached sender thread of this host, running
+// rrdpush_sender_thread() with the host as its argument.
+// rrdpush_spawn is set only when the thread was actually created, so that
+// rrdhost_free() never cancels a thread id that was never initialized.
+void rrdpush_sender_thread_spawn(RRDHOST *host) {
+    if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *)host)) {
+        error("STREAM [send for host %s]: failed to create new thread for client.", host->hostname);
+        return;
+    }
+
+    // a failed detach is only logged: the thread is running regardless
+    if(pthread_detach(host->rrdpush_thread))
+        error("STREAM [send for host %s]: cannot request detach newly created thread.", host->hostname);
+
+    host->rrdpush_spawn = 1;
+}
+
 int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
     (void)host;
     
index 33eb3442649dc06eac8de4b8a48e8d642af98261..8188d597e641a9b37c65b07e730c6c6874e67b78 100644 (file)
@@ -1,13 +1,14 @@
 #ifndef NETDATA_RRDPUSH_H
 #define NETDATA_RRDPUSH_H
 
-extern int rrdpush_enabled;
-extern int rrdpush_exclusive;
+extern int default_rrdpush_enabled;
+extern int default_rrdpush_exclusive;
 
 extern int rrdpush_init();
 extern void rrdset_done_push(RRDSET *st);
 extern void *rrdpush_sender_thread(void *ptr);
 
 extern int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url);
+extern void rrdpush_sender_cleanup(RRDHOST *host);
 
 #endif //NETDATA_RRDPUSH_H
index d16bb259a69a29f137e72212f643ffe3cc77efce..89f873903a5e9b98a8c424bc4e6215d32d22dff9 100644 (file)
@@ -195,16 +195,16 @@ void rrdset_reset(RRDSET *st) {
 // ----------------------------------------------------------------------------
 // RRDSET - helpers for rrdset_create()
 
-inline long align_entries_to_pagesize(long entries) {
-    if(rrdpush_exclusive)
-        return entries;
+inline long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries) {
+    if(unlikely(entries < 5)) entries = 5;
+    if(unlikely(entries > RRD_HISTORY_ENTRIES_MAX)) entries = RRD_HISTORY_ENTRIES_MAX;
 
-    if(entries < 5) entries = 5;
-    if(entries > RRD_HISTORY_ENTRIES_MAX) entries = RRD_HISTORY_ENTRIES_MAX;
+    if(unlikely(mode == RRD_MEMORY_MODE_NONE || mode == RRD_MEMORY_MODE_RAM))
+        return entries;
 
     long page = (size_t)sysconf(_SC_PAGESIZE);
     long size = sizeof(RRDDIM) + entries * sizeof(storage_number);
-    if(size % page) {
+    if(unlikely(size % page)) {
         size -= (size % page);
         size += page;
 
@@ -322,7 +322,7 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
     // get the options from the config, we need to create it
 
     long rentries = config_get_number(config_section, "history", host->rrd_history_entries);
-    long entries = align_entries_to_pagesize(rentries);
+    long entries = align_entries_to_pagesize(host->rrd_memory_mode, rentries);
     if(entries != rentries) entries = config_set_number(config_section, "history", entries);
 
     if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries)
@@ -628,7 +628,7 @@ static inline void rrdset_done_push_int(RRDSET *st) {
 void rrdset_done(RRDSET *st) {
     if(unlikely(netdata_exit)) return;
 
-    if(unlikely(rrdpush_exclusive)) {
+    if(unlikely(st->rrdhost->rrdpush_exclusive)) {
         rrdset_done_push_int(st);
         return;
     }
@@ -1218,7 +1218,7 @@ void rrdset_done(RRDSET *st) {
     if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
         error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
 
-    if(unlikely(rrdpush_enabled))
+    if(unlikely(st->rrdhost->rrdpush_enabled))
         rrdset_done_push_int(st);
 }