default_rrdpush_enabled = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
+ rrdhost_free_orphan_time = appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "free orphan hosts after seconds", rrdhost_free_orphan_time);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
- default_rrdpush_api_key = NULL;
- default_rrdpush_destination = NULL;
}
return default_rrdpush_enabled;
static inline int need_to_send_chart_definition(RRDSET *st) {
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
+ if(!rd->exposed)
return 1;
return 0;
, rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
, rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
- rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
+ rd->exposed = 1;
}
}
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
+ if(rd->updated && rd->exposed)
buffer_sprintf(st->rrdhost->rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
, rd->id
, rd->collected_value
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
+ rd->exposed = 0;
rrdset_unlock(st);
}
rrdpush_unlock(host);
}
-void rrdpush_sender_thread_cleanup(RRDHOST *host) {
+static void rrdpush_sender_thread_cleanup(RRDHOST *host) {
rrdpush_lock(host);
host->rrdpush_connected = 0;
- if(host->rrdpush_socket != -1) close(host->rrdpush_socket);
+ if(host->rrdpush_socket != -1) {
+ close(host->rrdpush_socket);
+ host->rrdpush_socket = -1;
+ }
// close the pipe
- if(host->rrdpush_pipe[PIPE_READ] != -1) close(host->rrdpush_pipe[PIPE_READ]);
- if(host->rrdpush_pipe[PIPE_WRITE] != -1) close(host->rrdpush_pipe[PIPE_WRITE]);
- host->rrdpush_pipe[PIPE_READ] = -1;
- host->rrdpush_pipe[PIPE_WRITE] = -1;
+ if(host->rrdpush_pipe[PIPE_READ] != -1) {
+ close(host->rrdpush_pipe[PIPE_READ]);
+ host->rrdpush_pipe[PIPE_READ] = -1;
+ }
+
+ if(host->rrdpush_pipe[PIPE_WRITE] != -1) {
+ close(host->rrdpush_pipe[PIPE_WRITE]);
+ host->rrdpush_pipe[PIPE_WRITE] = -1;
+ }
buffer_free(host->rrdpush_buffer);
host->rrdpush_buffer = NULL;
host->rrdpush_spawn = 0;
- host->rrdpush_enabled = 0;
rrdpush_unlock(host);
}
+void rrdpush_sender_thread_stop(RRDHOST *host) {
+ rrdhost_check_wrlock(host);
+
+ if(host->rrdpush_spawn) {
+ info("STREAM %s [send]: stopping sending thread...", host->hostname);
+ pthread_cancel(host->rrdpush_thread);
+ rrdpush_sender_thread_cleanup(host);
+ }
+}
+
void *rrdpush_sender_thread(void *ptr) {
RRDHOST *host = (RRDHOST *)ptr;
}
rrdhost_wrlock(host);
- host->use_counter++;
+ host->connected_senders++;
if(health_enabled != CONFIG_BOOLEAN_NO)
host->health_delay_up_to = now_realtime_sec() + alarms_delay;
rrdhost_unlock(host);
error("STREAM %s [receive from [%s]:%s]: disconnected (completed updates %zu).", host->hostname, client_ip, client_port, count);
rrdhost_wrlock(host);
- host->use_counter--;
- if(!host->use_counter && health_enabled == CONFIG_BOOLEAN_AUTO)
- host->health_enabled = 0;
+ host->connected_senders--;
+ if(!host->connected_senders) {
+ if(health_enabled == CONFIG_BOOLEAN_AUTO)
+ host->health_enabled = 0;
+
+ host->senders_disconnected_time = now_realtime_sec();
+
+ rrdpush_sender_thread_stop(host);
+ }
rrdhost_unlock(host);
// cleanup