static BUFFER *rrdpush_buffer = NULL;
static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
-static RRDHOST *last_host = NULL;
+static volatile RRDHOST *last_host = NULL;
+static volatile int rrdpush_connected = 0;
static inline void rrdpush_lock() {
pthread_mutex_lock(&rrdpush_mutex);
}
void rrdset_done_push(RRDSET *st) {
+ static int error_shown = 0;
- if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED) || !rrdpush_buffer))
+ if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
return;
+ if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
+ if(!error_shown)
+ error("PUSH: not ready - discarding collected metrics.");
+
+ error_shown = 1;
+ return;
+ }
+ error_shown = 0;
+
rrdpush_lock();
rrdset_rdlock(st);
rrdpush_unlock();
}
+static inline void rrdpush_flush(void) {
+ rrdpush_lock();
+ if(buffer_strlen(rrdpush_buffer))
+ error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
+
+ buffer_flush(rrdpush_buffer);
+ reset_all_charts();
+ last_host = NULL;
+ rrdpush_unlock();
+}
+
void *central_netdata_push_thread(void *ptr) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
.tv_usec = 0
};
+ rrdpush_connected = 0;
size_t begin = 0;
size_t max_size = 1024 * 1024;
size_t reconnects_counter = 0;
size_t sent_bytes = 0;
size_t sent_connection = 0;
int sock = -1;
- char buffer[1000 + 1];
struct pollfd fds[2], *ifd, *ofd;
if(netdata_exit) break;
if(unlikely(sock == -1)) {
+ rrdpush_connected = 0;
+
info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
if(unlikely(sock == -1)) {
error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
- sleep(5);
continue;
}
if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
error("PUSH: cannot set non-blocking mode for socket.");
- rrdpush_lock();
- if(buffer_strlen(rrdpush_buffer))
- error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
-
- buffer_flush(rrdpush_buffer);
- reset_all_charts();
- last_host = NULL;
- rrdpush_unlock();
+ rrdpush_flush();
sent_connection = 0;
+ rrdpush_connected = 1;
}
ifd->fd = rrdpush_pipe[PIPE_READ];
}
if(ifd->revents & POLLIN) {
+ char buffer[1000 + 1];
if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
error("PUSH: Cannot read from internal pipe.");
}
}
debug(D_WEB_CLIENT, "Central netdata push thread exits.");
- if(sock != -1)
+ if(sock != -1) {
close(sock);
+ }
static_thread->enabled = 0;
pthread_exit(NULL);