]> arthur.barton.de Git - netdata.git/blobdiff - src/rrdset.c
self-cleaning obsolete cgroups and network interfaces from memory; fixes #1163; fixes...
[netdata.git] / src / rrdset.c
index 3d6b2f8028718c588fa93e693837a9efa4cceee7..63aeacebd04fb42d0e3ac06fc110f398616295be 100644 (file)
@@ -3,6 +3,23 @@
 
 #define RRD_DEFAULT_GAP_INTERPOLATIONS 1
 
+void rrdset_check_rdlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line) {
+    debug(D_RRD_CALLS, "Checking read lock on chart '%s'", st->id);
+
+    int ret = pthread_rwlock_trywrlock(&st->rrdset_rwlock);
+    if(ret == 0)
+        fatal("RRDSET '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", st->id, function, line, file);
+}
+
+void rrdset_check_wrlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line) {
+    debug(D_RRD_CALLS, "Checking write lock on chart '%s'", st->id);
+
+    int ret = pthread_rwlock_tryrdlock(&st->rrdset_rwlock);
+    if(ret == 0)
+        fatal("RRDSET '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", st->id, function, line, file);
+}
+
+
 // ----------------------------------------------------------------------------
 // RRDSET index
 
@@ -100,7 +117,6 @@ inline RRDSET *rrdset_find_byname(RRDHOST *host, const char *name) {
     return(st);
 }
 
-
 // ----------------------------------------------------------------------------
 // RRDSET - rename charts
 
@@ -133,20 +149,20 @@ void rrdset_set_name(RRDSET *st, const char *name) {
 
     if(st->name) {
         rrdset_index_del_name(st->rrdhost, st);
-        st->name = config_set_default(st->id, "name", b);
+        st->name = config_set_default(st->config_section, "name", b);
         st->hash_name = simple_hash(st->name);
         rrdsetvar_rename_all(st);
     }
     else {
-        st->name = config_get(st->id, "name", b);
+        st->name = config_get(st->config_section, "name", b);
         st->hash_name = simple_hash(st->name);
     }
 
-    pthread_rwlock_wrlock(&st->rwlock);
+    rrdset_wrlock(st);
     RRDDIM *rd;
-    for(rd = st->dimensions; rd ;rd = rd->next)
+    rrddim_foreach_write(rd, st)
         rrddimvar_rename_all(rd);
-    pthread_rwlock_unlock(&st->rwlock);
+    rrdset_unlock(st);
 
     if(unlikely(rrdset_index_add_name(st->rrdhost, st) != st))
         error("RRDSET: INTERNAL ERROR: attempted to index duplicate chart name '%s'", st->name);
@@ -168,10 +184,10 @@ void rrdset_reset(RRDSET *st) {
     st->counter_done = 0;
 
     RRDDIM *rd;
-    for(rd = st->dimensions; rd ; rd = rd->next) {
+    rrddim_foreach_read(rd, st) {
         rd->last_collected_time.tv_sec = 0;
         rd->last_collected_time.tv_usec = 0;
-        rd->counter = 0;
+        rd->collections_counter = 0;
         memset(rd->values, 0, rd->entries * sizeof(storage_number));
     }
 }
@@ -179,15 +195,16 @@ void rrdset_reset(RRDSET *st) {
 // ----------------------------------------------------------------------------
 // RRDSET - helpers for rrdset_create()
 
-static inline long align_entries_to_pagesize(long entries) {
-    if(entries < 5) entries = 5;
-    if(entries > RRD_HISTORY_ENTRIES_MAX) entries = RRD_HISTORY_ENTRIES_MAX;
+inline long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries) {
+    if(unlikely(entries < 5)) entries = 5;
+    if(unlikely(entries > RRD_HISTORY_ENTRIES_MAX)) entries = RRD_HISTORY_ENTRIES_MAX;
 
-#ifdef NETDATA_LOG_ALLOCATIONS
-    long page = (size_t)sysconf(_SC_PAGESIZE);
+    if(unlikely(mode == RRD_MEMORY_MODE_NONE || mode == RRD_MEMORY_MODE_RAM))
+        return entries;
 
+    long page = (size_t)sysconf(_SC_PAGESIZE);
     long size = sizeof(RRDDIM) + entries * sizeof(storage_number);
-    if(size % page) {
+    if(unlikely(size % page)) {
         size -= (size % page);
         size += page;
 
@@ -196,23 +213,114 @@ static inline long align_entries_to_pagesize(long entries) {
     }
 
     return entries;
-#else
-    return entries;
-#endif
 }
 
-static inline void timeval_align(struct timeval *tv, int update_every) {
+static inline void last_collected_time_align(struct timeval *tv, int update_every) {
     tv->tv_sec -= tv->tv_sec % update_every;
     tv->tv_usec = 500000;
 }
 
+static inline void last_updated_time_align(struct timeval *tv, int update_every) {
+    tv->tv_sec -= tv->tv_sec % update_every;
+    tv->tv_usec = 0;
+}
+
 // ----------------------------------------------------------------------------
-// RRDSET - create a chart
+// RRDSET - free a chart
+
+void rrdset_free(RRDSET *st) {
+    if(unlikely(!st)) return;
+
+    rrdhost_check_wrlock(st->rrdhost);  // make sure we have a write lock on the host
+    rrdset_wrlock(st);                  // lock this RRDSET
+
+    // info("Removing chart '%s' ('%s')", st->id, st->name);
+
+    // ------------------------------------------------------------------------
+    // remove it from the indexes
+
+    if(unlikely(rrdset_index_del(st->rrdhost, st) != st))
+        error("RRDSET: INTERNAL ERROR: attempt to remove from index chart '%s', removed a different chart.", st->id);
+
+    rrdset_index_del_name(st->rrdhost, st);
+
+    // ------------------------------------------------------------------------
+    // free its children structures
+
+    while(st->variables)  rrdsetvar_free(st->variables);
+    while(st->alarms)     rrdsetcalc_unlink(st->alarms);
+    while(st->dimensions) rrddim_free(st, st->dimensions);
+
+    rrdfamily_free(st->rrdhost, st->rrdfamily);
+
+    // ------------------------------------------------------------------------
+    // unlink it from the host
+
+    if(st == st->rrdhost->rrdset_root) {
+        st->rrdhost->rrdset_root = st->next;
+    }
+    else {
+        // find the previous one
+        RRDSET *s;
+        for(s = st->rrdhost->rrdset_root; s && s->next != st ; s = s->next) ;
+
+        // bypass it
+        if(s) s->next = st->next;
+        else error("Request to free RRDSET '%s': cannot find it under host '%s'", st->id, st->rrdhost->hostname);
+    }
 
-RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const char *name, const char *family
-                      , const char *context, const char *title, const char *units, long priority
-                      , int update_every, int chart_type) {
+    rrdset_unlock(st);
 
+    // ------------------------------------------------------------------------
+    // free it
+
+    // free directly allocated members
+    freez(st->config_section);
+
+    if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || st->rrd_memory_mode == RRD_MEMORY_MODE_MAP) {
+        debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
+        munmap(st, st->memsize);
+    }
+    else
+        freez(st);
+}
+
+void rrdset_save(RRDSET *st) {
+    RRDDIM *rd;
+
+    rrdset_check_rdlock(st);
+
+    // info("Saving chart '%s' ('%s')", st->id, st->name);
+
+    if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
+        debug(D_RRD_STATS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
+        savememory(st->cache_filename, st, st->memsize);
+    }
+
+    rrddim_foreach_read(rd, st) {
+        if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE)) {
+            debug(D_RRD_STATS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
+            savememory(rd->cache_filename, rd, rd->memsize);
+        }
+    }
+}
+
+// ----------------------------------------------------------------------------
+// RRDSET - create a chart
+
+RRDSET *rrdset_create(
+          RRDHOST *host
+        , const char *type
+        , const char *id
+        , const char *name
+        , const char *family
+        , const char *context
+        , const char *title
+        , const char *units
+        , long priority
+        , int update_every
+        , RRDSET_TYPE chart_type
+) {
     if(!type || !type[0]) {
         fatal("Cannot create rrd stats without a type.");
         return NULL;
@@ -223,88 +331,125 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
         return NULL;
     }
 
-    char fullid[RRD_ID_LENGTH_MAX + 1];
-    char fullfilename[FILENAME_MAX + 1];
+    // ------------------------------------------------------------------------
+    // check if it already exists
 
+    char fullid[RRD_ID_LENGTH_MAX + 1];
     snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
 
     RRDSET *st = rrdset_find(host, fullid);
     if(st) {
+        rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE);
         debug(D_RRD_CALLS, "RRDSET '%s', already exists.", fullid);
         return st;
     }
 
-    long rentries = config_get_number(fullid, "history", host->rrd_history_entries);
-    long entries = align_entries_to_pagesize(rentries);
-    if(entries != rentries) entries = config_set_number(fullid, "history", entries);
+    char fullfilename[FILENAME_MAX + 1];
 
-    int enabled = config_get_boolean(fullid, "enabled", 1);
+    // ------------------------------------------------------------------------
+    // compose the config_section for this chart
+
+    char config_section[RRD_ID_LENGTH_MAX + 1];
+    if(host == localhost)
+        strcpy(config_section, fullid);
+    else
+        snprintfz(config_section, RRD_ID_LENGTH_MAX, "%s/%s", host->machine_guid, fullid);
+
+    // ------------------------------------------------------------------------
+    // get the options from the config, we need to create it
+
+    long rentries = config_get_number(config_section, "history", host->rrd_history_entries);
+    long entries = align_entries_to_pagesize(host->rrd_memory_mode, rentries);
+    if(entries != rentries) entries = config_set_number(config_section, "history", entries);
+
+    if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries)
+        entries = config_set_number(config_section, "history", 10);
+
+    int enabled = config_get_boolean(config_section, "enabled", 1);
     if(!enabled) entries = 5;
 
     unsigned long size = sizeof(RRDSET);
-    char *cache_dir = rrdset_cache_dir(host, fullid);
+    char *cache_dir = rrdset_cache_dir(host, fullid, config_section);
+
+    time_t now = now_realtime_sec();
+
+    // ------------------------------------------------------------------------
+    // load it or allocate it
 
     debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
 
     snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
-    if(host->rrd_memory_mode != RRD_MEMORY_MODE_RAM) st = (RRDSET *)mymmap(fullfilename, size, ((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 0);
-    if(st) {
-        if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
-            errno = 0;
-            info("Initializing file %s.", fullfilename);
-            memset(st, 0, size);
-        }
-        else if(strcmp(st->id, fullid) != 0) {
-            errno = 0;
-            error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
-            // munmap(st, size);
-            // st = NULL;
-            memset(st, 0, size);
-        }
-        else if(st->memsize != size || st->entries != entries) {
-            errno = 0;
-            error("File %s does not have the desired size. Clearing it.", fullfilename);
-            memset(st, 0, size);
-        }
-        else if(st->update_every != update_every) {
-            errno = 0;
-            error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
-            memset(st, 0, size);
-        }
-        else if((now_realtime_sec() - st->last_updated.tv_sec) > update_every * entries) {
-            errno = 0;
-            error("File %s is too old. Clearing it.", fullfilename);
-            memset(st, 0, size);
-        }
+    if(host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) {
+        st = (RRDSET *) mymmap(fullfilename, size, ((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), 0);
+        if(st) {
+            memset(&st->avl, 0, sizeof(avl));
+            memset(&st->avlname, 0, sizeof(avl));
+            memset(&st->variables_root_index, 0, sizeof(avl_tree_lock));
+            memset(&st->dimensions_index, 0, sizeof(avl_tree_lock));
+            memset(&st->rrdset_rwlock, 0, sizeof(pthread_rwlock_t));
+
+            st->name = NULL;
+            st->type = NULL;
+            st->family = NULL;
+            st->context = NULL;
+            st->title = NULL;
+            st->units = NULL;
+            st->dimensions = NULL;
+            st->next = NULL;
+            st->variables = NULL;
+            st->alarms = NULL;
+            st->flags = 0x00000000;
+
+            if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
+                errno = 0;
+                info("Initializing file %s.", fullfilename);
+                memset(st, 0, size);
+            }
+            else if(strcmp(st->id, fullid) != 0) {
+                errno = 0;
+                error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
+                // munmap(st, size);
+                // st = NULL;
+                memset(st, 0, size);
+            }
+            else if(st->memsize != size || st->entries != entries) {
+                errno = 0;
+                error("File %s does not have the desired size. Clearing it.", fullfilename);
+                memset(st, 0, size);
+            }
+            else if(st->update_every != update_every) {
+                errno = 0;
+                error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
+                memset(st, 0, size);
+            }
+            else if((now - st->last_updated.tv_sec) > update_every * entries) {
+                errno = 0;
+                error("File %s is too old. Clearing it.", fullfilename);
+                memset(st, 0, size);
+            }
+            else if(st->last_updated.tv_sec > now + update_every) {
+                errno = 0;
+                error("File %s refers to the future. Clearing it.", fullfilename);
+                memset(st, 0, size);
+            }
 
-        // make sure the database is aligned
-        if(st->last_updated.tv_sec)
-            timeval_align(&st->last_updated, update_every);
-    }
+            // make sure the database is aligned
+            if(st->last_updated.tv_sec)
+                last_updated_time_align(&st->last_updated, update_every);
 
-    if(st) {
-        st->name = NULL;
-        st->type = NULL;
-        st->family = NULL;
-        st->context = NULL;
-        st->title = NULL;
-        st->units = NULL;
-        st->dimensions = NULL;
-        st->next = NULL;
-        st->rrd_memory_mode = host->rrd_memory_mode;
-        st->variables = NULL;
-        st->alarms = NULL;
-        memset(&st->rwlock, 0, sizeof(pthread_rwlock_t));
-        memset(&st->avl, 0, sizeof(avl));
-        memset(&st->avlname, 0, sizeof(avl));
-        memset(&st->variables_root_index, 0, sizeof(avl_tree_lock));
-        memset(&st->dimensions_index, 0, sizeof(avl_tree_lock));
+
+            // make sure we have the right memory mode
+            // even if we cleared the memory
+            st->rrd_memory_mode = host->rrd_memory_mode;
+        }
     }
-    else {
+
+    if(unlikely(!st)) {
         st = callocz(1, size);
-        st->rrd_memory_mode = RRD_MEMORY_MODE_RAM;
+        st->rrd_memory_mode = (host->rrd_memory_mode == RRD_MEMORY_MODE_NONE) ? RRD_MEMORY_MODE_NONE : RRD_MEMORY_MODE_RAM;
     }
 
+    st->config_section = strdup(config_section);
     st->rrdhost = host;
     st->memsize = size;
     st->entries = entries;
@@ -320,15 +465,15 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
 
     st->cache_dir = cache_dir;
 
-    st->chart_type = rrdset_type_id(config_get(st->id, "chart type", rrdset_type_name(chart_type)));
-    st->type       = config_get(st->id, "type", type);
-    st->family     = config_get(st->id, "family", family?family:st->type);
-    st->units      = config_get(st->id, "units", units?units:"");
+    st->chart_type = rrdset_type_id(config_get(st->config_section, "chart type", rrdset_type_name(chart_type)));
+    st->type       = config_get(st->config_section, "type", type);
+    st->family     = config_get(st->config_section, "family", family?family:st->type);
+    st->units      = config_get(st->config_section, "units", units?units:"");
 
-    st->context    = config_get(st->id, "context", context?context:st->id);
+    st->context    = config_get(st->config_section, "context", context?context:st->id);
     st->hash_context = simple_hash(st->context);
 
-    st->priority = config_get_number(st->id, "priority", priority);
+    st->priority = config_get_number(st->config_section, "priority", priority);
     if(enabled)
         rrdset_flag_set(st, RRDSET_FLAG_ENABLED);
     else
@@ -336,6 +481,7 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
 
     rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
     rrdset_flag_clear(st, RRDSET_FLAG_DEBUG);
+    rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE);
 
     // if(!strcmp(st->id, "disk_util.dm-0")) {
     //     st->debug = 1;
@@ -351,13 +497,13 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
     st->counter_done = 0;
 
     st->gap_when_lost_iterations_above = (int) (
-            config_get_number(st->id, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2);
+            config_get_number(st->config_section, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2);
 
     avl_init_lock(&st->dimensions_index, rrddim_compare);
     avl_init_lock(&st->variables_root_index, rrdvar_compare);
 
-    pthread_rwlock_init(&st->rwlock, NULL);
-    rrdhost_rwlock(host);
+    pthread_rwlock_init(&st->rrdset_rwlock, NULL);
+    rrdhost_wrlock(host);
 
     if(name && *name) rrdset_set_name(st, name);
     else rrdset_set_name(st, id);
@@ -367,7 +513,7 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
         char varvalue2[CONFIG_MAX_VALUE + 1];
         snprintfz(varvalue, CONFIG_MAX_VALUE, "%s (%s)", title?title:"", st->name);
         json_escape_string(varvalue2, varvalue, sizeof(varvalue2));
-        st->title = config_get(st->id, "title", varvalue2);
+        st->title = config_get(st->config_section, "title", varvalue2);
     }
 
     st->rrdfamily = rrdfamily_create(host, st->family);
@@ -389,6 +535,8 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
     rrdsetcalc_link_matching(st);
     rrdcalctemplate_link_matching(st);
 
+    rrdhost_cleanup(host);
+
     rrdhost_unlock(host);
 
     return(st);
@@ -398,16 +546,23 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha
 // ----------------------------------------------------------------------------
 // RRDSET - data collection iteration control
 
-void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) {
-    if(unlikely(!st->last_collected_time.tv_sec || !microseconds)) {
+inline void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) {
+
+    if(unlikely(!st->last_collected_time.tv_sec)) {
         // the first entry
         microseconds = st->update_every * USEC_PER_SEC;
     }
+    else if(unlikely(!microseconds)) {
+        // no dt given by the plugin
+        struct timeval now;
+        now_realtime_timeval(&now);
+        microseconds = dt_usec(&now, &st->last_collected_time);
+    }
+
     st->usec_since_last_update = microseconds;
 }
 
-void rrdset_next_usec(RRDSET *st, usec_t microseconds)
-{
+inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) {
     struct timeval now;
     now_realtime_timeval(&now);
 
@@ -421,30 +576,46 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds)
     }
     else {
         // microseconds has the time since the last collection
-#ifdef NETDATA_INTERNAL_CHECKS
-        usec_t now_usec = timeval_usec(&now);
-        usec_t last_usec = timeval_usec(&st->last_collected_time);
-#endif
-        usec_t since_last_usec = dt_usec(&now, &st->last_collected_time);
+//#ifdef NETDATA_INTERNAL_CHECKS
+//        usec_t now_usec = timeval_usec(&now);
+//        usec_t last_usec = timeval_usec(&st->last_collected_time);
+//#endif
+        susec_t since_last_usec = dt_usec_signed(&now, &st->last_collected_time);
+
+        if(unlikely(since_last_usec < 0)) {
+            // oops! the database is in the future
+            error("Database for chart '%s' on host '%s' is %lld microseconds in the future. Adjusting it to current time.", st->id, st->rrdhost->hostname, -since_last_usec);
+
+            st->last_collected_time.tv_sec  = now.tv_sec - st->update_every;
+            st->last_collected_time.tv_usec = now.tv_usec;
+            last_collected_time_align(&st->last_collected_time, st->update_every);
+
+            st->last_updated.tv_sec  = now.tv_sec - st->update_every;
+            st->last_updated.tv_usec = now.tv_usec;
+            last_updated_time_align(&st->last_updated, st->update_every);
+
+            microseconds    = st->update_every * USEC_PER_SEC;
+            since_last_usec = st->update_every * USEC_PER_SEC;
+        }
 
         // verify the microseconds given is good
-        if(unlikely(microseconds > since_last_usec)) {
-            debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id);
+        if(unlikely(microseconds > (usec_t)since_last_usec)) {
+            debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id);
 
-#ifdef NETDATA_INTERNAL_CHECKS
-            if(unlikely(last_usec + microseconds > now_usec + 1000))
-                error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id);
-#endif
+//#ifdef NETDATA_INTERNAL_CHECKS
+//            if(unlikely(last_usec + microseconds > now_usec + 1000))
+//                error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id);
+//#endif
 
-            microseconds = since_last_usec;
+            microseconds = (usec_t)since_last_usec;
         }
-        else if(unlikely(microseconds < since_last_usec * 0.8)) {
-            debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id);
+        else if(unlikely(microseconds < (usec_t)since_last_usec * 0.8)) {
+            debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id);
 
-#ifdef NETDATA_INTERNAL_CHECKS
-            error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id);
-#endif
-            microseconds = since_last_usec;
+//#ifdef NETDATA_INTERNAL_CHECKS
+//            error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id);
+//#endif
+            microseconds = (usec_t)since_last_usec;
         }
     }
     debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
@@ -459,8 +630,54 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds)
 // ----------------------------------------------------------------------------
 // RRDSET - process the collected values for all dimensions of a chart
 
-usec_t rrdset_done(RRDSET *st) {
-    if(unlikely(netdata_exit)) return 0;
+static inline void rrdset_init_last_collected_time(RRDSET *st) {
+    now_realtime_timeval(&st->last_collected_time);
+    last_collected_time_align(&st->last_collected_time, st->update_every);
+}
+
+static inline usec_t rrdset_update_last_collected_time(RRDSET *st) {
+    usec_t last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
+    usec_t ut = last_collect_ut + st->usec_since_last_update;
+    st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC);
+    st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+    return last_collect_ut;
+}
+
+static inline void rrdset_init_last_updated_time(RRDSET *st) {
+    // copy the last collected time to last updated time
+    st->last_updated.tv_sec  = st->last_collected_time.tv_sec;
+    st->last_updated.tv_usec = st->last_collected_time.tv_usec;
+    last_updated_time_align(&st->last_updated, st->update_every);
+}
+
+static inline void rrdset_done_push_exclusive(RRDSET *st) {
+    if(unlikely(!st->last_collected_time.tv_sec)) {
+        // it is the first entry
+        // set the last_collected_time to now
+        rrdset_init_last_collected_time(st);
+    }
+    else {
+        // it is not the first entry
+        // calculate the proper last_collected_time, using usec_since_last_update
+        rrdset_update_last_collected_time(st);
+    }
+
+    st->counter_done++;
+
+    rrdset_rdlock(st);
+    rrdset_done_push(st);
+    rrdset_unlock(st);
+}
+
+void rrdset_done(RRDSET *st) {
+    if(unlikely(netdata_exit)) return;
+
+    if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE)) {
+        if(unlikely(st->rrdhost->rrdpush_enabled))
+            rrdset_done_push_exclusive(st);
+
+        return;
+    }
 
     debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name);
 
@@ -487,7 +704,7 @@ usec_t rrdset_done(RRDSET *st) {
         error("Cannot set pthread cancel state to DISABLE.");
 
     // a read lock is OK here
-    pthread_rwlock_rdlock(&st->rwlock);
+    rrdset_rdlock(st);
 
 /*
     // enable the chart, if it was disabled
@@ -495,6 +712,11 @@ usec_t rrdset_done(RRDSET *st) {
         st->enabled = 1;
 */
 
+    if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) {
+        error("Chart '%s' has the OBSOLETE flag set, but it is collected.", st->id);
+        rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE);
+    }
+
     // check if the chart has a long time to be updated
     if(unlikely(st->usec_since_last_update > st->entries * update_every_ut)) {
         info("%s: took too long to be updated (%0.3Lf secs). Resetting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0));
@@ -510,8 +732,7 @@ usec_t rrdset_done(RRDSET *st) {
     if(unlikely(!st->last_collected_time.tv_sec)) {
         // it is the first entry
         // set the last_collected_time to now
-        now_realtime_timeval(&st->last_collected_time);
-        timeval_align(&st->last_collected_time, st->update_every);
+        rrdset_init_last_collected_time(st);
 
         last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - update_every_ut;
 
@@ -525,10 +746,7 @@ usec_t rrdset_done(RRDSET *st) {
     else {
         // it is not the first entry
         // calculate the proper last_collected_time, using usec_since_last_update
-        last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
-        usec_t ut = last_collect_ut + st->usec_since_last_update;
-        st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC);
-        st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+        last_collect_ut = rrdset_update_last_collected_time(st);
     }
 
     // if this set has not been updated in the past
@@ -536,32 +754,24 @@ usec_t rrdset_done(RRDSET *st) {
     if(unlikely(!st->last_updated.tv_sec)) {
         // it has never been updated before
         // set a fake last_updated, in the past using usec_since_last_update
-        usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update;
-        st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC);
-        st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+        rrdset_init_last_updated_time(st);
 
         // the first entry should not be stored
         store_this_entry = 0;
         first_entry = 1;
 
         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
-            debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
+            debug(D_RRD_STATS, "%s: initializing last_updated to last_collected_time - %llu microseconds. Will not store the next entry.", st->name, st->usec_since_last_update);
     }
 
     // check if we will re-write the entire data set
     if(unlikely(dt_usec(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut)) {
         info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
         rrdset_reset(st);
+        rrdset_init_last_updated_time(st);
 
         st->usec_since_last_update = update_every_ut;
 
-        now_realtime_timeval(&st->last_collected_time);
-        timeval_align(&st->last_collected_time, st->update_every);
-
-        usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update;
-        st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC);
-        st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
-
         // the first entry should not be stored
         store_this_entry = 0;
         first_entry = 1;
@@ -589,21 +799,26 @@ usec_t rrdset_done(RRDSET *st) {
     }
     st->counter_done++;
 
+    if(unlikely(st->rrdhost->rrdpush_enabled))
+        rrdset_done_push(st);
+
     // calculate totals and count the dimensions
-    int dimensions;
+    int dimensions = 0;
     st->collected_total = 0;
-    for( rd = st->dimensions, dimensions = 0 ; rd ; rd = rd->next, dimensions++ )
-        if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
+    rrddim_foreach_read(rd, st) {
+        dimensions++;
+        if(likely(rd->updated))
             st->collected_total += rd->collected_value;
+    }
 
     uint32_t storage_flags = SN_EXISTS;
 
     // process all dimensions to calculate their values
     // based on the collected figures only
     // at this stage we do not interpolate anything
-    for( rd = st->dimensions ; rd ; rd = rd->next ) {
+    rrddim_foreach_read(rd, st) {
 
-        if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) {
+        if(unlikely(!rd->updated)) {
             rd->calculated_value = 0;
             continue;
         }
@@ -665,7 +880,7 @@ usec_t rrdset_done(RRDSET *st) {
                 break;
 
             case RRD_ALGORITHM_INCREMENTAL:
-                if(unlikely(rd->counter <= 1)) {
+                if(unlikely(rd->collections_counter <= 1)) {
                     rd->calculated_value = 0;
                     continue;
                 }
@@ -705,7 +920,7 @@ usec_t rrdset_done(RRDSET *st) {
                 break;
 
             case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL:
-                if(unlikely(rd->counter <= 1)) {
+                if(unlikely(rd->collections_counter <= 1)) {
                     rd->calculated_value = 0;
                     continue;
                 }
@@ -784,9 +999,9 @@ usec_t rrdset_done(RRDSET *st) {
         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
             debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name);
 
-#ifdef NETDATA_INTERNAL_CHECKS
-        info("%s is collected in the same interpolation point: short by %llu microseconds", st->name, next_store_ut - now_collect_ut);
-#endif
+//#ifdef NETDATA_INTERNAL_CHECKS
+//        info("%s is collected in the same interpolation point: short by %llu microseconds", st->name, next_store_ut - now_collect_ut);
+//#endif
     }
 
     usec_t first_ut = last_stored_ut;
@@ -794,9 +1009,9 @@ usec_t rrdset_done(RRDSET *st) {
     if((now_collect_ut % (update_every_ut)) == 0) iterations++;
 
     for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) {
-#ifdef NETDATA_INTERNAL_CHECKS
-        if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", st->name, first_ut, last_stored_ut, next_store_ut, now_collect_ut); }
-#endif
+//#ifdef NETDATA_INTERNAL_CHECKS
+//        if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", st->name, first_ut, last_stored_ut, next_store_ut, now_collect_ut); }
+//#endif
 
         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) {
             debug(D_RRD_STATS, "%s: last_stored_ut = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0);
@@ -806,7 +1021,7 @@ usec_t rrdset_done(RRDSET *st) {
         st->last_updated.tv_sec = (time_t) (next_store_ut / USEC_PER_SEC);
         st->last_updated.tv_usec = 0;
 
-        for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
+        rrddim_foreach_read(rd, st) {
             calculated_number new_value;
 
             switch(rd->algorithm) {
@@ -889,7 +1104,7 @@ usec_t rrdset_done(RRDSET *st) {
                 continue;
             }
 
-            if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
+            if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
                 rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags );
                 rd->last_stored_value = new_value;
 
@@ -951,8 +1166,8 @@ usec_t rrdset_done(RRDSET *st) {
 
     st->last_collected_total  = st->collected_total;
 
-    for( rd = st->dimensions; rd ; rd = rd->next ) {
-        if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
+    rrddim_foreach_read(rd, st) {
+        if(unlikely(!rd->updated))
             continue;
 
         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
@@ -984,7 +1199,7 @@ usec_t rrdset_done(RRDSET *st) {
 
         rd->calculated_value = 0;
         rd->collected_value = 0;
-        rrddim_flag_clear(rd, RRDDIM_FLAG_UPDATED);
+        rd->updated = 0;
 
         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
             debug(D_RRD_STATS, "%s/%s: END "
@@ -1015,8 +1230,8 @@ usec_t rrdset_done(RRDSET *st) {
             RRDDIM *last;
             // there is dimension to free
             // upgrade our read lock to a write lock
-            pthread_rwlock_unlock(&st->rwlock);
-            pthread_rwlock_wrlock(&st->rwlock);
+            pthread_rwlock_unlock(&st->rrdset_rwlock);
+            pthread_rwlock_wrlock(&st->rrdset_rwlock);
 
             for( rd = st->dimensions, last = NULL ; likely(rd) ; ) {
                 // remove it only it is not updated in rrd_delete_unupdated_dimensions seconds
@@ -1052,11 +1267,8 @@ usec_t rrdset_done(RRDSET *st) {
     }
 */
 
-    pthread_rwlock_unlock(&st->rwlock);
+    rrdset_unlock(st);
 
     if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
         error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
-
-    return(st->usec_since_last_update);
 }
-