]> arthur.barton.de Git - netdata.git/blob - src/rrdset.c
draft implementation of netdata central push server - untested
[netdata.git] / src / rrdset.c
1 #define NETDATA_RRD_INTERNALS 1
2 #include "common.h"
3
4 #define RRD_DEFAULT_GAP_INTERPOLATIONS 1
5
6 // ----------------------------------------------------------------------------
7 // RRDSET index
8
9 int rrdset_compare(void* a, void* b) {
10     if(((RRDSET *)a)->hash < ((RRDSET *)b)->hash) return -1;
11     else if(((RRDSET *)a)->hash > ((RRDSET *)b)->hash) return 1;
12     else return strcmp(((RRDSET *)a)->id, ((RRDSET *)b)->id);
13 }
14
15 static RRDSET *rrdset_index_find(RRDHOST *host, const char *id, uint32_t hash) {
16     RRDSET tmp;
17     strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
18     tmp.hash = (hash)?hash:simple_hash(tmp.id);
19
20     return (RRDSET *)avl_search_lock(&(host->rrdset_root_index), (avl *) &tmp);
21 }
22
23 // ----------------------------------------------------------------------------
24 // RRDSET name index
25
26 #define rrdset_from_avlname(avlname_ptr) ((RRDSET *)((avlname_ptr) - offsetof(RRDSET, avlname)))
27
28 int rrdset_compare_name(void* a, void* b) {
29     RRDSET *A = rrdset_from_avlname(a);
30     RRDSET *B = rrdset_from_avlname(b);
31
32     // fprintf(stderr, "COMPARING: %s with %s\n", A->name, B->name);
33
34     if(A->hash_name < B->hash_name) return -1;
35     else if(A->hash_name > B->hash_name) return 1;
36     else return strcmp(A->name, B->name);
37 }
38
39 RRDSET *rrdset_index_add_name(RRDHOST *host, RRDSET *st) {
40     void *result;
41     // fprintf(stderr, "ADDING: %s (name: %s)\n", st->id, st->name);
42     result = avl_insert_lock(&host->rrdset_root_index_name, (avl *) (&st->avlname));
43     if(result) return rrdset_from_avlname(result);
44     return NULL;
45 }
46
47 RRDSET *rrdset_index_del_name(RRDHOST *host, RRDSET *st) {
48     void *result;
49     // fprintf(stderr, "DELETING: %s (name: %s)\n", st->id, st->name);
50     result = (RRDSET *)avl_remove_lock(&((host)->rrdset_root_index_name), (avl *)(&st->avlname));
51     if(result) return rrdset_from_avlname(result);
52     return NULL;
53 }
54
55
56 // ----------------------------------------------------------------------------
57 // RRDSET - find charts
58
59 static inline RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name, uint32_t hash) {
60     void *result = NULL;
61     RRDSET tmp;
62     tmp.name = name;
63     tmp.hash_name = (hash)?hash:simple_hash(tmp.name);
64
65     // fprintf(stderr, "SEARCHING: %s\n", name);
66     result = avl_search_lock(&host->rrdset_root_index_name, (avl *) (&(tmp.avlname)));
67     if(result) {
68         RRDSET *st = rrdset_from_avlname(result);
69         if(strcmp(st->magic, RRDSET_MAGIC))
70             error("Search for RRDSET %s returned an invalid RRDSET %s (name %s)", name, st->id, st->name);
71
72         // fprintf(stderr, "FOUND: %s\n", name);
73         return rrdset_from_avlname(result);
74     }
75     // fprintf(stderr, "NOT FOUND: %s\n", name);
76     return NULL;
77 }
78
79 inline RRDSET *rrdset_find(RRDHOST *host, const char *id) {
80     debug(D_RRD_CALLS, "rrdset_find() for chart '%s' in host '%s'", id, host->hostname);
81     RRDSET *st = rrdset_index_find(host, id, 0);
82     return(st);
83 }
84
85 inline RRDSET *rrdset_find_bytype(RRDHOST *host, const char *type, const char *id) {
86     debug(D_RRD_CALLS, "rrdset_find_bytype() for chart '%s.%s' in host '%s'", type, id, host->hostname);
87
88     char buf[RRD_ID_LENGTH_MAX + 1];
89     strncpyz(buf, type, RRD_ID_LENGTH_MAX - 1);
90     strcat(buf, ".");
91     int len = (int) strlen(buf);
92     strncpyz(&buf[len], id, (size_t) (RRD_ID_LENGTH_MAX - len));
93
94     return(rrdset_find(host, buf));
95 }
96
97 inline RRDSET *rrdset_find_byname(RRDHOST *host, const char *name) {
98     debug(D_RRD_CALLS, "rrdset_find_byname() for chart '%s' in host '%s'", name, host->hostname);
99     RRDSET *st = rrdset_index_find_name(host, name, 0);
100     return(st);
101 }
102
103 // ----------------------------------------------------------------------------
104 // RRDSET - rename charts
105
106 char *rrdset_strncpyz_name(char *to, const char *from, size_t length) {
107     char c, *p = to;
108
109     while (length-- && (c = *from++)) {
110         if(c != '.' && !isalnum(c))
111             c = '_';
112
113         *p++ = c;
114     }
115
116     *p = '\0';
117
118     return to;
119 }
120
121 void rrdset_set_name(RRDSET *st, const char *name) {
122     if(unlikely(st->name && !strcmp(st->name, name)))
123         return;
124
125     debug(D_RRD_CALLS, "rrdset_set_name() old: %s, new: %s", st->name, name);
126
127     char b[CONFIG_MAX_VALUE + 1];
128     char n[RRD_ID_LENGTH_MAX + 1];
129
130     snprintfz(n, RRD_ID_LENGTH_MAX, "%s.%s", st->type, name);
131     rrdset_strncpyz_name(b, n, CONFIG_MAX_VALUE);
132
133     if(st->name) {
134         rrdset_index_del_name(st->rrdhost, st);
135         st->name = config_set_default(st->config_section, "name", b);
136         st->hash_name = simple_hash(st->name);
137         rrdsetvar_rename_all(st);
138     }
139     else {
140         st->name = config_get(st->config_section, "name", b);
141         st->hash_name = simple_hash(st->name);
142     }
143
144     rrdset_wrlock(st);
145     RRDDIM *rd;
146     for(rd = st->dimensions; rd ;rd = rd->next)
147         rrddimvar_rename_all(rd);
148     rrdset_unlock(st);
149
150     if(unlikely(rrdset_index_add_name(st->rrdhost, st) != st))
151         error("RRDSET: INTERNAL ERROR: attempted to index duplicate chart name '%s'", st->name);
152 }
153
154
155 // ----------------------------------------------------------------------------
156 // RRDSET - reset a chart
157
158 void rrdset_reset(RRDSET *st) {
159     debug(D_RRD_CALLS, "rrdset_reset() %s", st->name);
160
161     st->last_collected_time.tv_sec = 0;
162     st->last_collected_time.tv_usec = 0;
163     st->last_updated.tv_sec = 0;
164     st->last_updated.tv_usec = 0;
165     st->current_entry = 0;
166     st->counter = 0;
167     st->counter_done = 0;
168
169     RRDDIM *rd;
170     for(rd = st->dimensions; rd ; rd = rd->next) {
171         rd->last_collected_time.tv_sec = 0;
172         rd->last_collected_time.tv_usec = 0;
173         rd->counter = 0;
174         memset(rd->values, 0, rd->entries * sizeof(storage_number));
175     }
176 }
177
178 // ----------------------------------------------------------------------------
179 // RRDSET - helpers for rrdset_create()
180
181 inline long align_entries_to_pagesize(long entries) {
182     if(central_netdata_to_push_data)
183         return entries;
184
185     if(entries < 5) entries = 5;
186     if(entries > RRD_HISTORY_ENTRIES_MAX) entries = RRD_HISTORY_ENTRIES_MAX;
187
188     long page = (size_t)sysconf(_SC_PAGESIZE);
189     long size = sizeof(RRDDIM) + entries * sizeof(storage_number);
190     if(size % page) {
191         size -= (size % page);
192         size += page;
193
194         long n = (size - sizeof(RRDDIM)) / sizeof(storage_number);
195         return n;
196     }
197
198     return entries;
199 }
200
201 static inline void timeval_align(struct timeval *tv, int update_every) {
202     tv->tv_sec -= tv->tv_sec % update_every;
203     tv->tv_usec = 500000;
204 }
205
206 // ----------------------------------------------------------------------------
207 // RRDSET - free a chart
208
209 void rrdset_free(RRDSET *st) {
210     if(unlikely(!st)) return;
211
212     rrdhost_check_wrlock(st->rrdhost);  // make sure we have a write lock on the host
213     rrdset_wrlock(st);                  // lock this RRDSET
214
215     // ------------------------------------------------------------------------
216     // free its children structures
217
218     while(st->variables)  rrdsetvar_free(st->variables);
219     while(st->alarms)     rrdsetcalc_unlink(st->alarms);
220     while(st->dimensions) rrddim_free(st, st->dimensions);
221
222     rrdfamily_free(st->rrdhost, st->rrdfamily);
223
224     // ------------------------------------------------------------------------
225     // remove it from the indexes
226
227     if(unlikely(rrdset_index_del(st->rrdhost, st) != st))
228         error("RRDSET: INTERNAL ERROR: attempt to remove from index chart '%s', removed a different chart.", st->id);
229
230     rrdset_index_del_name(st->rrdhost, st);
231
232     // ------------------------------------------------------------------------
233     // unlink it from the host
234
235     if(st == st->rrdhost->rrdset_root) {
236         st->rrdhost->rrdset_root = st->next;
237     }
238     else {
239         // find the previous one
240         RRDSET *s;
241         for(s = st->rrdhost->rrdset_root; s && s->next != st ; s = s->next) ;
242
243         // bypass it
244         if(s) s->next = st->next;
245         else error("Request to free RRDSET '%s': cannot find it under host '%s'", st->id, st->rrdhost->hostname);
246     }
247
248     rrdset_unlock(st);
249
250     // ------------------------------------------------------------------------
251     // free it
252
253     // free directly allocated members
254     freez(st->config_section);
255
256     if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || st->rrd_memory_mode == RRD_MEMORY_MODE_MAP) {
257         debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
258         munmap(st, st->memsize);
259     }
260     else
261         freez(st);
262 }
263
264 // ----------------------------------------------------------------------------
265 // RRDSET - create a chart
266
267 RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const char *name, const char *family
268                       , const char *context, const char *title, const char *units, long priority
269                       , int update_every, RRDSET_TYPE chart_type) {
270
271     if(!type || !type[0]) {
272         fatal("Cannot create rrd stats without a type.");
273         return NULL;
274     }
275
276     if(!id || !id[0]) {
277         fatal("Cannot create rrd stats without an id.");
278         return NULL;
279     }
280
281     // ------------------------------------------------------------------------
282     // check if it already exists
283
284     char fullid[RRD_ID_LENGTH_MAX + 1];
285     snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
286
287     RRDSET *st = rrdset_find(host, fullid);
288     if(st) {
289         debug(D_RRD_CALLS, "RRDSET '%s', already exists.", fullid);
290         return st;
291     }
292
293     char fullfilename[FILENAME_MAX + 1];
294
295     // ------------------------------------------------------------------------
296     // compose the config_section for this chart
297
298     char config_section[RRD_ID_LENGTH_MAX + 1];
299     if(host == localhost)
300         strcpy(config_section, fullid);
301     else
302         snprintfz(config_section, RRD_ID_LENGTH_MAX, "%s/%s", host->machine_guid, fullid);
303
304     // ------------------------------------------------------------------------
305     // get the options from the config, we need to create it
306
307     long rentries = config_get_number(config_section, "history", host->rrd_history_entries);
308     long entries = align_entries_to_pagesize(rentries);
309     if(entries != rentries) entries = config_set_number(config_section, "history", entries);
310
311     int enabled = config_get_boolean(config_section, "enabled", 1);
312     if(!enabled) entries = 5;
313
314     unsigned long size = sizeof(RRDSET);
315     char *cache_dir = rrdset_cache_dir(host, fullid, config_section);
316
317     // ------------------------------------------------------------------------
318     // load it or allocate it
319
320     debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
321
322     snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
323     if(host->rrd_memory_mode != RRD_MEMORY_MODE_RAM) {
324         st = (RRDSET *) mymmap(fullfilename, size, ((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), 0);
325         if(st) {
326             memset(&st->avl, 0, sizeof(avl));
327             memset(&st->avlname, 0, sizeof(avl));
328             memset(&st->variables_root_index, 0, sizeof(avl_tree_lock));
329             memset(&st->dimensions_index, 0, sizeof(avl_tree_lock));
330             memset(&st->rrdset_rwlock, 0, sizeof(pthread_rwlock_t));
331
332             st->name = NULL;
333             st->type = NULL;
334             st->family = NULL;
335             st->context = NULL;
336             st->title = NULL;
337             st->units = NULL;
338             st->dimensions = NULL;
339             st->next = NULL;
340             st->variables = NULL;
341             st->alarms = NULL;
342             st->flags = 0x00000000;
343
344             if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
345                 errno = 0;
346                 info("Initializing file %s.", fullfilename);
347                 memset(st, 0, size);
348             }
349             else if(strcmp(st->id, fullid) != 0) {
350                 errno = 0;
351                 error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
352                 // munmap(st, size);
353                 // st = NULL;
354                 memset(st, 0, size);
355             }
356             else if(st->memsize != size || st->entries != entries) {
357                 errno = 0;
358                 error("File %s does not have the desired size. Clearing it.", fullfilename);
359                 memset(st, 0, size);
360             }
361             else if(st->update_every != update_every) {
362                 errno = 0;
363                 error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
364                 memset(st, 0, size);
365             }
366             else if((now_realtime_sec() - st->last_updated.tv_sec) > update_every * entries) {
367                 errno = 0;
368                 error("File %s is too old. Clearing it.", fullfilename);
369                 memset(st, 0, size);
370             }
371
372             // make sure the database is aligned
373             if(st->last_updated.tv_sec)
374                 timeval_align(&st->last_updated, update_every);
375
376             // make sure we have the right memory mode
377             // even if we cleared the memory
378             st->rrd_memory_mode = host->rrd_memory_mode;
379         }
380     }
381
382     if(unlikely(!st)) {
383         st = callocz(1, size);
384         st->rrd_memory_mode = RRD_MEMORY_MODE_RAM;
385     }
386
387     st->config_section = strdup(config_section);
388     st->rrdhost = host;
389     st->memsize = size;
390     st->entries = entries;
391     st->update_every = update_every;
392
393     if(st->current_entry >= st->entries) st->current_entry = 0;
394
395     strcpy(st->cache_filename, fullfilename);
396     strcpy(st->magic, RRDSET_MAGIC);
397
398     strcpy(st->id, fullid);
399     st->hash = simple_hash(st->id);
400
401     st->cache_dir = cache_dir;
402
403     st->chart_type = rrdset_type_id(config_get(st->config_section, "chart type", rrdset_type_name(chart_type)));
404     st->type       = config_get(st->config_section, "type", type);
405     st->family     = config_get(st->config_section, "family", family?family:st->type);
406     st->units      = config_get(st->config_section, "units", units?units:"");
407
408     st->context    = config_get(st->config_section, "context", context?context:st->id);
409     st->hash_context = simple_hash(st->context);
410
411     st->priority = config_get_number(st->config_section, "priority", priority);
412     if(enabled)
413         rrdset_flag_set(st, RRDSET_FLAG_ENABLED);
414     else
415         rrdset_flag_clear(st, RRDSET_FLAG_ENABLED);
416
417     rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
418     rrdset_flag_clear(st, RRDSET_FLAG_DEBUG);
419
420     // if(!strcmp(st->id, "disk_util.dm-0")) {
421     //     st->debug = 1;
422     //     error("enabled debugging for '%s'", st->id);
423     // }
424     // else error("not enabled debugging for '%s'", st->id);
425
426     st->green = NAN;
427     st->red = NAN;
428
429     st->last_collected_time.tv_sec = 0;
430     st->last_collected_time.tv_usec = 0;
431     st->counter_done = 0;
432
433     st->gap_when_lost_iterations_above = (int) (
434             config_get_number(st->config_section, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2);
435
436     avl_init_lock(&st->dimensions_index, rrddim_compare);
437     avl_init_lock(&st->variables_root_index, rrdvar_compare);
438
439     pthread_rwlock_init(&st->rrdset_rwlock, NULL);
440     rrdhost_wrlock(host);
441
442     if(name && *name) rrdset_set_name(st, name);
443     else rrdset_set_name(st, id);
444
445     {
446         char varvalue[CONFIG_MAX_VALUE + 1];
447         char varvalue2[CONFIG_MAX_VALUE + 1];
448         snprintfz(varvalue, CONFIG_MAX_VALUE, "%s (%s)", title?title:"", st->name);
449         json_escape_string(varvalue2, varvalue, sizeof(varvalue2));
450         st->title = config_get(st->config_section, "title", varvalue2);
451     }
452
453     st->rrdfamily = rrdfamily_create(host, st->family);
454
455     st->next = host->rrdset_root;
456     host->rrdset_root = st;
457
458     if(host->health_enabled) {
459         rrdsetvar_create(st, "last_collected_t", RRDVAR_TYPE_TIME_T, &st->last_collected_time.tv_sec, 0);
460         rrdsetvar_create(st, "collected_total_raw", RRDVAR_TYPE_TOTAL, &st->last_collected_total, 0);
461         rrdsetvar_create(st, "green", RRDVAR_TYPE_CALCULATED, &st->green, 0);
462         rrdsetvar_create(st, "red", RRDVAR_TYPE_CALCULATED, &st->red, 0);
463         rrdsetvar_create(st, "update_every", RRDVAR_TYPE_INT, &st->update_every, 0);
464     }
465
466     if(unlikely(rrdset_index_add(host, st) != st))
467         error("RRDSET: INTERNAL ERROR: attempt to index duplicate chart '%s'", st->id);
468
469     rrdsetcalc_link_matching(st);
470     rrdcalctemplate_link_matching(st);
471
472     rrdhost_unlock(host);
473
474     return(st);
475 }
476
477
478 // ----------------------------------------------------------------------------
479 // RRDSET - data collection iteration control
480
481 void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) {
482     if(unlikely(!st->last_collected_time.tv_sec || !microseconds)) {
483         // the first entry
484         microseconds = st->update_every * USEC_PER_SEC;
485     }
486     st->usec_since_last_update = microseconds;
487 }
488
489 void rrdset_next_usec(RRDSET *st, usec_t microseconds)
490 {
491     struct timeval now;
492     now_realtime_timeval(&now);
493
494     if(unlikely(!st->last_collected_time.tv_sec)) {
495         // the first entry
496         microseconds = st->update_every * USEC_PER_SEC;
497     }
498     else if(unlikely(!microseconds)) {
499         // no dt given by the plugin
500         microseconds = dt_usec(&now, &st->last_collected_time);
501     }
502     else {
503         // microseconds has the time since the last collection
504 //#ifdef NETDATA_INTERNAL_CHECKS
505 //        usec_t now_usec = timeval_usec(&now);
506 //        usec_t last_usec = timeval_usec(&st->last_collected_time);
507 //#endif
508         usec_t since_last_usec = dt_usec(&now, &st->last_collected_time);
509
510         // verify the microseconds given is good
511         if(unlikely(microseconds > since_last_usec)) {
512             debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id);
513
514 //#ifdef NETDATA_INTERNAL_CHECKS
515 //            if(unlikely(last_usec + microseconds > now_usec + 1000))
516 //                error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id);
517 //#endif
518
519             microseconds = since_last_usec;
520         }
521         else if(unlikely(microseconds < since_last_usec * 0.8)) {
522             debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id);
523
524 //#ifdef NETDATA_INTERNAL_CHECKS
525 //            error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id);
526 //#endif
527             microseconds = since_last_usec;
528         }
529     }
530     debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
531
532     if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
533         debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds);
534
535     st->usec_since_last_update = microseconds;
536 }
537
538
539 // ----------------------------------------------------------------------------
540 // RRDSET - process the collected values for all dimensions of a chart
541
542 void rrdset_done(RRDSET *st) {
543     if(unlikely(netdata_exit)) return;
544     if(unlikely(central_netdata_to_push_data)) {
545         rrdset_done_push(st);
546         return;
547     }
548
549     debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name);
550
551     RRDDIM *rd;
552
553     int
554             pthreadoldcancelstate;  // store the old cancelable pthread state, to restore it at the end
555
556     char
557             store_this_entry = 1,   // boolean: 1 = store this entry, 0 = don't store this entry
558             first_entry = 0;        // boolean: 1 = this is the first entry seen for this chart, 0 = all other entries
559
560     unsigned int
561             stored_entries = 0;     // the number of entries we have stored in the db, during this call to rrdset_done()
562
563     usec_t
564             last_collect_ut,        // the timestamp in microseconds, of the last collected value
565             now_collect_ut,         // the timestamp in microseconds, of this collected value (this is NOW)
566             last_stored_ut,         // the timestamp in microseconds, of the last stored entry in the db
567             next_store_ut,          // the timestamp in microseconds, of the next entry to store in the db
568             update_every_ut = st->update_every * USEC_PER_SEC; // st->update_every in microseconds
569
570     if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &pthreadoldcancelstate) != 0))
571         error("Cannot set pthread cancel state to DISABLE.");
572
573     // a read lock is OK here
574     rrdset_rdlock(st);
575
576 /*
577     // enable the chart, if it was disabled
578     if(unlikely(rrd_delete_unupdated_dimensions) && !st->enabled)
579         st->enabled = 1;
580 */
581
582     // check if the chart has a long time to be updated
583     if(unlikely(st->usec_since_last_update > st->entries * update_every_ut)) {
584         info("%s: took too long to be updated (%0.3Lf secs). Resetting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0));
585         rrdset_reset(st);
586         st->usec_since_last_update = update_every_ut;
587         first_entry = 1;
588     }
589
590     if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
591         debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update);
592
593     // set last_collected_time
594     if(unlikely(!st->last_collected_time.tv_sec)) {
595         // it is the first entry
596         // set the last_collected_time to now
597         now_realtime_timeval(&st->last_collected_time);
598         timeval_align(&st->last_collected_time, st->update_every);
599
600         last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - update_every_ut;
601
602         // the first entry should not be stored
603         store_this_entry = 0;
604         first_entry = 1;
605
606         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
607             debug(D_RRD_STATS, "%s: has not set last_collected_time. Setting it now. Will not store the next entry.", st->name);
608     }
609     else {
610         // it is not the first entry
611         // calculate the proper last_collected_time, using usec_since_last_update
612         last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
613         usec_t ut = last_collect_ut + st->usec_since_last_update;
614         st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC);
615         st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
616     }
617
618     // if this set has not been updated in the past
619     // we fake the last_update time to be = now - usec_since_last_update
620     if(unlikely(!st->last_updated.tv_sec)) {
621         // it has never been updated before
622         // set a fake last_updated, in the past using usec_since_last_update
623         usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update;
624         st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC);
625         st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
626
627         // the first entry should not be stored
628         store_this_entry = 0;
629         first_entry = 1;
630
631         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
632             debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
633     }
634
635     // check if we will re-write the entire data set
636     if(unlikely(dt_usec(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut)) {
637         info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
638         rrdset_reset(st);
639
640         st->usec_since_last_update = update_every_ut;
641
642         now_realtime_timeval(&st->last_collected_time);
643         timeval_align(&st->last_collected_time, st->update_every);
644
645         usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update;
646         st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC);
647         st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
648
649         // the first entry should not be stored
650         store_this_entry = 0;
651         first_entry = 1;
652     }
653
654     // these are the 3 variables that will help us in interpolation
655     // last_stored_ut = the last time we added a value to the storage
656     // now_collect_ut = the time the current value has been collected
657     // next_store_ut  = the time of the next interpolation point
658     last_stored_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec;
659     now_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
660     next_store_ut  = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC;
661
662     if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) {
663         debug(D_RRD_STATS, "%s: last_collect_ut = %0.3Lf (last collection time)", st->name, (long double)last_collect_ut/1000000.0);
664         debug(D_RRD_STATS, "%s: now_collect_ut  = %0.3Lf (current collection time)", st->name, (long double)now_collect_ut/1000000.0);
665         debug(D_RRD_STATS, "%s: last_stored_ut  = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0);
666         debug(D_RRD_STATS, "%s: next_store_ut   = %0.3Lf (next interpolation point)", st->name, (long double)next_store_ut/1000000.0);
667     }
668
669     if(unlikely(!st->counter_done)) {
670         store_this_entry = 0;
671         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
672             debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name);
673     }
674     st->counter_done++;
675
676     // calculate totals and count the dimensions
677     int dimensions;
678     st->collected_total = 0;
679     for( rd = st->dimensions, dimensions = 0 ; rd ; rd = rd->next, dimensions++ )
680         if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
681             st->collected_total += rd->collected_value;
682
683     uint32_t storage_flags = SN_EXISTS;
684
685     // process all dimensions to calculate their values
686     // based on the collected figures only
687     // at this stage we do not interpolate anything
688     for( rd = st->dimensions ; rd ; rd = rd->next ) {
689
690         if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) {
691             rd->calculated_value = 0;
692             continue;
693         }
694
695         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
696             debug(D_RRD_STATS, "%s/%s: START "
697                     " last_collected_value = " COLLECTED_NUMBER_FORMAT
698                     " collected_value = " COLLECTED_NUMBER_FORMAT
699                     " last_calculated_value = " CALCULATED_NUMBER_FORMAT
700                     " calculated_value = " CALCULATED_NUMBER_FORMAT
701                                       , st->id, rd->name
702                                       , rd->last_collected_value
703                                       , rd->collected_value
704                                       , rd->last_calculated_value
705                                       , rd->calculated_value
706             );
707
708         switch(rd->algorithm) {
709             case RRD_ALGORITHM_ABSOLUTE:
710                 rd->calculated_value = (calculated_number)rd->collected_value
711                                        * (calculated_number)rd->multiplier
712                                        / (calculated_number)rd->divisor;
713
714                 if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
715                     debug(D_RRD_STATS, "%s/%s: CALC ABS/ABS-NO-IN "
716                             CALCULATED_NUMBER_FORMAT " = "
717                             COLLECTED_NUMBER_FORMAT
718                             " * " CALCULATED_NUMBER_FORMAT
719                             " / " CALCULATED_NUMBER_FORMAT
720                           , st->id, rd->name
721                           , rd->calculated_value
722                           , rd->collected_value
723                           , (calculated_number)rd->multiplier
724                           , (calculated_number)rd->divisor
725                     );
726                 break;
727
728             case RRD_ALGORITHM_PCENT_OVER_ROW_TOTAL:
729                 if(unlikely(!st->collected_total))
730                     rd->calculated_value = 0;
731                 else
732                     // the percentage of the current value
733                     // over the total of all dimensions
734                     rd->calculated_value =
735                             (calculated_number)100
736                             * (calculated_number)rd->collected_value
737                             / (calculated_number)st->collected_total;
738
739                 if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
740                     debug(D_RRD_STATS, "%s/%s: CALC PCENT-ROW "
741                             CALCULATED_NUMBER_FORMAT " = 100"
742                                     " * " COLLECTED_NUMBER_FORMAT
743                             " / " COLLECTED_NUMBER_FORMAT
744                           , st->id, rd->name
745                           , rd->calculated_value
746                           , rd->collected_value
747                           , st->collected_total
748                     );
749                 break;
750
751             case RRD_ALGORITHM_INCREMENTAL:
752                 if(unlikely(rd->counter <= 1)) {
753                     rd->calculated_value = 0;
754                     continue;
755                 }
756
757                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
758                 // to reset the calculation (it will give zero as the calculation for this second)
759                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
760                     debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
761                           , st->name, rd->name
762                           , rd->last_collected_value
763                           , rd->collected_value);
764
765                     if(!(rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)))
766                         storage_flags = SN_EXISTS_RESET;
767
768                     rd->last_collected_value = rd->collected_value;
769                 }
770
771                 rd->calculated_value +=
772                         (calculated_number)(rd->collected_value - rd->last_collected_value)
773                         * (calculated_number)rd->multiplier
774                         / (calculated_number)rd->divisor;
775
776                 if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
777                     debug(D_RRD_STATS, "%s/%s: CALC INC PRE "
778                             CALCULATED_NUMBER_FORMAT " = ("
779                             COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT
780                             ")"
781                                     " * " CALCULATED_NUMBER_FORMAT
782                             " / " CALCULATED_NUMBER_FORMAT
783                           , st->id, rd->name
784                           , rd->calculated_value
785                           , rd->collected_value, rd->last_collected_value
786                           , (calculated_number)rd->multiplier
787                           , (calculated_number)rd->divisor
788                     );
789                 break;
790
791             case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL:
792                 if(unlikely(rd->counter <= 1)) {
793                     rd->calculated_value = 0;
794                     continue;
795                 }
796
797                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
798                 // to reset the calculation (it will give zero as the calculation for this second)
799                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
800                     debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
801                           , st->name, rd->name
802                           , rd->last_collected_value
803                           , rd->collected_value);
804
805                     if(!(rrddim_flag_check(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)))
806                         storage_flags = SN_EXISTS_RESET;
807
808                     rd->last_collected_value = rd->collected_value;
809                 }
810
811                 // the percentage of the current increment
812                 // over the increment of all dimensions together
813                 if(unlikely(st->collected_total == st->last_collected_total))
814                     rd->calculated_value = 0;
815                 else
816                     rd->calculated_value =
817                             (calculated_number)100
818                             * (calculated_number)(rd->collected_value - rd->last_collected_value)
819                             / (calculated_number)(st->collected_total - st->last_collected_total);
820
821                 if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
822                     debug(D_RRD_STATS, "%s/%s: CALC PCENT-DIFF "
823                             CALCULATED_NUMBER_FORMAT " = 100"
824                                     " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
825                                     " / (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
826                           , st->id, rd->name
827                           , rd->calculated_value
828                           , rd->collected_value, rd->last_collected_value
829                           , st->collected_total, st->last_collected_total
830                     );
831                 break;
832
833             default:
834                 // make the default zero, to make sure
835                 // it gets noticed when we add new types
836                 rd->calculated_value = 0;
837
838                 if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
839                     debug(D_RRD_STATS, "%s/%s: CALC "
840                             CALCULATED_NUMBER_FORMAT " = 0"
841                           , st->id, rd->name
842                           , rd->calculated_value
843                     );
844                 break;
845         }
846
847         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
848             debug(D_RRD_STATS, "%s/%s: PHASE2 "
849                     " last_collected_value = " COLLECTED_NUMBER_FORMAT
850                     " collected_value = " COLLECTED_NUMBER_FORMAT
851                     " last_calculated_value = " CALCULATED_NUMBER_FORMAT
852                     " calculated_value = " CALCULATED_NUMBER_FORMAT
853                                       , st->id, rd->name
854                                       , rd->last_collected_value
855                                       , rd->collected_value
856                                       , rd->last_calculated_value
857                                       , rd->calculated_value
858             );
859
860     }
861
862     // at this point we have all the calculated values ready
863     // it is now time to interpolate values on a second boundary
864
865     if(unlikely(now_collect_ut < next_store_ut)) {
866         // this is collected in the same interpolation point
867
868         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
869             debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name);
870
871 //#ifdef NETDATA_INTERNAL_CHECKS
872 //        info("%s is collected in the same interpolation point: short by %llu microseconds", st->name, next_store_ut - now_collect_ut);
873 //#endif
874     }
875
876     usec_t first_ut = last_stored_ut;
877     long long iterations = (now_collect_ut - last_stored_ut) / (update_every_ut);
878     if((now_collect_ut % (update_every_ut)) == 0) iterations++;
879
880     for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) {
881 //#ifdef NETDATA_INTERNAL_CHECKS
882 //        if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", st->name, first_ut, last_stored_ut, next_store_ut, now_collect_ut); }
883 //#endif
884
885         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) {
886             debug(D_RRD_STATS, "%s: last_stored_ut = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0);
887             debug(D_RRD_STATS, "%s: next_store_ut  = %0.3Lf (next interpolation point)", st->name, (long double)next_store_ut/1000000.0);
888         }
889
890         st->last_updated.tv_sec = (time_t) (next_store_ut / USEC_PER_SEC);
891         st->last_updated.tv_usec = 0;
892
893         for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
894             calculated_number new_value;
895
896             switch(rd->algorithm) {
897                 case RRD_ALGORITHM_INCREMENTAL:
898                     new_value = (calculated_number)
899                             (      rd->calculated_value
900                                    * (calculated_number)(next_store_ut - last_collect_ut)
901                                    / (calculated_number)(now_collect_ut - last_collect_ut)
902                             );
903
904                     if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
905                         debug(D_RRD_STATS, "%s/%s: CALC2 INC "
906                                 CALCULATED_NUMBER_FORMAT " = "
907                                 CALCULATED_NUMBER_FORMAT
908                                 " * %llu"
909                                         " / %llu"
910                               , st->id, rd->name
911                               , new_value
912                               , rd->calculated_value
913                               , (next_store_ut - last_stored_ut)
914                               , (now_collect_ut - last_stored_ut)
915                         );
916
917                     rd->calculated_value -= new_value;
918                     new_value += rd->last_calculated_value;
919                     rd->last_calculated_value = 0;
920                     new_value /= (calculated_number)st->update_every;
921
922                     if(unlikely(next_store_ut - last_stored_ut < update_every_ut)) {
923                         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
924                             debug(D_RRD_STATS, "%s/%s: COLLECTION POINT IS SHORT " CALCULATED_NUMBER_FORMAT " - EXTRAPOLATING",
925                                     st->id, rd->name
926                                   , (calculated_number)(next_store_ut - last_stored_ut)
927                             );
928                         new_value = new_value * (calculated_number)(st->update_every * 1000000) / (calculated_number)(next_store_ut - last_stored_ut);
929                     }
930                     break;
931
932                 case RRD_ALGORITHM_ABSOLUTE:
933                 case RRD_ALGORITHM_PCENT_OVER_ROW_TOTAL:
934                 case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL:
935                 default:
936                     if(iterations == 1) {
937                         // this is the last iteration
938                         // do not interpolate
939                         // just show the calculated value
940
941                         new_value = rd->calculated_value;
942                     }
943                     else {
944                         // we have missed an update
945                         // interpolate in the middle values
946
947                         new_value = (calculated_number)
948                                 (   (     (rd->calculated_value - rd->last_calculated_value)
949                                           * (calculated_number)(next_store_ut - last_collect_ut)
950                                           / (calculated_number)(now_collect_ut - last_collect_ut)
951                                     )
952                                     +  rd->last_calculated_value
953                                 );
954
955                         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
956                             debug(D_RRD_STATS, "%s/%s: CALC2 DEF "
957                                     CALCULATED_NUMBER_FORMAT " = ((("
958                                             "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")"
959                                             " * %llu"
960                                             " / %llu) + " CALCULATED_NUMBER_FORMAT
961                                   , st->id, rd->name
962                                   , new_value
963                                   , rd->calculated_value, rd->last_calculated_value
964                                   , (next_store_ut - first_ut)
965                                   , (now_collect_ut - first_ut), rd->last_calculated_value
966                             );
967                     }
968                     break;
969             }
970
971             if(unlikely(!store_this_entry)) {
972                 rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
973                 continue;
974             }
975
976             if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
977                 rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags );
978                 rd->last_stored_value = new_value;
979
980                 if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
981                     debug(D_RRD_STATS, "%s/%s: STORE[%ld] "
982                             CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT
983                           , st->id, rd->name
984                           , st->current_entry
985                           , unpack_storage_number(rd->values[st->current_entry]), new_value
986                     );
987             }
988             else {
989                 if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
990                     debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING "
991                           , st->id, rd->name
992                           , st->current_entry
993                     );
994                 rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
995                 rd->last_stored_value = NAN;
996             }
997
998             stored_entries++;
999
1000             if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) {
1001                 calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor;
1002                 calculated_number t2 = unpack_storage_number(rd->values[st->current_entry]);
1003                 calculated_number accuracy = accuracy_loss(t1, t2);
1004                 debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " FLAGS=0x%08x (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
1005                       , st->id, rd->name
1006                       , st->current_entry
1007                       , t2
1008                       , get_storage_number_flags(rd->values[st->current_entry])
1009                       , t1
1010                       , accuracy
1011                       , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1012                 );
1013
1014                 rd->collected_volume += t1;
1015                 rd->stored_volume += t2;
1016                 accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume);
1017                 debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated  = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s"
1018                       , st->id, rd->name
1019                       , st->current_entry
1020                       , rd->stored_volume
1021                       , rd->collected_volume
1022                       , accuracy
1023                       , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1024                 );
1025
1026             }
1027         }
1028         // reset the storage flags for the next point, if any;
1029         storage_flags = SN_EXISTS;
1030
1031         st->counter++;
1032         st->current_entry = ((st->current_entry + 1) >= st->entries) ? 0 : st->current_entry + 1;
1033         last_stored_ut = next_store_ut;
1034     }
1035
1036     st->last_collected_total  = st->collected_total;
1037
1038     for( rd = st->dimensions; rd ; rd = rd->next ) {
1039         if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED)))
1040             continue;
1041
1042         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
1043             debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value);
1044
1045         rd->last_collected_value = rd->collected_value;
1046
1047         switch(rd->algorithm) {
1048             case RRD_ALGORITHM_INCREMENTAL:
1049                 if(unlikely(!first_entry)) {
1050                     if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
1051                         debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value);
1052                     rd->last_calculated_value += rd->calculated_value;
1053                 }
1054                 else {
1055                     if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
1056                         debug(D_RRD_STATS, "%s: THIS IS THE FIRST POINT", st->name);
1057                 }
1058                 break;
1059
1060             case RRD_ALGORITHM_ABSOLUTE:
1061             case RRD_ALGORITHM_PCENT_OVER_ROW_TOTAL:
1062             case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL:
1063                 if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
1064                     debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value);
1065                 rd->last_calculated_value = rd->calculated_value;
1066                 break;
1067         }
1068
1069         rd->calculated_value = 0;
1070         rd->collected_value = 0;
1071         rrddim_flag_clear(rd, RRDDIM_FLAG_UPDATED);
1072
1073         if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
1074             debug(D_RRD_STATS, "%s/%s: END "
1075                     " last_collected_value = " COLLECTED_NUMBER_FORMAT
1076                     " collected_value = " COLLECTED_NUMBER_FORMAT
1077                     " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1078                     " calculated_value = " CALCULATED_NUMBER_FORMAT
1079                                       , st->id, rd->name
1080                                       , rd->last_collected_value
1081                                       , rd->collected_value
1082                                       , rd->last_calculated_value
1083                                       , rd->calculated_value
1084             );
1085     }
1086
1087     // ALL DONE ABOUT THE DATA UPDATE
1088     // --------------------------------------------------------------------
1089
1090 /*
1091     // find if there are any obsolete dimensions (not updated recently)
1092     if(unlikely(rrd_delete_unupdated_dimensions)) {
1093
1094         for( rd = st->dimensions; likely(rd) ; rd = rd->next )
1095             if((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)
1096                 break;
1097
1098         if(unlikely(rd)) {
1099             RRDDIM *last;
1100             // there is dimension to free
1101             // upgrade our read lock to a write lock
1102             pthread_rwlock_unlock(&st->rrdset_rwlock);
1103             pthread_rwlock_wrlock(&st->rrdset_rwlock);
1104
1105             for( rd = st->dimensions, last = NULL ; likely(rd) ; ) {
1106                 // remove it only it is not updated in rrd_delete_unupdated_dimensions seconds
1107
1108                 if(unlikely((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)) {
1109                     info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id);
1110
1111                     if(unlikely(!last)) {
1112                         st->dimensions = rd->next;
1113                         rd->next = NULL;
1114                         rrddim_free(st, rd);
1115                         rd = st->dimensions;
1116                         continue;
1117                     }
1118                     else {
1119                         last->next = rd->next;
1120                         rd->next = NULL;
1121                         rrddim_free(st, rd);
1122                         rd = last->next;
1123                         continue;
1124                     }
1125                 }
1126
1127                 last = rd;
1128                 rd = rd->next;
1129             }
1130
1131             if(unlikely(!st->dimensions)) {
1132                 info("Disabling chart %s (%s) since it does not have any dimensions", st->name, st->id);
1133                 st->enabled = 0;
1134             }
1135         }
1136     }
1137 */
1138
1139     rrdset_unlock(st);
1140
1141     if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
1142         error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
1143 }
1144