default_rrdpush_enabled = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
+ rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");
static inline int need_to_send_chart_definition(RRDSET *st) {
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
+ if(!rd->exposed)
return 1;
return 0;
, rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
, rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
);
- rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
+ rd->exposed = 1;
}
}
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
+ if(rd->updated && rd->exposed)
buffer_sprintf(st->rrdhost->rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
, rd->id
, rd->collected_value
RRDDIM *rd;
rrddim_foreach_read(rd, st)
- rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
+ rd->exposed = 0;
rrdset_unlock(st);
}
static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
rrdpush_lock(host);
+
if(buffer_strlen(host->rrdpush_buffer))
error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, buffer_strlen(host->rrdpush_buffer));
buffer_flush(host->rrdpush_buffer);
- rrdpush_sender_thread_reset_all_charts(host);
- rrdpush_unlock(host);
-}
-
-static inline void rrdpush_sender_thread_lock(RRDHOST *host) {
- if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
- error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
- rrdpush_lock(host);
-}
-
-static inline void rrdpush_sender_thread_unlock(RRDHOST *host) {
- if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
+ rrdpush_sender_thread_reset_all_charts(host);
rrdpush_unlock(host);
}
-void rrdpush_sender_thread_cleanup(RRDHOST *host) {
- rrdpush_lock(host);
-
+static void rrdpush_sender_thread_cleanup_locked_all(RRDHOST *host) {
host->rrdpush_connected = 0;
if(host->rrdpush_socket != -1) {
host->rrdpush_spawn = 0;
+ rrdhost_flag_set(host, RRDHOST_ORPHAN);
+}
+
+void rrdpush_sender_thread_stop(RRDHOST *host) {
+ rrdpush_lock(host);
+ rrdhost_wrlock(host);
+
+ if(host->rrdpush_spawn) {
+ info("STREAM %s [send]: stopping sending thread...", host->hostname);
+ pthread_cancel(host->rrdpush_thread);
+ rrdpush_sender_thread_cleanup_locked_all(host);
+ }
+
+ rrdhost_unlock(host);
rrdpush_unlock(host);
}
}
if(ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) {
- rrdpush_sender_thread_lock(host);
+
+ // BEGIN RRDPUSH LOCKED SESSION
+
+ // during this session, data collectors
+ // will not be able to append data to our buffer
+ // but the socket is in non-blocking mode
+ // so, we will not block at send()
+
+ if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
+ error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
+
+ rrdpush_lock(host);
+
ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT);
if(ret == -1) {
if(errno != EAGAIN && errno != EINTR) {
sent_bytes += ret;
begin += ret;
if(begin == buffer_strlen(host->rrdpush_buffer)) {
+ // we send it all
+
buffer_flush(host->rrdpush_buffer);
begin = 0;
}
}
- rrdpush_sender_thread_unlock(host);
+
+ rrdpush_unlock(host);
+
+ if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
+ error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname);
+
+ // END RRDPUSH LOCKED SESSION
}
// protection from overflow
cleanup:
debug(D_WEB_CLIENT, "STREAM %s [send]: sending thread exits.", host->hostname);
- rrdpush_sender_thread_cleanup(host);
+ if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
+ error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
+
+ rrdpush_lock(host);
+ rrdhost_wrlock(host);
+ rrdpush_sender_thread_cleanup_locked_all(host);
+ rrdhost_unlock(host);
+ rrdpush_unlock(host);
+
+ if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
+ error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname);
pthread_exit(NULL);
return NULL;
info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port);
if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port);
+ close(fd);
return 0;
}
FILE *fp = fdopen(fd, "r");
if(!fp) {
error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", host->hostname, client_ip, client_port, fd);
+ close(fd);
return 0;
}
rrdhost_wrlock(host);
- host->use_counter++;
+ host->connected_senders++;
if(health_enabled != CONFIG_BOOLEAN_NO)
host->health_delay_up_to = now_realtime_sec() + alarms_delay;
rrdhost_unlock(host);
error("STREAM %s [receive from [%s]:%s]: disconnected (completed updates %zu).", host->hostname, client_ip, client_port, count);
rrdhost_wrlock(host);
- host->use_counter--;
- if(!host->use_counter) {
+ host->connected_senders--;
+ if(!host->connected_senders) {
if(health_enabled == CONFIG_BOOLEAN_AUTO)
host->health_enabled = 0;
- if(host->rrdpush_spawn) {
- pthread_cancel(host->rrdpush_thread);
- rrdpush_sender_thread_cleanup(host);
- }
+ host->senders_disconnected_time = now_realtime_sec();
}
rrdhost_unlock(host);
+ rrdpush_sender_thread_stop(host);
+
// cleanup
fclose(fp);
rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->machine_guid, rpt->os, rpt->update_every, rpt->client_ip, rpt->client_port);
info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
- close(rpt->fd);
freez(rpt->key);
freez(rpt->hostname);
freez(rpt->machine_guid);
}
void rrdpush_sender_thread_spawn(RRDHOST *host) {
- if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *)host))
- error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
+ rrdhost_wrlock(host);
+
+ if(!host->rrdpush_spawn) {
+ if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *) host))
+ error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
- else if(pthread_detach(host->rrdpush_thread))
- error("STREAM %s [send]: cannot request detach newly created thread.", host->hostname);
+ else if(pthread_detach(host->rrdpush_thread))
+ error("STREAM %s [send]: cannot request detach newly created thread.", host->hostname);
- host->rrdpush_spawn = 1;
+ rrdhost_flag_clear(host, RRDHOST_ORPHAN);
+ host->rrdpush_spawn = 1;
+ }
+
+ rrdhost_unlock(host);
}
int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
info("STREAM [receive from [%s]:%s]: new client connection.", w->client_ip, w->client_port);
- char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL;
+ char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = "unknown";
int update_every = default_rrd_update_every;
char buf[GUID_LEN + 1];
return 404;
}
- if(!appconfig_get_boolean(&stream_config, key, "enabled", 1)) {
+ if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) {
error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Your API key is not permitted access.");