/*
 * arthur.barton.de Git - netdata.git / src / rrdpush.c
 * draft implementation of netdata central push server - untested
 */
#include "common.h"

#include <errno.h>
2
3 #define PIPE_READ 0
4 #define PIPE_WRITE 1
5
6 int rrdpush_pipe[2];
7
8 static BUFFER *rrdpush_buffer = NULL;
9 static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER;
10 static RRDHOST *last_host = NULL;
11
12 static inline void rrdpush_lock() {
13     pthread_mutex_lock(&rrdpush_mutex);
14 }
15
16 static inline void rrdpush_unlock() {
17     pthread_mutex_unlock(&rrdpush_mutex);
18 }
19
20 static inline int need_to_send_chart_definitions(RRDSET *st) {
21     RRDDIM *rd;
22     for(rd = st->dimensions; rd ;rd = rd->next)
23         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && !rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
24             return 1;
25
26     return 0;
27 }
28
29 static inline void send_chart_definitions(RRDSET *st) {
30     buffer_sprintf(rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
31                 , st->id
32                 , st->name
33                 , st->title
34                 , st->units
35                 , st->family
36                 , st->context
37                 , rrdset_type_name(st->chart_type)
38                 , st->priority
39                 , st->update_every
40     );
41
42     RRDDIM *rd;
43     for(rd = st->dimensions; rd ;rd = rd->next) {
44         buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
45                        , rd->id
46                        , rd->name
47                        , rrd_algorithm_name(rd->algorithm)
48                        , rd->multiplier
49                        , rd->divisor
50                        , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
51                        , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
52         );
53     }
54 }
55
56 static inline void send_chart_metrics(RRDSET *st) {
57     buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, st->usec_since_last_update);
58
59     RRDDIM *rd;
60     for(rd = st->dimensions; rd ;rd = rd->next) {
61         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))
62             buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
63                        , rd->id
64                        , rd->collected_value
65         );
66     }
67
68     buffer_strcat(rrdpush_buffer, "END\n");
69 }
70
71 static void reset_all_charts(void) {
72     rrd_rdlock();
73
74     RRDHOST *h;
75     for(h = localhost; h ;h = h->next) {
76         RRDSET *st;
77         for(st = h->rrdset_root ; st ; st = st->next) {
78             rrdset_rdlock(st);
79
80             RRDDIM *rd;
81             for(rd = st->dimensions; rd ;rd = rd->next)
82                 rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
83
84             rrdset_unlock(st);
85         }
86     }
87
88     last_host = NULL;
89
90     rrd_unlock();
91 }
92
93 void rrdset_done_push(RRDSET *st) {
94
95     if(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))
96         return;
97
98     rrdpush_lock();
99     rrdset_rdlock(st);
100
101     if(st->rrdhost != last_host)
102         buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->hostname, st->rrdhost->machine_guid);
103
104     if(need_to_send_chart_definitions(st))
105         send_chart_definitions(st);
106
107     send_chart_metrics(st);
108
109     // signal the sender there are more data
110     if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
111         error("Cannot write to internal pipe");
112
113     rrdset_unlock(st);
114     rrdpush_unlock();
115 }
116
117 void *central_netdata_push_thread(void *ptr) {
118     struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
119
120     info("Central netdata push thread created with task id %d", gettid());
121
122     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
123         error("Cannot set pthread cancel type to DEFERRED.");
124
125     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
126         error("Cannot set pthread cancel state to ENABLE.");
127
128
129     rrdpush_buffer = buffer_create(1);
130
131     if(pipe(rrdpush_pipe) == -1)
132         fatal("Cannot create required pipe.");
133
134     struct timeval tv = {
135             .tv_sec = 60,
136             .tv_usec = 0
137     };
138
139     size_t begin = 0;
140     size_t max_size = 1024 * 1024;
141     size_t reconnects_counter = 0;
142     int sock = -1;
143     char buffer[1];
144
145     for(;;) {
146         if(unlikely(sock == -1)) {
147             sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
148
149             if(unlikely(sock != -1)) {
150                 if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
151                     error("Cannot set non-blocking mode for socket.");
152
153                 buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s\r\n\r\n", config_get("global", "central netdata api key", ""));
154                 reset_all_charts();
155             }
156         }
157
158         if(read(rrdpush_pipe[PIPE_READ], buffer, 1) == -1) {
159             error("Cannot read from internal pipe.");
160             sleep(1);
161         }
162
163         if(likely(sock != -1)) {
164             rrdpush_lock();
165             ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len, MSG_DONTWAIT);
166             if(ret == -1) {
167                 error("Failed to send metrics to central netdata at %s", central_netdata_to_push_data);
168                 close(sock);
169                 sock = -1;
170             }
171             else {
172                 begin += ret;
173                 if(begin == rrdpush_buffer->len) {
174                     buffer_flush(rrdpush_buffer);
175                     begin = 0;
176                 }
177             }
178             rrdpush_unlock();
179         }
180
181         // protection from overflow
182         if(rrdpush_buffer->len > max_size) {
183             rrdpush_lock();
184
185             error("Discarding %zu bytes of metrics data, because we cannot connect to central netdata at %s"
186                   , buffer_strlen(rrdpush_buffer), central_netdata_to_push_data);
187
188             buffer_flush(rrdpush_buffer);
189
190             if(sock != -1) {
191                 close(sock);
192                 sock = -1;
193             }
194
195             rrdpush_unlock();
196         }
197     }
198
199 cleanup:
200     debug(D_WEB_CLIENT, "Central netdata push thread exits.");
201     if(sock != -1)
202         close(sock);
203
204     static_thread->enabled = 0;
205     pthread_exit(NULL);
206     return NULL;
207 }