return 0;
}
+static inline int process_graphite_response(BUFFER *b) {
+ info("Received %zu bytes from graphite backend. Ignoring them.", buffer_strlen(b));
+ buffer_flush(b);
+ return 0;
+}
+
+static inline int process_opentsdb_response(BUFFER *b) {
+ info("Received %zu bytes from opentsdb backend. Ignoring them.", buffer_strlen(b));
+ buffer_flush(b);
+ return 0;
+}
+
void *backends_main(void *ptr) {
(void)ptr;
- BUFFER *b = buffer_create(1);
- int (*formatter)(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options);
+ BUFFER *b = buffer_create(1), *response = buffer_create(1);
+ int (*backend_request_formatter)(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) = NULL;
+ int (*backend_response_checker)(BUFFER *b) = NULL;
info("BACKEND thread created with task id %d", gettid());
if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
default_port = 2003;
if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
- formatter = format_dimension_collected_graphite_plaintext;
+ backend_request_formatter = format_dimension_collected_graphite_plaintext;
else
- formatter = format_dimension_stored_graphite_plaintext;
+ backend_request_formatter = format_dimension_stored_graphite_plaintext;
+
+ backend_response_checker = process_graphite_response;
}
else if(!strcmp(type, "opentsdb") || !strcmp(type, "opentsdb:telnet")) {
default_port = 4242;
if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
- formatter = format_dimension_collected_opentsdb_telnet;
+ backend_request_formatter = format_dimension_collected_opentsdb_telnet;
else
- formatter = format_dimension_stored_opentsdb_telnet;
+ backend_request_formatter = format_dimension_stored_opentsdb_telnet;
+
+ backend_response_checker = process_opentsdb_response;
}
else {
error("Unknown backend type '%s'", type);
goto cleanup;
}
+ if(backend_request_formatter == NULL || backend_response_checker == NULL) {
+ error("backend is misconfigured - disabling it.");
+ goto cleanup;
+ }
+
if(timeoutms < 1) {
error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000);
timeoutms = frequency * 2 * 1000;
RRDDIM *rd;
for(rd = st->dimensions; rd ;rd = rd->next) {
if(rd->last_collected_time.tv_sec >= after)
- chart_buffered_metrics += formatter(b, prefix, &localhost, hostname, st, rd, after, before, options);
+ chart_buffered_metrics += backend_request_formatter(b, prefix, &localhost, hostname, st, rd, after, before, options);
}
pthread_rwlock_unlock(&st->rwlock);
if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
+ // ------------------------------------------------------------------------
+
chart_buffered_bytes = (collected_number)buffer_strlen(b);
// reset the monitoring chart counters
//fprintf(stderr, "after = %lu, before = %lu\n", after, before);
// ------------------------------------------------------------------------
- // connect to a backend server
+ // if we are connected, receive a response, without blocking
+
+ if(likely(sock != -1)) {
+ if(likely(fcntl(sock, F_SETFL, O_NONBLOCK) >= 0)) {
+ errno = 0;
+
+ // loop through to collect all data
+ while(sock != -1 && errno != EWOULDBLOCK) {
+ buffer_need_bytes(response, 4096);
+
+ ssize_t r = recv(sock, &response->buffer[response->len], response->size - response->len
+ , O_NONBLOCK);
+ if(likely(r > 0)) {
+ // we received some data
+ response->len += r;
+ }
+ else if(r == 0) {
+ error("Backend '%s' closed the socket", destination);
+ close(sock);
+ sock = -1;
+ }
+ else {
+ // failed to receive data
+ if(errno != EAGAIN && errno != EWOULDBLOCK)
+ error("Cannot receive data from backend '%s'.", destination);
+ }
+ }
+
+ if(sock != -1) {
+ // unset O_NONBLOCK
+ int val = fcntl(sock, F_GETFL, 0);
+ if(likely(val >= 0)) {
+ int flags = O_NONBLOCK;
+ val &= ~flags;
+ if(unlikely(fcntl(sock, F_SETFL, val) < 0))
+ error("Cannot unset O_NONBLOCK from backend's '%s' socket.", destination);
+ }
+ else
+ error("Cannot get active flags from backend's '%s' socket.", destination);
+ }
+
+ // if we received data, process them
+ if(buffer_strlen(response))
+ backend_response_checker(response);
+ }
+ else {
+ error("Cannot set O_NONBLOCK from backend's '%s' socket. Receiving data from backend skipped.", destination);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // if we are not connected, connect to a backend server
if(unlikely(sock == -1)) {
usec_t start_ut = now_realtime_usec();
if(unlikely(netdata_exit)) break;
// ------------------------------------------------------------------------
- // send our buffer to the backend server
+ // if we are connected, send our buffer to the backend server
if(likely(sock != -1)) {
size_t len = buffer_strlen(b);
#ifdef MSG_NOSIGNAL
flags += MSG_NOSIGNAL;
#endif
+
ssize_t written = send(sock, buffer_tostring(b), len, flags);
chart_backend_latency += now_realtime_usec() - start_ut;
if(written != -1 && (size_t)written == len) {
if(sock != -1)
close(sock);
+ buffer_free(b);
+ buffer_free(response);
+
info("BACKEND thread exiting");
pthread_exit(NULL);