arthur.barton.de Git - netdata.git/blob - src/rrdpush.c
operational stream mode - added more descriptive logs for errors
[netdata.git] / src / rrdpush.c
1 #include "common.h"
2
3 #define PIPE_READ 0
4 #define PIPE_WRITE 1
5
6 int rrdpush_pipe[2];
7
8 static BUFFER *rrdpush_buffer = NULL;
9 static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
10 static RRDHOST *last_host = NULL;
11
12 static inline void rrdpush_lock() {
13     pthread_mutex_lock(&rrdpush_mutex);
14 }
15
16 static inline void rrdpush_unlock() {
17     pthread_mutex_unlock(&rrdpush_mutex);
18 }
19
20 static inline int need_to_send_chart_definition(RRDSET *st) {
21     RRDDIM *rd;
22     rrddim_foreach_read(rd, st)
23         if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
24             return 1;
25
26
27     // fprintf(stderr, "NOT Sending CHART '%s' '%s'\n", st->id, st->name);
28     return 0;
29 }
30
31 static inline void send_chart_definition(RRDSET *st) {
32     // fprintf(stderr, "Sending CHART '%s' '%s'\n", st->id, st->name);
33
34     buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
35                 , st->id
36                 , st->name
37                 , st->title
38                 , st->units
39                 , st->family
40                 , st->context
41                 , rrdset_type_name(st->chart_type)
42                 , st->priority
43                 , st->update_every
44     );
45
46     RRDDIM *rd;
47     rrddim_foreach_read(rd, st) {
48         buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
49                        , rd->id
50                        , rd->name
51                        , rrd_algorithm_name(rd->algorithm)
52                        , rd->multiplier
53                        , rd->divisor
54                        , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
55                        , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
56         );
57         rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
58     }
59 }
60
61 static inline void send_chart_metrics(RRDSET *st) {
62     buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, st->usec_since_last_update);
63
64     RRDDIM *rd;
65     rrddim_foreach_read(rd, st) {
66         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
67             buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
68                        , rd->id
69                        , rd->collected_value
70         );
71     }
72
73     buffer_strcat(rrdpush_buffer, "END\n");
74 }
75
76 static void reset_all_charts(void) {
77     rrd_rdlock();
78
79     RRDHOST *host;
80     rrdhost_foreach_read(host) {
81         rrdhost_rdlock(host);
82
83         RRDSET *st;
84         rrdset_foreach_read(st, host) {
85             rrdset_rdlock(st);
86
87             RRDDIM *rd;
88             rrddim_foreach_read(rd, st)
89                 rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
90
91             rrdset_unlock(st);
92         }
93         rrdhost_unlock(host);
94     }
95     rrd_unlock();
96
97     last_host = NULL;
98 }
99
100 void rrdset_done_push(RRDSET *st) {
101
102     if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED) || !rrdpush_buffer))
103         return;
104
105     rrdpush_lock();
106     rrdset_rdlock(st);
107
108     if(st->rrdhost != last_host) {
109         buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->machine_guid, st->rrdhost->hostname);
110         last_host = st->rrdhost;
111     }
112
113     if(need_to_send_chart_definition(st))
114         send_chart_definition(st);
115
116     send_chart_metrics(st);
117
118     // signal the sender there are more data
119     if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
120         error("Cannot write to internal pipe");
121
122     rrdset_unlock(st);
123     rrdpush_unlock();
124 }
125
126 void *central_netdata_push_thread(void *ptr) {
127     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
128
129     info("Central netdata push thread created with task id %d", gettid());
130
131     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
132         error("Cannot set pthread cancel type to DEFERRED.");
133
134     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
135         error("Cannot set pthread cancel state to ENABLE.");
136
137
138     rrdpush_buffer = buffer_create(1);
139
140     if(pipe(rrdpush_pipe) == -1)
141         fatal("Cannot create required pipe.");
142
143     struct timeval tv = {
144             .tv_sec = 60,
145             .tv_usec = 0
146     };
147
148     size_t begin = 0;
149     size_t max_size = 1024 * 1024;
150     size_t reconnects_counter = 0;
151     size_t sent_bytes = 0;
152     size_t sent_connection = 0;
153     int sock = -1;
154     char buffer[1000 + 1];
155
156     struct pollfd fds[2], *ifd, *ofd;
157
158     ifd = &fds[0];
159     ofd = &fds[1];
160
161     for(;;) {
162         if(netdata_exit) break;
163
164         if(unlikely(sock == -1)) {
165             info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
166             sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
167
168             if(unlikely(sock == -1)) {
169                 error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
170                 sleep(5);
171                 continue;
172             }
173
174             info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
175
176             char http[1000 + 1];
177             snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version);
178             if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
179                 close(sock);
180                 sock = -1;
181                 error("PUSH: failed to send http header to netdata at: %s", central_netdata_to_push_data);
182                 sleep(5);
183                 continue;
184             }
185
186             if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
187                 close(sock);
188                 sock = -1;
189                 error("PUSH: failed to receive OK from netdata at: %s", central_netdata_to_push_data);
190                 sleep(5);
191                 continue;
192             }
193
194             if(strncmp(http, "STREAM", 6)) {
195                 close(sock);
196                 sock = -1;
197                 error("PUSH: netdata servers at  %s, did not send STREAM", central_netdata_to_push_data);
198                 sleep(5);
199                 continue;
200             }
201
202             if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
203                 error("PUSH: cannot set non-blocking mode for socket.");
204
205             rrdpush_lock();
206             if(buffer_strlen(rrdpush_buffer))
207                 error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
208
209             buffer_flush(rrdpush_buffer);
210             reset_all_charts();
211             last_host = NULL;
212             rrdpush_unlock();
213             sent_connection = 0;
214         }
215
216         ifd->fd = rrdpush_pipe[PIPE_READ];
217         ifd->events = POLLIN;
218         ifd->revents = 0;
219
220         ofd->fd = sock;
221         ofd->events = POLLOUT;
222         ofd->revents = 0;
223
224         nfds_t fdmax = 2;
225
226         if(begin < buffer_strlen(rrdpush_buffer))
227             ofd->events = POLLOUT;
228         else
229             ofd->events = 0;
230
231         if(netdata_exit) break;
232         int retval = poll(fds, fdmax, 60 * 1000);
233         if(netdata_exit) break;
234
235         if(unlikely(retval == -1)) {
236             if(errno == EAGAIN || errno == EINTR)
237                 continue;
238
239             error("PUSH: Failed to poll().");
240             close(sock);
241             sock = -1;
242             break;
243         }
244         else if(unlikely(!retval)) {
245             // timeout
246             continue;
247         }
248
249         if(ifd->revents & POLLIN) {
250             if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
251                 error("PUSH: Cannot read from internal pipe.");
252         }
253
254         if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
255             // info("PUSH: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin);
256
257             // fprintf(stderr, "PUSH BEGIN\n");
258             // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
259             // fprintf(stderr, "\nPUSH END\n");
260
261             rrdpush_lock();
262             ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
263             if(ret == -1) {
264                 if(errno != EAGAIN && errno != EINTR) {
265                     error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
266                     close(sock);
267                     sock = -1;
268                 }
269             }
270             else {
271                 sent_connection += ret;
272                 sent_bytes += ret;
273                 begin += ret;
274                 if(begin == buffer_strlen(rrdpush_buffer)) {
275                     buffer_flush(rrdpush_buffer);
276                     begin = 0;
277                 }
278             }
279             rrdpush_unlock();
280         }
281
282         // protection from overflow
283         if(rrdpush_buffer->len > max_size) {
284             errno = 0;
285             error("PUSH: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
286             if(sock != -1) {
287                 close(sock);
288                 sock = -1;
289             }
290         }
291     }
292
293     debug(D_WEB_CLIENT, "Central netdata push thread exits.");
294     if(sock != -1)
295         close(sock);
296
297     static_thread->enabled = 0;
298     pthread_exit(NULL);
299     return NULL;
300 }