-int rrdpush_init() {
- rrdpush_enabled = config_get_boolean("stream", "enabled", rrdpush_enabled);
- rrdpush_exclusive = config_get_boolean("stream", "exclusive", rrdpush_exclusive);
- remote_netdata_config = config_get("stream", "stream metrics to", "");
- api_key = config_get("stream", "api key", "");
-
- if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) {
- rrdpush_enabled = 0;
- rrdpush_exclusive = 0;
+static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
+ rrdpush_lock(host);
+
+ if(buffer_strlen(host->rrdpush_buffer))
+ error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, buffer_strlen(host->rrdpush_buffer));
+
+ buffer_flush(host->rrdpush_buffer);
+
+ rrdpush_sender_thread_reset_all_charts(host);
+
+ rrdpush_unlock(host);
+}
+
+static void rrdpush_sender_thread_cleanup_locked_all(RRDHOST *host) {
+ host->rrdpush_connected = 0;
+
+ if(host->rrdpush_socket != -1) {
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
+ }
+
+ // close the pipe
+ if(host->rrdpush_pipe[PIPE_READ] != -1) {
+ close(host->rrdpush_pipe[PIPE_READ]);
+ host->rrdpush_pipe[PIPE_READ] = -1;
+ }
+
+ if(host->rrdpush_pipe[PIPE_WRITE] != -1) {
+ close(host->rrdpush_pipe[PIPE_WRITE]);
+ host->rrdpush_pipe[PIPE_WRITE] = -1;
+ }
+
+ buffer_free(host->rrdpush_buffer);
+ host->rrdpush_buffer = NULL;
+
+ host->rrdpush_spawn = 0;
+
+ rrdhost_flag_set(host, RRDHOST_ORPHAN);
+}
+
+void rrdpush_sender_thread_stop(RRDHOST *host) {
+ rrdpush_lock(host);
+ rrdhost_wrlock(host);
+
+ if(host->rrdpush_spawn) {
+ info("STREAM %s [send]: stopping sending thread...", host->hostname);
+ pthread_cancel(host->rrdpush_thread);
+ rrdpush_sender_thread_cleanup_locked_all(host);