// Text buffer the send_* helpers below append protocol lines to, and from
// which central_netdata_push_thread() transmits over its socket.
8 static BUFFER *rrdpush_buffer = NULL;
// Mutex acquired by rrdpush_lock() and released by rrdpush_unlock().
9 static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
// Compared against st->rrdhost in rrdset_done_push() to decide whether a
// HOST line has to be emitted.
// NOTE(review): no assignment to last_host is visible in this excerpt -
// confirm it is updated after a HOST line is written.
10 static RRDHOST *last_host = NULL;
// Acquire rrdpush_mutex. The result of pthread_mutex_lock() is discarded.
12 static inline void rrdpush_lock() {
13 pthread_mutex_lock(&rrdpush_mutex);
// Release rrdpush_mutex. The result of pthread_mutex_unlock() is discarded.
16 static inline void rrdpush_unlock() {
17 pthread_mutex_unlock(&rrdpush_mutex);
// Scan the dimensions of chart `st` looking for one that carries
// RRDDIM_FLAG_UPDATED but does not carry RRDDIM_FLAG_EXPOSED.
// NOTE(review): the statements producing the return value are not visible
// in this excerpt; presumably a match yields non-zero and no match yields
// zero - confirm.
20 static inline int need_to_send_chart_definitions(RRDSET *st) {
22 for(rd = st->dimensions; rd ;rd = rd->next)
// updated, but not flagged as exposed
23 if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && !rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
// Append the definition of chart `st` to rrdpush_buffer: one CHART line,
// followed by one DIMENSION line for each entry of st->dimensions.
// Most of the format arguments are not visible in this excerpt.
29 static inline void send_chart_definitions(RRDSET *st) {
// CHART line: seven single-quoted string fields, then a %ld and a %d field.
30 buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
37 , rrdset_type_name(st->chart_type)
43 for(rd = st->dimensions; rd ;rd = rd->next) {
// DIMENSION line: the last quoted field holds both option words, separated
// by a single space, so it can be "hidden noreset", "hidden ", " noreset"
// or " " depending on the two flags tested below.
44 buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
47 , rrd_algorithm_name(rd->algorithm)
50 , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
51 , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
// Append the collected values of chart `st` to rrdpush_buffer:
// "BEGIN <id> <usec>", then one "SET ..." line for every dimension that
// carries RRDDIM_FLAG_UPDATED, then "END".
56 static inline void send_chart_metrics(RRDSET *st) {
// NOTE(review): %llu assumes usec_since_last_update is an
// unsigned long long - confirm against the RRDSET declaration.
57 buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, st->usec_since_last_update);
60 for(rd = st->dimensions; rd ;rd = rd->next) {
// dimensions without RRDDIM_FLAG_UPDATED produce no SET line
61 if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))
62 buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
68 buffer_strcat(rrdpush_buffer, "END\n");
// Walk every host reachable from `localhost` via ->next, every chart of
// each host starting at rrdset_root, and every dimension of each chart,
// clearing RRDDIM_FLAG_EXPOSED on the dimension.
// Since need_to_send_chart_definitions() tests for a missing
// RRDDIM_FLAG_EXPOSED, this presumably forces definitions to be sent again -
// confirm against the callers. Several lines of this function (including any
// locking) are not visible in this excerpt.
71 static void reset_all_charts(void) {
75 for(h = localhost; h ;h = h->next) {
77 for(st = h->rrdset_root ; st ; st = st->next) {
81 for(rd = st->dimensions; rd ;rd = rd->next)
82 rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
// Queue the current state of chart `st` in rrdpush_buffer and wake the
// sender: optionally a HOST line, optionally the chart definitions, always
// the metrics, then a one-byte write to rrdpush_pipe[PIPE_WRITE].
// NOTE(review): no rrdpush_lock()/rrdpush_unlock() call is visible in this
// excerpt although rrdpush_buffer is shared with the sender thread -
// confirm the elided lines take the lock.
93 void rrdset_done_push(RRDSET *st) {
// The action taken for charts without RRDSET_FLAG_ENABLED is not visible
// here; presumably an early return - confirm.
95 if(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))
// NOTE(review): last_host is not assigned on any visible line, so this
// condition may hold on every call - confirm.
101 if(st->rrdhost != last_host)
102 buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->hostname, st->rrdhost->machine_guid);
104 if(need_to_send_chart_definitions(st))
105 send_chart_definitions(st);
107 send_chart_metrics(st);
109 // signal the sender there are more data
110 if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
111 error("Cannot write to internal pipe");
// Thread entry point of the sender. `ptr` is cast to a
// struct netdata_static_thread, whose `enabled` field is cleared on exit.
// Visible steps: enable deferred cancellation, create rrdpush_buffer and
// rrdpush_pipe, (re)connect when sock is -1, wait for a byte on the pipe,
// send() the buffered text, and discard the buffer when it grows beyond
// max_size. Large parts of this function (loop header, locking, declaration
// and update of `begin`, socket teardown) are not visible in this excerpt.
117 void *central_netdata_push_thread(void *ptr) {
118 struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
120 info("Central netdata push thread created with task id %d", gettid());
122 if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
123 error("Cannot set pthread cancel type to DEFERRED.");
125 if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
126 error("Cannot set pthread cancel state to ENABLE.");
129 rrdpush_buffer = buffer_create(1);
// failure to create the pipe is treated as fatal
131 if(pipe(rrdpush_pipe) == -1)
132 fatal("Cannot create required pipe.");
134 struct timeval tv = {
// threshold (1024 * 1024 bytes) compared against rrdpush_buffer->len below
140 size_t max_size = 1024 * 1024;
141 size_t reconnects_counter = 0;
// sock == -1 means "not connected": try to connect, 19999 being the port
// argument handed to connect_to_one_of()
146 if(unlikely(sock == -1)) {
147 sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
149 if(unlikely(sock != -1)) {
// a failure to switch to non-blocking mode is only logged
150 if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
151 error("Cannot set non-blocking mode for socket.");
// The request line is appended to the same buffer that carries metrics.
// NOTE(review): if rrdpush_buffer is not empty at this point, the request
// ends up after already-queued data - confirm the elided lines flush first.
153 buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s\r\n\r\n", config_get("global", "central netdata api key", ""));
// consume one byte written by rrdset_done_push()
158 if(read(rrdpush_pipe[PIPE_READ], buffer, 1) == -1) {
159 error("Cannot read from internal pipe.");
163 if(likely(sock != -1)) {
// NOTE(review): the data pointer starts at offset `begin` but the length
// passed is the full rrdpush_buffer->len; if `begin` can be non-zero this
// reads past the buffered data and the length should presumably be
// rrdpush_buffer->len - begin - confirm against the elided lines.
165 ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len, MSG_DONTWAIT);
167 error("Failed to send metrics to central netdata at %s", central_netdata_to_push_data);
// everything up to len has been consumed: empty the buffer
173 if(begin == rrdpush_buffer->len) {
174 buffer_flush(rrdpush_buffer);
181 // protection from overflow
182 if(rrdpush_buffer->len > max_size) {
185 error("Discarding %zu bytes of metrics data, because we cannot connect to central netdata at %s"
186 , buffer_strlen(rrdpush_buffer), central_netdata_to_push_data);
188 buffer_flush(rrdpush_buffer);
200 debug(D_WEB_CLIENT, "Central netdata push thread exits.");
// mark this static thread as no longer running
204 static_thread->enabled = 0;