From: Costa Tsaousis (ktsaou) Date: Thu, 23 Feb 2017 18:35:15 +0000 (+0200) Subject: unified streaming logging X-Git-Tag: ab-debian_0.20170227.01-0ab1~1^2~14^2~31 X-Git-Url: https://arthur.barton.de/gitweb/?p=netdata.git;a=commitdiff_plain;h=4c0f57dbe7699faeb52b4eed04f7604db846f459 unified streaming logging --- diff --git a/src/rrdpush.c b/src/rrdpush.c index 417b9511..a6b487e3 100644 --- a/src/rrdpush.c +++ b/src/rrdpush.c @@ -3,7 +3,7 @@ int rrdpush_enabled = 0; int rrdpush_exclusive = 1; -static char *central_netdata = NULL; +static char *remote_netdata_config = NULL; static char *api_key = NULL; #define CONNECTED_TO_SIZE 100 @@ -25,7 +25,7 @@ static BUFFER *rrdpush_buffer = NULL; // (rrdpush_pipe[PIPE_WRITE], rrdpush_buffer static pthread_mutex_t rrdpush_mutex = PTHREAD_MUTEX_INITIALIZER; -// if the streaming thread is connected to a central netdata +// if the streaming thread is connected to a remote netdata // this is set to 1, otherwise 0. static volatile int rrdpush_connected = 0; @@ -131,15 +131,18 @@ void rrdset_done_push(RRDSET *st) { rrdpush_lock(); if(unlikely(!rrdpush_buffer || !rrdpush_connected)) { - if(!error_shown) - error("STREAM: not ready - discarding collected metrics."); + if(unlikely(!error_shown)) + error("STREAM [send]: not ready - discarding collected metrics."); error_shown = 1; rrdpush_unlock(); return; } - error_shown = 0; + else if(unlikely(error_shown)) { + error("STREAM [send]: ready - sending metrics..."); + error_shown = 0; + } rrdset_rdlock(st); if(need_to_send_chart_definition(st)) @@ -150,7 +153,7 @@ void rrdset_done_push(RRDSET *st) { // signal the sender there are more data if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1) - error("STREAM: cannot write to internal pipe"); + error("STREAM [send]: cannot write to internal pipe"); rrdpush_unlock(); } @@ -158,7 +161,7 @@ void rrdset_done_push(RRDSET *st) { static inline void rrdpush_flush(void) { rrdpush_lock(); if(buffer_strlen(rrdpush_buffer)) - error("STREAM: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer)); + error("STREAM [send]: discarding %zu bytes of metrics already in the buffer.", buffer_strlen(rrdpush_buffer)); buffer_flush(rrdpush_buffer); reset_all_charts(); @@ -168,10 +171,10 @@ static inline void rrdpush_flush(void) { int rrdpush_init() { rrdpush_enabled = config_get_boolean("stream", "enabled", rrdpush_enabled); rrdpush_exclusive = config_get_boolean("stream", "exclusive", rrdpush_exclusive); - central_netdata = config_get("stream", "stream metrics to", ""); + remote_netdata_config = config_get("stream", "stream metrics to", ""); api_key = config_get("stream", "api key", ""); - if(!rrdpush_enabled || !central_netdata || !*central_netdata || !api_key || !*api_key) { + if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) { rrdpush_enabled = 0; rrdpush_exclusive = 0; } @@ -182,13 +185,13 @@ int rrdpush_init() { void *rrdpush_sender_thread(void *ptr) { struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr; - info("STREAM: central netdata push thread created with task id %d", gettid()); + info("STREAM [send]: thread created (task id %d)", gettid()); if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0) - error("STREAM: cannot set pthread cancel type to DEFERRED."); + error("STREAM [send]: cannot set pthread cancel type to DEFERRED."); if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) - error("STREAM: cannot set pthread cancel state to ENABLE."); + error("STREAM [send]: cannot set pthread cancel state to ENABLE."); int timeout = (int)config_get_number("stream", "timeout seconds", 60); int default_port = (int)config_get_number("stream", "default port", 19999); @@ -196,14 +199,15 @@ void *rrdpush_sender_thread(void *ptr) { unsigned int reconnect_delay = (unsigned int)config_get_number("stream", "reconnect delay seconds", 5); remote_clock_resync_iterations = (unsigned int)config_get_number("stream", "initial clock resync iterations", remote_clock_resync_iterations); int sock = -1; + char connected_to[CONNECTED_TO_SIZE + 1] = ""; - if(!rrdpush_enabled || !central_netdata || !*central_netdata || !api_key || !*api_key) + if(!rrdpush_enabled || !remote_netdata_config || !*remote_netdata_config || !api_key || !*api_key) goto cleanup; // initialize rrdpush globals rrdpush_buffer = buffer_create(1); rrdpush_connected = 0; - if(pipe(rrdpush_pipe) == -1) fatal("STREAM: cannot create required pipe."); + if(pipe(rrdpush_pipe) == -1) fatal("STREAM [send]: cannot create required pipe."); // initialize local variables size_t begin = 0; @@ -222,8 +226,6 @@ void *rrdpush_sender_thread(void *ptr) { ifd = &fds[0]; ofd = &fds[1]; - char connected_to[CONNECTED_TO_SIZE + 1]; - for(;;) { if(netdata_exit) break; @@ -232,16 +234,16 @@ void *rrdpush_sender_thread(void *ptr) { // they will be lost, so there is no point to do it rrdpush_connected = 0; - info("STREAM: connecting to central netdata at: %s", central_netdata); - sock = connect_to_one_of(central_netdata, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE); + info("STREAM [send to %s]: connecting...", remote_netdata_config); + sock = connect_to_one_of(remote_netdata_config, default_port, &tv, &reconnects_counter, connected_to, CONNECTED_TO_SIZE); if(unlikely(sock == -1)) { - error("STREAM: failed to connect to central netdata at: %s", central_netdata); + error("STREAM [send to %s]: failed to connect", remote_netdata_config); sleep(reconnect_delay); continue; } - info("STREAM: initializing communication to central netdata at: %s", connected_to); + info("STREAM [send to %s]: initializing communication...", connected_to); char http[1000 + 1]; snprintfz(http, 1000, @@ -259,17 +261,17 @@ void *rrdpush_sender_thread(void *ptr) { if(send_timeout(sock, http, strlen(http), 0, timeout) == -1) { close(sock); sock = -1; - error("STREAM: failed to send http header to netdata at: %s", connected_to); + error("STREAM [send to %s]: failed to send http header to netdata", connected_to); sleep(reconnect_delay); continue; } - info("STREAM: Waiting for STREAM from central netdata at: %s", connected_to); + info("STREAM [send to %s]: waiting response from remote netdata...", connected_to); if(recv_timeout(sock, http, 1000, 0, timeout) == -1) { close(sock); sock = -1; - error("STREAM: failed to receive STREAM from netdata at: %s", connected_to); + error("STREAM [send to %s]: failed to initialize communication", connected_to); sleep(reconnect_delay); continue; } @@ -277,15 +279,15 @@ void *rrdpush_sender_thread(void *ptr) { if(strncmp(http, "STREAM", 6)) { close(sock); sock = -1; - error("STREAM: server at %s, did not send STREAM", connected_to); + error("STREAM [send to %s]: server is not replying properly.", connected_to); sleep(reconnect_delay); continue; } - info("STREAM: Established communication with central netdata at: %s - sending metrics...", connected_to); + info("STREAM [send to %s]: established communication - sending metrics...", connected_to); if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0) - error("STREAM: cannot set non-blocking mode for socket."); + error("STREAM [send to %s]: cannot set non-blocking mode for socket.", connected_to); rrdpush_flush(); sent_connection = 0; @@ -317,7 +319,7 @@ void *rrdpush_sender_thread(void *ptr) { if(errno == EAGAIN || errno == EINTR) continue; - error("STREAM: Failed to poll()."); + error("STREAM [send to %s]: failed to poll().", connected_to); close(sock); sock = -1; break; @@ -330,21 +332,15 @@ void *rrdpush_sender_thread(void *ptr) { if(ifd->revents & POLLIN) { char buffer[1000 + 1]; if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1) - error("STREAM: Cannot read from internal pipe."); + error("STREAM [send to %s]: cannot read from internal pipe.", connected_to); } if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) { - // info("STREAM: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin); - - // fprintf(stderr, "PUSH BEGIN\n"); - // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr); - // fprintf(stderr, "\nPUSH END\n"); - rrdpush_lock(); ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT); if(ret == -1) { if(errno != EAGAIN && errno != EINTR) { - error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", connected_to, sent_connection); + error("STREAM [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", connected_to, sent_connection); close(sock); sock = -1; } @@ -364,7 +360,7 @@ void *rrdpush_sender_thread(void *ptr) { // protection from overflow if(rrdpush_buffer->len > max_size) { errno = 0; - error("STREAM: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection); + error("STREAM [send to %s]: too many data pending - buffer is %zu bytes long, %zu unsent - we have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", connected_to, rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection); if(sock != -1) { close(sock); sock = -1; @@ -373,7 +369,7 @@ void *rrdpush_sender_thread(void *ptr) { } cleanup: - debug(D_WEB_CLIENT, "STREAM: central netdata push thread exits."); + debug(D_WEB_CLIENT, "STREAM [send]: sending thread exits."); // make sure the data collection threads do not write data rrdpush_connected = 0; @@ -427,7 +423,7 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m else host = rrdhost_find_or_create(hostname, machine_guid, os, update_every, history, mode, health_enabled?1:0); - info("STREAM request from client '%s:%s' for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s", + info("STREAM [receive from [%s]:%s]: metrics for host '%s' with machine_guid '%s': update every = %d, history = %d, memory mode = %s, health %s", client_ip, client_port, hostname, machine_guid, update_every, @@ -453,20 +449,20 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", client_ip, client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", client_ip, client_port); - info("STREAM [%s]:%s: sending STREAM to initiate streaming...", client_ip, client_port); + info("STREAM [receive from [%s]:%s]: initializing communication...", client_ip, client_port); if(send_timeout(fd, "STREAM", 6, 0, 60) != 6) { - error("STREAM [%s]:%s: cannot send STREAM.", client_ip, client_port); + error("STREAM [receive from [%s]:%s]: cannot send STREAM command.", client_ip, client_port); return 0; } // remove the non-blocking flag from the socket if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) & ~O_NONBLOCK) == -1) - error("STREAM [%s]:%s: cannot remove the non-blocking flag from socket %d", client_ip, client_port, fd); + error("STREAM [receive from [%s]:%s]: cannot remove the non-blocking flag from socket %d", client_ip, client_port, fd); // convert the socket to a FILE * FILE *fp = fdopen(fd, "r"); if(!fp) { - error("STREAM [%s]:%s: failed to get a FILE for FD %d.", client_ip, client_port, fd); + error("STREAM [receive from [%s]:%s]: failed to get a FILE for FD %d.", client_ip, client_port, fd); return 0; } @@ -477,9 +473,9 @@ int rrdpush_receive(int fd, const char *key, const char *hostname, const char *m rrdhost_unlock(host); // call the plugins.d processor to receive the metrics - info("STREAM [%s]:%s: connecting client to plugins.d (host '%s', machine GUID '%s').", client_ip, client_port, host->hostname, host->machine_guid); + info("STREAM [receive from [%s]:%s]: connecting client to plugins.d (host '%s', machine GUID '%s').", client_ip, client_port, host->hostname, host->machine_guid); size_t count = pluginsd_process(host, &cd, fp, 1); - error("STREAM [%s]:%s: client disconnected (host '%s', machine GUID '%s', completed updates %zu).", client_ip, client_port, host->hostname, host->machine_guid, count); + error("STREAM [receive from [%s]:%s]: client disconnected (host '%s', machine GUID '%s', completed updates %zu).", client_ip, client_port, host->hostname, host->machine_guid, count); rrdhost_wrlock(host); host->use_counter--; @@ -508,15 +504,15 @@ void *rrdpush_receiver_thread(void *ptr) { struct rrdpush_thread *rpt = (struct rrdpush_thread *)ptr; if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0) - error("STREAM: cannot set pthread cancel type to DEFERRED."); + error("STREAM [receive]: cannot set pthread cancel type to DEFERRED."); if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0) - error("STREAM: cannot set pthread cancel state to ENABLE."); + error("STREAM [receive]: cannot set pthread cancel state to ENABLE."); info("STREAM [%s]:%s: receive thread created (task id %d)", rpt->client_ip, rpt->client_port, gettid()); rrdpush_receive(rpt->fd, rpt->key, rpt->hostname, rpt->machine_guid, rpt->os, rpt->update_every, rpt->client_ip, rpt->client_port); - info("STREAM [%s]:%s: receive thread ended (task id %d)", rpt->client_ip, rpt->client_port, gettid()); + info("STREAM [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->client_ip, rpt->client_port, gettid()); close(rpt->fd); freez(rpt->key); @@ -538,7 +534,7 @@ static inline int rrdpush_receive_validate_api_key(const char *key) { int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url) { (void)host; - info("STREAM [%s]:%s: client connection.", w->client_ip, w->client_port); + info("STREAM [receive from [%s]:%s]: new client connection.", w->client_ip, w->client_port); char *key = NULL, *hostname = NULL, *machine_guid = NULL, *os = NULL; int update_every = default_rrd_update_every; @@ -564,35 +560,35 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url } if(!key || !*key) { - error("STREAM [%s]:%s: request without an API key. Forbidding access.", w->client_ip, w->client_port); + error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port); buffer_flush(w->response.data); buffer_sprintf(w->response.data, "You need an API key for this request."); return 401; } if(!hostname || !*hostname) { - error("STREAM [%s]:%s: request without a hostname. Forbidding access.", w->client_ip, w->client_port); + error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port); buffer_flush(w->response.data); buffer_sprintf(w->response.data, "You need to send a hostname too."); return 400; } if(!machine_guid || !*machine_guid) { - error("STREAM [%s]:%s: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port); + error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port); buffer_flush(w->response.data); buffer_sprintf(w->response.data, "You need to send a machine GUID too."); return 400; } if(!rrdpush_receive_validate_api_key(key)) { - error("STREAM [%s]:%s: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key); + error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key); buffer_flush(w->response.data); buffer_sprintf(w->response.data, "Your API key is not permitted access."); return 401; } if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) { - error("STREAM [%s]:%s: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid); + error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid); buffer_flush(w->response.data); buffer_sprintf(w->response.data, "Your machine guide is not permitted access."); return 404; @@ -609,13 +605,13 @@ int rrdpush_receiver_thread_spawn(RRDHOST *host, struct web_client *w, char *url rpt->update_every = update_every; pthread_t thread; - debug(D_SYSTEM, "Starting STREAM thread for client [%s]:%s.", w->client_ip, w->client_port); + debug(D_SYSTEM, "STREAM [receive from [%s]:%s]: starting receiving thread.", w->client_ip, w->client_port); if(pthread_create(&thread, NULL, rrdpush_receiver_thread, (void *)rpt)) - error("failed to create new STREAM thread for client [%s]:%s.", w->client_ip, w->client_port); + error("STREAM [receive from [%s]:%s]: failed to create new thread for client.", w->client_ip, w->client_port); else if(pthread_detach(thread)) - error("Cannot request detach newly created thread for client [%s]:%s.", w->client_ip, w->client_port); + error("STREAM [receive from [%s]:%s]: cannot request detach newly created thread.", w->client_ip, w->client_port); // prevent the caller from closing the streaming socket if(w->ifd == w->ofd)