]> arthur.barton.de Git - netdata.git/blob - src/rrd.c
fixes issues found by coverity static code analysis
[netdata.git] / src / rrd.c
1 #include "common.h"
2
3 #define RRD_DEFAULT_GAP_INTERPOLATIONS 1
4
5 // ----------------------------------------------------------------------------
6 // globals
7
// If non-zero, the time (in seconds) after which dimensions that are not
// being updated are removed.
// DO NOT ENABLE: if dimensions are removed, the chart generation will have
// to run again.
int rrd_delete_unupdated_dimensions = 0;

// Global defaults, initialised from compile-time constants.
int rrd_update_every = UPDATE_EVERY;
// default for the per-chart "history" option (number of entries)
int rrd_default_history_entries = RRD_DEFAULT_HISTORY_ENTRIES;
// one of RRD_MEMORY_MODE_RAM / _MAP / _SAVE; selects how charts are backed
int rrd_memory_mode = RRD_MEMORY_MODE_SAVE;

// Forward declarations: these comparators are referenced by the static
// initializer of 'localhost' before they are defined.
static int rrdset_compare(void* a, void* b);
static int rrdset_compare_name(void* a, void* b);
static int rrdfamily_compare(void *a, void *b);
20
21 // ----------------------------------------------------------------------------
22 // RRDHOST
23
// The statically initialised host that every chart in this file is attached
// to. It starts with an empty chart list and four empty AVL indexes, each
// bound to the comparison function that defines its key.
RRDHOST localhost = {
        .hostname = "localhost",
        .rrdset_root = NULL,                                // head of the linked list of charts
        .rrdset_root_rwlock = PTHREAD_RWLOCK_INITIALIZER,   // taken via rrdhost_rwlock()/rrdhost_rdlock()
        .rrdset_root_index = {                              // charts keyed by (hash, id)
            { NULL, rrdset_compare },
            AVL_LOCK_INITIALIZER
        },
        .rrdset_root_index_name = {                         // charts keyed by (hash_name, name)
            { NULL, rrdset_compare_name },
            AVL_LOCK_INITIALIZER
        },
        .rrdfamily_root_index = {                           // families keyed by (hash_family, family)
            { NULL, rrdfamily_compare },
            AVL_LOCK_INITIALIZER
        },
        .variables_root_index = {                           // variables, ordered by rrdvar_compare
            { NULL, rrdvar_compare },
            AVL_LOCK_INITIALIZER
        }
};
45
46 void rrdhost_rwlock(RRDHOST *host) {
47     pthread_rwlock_wrlock(&host->rrdset_root_rwlock);
48 }
49
50 void rrdhost_rdlock(RRDHOST *host) {
51     pthread_rwlock_rdlock(&host->rrdset_root_rwlock);
52 }
53
54 void rrdhost_unlock(RRDHOST *host) {
55     pthread_rwlock_unlock(&host->rrdset_root_rwlock);
56 }
57
58 void rrdhost_check_rdlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line) {
59     int ret = pthread_rwlock_trywrlock(&host->rrdset_root_rwlock);
60
61     if(ret == 0)
62         fatal("RRDHOST '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", host->hostname, function, line, file);
63 }
64
65 void rrdhost_check_wrlock_int(RRDHOST *host, const char *file, const char *function, const unsigned long line) {
66     int ret = pthread_rwlock_tryrdlock(&host->rrdset_root_rwlock);
67
68     if(ret == 0)
69         fatal("RRDHOST '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", host->hostname, function, line, file);
70 }
71
72 // ----------------------------------------------------------------------------
73 // RRDFAMILY index
74
75 static int rrdfamily_compare(void *a, void *b) {
76     if(((RRDFAMILY *)a)->hash_family < ((RRDFAMILY *)b)->hash_family) return -1;
77     else if(((RRDFAMILY *)a)->hash_family > ((RRDFAMILY *)b)->hash_family) return 1;
78     else return strcmp(((RRDFAMILY *)a)->family, ((RRDFAMILY *)b)->family);
79 }
80
// Insert / remove an RRDFAMILY in the host's family index.
// Both evaluate to the RRDFAMILY pointer returned by the AVL call.
// The whole expansion is parenthesized so that the cast applies to the call
// result even when the macro is followed by '->' or another postfix operator.
#define rrdfamily_index_add(host, rc) ((RRDFAMILY *)avl_insert_lock(&((host)->rrdfamily_root_index), (avl *)(rc)))
#define rrdfamily_index_del(host, rc) ((RRDFAMILY *)avl_remove_lock(&((host)->rrdfamily_root_index), (avl *)(rc)))
83
84 static RRDFAMILY *rrdfamily_index_find(RRDHOST *host, const char *id, uint32_t hash) {
85     RRDFAMILY tmp;
86     tmp.family = id;
87     tmp.hash_family = (hash)?hash:simple_hash(tmp.family);
88
89     return (RRDFAMILY *)avl_search_lock(&(host->rrdfamily_root_index), (avl *) &tmp);
90 }
91
92 RRDFAMILY *rrdfamily_create(const char *id) {
93     RRDFAMILY *rc = rrdfamily_index_find(&localhost, id, 0);
94     if(!rc) {
95         rc = callocz(1, sizeof(RRDFAMILY));
96
97         rc->family = strdupz(id);
98         rc->hash_family = simple_hash(rc->family);
99
100         // initialize the variables index
101         avl_init_lock(&rc->variables_root_index, rrdvar_compare);
102
103         RRDFAMILY *ret = rrdfamily_index_add(&localhost, rc);
104         if(ret != rc)
105             fatal("INTERNAL ERROR: Expected to INSERT RRDFAMILY '%s' into index, but inserted '%s'.", rc->family, (ret)?ret->family:"NONE");
106     }
107
108     rc->use_count++;
109     return rc;
110 }
111
112 void rrdfamily_free(RRDFAMILY *rc) {
113     rc->use_count--;
114     if(!rc->use_count) {
115         RRDFAMILY *ret = rrdfamily_index_del(&localhost, rc);
116         if(ret != rc)
117             fatal("INTERNAL ERROR: Expected to DELETE RRDFAMILY '%s' from index, but deleted '%s'.", rc->family, (ret)?ret->family:"NONE");
118
119         if(rc->variables_root_index.avl_tree.root != NULL)
120             fatal("INTERNAL ERROR: Variables index of RRDFAMILY '%s' that is freed, is not empty.", rc->family);
121
122         freez((void *)rc->family);
123         freez(rc);
124     }
125 }
126
127 // ----------------------------------------------------------------------------
128 // RRDSET index
129
130 static int rrdset_compare(void* a, void* b) {
131     if(((RRDSET *)a)->hash < ((RRDSET *)b)->hash) return -1;
132     else if(((RRDSET *)a)->hash > ((RRDSET *)b)->hash) return 1;
133     else return strcmp(((RRDSET *)a)->id, ((RRDSET *)b)->id);
134 }
135
// Insert / remove a chart in the host's id index.
// Both evaluate to the RRDSET pointer returned by the AVL call.
// The whole expansion is parenthesized so that the cast applies to the call
// result even when the macro is followed by '->' or another postfix operator.
#define rrdset_index_add(host, st) ((RRDSET *)avl_insert_lock(&((host)->rrdset_root_index), (avl *)(st)))
#define rrdset_index_del(host, st) ((RRDSET *)avl_remove_lock(&((host)->rrdset_root_index), (avl *)(st)))
138
139 static RRDSET *rrdset_index_find(RRDHOST *host, const char *id, uint32_t hash) {
140     RRDSET tmp;
141     strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
142     tmp.hash = (hash)?hash:simple_hash(tmp.id);
143
144     return (RRDSET *)avl_search_lock(&(host->rrdset_root_index), (avl *) &tmp);
145 }
146
147 // ----------------------------------------------------------------------------
148 // RRDSET name index
149
// Recover the enclosing RRDSET from a pointer to its embedded 'avlname' node.
// The pointer is converted to 'char *' before subtracting so the offset is
// always applied in bytes: arithmetic on 'void *' is only a compiler
// extension, and a typed pointer would scale the offset by its element size.
#define rrdset_from_avlname(avlname_ptr) ((RRDSET *)((char *)(avlname_ptr) - offsetof(RRDSET, avlname)))
151
152 static int rrdset_compare_name(void* a, void* b) {
153     RRDSET *A = rrdset_from_avlname(a);
154     RRDSET *B = rrdset_from_avlname(b);
155
156     // fprintf(stderr, "COMPARING: %s with %s\n", A->name, B->name);
157
158     if(A->hash_name < B->hash_name) return -1;
159     else if(A->hash_name > B->hash_name) return 1;
160     else return strcmp(A->name, B->name);
161 }
162
163 RRDSET *rrdset_index_add_name(RRDHOST *host, RRDSET *st) {
164     void *result;
165     // fprintf(stderr, "ADDING: %s (name: %s)\n", st->id, st->name);
166     result = avl_insert_lock(&host->rrdset_root_index_name, (avl *) (&st->avlname));
167     if(result) return rrdset_from_avlname(result);
168     return NULL;
169 }
170
171 RRDSET *rrdset_index_del_name(RRDHOST *host, RRDSET *st) {
172     void *result;
173     // fprintf(stderr, "DELETING: %s (name: %s)\n", st->id, st->name);
174     result = (RRDSET *)avl_remove_lock(&((host)->rrdset_root_index_name), (avl *)(&st->avlname));
175     if(result) return rrdset_from_avlname(result);
176     return NULL;
177 }
178
179 static RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name, uint32_t hash) {
180     void *result = NULL;
181     RRDSET tmp;
182     tmp.name = name;
183     tmp.hash_name = (hash)?hash:simple_hash(tmp.name);
184
185     // fprintf(stderr, "SEARCHING: %s\n", name);
186     result = avl_search_lock(&host->rrdset_root_index_name, (avl *) (&(tmp.avlname)));
187     if(result) {
188         RRDSET *st = rrdset_from_avlname(result);
189         if(strcmp(st->magic, RRDSET_MAGIC))
190             error("Search for RRDSET %s returned an invalid RRDSET %s (name %s)", name, st->id, st->name);
191
192         // fprintf(stderr, "FOUND: %s\n", name);
193         return rrdset_from_avlname(result);
194     }
195     // fprintf(stderr, "NOT FOUND: %s\n", name);
196     return NULL;
197 }
198
199
200 // ----------------------------------------------------------------------------
201 // RRDDIM index
202
203 static int rrddim_compare(void* a, void* b) {
204     if(((RRDDIM *)a)->hash < ((RRDDIM *)b)->hash) return -1;
205     else if(((RRDDIM *)a)->hash > ((RRDDIM *)b)->hash) return 1;
206     else return strcmp(((RRDDIM *)a)->id, ((RRDDIM *)b)->id);
207 }
208
// Insert / remove a dimension in its chart's dimension index.
// Unlike the chart and family index macros, the result of the AVL call is
// passed through without a cast.
#define rrddim_index_add(st, rd) avl_insert_lock(&((st)->dimensions_index), (avl *)(rd))
#define rrddim_index_del(st,rd ) avl_remove_lock(&((st)->dimensions_index), (avl *)(rd))
211
212 static RRDDIM *rrddim_index_find(RRDSET *st, const char *id, uint32_t hash) {
213     RRDDIM tmp;
214     strncpyz(tmp.id, id, RRD_ID_LENGTH_MAX);
215     tmp.hash = (hash)?hash:simple_hash(tmp.id);
216
217     return (RRDDIM *)avl_search_lock(&(st->dimensions_index), (avl *) &tmp);
218 }
219
220 // ----------------------------------------------------------------------------
221 // chart types
222
223 int rrdset_type_id(const char *name)
224 {
225     if(unlikely(strcmp(name, RRDSET_TYPE_AREA_NAME) == 0)) return RRDSET_TYPE_AREA;
226     else if(unlikely(strcmp(name, RRDSET_TYPE_STACKED_NAME) == 0)) return RRDSET_TYPE_STACKED;
227     else if(unlikely(strcmp(name, RRDSET_TYPE_LINE_NAME) == 0)) return RRDSET_TYPE_LINE;
228     return RRDSET_TYPE_LINE;
229 }
230
231 const char *rrdset_type_name(int chart_type)
232 {
233     static char line[] = RRDSET_TYPE_LINE_NAME;
234     static char area[] = RRDSET_TYPE_AREA_NAME;
235     static char stacked[] = RRDSET_TYPE_STACKED_NAME;
236
237     switch(chart_type) {
238         case RRDSET_TYPE_LINE:
239             return line;
240
241         case RRDSET_TYPE_AREA:
242             return area;
243
244         case RRDSET_TYPE_STACKED:
245             return stacked;
246     }
247     return line;
248 }
249
250 // ----------------------------------------------------------------------------
251 // load / save
252
253 const char *rrd_memory_mode_name(int id)
254 {
255     static const char ram[] = RRD_MEMORY_MODE_RAM_NAME;
256     static const char map[] = RRD_MEMORY_MODE_MAP_NAME;
257     static const char save[] = RRD_MEMORY_MODE_SAVE_NAME;
258
259     switch(id) {
260         case RRD_MEMORY_MODE_RAM:
261             return ram;
262
263         case RRD_MEMORY_MODE_MAP:
264             return map;
265
266         case RRD_MEMORY_MODE_SAVE:
267         default:
268             return save;
269     }
270
271     return save;
272 }
273
274 int rrd_memory_mode_id(const char *name)
275 {
276     if(unlikely(!strcmp(name, RRD_MEMORY_MODE_RAM_NAME)))
277         return RRD_MEMORY_MODE_RAM;
278     else if(unlikely(!strcmp(name, RRD_MEMORY_MODE_MAP_NAME)))
279         return RRD_MEMORY_MODE_MAP;
280
281     return RRD_MEMORY_MODE_SAVE;
282 }
283
284 // ----------------------------------------------------------------------------
285 // algorithms types
286
287 int rrddim_algorithm_id(const char *name)
288 {
289     if(strcmp(name, RRDDIM_INCREMENTAL_NAME) == 0)          return RRDDIM_INCREMENTAL;
290     if(strcmp(name, RRDDIM_ABSOLUTE_NAME) == 0)             return RRDDIM_ABSOLUTE;
291     if(strcmp(name, RRDDIM_PCENT_OVER_ROW_TOTAL_NAME) == 0)         return RRDDIM_PCENT_OVER_ROW_TOTAL;
292     if(strcmp(name, RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME) == 0)    return RRDDIM_PCENT_OVER_DIFF_TOTAL;
293     return RRDDIM_ABSOLUTE;
294 }
295
296 const char *rrddim_algorithm_name(int chart_type)
297 {
298     static char absolute[] = RRDDIM_ABSOLUTE_NAME;
299     static char incremental[] = RRDDIM_INCREMENTAL_NAME;
300     static char percentage_of_absolute_row[] = RRDDIM_PCENT_OVER_ROW_TOTAL_NAME;
301     static char percentage_of_incremental_row[] = RRDDIM_PCENT_OVER_DIFF_TOTAL_NAME;
302
303     switch(chart_type) {
304         case RRDDIM_ABSOLUTE:
305             return absolute;
306
307         case RRDDIM_INCREMENTAL:
308             return incremental;
309
310         case RRDDIM_PCENT_OVER_ROW_TOTAL:
311             return percentage_of_absolute_row;
312
313         case RRDDIM_PCENT_OVER_DIFF_TOTAL:
314             return percentage_of_incremental_row;
315     }
316     return absolute;
317 }
318
319 // ----------------------------------------------------------------------------
320 // chart names
321
// Copy 'from' into 'to', replacing every character that is neither '.' nor
// alphanumeric with '_'.
//
// At most 'length' characters are copied and a terminating NUL is always
// appended, so 'to' must have room for length + 1 bytes.
// Returns 'to'.
char *rrdset_strncpyz_name(char *to, const char *from, size_t length)
{
    char c, *p = to;

    while (length-- && (c = *from++)) {
        // <ctype.h> functions require a value representable as unsigned char;
        // passing a negative plain char (bytes >= 0x80) is undefined behaviour
        if(c != '.' && !isalnum((unsigned char)c))
            c = '_';

        *p++ = c;
    }

    *p = '\0';

    return to;
}
337
338 void rrdset_set_name(RRDSET *st, const char *name)
339 {
340     debug(D_RRD_CALLS, "rrdset_set_name() old: %s, new: %s", st->name, name);
341
342     if(st->name) {
343         rrdset_index_del_name(&localhost, st);
344         rrdsetvar_rename_all(st);
345     }
346
347     char b[CONFIG_MAX_VALUE + 1];
348     char n[RRD_ID_LENGTH_MAX + 1];
349
350     snprintfz(n, RRD_ID_LENGTH_MAX, "%s.%s", st->type, name);
351     rrdset_strncpyz_name(b, n, CONFIG_MAX_VALUE);
352     st->name = config_get(st->id, "name", b);
353     st->hash_name = simple_hash(st->name);
354
355     rrdset_index_add_name(&localhost, st);
356 }
357
358 // ----------------------------------------------------------------------------
359 // cache directory
360
361 char *rrdset_cache_dir(const char *id)
362 {
363     char *ret = NULL;
364
365     static char *cache_dir = NULL;
366     if(!cache_dir) {
367         cache_dir = config_get("global", "cache directory", CACHE_DIR);
368         int r = mkdir(cache_dir, 0755);
369         if(r != 0 && errno != EEXIST)
370             error("Cannot create directory '%s'", cache_dir);
371     }
372
373     char b[FILENAME_MAX + 1];
374     char n[FILENAME_MAX + 1];
375     rrdset_strncpyz_name(b, id, FILENAME_MAX);
376
377     snprintfz(n, FILENAME_MAX, "%s/%s", cache_dir, b);
378     ret = config_get(id, "cache directory", n);
379
380     if(rrd_memory_mode == RRD_MEMORY_MODE_MAP || rrd_memory_mode == RRD_MEMORY_MODE_SAVE) {
381         int r = mkdir(ret, 0775);
382         if(r != 0 && errno != EEXIST)
383             error("Cannot create directory '%s'", ret);
384     }
385
386     return ret;
387 }
388
389 // ----------------------------------------------------------------------------
390 // core functions
391
392 void rrdset_reset(RRDSET *st)
393 {
394     debug(D_RRD_CALLS, "rrdset_reset() %s", st->name);
395
396     st->last_collected_time.tv_sec = 0;
397     st->last_collected_time.tv_usec = 0;
398     st->last_updated.tv_sec = 0;
399     st->last_updated.tv_usec = 0;
400     st->current_entry = 0;
401     st->counter = 0;
402     st->counter_done = 0;
403
404     RRDDIM *rd;
405     for(rd = st->dimensions; rd ; rd = rd->next) {
406         rd->last_collected_time.tv_sec = 0;
407         rd->last_collected_time.tv_usec = 0;
408         rd->counter = 0;
409         bzero(rd->values, rd->entries * sizeof(storage_number));
410     }
411 }
412
413 RRDSET *rrdset_create(const char *type, const char *id, const char *name, const char *family, const char *context, const char *title, const char *units, long priority, int update_every, int chart_type)
414 {
415     if(!type || !type[0]) {
416         fatal("Cannot create rrd stats without a type.");
417         return NULL;
418     }
419
420     if(!id || !id[0]) {
421         fatal("Cannot create rrd stats without an id.");
422         return NULL;
423     }
424
425     char fullid[RRD_ID_LENGTH_MAX + 1];
426     char fullfilename[FILENAME_MAX + 1];
427     RRDSET *st = NULL;
428
429     snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
430
431     st = rrdset_find(fullid);
432     if(st) {
433         error("Cannot create rrd stats for '%s', it already exists.", fullid);
434         return st;
435     }
436
437     long entries = config_get_number(fullid, "history", rrd_default_history_entries);
438     if(entries < 5) entries = config_set_number(fullid, "history", 5);
439     if(entries > RRD_HISTORY_ENTRIES_MAX) entries = config_set_number(fullid, "history", RRD_HISTORY_ENTRIES_MAX);
440
441     int enabled = config_get_boolean(fullid, "enabled", 1);
442     if(!enabled) entries = 5;
443
444     unsigned long size = sizeof(RRDSET);
445     char *cache_dir = rrdset_cache_dir(fullid);
446
447     debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id);
448
449     snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir);
450     if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) st = (RRDSET *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 0);
451     if(st) {
452         if(strcmp(st->magic, RRDSET_MAGIC) != 0) {
453             errno = 0;
454             info("Initializing file %s.", fullfilename);
455             bzero(st, size);
456         }
457         else if(strcmp(st->id, fullid) != 0) {
458             errno = 0;
459             error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid);
460             // munmap(st, size);
461             // st = NULL;
462             bzero(st, size);
463         }
464         else if(st->memsize != size || st->entries != entries) {
465             errno = 0;
466             error("File %s does not have the desired size. Clearing it.", fullfilename);
467             bzero(st, size);
468         }
469         else if(st->update_every != update_every) {
470             errno = 0;
471             error("File %s does not have the desired update frequency. Clearing it.", fullfilename);
472             bzero(st, size);
473         }
474         else if((time(NULL) - st->last_updated.tv_sec) > update_every * entries) {
475             errno = 0;
476             error("File %s is too old. Clearing it.", fullfilename);
477             bzero(st, size);
478         }
479     }
480
481     if(st) {
482         st->name = NULL;
483         st->type = NULL;
484         st->family = NULL;
485         st->context = NULL;
486         st->title = NULL;
487         st->units = NULL;
488         st->dimensions = NULL;
489         st->next = NULL;
490         st->mapped = rrd_memory_mode;
491         st->variables = NULL;
492     }
493     else {
494         st = callocz(1, size);
495         st->mapped = RRD_MEMORY_MODE_RAM;
496     }
497     st->memsize = size;
498     st->entries = entries;
499     st->update_every = update_every;
500
501     if(st->current_entry >= st->entries) st->current_entry = 0;
502
503     strcpy(st->cache_filename, fullfilename);
504     strcpy(st->magic, RRDSET_MAGIC);
505
506     strcpy(st->id, fullid);
507     st->hash = simple_hash(st->id);
508
509     st->cache_dir = cache_dir;
510
511     st->chart_type = rrdset_type_id(config_get(st->id, "chart type", rrdset_type_name(chart_type)));
512     st->type       = config_get(st->id, "type", type);
513     st->family     = config_get(st->id, "family", family?family:st->type);
514     st->units      = config_get(st->id, "units", units?units:"");
515
516     st->context    = config_get(st->id, "context", context?context:st->id);
517     st->hash_context = simple_hash(st->context);
518
519     st->priority = config_get_number(st->id, "priority", priority);
520     st->enabled = enabled;
521
522     st->isdetail = 0;
523     st->debug = 0;
524
525     st->green = NAN;
526     st->red = NAN;
527
528     st->last_collected_time.tv_sec = 0;
529     st->last_collected_time.tv_usec = 0;
530     st->counter_done = 0;
531
532     st->gap_when_lost_iterations_above = (int) (
533             config_get_number(st->id, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2);
534
535     avl_init_lock(&st->dimensions_index, rrddim_compare);
536     avl_init_lock(&st->variables_root_index, rrdvar_compare);
537
538     pthread_rwlock_init(&st->rwlock, NULL);
539     rrdhost_rwlock(&localhost);
540
541     if(name && *name) rrdset_set_name(st, name);
542     else rrdset_set_name(st, id);
543
544     {
545         char varvalue[CONFIG_MAX_VALUE + 1];
546         snprintfz(varvalue, CONFIG_MAX_VALUE, "%s (%s)", title?title:"", st->name);
547         st->title = config_get(st->id, "title", varvalue);
548     }
549
550     st->rrdfamily = rrdfamily_create(st->family);
551     st->rrdhost = &localhost;
552
553     st->next = localhost.rrdset_root;
554     localhost.rrdset_root = st;
555
556     if(health_enabled) {
557         rrdsetvar_create(st, "last_collected_t", RRDVAR_TYPE_TIME_T, &st->last_collected_time.tv_sec, 0);
558         rrdsetvar_create(st, "collected_total_raw", RRDVAR_TYPE_TOTAL, &st->last_collected_total, 0);
559         rrdsetvar_create(st, "green", RRDVAR_TYPE_CALCULATED, &st->green, 0);
560         rrdsetvar_create(st, "red", RRDVAR_TYPE_CALCULATED, &st->red, 0);
561         rrdsetvar_create(st, "update_every", RRDVAR_TYPE_INT, &st->update_every, 0);
562     }
563
564     rrdset_index_add(&localhost, st);
565
566     rrdsetcalc_link_matching(st);
567     rrdcalctemplate_link_matching(st);
568
569     rrdhost_unlock(&localhost);
570
571     return(st);
572 }
573
// Add a dimension to a chart.
//
// st         - the chart the dimension is appended to
// id         - dimension id; also used (sanitized) as the file name
// name       - optional default for the dimension's name; when NULL or empty
//              the id is used
// multiplier, divisor, algorithm - defaults for the respective per-dimension
//              config options; a resulting divisor of 0 is replaced by 1
//
// Unless the memory mode is RAM, the dimension (header plus st->entries
// stored values) is backed by "<chart cache dir>/<sanitized id>.db". A mapped
// file that does not validate is cleared. If no mapping is obtained, the
// dimension is allocated with callocz().
//
// Returns the new dimension, linked at the end of the chart's dimension list
// and inserted in the chart's dimension index.
RRDDIM *rrddim_add(RRDSET *st, const char *id, const char *name, long multiplier, long divisor, int algorithm)
{
    char filename[FILENAME_MAX + 1];
    char fullfilename[FILENAME_MAX + 1];

    char varname[CONFIG_MAX_NAME + 1];
    RRDDIM *rd = NULL;
    unsigned long size = sizeof(RRDDIM) + (st->entries * sizeof(storage_number));

    debug(D_RRD_CALLS, "Adding dimension '%s/%s'.", st->id, id);

    rrdset_strncpyz_name(filename, id, FILENAME_MAX);
    snprintfz(fullfilename, FILENAME_MAX, "%s/%s.db", st->cache_dir, filename);
    if(rrd_memory_mode != RRD_MEMORY_MODE_RAM) rd = (RRDDIM *)mymmap(fullfilename, size, ((rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 1);
    if(rd) {
        struct timeval now;
        gettimeofday(&now, NULL);

        // validate what was found in the file against the values passed by
        // the caller; the first mismatch wipes the whole mapping
        if(strcmp(rd->magic, RRDDIMENSION_MAGIC) != 0) {
            errno = 0;
            info("Initializing file %s.", fullfilename);
            bzero(rd, size);
        }
        else if(rd->memsize != size) {
            errno = 0;
            error("File %s does not have the desired size. Clearing it.", fullfilename);
            bzero(rd, size);
        }
        else if(rd->multiplier != multiplier) {
            errno = 0;
            error("File %s does not have the same multiplier. Clearing it.", fullfilename);
            bzero(rd, size);
        }
        else if(rd->divisor != divisor) {
            errno = 0;
            error("File %s does not have the same divisor. Clearing it.", fullfilename);
            bzero(rd, size);
        }
        else if(rd->algorithm != algorithm) {
            errno = 0;
            error("File %s does not have the same algorithm. Clearing it.", fullfilename);
            bzero(rd, size);
        }
        else if(rd->update_every != st->update_every) {
            errno = 0;
            error("File %s does not have the same refresh frequency. Clearing it.", fullfilename);
            bzero(rd, size);
        }
        // older than the time span the whole history covers
        else if(usec_dt(&now, &rd->last_collected_time) > (rd->entries * rd->update_every * 1000000ULL)) {
            errno = 0;
            error("File %s is too old. Clearing it.", fullfilename);
            bzero(rd, size);
        }
        else if(strcmp(rd->id, id) != 0) {
            errno = 0;
            error("File %s contents are not for dimension %s. Clearing it.", fullfilename, id);
            // munmap(rd, size);
            // rd = NULL;
            bzero(rd, size);
        }
    }

    if(rd) {
        // we have a file mapped for rd
        // pointers stored in the file are not valid in this process
        rd->mapped = rrd_memory_mode;
        rd->flags = 0x00000000;
        rd->variables = NULL;
        rd->next = NULL;
        rd->name = NULL;
    }
    else {
        // if we didn't manage to get a mmap'd dimension, just create one

        rd = callocz(1, size);
        rd->mapped = RRD_MEMORY_MODE_RAM;
    }
    rd->memsize = size;

    strcpy(rd->magic, RRDDIMENSION_MAGIC);
    strcpy(rd->cache_filename, fullfilename);
    strncpyz(rd->id, id, RRD_ID_LENGTH_MAX);
    rd->hash = simple_hash(rd->id);

    // every setting below is read through the chart's config section, under
    // a "dim <id> ..." key, with the caller's value as the default
    snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
    rd->name = config_get(st->id, varname, (name && *name)?name:rd->id);

    snprintfz(varname, CONFIG_MAX_NAME, "dim %s algorithm", rd->id);
    rd->algorithm = rrddim_algorithm_id(config_get(st->id, varname, rrddim_algorithm_name(algorithm)));

    snprintfz(varname, CONFIG_MAX_NAME, "dim %s multiplier", rd->id);
    rd->multiplier = config_get_number(st->id, varname, multiplier);

    snprintfz(varname, CONFIG_MAX_NAME, "dim %s divisor", rd->id);
    rd->divisor = config_get_number(st->id, varname, divisor);
    if(!rd->divisor) rd->divisor = 1;

    rd->entries = st->entries;
    rd->update_every = st->update_every;

    // prevent incremental calculation spikes
    rd->counter = 0;
    rd->updated = 0;
    rd->calculated_value = 0;
    rd->last_calculated_value = 0;
    rd->collected_value = 0;
    rd->last_collected_value = 0;
    rd->collected_volume = 0;
    rd->stored_volume = 0;
    // mark the chart's current slot as having no value for this dimension
    rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
    rd->last_collected_time.tv_sec = 0;
    rd->last_collected_time.tv_usec = 0;
    rd->rrdset = st;

    // append this dimension
    pthread_rwlock_wrlock(&st->rwlock);
    if(!st->dimensions)
        st->dimensions = rd;
    else {
        RRDDIM *td = st->dimensions;
        for(; td->next; td = td->next) ;
        td->next = rd;
    }

    if(health_enabled) {
        rrddimvar_create(rd, RRDVAR_TYPE_CALCULATED, NULL, NULL, &rd->last_stored_value, 0);
        rrddimvar_create(rd, RRDVAR_TYPE_COLLECTED, NULL, "_raw", &rd->last_collected_value, 0);
        rrddimvar_create(rd, RRDVAR_TYPE_TIME_T, NULL, "_last_collected_t", &rd->last_collected_time.tv_sec, 0);
    }

    pthread_rwlock_unlock(&st->rwlock);

    // NOTE(review): the index insertion happens after the chart lock is
    // released and its result is not checked - confirm duplicates cannot occur
    rrddim_index_add(st, rd);

    return(rd);
}
709
710 void rrddim_set_name(RRDSET *st, RRDDIM *rd, const char *name)
711 {
712     debug(D_RRD_CALLS, "rrddim_set_name() %s.%s", st->name, rd->name);
713
714     char varname[CONFIG_MAX_NAME + 1];
715     snprintfz(varname, CONFIG_MAX_NAME, "dim %s name", rd->id);
716     config_set_default(st->id, varname, name);
717
718     rrddimvar_rename_all(rd);
719 }
720
721 void rrddim_free(RRDSET *st, RRDDIM *rd)
722 {
723     debug(D_RRD_CALLS, "rrddim_free() %s.%s", st->name, rd->name);
724
725     RRDDIM *i, *last = NULL;
726     for(i = st->dimensions; i && i != rd ; i = i->next) last = i;
727
728     if(!i) {
729         error("Request to free dimension %s.%s but it is not linked.", st->id, rd->name);
730         return;
731     }
732
733     if(last) last->next = rd->next;
734     else st->dimensions = rd->next;
735     rd->next = NULL;
736
737     while(rd->variables)
738         rrddimvar_free(rd->variables);
739
740     rrddim_index_del(st, rd);
741
742     // free(rd->annotations);
743     if(rd->mapped == RRD_MEMORY_MODE_SAVE) {
744         debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
745         savememory(rd->cache_filename, rd, rd->memsize);
746
747         debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
748         munmap(rd, rd->memsize);
749     }
750     else if(rd->mapped == RRD_MEMORY_MODE_MAP) {
751         debug(D_RRD_CALLS, "Unmapping dimension '%s'.", rd->name);
752         munmap(rd, rd->memsize);
753     }
754     else {
755         debug(D_RRD_CALLS, "Removing dimension '%s'.", rd->name);
756         freez(rd);
757     }
758 }
759
760 void rrdset_free_all(void)
761 {
762     info("Freeing all memory...");
763
764     rrdhost_rwlock(&localhost);
765
766     RRDSET *st;
767     for(st = localhost.rrdset_root; st ;) {
768         RRDSET *next = st->next;
769
770         pthread_rwlock_wrlock(&st->rwlock);
771
772         while(st->variables)
773             rrdsetvar_free(st->variables);
774
775         while(st->alarms)
776             rrdsetcalc_unlink(st->alarms);
777
778         while(st->dimensions)
779             rrddim_free(st, st->dimensions);
780
781         rrdset_index_del(&localhost, st);
782
783         st->rrdfamily->use_count--;
784         if(!st->rrdfamily->use_count)
785             rrdfamily_free(st->rrdfamily);
786
787         pthread_rwlock_unlock(&st->rwlock);
788
789         if(st->mapped == RRD_MEMORY_MODE_SAVE) {
790             debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
791             savememory(st->cache_filename, st, st->memsize);
792
793             debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
794             munmap(st, st->memsize);
795         }
796         else if(st->mapped == RRD_MEMORY_MODE_MAP) {
797             debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name);
798             munmap(st, st->memsize);
799         }
800         else
801             freez(st);
802
803         st = next;
804     }
805     localhost.rrdset_root = NULL;
806
807     rrdhost_unlock(&localhost);
808
809     info("Memory cleanup completed...");
810 }
811
812 void rrdset_save_all(void) {
813     info("Saving database...");
814
815     RRDSET *st;
816     RRDDIM *rd;
817
818     rrdhost_rwlock(&localhost);
819     for(st = localhost.rrdset_root; st ; st = st->next) {
820         pthread_rwlock_wrlock(&st->rwlock);
821
822         if(st->mapped == RRD_MEMORY_MODE_SAVE) {
823             debug(D_RRD_CALLS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename);
824             savememory(st->cache_filename, st, st->memsize);
825         }
826
827         for(rd = st->dimensions; rd ; rd = rd->next) {
828             if(likely(rd->mapped == RRD_MEMORY_MODE_SAVE)) {
829                 debug(D_RRD_CALLS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename);
830                 savememory(rd->cache_filename, rd, rd->memsize);
831             }
832         }
833
834         pthread_rwlock_unlock(&st->rwlock);
835     }
836     rrdhost_unlock(&localhost);
837 }
838
839
840 RRDSET *rrdset_find(const char *id)
841 {
842     debug(D_RRD_CALLS, "rrdset_find() for chart %s", id);
843
844     RRDSET *st = rrdset_index_find(&localhost, id, 0);
845     return(st);
846 }
847
848 RRDSET *rrdset_find_bytype(const char *type, const char *id)
849 {
850     debug(D_RRD_CALLS, "rrdset_find_bytype() for chart %s.%s", type, id);
851
852     char buf[RRD_ID_LENGTH_MAX + 1];
853
854     strncpyz(buf, type, RRD_ID_LENGTH_MAX - 1);
855     strcat(buf, ".");
856     int len = (int) strlen(buf);
857     strncpyz(&buf[len], id, (size_t) (RRD_ID_LENGTH_MAX - len));
858
859     return(rrdset_find(buf));
860 }
861
862 RRDSET *rrdset_find_byname(const char *name)
863 {
864     debug(D_RRD_CALLS, "rrdset_find_byname() for chart %s", name);
865
866     RRDSET *st = rrdset_index_find_name(&localhost, name, 0);
867     return(st);
868 }
869
870 RRDDIM *rrddim_find(RRDSET *st, const char *id)
871 {
872     debug(D_RRD_CALLS, "rrddim_find() for chart %s, dimension %s", st->name, id);
873
874     return rrddim_index_find(st, id, 0);
875 }
876
877 int rrddim_hide(RRDSET *st, const char *id)
878 {
879     debug(D_RRD_CALLS, "rrddim_hide() for chart %s, dimension %s", st->name, id);
880
881     RRDDIM *rd = rrddim_find(st, id);
882     if(unlikely(!rd)) {
883         error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
884         return 1;
885     }
886
887     rd->flags |= RRDDIM_FLAG_HIDDEN;
888     return 0;
889 }
890
891 int rrddim_unhide(RRDSET *st, const char *id)
892 {
893     debug(D_RRD_CALLS, "rrddim_unhide() for chart %s, dimension %s", st->name, id);
894
895     RRDDIM *rd = rrddim_find(st, id);
896     if(unlikely(!rd)) {
897         error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
898         return 1;
899     }
900
901     if(rd->flags & RRDDIM_FLAG_HIDDEN) rd->flags ^= RRDDIM_FLAG_HIDDEN;
902     return 0;
903 }
904
905 collected_number rrddim_set_by_pointer(RRDSET *st, RRDDIM *rd, collected_number value)
906 {
907     debug(D_RRD_CALLS, "rrddim_set_by_pointer() for chart %s, dimension %s, value " COLLECTED_NUMBER_FORMAT, st->name, rd->name, value);
908
909     gettimeofday(&rd->last_collected_time, NULL);
910     rd->collected_value = value;
911     rd->updated = 1;
912     rd->counter++;
913
914     return rd->last_collected_value;
915 }
916
917 collected_number rrddim_set(RRDSET *st, const char *id, collected_number value)
918 {
919     RRDDIM *rd = rrddim_find(st, id);
920     if(unlikely(!rd)) {
921         error("Cannot find dimension with id '%s' on stats '%s' (%s).", id, st->name, st->id);
922         return 0;
923     }
924
925     return rrddim_set_by_pointer(st, rd, value);
926 }
927
928 void rrdset_next_usec(RRDSET *st, unsigned long long microseconds)
929 {
930     if(!microseconds) rrdset_next(st);
931     else {
932         debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
933
934         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds);
935         st->usec_since_last_update = microseconds;
936     }
937 }
938
939 void rrdset_next(RRDSET *st)
940 {
941     unsigned long long microseconds = 0;
942
943     if(likely(st->last_collected_time.tv_sec)) {
944         struct timeval now;
945         gettimeofday(&now, NULL);
946         microseconds = usec_dt(&now, &st->last_collected_time);
947     }
948     // prevent infinite loop
949     else microseconds = st->update_every * 1000000ULL;
950
951     rrdset_next_usec(st, microseconds);
952 }
953
954 void rrdset_next_plugins(RRDSET *st)
955 {
956     rrdset_next(st);
957 }
958
959 unsigned long long rrdset_done(RRDSET *st)
960 {
961     if(unlikely(netdata_exit)) return 0;
962
963     debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name);
964
965     RRDDIM *rd, *last;
966     int oldstate, store_this_entry = 1, first_entry = 0;
967     unsigned long long last_ut, now_ut, next_ut, stored_entries = 0;
968
969     if(unlikely(pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate) != 0))
970         error("Cannot set pthread cancel state to DISABLE.");
971
972     // a read lock is OK here
973     pthread_rwlock_rdlock(&st->rwlock);
974
975     // enable the chart, if it was disabled
976     if(unlikely(rrd_delete_unupdated_dimensions) && !st->enabled)
977         st->enabled = 1;
978
979     // check if the chart has a long time to be updated
980     if(unlikely(st->usec_since_last_update > st->entries * st->update_every * 1000000ULL)) {
981         info("%s: took too long to be updated (%0.3Lf secs). Reseting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0));
982         rrdset_reset(st);
983         st->usec_since_last_update = st->update_every * 1000000ULL;
984         first_entry = 1;
985     }
986     if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update);
987
988     // set last_collected_time
989     if(unlikely(!st->last_collected_time.tv_sec)) {
990         // it is the first entry
991         // set the last_collected_time to now
992         gettimeofday(&st->last_collected_time, NULL);
993
994         // the first entry should not be stored
995         store_this_entry = 0;
996         first_entry = 1;
997
998         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: has not set last_collected_time. Setting it now. Will not store the next entry.", st->name);
999     }
1000     else {
1001         // it is not the first entry
1002         // calculate the proper last_collected_time, using usec_since_last_update
1003         unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec + st->usec_since_last_update;
1004         st->last_collected_time.tv_sec = (time_t) (ut / 1000000ULL);
1005         st->last_collected_time.tv_usec = (suseconds_t) (ut % 1000000ULL);
1006     }
1007
1008     // if this set has not been updated in the past
1009     // we fake the last_update time to be = now - usec_since_last_update
1010     if(unlikely(!st->last_updated.tv_sec)) {
1011         // it has never been updated before
1012         // set a fake last_updated, in the past using usec_since_last_update
1013         unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
1014         st->last_updated.tv_sec = (time_t) (ut / 1000000ULL);
1015         st->last_updated.tv_usec = (suseconds_t) (ut % 1000000ULL);
1016
1017         // the first entry should not be stored
1018         store_this_entry = 0;
1019         first_entry = 1;
1020
1021         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
1022     }
1023
1024     // check if we will re-write the entire data set
1025     if(unlikely(usec_dt(&st->last_collected_time, &st->last_updated) > st->update_every * st->entries * 1000000ULL)) {
1026         info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Reseting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
1027         rrdset_reset(st);
1028
1029         st->usec_since_last_update = st->update_every * 1000000ULL;
1030
1031         gettimeofday(&st->last_collected_time, NULL);
1032
1033         unsigned long long ut = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec - st->usec_since_last_update;
1034         st->last_updated.tv_sec = (time_t) (ut / 1000000ULL);
1035         st->last_updated.tv_usec = (suseconds_t) (ut % 1000000ULL);
1036
1037         // the first entry should not be stored
1038         store_this_entry = 0;
1039         first_entry = 1;
1040     }
1041
1042     // these are the 3 variables that will help us in interpolation
1043     // last_ut = the last time we added a value to the storage
1044     //  now_ut = the time the current value is taken at
1045     // next_ut = the time of the next interpolation point
1046     last_ut = st->last_updated.tv_sec * 1000000ULL + st->last_updated.tv_usec;
1047     now_ut  = st->last_collected_time.tv_sec * 1000000ULL + st->last_collected_time.tv_usec;
1048     next_ut = (st->last_updated.tv_sec + st->update_every) * 1000000ULL;
1049
1050     if(unlikely(!first_entry && now_ut < next_ut)) {
1051         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name);
1052     }
1053
1054     if(unlikely(st->debug)) {
1055         debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0);
1056         debug(D_RRD_STATS, "%s: now  ut = %0.3Lf (current update time)", st->name, (long double)now_ut/1000000.0);
1057         debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0);
1058     }
1059
1060     if(unlikely(!st->counter_done)) {
1061         store_this_entry = 0;
1062         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name);
1063     }
1064     st->counter_done++;
1065
1066     // calculate totals and count the dimensions
1067     int dimensions;
1068     st->collected_total = 0;
1069     for( rd = st->dimensions, dimensions = 0 ; likely(rd) ; rd = rd->next, dimensions++ )
1070         if(likely(rd->updated)) st->collected_total += rd->collected_value;
1071
1072     uint32_t storage_flags = SN_EXISTS;
1073
1074     // process all dimensions to calculate their values
1075     // based on the collected figures only
1076     // at this stage we do not interpolate anything
1077     for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
1078
1079         if(unlikely(!rd->updated)) {
1080             rd->calculated_value = 0;
1081             continue;
1082         }
1083
1084         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: START "
1085             " last_collected_value = " COLLECTED_NUMBER_FORMAT
1086             " collected_value = " COLLECTED_NUMBER_FORMAT
1087             " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1088             " calculated_value = " CALCULATED_NUMBER_FORMAT
1089             , st->id, rd->name
1090             , rd->last_collected_value
1091             , rd->collected_value
1092             , rd->last_calculated_value
1093             , rd->calculated_value
1094             );
1095
1096         switch(rd->algorithm) {
1097             case RRDDIM_ABSOLUTE:
1098                 rd->calculated_value = (calculated_number)rd->collected_value
1099                     * (calculated_number)rd->multiplier
1100                     / (calculated_number)rd->divisor;
1101
1102                 if(unlikely(st->debug))
1103                     debug(D_RRD_STATS, "%s/%s: CALC ABS/ABS-NO-IN "
1104                         CALCULATED_NUMBER_FORMAT " = "
1105                         COLLECTED_NUMBER_FORMAT
1106                         " * " CALCULATED_NUMBER_FORMAT
1107                         " / " CALCULATED_NUMBER_FORMAT
1108                         , st->id, rd->name
1109                         , rd->calculated_value
1110                         , rd->collected_value
1111                         , (calculated_number)rd->multiplier
1112                         , (calculated_number)rd->divisor
1113                         );
1114                 break;
1115
1116             case RRDDIM_PCENT_OVER_ROW_TOTAL:
1117                 if(unlikely(!st->collected_total))
1118                     rd->calculated_value = 0;
1119                 else
1120                     // the percentage of the current value
1121                     // over the total of all dimensions
1122                     rd->calculated_value =
1123                           (calculated_number)100
1124                         * (calculated_number)rd->collected_value
1125                         / (calculated_number)st->collected_total;
1126
1127                 if(unlikely(st->debug))
1128                     debug(D_RRD_STATS, "%s/%s: CALC PCENT-ROW "
1129                         CALCULATED_NUMBER_FORMAT " = 100"
1130                         " * " COLLECTED_NUMBER_FORMAT
1131                         " / " COLLECTED_NUMBER_FORMAT
1132                         , st->id, rd->name
1133                         , rd->calculated_value
1134                         , rd->collected_value
1135                         , st->collected_total
1136                         );
1137                 break;
1138
1139             case RRDDIM_INCREMENTAL:
1140                 if(unlikely(rd->counter <= 1)) {
1141                     rd->calculated_value = 0;
1142                     continue;
1143                 }
1144
1145                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
1146                 // to reset the calculation (it will give zero as the calculation for this second)
1147                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
1148                     debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
1149                             , st->name, rd->name
1150                             , rd->last_collected_value
1151                             , rd->collected_value);
1152                     if(!(rd->flags & RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)) storage_flags = SN_EXISTS_RESET;
1153                     rd->last_collected_value = rd->collected_value;
1154                 }
1155
1156                 rd->calculated_value =
1157                       (calculated_number)(rd->collected_value - rd->last_collected_value)
1158                     * (calculated_number)rd->multiplier
1159                     / (calculated_number)rd->divisor;
1160
1161                 if(unlikely(st->debug))
1162                     debug(D_RRD_STATS, "%s/%s: CALC INC PRE "
1163                         CALCULATED_NUMBER_FORMAT " = ("
1164                         COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT
1165                         ")"
1166                         " * " CALCULATED_NUMBER_FORMAT
1167                         " / " CALCULATED_NUMBER_FORMAT
1168                         , st->id, rd->name
1169                         , rd->calculated_value
1170                         , rd->collected_value, rd->last_collected_value
1171                         , (calculated_number)rd->multiplier
1172                         , (calculated_number)rd->divisor
1173                         );
1174                 break;
1175
1176             case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1177                 if(unlikely(rd->counter <= 1)) {
1178                     rd->calculated_value = 0;
1179                     continue;
1180                 }
1181
1182                 // if the new is smaller than the old (an overflow, or reset), set the old equal to the new
1183                 // to reset the calculation (it will give zero as the calculation for this second)
1184                 if(unlikely(rd->last_collected_value > rd->collected_value)) {
1185                     debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT
1186                     , st->name, rd->name
1187                     , rd->last_collected_value
1188                     , rd->collected_value);
1189                     if(!(rd->flags & RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS)) storage_flags = SN_EXISTS_RESET;
1190                     rd->last_collected_value = rd->collected_value;
1191                 }
1192
1193                 // the percentage of the current increment
1194                 // over the increment of all dimensions together
1195                 if(unlikely(st->collected_total == st->last_collected_total))
1196                     rd->calculated_value = 0;
1197                 else
1198                     rd->calculated_value =
1199                           (calculated_number)100
1200                         * (calculated_number)(rd->collected_value - rd->last_collected_value)
1201                         / (calculated_number)(st->collected_total - st->last_collected_total);
1202
1203                 if(unlikely(st->debug))
1204                     debug(D_RRD_STATS, "%s/%s: CALC PCENT-DIFF "
1205                         CALCULATED_NUMBER_FORMAT " = 100"
1206                         " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
1207                         " / (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")"
1208                         , st->id, rd->name
1209                         , rd->calculated_value
1210                         , rd->collected_value, rd->last_collected_value
1211                         , st->collected_total, st->last_collected_total
1212                         );
1213                 break;
1214
1215             default:
1216                 // make the default zero, to make sure
1217                 // it gets noticed when we add new types
1218                 rd->calculated_value = 0;
1219
1220                 if(unlikely(st->debug))
1221                     debug(D_RRD_STATS, "%s/%s: CALC "
1222                         CALCULATED_NUMBER_FORMAT " = 0"
1223                         , st->id, rd->name
1224                         , rd->calculated_value
1225                         );
1226                 break;
1227         }
1228
1229         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: PHASE2 "
1230             " last_collected_value = " COLLECTED_NUMBER_FORMAT
1231             " collected_value = " COLLECTED_NUMBER_FORMAT
1232             " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1233             " calculated_value = " CALCULATED_NUMBER_FORMAT
1234             , st->id, rd->name
1235             , rd->last_collected_value
1236             , rd->collected_value
1237             , rd->last_calculated_value
1238             , rd->calculated_value
1239             );
1240
1241     }
1242
1243     // at this point we have all the calculated values ready
1244     // it is now time to interpolate values on a second boundary
1245
1246     unsigned long long first_ut = last_ut;
1247     long long iterations = (now_ut - last_ut) / (st->update_every * 1000000ULL);
1248     if((now_ut % (st->update_every * 1000000ULL)) == 0) iterations++;
1249
1250     for( ; likely(next_ut <= now_ut) ; next_ut += st->update_every * 1000000ULL, iterations-- ) {
1251 #ifdef NETDATA_INTERNAL_CHECKS
1252         if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_ut = %llu, next_ut = %llu, now_ut = %llu", st->name, first_ut, last_ut, next_ut, now_ut); }
1253 #endif
1254
1255         if(unlikely(st->debug)) {
1256             debug(D_RRD_STATS, "%s: last ut = %0.3Lf (last updated time)", st->name, (long double)last_ut/1000000.0);
1257             debug(D_RRD_STATS, "%s: next ut = %0.3Lf (next interpolation point)", st->name, (long double)next_ut/1000000.0);
1258         }
1259
1260         st->last_updated.tv_sec = (time_t) (next_ut / 1000000ULL);
1261         st->last_updated.tv_usec = 0;
1262
1263         for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) {
1264             calculated_number new_value;
1265
1266             switch(rd->algorithm) {
1267                 case RRDDIM_INCREMENTAL:
1268                     new_value = (calculated_number)
1269                         (      rd->calculated_value
1270                             * (calculated_number)(next_ut - last_ut)
1271                             / (calculated_number)(now_ut - last_ut)
1272                         );
1273
1274                     if(unlikely(st->debug))
1275                         debug(D_RRD_STATS, "%s/%s: CALC2 INC "
1276                             CALCULATED_NUMBER_FORMAT " = "
1277                             CALCULATED_NUMBER_FORMAT
1278                             " * %llu"
1279                             " / %llu"
1280                             , st->id, rd->name
1281                             , new_value
1282                             , rd->calculated_value
1283                             , (next_ut - last_ut)
1284                             , (now_ut - last_ut)
1285                             );
1286
1287                     rd->calculated_value -= new_value;
1288                     new_value += rd->last_calculated_value;
1289                     rd->last_calculated_value = 0;
1290                     new_value /= (calculated_number)st->update_every;
1291                     break;
1292
1293                 case RRDDIM_ABSOLUTE:
1294                 case RRDDIM_PCENT_OVER_ROW_TOTAL:
1295                 case RRDDIM_PCENT_OVER_DIFF_TOTAL:
1296                 default:
1297                     if(iterations == 1) {
1298                         // this is the last iteration
1299                         // do not interpolate
1300                         // just show the calculated value
1301
1302                         new_value = rd->calculated_value;
1303                     }
1304                     else {
1305                         // we have missed an update
1306                         // interpolate in the middle values
1307
1308                         new_value = (calculated_number)
1309                             (   (     (rd->calculated_value - rd->last_calculated_value)
1310                                     * (calculated_number)(next_ut - first_ut)
1311                                     / (calculated_number)(now_ut - first_ut)
1312                                 )
1313                                 +  rd->last_calculated_value
1314                             );
1315
1316                         if(unlikely(st->debug))
1317                             debug(D_RRD_STATS, "%s/%s: CALC2 DEF "
1318                                 CALCULATED_NUMBER_FORMAT " = ((("
1319                                 "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")"
1320                                 " * %llu"
1321                                 " / %llu) + " CALCULATED_NUMBER_FORMAT
1322                                 , st->id, rd->name
1323                                 , new_value
1324                                 , rd->calculated_value, rd->last_calculated_value
1325                                 , (next_ut - first_ut)
1326                                 , (now_ut - first_ut), rd->last_calculated_value
1327                                 );
1328
1329                         // this is wrong
1330                         // it fades the value towards the target
1331                         // while we know the calculated value is different
1332                         // if(likely(next_ut + st->update_every * 1000000ULL > now_ut)) rd->calculated_value = new_value;
1333                     }
1334                     break;
1335             }
1336
1337             if(unlikely(!store_this_entry)) {
1338                 store_this_entry = 1;
1339                 continue;
1340             }
1341
1342             if(likely(rd->updated && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
1343                 rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags );
1344                 rd->last_stored_value = new_value;
1345
1346                 if(unlikely(st->debug))
1347                     debug(D_RRD_STATS, "%s/%s: STORE[%ld] "
1348                         CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT
1349                         , st->id, rd->name
1350                         , st->current_entry
1351                         , unpack_storage_number(rd->values[st->current_entry]), new_value
1352                         );
1353             }
1354             else {
1355                 if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING "
1356                         , st->id, rd->name
1357                         , st->current_entry
1358                         );
1359                 rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS);
1360                 rd->last_stored_value = 0;
1361             }
1362
1363             stored_entries++;
1364
1365             if(unlikely(st->debug)) {
1366                 calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor;
1367                 calculated_number t2 = unpack_storage_number(rd->values[st->current_entry]);
1368                 calculated_number accuracy = accuracy_loss(t1, t2);
1369                 debug(D_RRD_STATS, "%s/%s: UNPACK[%ld] = " CALCULATED_NUMBER_FORMAT " FLAGS=0x%08x (original = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s)"
1370                         , st->id, rd->name
1371                         , st->current_entry
1372                         , t2
1373                         , get_storage_number_flags(rd->values[st->current_entry])
1374                         , t1
1375                         , accuracy
1376                         , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1377                         );
1378
1379                 rd->collected_volume += t1;
1380                 rd->stored_volume += t2;
1381                 accuracy = accuracy_loss(rd->collected_volume, rd->stored_volume);
1382                 debug(D_RRD_STATS, "%s/%s: VOLUME[%ld] = " CALCULATED_NUMBER_FORMAT ", calculated  = " CALCULATED_NUMBER_FORMAT ", accuracy loss = " CALCULATED_NUMBER_FORMAT "%%%s"
1383                         , st->id, rd->name
1384                         , st->current_entry
1385                         , rd->stored_volume
1386                         , rd->collected_volume
1387                         , accuracy
1388                         , (accuracy > ACCURACY_LOSS) ? " **TOO BIG** " : ""
1389                         );
1390
1391             }
1392         }
1393         // reset the storage flags for the next point, if any;
1394         storage_flags = SN_EXISTS;
1395
1396         st->counter++;
1397         st->current_entry = ((st->current_entry + 1) >= st->entries) ? 0 : st->current_entry + 1;
1398         last_ut = next_ut;
1399     }
1400
1401     // align next interpolation to last collection point
1402     if(likely(stored_entries || !store_this_entry)) {
1403         st->last_updated.tv_sec = st->last_collected_time.tv_sec;
1404         st->last_updated.tv_usec = st->last_collected_time.tv_usec;
1405         st->last_collected_total  = st->collected_total;
1406     }
1407
1408     for( rd = st->dimensions; likely(rd) ; rd = rd->next ) {
1409         if(unlikely(!rd->updated)) continue;
1410
1411         if(likely(stored_entries || !store_this_entry)) {
1412             if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value);
1413             rd->last_collected_value = rd->collected_value;
1414
1415             if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value);
1416             rd->last_calculated_value = rd->calculated_value;
1417         }
1418
1419         rd->calculated_value = 0;
1420         rd->collected_value = 0;
1421         rd->updated = 0;
1422
1423         if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: END "
1424             " last_collected_value = " COLLECTED_NUMBER_FORMAT
1425             " collected_value = " COLLECTED_NUMBER_FORMAT
1426             " last_calculated_value = " CALCULATED_NUMBER_FORMAT
1427             " calculated_value = " CALCULATED_NUMBER_FORMAT
1428             , st->id, rd->name
1429             , rd->last_collected_value
1430             , rd->collected_value
1431             , rd->last_calculated_value
1432             , rd->calculated_value
1433             );
1434     }
1435
1436     // ALL DONE ABOUT THE DATA UPDATE
1437     // --------------------------------------------------------------------
1438
1439     // find if there are any obsolete dimensions (not updated recently)
1440     if(unlikely(rrd_delete_unupdated_dimensions)) {
1441
1442         for( rd = st->dimensions; likely(rd) ; rd = rd->next )
1443             if((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)
1444                 break;
1445
1446         if(unlikely(rd)) {
1447             // there is dimension to free
1448             // upgrade our read lock to a write lock
1449             pthread_rwlock_unlock(&st->rwlock);
1450             pthread_rwlock_wrlock(&st->rwlock);
1451
1452             for( rd = st->dimensions, last = NULL ; likely(rd) ; ) {
1453                 // remove it only it is not updated in rrd_delete_unupdated_dimensions seconds
1454
1455                 if(unlikely((rd->last_collected_time.tv_sec + (rrd_delete_unupdated_dimensions * st->update_every)) < st->last_collected_time.tv_sec)) {
1456                     info("Removing obsolete dimension '%s' (%s) of '%s' (%s).", rd->name, rd->id, st->name, st->id);
1457
1458                     if(unlikely(!last)) {
1459                         st->dimensions = rd->next;
1460                         rd->next = NULL;
1461                         rrddim_free(st, rd);
1462                         rd = st->dimensions;
1463                         continue;
1464                     }
1465                     else {
1466                         last->next = rd->next;
1467                         rd->next = NULL;
1468                         rrddim_free(st, rd);
1469                         rd = last->next;
1470                         continue;
1471                     }
1472                 }
1473
1474                 last = rd;
1475                 rd = rd->next;
1476             }
1477
1478             if(unlikely(!st->dimensions)) {
1479                 info("Disabling chart %s (%s) since it does not have any dimensions", st->name, st->id);
1480                 st->enabled = 0;
1481             }
1482         }
1483     }
1484
1485     pthread_rwlock_unlock(&st->rwlock);
1486
1487     if(unlikely(pthread_setcancelstate(oldstate, NULL) != 0))
1488         error("Cannot set pthread cancel state to RESTORE (%d).", oldstate);
1489
1490     return(st->usec_since_last_update);
1491 }