8 static BUFFER *rrdpush_buffer = NULL;
9 static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
10 static volatile int rrdpush_connected = 0;
12 static inline void rrdpush_lock() {
13 pthread_mutex_lock(&rrdpush_mutex);
16 static inline void rrdpush_unlock() {
17 pthread_mutex_unlock(&rrdpush_mutex);
20 static inline int need_to_send_chart_definition(RRDSET *st) {
22 rrddim_foreach_read(rd, st)
23 if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
27 // fprintf(stderr, "NOT Sending CHART '%s' '%s'\n", st->id, st->name);
31 static inline void send_chart_definition(RRDSET *st) {
32 // fprintf(stderr, "Sending CHART '%s' '%s'\n", st->id, st->name);
34 buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
41 , rrdset_type_name(st->chart_type)
47 rrddim_foreach_read(rd, st) {
48 buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
51 , rrd_algorithm_name(rd->algorithm)
54 , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
55 , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
57 rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
61 static inline void send_chart_metrics(RRDSET *st) {
62 buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > 60)?st->usec_since_last_update:0);
65 rrddim_foreach_read(rd, st) {
66 if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
67 buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
73 buffer_strcat(rrdpush_buffer, "END\n");
76 static void reset_all_charts(void) {
80 rrdhost_foreach_read(host) {
84 rrdset_foreach_read(st, host) {
86 // make it re-align the current time
93 rrddim_foreach_read(rd, st)
94 rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
103 void rrdset_done_push(RRDSET *st) {
104 static int error_shown = 0;
106 if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
112 if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
114 error("STREAM: not ready - discarding collected metrics.");
124 if(need_to_send_chart_definition(st))
125 send_chart_definition(st);
127 send_chart_metrics(st);
130 // signal the sender there are more data
131 if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
132 error("STREAM: cannot write to internal pipe");
137 static inline void rrdpush_flush(void) {
139 if(buffer_strlen(rrdpush_buffer))
140 error("STREAM: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
142 buffer_flush(rrdpush_buffer);
147 void *central_netdata_push_thread(void *ptr) {
148 struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
150 info("STREAM: central netdata push thread created with task id %d", gettid());
152 if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
153 error("STREAM: cannot set pthread cancel type to DEFERRED.");
155 if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
156 error("STREAM: cannot set pthread cancel state to ENABLE.");
159 rrdpush_buffer = buffer_create(1);
161 if(pipe(rrdpush_pipe) == -1)
162 fatal("STREAM: cannot create required pipe.");
164 struct timeval tv = {
169 rrdpush_connected = 0;
171 size_t max_size = 1024 * 1024;
172 size_t reconnects_counter = 0;
173 size_t sent_bytes = 0;
174 size_t sent_connection = 0;
177 struct pollfd fds[2], *ifd, *ofd;
184 if(netdata_exit) break;
186 if(unlikely(sock == -1)) {
187 // stop appending data into rrdpush_buffer
188 // they will be lost, so there is no point to do it
189 rrdpush_connected = 0;
191 info("STREAM: connecting to central netdata at: %s", central_netdata_to_push_data);
192 sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
194 if(unlikely(sock == -1)) {
195 error("STREAM: failed to connect to central netdata at: %s", central_netdata_to_push_data);
200 info("STREAM: initializing communication to central netdata at: %s", central_netdata_to_push_data);
203 snprintfz(http, 1000, "GET /stream?key=%s&hostname=%s&machine_guid=%s&update_every=%d HTTP/1.1\r\n"
204 "User-Agent: netdata-push-service/%s\r\n"
205 "Accept: */*\r\n\r\n"
206 , config_get("global", "central netdata api key", "")
207 , localhost->hostname
208 , localhost->machine_guid
209 , default_rrd_update_every
213 if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
216 error("STREAM: failed to send http header to netdata at: %s", central_netdata_to_push_data);
221 info("STREAM: Waiting for STREAM from central netdata at: %s", central_netdata_to_push_data);
223 if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
226 error("STREAM: failed to receive STREAM from netdata at: %s", central_netdata_to_push_data);
231 if(strncmp(http, "STREAM", 6)) {
234 error("STREAM: netdata servers at %s, did not send STREAM", central_netdata_to_push_data);
239 info("STREAM: Established STREAM with central netdata at: %s - sending metrics...", central_netdata_to_push_data);
241 if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
242 error("STREAM: cannot set non-blocking mode for socket.");
247 // allow appending data into rrdpush_buffer
248 rrdpush_connected = 1;
251 ifd->fd = rrdpush_pipe[PIPE_READ];
252 ifd->events = POLLIN;
257 if(begin < buffer_strlen(rrdpush_buffer)) {
258 ofd->events = POLLOUT;
266 if(netdata_exit) break;
267 int retval = poll(fds, fdmax, 60 * 1000);
268 if(netdata_exit) break;
270 if(unlikely(retval == -1)) {
271 if(errno == EAGAIN || errno == EINTR)
274 error("STREAM: Failed to poll().");
279 else if(unlikely(!retval)) {
284 if(ifd->revents & POLLIN) {
285 char buffer[1000 + 1];
286 if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
287 error("STREAM: Cannot read from internal pipe.");
290 if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
291 // info("STREAM: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin);
293 // fprintf(stderr, "PUSH BEGIN\n");
294 // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
295 // fprintf(stderr, "\nPUSH END\n");
298 ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
300 if(errno != EAGAIN && errno != EINTR) {
301 error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
307 sent_connection += ret;
310 if(begin == buffer_strlen(rrdpush_buffer)) {
311 buffer_flush(rrdpush_buffer);
318 // protection from overflow
319 if(rrdpush_buffer->len > max_size) {
321 error("STREAM: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
329 debug(D_WEB_CLIENT, "STREAM: central netdata push thread exits.");
334 static_thread->enabled = 0;