From: Costa Tsaousis (ktsaou) Date: Thu, 23 Feb 2017 23:47:00 +0000 (+0200) Subject: allow each netdata host to have its own thread for streaming metrics X-Git-Tag: ab-debian_0.20170227.01-0ab1~1^2~14^2~26 X-Git-Url: https://arthur.barton.de/cgi-bin/gitweb.cgi?p=netdata.git;a=commitdiff_plain;h=c98acefa301f23d8c7d2857b8cde5fbb9c2326a9 allow each netdata host to have its own thread for streaming metrics --- diff --git a/src/backends.c b/src/backends.c index 5f725e40..c5b57dbe 100644 --- a/src/backends.c +++ b/src/backends.c @@ -147,11 +147,6 @@ void *backends_main(void *ptr) { // ------------------------------------------------------------------------ // collect configuration options - if(rrdpush_exclusive) { - info("Backend is disabled - use the central netdata"); - goto cleanup; - } - struct timeval timeout = { .tv_sec = 0, .tv_usec = 0 @@ -312,6 +307,9 @@ void *backends_main(void *ptr) { rrd_rdlock(); RRDHOST *host; rrdhost_foreach_read(host) { + if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE) + continue; + rrdhost_rdlock(host); RRDSET *st; diff --git a/src/health.c b/src/health.c index 9e02b544..2ebbe065 100644 --- a/src/health.c +++ b/src/health.c @@ -15,16 +15,9 @@ inline char *health_config_dir(void) { void health_init(void) { debug(D_HEALTH, "Health configuration initializing"); - if(!rrdpush_exclusive) { - if(!(default_health_enabled = config_get_boolean("health", "enabled", 1))) { - debug(D_HEALTH, "Health is disabled."); - return; - } - } - else { - info("Health is disabled - setup alarms at the central netdata."); - config_set_boolean("health", "enabled", 0); - default_health_enabled = 0; + if(!(default_health_enabled = config_get_boolean("health", "enabled", 1))) { + debug(D_HEALTH, "Health is disabled."); + return; } } @@ -32,6 +25,9 @@ void health_init(void) { // re-load health configuration void health_reload_host(RRDHOST *host) { + if(unlikely(!host->health_enabled)) + return; + char *path = health_config_dir(); // free all running alarms @@ -363,6 +359,9 @@ void *health_main(void *ptr) { RRDHOST *host; rrdhost_foreach_read(host) { + if(unlikely(!host->health_enabled)) + continue; + if(unlikely(apply_hibernation_delay)) { info("Postponing alarm checks for %ld seconds, on host '%s', due to boottime discrepancy (realtime dt: %ld, boottime dt: %ld)." diff --git a/src/main.c b/src/main.c index d8381adb..77ffe1de 100644 --- a/src/main.c +++ b/src/main.c @@ -55,19 +55,16 @@ struct netdata_static_thread static_threads[] = { {"plugins.d", NULL, NULL, 1, NULL, NULL, pluginsd_main}, {"web", NULL, NULL, 1, NULL, NULL, socket_listen_main_multi_threaded}, {"web-single-threaded", NULL, NULL, 0, NULL, NULL, socket_listen_main_single_threaded}, - {"central-netdata-push",NULL, NULL, 0, NULL, NULL, rrdpush_sender_thread}, + {"push-metrics", NULL, NULL, 0, NULL, NULL, rrdpush_sender_thread}, {NULL, NULL, NULL, 0, NULL, NULL, NULL} }; void web_server_threading_selection(void) { int multi_threaded = 0; int single_threaded = 0; - int rrdpush_thread = 1; - int backends_thread = 1; - if(rrdpush_exclusive) { - backends_thread = 0; - info("Web servers and backends thread are disabled - use the central netdata."); + if(default_rrdpush_exclusive) { + info("Web server is disabled - use the remote netdata."); } else { multi_threaded = config_get_boolean("global", "multi threaded web server", 1); @@ -81,15 +78,9 @@ void web_server_threading_selection(void) { if(static_threads[i].start_routine == socket_listen_main_single_threaded) static_threads[i].enabled = single_threaded; - - if(static_threads[i].start_routine == rrdpush_sender_thread) - static_threads[i].enabled = rrdpush_thread; - - if(static_threads[i].start_routine == backends_main) - static_threads[i].enabled = backends_thread; } - if(rrdpush_exclusive) + if(default_rrdpush_exclusive) return; web_client_timeout = (int) config_get_number("global", "disconnect idle web clients after seconds", DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS); @@ -694,7 +685,7 @@ int main(int argc, char **argv) { // -------------------------------------------------------------------- - // find we need to send data to a central netdata + // find we need to send data to another netdata rrdpush_init(); @@ -702,7 +693,7 @@ int main(int argc, char **argv) { // -------------------------------------------------------------------- // get default memory mode for the database - if(rrdpush_exclusive) { + if(default_rrdpush_exclusive) { default_rrd_memory_mode = RRD_MEMORY_MODE_NONE; config_set("global", "memory mode", rrd_memory_mode_name(default_rrd_memory_mode)); } @@ -713,14 +704,14 @@ int main(int argc, char **argv) { // -------------------------------------------------------------------- // get default database size - if(rrdpush_exclusive) { + if(default_rrdpush_exclusive) { default_rrd_history_entries = 10; config_set_number("global", "history", default_rrd_history_entries); } else - default_rrd_history_entries = (int) config_get_number("global", "history", align_entries_to_pagesize(RRD_DEFAULT_HISTORY_ENTRIES)); + default_rrd_history_entries = (int) config_get_number("global", "history", align_entries_to_pagesize(default_rrd_memory_mode, RRD_DEFAULT_HISTORY_ENTRIES)); - long h = align_entries_to_pagesize(default_rrd_history_entries); + long h = align_entries_to_pagesize(default_rrd_memory_mode, default_rrd_history_entries); if(h != default_rrd_history_entries) { config_set_number("global", "history", h); default_rrd_history_entries = (int)h; @@ -844,11 +835,16 @@ int main(int argc, char **argv) { // -------------------------------------------------------------------- // create the listening sockets - if(!check_config && !rrdpush_exclusive) { + if(!check_config && !default_rrdpush_exclusive) + create_listen_sockets(); + + + // -------------------------------------------------------------------- + // load the aggregated host configuration file + { char filename[FILENAME_MAX + 1]; snprintfz(filename, FILENAME_MAX, "%s/aggregated_hosts.conf", netdata_configured_config_dir); appconfig_load(&stream_config, filename, 0); - create_listen_sockets(); } } diff --git a/src/registry_init.c b/src/registry_init.c index 267284e7..2d8eb570 100644 --- a/src/registry_init.c +++ b/src/registry_init.c @@ -4,7 +4,7 @@ int registry_init(void) { char filename[FILENAME_MAX + 1]; // registry enabled? - if(!rrdpush_exclusive) { + if(!default_rrdpush_exclusive) { registry.enabled = config_get_boolean("registry", "enabled", 0); } else { diff --git a/src/rrd.h b/src/rrd.h index 46ee8e00..cf141cc5 100644 --- a/src/rrd.h +++ b/src/rrd.h @@ -341,6 +341,17 @@ struct rrdhost { int rrd_update_every; // the update frequency of the host int rrd_history_entries; // the number of history entries for the host's charts + int rrdpush_enabled; // 1 when this host sends metrics to another netdata + int rrdpush_exclusive; // 1 when this host is exclusively sending metrics without a database + volatile int rrdpush_connected; // 1 when the sender is ready to push metrics + volatile int rrdpush_spawn; // 1 when the sender thread has been spawn + volatile int rrdpush_error_shown; // 1 when we have logged a communication error + int rrdpush_socket; // the fd of the socket to the remote host, or -1 + pthread_t rrdpush_thread; // the sender thread + pthread_mutex_t rrdpush_mutex; // exclusive access to rrdpush_buffer + int rrdpush_pipe[2]; // collector to sender thread communication + BUFFER *rrdpush_buffer; // collector fills it, sender sends them + int health_enabled; // 1 when this host has health enabled time_t health_delay_up_to; // a timestamp to delay alarms processing up to RRD_MEMORY_MODE rrd_memory_mode; // the memory more for the charts of this host @@ -526,7 +537,7 @@ extern int rrddim_unhide(RRDSET *st, const char *id); extern collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value); extern collected_number rrddim_set(RRDSET *st, const char *id, collected_number value); -extern long align_entries_to_pagesize(long entries); +extern long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries); // ---------------------------------------------------------------------------- diff --git a/src/rrdhost.c b/src/rrdhost.c index 50ddabe8..fffa6758 100644 --- a/src/rrdhost.c +++ b/src/rrdhost.c @@ -73,18 +73,25 @@ RRDHOST *rrdhost_create(const char *hostname, host->rrd_update_every = update_every; host->rrd_history_entries = entries; host->rrd_memory_mode = memory_mode; - host->health_enabled = health_enabled; + host->health_enabled = (memory_mode == RRD_MEMORY_MODE_NONE)? 0 : health_enabled; + host->rrdpush_enabled = default_rrdpush_enabled; + host->rrdpush_exclusive = default_rrdpush_exclusive; - pthread_rwlock_init(&(host->rrdhost_rwlock), NULL); + host->rrdpush_pipe[0] = -1; + host->rrdpush_pipe[1] = -1; + host->rrdpush_socket = -1; + + pthread_mutex_init(&host->rrdpush_mutex, NULL); + pthread_rwlock_init(&host->rrdhost_rwlock, NULL); rrdhost_init_hostname(host, hostname); rrdhost_init_machine_guid(host, guid); rrdhost_init_os(host, os); - avl_init_lock(&(host->rrdset_root_index), rrdset_compare); + avl_init_lock(&(host->rrdset_root_index), rrdset_compare); avl_init_lock(&(host->rrdset_root_index_name), rrdset_compare_name); - avl_init_lock(&(host->rrdfamily_root_index), rrdfamily_compare); - avl_init_lock(&(host->variables_root_index), rrdvar_compare); + avl_init_lock(&(host->rrdfamily_root_index), rrdfamily_compare); + avl_init_lock(&(host->variables_root_index), rrdvar_compare); // ------------------------------------------------------------------------ // initialize health variables @@ -110,12 +117,9 @@ RRDHOST *rrdhost_create(const char *hostname, if(!localhost) { // this is localhost - host->cache_dir = strdupz(netdata_configured_cache_dir); + host->cache_dir = strdupz(netdata_configured_cache_dir); host->varlib_dir = strdupz(netdata_configured_varlib_dir); - snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir); - host->health_log_filename = strdupz(config_get("health", "health db file", filename)); - } else { // this is not localhost - append our GUID to localhost path @@ -136,18 +140,18 @@ RRDHOST *rrdhost_create(const char *hostname, int r = mkdir(host->varlib_dir, 0775); if(r != 0 && errno != EEXIST) error("Host '%s': cannot create directory '%s'", host->hostname, host->varlib_dir); - } - snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir); - int r = mkdir(filename, 0775); - if(r != 0 && errno != EEXIST) - error("Host '%s': cannot create directory '%s'", host->hostname, filename); - - snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir); - host->health_log_filename = strdupz(filename); + snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir); + r = mkdir(filename, 0775); + if(r != 0 && errno != EEXIST) + error("Host '%s': cannot create directory '%s'", host->hostname, filename); + } } + snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir); + host->health_log_filename = strdupz(config_get("health", "health db file", filename)); + snprintfz(filename, FILENAME_MAX, "%s/alarm-notify.sh", netdata_configured_plugins_dir); host->health_default_exec = strdupz(config_get("health", "script to execute on alarm", filename)); host->health_default_recipient = strdup("root"); @@ -156,12 +160,14 @@ RRDHOST *rrdhost_create(const char *hostname, // ------------------------------------------------------------------------ // load health configuration - health_alarm_log_load(host); - health_alarm_log_open(host); + if(host->health_enabled) { + health_alarm_log_load(host); + health_alarm_log_open(host); - rrdhost_wrlock(host); - health_readdir(host, health_config_dir()); - rrdhost_unlock(host); + rrdhost_wrlock(host); + health_readdir(host, health_config_dir()); + rrdhost_unlock(host); + } // ------------------------------------------------------------------------ @@ -312,6 +318,11 @@ void rrdhost_free(RRDHOST *host) { // ------------------------------------------------------------------------ // free it + if(host->rrdpush_spawn) { + pthread_cancel(host->rrdpush_thread); + rrdpush_sender_cleanup(host); + } + freez(host->os); freez(host->cache_dir); freez(host->varlib_dir); diff --git a/src/rrdpush.c b/src/rrdpush.c index d6662492..9bcd232a 100644 --- a/src/rrdpush.c +++ b/src/rrdpush.c @@ -1,7 +1,7 @@ #include "common.h" -int rrdpush_enabled = 0; -int rrdpush_exclusive = 1; +int default_rrdpush_enabled = 0; +int default_rrdpush_exclusive = 1; static char *remote_netdata_config = NULL; static char *api_key = NULL; @@ -15,19 +15,6 @@ static char *api_key = NULL; // that there are more data ready to be sent #define PIPE_READ 0 #define PIPE_WRITE 1 -int rrdpush_pipe[2] = { -1, -1 }; - -// a buffer used to store data to be sent. -// the format is the same as external plugins. -static BUFFER *rrdpush_buffer = NULL; - -// locking to get exclusive access to shared resources -// (rrdpush_pipe[PIPE_WRITE], rrdpush_buffer -static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER; - -// if the streaming thread is connected to a remote netdata -// this is set to 1, otherwise 0. -static volatile int rrdpush_connected = 0; // to have the remote netdata re-sync the charts // to its current clock, we send for this many @@ -35,8 +22,8 @@ static volatile int rrdpush_connected = 0; // this is for the first iterations of each chart static unsigned int remote_clock_resync_iterations = 60; -#define rrdpush_lock() pthread_mutex_lock(&rrdpush_mutex) -#define rrdpush_unlock() pthread_mutex_unlock(&rrdpush_mutex) +#define rrdpush_lock(host) pthread_mutex_lock(&((host)->rrdpush_mutex)) +#define rrdpush_unlock(host) pthread_mutex_unlock(&((host)->rrdpush_mutex)) // checks if the current chart definition has been sent static inline int need_to_send_chart_definition(RRDSET *st) { @@ -50,7 +37,7 @@ static inline int need_to_send_chart_definition(RRDSET *st) { // sends the current chart definition static inline void send_chart_definition(RRDSET *st) { - buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n" + buffer_sprintf(st->rrdhost->rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n" , st->id , st->name , st->title @@ -64,7 +51,7 @@ static inline void send_chart_definition(RRDSET *st) { RRDDIM *rd; rrddim_foreach_read(rd, st) { - buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n" + buffer_sprintf(st->rrdhost->rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n" , rd->id , rd->name , rrd_algorithm_name(rd->algorithm) @@ -79,69 +66,69 @@ static inline void send_chart_definition(RRDSET *st) { // sends the current chart dimensions static inline void send_chart_metrics(RRDSET *st) { - buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0); + buffer_sprintf(st->rrdhost->rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0); RRDDIM *rd; rrddim_foreach_read(rd, st) { if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED)) - buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n" + buffer_sprintf(st->rrdhost->rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n" , rd->id , rd->collected_value ); } - buffer_strcat(rrdpush_buffer, "END\n"); + buffer_strcat(st->rrdhost->rrdpush_buffer, "END\n"); } // resets all the chart, so that their definitions // will be resent to the central netdata -static void reset_all_charts(void) { - rrd_rdlock(); - - RRDHOST *host; - rrdhost_foreach_read(host) { - rrdhost_rdlock(host); +static void reset_all_charts(RRDHOST *host) { + rrdhost_rdlock(host); - RRDSET *st; - rrdset_foreach_read(st, host) { + RRDSET *st; + rrdset_foreach_read(st, host) { - // make it re-align the current time - // on the remote host - st->counter_done = 0; + // make it re-align the current time + // on the remote host + st->counter_done = 0; - rrdset_rdlock(st); + rrdset_rdlock(st); - RRDDIM *rd; - rrddim_foreach_read(rd, st) - rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED); + RRDDIM *rd; + rrddim_foreach_read(rd, st) + rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED); - rrdset_unlock(st); - } - rrdhost_unlock(host); + rrdset_unlock(st); } - rrd_unlock(); + + rrdhost_unlock(host); } +void rrdpush_sender_thread_spawn(RRDHOST *host); + void rrdset_done_push(RRDSET *st) { - static int error_shown = 0; + RRDHOST *host = st->rrdhost; if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) return; - rrdpush_lock(); + rrdpush_lock(host); - if(unlikely(!rrdpush_buffer || !rrdpush_connected)) { - if(unlikely(!error_shown)) + if(unlikely(host->rrdpush_enabled && !host->rrdpush_spawn)) + rrdpush_sender_thread_spawn(host); + + if(unlikely(!host->rrdpush_buffer || !host->rrdpush_connected)) { + if(unlikely(!host->rrdpush_error_shown)) error("STREAM [send]: not ready - discarding collected metrics."); - error_shown = 1; + host->rrdpush_error_shown = 1; - rrdpush_unlock(); + rrdpush_unlock(host); return; } - else if(unlikely(error_shown)) { + else if(unlikely(host->rrdpush_error_shown)) { error("STREAM [send]: ready - sending metrics..."); - error_shown = 0; + host->rrdpush_error_shown = 0; } rrdset_rdlock(st); @@ -152,38 +139,74 @@ void rrdset_done_push(RRDSET *st) { rrdset_unlock(st); // signal the sender there are more data - if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1) + if(write(host->rrdpush_pipe[PIPE_WRITE], " ", 1) == -1) error("STREAM [send]: cannot write to internal pipe"); - rrdpush_unlock(); + rrdpush_unlock(host); } -static inline void rrdpush_flush(void) { - rrdpush_lock(); - if(buffer_strlen(rrdpush_buffer)) - error("STREAM [send]: discarding %zu bytes of metrics already in the buffer.", buffer_strlen(rrdpush_buffer)); +static inline void rrdpush_flush(RRDHOST *host) { + rrdpush_lock(host); + if(buffer_strlen(host->rrdpush_buffer)) + error("STREAM [send]: discarding %zu bytes of metrics already in the buffer.", buffer_strlen(host->rrdpush_buffer)); - buffer_flush(rrdpush_buffer); - reset_all_charts(); - rrdpush_unlock(); + buffer_flush(host->rrdpush_buffer); + reset_all_charts(host); + rrdpush_unlock(host); } int rrdpush_init() { - rrdpush_enabled = config_get_boolean("stream", "enabled", rrdpush_enabled); - rrdpush_exclusive = config_get_boolean("stream", "exclusive", rrdpush_exclusive); + default_rrdpush_enabled = config_get_boolean("stream", "enabled", default_rrdpush_enabled); + default_rrdpush_exclusive = config_get_boolean("stream", "exclusive", default_rrdpush_exclusive); remote_netdata_config = config_get("stream", "stream metrics to", ""); api_key = config_get("stream", "api key", ""); - if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) { - rrdpush_enabled = 0; - rrdpush_exclusive = 0; + if(!default_rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) { + default_rrdpush_enabled = 0; + default_rrdpush_exclusive = 0; } - return rrdpush_enabled; + return default_rrdpush_enabled; +} + +static inline void rrdpush_sender_lock(RRDHOST *host) { + if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0) + error("STREAM [send]: cannot set pthread cancel state to DISABLE."); + + rrdpush_lock(host); +} + +static inline void rrdpush_sender_unlock(RRDHOST *host) { + if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) + error("STREAM [send]: cannot set pthread cancel state to DISABLE."); + + rrdpush_unlock(host); +} + +void rrdpush_sender_cleanup(RRDHOST *host) { + rrdpush_lock(host); + + host->rrdpush_connected = 0; + + if(host->rrdpush_socket != -1) close(host->rrdpush_socket); + + // close the pipe + if(host->rrdpush_pipe[PIPE_READ] != -1) close(host->rrdpush_pipe[PIPE_READ]); + if(host->rrdpush_pipe[PIPE_WRITE] != -1) close(host->rrdpush_pipe[PIPE_WRITE]); + host->rrdpush_pipe[PIPE_READ] = -1; + host->rrdpush_pipe[PIPE_WRITE] = -1; + + buffer_free(host->rrdpush_buffer); + host->rrdpush_buffer = NULL; + + host->rrdpush_spawn = 0; + host->rrdpush_enabled = 0; + + rrdpush_unlock(host); } void *rrdpush_sender_thread(void *ptr) { - struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; + RRDHOST *host = (RRDHOST *)ptr; info("STREAM [send]: thread created (task id %d)", gettid()); @@ -198,16 +221,15 @@ void *rrdpush_sender_thread(void *ptr) { size_t max_size = (size_t)config_get_number("stream", "buffer size bytes", 1024 * 1024); unsigned int reconnect_delay = (unsigned int)config_get_number("stream", "reconnect delay seconds", 5); remote_clock_resync_iterations = (unsigned int)config_get_number("stream", "initial clock resync iterations", remote_clock_resync_iterations); - int sock = -1; char connected_to[CONNECTED_TO_SIZE + 1] = ""; - if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) + if(!host->rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) goto cleanup; // initialize rrdpush globals - rrdpush_buffer = buffer_create(1); - rrdpush_connected = 0; - if(pipe(rrdpush_pipe) == -1) fatal("STREAM [send]: cannot create required pipe."); + host->rrdpush_buffer = buffer_create(1); + host->rrdpush_connected = 0; + if(pipe(host->rrdpush_pipe) == -1) fatal("STREAM [send]: cannot create required pipe."); // initialize local variables size_t begin = 0; @@ -226,18 +248,17 @@ void *rrdpush_sender_thread(void *ptr) { ifd = &fds[0]; ofd = &fds[1]; - for(;;) { - if(netdata_exit) break; + for(; host->rrdpush_enabled && !netdata_exit ;) { - if(unlikely(sock == -1)) { + if(unlikely(host->rrdpush_socket == -1)) { // stop appending data into rrdpush_buffer // they will be lost, so there is no point to do it - rrdpush_connected = 0; + host->rrdpush_connected = 0; info("STREAM [send to %s]: connecting...", remote_netdata_config); - sock = connect_to_one_of(remote_netdata_config, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE); + host->rrdpush_socket = connect_to_one_of(remote_netdata_config, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE); - if(unlikely(sock == -1)) { + if(unlikely(host->rrdpush_socket == -1)) { error("STREAM [send to %s]: failed to connect", remote_netdata_config); sleep(reconnect_delay); continue; @@ -258,9 +279,9 @@ void *rrdpush_sender_thread(void *ptr) { , program_version ); - if(send_timeout(sock, http, strlen(http), 0, timeout) == -1) { - close(sock); - sock = -1; + if(send_timeout(host->rrdpush_socket, http, strlen(http), 0, timeout) == -1) { + close(host->rrdpush_socket); + host->rrdpush_socket = -1; error("STREAM [send to %s]: failed to send http header to netdata", connected_to); sleep(reconnect_delay); continue; @@ -268,17 +289,17 @@ void *rrdpush_sender_thread(void *ptr) { info("STREAM [send to %s]: waiting response from remote netdata...", connected_to); - if(recv_timeout(sock, http, 1000, 0, timeout) == -1) { - close(sock); - sock = -1; + if(recv_timeout(host->rrdpush_socket, http, 1000, 0, timeout) == -1) { + close(host->rrdpush_socket); + host->rrdpush_socket = -1; error("STREAM [send to %s]: failed to initialize communication", connected_to); sleep(reconnect_delay); continue; } if(strncmp(http, "STREAM", 6)) { - close(sock); - sock = -1; + close(host->rrdpush_socket); + host->rrdpush_socket = -1; error("STREAM [send to %s]: server is not replying properly.", connected_to); sleep(reconnect_delay); continue; @@ -286,23 +307,23 @@ void *rrdpush_sender_thread(void *ptr) { info("STREAM [send to %s]: established communication - sending metrics...", connected_to); - if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0) + if(fcntl(host->rrdpush_socket, F_SETFL, O_NONBLOCK) < 0) error("STREAM [send to %s]: cannot set non-blocking mode for socket.", connected_to); - rrdpush_flush(); + rrdpush_flush(host); sent_connection = 0; // allow appending data into rrdpush_buffer - rrdpush_connected = 1; + host->rrdpush_connected = 1; } - ifd->fd = rrdpush_pipe[PIPE_READ]; + ifd->fd = host->rrdpush_pipe[PIPE_READ]; ifd->events = POLLIN; ifd->revents = 0; - ofd->fd = sock; + ofd->fd = host->rrdpush_socket; ofd->revents = 0; - if(begin < buffer_strlen(rrdpush_buffer)) { + if(begin < buffer_strlen(host->rrdpush_buffer)) { ofd->events = POLLOUT; fdmax = 2; } @@ -320,8 +341,8 @@ void *rrdpush_sender_thread(void *ptr) { continue; error("STREAM [send to %s]: failed to poll().", connected_to); - close(sock); - sock = -1; + close(host->rrdpush_socket); + host->rrdpush_socket = -1; break; } else if(unlikely(!retval)) { @@ -331,39 +352,39 @@ void *rrdpush_sender_thread(void *ptr) { if(ifd->revents & POLLIN) { char buffer[1000 + 1]; - if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1) + if(read(host->rrdpush_pipe[PIPE_READ], buffer, 1000) == -1) error("STREAM [send to %s]: cannot read from internal pipe.", connected_to); } - if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) { - rrdpush_lock(); - ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT); + if(ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) { + rrdpush_sender_lock(host); + ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT); if(ret == -1) { if(errno != EAGAIN && errno != EINTR) { error("STREAM [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", connected_to, sent_connection); - close(sock); - sock = -1; + close(host->rrdpush_socket); + host->rrdpush_socket = -1; } } else { sent_connection += ret; sent_bytes += ret; begin += ret; - if(begin == buffer_strlen(rrdpush_buffer)) { - buffer_flush(rrdpush_buffer); + if(begin == buffer_strlen(host->rrdpush_buffer)) { + buffer_flush(host->rrdpush_buffer); begin = 0; } } - rrdpush_unlock(); + rrdpush_sender_unlock(host); } // protection from overflow - if(rrdpush_buffer->len > max_size) { + if(host->rrdpush_buffer->len > max_size) { errno = 0; - error("STREAM [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", connected_to, rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection); - if(sock != -1) { - close(sock); - sock = -1; + error("STREAM [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", connected_to, host->rrdpush_buffer->len, host->rrdpush_buffer->len - begin, sent_bytes, sent_connection); + if(host->rrdpush_socket != -1) { + close(host->rrdpush_socket); + host->rrdpush_socket = -1; } } } @@ -371,22 +392,8 @@ void *rrdpush_sender_thread(void *ptr) { cleanup: debug(D_WEB_CLIENT, "STREAM [send]: sending thread exits."); - // make sure the data collection threads do not write data - rrdpush_connected = 0; + rrdpush_sender_cleanup(host); - // close the pipe - if(rrdpush_pipe[PIPE_READ] != -1) close(rrdpush_pipe[PIPE_READ]); - if(rrdpush_pipe[PIPE_WRITE] != -1) close(rrdpush_pipe[PIPE_WRITE]); - - // close the socket - if(sock != -1) close(sock); - - rrdpush_lock(); - buffer_free(rrdpush_buffer); - rrdpush_buffer = NULL; - rrdpush_unlock(); - - static_thread->enabled = 0; pthread_exit(NULL); return NULL; } @@ -531,6 +538,16 @@ static inline int rrdpush_receive_validate_api_key(const char *key) { return appconfig_get_boolean(&stream_config, key, "enabled", 0); } +void rrdpush_sender_thread_spawn(RRDHOST *host) { + if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *)host)) + error("STREAM [send for host %s]: failed to create new thread for client.", host->hostname); + + else if(pthread_detach(host->rrdpush_thread)) + error("STREAM [send for host %s]: cannot request detach newly created thread.", host->hostname); + + host->rrdpush_spawn = 1; +} + int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) { (void)host; diff --git a/src/rrdpush.h b/src/rrdpush.h index 33eb3442..8188d597 100644 --- a/src/rrdpush.h +++ b/src/rrdpush.h @@ -1,13 +1,14 @@ #ifndef NETDATA_RRDPUSH_H #define NETDATA_RRDPUSH_H -extern int rrdpush_enabled; -extern int rrdpush_exclusive; +extern int default_rrdpush_enabled; +extern int default_rrdpush_exclusive; extern int rrdpush_init(); extern void rrdset_done_push(RRDSET *st); extern void *rrdpush_sender_thread(void *ptr); extern int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url); +extern void rrdpush_sender_cleanup(RRDHOST *host); #endif //NETDATA_RRDPUSH_H diff --git a/src/rrdset.c b/src/rrdset.c index d16bb259..89f87390 100644 --- a/src/rrdset.c +++ b/src/rrdset.c @@ -195,16 +195,16 @@ void rrdset_reset(RRDSET *st) { // ---------------------------------------------------------------------------- // RRDSET - helpers for rrdset_create() -inline long align_entries_to_pagesize(long entries) { - if(rrdpush_exclusive) - return entries; +inline long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries) { + if(unlikely(entries < 5)) entries = 5; + if(unlikely(entries > RRD_HISTORY_ENTRIES_MAX)) entries = RRD_HISTORY_ENTRIES_MAX; - if(entries < 5) entries = 5; - if(entries > RRD_HISTORY_ENTRIES_MAX) entries = RRD_HISTORY_ENTRIES_MAX; + if(unlikely(mode == RRD_MEMORY_MODE_NONE || mode == RRD_MEMORY_MODE_RAM)) + return entries; long page = (size_t)sysconf(_SC_PAGESIZE); long size = sizeof(RRDDIM) + entries * sizeof(storage_number); - if(size % page) { + if(unlikely(size % page)) { size -= (size % page); size += page; @@ -322,7 +322,7 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha // get the options from the config, we need to create it long rentries = config_get_number(config_section, "history", host->rrd_history_entries); - long entries = align_entries_to_pagesize(rentries); + long entries = align_entries_to_pagesize(host->rrd_memory_mode, rentries); if(entries != rentries) entries = config_set_number(config_section, "history", entries); if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries) @@ -628,7 +628,7 @@ static inline void rrdset_done_push_int(RRDSET *st) { void rrdset_done(RRDSET *st) { if(unlikely(netdata_exit)) return; - if(unlikely(rrdpush_exclusive)) { + if(unlikely(st->rrdhost->rrdpush_exclusive)) { rrdset_done_push_int(st); return; } @@ -1218,7 +1218,7 @@ void rrdset_done(RRDSET *st) { if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0)) error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate); - if(unlikely(rrdpush_enabled)) + if(unlikely(st->rrdhost->rrdpush_enabled)) rrdset_done_push_int(st); }