]> arthur.barton.de Git - netdata.git/blobdiff - src/rrdset.c
unified rrd locks
[netdata.git] / src / rrdset.c
index cfd3b9db2c5888dab59f3b29446660216368fd14..aa002dd26d98293b29682a155f201db994e81e66 100644 (file)
@@ -100,7 +100,6 @@ inline RRDSET *rrdset_find_byname(RRDHOST *host, const char *name) {
     return(st);
 }
 
-
 // ----------------------------------------------------------------------------
 // RRDSET - rename charts
 
@@ -133,20 +132,20 @@ void rrdset_set_name(RRDSET *st, const char *name) {
 
     if(st->name) {
         rrdset_index_del_name(st->rrdhost, st);
-        st->name = config_set_default(st->id, "name", b);
+        st->name = config_set_default(st->config_section, "name", b);
         st->hash_name = simple_hash(st->name);
         rrdsetvar_rename_all(st);
     }
     else {
-        st->name = config_get(st->id, "name", b);
+        st->name = config_get(st->config_section, "name", b);
         st->hash_name = simple_hash(st->name);
     }
 
-    pthread_rwlock_wrlock(&st->rwlock);
+    rrdset_wrlock(st);
     RRDDIM *rd;
     for(rd = st->dimensions; rd ;rd = rd->next)
         rrddimvar_rename_all(rd);
-    pthread_rwlock_unlock(&st->rwlock);
+    rrdset_unlock(st);
 
     if(unlikely(rrdset_index_add_name(st->rrdhost, st) != st))
         error("RRDSET: INTERNAL ERROR: attempted to index duplicate chart name '%s'", st->name);
@@ -206,12 +205,44 @@ static inline void timeval_align(struct timeval *tv, int update_every) {
     tv->tv_usec = 500000;
 }
 
+// ----------------------------------------------------------------------------
+// RRDSET - free a chart
+
+void rrdset_free(RRDSET *st) {
+    rrdset_wrlock(st);
+
+    while(st->variables)  rrdsetvar_free(st->variables);
+    while(st->alarms)     rrdsetcalc_unlink(st->alarms);
+    while(st->dimensions) rrddim_free(st, st->dimensions);
+
+    if(unlikely(rrdset_index_del(st->rrdhost, st) != st))
+        error("RRDSET: INTERNAL ERROR: attempt to remove from index chart '%s', removed a different chart.", st->id);
+
+    rrdset_index_del_name(st->rrdhost, st);
+
+    st->rrdfamily->use_count--;
+    if(!st->rrdfamily->use_count)
+        rrdfamily_free(st->rrdhost, st->rrdfamily);
+
+    rrdset_unlock(st);
+
+    // free directly allocated memory
+    freez(st->config_section);
+
+    if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || st->rrd_memory_mode == RRD_MEMORY_MODE_MAP) {
+        debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
+        munmap(st, st->memsize);
+    }
+    else
+        freez(st);
+}
+
 // ----------------------------------------------------------------------------
 // RRDSET - create a chart
 
 RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const char *name, const char *family
                       , const char *context, const char *title, const char *units, long priority
-                      , int update_every, int chart_type) {
+                      , int update_every, RRDSET_TYPE chart_type) {
 
     if(!type || !type[0]) {
         fatal("Cannot create rrd stats without a type.");
@@ -223,9 +254,10 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
         return NULL;
     }
 
-    char fullid[RRD_ID_LENGTH_MAX + 1];
-    char fullfilename[FILENAME_MAX + 1];
+    // ------------------------------------------------------------------------
+    // check if it already exists
 
+    char fullid[RRD_ID_LENGTH_MAX + 1];
     snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
 
     RRDSET *st = rrdset_find(host, fullid);
@@ -234,77 +266,101 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
         return st;
     }
 
-    long rentries = config_get_number(fullid, "history", host->rrd_history_entries);
+    char fullfilename[FILENAME_MAX + 1];
+
+    // ------------------------------------------------------------------------
+    // compose the config_section for this chart
+
+    char config_section[RRD_ID_LENGTH_MAX + 1];
+    if(host == localhost)
+        strcpy(config_section, fullid);
+    else
+        snprintfz(config_section, RRD_ID_LENGTH_MAX, "%s/%s", host->machine_guid, fullid);
+
+    // ------------------------------------------------------------------------
+    // get the options from the config, we need to create it
+
+    long rentries = config_get_number(config_section, "history", host->rrd_history_entries);
     long entries = align_entries_to_pagesize(rentries);
-    if(entries != rentries) entries = config_set_number(fullid, "history", entries);
+    if(entries != rentries) entries = config_set_number(config_section, "history", entries);
 
-    int enabled = config_get_boolean(fullid, "enabled", 1);
+    int enabled = config_get_boolean(config_section, "enabled", 1);
     if(!enabled) entries = 5;
 
     unsigned long size = sizeof(RRDSET);
-    char *cache_dir = rrdset_cache_dir(host, fullid);
+    char *cache_dir = rrdset_cache_dir(host, fullid, config_section);
+
+    // ------------------------------------------------------------------------
+    // load it or allocate it
 
     debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
 
     snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
-    if(host->rrd_memory_mode != RRD_MEMORY_MODE_RAM) st = (RRDSET *)mymmap(fullfilename, size, ((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 0);
-    if(st) {
-        if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
-            errno = 0;
-            info("Initializing file %s.", fullfilename);
-            memset(st, 0, size);
-        }
-        else if(strcmp(st->id, fullid) != 0) {
-            errno = 0;
-            error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
-            // munmap(st, size);
-            // st = NULL;
-            memset(st, 0, size);
-        }
-        else if(st->memsize != size || st->entries != entries) {
-            errno = 0;
-            error("File %s does not have the desired size. Clearing it.", fullfilename);
-            memset(st, 0, size);
-        }
-        else if(st->update_every != update_every) {
-            errno = 0;
-            error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
-            memset(st, 0, size);
-        }
-        else if((now_realtime_sec() - st->last_updated.tv_sec) > update_every * entries) {
-            errno = 0;
-            error("File %s is too old. Clearing it.", fullfilename);
-            memset(st, 0, size);
-        }
+    if(host->rrd_memory_mode != RRD_MEMORY_MODE_RAM) {
+        st = (RRDSET *) mymmap(fullfilename, size, ((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), 0);
+        if(st) {
+            memset(&st->avl, 0, sizeof(avl));
+            memset(&st->avlname, 0, sizeof(avl));
+            memset(&st->variables_root_index, 0, sizeof(avl_tree_lock));
+            memset(&st->dimensions_index, 0, sizeof(avl_tree_lock));
+            memset(&st->rrdset_rwlock, 0, sizeof(pthread_rwlock_t));
+
+            st->name = NULL;
+            st->type = NULL;
+            st->family = NULL;
+            st->context = NULL;
+            st->title = NULL;
+            st->units = NULL;
+            st->dimensions = NULL;
+            st->next = NULL;
+            st->variables = NULL;
+            st->alarms = NULL;
+            st->flags = 0x00000000;
+
+            if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
+                errno = 0;
+                info("Initializing file %s.", fullfilename);
+                memset(st, 0, size);
+            }
+            else if(strcmp(st->id, fullid) != 0) {
+                errno = 0;
+                error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
+                // munmap(st, size);
+                // st = NULL;
+                memset(st, 0, size);
+            }
+            else if(st->memsize != size || st->entries != entries) {
+                errno = 0;
+                error("File %s does not have the desired size. Clearing it.", fullfilename);
+                memset(st, 0, size);
+            }
+            else if(st->update_every != update_every) {
+                errno = 0;
+                error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
+                memset(st, 0, size);
+            }
+            else if((now_realtime_sec() - st->last_updated.tv_sec) > update_every * entries) {
+                errno = 0;
+                error("File %s is too old. Clearing it.", fullfilename);
+                memset(st, 0, size);
+            }
 
-        // make sure the database is aligned
-        if(st->last_updated.tv_sec)
-            timeval_align(&st->last_updated, update_every);
-    }
+            // make sure the database is aligned
+            if(st->last_updated.tv_sec)
+                timeval_align(&st->last_updated, update_every);
 
-    if(st) {
-        st->name = NULL;
-        st->type = NULL;
-        st->family = NULL;
-        st->context = NULL;
-        st->title = NULL;
-        st->units = NULL;
-        st->dimensions = NULL;
-        st->next = NULL;
-        st->rrd_memory_mode = host->rrd_memory_mode;
-        st->variables = NULL;
-        st->alarms = NULL;
-        memset(&st->rwlock, 0, sizeof(pthread_rwlock_t));
-        memset(&st->avl, 0, sizeof(avl));
-        memset(&st->avlname, 0, sizeof(avl));
-        memset(&st->variables_root_index, 0, sizeof(avl_tree_lock));
-        memset(&st->dimensions_index, 0, sizeof(avl_tree_lock));
+            // make sure we have the right memory mode
+            // even if we cleared the memory
+            st->rrd_memory_mode = host->rrd_memory_mode;
+        }
     }
-    else {
+
+    if(unlikely(!st)) {
         st = callocz(1, size);
         st->rrd_memory_mode = RRD_MEMORY_MODE_RAM;
     }
 
+    st->config_section = strdup(config_section);
     st->rrdhost = host;
     st->memsize = size;
     st->entries = entries;
@@ -320,19 +376,22 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
 
     st->cache_dir = cache_dir;
 
-    st->chart_type = rrdset_type_id(config_get(st->id, "chart type", rrdset_type_name(chart_type)));
-    st->type       = config_get(st->id, "type", type);
-    st->family     = config_get(st->id, "family", family?family:st->type);
-    st->units      = config_get(st->id, "units", units?units:"");
+    st->chart_type = rrdset_type_id(config_get(st->config_section, "chart type", rrdset_type_name(chart_type)));
+    st->type       = config_get(st->config_section, "type", type);
+    st->family     = config_get(st->config_section, "family", family?family:st->type);
+    st->units      = config_get(st->config_section, "units", units?units:"");
 
-    st->context    = config_get(st->id, "context", context?context:st->id);
+    st->context    = config_get(st->config_section, "context", context?context:st->id);
     st->hash_context = simple_hash(st->context);
 
-    st->priority = config_get_number(st->id, "priority", priority);
-    st->enabled = enabled;
+    st->priority = config_get_number(st->config_section, "priority", priority);
+    if(enabled)
+        rrdset_flag_set(st, RRDSET_FLAG_ENABLED);
+    else
+        rrdset_flag_clear(st, RRDSET_FLAG_ENABLED);
 
-    st->isdetail = 0;
-    st->debug = 0;
+    rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
+    rrdset_flag_clear(st, RRDSET_FLAG_DEBUG);
 
     // if(!strcmp(st->id, "disk_util.dm-0")) {
     //     st->debug = 1;
@@ -348,13 +407,13 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
     st->counter_done = 0;
 
     st->gap_when_lost_iterations_above = (int) (
-            config_get_number(st->id, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2);
+            config_get_number(st->config_section, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2);
 
     avl_init_lock(&st->dimensions_index, rrddim_compare);
     avl_init_lock(&st->variables_root_index, rrdvar_compare);
 
-    pthread_rwlock_init(&st->rwlock, NULL);
-    rrdhost_rwlock(host);
+    pthread_rwlock_init(&st->rrdset_rwlock, NULL);
+    rrdhost_wrlock(host);
 
     if(name && *name) rrdset_set_name(st, name);
     else rrdset_set_name(st, id);
@@ -364,7 +423,7 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
         char varvalue2[CONFIG_MAX_VALUE + 1];
         snprintfz(varvalue, CONFIG_MAX_VALUE, "%s (%s)", title?title:"", st->name);
         json_escape_string(varvalue2, varvalue, sizeof(varvalue2));
-        st->title = config_get(st->id, "title", varvalue2);
+        st->title = config_get(st->config_section, "title", varvalue2);
     }
 
     st->rrdfamily = rrdfamily_create(host, st->family);
@@ -446,7 +505,9 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds)
     }
     debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
 
-    if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds);
+    if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+        debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds);
+
     st->usec_since_last_update = microseconds;
 }
 
@@ -482,7 +543,7 @@ usec_t rrdset_done(RRDSET *st) {
         error("Cannot set pthread cancel state to DISABLE.");
 
     // a read lock is OK here
-    pthread_rwlock_rdlock(&st->rwlock);
+    rrdset_rdlock(st);
 
 /*
     // enable the chart, if it was disabled
@@ -497,7 +558,9 @@ usec_t rrdset_done(RRDSET *st) {
         st->usec_since_last_update = update_every_ut;
         first_entry = 1;
     }
-    if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update);
+
+    if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+        debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update);
 
     // set last_collected_time
     if(unlikely(!st->last_collected_time.tv_sec)) {
@@ -512,7 +575,8 @@ usec_t rrdset_done(RRDSET *st) {
         store_this_entry = 0;
         first_entry = 1;
 
-        if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: has not set last_collected_time. Setting it now. Will not store the next entry.", st->name);
+        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+            debug(D_RRD_STATS, "%s: has not set last_collected_time. Setting it now. Will not store the next entry.", st->name);
     }
     else {
         // it is not the first entry
@@ -536,7 +600,8 @@ usec_t rrdset_done(RRDSET *st) {
         store_this_entry = 0;
         first_entry = 1;
 
-        if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
+        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+            debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
     }
 
     // check if we will re-write the entire data set
@@ -566,7 +631,7 @@ usec_t rrdset_done(RRDSET *st) {
     now_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
     next_store_ut  = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
 
-    if(unlikely(st->debug)) {
+    if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) {
         debug(D_RRD_STATS, "%s: last_collect_ut = %0.3Lf (last collection time)", st->name, (long double)last_collect_ut/1000000.0);
         debug(D_RRD_STATS, "%s: now_collect_ut  = %0.3Lf (current collection time)", st->name, (long double)now_collect_ut/1000000.0);
         debug(D_RRD_STATS, "%s: last_stored_ut  = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0);
@@ -575,7 +640,8 @@ usec_t rrdset_done(RRDSET *st) {
 
     if(unlikely(!st->counter_done)) {
         store_this_entry = 0;
-        if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name);
+        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+            debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name);
     }
     st->counter_done++;
 
@@ -598,7 +664,8 @@ usec_t rrdset_done(RRDSET *st) {
             continue;
         }
 
-        if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: START "
+        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+            debug(D_RRD_STATS, "%s/%s: START "
                     " last_collected_value = " COLLECTED_NUMBER_FORMAT
                     " collected_value = " COLLECTED_NUMBER_FORMAT
                     " last_calculated_value = " CALCULATED_NUMBER_FORMAT
@@ -616,7 +683,7 @@ usec_t rrdset_done(RRDSET *st) {
                                        * (calculated_number)rd->multiplier
                                        / (calculated_number)rd->divisor;
 
-                if(unlikely(st->debug))
+                if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
                     debug(D_RRD_STATS, "%s/%s: CALC ABS/ABS-NO-IN "
                             CALCULATED_NUMBER_FORMAT " = "
                             COLLECTED_NUMBER_FORMAT
@@ -641,7 +708,7 @@ usec_t rrdset_done(RRDSET *st) {
                             * (calculated_number)rd->collected_value
                             / (calculated_number)st->collected_total;
 
-                if(unlikely(st->debug))
+                if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
                     debug(D_RRD_STATS, "%s/%s: CALC PCENT-ROW "
                             CALCULATED_NUMBER_FORMAT " = 100"
                                     " * " COLLECTED_NUMBER_FORMAT
@@ -678,7 +745,7 @@ usec_t rrdset_done(RRDSET *st) {
                         * (calculated_number)rd->multiplier
                         / (calculated_number)rd->divisor;
 
-                if(unlikely(st->debug))
+                if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
                     debug(D_RRD_STATS, "%s/%s: CALC INC PRE "
                             CALCULATED_NUMBER_FORMAT " = ("
                             COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT
@@ -723,7 +790,7 @@ usec_t rrdset_done(RRDSET *st) {
                             * (calculated_number)(rd->collected_value - rd->last_collected_value)
                             / (calculated_number)(st->collected_total - st->last_collected_total);
 
-                if(unlikely(st->debug))
+                if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
                     debug(D_RRD_STATS, "%s/%s: CALC PCENT-DIFF "
                             CALCULATED_NUMBER_FORMAT " = 100"
                                     " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
@@ -740,7 +807,7 @@ usec_t rrdset_done(RRDSET *st) {
                 // it gets noticed when we add new types
                 rd->calculated_value = 0;
 
-                if(unlikely(st->debug))
+                if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
                     debug(D_RRD_STATS, "%s/%s: CALC "
                             CALCULATED_NUMBER_FORMAT " = 0"
                           , st->id, rd->name
@@ -749,7 +816,8 @@ usec_t rrdset_done(RRDSET *st) {
                 break;
         }
 
-        if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: PHASE2 "
+        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+            debug(D_RRD_STATS, "%s/%s: PHASE2 "
                     " last_collected_value = " COLLECTED_NUMBER_FORMAT
                     " collected_value = " COLLECTED_NUMBER_FORMAT
                     " last_calculated_value = " CALCULATED_NUMBER_FORMAT
@@ -768,7 +836,10 @@ usec_t rrdset_done(RRDSET *st) {
 
     if(unlikely(now_collect_ut < next_store_ut)) {
         // this is collected in the same interpolation point
-        if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name);
+
+        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+            debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name);
+
 #ifdef NETDATA_INTERNAL_CHECKS
         info("%s is collected in the same interpolation point: short by %llu microseconds", st->name, next_store_ut - now_collect_ut);
 #endif
@@ -783,7 +854,7 @@ usec_t rrdset_done(RRDSET *st) {
         if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", st->name, first_ut, last_stored_ut, next_store_ut, now_collect_ut); }
 #endif
 
-        if(unlikely(st->debug)) {
+        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) {
             debug(D_RRD_STATS, "%s: last_stored_ut = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0);
             debug(D_RRD_STATS, "%s: next_store_ut  = %0.3Lf (next interpolation point)", st->name, (long double)next_store_ut/1000000.0);
         }
@@ -802,7 +873,7 @@ usec_t rrdset_done(RRDSET *st) {
                                    / (calculated_number)(now_collect_ut - last_collect_ut)
                             );
 
-                    if(unlikely(st->debug))
+                    if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
                         debug(D_RRD_STATS, "%s/%s: CALC2 INC "
                                 CALCULATED_NUMBER_FORMAT " = "
                                 CALCULATED_NUMBER_FORMAT
@@ -821,7 +892,7 @@ usec_t rrdset_done(RRDSET *st) {
                     new_value /= (calculated_number)st->update_every;
 
                     if(unlikely(next_store_ut - last_stored_ut < update_every_ut)) {
-                        if(unlikely(st->debug))
+                        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
                             debug(D_RRD_STATS, "%s/%s: COLLECTION POINT IS SHORT " CALCULATED_NUMBER_FORMAT " - EXTRAPOLATING",
                                     st->id, rd->name
                                   , (calculated_number)(next_store_ut - last_stored_ut)
@@ -853,7 +924,7 @@ usec_t rrdset_done(RRDSET *st) {
                                     +  rd->last_calculated_value
                                 );
 
-                        if(unlikely(st->debug))
+                        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
                             debug(D_RRD_STATS, "%s/%s: CALC2 DEF "
                                     CALCULATED_NUMBER_FORMAT " = ((("
                                             "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")"
@@ -878,7 +949,7 @@ usec_t rrdset_done(RRDSET *st) {
                 rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags );
                 rd->last_stored_value = new_value;
 
-                if(unlikely(st->debug))
+                if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
                     debug(D_RRD_STATS, "%s/%s: STORE[%ld] "
                             CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT
                           , st->id, rd->name
@@ -887,9 +958,10 @@ usec_t rrdset_done(RRDSET *st) {
                     );
             }
             else {
-                if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING "
-                                              , st->id, rd->name
-                                              , st->current_entry
+                if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+                    debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING "
+                          , st->id, rd->name
+                          , st->current_entry
                     );
                 rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
                 rd->last_stored_value = NAN;
@@ -897,7 +969,7 @@ usec_t rrdset_done(RRDSET *st) {
 
             stored_entries++;
 
-            if(unlikely(st->debug)) {
+            if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) {
                 calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor;
                 calculated_number t2 = unpack_storage_number(rd->values[st->current_entry]);
                 calculated_number accuracy = accuracy_loss(t1, t2);
@@ -939,24 +1011,29 @@ usec_t rrdset_done(RRDSET *st) {
         if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
             continue;
 
-        if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value);
+        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+            debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value);
+
         rd->last_collected_value = rd->collected_value;
 
         switch(rd->algorithm) {
             case RRD_ALGORITHM_INCREMENTAL:
                 if(unlikely(!first_entry)) {
-                    if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value);
+                    if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+                        debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value);
                     rd->last_calculated_value += rd->calculated_value;
                 }
                 else {
-                    if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS THE FIRST POINT", st->name);
+                    if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+                        debug(D_RRD_STATS, "%s: THIS IS THE FIRST POINT", st->name);
                 }
                 break;
 
             case RRD_ALGORITHM_ABSOLUTE:
             case RRD_ALGORITHM_PCENT_OVER_ROW_TOTAL:
             case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL:
-                if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value);
+                if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+                    debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value);
                 rd->last_calculated_value = rd->calculated_value;
                 break;
         }
@@ -965,7 +1042,8 @@ usec_t rrdset_done(RRDSET *st) {
         rd->collected_value = 0;
         rrddim_flag_clear(rd, RRDDIM_FLAG_UPDATED);
 
-        if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: END "
+        if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
+            debug(D_RRD_STATS, "%s/%s: END "
                     " last_collected_value = " COLLECTED_NUMBER_FORMAT
                     " collected_value = " COLLECTED_NUMBER_FORMAT
                     " last_calculated_value = " CALCULATED_NUMBER_FORMAT
@@ -993,8 +1071,8 @@ usec_t rrdset_done(RRDSET *st) {
             RRDDIM *last;
             // there is dimension to free
             // upgrade our read lock to a write lock
-            pthread_rwlock_unlock(&st->rwlock);
-            pthread_rwlock_wrlock(&st->rwlock);
+            pthread_rwlock_unlock(&st->rrdset_rwlock);
+            pthread_rwlock_wrlock(&st->rrdset_rwlock);
 
             for( rd = st->dimensions, last = NULL ; likely(rd) ; ) {
                 // remove it only it is not updated in rrd_delete_unupdated_dimensions seconds
@@ -1030,7 +1108,7 @@ usec_t rrdset_done(RRDSET *st) {
     }
 */
 
-    pthread_rwlock_unlock(&st->rwlock);
+    rrdset_unlock(st);
 
     if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
         error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);