{"plugins.d", NULL, NULL, 1, NULL, NULL, pluginsd_main},
{"web", NULL, NULL, 1, NULL, NULL, socket_listen_main_multi_threaded},
{"web-single-threaded", NULL, NULL, 0, NULL, NULL, socket_listen_main_single_threaded},
- {"central-netdata-push",NULL, NULL, 0, NULL, NULL, rrdpush_sender_thread},
+ {"push-metrics", NULL, NULL, 0, NULL, NULL, rrdpush_sender_thread},
{NULL, NULL, NULL, 0, NULL, NULL, NULL}
};
void web_server_threading_selection(void) {
int multi_threaded = 0;
int single_threaded = 0;
- int rrdpush_thread = 1;
- int backends_thread = 1;
- if(rrdpush_exclusive) {
- backends_thread = 0;
- info("Web servers and backends thread are disabled - use the central netdata.");
+ if(default_rrdpush_exclusive) {
+ info("Web server is disabled - use the remote netdata.");
}
else {
multi_threaded = config_get_boolean("global", "multi threaded web server", 1);
if(static_threads[i].start_routine == socket_listen_main_single_threaded)
static_threads[i].enabled = single_threaded;
-
- if(static_threads[i].start_routine == rrdpush_sender_thread)
- static_threads[i].enabled = rrdpush_thread;
-
- if(static_threads[i].start_routine == backends_main)
- static_threads[i].enabled = backends_thread;
}
- if(rrdpush_exclusive)
+ if(default_rrdpush_exclusive)
return;
web_client_timeout = (int) config_get_number("global", "disconnect idle web clients after seconds", DEFAULT_DISCONNECT_IDLE_WEB_CLIENTS_AFTER_SECONDS);
// --------------------------------------------------------------------
- // find we need to send data to a central netdata
+ // find we need to send data to another netdata
rrdpush_init();
// --------------------------------------------------------------------
// get default memory mode for the database
- if(rrdpush_exclusive) {
+ if(default_rrdpush_exclusive) {
default_rrd_memory_mode = RRD_MEMORY_MODE_NONE;
config_set("global", "memory mode", rrd_memory_mode_name(default_rrd_memory_mode));
}
// --------------------------------------------------------------------
// get default database size
- if(rrdpush_exclusive) {
+ if(default_rrdpush_exclusive) {
default_rrd_history_entries = 10;
config_set_number("global", "history", default_rrd_history_entries);
}
else
- default_rrd_history_entries = (int) config_get_number("global", "history", align_entries_to_pagesize(RRD_DEFAULT_HISTORY_ENTRIES));
+ default_rrd_history_entries = (int) config_get_number("global", "history", align_entries_to_pagesize(default_rrd_memory_mode, RRD_DEFAULT_HISTORY_ENTRIES));
- long h = align_entries_to_pagesize(default_rrd_history_entries);
+ long h = align_entries_to_pagesize(default_rrd_memory_mode, default_rrd_history_entries);
if(h != default_rrd_history_entries) {
config_set_number("global", "history", h);
default_rrd_history_entries = (int)h;
// --------------------------------------------------------------------
// create the listening sockets
- if(!check_config && !rrdpush_exclusive) {
+ if(!check_config && !default_rrdpush_exclusive)
+ create_listen_sockets();
+
+
+ // --------------------------------------------------------------------
+ // load the aggregated host configuration file
+ {
char filename[FILENAME_MAX + 1];
snprintfz(filename, FILENAME_MAX, "%s/aggregated_hosts.conf", netdata_configured_config_dir);
appconfig_load(&stream_config, filename, 0);
- create_listen_sockets();
}
}
host->rrd_update_every = update_every;
host->rrd_history_entries = entries;
host->rrd_memory_mode = memory_mode;
- host->health_enabled = health_enabled;
+ host->health_enabled = (memory_mode == RRD_MEMORY_MODE_NONE)? 0 : health_enabled;
+ host->rrdpush_enabled = default_rrdpush_enabled;
+ host->rrdpush_exclusive = default_rrdpush_exclusive;
- pthread_rwlock_init(&(host->rrdhost_rwlock), NULL);
+ host->rrdpush_pipe[0] = -1;
+ host->rrdpush_pipe[1] = -1;
+ host->rrdpush_socket = -1;
+
+ pthread_mutex_init(&host->rrdpush_mutex, NULL);
+ pthread_rwlock_init(&host->rrdhost_rwlock, NULL);
rrdhost_init_hostname(host, hostname);
rrdhost_init_machine_guid(host, guid);
rrdhost_init_os(host, os);
- avl_init_lock(&(host->rrdset_root_index), rrdset_compare);
+ avl_init_lock(&(host->rrdset_root_index), rrdset_compare);
avl_init_lock(&(host->rrdset_root_index_name), rrdset_compare_name);
- avl_init_lock(&(host->rrdfamily_root_index), rrdfamily_compare);
- avl_init_lock(&(host->variables_root_index), rrdvar_compare);
+ avl_init_lock(&(host->rrdfamily_root_index), rrdfamily_compare);
+ avl_init_lock(&(host->variables_root_index), rrdvar_compare);
// ------------------------------------------------------------------------
// initialize health variables
if(!localhost) {
// this is localhost
- host->cache_dir = strdupz(netdata_configured_cache_dir);
+ host->cache_dir = strdupz(netdata_configured_cache_dir);
host->varlib_dir = strdupz(netdata_configured_varlib_dir);
- snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir);
- host->health_log_filename = strdupz(config_get("health", "health db file", filename));
-
}
else {
// this is not localhost - append our GUID to localhost path
int r = mkdir(host->varlib_dir, 0775);
if(r != 0 && errno != EEXIST)
error("Host '%s': cannot create directory '%s'", host->hostname, host->varlib_dir);
- }
- snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir);
- int r = mkdir(filename, 0775);
- if(r != 0 && errno != EEXIST)
- error("Host '%s': cannot create directory '%s'", host->hostname, filename);
-
- snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir);
- host->health_log_filename = strdupz(filename);
+ snprintfz(filename, FILENAME_MAX, "%s/health", host->varlib_dir);
+ r = mkdir(filename, 0775);
+ if(r != 0 && errno != EEXIST)
+ error("Host '%s': cannot create directory '%s'", host->hostname, filename);
+ }
}
+ snprintfz(filename, FILENAME_MAX, "%s/health/health-log.db", host->varlib_dir);
+ host->health_log_filename = strdupz(config_get("health", "health db file", filename));
+
snprintfz(filename, FILENAME_MAX, "%s/alarm-notify.sh", netdata_configured_plugins_dir);
host->health_default_exec = strdupz(config_get("health", "script to execute on alarm", filename));
host->health_default_recipient = strdup("root");
// ------------------------------------------------------------------------
// load health configuration
- health_alarm_log_load(host);
- health_alarm_log_open(host);
+ if(host->health_enabled) {
+ health_alarm_log_load(host);
+ health_alarm_log_open(host);
- rrdhost_wrlock(host);
- health_readdir(host, health_config_dir());
- rrdhost_unlock(host);
+ rrdhost_wrlock(host);
+ health_readdir(host, health_config_dir());
+ rrdhost_unlock(host);
+ }
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// free it
+ if(host->rrdpush_spawn) {
+ pthread_cancel(host->rrdpush_thread);
+ rrdpush_sender_cleanup(host);
+ }
+
freez(host->os);
freez(host->cache_dir);
freez(host->varlib_dir);
#include "common.h"
-int rrdpush_enabled = 0;
-int rrdpush_exclusive = 1;
+int default_rrdpush_enabled = 0;
+int default_rrdpush_exclusive = 1;
static char *remote_netdata_config = NULL;
static char *api_key = NULL;
// that there are more data ready to be sent
#define PIPE_READ 0
#define PIPE_WRITE 1
-int rrdpush_pipe[2] = { -1, -1 };
-
-// a buffer used to store data to be sent.
-// the format is the same as external plugins.
-static BUFFER *rrdpush_buffer = NULL;
-
-// locking to get exclusive access to shared resources
-// (rrdpush_pipe[PIPE_WRITE], rrdpush_buffer
-static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
-
-// if the streaming thread is connected to a remote netdata
-// this is set to 1, otherwise 0.
-static volatile int rrdpush_connected = 0;
// to have the remote netdata re-sync the charts
// to its current clock, we send for this many
// this is for the first iterations of each chart
static unsigned int remote_clock_resync_iterations = 60;
-#define rrdpush_lock() pthread_mutex_lock(&rrdpush_mutex)
-#define rrdpush_unlock() pthread_mutex_unlock(&rrdpush_mutex)
+#define rrdpush_lock(host) pthread_mutex_lock(&((host)->rrdpush_mutex))
+#define rrdpush_unlock(host) pthread_mutex_unlock(&((host)->rrdpush_mutex))
// checks if the current chart definition has been sent
static inline int need_to_send_chart_definition(RRDSET *st) {
// sends the current chart definition
static inline void send_chart_definition(RRDSET *st) {
- buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
+ buffer_sprintf(st->rrdhost->rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
, st->id
, st->name
, st->title
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
+ buffer_sprintf(st->rrdhost->rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
, rd->id
, rd->name
, rrd_algorithm_name(rd->algorithm)
// sends the current chart dimensions
static inline void send_chart_metrics(RRDSET *st) {
- buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0);
+ buffer_sprintf(st->rrdhost->rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0);
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
- buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
+ buffer_sprintf(st->rrdhost->rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
, rd->id
, rd->collected_value
);
}
- buffer_strcat(rrdpush_buffer, "END\n");
+ buffer_strcat(st->rrdhost->rrdpush_buffer, "END\n");
}
// resets all the chart, so that their definitions
// will be resent to the central netdata
-static void reset_all_charts(void) {
- rrd_rdlock();
-
- RRDHOST *host;
- rrdhost_foreach_read(host) {
- rrdhost_rdlock(host);
+static void reset_all_charts(RRDHOST *host) {
+ rrdhost_rdlock(host);
- RRDSET *st;
- rrdset_foreach_read(st, host) {
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
- // make it re-align the current time
- // on the remote host
- st->counter_done = 0;
+ // make it re-align the current time
+ // on the remote host
+ st->counter_done = 0;
- rrdset_rdlock(st);
+ rrdset_rdlock(st);
- RRDDIM *rd;
- rrddim_foreach_read(rd, st)
- rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st)
+ rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
- rrdset_unlock(st);
- }
- rrdhost_unlock(host);
+ rrdset_unlock(st);
}
- rrd_unlock();
+
+ rrdhost_unlock(host);
}
+void rrdpush_sender_thread_spawn(RRDHOST *host);
+
void rrdset_done_push(RRDSET *st) {
- static int error_shown = 0;
+ RRDHOST *host = st->rrdhost;
if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
return;
- rrdpush_lock();
+ rrdpush_lock(host);
- if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
- if(unlikely(!error_shown))
+ if(unlikely(host->rrdpush_enabled && !host->rrdpush_spawn))
+ rrdpush_sender_thread_spawn(host);
+
+ if(unlikely(!host->rrdpush_buffer || !host->rrdpush_connected)) {
+ if(unlikely(!host->rrdpush_error_shown))
error("STREAM [send]: not ready - discarding collected metrics.");
- error_shown = 1;
+ host->rrdpush_error_shown = 1;
- rrdpush_unlock();
+ rrdpush_unlock(host);
return;
}
- else if(unlikely(error_shown)) {
+ else if(unlikely(host->rrdpush_error_shown)) {
error("STREAM [send]: ready - sending metrics...");
- error_shown = 0;
+ host->rrdpush_error_shown = 0;
}
rrdset_rdlock(st);
rrdset_unlock(st);
// signal the sender there are more data
- if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
+ if(write(host->rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
error("STREAM [send]: cannot write to internal pipe");
- rrdpush_unlock();
+ rrdpush_unlock(host);
}
-static inline void rrdpush_flush(void) {
- rrdpush_lock();
- if(buffer_strlen(rrdpush_buffer))
- error("STREAM [send]: discarding %zu bytes of metrics already in the buffer.", buffer_strlen(rrdpush_buffer));
+static inline void rrdpush_flush(RRDHOST *host) {
+ rrdpush_lock(host);
+ if(buffer_strlen(host->rrdpush_buffer))
+ error("STREAM [send]: discarding %zu bytes of metrics already in the buffer.", buffer_strlen(host->rrdpush_buffer));
- buffer_flush(rrdpush_buffer);
- reset_all_charts();
- rrdpush_unlock();
+ buffer_flush(host->rrdpush_buffer);
+ reset_all_charts(host);
+ rrdpush_unlock(host);
}
int rrdpush_init() {
- rrdpush_enabled = config_get_boolean("stream", "enabled", rrdpush_enabled);
- rrdpush_exclusive = config_get_boolean("stream", "exclusive", rrdpush_exclusive);
+ default_rrdpush_enabled = config_get_boolean("stream", "enabled", default_rrdpush_enabled);
+ default_rrdpush_exclusive = config_get_boolean("stream", "exclusive", default_rrdpush_exclusive);
remote_netdata_config = config_get("stream", "stream metrics to", "");
api_key = config_get("stream", "api key", "");
- if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) {
- rrdpush_enabled = 0;
- rrdpush_exclusive = 0;
+ if(!default_rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) {
+ default_rrdpush_enabled = 0;
+ default_rrdpush_exclusive = 0;
}
- return rrdpush_enabled;
+ return default_rrdpush_enabled;
+}
+
+static inline void rrdpush_sender_lock(RRDHOST *host) {
+ if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
+ error("STREAM [send]: cannot set pthread cancel state to DISABLE.");
+
+ rrdpush_lock(host);
+}
+
+static inline void rrdpush_sender_unlock(RRDHOST *host) {
+ if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
+ error("STREAM [send]: cannot set pthread cancel state to DISABLE.");
+
+ rrdpush_unlock(host);
+}
+
+void rrdpush_sender_cleanup(RRDHOST *host) {
+ rrdpush_lock(host);
+
+ host->rrdpush_connected = 0;
+
+ if(host->rrdpush_socket != -1) close(host->rrdpush_socket);
+
+ // close the pipe
+ if(host->rrdpush_pipe[PIPE_READ] != -1) close(host->rrdpush_pipe[PIPE_READ]);
+ if(host->rrdpush_pipe[PIPE_WRITE] != -1) close(host->rrdpush_pipe[PIPE_WRITE]);
+ host->rrdpush_pipe[PIPE_READ] = -1;
+ host->rrdpush_pipe[PIPE_WRITE] = -1;
+
+ buffer_free(host->rrdpush_buffer);
+ host->rrdpush_buffer = NULL;
+
+ host->rrdpush_spawn = 0;
+ host->rrdpush_enabled = 0;
+
+ rrdpush_unlock(host);
}
void *rrdpush_sender_thread(void *ptr) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
+ RRDHOST *host = (RRDHOST *)ptr;
info("STREAM [send]: thread created (task id %d)", gettid());
size_t max_size = (size_t)config_get_number("stream", "buffer size bytes", 1024 * 1024);
unsigned int reconnect_delay = (unsigned int)config_get_number("stream", "reconnect delay seconds", 5);
remote_clock_resync_iterations = (unsigned int)config_get_number("stream", "initial clock resync iterations", remote_clock_resync_iterations);
- int sock = -1;
char connected_to[CONNECTED_TO_SIZE + 1] = "";
- if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key)
+ if(!host->rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key)
goto cleanup;
// initialize rrdpush globals
- rrdpush_buffer = buffer_create(1);
- rrdpush_connected = 0;
- if(pipe(rrdpush_pipe) == -1) fatal("STREAM [send]: cannot create required pipe.");
+ host->rrdpush_buffer = buffer_create(1);
+ host->rrdpush_connected = 0;
+ if(pipe(host->rrdpush_pipe) == -1) fatal("STREAM [send]: cannot create required pipe.");
// initialize local variables
size_t begin = 0;
ifd = &fds[0];
ofd = &fds[1];
- for(;;) {
- if(netdata_exit) break;
+ for(; host->rrdpush_enabled && !netdata_exit ;) {
- if(unlikely(sock == -1)) {
+ if(unlikely(host->rrdpush_socket == -1)) {
// stop appending data into rrdpush_buffer
// they will be lost, so there is no point to do it
- rrdpush_connected = 0;
+ host->rrdpush_connected = 0;
info("STREAM [send to %s]: connecting...", remote_netdata_config);
- sock = connect_to_one_of(remote_netdata_config, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
+ host->rrdpush_socket = connect_to_one_of(remote_netdata_config, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
- if(unlikely(sock == -1)) {
+ if(unlikely(host->rrdpush_socket == -1)) {
error("STREAM [send to %s]: failed to connect", remote_netdata_config);
sleep(reconnect_delay);
continue;
, program_version
);
- if(send_timeout(sock, http, strlen(http), 0, timeout) == -1) {
- close(sock);
- sock = -1;
+ if(send_timeout(host->rrdpush_socket, http, strlen(http), 0, timeout) == -1) {
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
error("STREAM [send to %s]: failed to send http header to netdata", connected_to);
sleep(reconnect_delay);
continue;
info("STREAM [send to %s]: waiting response from remote netdata...", connected_to);
- if(recv_timeout(sock, http, 1000, 0, timeout) == -1) {
- close(sock);
- sock = -1;
+ if(recv_timeout(host->rrdpush_socket, http, 1000, 0, timeout) == -1) {
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
error("STREAM [send to %s]: failed to initialize communication", connected_to);
sleep(reconnect_delay);
continue;
}
if(strncmp(http, "STREAM", 6)) {
- close(sock);
- sock = -1;
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
error("STREAM [send to %s]: server is not replying properly.", connected_to);
sleep(reconnect_delay);
continue;
info("STREAM [send to %s]: established communication - sending metrics...", connected_to);
- if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
+ if(fcntl(host->rrdpush_socket, F_SETFL, O_NONBLOCK) < 0)
error("STREAM [send to %s]: cannot set non-blocking mode for socket.", connected_to);
- rrdpush_flush();
+ rrdpush_flush(host);
sent_connection = 0;
// allow appending data into rrdpush_buffer
- rrdpush_connected = 1;
+ host->rrdpush_connected = 1;
}
- ifd->fd = rrdpush_pipe[PIPE_READ];
+ ifd->fd = host->rrdpush_pipe[PIPE_READ];
ifd->events = POLLIN;
ifd->revents = 0;
- ofd->fd = sock;
+ ofd->fd = host->rrdpush_socket;
ofd->revents = 0;
- if(begin < buffer_strlen(rrdpush_buffer)) {
+ if(begin < buffer_strlen(host->rrdpush_buffer)) {
ofd->events = POLLOUT;
fdmax = 2;
}
continue;
error("STREAM [send to %s]: failed to poll().", connected_to);
- close(sock);
- sock = -1;
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
break;
}
else if(unlikely(!retval)) {
if(ifd->revents & POLLIN) {
char buffer[1000 + 1];
- if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
+ if(read(host->rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
error("STREAM [send to %s]: cannot read from internal pipe.", connected_to);
}
- if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
- rrdpush_lock();
- ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
+ if(ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) {
+ rrdpush_sender_lock(host);
+ ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT);
if(ret == -1) {
if(errno != EAGAIN && errno != EINTR) {
error("STREAM [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", connected_to, sent_connection);
- close(sock);
- sock = -1;
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
}
}
else {
sent_connection += ret;
sent_bytes += ret;
begin += ret;
- if(begin == buffer_strlen(rrdpush_buffer)) {
- buffer_flush(rrdpush_buffer);
+ if(begin == buffer_strlen(host->rrdpush_buffer)) {
+ buffer_flush(host->rrdpush_buffer);
begin = 0;
}
}
- rrdpush_unlock();
+ rrdpush_sender_unlock(host);
}
// protection from overflow
- if(rrdpush_buffer->len > max_size) {
+ if(host->rrdpush_buffer->len > max_size) {
errno = 0;
- error("STREAM [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", connected_to, rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
- if(sock != -1) {
- close(sock);
- sock = -1;
+ error("STREAM [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", connected_to, host->rrdpush_buffer->len, host->rrdpush_buffer->len - begin, sent_bytes, sent_connection);
+ if(host->rrdpush_socket != -1) {
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
}
}
}
cleanup:
debug(D_WEB_CLIENT, "STREAM [send]: sending thread exits.");
- // make sure the data collection threads do not write data
- rrdpush_connected = 0;
+ rrdpush_sender_cleanup(host);
- // close the pipe
- if(rrdpush_pipe[PIPE_READ] != -1) close(rrdpush_pipe[PIPE_READ]);
- if(rrdpush_pipe[PIPE_WRITE] != -1) close(rrdpush_pipe[PIPE_WRITE]);
-
- // close the socket
- if(sock != -1) close(sock);
-
- rrdpush_lock();
- buffer_free(rrdpush_buffer);
- rrdpush_buffer = NULL;
- rrdpush_unlock();
-
- static_thread->enabled = 0;
pthread_exit(NULL);
return NULL;
}
return appconfig_get_boolean(&stream_config, key, "enabled", 0);
}
+void rrdpush_sender_thread_spawn(RRDHOST *host) {
+ if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *)host))
+ error("STREAM [send for host %s]: failed to create new thread for client.", host->hostname);
+
+ else if(pthread_detach(host->rrdpush_thread))
+ error("STREAM [send for host %s]: cannot request detach newly created thread.", host->hostname);
+
+ host->rrdpush_spawn = 1;
+}
+
int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
(void)host;