arthur.barton.de Git - netdata.git/commitdiff
Merge pull request #1815 from lfdominguez/master
author Costa Tsaousis <costa@tsaousis.gr>
Sun, 26 Feb 2017 19:05:09 +0000 (21:05 +0200)
committer GitHub <noreply@github.com>
Sun, 26 Feb 2017 19:05:09 +0000 (21:05 +0200)
New JSON backend

1  2 
src/backends.c

diff --combined src/backends.c
index 94e8f5f671802c387d33192f86838ea40df7fe72,de267de5ffe6f8c9dc1a7db8780753f64a525fc3..c7b086459a84b898a858736b7cd995cb24b7c260
@@@ -93,6 -93,89 +93,89 @@@ static inline int format_dimension_stor
      return 0;
  }
  
+ /*
+  * Append one dimension to the buffer as a single-line JSON object, using the
+  * last collected value and the second at which it was collected.
+  *
+  * host, after, before and options are accepted only so the function matches
+  * the request formatter signature; they are not used here.
+  * String fields are written with plain %s, i.e. without JSON escaping.
+  *
+  * Always returns 1 (one metric added to the buffer).
+  */
+ static inline int format_dimension_collected_json_plaintext(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) {
+     (void)host;
+     (void)after;
+     (void)before;
+     (void)options;
+ 
+     uint32_t collected_at = (uint32_t)rd->last_collected_time.tv_sec;
+ 
+     buffer_sprintf(b,
+         /* sender identification */
+         "{\"prefix\":\"%s\",\"hostname\":\"%s\","
+         /* chart level fields */
+         "\"chart_id\":\"%s\",\"chart_name\":\"%s\",\"family\":\"%s\","
+         "\"context\": \"%s\",\"type\":\"%s\",\"units\": \"%s\","
+         /* dimension level fields */
+         "\"id\":\"%s\",\"name\":\"%s\","
+         "\"value\":" COLLECTED_NUMBER_FORMAT ",\"timestamp\": %u}\n",
+         prefix, hostname,
+         st->id, st->name, st->family, st->context, st->type, st->units,
+         rd->id, rd->name,
+         rd->last_collected_value, collected_at
+     );
+ 
+     return 1;
+ }
+ /*
+  * Append one dimension to the buffer as a single-line JSON object, using the
+  * value computed by backend_calculate_value_from_stored_data() for the
+  * (after, before) window. The timestamp written is 'before'.
+  *
+  * host is accepted only to match the request formatter signature.
+  * String fields are written with plain %s, i.e. without JSON escaping.
+  *
+  * Returns 1 when a metric was added to the buffer, 0 when the computed
+  * value is NaN and nothing was written.
+  */
+ static inline int format_dimension_stored_json_plaintext(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) {
+     (void)host;
+ 
+     calculated_number stored = backend_calculate_value_from_stored_data(st, rd, after, before, options);
+ 
+     /* nothing to report for this window */
+     if(isnan(stored))
+         return 0;
+ 
+     buffer_sprintf(b,
+         /* sender identification */
+         "{\"prefix\":\"%s\",\"hostname\":\"%s\","
+         /* chart level fields */
+         "\"chart_id\":\"%s\",\"chart_name\":\"%s\",\"family\":\"%s\","
+         "\"context\": \"%s\",\"type\":\"%s\",\"units\": \"%s\","
+         /* dimension level fields */
+         "\"id\":\"%s\",\"name\":\"%s\","
+         "\"value\":" CALCULATED_NUMBER_FORMAT ",\"timestamp\": %u}\n",
+         prefix, hostname,
+         st->id, st->name, st->family, st->context, st->type, st->units,
+         rd->id, rd->name,
+         stored, (uint32_t)before
+     );
+ 
+     return 1;
+ }
  static inline int process_graphite_response(BUFFER *b) {
      char sample[1024];
      const char *s = buffer_tostring(b);
      return 0;
  }
  
+ /*
+  * Handle whatever the json backend sent back to us.
+  *
+  * The response is not interpreted: up to sizeof(sample) - 1 bytes of it are
+  * copied into a local buffer, with every non-printable byte replaced by a
+  * space, the sample is logged together with the response length, and the
+  * response buffer is then flushed.
+  *
+  * Always returns 0.
+  */
+ static inline int process_json_response(BUFFER *b) {
+     char sample[1024];
+     const char *s = buffer_tostring(b);
+     char *d = sample, *e = &sample[sizeof(sample) - 1];
+ 
+     for(; *s && d < e ;s++) {
+         char c = *s;
+ 
+         // <ctype.h> functions require a value representable as unsigned char
+         // (or EOF); passing a negative plain char is undefined behaviour.
+         if(unlikely(!isprint((unsigned char)c))) c = ' ';
+ 
+         *d++ = c;
+     }
+     *d = '\0';
+ 
+     info("Received %zu bytes from json backend. Ignoring them. Sample: '%s'", buffer_strlen(b), sample);
+     buffer_flush(b);
+     return 0;
+ }
  static inline int process_opentsdb_response(BUFFER *b) {
      char sample[1024];
      const char *s = buffer_tostring(b);
  }
  
  void *backends_main(void *ptr) {
 +    int default_port = 0;
 +    int sock = -1;
      struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  
      BUFFER *b = buffer_create(1), *response = buffer_create(1);
 -    int (*backend_request_formatter)(BUFFER *b, const char *prefix, RRDHOST *host, const char *hostname, RRDSET *st, RRDDIM *rd, time_t after, time_t before, uint32_t options) = NULL;
 -    int (*backend_response_checker)(BUFFER *b) = NULL;
 +    int (*backend_request_formatter)(BUFFER *, const char *, RRDHOST *, const char *, RRDSET *, RRDDIM *, time_t, time_t, uint32_t) = NULL;
 +    int (*backend_response_checker)(BUFFER *) = NULL;
  
      info("BACKEND thread created with task id %d", gettid());
  
              .tv_sec = 0,
              .tv_usec = 0
      };
 -    int default_port = 0;
 -    int sock = -1;
      uint32_t options;
 -    int enabled = config_get_boolean("backend", "enabled", 0);
 -    const char *source = config_get("backend", "data source", "average");
 -    const char *type = config_get("backend", "type", "graphite");
 -    const char *destination = config_get("backend", "destination", "localhost");
 -    const char *prefix = config_get("backend", "prefix", "netdata");
 -    const char *hostname = config_get("backend", "hostname", localhost.hostname);
 -    int frequency = (int)config_get_number("backend", "update every", 10);
 -    int buffer_on_failures = (int)config_get_number("backend", "buffer on failures", 10);
 -    long timeoutms = config_get_number("backend", "timeout ms", frequency * 2 * 1000);
 +    int enabled             = config_get_boolean(CONFIG_SECTION_BACKEND, "enabled", 0);
 +    const char *source      = config_get(CONFIG_SECTION_BACKEND, "data source", "average");
 +    const char *type        = config_get(CONFIG_SECTION_BACKEND, "type", "graphite");
 +    const char *destination = config_get(CONFIG_SECTION_BACKEND, "destination", "localhost");
 +    const char *prefix      = config_get(CONFIG_SECTION_BACKEND, "prefix", "netdata");
 +    const char *hostname    = config_get(CONFIG_SECTION_BACKEND, "hostname", localhost->hostname);
 +    int frequency           = (int)config_get_number(CONFIG_SECTION_BACKEND, "update every", 10);
 +    int buffer_on_failures  = (int)config_get_number(CONFIG_SECTION_BACKEND, "buffer on failures", 10);
 +    long timeoutms          = config_get_number(CONFIG_SECTION_BACKEND, "timeout ms", frequency * 2 * 1000);
  
      // ------------------------------------------------------------------------
      // validate configuration options
      // and prepare for sending data to our backend
 +
      if(!enabled || frequency < 1)
          goto cleanup;
  
          goto cleanup;
      }
  
 +    if(timeoutms < 1) {
 +        error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000);
 +        timeoutms = frequency * 2 * 1000;
 +    }
 +    timeout.tv_sec  = (timeoutms * 1000) / 1000000;
 +    timeout.tv_usec = (timeoutms * 1000) % 1000000;
 +
 +
 +    // ------------------------------------------------------------------------
 +    // select the backend type
 +
      if(!strcmp(type, "graphite") || !strcmp(type, "graphite:plaintext")) {
          default_port = 2003;
          if(options == BACKEND_SOURCE_DATA_AS_COLLECTED)
  
          backend_response_checker = process_opentsdb_response;
      }
+     else if (!strcmp(type, "json") || !strcmp(type, "json:plaintext"))
+     {
+         default_port = 5448;
+         if (options == BACKEND_SOURCE_DATA_AS_COLLECTED)
+         {
+             backend_request_formatter = format_dimension_collected_json_plaintext;
+         }
+         else
+         {
+             backend_request_formatter = format_dimension_stored_json_plaintext;
+         }
+         backend_response_checker = process_json_response;
+     }
      else {
          error("Unknown backend type '%s'", type);
          goto cleanup;
          goto cleanup;
      }
  
 -    if(timeoutms < 1) {
 -        error("BACKED invalid timeout %ld ms given. Assuming %d ms.", timeoutms, frequency * 2 * 1000);
 -        timeoutms = frequency * 2 * 1000;
 -    }
 -    timeout.tv_sec  = (timeoutms * 1000) / 1000000;
 -    timeout.tv_usec = (timeoutms * 1000) % 1000000;
  
      // ------------------------------------------------------------------------
 -    // prepare the charts for monitoring the backend
 +    // prepare the charts for monitoring the backend operation
  
      struct rusage thread;
  
              chart_backend_reconnects = 0,
              chart_backend_latency = 0;
  
 -    RRDSET *chart_metrics = rrdset_find("netdata.backend_metrics");
 -    if(!chart_metrics) {
 -        chart_metrics = rrdset_create("netdata", "backend_metrics", NULL, "backend", NULL, "Netdata Buffered Metrics", "metrics", 130600, frequency, RRDSET_TYPE_LINE);
 -        rrddim_add(chart_metrics, "buffered", NULL,  1, 1, RRDDIM_ABSOLUTE);
 -        rrddim_add(chart_metrics, "lost",     NULL,  1, 1, RRDDIM_ABSOLUTE);
 -        rrddim_add(chart_metrics, "sent",     NULL,  1, 1, RRDDIM_ABSOLUTE);
 -    }
 +    RRDSET *chart_metrics = rrdset_create_localhost("netdata", "backend_metrics", NULL, "backend", NULL, "Netdata Buffered Metrics", "metrics", 130600, frequency, RRDSET_TYPE_LINE);
 +    rrddim_add(chart_metrics, "buffered", NULL,  1, 1, RRD_ALGORITHM_ABSOLUTE);
 +    rrddim_add(chart_metrics, "lost",     NULL,  1, 1, RRD_ALGORITHM_ABSOLUTE);
 +    rrddim_add(chart_metrics, "sent",     NULL,  1, 1, RRD_ALGORITHM_ABSOLUTE);
  
 -    RRDSET *chart_bytes = rrdset_find("netdata.backend_bytes");
 -    if(!chart_bytes) {
 -        chart_bytes = rrdset_create("netdata", "backend_bytes", NULL, "backend", NULL, "Netdata Backend Data Size", "KB", 130610, frequency, RRDSET_TYPE_AREA);
 -        rrddim_add(chart_bytes, "buffered", NULL, 1, 1024, RRDDIM_ABSOLUTE);
 -        rrddim_add(chart_bytes, "lost",     NULL, 1, 1024, RRDDIM_ABSOLUTE);
 -        rrddim_add(chart_bytes, "sent",     NULL, 1, 1024, RRDDIM_ABSOLUTE);
 -        rrddim_add(chart_bytes, "received", NULL, 1, 1024, RRDDIM_ABSOLUTE);
 -    }
 +    RRDSET *chart_bytes = rrdset_create_localhost("netdata", "backend_bytes", NULL, "backend", NULL, "Netdata Backend Data Size", "KB", 130610, frequency, RRDSET_TYPE_AREA);
 +    rrddim_add(chart_bytes, "buffered", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
 +    rrddim_add(chart_bytes, "lost",     NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
 +    rrddim_add(chart_bytes, "sent",     NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
 +    rrddim_add(chart_bytes, "received", NULL, 1, 1024, RRD_ALGORITHM_ABSOLUTE);
  
 -    RRDSET *chart_ops = rrdset_find("netdata.backend_ops");
 -    if(!chart_ops) {
 -        chart_ops = rrdset_create("netdata", "backend_ops", NULL, "backend", NULL, "Netdata Backend Operations", "operations", 130630, frequency, RRDSET_TYPE_LINE);
 -        rrddim_add(chart_ops, "write",     NULL, 1, 1, RRDDIM_ABSOLUTE);
 -        rrddim_add(chart_ops, "discard",   NULL, 1, 1, RRDDIM_ABSOLUTE);
 -        rrddim_add(chart_ops, "reconnect", NULL, 1, 1, RRDDIM_ABSOLUTE);
 -        rrddim_add(chart_ops, "failure",   NULL, 1, 1, RRDDIM_ABSOLUTE);
 -        rrddim_add(chart_ops, "read",      NULL, 1, 1, RRDDIM_ABSOLUTE);
 -    }
 +    RRDSET *chart_ops = rrdset_create_localhost("netdata", "backend_ops", NULL, "backend", NULL, "Netdata Backend Operations", "operations", 130630, frequency, RRDSET_TYPE_LINE);
 +    rrddim_add(chart_ops, "write",     NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
 +    rrddim_add(chart_ops, "discard",   NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
 +    rrddim_add(chart_ops, "reconnect", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
 +    rrddim_add(chart_ops, "failure",   NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
 +    rrddim_add(chart_ops, "read",      NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
  
      /*
       * this is misleading - we can only measure the time we need to send data
       *
       * issue #1432 and https://www.softlab.ntua.gr/facilities/documentation/unix/unix-socket-faq/unix-socket-faq-2.html
       *
 -    RRDSET *chart_latency = rrdset_find("netdata.backend_latency");
 -    if(!chart_latency) {
 -        chart_latency = rrdset_create("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", 130620, frequency, RRDSET_TYPE_AREA);
 -        rrddim_add(chart_latency, "latency",   NULL,  1, 1000, RRDDIM_ABSOLUTE);
 -    }
 +    RRDSET *chart_latency = rrdset_create_localhost("netdata", "backend_latency", NULL, "backend", NULL, "Netdata Backend Latency", "ms", 130620, frequency, RRDSET_TYPE_AREA);
 +    rrddim_add(chart_latency, "latency",   NULL,  1, 1000, RRD_ALGORITHM_ABSOLUTE);
      */
  
 -    RRDSET *chart_rusage = rrdset_find("netdata.backend_thread_cpu");
 -    if(!chart_rusage) {
 -        chart_rusage = rrdset_create("netdata", "backend_thread_cpu", NULL, "backend", NULL, "NetData Backend Thread CPU usage", "milliseconds/s", 130630, frequency, RRDSET_TYPE_STACKED);
 -        rrddim_add(chart_rusage, "user",   NULL, 1, 1000, RRDDIM_INCREMENTAL);
 -        rrddim_add(chart_rusage, "system", NULL, 1, 1000, RRDDIM_INCREMENTAL);
 -    }
 +    RRDSET *chart_rusage = rrdset_create_localhost("netdata", "backend_thread_cpu", NULL, "backend", NULL, "NetData Backend Thread CPU usage", "milliseconds/s", 130630, frequency, RRDSET_TYPE_STACKED);
 +    rrddim_add(chart_rusage, "user",   NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
 +    rrddim_add(chart_rusage, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
 +
  
      // ------------------------------------------------------------------------
      // prepare the backend main loop
      heartbeat_init(&hb);
  
      for(;;) {
 +
          // ------------------------------------------------------------------------
          // Wait for the next iteration point.
          heartbeat_next(&hb, step_ut);
          time_t before = now_realtime_sec();
  
 +
          // ------------------------------------------------------------------------
          // add to the buffer the data we need to send to the backend
 -        RRDSET *st;
 +
          int pthreadoldcancelstate;
  
          if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0))
              error("Cannot set pthread cancel state to DISABLE.");
  
 -        rrdhost_rdlock(&localhost);
 -        for(st = localhost.rrdset_root; st ;st = st->next) {
 -            pthread_rwlock_rdlock(&st->rwlock);
 +        rrd_rdlock();
 +        RRDHOST *host;
 +        rrdhost_foreach_read(host) {
 +            if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE)
 +                continue;
  
 -            RRDDIM *rd;
 -            for(rd = st->dimensions; rd ;rd = rd->next) {
 -                if(rd->last_collected_time.tv_sec >= after)
 -                    chart_buffered_metrics += backend_request_formatter(b, prefix, &localhost, hostname, st, rd, after, before, options);
 -            }
 +            rrdhost_rdlock(host);
  
 -            pthread_rwlock_unlock(&st->rwlock);
 +            RRDSET *st;
 +            rrdset_foreach_read(st, host) {
 +                rrdset_rdlock(st);
 +
 +                RRDDIM *rd;
 +                rrddim_foreach_read(rd, st) {
 +                    if(rd->last_collected_time.tv_sec >= after)
 +                        chart_buffered_metrics += backend_request_formatter(b, prefix, host, (host == localhost)?hostname:host->hostname, st, rd, after, before, options);
 +                }
 +                rrdset_unlock(st);
 +            }
 +            rrdhost_unlock(host);
          }
 -        rrdhost_unlock(&localhost);
 +        rrd_unlock();
  
          if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
              error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
  
          if(unlikely(sock == -1)) {
              usec_t start_ut = now_monotonic_usec();
 -            const char *s = destination;
 -            while(*s) {
 -                const char *e = s;
 -
 -                // skip separators, moving both s(tart) and e(nd)
 -                while(isspace(*e) || *e == ',') s = ++e;
 +            size_t reconnects = 0;
  
 -                // move e(nd) to the first separator
 -                while(*e && !isspace(*e) && *e != ',') e++;
 +            sock = connect_to_one_of(destination, default_port, &timeout, &reconnects, NULL, 0);
  
 -                // is there anything?
 -                if(!*s || s == e) break;
 -
 -                char buf[e - s + 1];
 -                strncpyz(buf, s, e - s);
 -                chart_backend_reconnects++;
 -                sock = connect_to(buf, default_port, &timeout);
 -                if(sock != -1) break;
 -                s = e;
 -            }
 +            chart_backend_reconnects += reconnects;
              chart_backend_latency += now_monotonic_usec() - start_ut;
          }
  
          // ------------------------------------------------------------------------
          // update the monitoring charts
  
 -        if(chart_ops->counter_done) rrdset_next(chart_ops);
 +        if(likely(chart_ops->counter_done)) rrdset_next(chart_ops);
          rrddim_set(chart_ops, "read",         chart_receptions);
          rrddim_set(chart_ops, "write",        chart_transmission_successes);
          rrddim_set(chart_ops, "discard",      chart_data_lost_events);
          rrddim_set(chart_ops, "reconnect",    chart_backend_reconnects);
          rrdset_done(chart_ops);
  
 -        if(chart_metrics->counter_done) rrdset_next(chart_metrics);
 +        if(likely(chart_metrics->counter_done)) rrdset_next(chart_metrics);
          rrddim_set(chart_metrics, "buffered", chart_buffered_metrics);
          rrddim_set(chart_metrics, "lost",     chart_lost_metrics);
          rrddim_set(chart_metrics, "sent",     chart_sent_metrics);
          rrdset_done(chart_metrics);
  
 -        if(chart_bytes->counter_done) rrdset_next(chart_bytes);
 +        if(likely(chart_bytes->counter_done)) rrdset_next(chart_bytes);
          rrddim_set(chart_bytes, "buffered",   chart_buffered_bytes);
          rrddim_set(chart_bytes, "lost",       chart_lost_bytes);
          rrddim_set(chart_bytes, "sent",       chart_sent_bytes);
          rrdset_done(chart_bytes);
  
          /*
 -        if(chart_latency->counter_done) rrdset_next(chart_latency);
 +        if(likely(chart_latency->counter_done)) rrdset_next(chart_latency);
          rrddim_set(chart_latency, "latency",  chart_backend_latency);
          rrdset_done(chart_latency);
          */
  
          getrusage(RUSAGE_THREAD, &thread);
 -        if(chart_rusage->counter_done) rrdset_next(chart_rusage);
 +        if(likely(chart_rusage->counter_done)) rrdset_next(chart_rusage);
          rrddim_set(chart_rusage, "user",   thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
          rrddim_set(chart_rusage, "system", thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
          rrdset_done(chart_rusage);