]> arthur.barton.de Git - netdata.git/commitdiff
properly lock all sensitive linked lists
authorCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Mon, 20 Feb 2017 20:45:56 +0000 (22:45 +0200)
committerCosta Tsaousis (ktsaou) <costa@tsaousis.gr>
Tue, 21 Feb 2017 23:00:26 +0000 (01:00 +0200)
src/backends.c
src/health.c
src/plugins_d.c
src/rrd.h
src/rrd2json.c
src/rrdcalc.c
src/rrdhost.c
src/rrdpush.c
src/rrdset.c
src/web_client.c

index 30fba7fef6c7bbc8f7fde1e3b817e88b7ac2dfa5..300f6ae31492091aa728443ca58329cf0a5e4e83 100644 (file)
@@ -309,31 +309,25 @@ void *backends_main(void *ptr) {
         if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0))
             error("Cannot set pthread cancel state to DISABLE.");
 
+        rrd_rdlock();
         RRDHOST *host;
-        for(host = localhost; host ; host = host->next) {
-            // for each host
-
+        rrdhost_foreach_read(host) {
             rrdhost_rdlock(host);
 
             RRDSET *st;
-            for(st = host->rrdset_root; st; st = st->next) {
-                // for each chart
-
+            rrdset_foreach_read(st, host) {
                 rrdset_rdlock(st);
 
                 RRDDIM *rd;
-                for(rd = st->dimensions; rd; rd = rd->next) {
-                    // for each dimension
-
+                rrddim_foreach_read(rd, st) {
                     if(rd->last_collected_time.tv_sec >= after)
                         chart_buffered_metrics += backend_request_formatter(b, prefix, host, (host == localhost)?hostname:host->hostname, st, rd, after, before, options);
                 }
-
                 rrdset_unlock(st);
             }
-
             rrdhost_unlock(host);
         }
+        rrd_unlock();
 
         if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
             error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
index 7dbf7292c13c886b4e9da97314c7fa5fd3c1ef66..83390e95a6e7eb01203f920aa7f77f5833577623 100644 (file)
@@ -57,34 +57,37 @@ void health_reload_host(RRDHOST *host) {
             t->flags |= HEALTH_ENTRY_FLAG_UPDATED;
     }
 
+    rrdhost_rdlock(host);
     // reset all thresholds to all charts
     RRDSET *st;
-    for(st = host->rrdset_root; st ; st = st->next) {
+    rrdset_foreach_read(st, host) {
         st->green = NAN;
         st->red = NAN;
     }
+    rrdhost_unlock(host);
 
     // load the new alarms
     rrdhost_wrlock(host);
     health_readdir(host, path);
-    rrdhost_unlock(host);
 
     // link the loaded alarms to their charts
-    for(st = host->rrdset_root; st ; st = st->next) {
-        rrdhost_wrlock(host);
-
+    rrdset_foreach_write(st, host) {
         rrdsetcalc_link_matching(st);
         rrdcalctemplate_link_matching(st);
-
-        rrdhost_unlock(host);
     }
+
+    rrdhost_unlock(host);
 }
 
 void health_reload(void) {
-    RRDHOST *host;
 
-    for(host = localhost; host ; host = host->next)
+    rrd_rdlock();
+
+    RRDHOST *host;
+    rrdhost_foreach_read(host)
         health_reload_host(host);
+
+    rrd_unlock();
 }
 
 // ----------------------------------------------------------------------------
@@ -348,8 +351,10 @@ void *health_main(void *ptr) {
         if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) != 0))
             error("Cannot set pthread cancel state to DISABLE.");
 
+        rrd_rdlock();
+
         RRDHOST *host;
-        for(host = localhost; host ; host = host->next) {
+        rrdhost_foreach_read(host) {
             if(unlikely(!host->health_enabled)) continue;
 
             rrdhost_rdlock(host);
@@ -596,6 +601,8 @@ void *health_main(void *ptr) {
 
         } /* host loop */
 
+        rrd_unlock();
+
         if(unlikely(pthread_setcancelstate(oldstate, NULL) != 0))
             error("Cannot set pthread cancel state to RESTORE (%d).", oldstate);
 
index 848162d1a0e194ec305ff5a590b63f8a1ed8de9b..c7a3dac7e16d058c91802d34e142899eb2b2d882 100644 (file)
@@ -107,7 +107,23 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
     RRDSET *st = NULL;
     uint32_t hash;
 
-    while(likely(fgets(line, PLUGINSD_LINE_MAX, fp) != NULL)) {
+    errno = 0;
+    clearerr(fp);
+
+    if(unlikely(fileno(fp) == -1)) {
+        error("PLUGINSD: %s: file is not a valid stream.", cd->fullfilename);
+        goto cleanup;
+    }
+
+    while(!ferror(fp)) {
+        if(unlikely(netdata_exit)) break;
+
+        char *r = fgets(line, PLUGINSD_LINE_MAX, fp);
+        if(unlikely(!r)) {
+            error("PLUGINSD: %s : read failed.", cd->fullfilename);
+            break;
+        }
+
         if(unlikely(netdata_exit)) break;
 
         line[PLUGINSD_LINE_MAX] = '\0';
@@ -335,6 +351,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
         }
     }
 
+cleanup:
     cd->enabled = enabled;
 
     if(likely(count)) {
index 7e34e956c63af0d88112327f414d082e453b9970..3a10f21937cab3725b04a9837908c872d19f5aaa 100644 (file)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -185,6 +185,15 @@ struct rrddim {
 };
 typedef struct rrddim RRDDIM;
 
+// ----------------------------------------------------------------------------
+// these loop macros make sure the linked list is accessed with the right lock
+
+#define rrddim_foreach_read(rd, st) \
+    for(rd = st->dimensions, rrdset_check_rdlock(st); rd ; rd = rd->next)
+
+#define rrddim_foreach_write(rd, st) \
+    for(rd = st->dimensions, rrdset_check_wrlock(st); rd ; rd = rd->next)
+
 
 // ----------------------------------------------------------------------------
 // RRDSET - this is a chart
@@ -252,7 +261,7 @@ struct rrdset {
     char *cache_dir;                                // the directory to store dimensions
     char cache_filename[FILENAME_MAX+1];            // the filename to store this set
 
-    pthread_rwlock_t rrdset_rwlock;
+    pthread_rwlock_t rrdset_rwlock;                 // protects dimensions linked list
 
     unsigned long counter;                          // the number of times we added values to this rrd
     unsigned long counter_done;                     // the number of times we added values to this rrd
@@ -305,6 +314,16 @@ typedef struct rrdset RRDSET;
 #define rrdset_wrlock(st) pthread_rwlock_wrlock(&((st)->rrdset_rwlock))
 #define rrdset_unlock(st) pthread_rwlock_unlock(&((st)->rrdset_rwlock))
 
+// ----------------------------------------------------------------------------
+// these loop macros make sure the linked list is accessed with the right lock
+
+#define rrdset_foreach_read(st, host) \
+    for(st = host->rrdset_root, rrdhost_check_rdlock(host); st ; st = st->next)
+
+#define rrdset_foreach_write(st, host) \
+    for(st = host->rrdset_root, rrdhost_check_wrlock(host); st ; st = st->next)
+
+
 // ----------------------------------------------------------------------------
 // RRD HOST
 
@@ -325,7 +344,7 @@ struct rrdhost {
 
     RRDSET *rrdset_root;                            // the host charts
 
-    pthread_rwlock_t rrdhost_rwlock;                // lock for this RRDHOST
+    pthread_rwlock_t rrdhost_rwlock;                // lock for this RRDHOST (protects rrdset_root linked list)
 
     avl_tree_lock rrdset_root_index;                // the host's charts index (by id)
     avl_tree_lock rrdset_root_index_name;           // the host's charts index (by name)
@@ -364,35 +383,55 @@ extern RRDHOST *localhost;
 #define rrdhost_wrlock(h) pthread_rwlock_wrlock(&((h)->rrdhost_rwlock))
 #define rrdhost_unlock(h) pthread_rwlock_unlock(&((h)->rrdhost_rwlock))
 
+// ----------------------------------------------------------------------------
+// these loop macros make sure the linked list is accessed with the right lock
+
+#define rrdhost_foreach_read(var) \
+    for(var = localhost, rrd_check_rdlock(); var ; var = var->next)
+
+#define rrdhost_foreach_write(var) \
+    for(var = localhost, rrd_check_wrlock(); var ; var = var->next)
+
+
+// ----------------------------------------------------------------------------
+// global lock for all RRDHOSTs
+
+extern pthread_rwlock_t rrd_rwlock;
+#define rrd_rdlock() pthread_rwlock_rdlock(&rrd_rwlock)
+#define rrd_wrlock() pthread_rwlock_wrlock(&rrd_rwlock)
+#define rrd_unlock() pthread_rwlock_unlock(&rrd_rwlock)
+
+// ----------------------------------------------------------------------------
+
 extern void rrd_init(char *hostname);
 
 extern RRDHOST *rrdhost_find(const char *guid, uint32_t hash);
 extern RRDHOST *rrdhost_find_or_create(const char *hostname, const char *guid);
 
 #ifdef NETDATA_INTERNAL_CHECKS
+extern void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
+extern void rrdhost_check_rdlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
+extern void rrdset_check_rdlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line);
+extern void rrdset_check_wrlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line);
+extern void rrd_check_rdlock_int(const char *file, const char *function, const unsigned long line);
+extern void rrd_check_wrlock_int(const char *file, const char *function, const unsigned long line);
+
 #define rrdhost_check_rdlock(host) rrdhost_check_rdlock_int(host, __FILE__, __FUNCTION__, __LINE__)
 #define rrdhost_check_wrlock(host) rrdhost_check_wrlock_int(host, __FILE__, __FUNCTION__, __LINE__)
+#define rrdset_check_rdlock(st) rrdset_check_rdlock_int(st, __FILE__, __FUNCTION__, __LINE__)
+#define rrdset_check_wrlock(st) rrdset_check_wrlock_int(st, __FILE__, __FUNCTION__, __LINE__)
 #define rrd_check_rdlock() rrd_check_rdlock_int(__FILE__, __FUNCTION__, __LINE__)
 #define rrd_check_wrlock() rrd_check_wrlock_int(__FILE__, __FUNCTION__, __LINE__)
+
 #else
 #define rrdhost_check_rdlock(host) (void)0
 #define rrdhost_check_wrlock(host) (void)0
+#define rrdset_check_rdlock(host) (void)0
+#define rrdset_check_wrlock(host) (void)0
 #define rrd_check_rdlock() (void)0
 #define rrd_check_wrlock() (void)0
 #endif
 
-extern void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
-extern void rrdhost_check_rdlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line);
-
-// ----------------------------------------------------------------------------
-// global lock for all RRDHOSTs
-
-extern pthread_rwlock_t rrd_rwlock;
-#define rrd_rdlock() pthread_rwlock_rdlock(&rrd_rwlock)
-#define rrd_wrlock() pthread_rwlock_wrlock(&rrd_rwlock)
-#define rrd_unlock() pthread_rwlock_unlock(&rrd_rwlock)
-
-
 // ----------------------------------------------------------------------------
 // RRDSET functions
 
index 5315ca9603ff8ad49f3cb580bee48ee8927a9c54..8186fab0236051a83f848ce98979598f331b2fc8 100644 (file)
@@ -43,7 +43,7 @@ void rrd_stats_api_v1_chart_with_data(RRDSET *st, BUFFER *wb, size_t *dimensions
 
     size_t dimensions = 0;
     RRDDIM *rd;
-    for(rd = st->dimensions; rd ; rd = rd->next) {
+    rrddim_foreach_read(rd, st) {
         if(rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)) continue;
 
         memory += rd->memsize;
@@ -97,8 +97,9 @@ void rrd_stats_api_v1_charts(RRDHOST *host, BUFFER *wb)
         , host->rrd_history_entries
         );
 
+    c = 0;
     rrdhost_rdlock(host);
-    for(st = host->rrdset_root, c = 0; st ; st = st->next) {
+    rrdset_foreach_read(st, host) {
         if(rrdset_flag_check(st, RRDSET_FLAG_ENABLED) && st->dimensions) {
             if(c) buffer_strcat(wb, ",");
             buffer_strcat(wb, "\n\t\t\"");
@@ -157,7 +158,7 @@ void rrd_stats_api_v1_charts_allmetrics_prometheus(RRDHOST *host, BUFFER *wb) {
 
     // for each chart
     RRDSET *st;
-    for(st = host->rrdset_root; st ; st = st->next) {
+    rrdset_foreach_read(st, host) {
         char chart[PROMETHEUS_ELEMENT_MAX + 1];
         prometheus_name_copy(chart, st->id, PROMETHEUS_ELEMENT_MAX);
 
@@ -167,7 +168,7 @@ void rrd_stats_api_v1_charts_allmetrics_prometheus(RRDHOST *host, BUFFER *wb) {
 
             // for each dimension
             RRDDIM *rd;
-            for(rd = st->dimensions; rd ; rd = rd->next) {
+            rrddim_foreach_read(rd, st) {
                 if(rd->counter) {
                     char dimension[PROMETHEUS_ELEMENT_MAX + 1];
                     prometheus_name_copy(dimension, rd->id, PROMETHEUS_ELEMENT_MAX);
@@ -228,7 +229,7 @@ void rrd_stats_api_v1_charts_allmetrics_shell(RRDHOST *host, BUFFER *wb) {
 
     // for each chart
     RRDSET *st;
-    for(st = host->rrdset_root; st ; st = st->next) {
+    rrdset_foreach_read(st, host) {
         calculated_number total = 0.0;
         char chart[SHELL_ELEMENT_MAX + 1];
         shell_name_copy(chart, st->id, SHELL_ELEMENT_MAX);
@@ -239,7 +240,7 @@ void rrd_stats_api_v1_charts_allmetrics_shell(RRDHOST *host, BUFFER *wb) {
 
             // for each dimension
             RRDDIM *rd;
-            for(rd = st->dimensions; rd ; rd = rd->next) {
+            rrddim_foreach_read(rd, st) {
                 if(rd->counter) {
                     char dimension[SHELL_ELEMENT_MAX + 1];
                     shell_name_copy(dimension, rd->id, SHELL_ELEMENT_MAX);
@@ -350,7 +351,7 @@ unsigned long rrd_stats_one_json(RRDSET *st, char *options, BUFFER *wb)
     unsigned long memory = st->memsize;
 
     RRDDIM *rd;
-    for(rd = st->dimensions; rd ; rd = rd->next) {
+    rrddim_foreach_read(rd, st) {
 
         memory += rd->memsize;
 
@@ -411,20 +412,20 @@ void rrd_stats_graph_json(RRDSET *st, char *options, BUFFER *wb)
 void rrd_stats_all_json(RRDHOST *host, BUFFER *wb)
 {
     unsigned long memory = 0;
-    long c;
+    long c = 0;
     RRDSET *st;
 
     buffer_strcat(wb, RRD_GRAPH_JSON_HEADER);
 
     rrdhost_rdlock(host);
-
-    for(st = host->rrdset_root, c = 0; st ; st = st->next) {
+    rrdset_foreach_read(st, host) {
         if(rrdset_flag_check(st, RRDSET_FLAG_ENABLED) && st->dimensions) {
             if(c) buffer_strcat(wb, ",\n");
             memory += rrd_stats_one_json(st, NULL, wb);
             c++;
         }
     }
+    rrdhost_unlock(host);
 
     buffer_sprintf(wb, "\n\t],\n"
         "\t\"hostname\": \"%s\",\n"
@@ -437,8 +438,6 @@ void rrd_stats_all_json(RRDHOST *host, BUFFER *wb)
         , host->rrd_history_entries
         , memory
         );
-
-    rrdhost_unlock(host);
 }
 
 
@@ -544,6 +543,8 @@ static void rrdr_dump(RRDR *r)
 */
 
 void rrdr_disable_not_selected_dimensions(RRDR *r, uint32_t options, const char *dims) {
+    rrdset_check_rdlock(r->st);
+
     if(unlikely(!dims || !*dims)) return;
 
     char b[strlen(dims) + 1];
@@ -649,6 +650,8 @@ void rrdr_buffer_print_format(BUFFER *wb, uint32_t format)
 
 uint32_t rrdr_check_options(RRDR *r, uint32_t options, const char *dims)
 {
+    rrdset_check_rdlock(r->st);
+
     (void)dims;
 
     if(options & RRDR_OPTION_NONZERO) {
@@ -684,6 +687,8 @@ uint32_t rrdr_check_options(RRDR *r, uint32_t options, const char *dims)
 
 void rrdr_json_wrapper_begin(RRDR *r, BUFFER *wb, uint32_t format, uint32_t options, int string_value)
 {
+    rrdset_check_rdlock(r->st);
+
     long rows = rrdr_rows(r);
     long c, i;
     RRDDIM *rd;
@@ -865,6 +870,8 @@ void rrdr_json_wrapper_end(RRDR *r, BUFFER *wb, uint32_t format, uint32_t option
 
 static void rrdr2json(RRDR *r, BUFFER *wb, uint32_t options, int datatable)
 {
+    rrdset_check_rdlock(r->st);
+
     //info("RRD2JSON(): %s: BEGIN", r->st->id);
     int row_annotations = 0, dates, dates_with_new = 0;
     char kq[2] = "",                    // key quote
@@ -1091,6 +1098,8 @@ static void rrdr2json(RRDR *r, BUFFER *wb, uint32_t options, int datatable)
 
 static void rrdr2csv(RRDR *r, BUFFER *wb, uint32_t options, const char *startline, const char *separator, const char *endline, const char *betweenlines)
 {
+    rrdset_check_rdlock(r->st);
+
     //info("RRD2CSV(): %s: BEGIN", r->st->id);
     long c, i;
     RRDDIM *d;
@@ -1196,6 +1205,8 @@ static void rrdr2csv(RRDR *r, BUFFER *wb, uint32_t options, const char *startlin
 }
 
 inline static calculated_number rrdr2value(RRDR *r, long i, uint32_t options, int *all_values_are_null) {
+    rrdset_check_rdlock(r->st);
+
     long c;
     RRDDIM *d;
 
@@ -1396,7 +1407,7 @@ static RRDR *rrdr_create(RRDSET *st, long n)
     rrdr_lock_rrdset(r);
 
     RRDDIM *rd;
-    for(rd = st->dimensions ; rd ; rd = rd->next) r->d++;
+    rrddim_foreach_read(rd, st) r->d++;
 
     r->n = n;
 
@@ -1599,6 +1610,7 @@ RRDR *rrd2rrdr(RRDSET *st, long points, long long after, long long before, int g
     // initialize them
     RRDDIM *rd;
     long c;
+    rrdset_check_rdlock(st);
     for( rd = st->dimensions, c = 0 ; rd && c < dimensions ; rd = rd->next, c++) {
         last_values[c] = 0;
         group_values[c] = (group_method == GROUP_MAX || group_method == GROUP_MIN)?NAN:0;
@@ -2036,7 +2048,7 @@ time_t rrd_stats_json(int type, RRDSET *st, BUFFER *wb, long points, long group,
 
     int dimensions = 0;
     RRDDIM *rd;
-    for( rd = st->dimensions ; rd ; rd = rd->next) dimensions++;
+    rrddim_foreach_read(rd, st) dimensions++;
     if(!dimensions) {
         rrdset_unlock(st);
         buffer_strcat(wb, "No dimensions yet.");
index bdd66c2b8b7e8028eedcc6cde93eb2591cc7b3b2..1f1845409e409f5ba37696cbcb2afb9af878847a 100644 (file)
@@ -285,7 +285,7 @@ inline void rrdcalc_create_part2(RRDHOST *host, RRDCALC *rc) {
 
     // link it to its chart
     RRDSET *st;
-    for(st = host->rrdset_root; st ; st = st->next) {
+    rrdset_foreach_read(st, host) {
         if(rrdcalc_is_matching_this_rrdset(rc, st)) {
             rrdsetcalc_link(st, rc);
             break;
index 12f35ba8bf5683f15f811a29ee87683c0441c79c..7fed5f617f7387e40983eb6d84bc94009d01fbcb 100644 (file)
@@ -318,7 +318,7 @@ void rrdhost_save(RRDHOST *host) {
     // to ensure only one thread is saving the database
     rrdhost_wrlock(host);
 
-    for(st = host->rrdset_root; st ; st = st->next) {
+    rrdset_foreach_write(st, host) {
         rrdset_rdlock(st);
 
         if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
@@ -326,7 +326,7 @@ void rrdhost_save(RRDHOST *host) {
             savememory(st->cache_filename, st, st->memsize);
         }
 
-        for(rd = st->dimensions; rd ; rd = rd->next) {
+        rrddim_foreach_read(rd, st) {
             if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE)) {
                 debug(D_RRD_STATS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
                 savememory(rd->cache_filename, rd, rd->memsize);
@@ -345,7 +345,7 @@ void rrdhost_save_all(void) {
     rrd_rdlock();
 
     RRDHOST *host;
-    for(host = localhost; host ; host = host->next)
+    rrdhost_foreach_read(host)
         rrdhost_save(host);
 
     rrd_unlock();
index a81fc967dc5feababfbbc1c58de0cd70e5343e45..ec780b2b3d95a764156827d4dba453b63304f69c 100644 (file)
@@ -19,7 +19,7 @@ static inline void rrdpush_unlock() {
 
 static inline int need_to_send_chart_definitions(RRDSET *st) {
     RRDDIM *rd;
-    for(rd = st->dimensions; rd ;rd = rd->next)
+    rrddim_foreach_read(rd, st)
         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && !rrddim_flag_check(rd, RRDDIM_FLAG_EXPOSED))
             return 1;
 
@@ -40,7 +40,7 @@ static inline void send_chart_definitions(RRDSET *st) {
     );
 
     RRDDIM *rd;
-    for(rd = st->dimensions; rd ;rd = rd->next) {
+    rrddim_foreach_read(rd, st) {
         buffer_sprintf(rrdpush_buffer, "DIMENSION '%s' '%s' '%s' " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " '%s %s'\n"
                        , rd->id
                        , rd->name
@@ -50,6 +50,7 @@ static inline void send_chart_definitions(RRDSET *st) {
                        , rrddim_flag_check(rd, RRDDIM_FLAG_HIDDEN)?"hidden":""
                        , rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)?"noreset":""
         );
+        rrddim_flag_set(rd, RRDDIM_FLAG_EXPOSED);
     }
 }
 
@@ -57,7 +58,7 @@ static inline void send_chart_metrics(RRDSET *st) {
     buffer_sprintf(rrdpush_buffer, "BEGIN %s %llu\n", st->id, st->usec_since_last_update);
 
     RRDDIM *rd;
-    for(rd = st->dimensions; rd ;rd = rd->next) {
+    rrddim_foreach_read(rd, st) {
         if(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))
             buffer_sprintf(rrdpush_buffer, "SET %s = " COLLECTED_NUMBER_FORMAT "\n"
                        , rd->id
@@ -71,35 +72,39 @@ static inline void send_chart_metrics(RRDSET *st) {
 static void reset_all_charts(void) {
     rrd_rdlock();
 
-    RRDHOST *h;
-    for(h = localhost; h ;h = h->next) {
+    RRDHOST *host;
+    rrdhost_foreach_read(host) {
+        rrdhost_rdlock(host);
+
         RRDSET *st;
-        for(st = h->rrdset_root ; st ; st = st->next) {
+        rrdset_foreach_read(st, host) {
             rrdset_rdlock(st);
 
             RRDDIM *rd;
-            for(rd = st->dimensions; rd ;rd = rd->next)
+            rrddim_foreach_read(rd, st)
                 rrddim_flag_clear(rd, RRDDIM_FLAG_EXPOSED);
 
             rrdset_unlock(st);
         }
+        rrdhost_unlock(host);
     }
+    rrd_unlock();
 
     last_host = NULL;
-
-    rrd_unlock();
 }
 
 void rrdset_done_push(RRDSET *st) {
 
-    if(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))
+    if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED) || !rrdpush_buffer))
         return;
 
     rrdpush_lock();
     rrdset_rdlock(st);
 
-    if(st->rrdhost != last_host)
-        buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->hostname, st->rrdhost->machine_guid);
+    if(st->rrdhost != last_host) {
+        buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->machine_guid, st->rrdhost->hostname);
+        last_host = st->rrdhost;
+    }
 
     if(need_to_send_chart_definitions(st))
         send_chart_definitions(st);
@@ -139,36 +144,58 @@ void *central_netdata_push_thread(void *ptr) {
     size_t begin = 0;
     size_t max_size = 1024 * 1024;
     size_t reconnects_counter = 0;
+    size_t sent_bytes = 0;
+    size_t sent_connection = 0;
     int sock = -1;
     char buffer[1];
 
     for(;;) {
         if(unlikely(sock == -1)) {
+            info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
             sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
 
             if(unlikely(sock != -1)) {
-                if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
-                    error("Cannot set non-blocking mode for socket.");
+                info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
 
-                buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s\r\n\r\n", config_get("global", "central netdata api key", ""));
-                reset_all_charts();
+                if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
+                    error("PUSH: cannot set non-blocking mode for socket.");
             }
+            else
+                error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+
+            rrdpush_lock();
+            if(buffer_strlen(rrdpush_buffer))
+                error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
+
+            buffer_flush(rrdpush_buffer);
+            buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), VERSION);
+            reset_all_charts();
+            rrdpush_unlock();
+            sent_connection = 0;
         }
 
         if(read(rrdpush_pipe[PIPE_READ], buffer, 1) == -1) {
-            error("Cannot read from internal pipe.");
+            error("PUSH: Cannot read from internal pipe.");
             sleep(1);
         }
 
-        if(likely(sock != -1)) {
+        if(likely(sock != -1 && begin < rrdpush_buffer->len)) {
+            // fprintf(stderr, "PUSH BEGIN\n");
+            // fwrite(&rrdpush_buffer->buffer[begin], 1, rrdpush_buffer->len - begin, stderr);
+            // fprintf(stderr, "\nPUSH END\n");
+
             rrdpush_lock();
-            ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len, MSG_DONTWAIT);
+            ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len - begin, MSG_DONTWAIT);
             if(ret == -1) {
-                error("Failed to send metrics to central netdata at %s", central_netdata_to_push_data);
-                close(sock);
-                sock = -1;
+                if(errno != EAGAIN) {
+                    error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
+                    close(sock);
+                    sock = -1;
+                }
             }
             else {
+                sent_connection += ret;
+                sent_bytes += ret;
                 begin += ret;
                 if(begin == rrdpush_buffer->len) {
                     buffer_flush(rrdpush_buffer);
@@ -180,23 +207,15 @@ void *central_netdata_push_thread(void *ptr) {
 
         // protection from overflow
         if(rrdpush_buffer->len > max_size) {
-            rrdpush_lock();
-
-            error("Discarding %zu bytes of metrics data, because we cannot connect to central netdata at %s"
-                  , buffer_strlen(rrdpush_buffer), central_netdata_to_push_data);
-
-            buffer_flush(rrdpush_buffer);
-
+            errno = 0;
+            error("PUSH: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
             if(sock != -1) {
                 close(sock);
                 sock = -1;
             }
-
-            rrdpush_unlock();
         }
     }
 
-cleanup:
     debug(D_WEB_CLIENT, "Central netdata push thread exits.");
     if(sock != -1)
         close(sock);
index fea3d727523f3f1fd2da6eda1ff8a3e38e3196e7..5efc1ce3a116773a1a782910818e5d7d75d7b78a 100644 (file)
@@ -3,6 +3,23 @@
 
 #define RRD_DEFAULT_GAP_INTERPOLATIONS 1
 
+void rrdset_check_rdlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line) {
+    debug(D_RRD_CALLS, "Checking read lock on chart '%s'", st->id);
+
+    int ret = pthread_rwlock_trywrlock(&st->rrdset_rwlock);
+    if(ret == 0)
+        fatal("RRDSET '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", st->id, function, line, file);
+}
+
+void rrdset_check_wrlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line) {
+    debug(D_RRD_CALLS, "Checking write lock on chart '%s'", st->id);
+
+    int ret = pthread_rwlock_tryrdlock(&st->rrdset_rwlock);
+    if(ret == 0)
+        fatal("RRDSET '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", st->id, function, line, file);
+}
+
+
 // ----------------------------------------------------------------------------
 // RRDSET index
 
@@ -143,7 +160,7 @@ void rrdset_set_name(RRDSET *st, const char *name) {
 
     rrdset_wrlock(st);
     RRDDIM *rd;
-    for(rd = st->dimensions; rd ;rd = rd->next)
+    rrddim_foreach_write(rd, st)
         rrddimvar_rename_all(rd);
     rrdset_unlock(st);
 
@@ -167,7 +184,7 @@ void rrdset_reset(RRDSET *st) {
     st->counter_done = 0;
 
     RRDDIM *rd;
-    for(rd = st->dimensions; rd ; rd = rd->next) {
+    rrddim_foreach_read(rd, st) {
         rd->last_collected_time.tv_sec = 0;
         rd->last_collected_time.tv_usec = 0;
         rd->counter = 0;
@@ -674,18 +691,20 @@ void rrdset_done(RRDSET *st) {
     st->counter_done++;
 
     // calculate totals and count the dimensions
-    int dimensions;
+    int dimensions = 0;
     st->collected_total = 0;
-    for( rd = st->dimensions, dimensions = 0 ; rd ; rd = rd->next, dimensions++ )
+    rrddim_foreach_read(rd, st) {
+        dimensions++;
         if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
             st->collected_total += rd->collected_value;
+    }
 
     uint32_t storage_flags = SN_EXISTS;
 
     // process all dimensions to calculate their values
     // based on the collected figures only
     // at this stage we do not interpolate anything
-    for( rd = st->dimensions ; rd ; rd = rd->next ) {
+    rrddim_foreach_read(rd, st) {
 
         if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) {
             rd->calculated_value = 0;
@@ -890,7 +909,7 @@ void rrdset_done(RRDSET *st) {
         st->last_updated.tv_sec = (time_t) (next_store_ut / USEC_PER_SEC);
         st->last_updated.tv_usec = 0;
 
-        for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
+        rrddim_foreach_read(rd, st) {
             calculated_number new_value;
 
             switch(rd->algorithm) {
@@ -1035,7 +1054,7 @@ void rrdset_done(RRDSET *st) {
 
     st->last_collected_total  = st->collected_total;
 
-    for( rd = st->dimensions; rd ; rd = rd->next ) {
+    rrddim_foreach_read(rd, st) {
         if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
             continue;
 
index 42ffa4011b507921945090d93dd18da1004dc1d5..61901ed8d1e97efd9e9245a2d60acf190fd1a43d 100644 (file)
@@ -1672,7 +1672,7 @@ int validate_stream_api_key(const char *key) {
 }
 
 int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
-    info("STREAM request from client '%s:%s' for host '%s'", w->client_ip, w->client_port, host->hostname);
+    info("STREAM request from client '%s:%s', starting as host '%s'", w->client_ip, w->client_port, host->hostname);
 
     char *key = NULL;
 
@@ -1689,16 +1689,16 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
     }
 
     if(!key || !*key) {
+        error("STREAM request from client '%s:%s', without an API key. Forbidding access.", w->client_ip, w->client_port);
         buffer_flush(w->response.data);
         buffer_sprintf(w->response.data, "You need an API key for this request.");
-        error("STREAM request from client '%s:%s', without an API key. Forbidding access.", w->client_ip, w->client_port);
         return 401;
     }
 
     if(!validate_stream_api_key(key)) {
+        error("STREAM request from client '%s:%s': API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
         buffer_flush(w->response.data);
         buffer_sprintf(w->response.data, "Your API key is not permitted access.");
-        error("STREAM request from client '%s:%s': API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
         return 401;
     }
 
@@ -1733,6 +1733,7 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
     }
 
     // call the plugins.d processor to receive the metrics
+    info("STREAM connecting client '%s:%s' to plugins.d.", w->client_ip, w->client_port);
     size_t count = pluginsd_process(host, &cd, fp, 1);
     error("STREAM from '%s:%s': client disconnected.", w->client_ip, w->client_port);
 
@@ -2188,12 +2189,16 @@ static inline int web_client_switch_host(RRDHOST *host, struct web_client *w, ch
         if(unlikely(hash == hash_localhost && !strcmp(tok, "localhost")))
             return web_client_process_url(localhost, w, url);
 
+        rrd_rdlock();
         RRDHOST *h;
-        for(h = localhost; h; h = h->next) {
+        rrdhost_foreach_read(h) {
             if(unlikely((hash == h->hash_hostname && !strcmp(tok, h->hostname)) ||
-                        (hash == h->hash_machine_guid && !strcmp(tok, h->machine_guid))))
+                        (hash == h->hash_machine_guid && !strcmp(tok, h->machine_guid)))) {
+                rrd_unlock();
                 return web_client_process_url(h, w, url);
+            }
         }
+        rrd_unlock();
     }
 
     buffer_flush(w->response.data);
@@ -2301,10 +2306,11 @@ static inline int web_client_process_url(RRDHOST *host, struct web_client *w, ch
             debug(D_WEB_CLIENT_ACCESS, "%llu: Sending list of RRD_STATS...", w->id);
 
             buffer_flush(w->response.data);
-            RRDSET *st = host->rrdset_root;
+            RRDSET *st;
 
-            for ( ; st ; st = st->next )
-                buffer_sprintf(w->response.data, "%s\n", st->name);
+            rrdhost_rdlock(host);
+            rrdset_foreach_read(st, host) buffer_sprintf(w->response.data, "%s\n", st->name);
+            rrdhost_unlock(host);
 
             return 200;
         }