]> arthur.barton.de Git - netdata.git/blob - src/rrd.c
Merge remote-tracking branch 'upstream/master'
[netdata.git] / src / rrd.c
1 #include "common.h"
2
3 #define RRD_DEFAULT_GAP_INTERPOLATIONS 1
4
5 // ----------------------------------------------------------------------------
6 // globals
7
8 // if not zero it gives the time (in seconds) to remove un-updated dimensions
9 // DO NOT ENABLE
10 // if dimensions are removed, the chart generation will have to run again
11 int rrd_delete_unupdated_dimensions = 0;
12
13 int rrd_update_every = UPDATE_EVERY;
14 int rrd_default_history_entries = RRD_DEFAULT_HISTORY_ENTRIES;
15 int rrd_memory_mode = RRD_MEMORY_MODE_SAVE;
16
17 static int rrdset_compare(void* a, void* b);
18 static int rrdset_compare_name(void* a, void* b);
19 static int rrdfamily_compare(void *a, void *b);
20
21 // ----------------------------------------------------------------------------
22 // RRDHOST
23
24 RRDHOST localhost = {
25         .hostname = "localhost",
26         .rrdset_root = NULL,
27         .rrdset_root_rwlock = PTHREAD_RWLOCK_INITIALIZER,
28         .rrdset_root_index = {
29             { NULL, rrdset_compare },
30             AVL_LOCK_INITIALIZER
31         },
32         .rrdset_root_index_name = {
33             { NULL, rrdset_compare_name },
34             AVL_LOCK_INITIALIZER
35         },
36         .rrdfamily_root_index = {
37             { NULL, rrdfamily_compare },
38             AVL_LOCK_INITIALIZER
39         },
40         .variables_root_index = {
41             { NULL, rrdvar_compare },
42             AVL_LOCK_INITIALIZER
43         },
44         .health_log = {
45             .next_log_id = 1,
46             .next_alarm_id = 1,
47             .count = 0,
48             .max = 1000,
49             .alarms = NULL,
50             .alarm_log_rwlock = PTHREAD_RWLOCK_INITIALIZER
51         }
52 };
53
54 void rrdhost_init(char *hostname) {
55     localhost.hostname = hostname;
56     localhost.health_log.next_log_id =
57         localhost.health_log.next_alarm_id = time(NULL);
58 }
59
60 void rrdhost_rwlock(RRDHOST *host) {
61     pthread_rwlock_wrlock(&host->rrdset_root_rwlock);
62 }
63
64 void rrdhost_rdlock(RRDHOST *host) {
65     pthread_rwlock_rdlock(&host->rrdset_root_rwlock);
66 }
67
68 void rrdhost_unlock(RRDHOST *host) {
69     pthread_rwlock_unlock(&host->rrdset_root_rwlock);
70 }
71
72 void rrdhost_check_rdlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line) {
73     int ret = pthread_rwlock_trywrlock(&host->rrdset_root_rwlock);
74
75     if(ret == 0)
76         fatal("RRDHOST '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", host->hostname, function, line, file);
77 }
78
79 void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line) {
80     int ret = pthread_rwlock_tryrdlock(&host->rrdset_root_rwlock);
81
82     if(ret == 0)
83         fatal("RRDHOST '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", host->hostname, function, line, file);
84 }
85
86 // ----------------------------------------------------------------------------
87 // RRDFAMILY index
88
89 static int rrdfamily_compare(void *a, void *b) {
90     if(((RRDFAMILY *)a)->hash_family < ((RRDFAMILY *)b)->hash_family) return -1;
91     else if(((RRDFAMILY *)a)->hash_family > ((RRDFAMILY *)b)->hash_family) return 1;
92     else return strcmp(((RRDFAMILY *)a)->family, ((RRDFAMILY *)b)->family);
93 }
94
95 #define rrdfamily_index_add(host, rc) (RRDFAMILY *)avl_insert_lock(&((host)->rrdfamily_root_index), (avl *)(rc))
96 #define rrdfamily_index_del(host, rc) (RRDFAMILY *)avl_remove_lock(&((host)->rrdfamily_root_index), (avl *)(rc))
97
98 static RRDFAMILY *rrdfamily_index_find(RRDHOST *host, const char *id, uint32_t hash) {
99     RRDFAMILY tmp;
100     tmp.family = id;
101     tmp.hash_family = (hash)?hash:simple_hash(tmp.family);
102
103     return (RRDFAMILY *)avl_search_lock(&(host->rrdfamily_root_index), (avl *) &tmp);
104 }
105
106 RRDFAMILY *rrdfamily_create(const char *id) {
107     RRDFAMILY *rc = rrdfamily_index_find(&localhost, id, 0);
108     if(!rc) {
109         rc = callocz(1, sizeof(RRDFAMILY));
110
111         rc->family = strdupz(id);
112         rc->hash_family = simple_hash(rc->family);
113
114         // initialize the variables index
115         avl_init_lock(&rc->variables_root_index, rrdvar_compare);
116
117         RRDFAMILY *ret = rrdfamily_index_add(&localhost, rc);
118         if(ret != rc)
119             fatal("INTERNAL ERROR: Expected to INSERT RRDFAMILY '%s' into index, but inserted '%s'.", rc->family, (ret)?ret->family:"NONE");
120     }
121
122     rc->use_count++;
123     return rc;
124 }
125
126 void rrdfamily_free(RRDFAMILY *rc) {
127     rc->use_count--;
128     if(!rc->use_count) {
129         RRDFAMILY *ret = rrdfamily_index_del(&localhost, rc);
130         if(ret != rc)
131             fatal("INTERNAL ERROR: Expected to DELETE RRDFAMILY '%s' from index, but deleted '%s'.", rc->family, (ret)?ret->family:"NONE");
132
133         if(rc->variables_root_index.avl_tree.root != NULL)
134             fatal("INTERNAL ERROR: Variables index of RRDFAMILY '%s' that is freed, is not empty.", rc->family);
135
136         freez((void *)rc->family);
137         freez(rc);
138     }
139 }
140
141 // ----------------------------------------------------------------------------
142 // RRDSET index
143
144 static int rrdset_compare(void* a, void* b) {
145     if(((RRDSET *)a)->hash < ((RRDSET *)b)->hash) return -1;
146     else if(((RRDSET *)a)->hash > ((RRDSET *)b)->hash) return 1;
147     else return strcmp(((RRDSET *)a)->id, ((RRDSET *)b)->id);
148 }
149
150 #define rrdset_index_add(host, st) (RRDSET *)avl_insert_lock(&((host)->rrdset_root_index), (avl *)(st))
151 #define rrdset_index_del(host, st) (RRDSET *)avl_remove_lock(&((host)->rrdset_root_index), (avl *)(st))
152
153 static RRDSET *rrdset_index_find(RRDHOST *host, const char *id, uint32_t hash) {
154     RRDSET tmp;
155     strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
156     tmp.hash = (hash)?hash:simple_hash(tmp.id);
157
158     return (RRDSET *)avl_search_lock(&(host->rrdset_root_index), (avl *) &tmp);
159 }
160
161 // ----------------------------------------------------------------------------
162 // RRDSET name index
163
164 #define rrdset_from_avlname(avlname_ptr) ((RRDSET *)((avlname_ptr) - offsetof(RRDSET, avlname)))
165
166 static int rrdset_compare_name(void* a, void* b) {
167     RRDSET *A = rrdset_from_avlname(a);
168     RRDSET *B = rrdset_from_avlname(b);
169
170     // fprintf(stderr, "COMPARING: %s with %s\n", A->name, B->name);
171
172     if(A->hash_name < B->hash_name) return -1;
173     else if(A->hash_name > B->hash_name) return 1;
174     else return strcmp(A->name, B->name);
175 }
176
177 RRDSET *rrdset_index_add_name(RRDHOST *host, RRDSET *st) {
178     void *result;
179     // fprintf(stderr, "ADDING: %s (name: %s)\n", st->id, st->name);
180     result = avl_insert_lock(&host->rrdset_root_index_name, (avl *) (&st->avlname));
181     if(result) return rrdset_from_avlname(result);
182     return NULL;
183 }
184
185 RRDSET *rrdset_index_del_name(RRDHOST *host, RRDSET *st) {
186     void *result;
187     // fprintf(stderr, "DELETING: %s (name: %s)\n", st->id, st->name);
188     result = (RRDSET *)avl_remove_lock(&((host)->rrdset_root_index_name), (avl *)(&st->avlname));
189     if(result) return rrdset_from_avlname(result);
190     return NULL;
191 }
192
193 static RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name, uint32_t hash) {
194     void *result = NULL;
195     RRDSET tmp;
196     tmp.name = name;
197     tmp.hash_name = (hash)?hash:simple_hash(tmp.name);
198
199     // fprintf(stderr, "SEARCHING: %s\n", name);
200     result = avl_search_lock(&host->rrdset_root_index_name, (avl *) (&(tmp.avlname)));
201     if(result) {
202         RRDSET *st = rrdset_from_avlname(result);
203         if(strcmp(st->magic, RRDSET_MAGIC))
204             error("Search for RRDSET %s returned an invalid RRDSET %s (name %s)", name, st->id, st->name);
205
206         // fprintf(stderr, "FOUND: %s\n", name);
207         return rrdset_from_avlname(result);
208     }
209     // fprintf(stderr, "NOT FOUND: %s\n", name);
210     return NULL;
211 }
212
213
214 // ----------------------------------------------------------------------------
215 // RRDDIM index
216
217 static int rrddim_compare(void* a, void* b) {
218     if(((RRDDIM *)a)->hash < ((RRDDIM *)b)->hash) return -1;
219     else if(((RRDDIM *)a)->hash > ((RRDDIM *)b)->hash) return 1;
220     else return strcmp(((RRDDIM *)a)->id, ((RRDDIM *)b)->id);
221 }
222
223 #define rrddim_index_add(st, rd) avl_insert_lock(&((st)->dimensions_index), (avl *)(rd))
224 #define rrddim_index_del(st,rd ) avl_remove_lock(&((st)->dimensions_index), (avl *)(rd))
225
226 static RRDDIM *rrddim_index_find(RRDSET *st, const char *id, uint32_t hash) {
227     RRDDIM tmp;
228     strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
229     tmp.hash = (hash)?hash:simple_hash(tmp.id);
230
231     return (RRDDIM *)avl_search_lock(&(st->dimensions_index), (avl *) &tmp);
232 }
233
234 // ----------------------------------------------------------------------------
235 // chart types
236
237 int rrdset_type_id(const char *name)
238 {
239     if(unlikely(strcmp(name, RRDSET_TYPE_AREA_NAME) == 0)) return RRDSET_TYPE_AREA;
240     else if(unlikely(strcmp(name, RRDSET_TYPE_STACKED_NAME) == 0)) return RRDSET_TYPE_STACKED;
241     else if(unlikely(strcmp(name, RRDSET_TYPE_LINE_NAME) == 0)) return RRDSET_TYPE_LINE;
242     return RRDSET_TYPE_LINE;
243 }
244
245 const char *rrdset_type_name(int chart_type)
246 {
247     static char line[] = RRDSET_TYPE_LINE_NAME;
248     static char area[] = RRDSET_TYPE_AREA_NAME;
249     static char stacked[] = RRDSET_TYPE_STACKED_NAME;
250
251     switch(chart_type) {
252         case RRDSET_TYPE_LINE:
253             return line;
254
255         case RRDSET_TYPE_AREA:
256             return area;
257
258         case RRDSET_TYPE_STACKED:
259             return stacked;
260     }
261     return line;
262 }
263
264 // ----------------------------------------------------------------------------
265 // load / save
266
267 const char *rrd_memory_mode_name(int id)
268 {
269     static const char ram[] = RRD_MEMORY_MODE_RAM_NAME;
270     static const char map[] = RRD_MEMORY_MODE_MAP_NAME;
271     static const char save[] = RRD_MEMORY_MODE_SAVE_NAME;
272
273     switch(id) {
274         case RRD_MEMORY_MODE_RAM:
275             return ram;
276
277         case RRD_MEMORY_MODE_MAP:
278             return map;
279
280         case RRD_MEMORY_MODE_SAVE:
281         default:
282             return save;
283     }
284
285     return save;
286 }
287
288 int rrd_memory_mode_id(const char *name)
289 {
290     if(unlikely(!strcmp(name, RRD_MEMORY_MODE_RAM_NAME)))
291         return RRD_MEMORY_MODE_RAM;
292     else if(unlikely(!strcmp(name, RRD_MEMORY_MODE_MAP_NAME)))
293         return RRD_MEMORY_MODE_MAP;
294
295     return RRD_MEMORY_MODE_SAVE;
296 }
297
298 // ----------------------------------------------------------------------------
299 // algorithms types
300
301 int rrddim_algorithm_id(const char *name)
302 {
303     if(strcmp(name, RRDDIM_INCREMENTAL_NAME) == 0)          return RRDDIM_INCREMENTAL;
304     if(strcmp(name, RRDDIM_ABSOLUTE_NAME) == 0)             return RRDDIM_ABSOLUTE;
305     if(strcmp(name, RRDDIM_PCENT_OVER_ROW_TOTAL_NAME) == 0)         return RRDDIM_PCENT_OVER_ROW_TOTAL;
306     if(strcmp(name, RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME) == 0)    return RRDDIM_PCENT_OVER_DIFF_TOTAL;
307     return RRDDIM_ABSOLUTE;
308 }
309
310 const char *rrddim_algorithm_name(int chart_type)
311 {
312     static char absolute[] = RRDDIM_ABSOLUTE_NAME;
313     static char incremental[] = RRDDIM_INCREMENTAL_NAME;
314     static char percentage_of_absolute_row[] = RRDDIM_PCENT_OVER_ROW_TOTAL_NAME;
315     static char percentage_of_incremental_row[] = RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME;
316
317     switch(chart_type) {
318         case RRDDIM_ABSOLUTE:
319             return absolute;
320
321         case RRDDIM_INCREMENTAL:
322             return incremental;
323
324         case RRDDIM_PCENT_OVER_ROW_TOTAL:
325             return percentage_of_absolute_row;
326
327         case RRDDIM_PCENT_OVER_DIFF_TOTAL:
328             return percentage_of_incremental_row;
329     }
330     return absolute;
331 }
332
333 // ----------------------------------------------------------------------------
334 // chart names
335
336 char *rrdset_strncpyz_name(char *to, const char *from, size_t length)
337 {
338     char c, *p = to;
339
340     while (length-- && (c = *from++)) {
341         if(c != '.' && !isalnum(c))
342             c = '_';
343
344         *p++ = c;
345     }
346
347     *p = '\0';
348
349     return to;
350 }
351
352 void rrdset_set_name(RRDSET *st, const char *name)
353 {
354     debug(D_RRD_CALLS, "rrdset_set_name() old: %s, new: %s", st->name, name);
355
356     if(st->name) {
357         rrdset_index_del_name(&localhost, st);
358         rrdsetvar_rename_all(st);
359     }
360
361     char b[CONFIG_MAX_VALUE + 1];
362     char n[RRD_ID_LENGTH_MAX + 1];
363
364     snprintfz(n, RRD_ID_LENGTH_MAX, "%s.%s", st->type, name);
365     rrdset_strncpyz_name(b, n, CONFIG_MAX_VALUE);
366     st->name = config_get(st->id, "name", b);
367     st->hash_name = simple_hash(st->name);
368
369     rrdset_index_add_name(&localhost, st);
370 }
371
372 // ----------------------------------------------------------------------------
373 // cache directory
374
375 char *rrdset_cache_dir(const char *id)
376 {
377     char *ret = NULL;
378
379     static char *cache_dir = NULL;
380     if(!cache_dir) {
381         cache_dir = config_get("global", "cache directory", CACHE_DIR);
382         int r = mkdir(cache_dir, 0755);
383         if(r != 0 && errno != EEXIST)
384             error("Cannot create directory '%s'", cache_dir);
385     }
386
387     char b[FILENAME_MAX + 1];
388     char n[FILENAME_MAX + 1];
389     rrdset_strncpyz_name(b, id, FILENAME_MAX);
390
391     snprintfz(n, FILENAME_MAX, "%s/%s", cache_dir, b);
392     ret = config_get(id, "cache directory", n);
393
394     if(rrd_memory_mode == RRD_MEMORY_MODE_MAP || rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
395         int r = mkdir(ret, 0775);
396         if(r != 0 && errno != EEXIST)
397             error("Cannot create directory '%s'", ret);
398     }
399
400     return ret;
401 }
402
403 // ----------------------------------------------------------------------------
404 // core functions
405
406 void rrdset_reset(RRDSET *st)
407 {
408     debug(D_RRD_CALLS, "rrdset_reset() %s", st->name);
409
410     st->last_collected_time.tv_sec = 0;
411     st->last_collected_time.tv_usec = 0;
412     st->last_updated.tv_sec = 0;
413     st->last_updated.tv_usec = 0;
414     st->current_entry = 0;
415     st->counter = 0;
416     st->counter_done = 0;
417
418     RRDDIM *rd;
419     for(rd = st->dimensions; rd ; rd = rd->next) {
420         rd->last_collected_time.tv_sec = 0;
421         rd->last_collected_time.tv_usec = 0;
422         rd->counter = 0;
423         bzero(rd->values, rd->entries * sizeof(storage_number));
424     }
425 }
426
427 RRDSET *rrdset_create(const char *type, const char *id, const char *name, const char *family, const char *context, const char *title, const char *units, long priority, int update_every, int chart_type)
428 {
429     if(!type || !type[0]) {
430         fatal("Cannot create rrd stats without a type.");
431         return NULL;
432     }
433
434     if(!id || !id[0]) {
435         fatal("Cannot create rrd stats without an id.");
436         return NULL;
437     }
438
439     char fullid[RRD_ID_LENGTH_MAX + 1];
440     char fullfilename[FILENAME_MAX + 1];
441     RRDSET *st = NULL;
442
443     snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
444
445     st = rrdset_find(fullid);
446     if(st) {
447         error("Cannot create rrd stats for '%s', it already exists.", fullid);
448         return st;
449     }
450
451     long entries = config_get_number(fullid, "history", rrd_default_history_entries);
452     if(entries < 5) entries = config_set_number(fullid, "history", 5);
453     if(entries > RRD_HISTORY_ENTRIES_MAX) entries = config_set_number(fullid, "history", RRD_HISTORY_ENTRIES_MAX);
454
455     int enabled = config_get_boolean(fullid, "enabled", 1);
456     if(!enabled) entries = 5;
457
458     unsigned long size = sizeof(RRDSET);
459     char *cache_dir = rrdset_cache_dir(fullid);
460
461     debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
462
463     snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
464     if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) st = (RRDSET *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 0);
465     if(st) {
466         if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
467             errno = 0;
468             info("Initializing file %s.", fullfilename);
469             bzero(st, size);
470         }
471         else if(strcmp(st->id, fullid) != 0) {
472             errno = 0;
473             error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
474             // munmap(st, size);
475             // st = NULL;
476             bzero(st, size);
477         }
478         else if(st->memsize != size || st->entries != entries) {
479             errno = 0;
480             error("File %s does not have the desired size. Clearing it.", fullfilename);
481             bzero(st, size);
482         }
483         else if(st->update_every != update_every) {
484             errno = 0;
485             error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
486             bzero(st, size);
487         }
488         else if((time(NULL) - st->last_updated.tv_sec) > update_every * entries) {
489             errno = 0;
490             error("File %s is too old. Clearing it.", fullfilename);
491             bzero(st, size);
492         }
493     }
494
495     if(st) {
496         st->name = NULL;
497         st->type = NULL;
498         st->family = NULL;
499         st->context = NULL;
500         st->title = NULL;
501         st->units = NULL;
502         st->dimensions = NULL;
503         st->next = NULL;
504         st->mapped = rrd_memory_mode;
505         st->variables = NULL;
506         st->alarms = NULL;
507     }
508     else {
509         st = callocz(1, size);
510         st->mapped = RRD_MEMORY_MODE_RAM;
511     }
512
513     st->memsize = size;
514     st->entries = entries;
515     st->update_every = update_every;
516
517     if(st->current_entry >= st->entries) st->current_entry = 0;
518
519     strcpy(st->cache_filename, fullfilename);
520     strcpy(st->magic, RRDSET_MAGIC);
521
522     strcpy(st->id, fullid);
523     st->hash = simple_hash(st->id);
524
525     st->cache_dir = cache_dir;
526
527     st->chart_type = rrdset_type_id(config_get(st->id, "chart type", rrdset_type_name(chart_type)));
528     st->type       = config_get(st->id, "type", type);
529     st->family     = config_get(st->id, "family", family?family:st->type);
530     st->units      = config_get(st->id, "units", units?units:"");
531
532     st->context    = config_get(st->id, "context", context?context:st->id);
533     st->hash_context = simple_hash(st->context);
534
535     st->priority = config_get_number(st->id, "priority", priority);
536     st->enabled = enabled;
537
538     st->isdetail = 0;
539     st->debug = 0;
540
541     // if(!strcmp(st->id, "disk_util.dm-0")) {
542     //     st->debug = 1;
543     //     error("enabled debugging for '%s'", st->id);
544     // }
545     // else error("not enabled debugging for '%s'", st->id);
546
547     st->green = NAN;
548     st->red = NAN;
549
550     st->last_collected_time.tv_sec = 0;
551     st->last_collected_time.tv_usec = 0;
552     st->counter_done = 0;
553
554     st->gap_when_lost_iterations_above = (int) (
555             config_get_number(st->id, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2);
556
557     avl_init_lock(&st->dimensions_index, rrddim_compare);
558     avl_init_lock(&st->variables_root_index, rrdvar_compare);
559
560     pthread_rwlock_init(&st->rwlock, NULL);
561     rrdhost_rwlock(&localhost);
562
563     if(name && *name) rrdset_set_name(st, name);
564     else rrdset_set_name(st, id);
565
566     {
567         char varvalue[CONFIG_MAX_VALUE + 1];
568         snprintfz(varvalue, CONFIG_MAX_VALUE, "%s (%s)", title?title:"", st->name);
569         st->title = config_get(st->id, "title", varvalue);
570     }
571
572     st->rrdfamily = rrdfamily_create(st->family);
573     st->rrdhost = &localhost;
574
575     st->next = localhost.rrdset_root;
576     localhost.rrdset_root = st;
577
578     if(health_enabled) {
579         rrdsetvar_create(st, "last_collected_t", RRDVAR_TYPE_TIME_T, &st->last_collected_time.tv_sec, 0);
580         rrdsetvar_create(st, "collected_total_raw", RRDVAR_TYPE_TOTAL, &st->last_collected_total, 0);
581         rrdsetvar_create(st, "green", RRDVAR_TYPE_CALCULATED, &st->green, 0);
582         rrdsetvar_create(st, "red", RRDVAR_TYPE_CALCULATED, &st->red, 0);
583         rrdsetvar_create(st, "update_every", RRDVAR_TYPE_INT, &st->update_every, 0);
584     }
585
586     rrdset_index_add(&localhost, st);
587
588     rrdsetcalc_link_matching(st);
589     rrdcalctemplate_link_matching(st);
590
591     rrdhost_unlock(&localhost);
592
593     return(st);
594 }
595
596 RRDDIM *rrddim_add(RRDSET *st, const char *id, const char *name, long multiplier, long divisor, int algorithm)
597 {
598     char filename[FILENAME_MAX + 1];
599     char fullfilename[FILENAME_MAX + 1];
600
601     char varname[CONFIG_MAX_NAME + 1];
602     RRDDIM *rd = NULL;
603     unsigned long size = sizeof(RRDDIM) + (st->entries * sizeof(storage_number));
604
605     debug(D_RRD_CALLS, "Adding dimension '%s/%s'.", st->id, id);
606
607     rrdset_strncpyz_name(filename, id, FILENAME_MAX);
608     snprintfz(fullfilename, FILENAME_MAX, "%s/%s.db", st->cache_dir, filename);
609     if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) rd = (RRDDIM *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 1);
610     if(rd) {
611         struct timeval now;
612         gettimeofday(&now, NULL);
613
614         if(strcmp(rd->magic, RRDDIMENSION_MAGIC) != 0) {
615             errno = 0;
616             info("Initializing file %s.", fullfilename);
617             bzero(rd, size);
618         }
619         else if(rd->memsize != size) {
620             errno = 0;
621             error("File %s does not have the desired size. Clearing it.", fullfilename);
622             bzero(rd, size);
623         }
624         else if(rd->multiplier != multiplier) {
625             errno = 0;
626             error("File %s does not have the same multiplier. Clearing it.", fullfilename);
627             bzero(rd, size);
628         }
629         else if(rd->divisor != divisor) {
630             errno = 0;
631             error("File %s does not have the same divisor. Clearing it.", fullfilename);
632             bzero(rd, size);
633         }
634         else if(rd->algorithm != algorithm) {
635             errno = 0;
636             error("File %s does not have the same algorithm. Clearing it.", fullfilename);
637             bzero(rd, size);
638         }
639         else if(rd->update_every != st->update_every) {
640             errno = 0;
641             error("File %s does not have the same refresh frequency. Clearing it.", fullfilename);
642             bzero(rd, size);
643         }
644         else if(usec_dt(&now, &rd->last_collected_time) > (rd->entries * rd->update_every * 1000000ULL)) {
645             errno = 0;
646             error("File %s is too old. Clearing it.", fullfilename);
647             bzero(rd, size);
648         }
649         else if(strcmp(rd->id, id) != 0) {
650             errno = 0;
651             error("File %s contents are not for dimension %s. Clearing it.", fullfilename, id);
652             // munmap(rd, size);
653             // rd = NULL;
654             bzero(rd, size);
655         }
656     }
657
658     if(rd) {
659         // we have a file mapped for rd
660         rd->mapped = rrd_memory_mode;
661         rd->flags = 0x00000000;
662         rd->variables = NULL;
663         rd->next = NULL;
664         rd->name = NULL;
665     }
666     else {
667         // if we didn't manage to get a mmap'd dimension, just create one
668
669         rd = callocz(1, size);
670         rd->mapped = RRD_MEMORY_MODE_RAM;
671     }
672     rd->memsize = size;
673
674     strcpy(rd->magic, RRDDIMENSION_MAGIC);
675     strcpy(rd->cache_filename, fullfilename);
676     strncpyz(rd->id, id, RRD_ID_LENGTH_MAX);
677     rd->hash = simple_hash(rd->id);
678
679     snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
680     rd->name = config_get(st->id, varname, (name && *name)?name:rd->id);
681
682     snprintfz(varname, CONFIG_MAX_NAME, "dim %s algorithm", rd->id);
683     rd->algorithm = rrddim_algorithm_id(config_get(st->id, varname, rrddim_algorithm_name(algorithm)));
684
685     snprintfz(varname, CONFIG_MAX_NAME, "dim %s multiplier", rd->id);
686     rd->multiplier = config_get_number(st->id, varname, multiplier);
687
688     snprintfz(varname, CONFIG_MAX_NAME, "dim %s divisor", rd->id);
689     rd->divisor = config_get_number(st->id, varname, divisor);
690     if(!rd->divisor) rd->divisor = 1;
691
692     rd->entries = st->entries;
693     rd->update_every = st->update_every;
694
695     // prevent incremental calculation spikes
696     rd->counter = 0;
697     rd->updated = 0;
698     rd->calculated_value = 0;
699     rd->last_calculated_value = 0;
700     rd->collected_value = 0;
701     rd->last_collected_value = 0;
702     rd->collected_volume = 0;
703     rd->stored_volume = 0;
704     rd->last_stored_value = 0;
705     rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
706     rd->last_collected_time.tv_sec = 0;
707     rd->last_collected_time.tv_usec = 0;
708     rd->rrdset = st;
709
710     // append this dimension
711     pthread_rwlock_wrlock(&st->rwlock);
712     if(!st->dimensions)
713         st->dimensions = rd;
714     else {
715         RRDDIM *td = st->dimensions;
716         for(; td->next; td = td->next) ;
717         td->next = rd;
718     }
719
720     if(health_enabled) {
721         rrddimvar_create(rd, RRDVAR_TYPE_CALCULATED, NULL, NULL, &rd->last_stored_value, 0);
722         rrddimvar_create(rd, RRDVAR_TYPE_COLLECTED, NULL, "_raw", &rd->last_collected_value, 0);
723         rrddimvar_create(rd, RRDVAR_TYPE_TIME_T, NULL, "_last_collected_t", &rd->last_collected_time.tv_sec, 0);
724     }
725
726     pthread_rwlock_unlock(&st->rwlock);
727
728     rrddim_index_add(st, rd);
729
730     return(rd);
731 }
732
733 void rrddim_set_name(RRDSET *st, RRDDIM *rd, const char *name)
734 {
735     debug(D_RRD_CALLS, "rrddim_set_name() %s.%s", st->name, rd->name);
736
737     char varname[CONFIG_MAX_NAME + 1];
738     snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
739     config_set_default(st->id, varname, name);
740
741     rrddimvar_rename_all(rd);
742 }
743
744 void rrddim_free(RRDSET *st, RRDDIM *rd)
745 {
746     debug(D_RRD_CALLS, "rrddim_free() %s.%s", st->name, rd->name);
747
748     if(rd == st->dimensions)
749         st->dimensions = rd->next;
750     else {
751         RRDDIM *i;
752         for (i = st->dimensions; i && i->next != rd; i = i->next) ;
753
754         if (i && i->next == rd)
755             i->next = rd->next;
756         else
757             error("Request to free dimension '%s.%s' but it is not linked.", st->id, rd->name);
758     }
759     rd->next = NULL;
760
761     while(rd->variables)
762         rrddimvar_free(rd->variables);
763
764     rrddim_index_del(st, rd);
765
766     // free(rd->annotations);
767     if(rd->mapped == RRD_MEMORY_MODE_SAVE) {
768         debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
769         savememory(rd->cache_filename, rd, rd->memsize);
770
771         debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
772         munmap(rd, rd->memsize);
773     }
774     else if(rd->mapped == RRD_MEMORY_MODE_MAP) {
775         debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
776         munmap(rd, rd->memsize);
777     }
778     else {
779         debug(D_RRD_CALLS, "Removing dimension '%s'.", rd->name);
780         freez(rd);
781     }
782 }
783
784 void rrdset_free_all(void)
785 {
786     info("Freeing all memory...");
787
788     rrdhost_rwlock(&localhost);
789
790     RRDSET *st;
791     for(st = localhost.rrdset_root; st ;) {
792         RRDSET *next = st->next;
793
794         pthread_rwlock_wrlock(&st->rwlock);
795
796         while(st->variables)
797             rrdsetvar_free(st->variables);
798
799         while(st->alarms)
800             rrdsetcalc_unlink(st->alarms);
801
802         while(st->dimensions)
803             rrddim_free(st, st->dimensions);
804
805         rrdset_index_del(&localhost, st);
806
807         st->rrdfamily->use_count--;
808         if(!st->rrdfamily->use_count)
809             rrdfamily_free(st->rrdfamily);
810
811         pthread_rwlock_unlock(&st->rwlock);
812
813         if(st->mapped == RRD_MEMORY_MODE_SAVE) {
814             debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
815             savememory(st->cache_filename, st, st->memsize);
816
817             debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
818             munmap(st, st->memsize);
819         }
820         else if(st->mapped == RRD_MEMORY_MODE_MAP) {
821             debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
822             munmap(st, st->memsize);
823         }
824         else
825             freez(st);
826
827         st = next;
828     }
829     localhost.rrdset_root = NULL;
830
831     rrdhost_unlock(&localhost);
832
833     info("Memory cleanup completed...");
834 }
835
836 void rrdset_save_all(void) {
837     info("Saving database...");
838
839     RRDSET *st;
840     RRDDIM *rd;
841
842     rrdhost_rwlock(&localhost);
843     for(st = localhost.rrdset_root; st ; st = st->next) {
844         pthread_rwlock_wrlock(&st->rwlock);
845
846         if(st->mapped == RRD_MEMORY_MODE_SAVE) {
847             debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
848             savememory(st->cache_filename, st, st->memsize);
849         }
850
851         for(rd = st->dimensions; rd ; rd = rd->next) {
852             if(likely(rd->mapped == RRD_MEMORY_MODE_SAVE)) {
853                 debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
854                 savememory(rd->cache_filename, rd, rd->memsize);
855             }
856         }
857
858         pthread_rwlock_unlock(&st->rwlock);
859     }
860     rrdhost_unlock(&localhost);
861 }
862
863
864 RRDSET *rrdset_find(const char *id)
865 {
866     debug(D_RRD_CALLS, "rrdset_find() for chart %s", id);
867
868     RRDSET *st = rrdset_index_find(&localhost, id, 0);
869     return(st);
870 }
871
872 RRDSET *rrdset_find_bytype(const char *type, const char *id)
873 {
874     debug(D_RRD_CALLS, "rrdset_find_bytype() for chart %s.%s", type, id);
875
876     char buf[RRD_ID_LENGTH_MAX + 1];
877
878     strncpyz(buf, type, RRD_ID_LENGTH_MAX - 1);
879     strcat(buf, ".");
880     int len = (int) strlen(buf);
881     strncpyz(&buf[len], id, (size_t) (RRD_ID_LENGTH_MAX - len));
882
883     return(rrdset_find(buf));
884 }
885
886 RRDSET *rrdset_find_byname(const char *name)
887 {
888     debug(D_RRD_CALLS, "rrdset_find_byname() for chart %s", name);
889
890     RRDSET *st = rrdset_index_find_name(&localhost, name, 0);
891     return(st);
892 }
893
894 RRDDIM *rrddim_find(RRDSET *st, const char *id)
895 {
896     debug(D_RRD_CALLS, "rrddim_find() for chart %s, dimension %s", st->name, id);
897
898     return rrddim_index_find(st, id, 0);
899 }
900
901 int rrddim_hide(RRDSET *st, const char *id)
902 {
903     debug(D_RRD_CALLS, "rrddim_hide() for chart %s, dimension %s", st->name, id);
904
905     RRDDIM *rd = rrddim_find(st, id);
906     if(unlikely(!rd)) {
907         error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
908         return 1;
909     }
910
911     rd->flags |= RRDDIM_FLAG_HIDDEN;
912     return 0;
913 }
914
915 int rrddim_unhide(RRDSET *st, const char *id)
916 {
917     debug(D_RRD_CALLS, "rrddim_unhide() for chart %s, dimension %s", st->name, id);
918
919     RRDDIM *rd = rrddim_find(st, id);
920     if(unlikely(!rd)) {
921         error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
922         return 1;
923     }
924
925     if(rd->flags & RRDDIM_FLAG_HIDDEN) rd->flags ^= RRDDIM_FLAG_HIDDEN;
926     return 0;
927 }
928
929 collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value)
930 {
931     debug(D_RRD_CALLS, "rrddim_set_by_pointer() for chart %s, dimension %s, value " COLLECTED_NUMBER_FORMAT, st->name, rd->name, value);
932
933     gettimeofday(&rd->last_collected_time, NULL);
934     rd->collected_value = value;
935     rd->updated = 1;
936     rd->counter++;
937
938     return rd->last_collected_value;
939 }
940
941 collected_number rrddim_set(RRDSET *st, const char *id, collected_number value)
942 {
943     RRDDIM *rd = rrddim_find(st, id);
944     if(unlikely(!rd)) {
945         error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
946         return 0;
947     }
948
949     return rrddim_set_by_pointer(st, rd, value);
950 }
951
952 void rrdset_next_usec(RRDSET *st, unsigned long long microseconds)
953 {
954     if(!microseconds) rrdset_next(st);
955     else {
956         debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
957
958         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds);
959         st->usec_since_last_update = microseconds;
960     }
961 }
962
963 void rrdset_next(RRDSET *st)
964 {
965     unsigned long long microseconds = 0;
966
967     if(likely(st->last_collected_time.tv_sec)) {
968         struct timeval now;
969         gettimeofday(&now, NULL);
970         microseconds = usec_dt(&now, &st->last_collected_time);
971     }
972     // prevent infinite loop
973     else microseconds = st->update_every * 1000000ULL;
974
975     rrdset_next_usec(st, microseconds);
976 }
977
978 void rrdset_next_plugins(RRDSET *st)
979 {
980     rrdset_next(st);
981 }
982
983 unsigned long long rrdset_done(RRDSET *st)
984 {
985     if(unlikely(netdata_exit)) return 0;
986
987     debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name);
988
989     RRDDIM *rd, *last;
990     int oldstate, store_this_entry = 1, first_entry = 0;
991     unsigned long long last_stored_ut, now_collect_ut, last_collect_ut, next_store_ut, stored_entries = 0;
992
993     if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) != 0))
994         error("Cannot set pthread cancel state to DISABLE.");
995
996     // a read lock is OK here
997     pthread_rwlock_rdlock(&st->rwlock);
998
999     // enable the chart, if it was disabled
1000     if(unlikely(rrd_delete_unupdated_dimensions) && !st->enabled)
1001         st->enabled = 1;
1002
1003     // check if the chart has a long time to be updated
1004     if(unlikely(st->usec_since_last_update > st->entries * st->update_every * 1000000ULL)) {
1005         info("%s: took too long to be updated (%0.3Lf secs). Reseting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0));
1006         rrdset_reset(st);
1007         st->usec_since_last_update = st->update_every * 1000000ULL;
1008         first_entry = 1;
1009     }
1010     if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update);
1011
1012     // set last_collected_time
1013     if(unlikely(!st->last_collected_time.tv_sec)) {
1014         // it is the first entry
1015         // set the last_collected_time to now
1016         gettimeofday(&st->last_collected_time, NULL);
1017         last_collect_ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - (st->update_every * 1000000);
1018
1019         // the first entry should not be stored
1020         store_this_entry = 0;
1021         first_entry = 1;
1022
1023         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: has not set last_collected_time. Setting it now. Will not store the next entry.", st->name);
1024     }
1025     else {
1026         // it is not the first entry
1027         // calculate the proper last_collected_time, using usec_since_last_update
1028         last_collect_ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec;
1029         unsigned long long ut = last_collect_ut + st->usec_since_last_update;
1030         st->last_collected_time.tv_sec = (time_t) (ut / 1000000ULL);
1031         st->last_collected_time.tv_usec = (suseconds_t) (ut % 1000000ULL);
1032     }
1033
1034     // if this set has not been updated in the past
1035     // we fake the last_update time to be = now - usec_since_last_update
1036     if(unlikely(!st->last_updated.tv_sec)) {
1037         // it has never been updated before
1038         // set a fake last_updated, in the past using usec_since_last_update
1039         unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
1040         st->last_updated.tv_sec = (time_t) (ut / 1000000ULL);
1041         st->last_updated.tv_usec = (suseconds_t) (ut % 1000000ULL);
1042
1043         // the first entry should not be stored
1044         store_this_entry = 0;
1045         first_entry = 1;
1046
1047         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
1048     }
1049
1050     // check if we will re-write the entire data set
1051     if(unlikely(usec_dt(&st->last_collected_time, &st->last_updated) > st->update_every * st->entries * 1000000ULL)) {
1052         info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Reseting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
1053         rrdset_reset(st);
1054
1055         st->usec_since_last_update = st->update_every * 1000000ULL;
1056
1057         gettimeofday(&st->last_collected_time, NULL);
1058
1059         unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
1060         st->last_updated.tv_sec = (time_t) (ut / 1000000ULL);
1061         st->last_updated.tv_usec = (suseconds_t) (ut % 1000000ULL);
1062
1063         // the first entry should not be stored
1064         store_this_entry = 0;
1065         first_entry = 1;
1066     }
1067
1068     // these are the 3 variables that will help us in interpolation
1069     // last_stored_ut = the last time we added a value to the storage
1070     // now_collect_ut = the time the current value has been collected
1071     // next_store_ut  = the time of the next interpolation point
1072     last_stored_ut = st->last_updated.tv_sec * 1000000ULL + st->last_updated.tv_usec;
1073     now_collect_ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec;
1074     next_store_ut  = (st->last_updated.tv_sec + st->update_every) * 1000000ULL;
1075
1076     if(unlikely(st->debug)) {
1077         debug(D_RRD_STATS, "%s: last_collect_ut = %0.3Lf (last collection time)", st->name, (long double)last_collect_ut/1000000.0);
1078         debug(D_RRD_STATS, "%s: now_collect_ut  = %0.3Lf (current collection time)", st->name, (long double)now_collect_ut/1000000.0);
1079         debug(D_RRD_STATS, "%s: last_stored_ut  = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0);
1080         debug(D_RRD_STATS, "%s: next_store_ut   = %0.3Lf (next interpolation point)", st->name, (long double)next_store_ut/1000000.0);
1081     }
1082
1083     if(unlikely(!st->counter_done)) {
1084         store_this_entry = 0;
1085         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name);
1086     }
1087     st->counter_done++;
1088
1089     // calculate totals and count the dimensions
1090     int dimensions;
1091     st->collected_total = 0;
1092     for( rd = st->dimensions, dimensions = 0 ; rd ; rd = rd->next, dimensions++ )
1093         if(likely(rd->updated)) st->collected_total += rd->collected_value;
1094
1095     uint32_t storage_flags = SN_EXISTS;
1096
1097     // process all dimensions to calculate their values
1098     // based on the collected figures only
1099     // at this stage we do not interpolate anything
1100     for( rd = st->dimensions ; rd ; rd = rd->next ) {
1101
1102         if(unlikely(!rd->updated)) {
1103             rd->calculated_value = 0;
1104             continue;
1105         }
1106
1107         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: START "
1108             " last_collected_value = " COLLECTED_NUMBER_FORMAT
1109             " collected_value = " COLLECTED_NUMBER_FORMAT
1110             " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1111             " calculated_value = " CALCULATED_NUMBER_FORMAT
1112             , st->id, rd->name
1113             , rd->last_collected_value
1114             , rd->collected_value
1115             , rd->last_calculated_value
1116             , rd->calculated_value
1117             );
1118
1119         switch(rd->algorithm) {
1120             case RRDDIM_ABSOLUTE:
1121                 rd->calculated_value = (calculated_number)rd->collected_value
1122                     * (calculated_number)rd->multiplier
1123                     / (calculated_number)rd->divisor;
1124
1125                 if(unlikely(st->debug))
1126                     debug(D_RRD_STATS, "%s/%s: CALC ABS/ABS-NO-IN "
1127                         CALCULATED_NUMBER_FORMAT " = "
1128                         COLLECTED_NUMBER_FORMAT
1129                         " * " CALCULATED_NUMBER_FORMAT
1130                         " / " CALCULATED_NUMBER_FORMAT
1131                         , st->id, rd->name
1132                         , rd->calculated_value
1133                         , rd->collected_value
1134                         , (calculated_number)rd->multiplier
1135                         , (calculated_number)rd->divisor
1136                         );
1137                 break;
1138
1139             case RRDDIM_PCENT_OVER_ROW_TOTAL:
1140                 if(unlikely(!st->collected_total))
1141                     rd->calculated_value = 0;
1142                 else
1143                     // the percentage of the current value
1144                     // over the total of all dimensions
1145                     rd->calculated_value =
1146                           (calculated_number)100
1147                         * (calculated_number)rd->collected_value
1148                         / (calculated_number)st->collected_total;
1149
1150                 if(unlikely(st->debug))
1151                     debug(D_RRD_STATS, "%s/%s: CALC PCENT-ROW "
1152                         CALCULATED_NUMBER_FORMAT " = 100"
1153                         " * " COLLECTED_NUMBER_FORMAT
1154                         " / " COLLECTED_NUMBER_FORMAT
1155                         , st->id, rd->name
1156                         , rd->calculated_value
1157                         , rd->collected_value
1158                         , st->collected_total
1159                         );
1160                 break;
1161
1162             case RRDDIM_INCREMENTAL:
1163                 if(unlikely(rd->counter <= 1)) {
1164                     rd->calculated_value = 0;
1165                     continue;
1166                 }
1167
1168                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
1169                 // to reset the calculation (it will give zero as the calculation for this second)
1170                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
1171                     debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
1172                             , st->name, rd->name
1173                             , rd->last_collected_value
1174                             , rd->collected_value);
1175                     if(!(rd->flags & RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)) storage_flags = SN_EXISTS_RESET;
1176                     rd->last_collected_value = rd->collected_value;
1177                 }
1178
1179                 rd->calculated_value +=
1180                       (calculated_number)(rd->collected_value - rd->last_collected_value)
1181                     * (calculated_number)rd->multiplier
1182                     / (calculated_number)rd->divisor;
1183
1184                 if(unlikely(st->debug))
1185                     debug(D_RRD_STATS, "%s/%s: CALC INC PRE "
1186                         CALCULATED_NUMBER_FORMAT " = ("
1187                         COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT
1188                         ")"
1189                         " * " CALCULATED_NUMBER_FORMAT
1190                         " / " CALCULATED_NUMBER_FORMAT
1191                         , st->id, rd->name
1192                         , rd->calculated_value
1193                         , rd->collected_value, rd->last_collected_value
1194                         , (calculated_number)rd->multiplier
1195                         , (calculated_number)rd->divisor
1196                         );
1197                 break;
1198
1199             case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1200                 if(unlikely(rd->counter <= 1)) {
1201                     rd->calculated_value = 0;
1202                     continue;
1203                 }
1204
1205                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
1206                 // to reset the calculation (it will give zero as the calculation for this second)
1207                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
1208                     debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
1209                     , st->name, rd->name
1210                     , rd->last_collected_value
1211                     , rd->collected_value);
1212                     if(!(rd->flags & RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)) storage_flags = SN_EXISTS_RESET;
1213                     rd->last_collected_value = rd->collected_value;
1214                 }
1215
1216                 // the percentage of the current increment
1217                 // over the increment of all dimensions together
1218                 if(unlikely(st->collected_total == st->last_collected_total))
1219                     rd->calculated_value = 0;
1220                 else
1221                     rd->calculated_value =
1222                           (calculated_number)100
1223                         * (calculated_number)(rd->collected_value - rd->last_collected_value)
1224                         / (calculated_number)(st->collected_total - st->last_collected_total);
1225
1226                 if(unlikely(st->debug))
1227                     debug(D_RRD_STATS, "%s/%s: CALC PCENT-DIFF "
1228                         CALCULATED_NUMBER_FORMAT " = 100"
1229                         " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
1230                         " / (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
1231                         , st->id, rd->name
1232                         , rd->calculated_value
1233                         , rd->collected_value, rd->last_collected_value
1234                         , st->collected_total, st->last_collected_total
1235                         );
1236                 break;
1237
1238             default:
1239                 // make the default zero, to make sure
1240                 // it gets noticed when we add new types
1241                 rd->calculated_value = 0;
1242
1243                 if(unlikely(st->debug))
1244                     debug(D_RRD_STATS, "%s/%s: CALC "
1245                         CALCULATED_NUMBER_FORMAT " = 0"
1246                         , st->id, rd->name
1247                         , rd->calculated_value
1248                         );
1249                 break;
1250         }
1251
1252         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: PHASE2 "
1253             " last_collected_value = " COLLECTED_NUMBER_FORMAT
1254             " collected_value = " COLLECTED_NUMBER_FORMAT
1255             " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1256             " calculated_value = " CALCULATED_NUMBER_FORMAT
1257             , st->id, rd->name
1258             , rd->last_collected_value
1259             , rd->collected_value
1260             , rd->last_calculated_value
1261             , rd->calculated_value
1262             );
1263
1264     }
1265
1266     // at this point we have all the calculated values ready
1267     // it is now time to interpolate values on a second boundary
1268
1269     if(unlikely(now_collect_ut < next_store_ut)) {
1270         // this is collected in the same interpolation point
1271         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name);
1272     }
1273
1274     unsigned long long first_ut = last_stored_ut;
1275     long long iterations = (now_collect_ut - last_stored_ut) / (st->update_every * 1000000ULL);
1276     if((now_collect_ut % (st->update_every * 1000000ULL)) == 0) iterations++;
1277
1278     for( ; next_store_ut <= now_collect_ut ; next_store_ut += st->update_every * 1000000ULL, iterations-- ) {
1279 #ifdef NETDATA_INTERNAL_CHECKS
1280         if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", st->name, first_ut, last_stored_ut, next_store_ut, now_collect_ut); }
1281 #endif
1282
1283         if(unlikely(st->debug)) {
1284             debug(D_RRD_STATS, "%s: last_stored_ut = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0);
1285             debug(D_RRD_STATS, "%s: next_store_ut  = %0.3Lf (next interpolation point)", st->name, (long double)next_store_ut/1000000.0);
1286         }
1287
1288         st->last_updated.tv_sec = (time_t) (next_store_ut / 1000000ULL);
1289         st->last_updated.tv_usec = 0;
1290
1291         for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
1292             calculated_number new_value;
1293
1294             switch(rd->algorithm) {
1295                 case RRDDIM_INCREMENTAL:
1296                     new_value = (calculated_number)
1297                         (      rd->calculated_value
1298                             * (calculated_number)(next_store_ut - last_collect_ut)
1299                             / (calculated_number)(now_collect_ut - last_collect_ut)
1300                         );
1301
1302                     if(unlikely(st->debug))
1303                         debug(D_RRD_STATS, "%s/%s: CALC2 INC "
1304                             CALCULATED_NUMBER_FORMAT " = "
1305                             CALCULATED_NUMBER_FORMAT
1306                             " * %llu"
1307                             " / %llu"
1308                             , st->id, rd->name
1309                             , new_value
1310                             , rd->calculated_value
1311                             , (next_store_ut - last_stored_ut)
1312                             , (now_collect_ut - last_stored_ut)
1313                             );
1314
1315                     last_collect_ut = next_store_ut;
1316                     rd->calculated_value -= new_value;
1317                     new_value += rd->last_calculated_value;
1318                     rd->last_calculated_value = 0;
1319                     new_value /= (calculated_number)st->update_every;
1320                     break;
1321
1322                 case RRDDIM_ABSOLUTE:
1323                 case RRDDIM_PCENT_OVER_ROW_TOTAL:
1324                 case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1325                 default:
1326                     if(iterations == 1) {
1327                         // this is the last iteration
1328                         // do not interpolate
1329                         // just show the calculated value
1330
1331                         new_value = rd->calculated_value;
1332                     }
1333                     else {
1334                         // we have missed an update
1335                         // interpolate in the middle values
1336
1337                         new_value = (calculated_number)
1338                             (   (     (rd->calculated_value - rd->last_calculated_value)
1339                                     * (calculated_number)(next_store_ut - last_collect_ut)
1340                                     / (calculated_number)(now_collect_ut - last_collect_ut)
1341                                 )
1342                                 +  rd->last_calculated_value
1343                             );
1344
1345                         if(unlikely(st->debug))
1346                             debug(D_RRD_STATS, "%s/%s: CALC2 DEF "
1347                                 CALCULATED_NUMBER_FORMAT " = ((("
1348                                 "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")"
1349                                 " * %llu"
1350                                 " / %llu) + " CALCULATED_NUMBER_FORMAT
1351                                 , st->id, rd->name
1352                                 , new_value
1353                                 , rd->calculated_value, rd->last_calculated_value
1354                                 , (next_store_ut - first_ut)
1355                                 , (now_collect_ut - first_ut), rd->last_calculated_value
1356                                 );
1357                     }
1358                     break;
1359             }
1360
1361             if(unlikely(!store_this_entry))
1362                 continue;
1363
1364             if(likely(rd->updated && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
1365                 rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags );
1366                 rd->last_stored_value = new_value;
1367
1368                 if(unlikely(st->debug))
1369                     debug(D_RRD_STATS, "%s/%s: STORE[%ld] "
1370                         CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT
1371                         , st->id, rd->name
1372                         , st->current_entry
1373                         , unpack_storage_number(rd->values[st->current_entry]), new_value
1374                         );
1375             }
1376             else {
1377                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING "
1378                         , st->id, rd->name
1379                         , st->current_entry
1380                         );
1381                 rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
1382                 rd->last_stored_value = NAN;
1383             }
1384
1385             stored_entries++;
1386
1387             if(unlikely(st->debug)) {
1388                 calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor;
1389                 calculated_number t2 = unpack_storage_number(rd->values[st->current_entry]);
1390                 calculated_number accuracy = accuracy_loss(t1, t2);
1391                 debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " FLAGS=0x%08x (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
1392                         , st->id, rd->name
1393                         , st->current_entry
1394                         , t2
1395                         , get_storage_number_flags(rd->values[st->current_entry])
1396                         , t1
1397                         , accuracy
1398                         , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1399                         );
1400
1401                 rd->collected_volume += t1;
1402                 rd->stored_volume += t2;
1403                 accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume);
1404                 debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated  = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s"
1405                         , st->id, rd->name
1406                         , st->current_entry
1407                         , rd->stored_volume
1408                         , rd->collected_volume
1409                         , accuracy
1410                         , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1411                         );
1412
1413             }
1414         }
1415         // reset the storage flags for the next point, if any;
1416         storage_flags = SN_EXISTS;
1417
1418         st->counter++;
1419         st->current_entry = ((st->current_entry + 1) >= st->entries) ? 0 : st->current_entry + 1;
1420         last_stored_ut = next_store_ut;
1421     }
1422
1423     st->last_collected_total  = st->collected_total;
1424
1425     for( rd = st->dimensions; rd ; rd = rd->next ) {
1426         if(unlikely(!rd->updated)) continue;
1427
1428         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value);
1429         rd->last_collected_value = rd->collected_value;
1430
1431         switch(rd->algorithm) {
1432             case RRDDIM_INCREMENTAL:
1433                 if(unlikely(!first_entry)) {
1434                     if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value);
1435                     rd->last_calculated_value += rd->calculated_value;
1436                 }
1437                 else {
1438                     if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS THE FIRST POINT", st->name);
1439                 }
1440                 break;
1441
1442             case RRDDIM_ABSOLUTE:
1443             case RRDDIM_PCENT_OVER_ROW_TOTAL:
1444             case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1445                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value);
1446                 rd->last_calculated_value = rd->calculated_value;
1447                 break;
1448         }
1449
1450         rd->calculated_value = 0;
1451         rd->collected_value = 0;
1452         rd->updated = 0;
1453
1454         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: END "
1455             " last_collected_value = " COLLECTED_NUMBER_FORMAT
1456             " collected_value = " COLLECTED_NUMBER_FORMAT
1457             " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1458             " calculated_value = " CALCULATED_NUMBER_FORMAT
1459             , st->id, rd->name
1460             , rd->last_collected_value
1461             , rd->collected_value
1462             , rd->last_calculated_value
1463             , rd->calculated_value
1464             );
1465     }
1466
1467     // ALL DONE ABOUT THE DATA UPDATE
1468     // --------------------------------------------------------------------
1469
1470     // find if there are any obsolete dimensions (not updated recently)
1471     if(unlikely(rrd_delete_unupdated_dimensions)) {
1472
1473         for( rd = st->dimensions; likely(rd) ; rd = rd->next )
1474             if((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)
1475                 break;
1476
1477         if(unlikely(rd)) {
1478             // there is dimension to free
1479             // upgrade our read lock to a write lock
1480             pthread_rwlock_unlock(&st->rwlock);
1481             pthread_rwlock_wrlock(&st->rwlock);
1482
1483             for( rd = st->dimensions, last = NULL ; likely(rd) ; ) {
1484                 // remove it only it is not updated in rrd_delete_unupdated_dimensions seconds
1485
1486                 if(unlikely((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)) {
1487                     info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id);
1488
1489                     if(unlikely(!last)) {
1490                         st->dimensions = rd->next;
1491                         rd->next = NULL;
1492                         rrddim_free(st, rd);
1493                         rd = st->dimensions;
1494                         continue;
1495                     }
1496                     else {
1497                         last->next = rd->next;
1498                         rd->next = NULL;
1499                         rrddim_free(st, rd);
1500                         rd = last->next;
1501                         continue;
1502                     }
1503                 }
1504
1505                 last = rd;
1506                 rd = rd->next;
1507             }
1508
1509             if(unlikely(!st->dimensions)) {
1510                 info("Disabling chart %s (%s) since it does not have any dimensions", st->name, st->id);
1511                 st->enabled = 0;
1512             }
1513         }
1514     }
1515
1516     pthread_rwlock_unlock(&st->rwlock);
1517
1518     if(unlikely(pthread_setcancelstate(oldstate, NULL) != 0))
1519         error("Cannot set pthread cancel state to RESTORE (%d).", oldstate);
1520
1521     return(st->usec_since_last_update);
1522 }