default_rrdpush_enabled = appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enabled", default_rrdpush_enabled);
default_rrdpush_destination = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "destination", "");
default_rrdpush_api_key = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "api key", "");
- rrdhost_free_orphan_time = appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "free orphan hosts after seconds", rrdhost_free_orphan_time);
+ rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_GLOBAL, "cleanup orphan hosts after seconds", rrdhost_free_orphan_time);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
error("STREAM [send]: cannot enable sending thread - information is missing.");
// this is for the first iterations of each chart
static unsigned int remote_clock_resync_iterations = 60;
-#define rrdpush_lock(host) pthread_mutex_lock(&((host)->rrdpush_mutex))
-#define rrdpush_unlock(host) pthread_mutex_unlock(&((host)->rrdpush_mutex))
+#define rrdpush_lock(host) netdata_mutex_lock(&((host)->rrdpush_mutex))
+#define rrdpush_unlock(host) netdata_mutex_unlock(&((host)->rrdpush_mutex))
// checks if the current chart definition has been sent
static inline int need_to_send_chart_definition(RRDSET *st) {
static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) {
rrdpush_lock(host);
+
if(buffer_strlen(host->rrdpush_buffer))
error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, buffer_strlen(host->rrdpush_buffer));
buffer_flush(host->rrdpush_buffer);
- rrdpush_sender_thread_reset_all_charts(host);
- rrdpush_unlock(host);
-}
-
-static inline void rrdpush_sender_thread_lock(RRDHOST *host) {
- if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
- error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
- rrdpush_lock(host);
-}
-
-static inline void rrdpush_sender_thread_unlock(RRDHOST *host) {
- if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
+ rrdpush_sender_thread_reset_all_charts(host);
rrdpush_unlock(host);
}
-static void rrdpush_sender_thread_cleanup(RRDHOST *host) {
- rrdpush_lock(host);
-
+static void rrdpush_sender_thread_cleanup_locked_all(RRDHOST *host) {
host->rrdpush_connected = 0;
if(host->rrdpush_socket != -1) {
host->rrdpush_spawn = 0;
- rrdpush_unlock(host);
+ rrdhost_flag_set(host, RRDHOST_ORPHAN);
}
void rrdpush_sender_thread_stop(RRDHOST *host) {
- rrdhost_check_wrlock(host);
+ rrdpush_lock(host);
+ rrdhost_wrlock(host);
if(host->rrdpush_spawn) {
info("STREAM %s [send]: stopping sending thread...", host->hostname);
pthread_cancel(host->rrdpush_thread);
- rrdpush_sender_thread_cleanup(host);
+ rrdpush_sender_thread_cleanup_locked_all(host);
}
+
+ rrdhost_unlock(host);
+ rrdpush_unlock(host);
}
void *rrdpush_sender_thread(void *ptr) {
}
if(ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) {
- rrdpush_sender_thread_lock(host);
+
+ // BEGIN RRDPUSH LOCKED SESSION
+
+ // during this session, data collectors
+ // will not be able to append data to our buffer
+ // but the socket is in non-blocking mode
+ // so, we will not block at send()
+
+ if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
+ error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
+
+ rrdpush_lock(host);
+
ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT);
if(ret == -1) {
if(errno != EAGAIN && errno != EINTR) {
sent_bytes += ret;
begin += ret;
if(begin == buffer_strlen(host->rrdpush_buffer)) {
+ // we send it all
+
buffer_flush(host->rrdpush_buffer);
begin = 0;
}
}
- rrdpush_sender_thread_unlock(host);
+
+ rrdpush_unlock(host);
+
+ if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
+ error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname);
+
+ // END RRDPUSH LOCKED SESSION
}
// protection from overflow
cleanup:
debug(D_WEB_CLIENT, "STREAM %s [send]: sending thread exits.", host->hostname);
- rrdpush_sender_thread_cleanup(host);
+ if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
+ error("STREAM %s [send]: cannot set pthread cancel state to DISABLE.", host->hostname);
+
+ rrdpush_lock(host);
+ rrdhost_wrlock(host);
+ rrdpush_sender_thread_cleanup_locked_all(host);
+ rrdhost_unlock(host);
+ rrdpush_unlock(host);
+
+ if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
+ error("STREAM %s [send]: cannot set pthread cancel state to ENABLE.", host->hostname);
pthread_exit(NULL);
return NULL;
}
#ifdef NETDATA_INTERNAL_CHECKS
- info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s"
+ info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s"
, hostname
, client_ip
, client_port
info("STREAM %s [receive from [%s]:%s]: initializing communication...", host->hostname, client_ip, client_port);
if(send_timeout(fd, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT), 0, 60) != strlen(START_STREAMING_PROMPT)) {
error("STREAM %s [receive from [%s]:%s]: cannot send ready command.", host->hostname, client_ip, client_port);
+ close(fd);
return 0;
}
FILE *fp = fdopen(fd, "r");
if(!fp) {
error("STREAM %s [receive from [%s]:%s]: failed to get a FILE for FD %d.", host->hostname, client_ip, client_port, fd);
+ close(fd);
return 0;
}
error("STREAM %s [receive from [%s]:%s]: disconnected (completed updates %zu).", host->hostname, client_ip, client_port, count);
rrdhost_wrlock(host);
+ host->senders_disconnected_time = now_realtime_sec();
host->connected_senders--;
if(!host->connected_senders) {
if(health_enabled == CONFIG_BOOLEAN_AUTO)
host->health_enabled = 0;
-
- host->senders_disconnected_time = now_realtime_sec();
-
- rrdpush_sender_thread_stop(host);
}
rrdhost_unlock(host);
+ rrdpush_sender_thread_stop(host);
+
// cleanup
fclose(fp);
rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->machine_guid, rpt->os, rpt->update_every, rpt->client_ip, rpt->client_port);
info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
- close(rpt->fd);
freez(rpt->key);
freez(rpt->hostname);
freez(rpt->machine_guid);
}
void rrdpush_sender_thread_spawn(RRDHOST *host) {
- if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *)host))
- error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
+ rrdhost_wrlock(host);
+
+ if(!host->rrdpush_spawn) {
+ if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *) host))
+ error("STREAM %s [send]: failed to create new thread for client.", host->hostname);
- else if(pthread_detach(host->rrdpush_thread))
- error("STREAM %s [send]: cannot request detach newly created thread.", host->hostname);
+ else if(pthread_detach(host->rrdpush_thread))
+ error("STREAM %s [send]: cannot request detach newly created thread.", host->hostname);
- host->rrdpush_spawn = 1;
+ rrdhost_flag_clear(host, RRDHOST_ORPHAN);
+ host->rrdpush_spawn = 1;
+ }
+
+ rrdhost_unlock(host);
}
int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
info("STREAM [receive from [%s]:%s]: new client connection.", w->client_ip, w->client_port);
- char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL;
+ char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = "unknown";
int update_every = default_rrd_update_every;
char buf[GUID_LEN + 1];