]> arthur.barton.de Git - netdata.git/blob - src/rrdpush.c
9bcd232ab5382bc798c7d421f21d93def53c400b
[netdata.git] / src / rrdpush.c
1 #include "common.h"
2
3 int default_rrdpush_enabled   = 0;
4 int default_rrdpush_exclusive = 1;
5
6 static char *remote_netdata_config = NULL;
7 static char *api_key = NULL;
8
9 #define CONNECTED_TO_SIZE 100
10
11 // data collection happens from multiple threads
12 // each of these threads calls rrdset_done()
13 // which in turn calls rrdset_done_push()
14 // which uses this pipe to notify the streaming thread
15 // that there are more data ready to be sent
16 #define PIPE_READ 0
17 #define PIPE_WRITE 1
18
19 // to have the remote netdata re-sync the charts
20 // to its current clock, we send for this many
21 // iterations a BEGIN line without microseconds
22 // this is for the first iterations of each chart
23 static unsigned int remote_clock_resync_iterations = 60;
24
25 #define rrdpush_lock(host) pthread_mutex_lock(&((host)->rrdpush_mutex))
26 #define rrdpush_unlock(host) pthread_mutex_unlock(&((host)->rrdpush_mutex))
27
28 // checks if the current chart definition has been sent
29 static inline int need_to_send_chart_definition(RRDSET *st) {
30     RRDDIM *rd;
31     rrddim_foreach_read(rd, st)
32         if(!rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
33             return 1;
34
35     return 0;
36 }
37
38 // sends the current chart definition
39 static inline void send_chart_definition(RRDSET *st) {
40     buffer_sprintf(st->rrdhost->rrdpush_buffer, "CHART '%s' '%s' '%s' '%s' '%s' '%s' '%s' %ld %d\n"
41                 , st->id
42                 , st->name
43                 , st->title
44                 , st->units
45                 , st->family
46                 , st->context
47                 , rrdset_type_name(st->chart_type)
48                 , st->priority
49                 , st->update_every
50     );
51
52     RRDDIM *rd;
53     rrddim_foreach_read(rd, st) {
54         buffer_sprintf(st->rrdhost->rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
55                        , rd->id
56                        , rd->name
57                        , rrd_algorithm_name(rd->algorithm)
58                        , rd->multiplier
59                        , rd->divisor
60                        , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
61                        , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
62         );
63         rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
64     }
65 }
66
67 // sends the current chart dimensions
68 static inline void send_chart_metrics(RRDSET *st) {
69     buffer_sprintf(st->rrdhost->rrdpush_buffer, "BEGIN %s %llu\n", st->id, (st->counter_done > remote_clock_resync_iterations)?st->usec_since_last_update:0);
70
71     RRDDIM *rd;
72     rrddim_foreach_read(rd, st) {
73         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
74             buffer_sprintf(st->rrdhost->rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
75                        , rd->id
76                        , rd->collected_value
77         );
78     }
79
80     buffer_strcat(st->rrdhost->rrdpush_buffer, "END\n");
81 }
82
83 // resets all the chart, so that their definitions
84 // will be resent to the central netdata
85 static void reset_all_charts(RRDHOST *host) {
86     rrdhost_rdlock(host);
87
88     RRDSET *st;
89     rrdset_foreach_read(st, host) {
90
91         // make it re-align the current time
92         // on the remote host
93         st->counter_done = 0;
94
95         rrdset_rdlock(st);
96
97         RRDDIM *rd;
98         rrddim_foreach_read(rd, st)
99             rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
100
101         rrdset_unlock(st);
102     }
103
104     rrdhost_unlock(host);
105 }
106
107 void rrdpush_sender_thread_spawn(RRDHOST *host);
108
109 void rrdset_done_push(RRDSET *st) {
110     RRDHOST *host = st->rrdhost;
111
112     if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
113         return;
114
115     rrdpush_lock(host);
116
117     if(unlikely(host->rrdpush_enabled && !host->rrdpush_spawn))
118         rrdpush_sender_thread_spawn(host);
119
120     if(unlikely(!host->rrdpush_buffer || !host->rrdpush_connected)) {
121         if(unlikely(!host->rrdpush_error_shown))
122             error("STREAM [send]: not ready - discarding collected metrics.");
123
124         host->rrdpush_error_shown = 1;
125
126         rrdpush_unlock(host);
127         return;
128     }
129     else if(unlikely(host->rrdpush_error_shown)) {
130         error("STREAM [send]: ready - sending metrics...");
131         host->rrdpush_error_shown = 0;
132     }
133
134     rrdset_rdlock(st);
135     if(need_to_send_chart_definition(st))
136         send_chart_definition(st);
137
138     send_chart_metrics(st);
139     rrdset_unlock(st);
140
141     // signal the sender there are more data
142     if(write(host->rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
143         error("STREAM [send]: cannot write to internal pipe");
144
145     rrdpush_unlock(host);
146 }
147
148 static inline void rrdpush_flush(RRDHOST *host) {
149     rrdpush_lock(host);
150     if(buffer_strlen(host->rrdpush_buffer))
151         error("STREAM [send]: discarding %zu bytes of metrics already in the buffer.", buffer_strlen(host->rrdpush_buffer));
152
153     buffer_flush(host->rrdpush_buffer);
154     reset_all_charts(host);
155     rrdpush_unlock(host);
156 }
157
158 int rrdpush_init() {
159     default_rrdpush_enabled = config_get_boolean("stream", "enabled", default_rrdpush_enabled);
160     default_rrdpush_exclusive = config_get_boolean("stream", "exclusive", default_rrdpush_exclusive);
161     remote_netdata_config = config_get("stream", "stream metrics to", "");
162     api_key = config_get("stream", "api key", "");
163
164     if(!default_rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) {
165         default_rrdpush_enabled = 0;
166         default_rrdpush_exclusive = 0;
167     }
168
169     return default_rrdpush_enabled;
170 }
171
172 static inline void rrdpush_sender_lock(RRDHOST *host) {
173     if(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL) != 0)
174         error("STREAM [send]: cannot set pthread cancel state to DISABLE.");
175
176     rrdpush_lock(host);
177 }
178
179 static inline void rrdpush_sender_unlock(RRDHOST *host) {
180     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
181         error("STREAM [send]: cannot set pthread cancel state to DISABLE.");
182
183     rrdpush_unlock(host);
184 }
185
186 void rrdpush_sender_cleanup(RRDHOST *host) {
187     rrdpush_lock(host);
188
189     host->rrdpush_connected = 0;
190
191     if(host->rrdpush_socket != -1) close(host->rrdpush_socket);
192
193     // close the pipe
194     if(host->rrdpush_pipe[PIPE_READ] != -1)  close(host->rrdpush_pipe[PIPE_READ]);
195     if(host->rrdpush_pipe[PIPE_WRITE] != -1) close(host->rrdpush_pipe[PIPE_WRITE]);
196     host->rrdpush_pipe[PIPE_READ] = -1;
197     host->rrdpush_pipe[PIPE_WRITE] = -1;
198
199     buffer_free(host->rrdpush_buffer);
200     host->rrdpush_buffer = NULL;
201
202     host->rrdpush_spawn = 0;
203     host->rrdpush_enabled = 0;
204
205     rrdpush_unlock(host);
206 }
207
208 void *rrdpush_sender_thread(void *ptr) {
209     RRDHOST *host = (RRDHOST *)ptr;
210
211     info("STREAM [send]: thread created (task id %d)", gettid());
212
213     if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
214         error("STREAM [send]: cannot set pthread cancel type to DEFERRED.");
215
216     if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
217         error("STREAM [send]: cannot set pthread cancel state to ENABLE.");
218
219     int timeout = (int)config_get_number("stream", "timeout seconds", 60);
220     int default_port = (int)config_get_number("stream", "default port", 19999);
221     size_t max_size = (size_t)config_get_number("stream", "buffer size bytes", 1024 * 1024);
222     unsigned int reconnect_delay = (unsigned int)config_get_number("stream", "reconnect delay seconds", 5);
223     remote_clock_resync_iterations = (unsigned int)config_get_number("stream", "initial clock resync iterations", remote_clock_resync_iterations);
224     char connected_to[CONNECTED_TO_SIZE + 1] = "";
225
226     if(!host->rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key)
227         goto cleanup;
228
229     // initialize rrdpush globals
230     host->rrdpush_buffer = buffer_create(1);
231     host->rrdpush_connected = 0;
232     if(pipe(host->rrdpush_pipe) == -1) fatal("STREAM [send]: cannot create required pipe.");
233
234     // initialize local variables
235     size_t begin = 0;
236     size_t reconnects_counter = 0;
237     size_t sent_bytes = 0;
238     size_t sent_connection = 0;
239
240     struct timeval tv = {
241             .tv_sec = timeout,
242             .tv_usec = 0
243     };
244
245     struct pollfd fds[2], *ifd, *ofd;
246     nfds_t fdmax;
247
248     ifd = &fds[0];
249     ofd = &fds[1];
250
251     for(; host->rrdpush_enabled && !netdata_exit ;) {
252
253         if(unlikely(host->rrdpush_socket == -1)) {
254             // stop appending data into rrdpush_buffer
255             // they will be lost, so there is no point to do it
256             host->rrdpush_connected = 0;
257
258             info("STREAM [send to %s]: connecting...", remote_netdata_config);
259             host->rrdpush_socket = connect_to_one_of(remote_netdata_config, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE);
260
261             if(unlikely(host->rrdpush_socket == -1)) {
262                 error("STREAM [send to %s]: failed to connect", remote_netdata_config);
263                 sleep(reconnect_delay);
264                 continue;
265             }
266
267             info("STREAM [send to %s]: initializing communication...", connected_to);
268
269             char http[1000 + 1];
270             snprintfz(http, 1000,
271                     "STREAM key=%s&hostname=%s&machine_guid=%s&os=%s&update_every=%d HTTP/1.1\r\n"
272                     "User-Agent: netdata-push-service/%s\r\n"
273                     "Accept: */*\r\n\r\n"
274                       , api_key
275                       , localhost->hostname
276                       , localhost->machine_guid
277                       , localhost->os
278                       , default_rrd_update_every
279                       , program_version
280             );
281
282             if(send_timeout(host->rrdpush_socket, http, strlen(http), 0, timeout) == -1) {
283                 close(host->rrdpush_socket);
284                 host->rrdpush_socket = -1;
285                 error("STREAM [send to %s]: failed to send http header to netdata", connected_to);
286                 sleep(reconnect_delay);
287                 continue;
288             }
289
290             info("STREAM [send to %s]: waiting response from remote netdata...", connected_to);
291
292             if(recv_timeout(host->rrdpush_socket, http, 1000, 0, timeout) == -1) {
293                 close(host->rrdpush_socket);
294                 host->rrdpush_socket = -1;
295                 error("STREAM [send to %s]: failed to initialize communication", connected_to);
296                 sleep(reconnect_delay);
297                 continue;
298             }
299
300             if(strncmp(http, "STREAM", 6)) {
301                 close(host->rrdpush_socket);
302                 host->rrdpush_socket = -1;
303                 error("STREAM [send to %s]: server is not replying properly.", connected_to);
304                 sleep(reconnect_delay);
305                 continue;
306             }
307
308             info("STREAM [send to %s]: established communication - sending metrics...", connected_to);
309
310             if(fcntl(host->rrdpush_socket, F_SETFL, O_NONBLOCK) < 0)
311                 error("STREAM [send to %s]: cannot set non-blocking mode for socket.", connected_to);
312
313             rrdpush_flush(host);
314             sent_connection = 0;
315
316             // allow appending data into rrdpush_buffer
317             host->rrdpush_connected = 1;
318         }
319
320         ifd->fd = host->rrdpush_pipe[PIPE_READ];
321         ifd->events = POLLIN;
322         ifd->revents = 0;
323
324         ofd->fd = host->rrdpush_socket;
325         ofd->revents = 0;
326         if(begin < buffer_strlen(host->rrdpush_buffer)) {
327             ofd->events = POLLOUT;
328             fdmax = 2;
329         }
330         else {
331             ofd->events = 0;
332             fdmax = 1;
333         }
334
335         if(netdata_exit) break;
336         int retval = poll(fds, fdmax, timeout * 1000);
337         if(netdata_exit) break;
338
339         if(unlikely(retval == -1)) {
340             if(errno == EAGAIN || errno == EINTR)
341                 continue;
342
343             error("STREAM [send to %s]: failed to poll().", connected_to);
344             close(host->rrdpush_socket);
345             host->rrdpush_socket = -1;
346             break;
347         }
348         else if(unlikely(!retval)) {
349             // timeout
350             continue;
351         }
352
353         if(ifd->revents & POLLIN) {
354             char buffer[1000 + 1];
355             if(read(host->rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
356                 error("STREAM [send to %s]: cannot read from internal pipe.", connected_to);
357         }
358
359         if(ofd->revents & POLLOUT && begin < buffer_strlen(host->rrdpush_buffer)) {
360             rrdpush_sender_lock(host);
361             ssize_t ret = send(host->rrdpush_socket, &host->rrdpush_buffer->buffer[begin], buffer_strlen(host->rrdpush_buffer) - begin, MSG_DONTWAIT);
362             if(ret == -1) {
363                 if(errno != EAGAIN && errno != EINTR) {
364                     error("STREAM [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", connected_to, sent_connection);
365                     close(host->rrdpush_socket);
366                     host->rrdpush_socket = -1;
367                 }
368             }
369             else {
370                 sent_connection += ret;
371                 sent_bytes += ret;
372                 begin += ret;
373                 if(begin == buffer_strlen(host->rrdpush_buffer)) {
374                     buffer_flush(host->rrdpush_buffer);
375                     begin = 0;
376                 }
377             }
378             rrdpush_sender_unlock(host);
379         }
380
381         // protection from overflow
382         if(host->rrdpush_buffer->len > max_size) {
383             errno = 0;
384             error("STREAM [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", connected_to, host->rrdpush_buffer->len, host->rrdpush_buffer->len - begin, sent_bytes, sent_connection);
385             if(host->rrdpush_socket != -1) {
386                 close(host->rrdpush_socket);
387                 host->rrdpush_socket = -1;
388             }
389         }
390     }
391
392 cleanup:
393     debug(D_WEB_CLIENT, "STREAM [send]: sending thread exits.");
394
395     rrdpush_sender_cleanup(host);
396
397     pthread_exit(NULL);
398     return NULL;
399 }
400
401
402 // ----------------------------------------------------------------------------
403 // STREAM receiver
404
405 int rrdpush_receive(int fd, const char *key, const char *hostname, const char *machine_guid, const char *os, int update_every, char *client_ip, char *client_port) {
406     RRDHOST *host;
407     int history = default_rrd_history_entries;
408     RRD_MEMORY_MODE mode = default_rrd_memory_mode;
409     int health_enabled = default_health_enabled;
410     time_t alarms_delay = 60;
411
412     update_every = (int)appconfig_get_number(&stream_config, machine_guid, "update every", update_every);
413     if(update_every < 0) update_every = 1;
414
415     history = (int)appconfig_get_number(&stream_config, key, "default history", history);
416     history = (int)appconfig_get_number(&stream_config, machine_guid, "history", history);
417     if(history < 5) history = 5;
418
419     mode = rrd_memory_mode_id(appconfig_get(&stream_config, key, "default memory mode", rrd_memory_mode_name(mode)));
420     mode = rrd_memory_mode_id(appconfig_get(&stream_config, machine_guid, "memory mode", rrd_memory_mode_name(mode)));
421
422     health_enabled = appconfig_get_boolean_ondemand(&stream_config, key, "health enabled by default", health_enabled);
423     health_enabled = appconfig_get_boolean_ondemand(&stream_config, machine_guid, "health enabled", health_enabled);
424
425     alarms_delay = appconfig_get_number(&stream_config, key, "default postpone alarms on connect seconds", alarms_delay);
426     alarms_delay = appconfig_get_number(&stream_config, machine_guid, "postpone alarms on connect seconds", alarms_delay);
427
428     if(!strcmp(machine_guid, "localhost"))
429         host = localhost;
430     else
431         host = rrdhost_find_or_create(hostname, machine_guid, os, update_every, history, mode, health_enabled?1:0);
432
433     info("STREAM [receive from [%s]:%s]: metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s",
434          client_ip, client_port,
435          hostname, machine_guid,
436          update_every,
437          history,
438          rrd_memory_mode_name(mode),
439          (health_enabled == CONFIG_BOOLEAN_NO)?"disabled":((health_enabled == CONFIG_BOOLEAN_YES)?"enabled":"auto")
440     );
441
442     struct plugind cd = {
443             .enabled = 1,
444             .update_every = default_rrd_update_every,
445             .pid = 0,
446             .serial_failures = 0,
447             .successful_collections = 0,
448             .obsolete = 0,
449             .started_t = now_realtime_sec(),
450             .next = NULL,
451     };
452
453     // put the client IP and port into the buffers used by plugins.d
454     snprintfz(cd.id,           CONFIG_MAX_NAME,  "%s:%s", client_ip, client_port);
455     snprintfz(cd.filename,     FILENAME_MAX,     "%s:%s", client_ip, client_port);
456     snprintfz(cd.fullfilename, FILENAME_MAX,     "%s:%s", client_ip, client_port);
457     snprintfz(cd.cmd,          PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port);
458
459     info("STREAM [receive from [%s]:%s]: initializing communication...", client_ip, client_port);
460     if(send_timeout(fd, "STREAM", 6, 0, 60) != 6) {
461         error("STREAM [receive from [%s]:%s]: cannot send STREAM command.", client_ip, client_port);
462         return 0;
463     }
464
465     // remove the non-blocking flag from the socket
466     if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
467         error("STREAM [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", client_ip, client_port, fd);
468
469     // convert the socket to a FILE *
470     FILE *fp = fdopen(fd, "r");
471     if(!fp) {
472         error("STREAM [receive from [%s]:%s]: failed to get a FILE for FD %d.", client_ip, client_port, fd);
473         return 0;
474     }
475
476     rrdhost_wrlock(host);
477     host->use_counter++;
478     if(health_enabled != CONFIG_BOOLEAN_NO)
479         host->health_delay_up_to = now_realtime_sec() + alarms_delay;
480     rrdhost_unlock(host);
481
482     // call the plugins.d processor to receive the metrics
483     info("STREAM [receive from [%s]:%s]: receiving metrics... (host '%s', machine GUID '%s').", client_ip, client_port, host->hostname, host->machine_guid);
484     size_t count = pluginsd_process(host, &cd, fp, 1);
485     error("STREAM [receive from [%s]:%s]: disconnected (host '%s', machine GUID '%s', completed updates %zu).", client_ip, client_port, host->hostname, host->machine_guid, count);
486
487     rrdhost_wrlock(host);
488     host->use_counter--;
489     if(!host->use_counter && health_enabled == CONFIG_BOOLEAN_AUTO)
490         host->health_enabled = 0;
491     rrdhost_unlock(host);
492
493     // cleanup
494     fclose(fp);
495
496     return (int)count;
497 }
498
499 struct rrdpush_thread {
500     int fd;
501     char *key;
502     char *hostname;
503     char *machine_guid;
504     char *os;
505     char *client_ip;
506     char *client_port;
507     int update_every;
508 };
509
510 void *rrdpush_receiver_thread(void *ptr) {
511     struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr;
512
513     if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
514         error("STREAM [receive]: cannot set pthread cancel type to DEFERRED.");
515
516     if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
517         error("STREAM [receive]: cannot set pthread cancel state to ENABLE.");
518
519
520     info("STREAM [%s]:%s: receive thread created (task id %d)", rpt->client_ip, rpt->client_port, gettid());
521     rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->machine_guid, rpt->os, rpt->update_every, rpt->client_ip, rpt->client_port);
522     info("STREAM [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->client_ip, rpt->client_port, gettid());
523
524     close(rpt->fd);
525     freez(rpt->key);
526     freez(rpt->hostname);
527     freez(rpt->machine_guid);
528     freez(rpt->os);
529     freez(rpt->client_ip);
530     freez(rpt->client_port);
531     freez(rpt);
532
533     pthread_exit(NULL);
534     return NULL;
535 }
536
537 static inline int rrdpush_receive_validate_api_key(const char *key) {
538     return appconfig_get_boolean(&stream_config, key, "enabled", 0);
539 }
540
541 void rrdpush_sender_thread_spawn(RRDHOST *host) {
542     if(pthread_create(&host->rrdpush_thread, NULL, rrdpush_sender_thread, (void *)host))
543         error("STREAM [send for host %s]: failed to create new thread for client.", host->hostname);
544
545     else if(pthread_detach(host->rrdpush_thread))
546         error("STREAM [send for host %s]: cannot request detach newly created thread.", host->hostname);
547
548     host->rrdpush_spawn = 1;
549 }
550
551 int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) {
552     (void)host;
553     
554     info("STREAM [receive from [%s]:%s]: new client connection.", w->client_ip, w->client_port);
555
556     char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL;
557     int update_every = default_rrd_update_every;
558
559     while(url) {
560         char *value = mystrsep(&url, "?&");
561         if(!value || !*value) continue;
562
563         char *name = mystrsep(&value, "=");
564         if(!name || !*name) continue;
565         if(!value || !*value) continue;
566
567         if(!strcmp(name, "key"))
568             key = value;
569         else if(!strcmp(name, "hostname"))
570             hostname = value;
571         else if(!strcmp(name, "machine_guid"))
572             machine_guid = value;
573         else if(!strcmp(name, "update_every"))
574             update_every = (int)strtoul(value, NULL, 0);
575         else if(!strcmp(name, "os"))
576             os = value;
577     }
578
579     if(!key || !*key) {
580         error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port);
581         buffer_flush(w->response.data);
582         buffer_sprintf(w->response.data, "You need an API key for this request.");
583         return 401;
584     }
585
586     if(!hostname || !*hostname) {
587         error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port);
588         buffer_flush(w->response.data);
589         buffer_sprintf(w->response.data, "You need to send a hostname too.");
590         return 400;
591     }
592
593     if(!machine_guid || !*machine_guid) {
594         error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port);
595         buffer_flush(w->response.data);
596         buffer_sprintf(w->response.data, "You need to send a machine GUID too.");
597         return 400;
598     }
599
600     if(!rrdpush_receive_validate_api_key(key)) {
601         error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
602         buffer_flush(w->response.data);
603         buffer_sprintf(w->response.data, "Your API key is not permitted access.");
604         return 401;
605     }
606
607     if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) {
608         error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
609         buffer_flush(w->response.data);
610         buffer_sprintf(w->response.data, "Your machine guide is not permitted access.");
611         return 404;
612     }
613
614     struct rrdpush_thread *rpt = mallocz(sizeof(struct rrdpush_thread));
615     rpt->fd           = w->ifd;
616     rpt->key          = strdupz(key);
617     rpt->hostname     = strdupz(hostname);
618     rpt->machine_guid = strdupz(machine_guid);
619     rpt->os           = strdupz(os);
620     rpt->client_ip    = strdupz(w->client_ip);
621     rpt->client_port  = strdupz(w->client_port);
622     rpt->update_every = update_every;
623     pthread_t thread;
624
625     debug(D_SYSTEM, "STREAM [receive from [%s]:%s]: starting receiving thread.", w->client_ip, w->client_port);
626
627     if(pthread_create(&thread, NULL, rrdpush_receiver_thread, (void *)rpt))
628         error("STREAM [receive from [%s]:%s]: failed to create new thread for client.", w->client_ip, w->client_port);
629
630     else if(pthread_detach(thread))
631         error("STREAM [receive from [%s]:%s]: cannot request detach newly created thread.", w->client_ip, w->client_port);
632
633     // prevent the caller from closing the streaming socket
634     if(w->ifd == w->ofd)
635         w->ifd = w->ofd = -1;
636     else
637         w->ifd = -1;
638
639     buffer_flush(w->response.data);
640     return 200;
641 }