X-Git-Url: https://arthur.barton.de/gitweb/?a=blobdiff_plain;f=src%2Frrdset.c;h=63aeacebd04fb42d0e3ac06fc110f398616295be;hb=73212e3a8731012fee476e373f4a5b647f5126e5;hp=cfd3b9db2c5888dab59f3b29446660216368fd14;hpb=99e33cc6d52402d77c2be0409386461281e86be0;p=netdata.git diff --git a/src/rrdset.c b/src/rrdset.c index cfd3b9db..63aeaceb 100644 --- a/src/rrdset.c +++ b/src/rrdset.c @@ -3,6 +3,23 @@ #define RRD_DEFAULT_GAP_INTERPOLATIONS 1 +void rrdset_check_rdlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line) { + debug(D_RRD_CALLS, "Checking read lock on chart '%s'", st->id); + + int ret = pthread_rwlock_trywrlock(&st->rrdset_rwlock); + if(ret == 0) + fatal("RRDSET '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", st->id, function, line, file); +} + +void rrdset_check_wrlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line) { + debug(D_RRD_CALLS, "Checking write lock on chart '%s'", st->id); + + int ret = pthread_rwlock_tryrdlock(&st->rrdset_rwlock); + if(ret == 0) + fatal("RRDSET '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", st->id, function, line, file); +} + + // ---------------------------------------------------------------------------- // RRDSET index @@ -100,7 +117,6 @@ inline RRDSET *rrdset_find_byname(RRDHOST *host, const char *name) { return(st); } - // ---------------------------------------------------------------------------- // RRDSET - rename charts @@ -133,20 +149,20 @@ void rrdset_set_name(RRDSET *st, const char *name) { if(st->name) { rrdset_index_del_name(st->rrdhost, st); - st->name = config_set_default(st->id, "name", b); + st->name = config_set_default(st->config_section, "name", b); st->hash_name = simple_hash(st->name); rrdsetvar_rename_all(st); } else { - st->name = config_get(st->id, "name", b); + st->name = config_get(st->config_section, "name", b); st->hash_name = simple_hash(st->name); } - pthread_rwlock_wrlock(&st->rwlock); + rrdset_wrlock(st); RRDDIM *rd; - for(rd = st->dimensions; rd ;rd = rd->next) + rrddim_foreach_write(rd, st) rrddimvar_rename_all(rd); - pthread_rwlock_unlock(&st->rwlock); + rrdset_unlock(st); if(unlikely(rrdset_index_add_name(st->rrdhost, st) != st)) error("RRDSET: INTERNAL ERROR: attempted to index duplicate chart name '%s'", st->name); @@ -168,10 +184,10 @@ void rrdset_reset(RRDSET *st) { st->counter_done = 0; RRDDIM *rd; - for(rd = st->dimensions; rd ; rd = rd->next) { + rrddim_foreach_read(rd, st) { rd->last_collected_time.tv_sec = 0; rd->last_collected_time.tv_usec = 0; - rd->counter = 0; + rd->collections_counter = 0; memset(rd->values, 0, rd->entries * sizeof(storage_number)); } } @@ -179,15 +195,16 @@ void rrdset_reset(RRDSET *st) { // ---------------------------------------------------------------------------- // RRDSET - helpers for rrdset_create() -static inline long align_entries_to_pagesize(long entries) { - if(entries < 5) entries = 5; - if(entries > RRD_HISTORY_ENTRIES_MAX) entries = RRD_HISTORY_ENTRIES_MAX; +inline long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries) { + if(unlikely(entries < 5)) entries = 5; + if(unlikely(entries > RRD_HISTORY_ENTRIES_MAX)) entries = RRD_HISTORY_ENTRIES_MAX; -#ifdef NETDATA_LOG_ALLOCATIONS - long page = (size_t)sysconf(_SC_PAGESIZE); + if(unlikely(mode == RRD_MEMORY_MODE_NONE || mode == RRD_MEMORY_MODE_RAM)) + return entries; + long page = (size_t)sysconf(_SC_PAGESIZE); long size = sizeof(RRDDIM) + entries * sizeof(storage_number); - if(size % page) { + if(unlikely(size % page)) { size -= (size % page); size += page; @@ -196,23 +213,114 @@ static inline long align_entries_to_pagesize(long entries) { } return entries; -#else - return entries; -#endif } -static inline void timeval_align(struct timeval *tv, int update_every) { +static inline void last_collected_time_align(struct timeval *tv, int update_every) { tv->tv_sec -= tv->tv_sec % update_every; tv->tv_usec = 500000; } +static inline void last_updated_time_align(struct timeval *tv, int update_every) { + tv->tv_sec -= tv->tv_sec % update_every; + tv->tv_usec = 0; +} + // ---------------------------------------------------------------------------- -// RRDSET - create a chart +// RRDSET - free a chart + +void rrdset_free(RRDSET *st) { + if(unlikely(!st)) return; + + rrdhost_check_wrlock(st->rrdhost); // make sure we have a write lock on the host + rrdset_wrlock(st); // lock this RRDSET + + // info("Removing chart '%s' ('%s')", st->id, st->name); + + // ------------------------------------------------------------------------ + // remove it from the indexes + + if(unlikely(rrdset_index_del(st->rrdhost, st) != st)) + error("RRDSET: INTERNAL ERROR: attempt to remove from index chart '%s', removed a different chart.", st->id); + + rrdset_index_del_name(st->rrdhost, st); + + // ------------------------------------------------------------------------ + // free its children structures + + while(st->variables) rrdsetvar_free(st->variables); + while(st->alarms) rrdsetcalc_unlink(st->alarms); + while(st->dimensions) rrddim_free(st, st->dimensions); + + rrdfamily_free(st->rrdhost, st->rrdfamily); + + // ------------------------------------------------------------------------ + // unlink it from the host + + if(st == st->rrdhost->rrdset_root) { + st->rrdhost->rrdset_root = st->next; + } + else { + // find the previous one + RRDSET *s; + for(s = st->rrdhost->rrdset_root; s && s->next != st ; s = s->next) ; + + // bypass it + if(s) s->next = st->next; + else error("Request to free RRDSET '%s': cannot find it under host '%s'", st->id, st->rrdhost->hostname); + } + + rrdset_unlock(st); -RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const char *name, const char *family - , const char *context, const char *title, const char *units, long priority - , int update_every, int chart_type) { + // ------------------------------------------------------------------------ + // free it + // free directly allocated members + freez(st->config_section); + + if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || st->rrd_memory_mode == RRD_MEMORY_MODE_MAP) { + debug(D_RRD_CALLS, "Unmapping stats '%s'.", st->name); + munmap(st, st->memsize); + } + else + freez(st); +} + +void rrdset_save(RRDSET *st) { + RRDDIM *rd; + + rrdset_check_rdlock(st); + + // info("Saving chart '%s' ('%s')", st->id, st->name); + + if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) { + debug(D_RRD_STATS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename); + savememory(st->cache_filename, st, st->memsize); + } + + rrddim_foreach_read(rd, st) { + if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE)) { + debug(D_RRD_STATS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename); + savememory(rd->cache_filename, rd, rd->memsize); + } + } +} + +// ---------------------------------------------------------------------------- +// RRDSET - create a chart + +RRDSET *rrdset_create( + RRDHOST *host + , const char *type + , const char *id + , const char *name + , const char *family + , const char *context + , const char *title + , const char *units + , long priority + , int update_every + , RRDSET_TYPE chart_type +) { if(!type || !type[0]) { fatal("Cannot create rrd stats without a type."); return NULL; @@ -223,88 +331,125 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha return NULL; } - char fullid[RRD_ID_LENGTH_MAX + 1]; - char fullfilename[FILENAME_MAX + 1]; + // ------------------------------------------------------------------------ + // check if it already exists + char fullid[RRD_ID_LENGTH_MAX + 1]; snprintfz(fullid, RRD_ID_LENGTH_MAX, "%s.%s", type, id); RRDSET *st = rrdset_find(host, fullid); if(st) { + rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE); debug(D_RRD_CALLS, "RRDSET '%s', already exists.", fullid); return st; } - long rentries = config_get_number(fullid, "history", host->rrd_history_entries); - long entries = align_entries_to_pagesize(rentries); - if(entries != rentries) entries = config_set_number(fullid, "history", entries); + char fullfilename[FILENAME_MAX + 1]; + + // ------------------------------------------------------------------------ + // compose the config_section for this chart + + char config_section[RRD_ID_LENGTH_MAX + 1]; + if(host == localhost) + strcpy(config_section, fullid); + else + snprintfz(config_section, RRD_ID_LENGTH_MAX, "%s/%s", host->machine_guid, fullid); + + // ------------------------------------------------------------------------ + // get the options from the config, we need to create it - int enabled = config_get_boolean(fullid, "enabled", 1); + long rentries = config_get_number(config_section, "history", host->rrd_history_entries); + long entries = align_entries_to_pagesize(host->rrd_memory_mode, rentries); + if(entries != rentries) entries = config_set_number(config_section, "history", entries); + + if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries) + entries = config_set_number(config_section, "history", 10); + + int enabled = config_get_boolean(config_section, "enabled", 1); if(!enabled) entries = 5; unsigned long size = sizeof(RRDSET); - char *cache_dir = rrdset_cache_dir(host, fullid); + char *cache_dir = rrdset_cache_dir(host, fullid, config_section); + + time_t now = now_realtime_sec(); + + // ------------------------------------------------------------------------ + // load it or allocate it debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id); snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir); - if(host->rrd_memory_mode != RRD_MEMORY_MODE_RAM) st = (RRDSET *)mymmap(fullfilename, size, ((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP)?MAP_SHARED:MAP_PRIVATE), 0); - if(st) { - if(strcmp(st->magic, RRDSET_MAGIC) != 0) { - errno = 0; - info("Initializing file %s.", fullfilename); - memset(st, 0, size); - } - else if(strcmp(st->id, fullid) != 0) { - errno = 0; - error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid); - // munmap(st, size); - // st = NULL; - memset(st, 0, size); - } - else if(st->memsize != size || st->entries != entries) { - errno = 0; - error("File %s does not have the desired size. Clearing it.", fullfilename); - memset(st, 0, size); - } - else if(st->update_every != update_every) { - errno = 0; - error("File %s does not have the desired update frequency. Clearing it.", fullfilename); - memset(st, 0, size); - } - else if((now_realtime_sec() - st->last_updated.tv_sec) > update_every * entries) { - errno = 0; - error("File %s is too old. Clearing it.", fullfilename); - memset(st, 0, size); - } + if(host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) { + st = (RRDSET *) mymmap(fullfilename, size, ((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), 0); + if(st) { + memset(&st->avl, 0, sizeof(avl)); + memset(&st->avlname, 0, sizeof(avl)); + memset(&st->variables_root_index, 0, sizeof(avl_tree_lock)); + memset(&st->dimensions_index, 0, sizeof(avl_tree_lock)); + memset(&st->rrdset_rwlock, 0, sizeof(pthread_rwlock_t)); + + st->name = NULL; + st->type = NULL; + st->family = NULL; + st->context = NULL; + st->title = NULL; + st->units = NULL; + st->dimensions = NULL; + st->next = NULL; + st->variables = NULL; + st->alarms = NULL; + st->flags = 0x00000000; + + if(strcmp(st->magic, RRDSET_MAGIC) != 0) { + errno = 0; + info("Initializing file %s.", fullfilename); + memset(st, 0, size); + } + else if(strcmp(st->id, fullid) != 0) { + errno = 0; + error("File %s contents are not for chart %s. Clearing it.", fullfilename, fullid); + // munmap(st, size); + // st = NULL; + memset(st, 0, size); + } + else if(st->memsize != size || st->entries != entries) { + errno = 0; + error("File %s does not have the desired size. Clearing it.", fullfilename); + memset(st, 0, size); + } + else if(st->update_every != update_every) { + errno = 0; + error("File %s does not have the desired update frequency. Clearing it.", fullfilename); + memset(st, 0, size); + } + else if((now - st->last_updated.tv_sec) > update_every * entries) { + errno = 0; + error("File %s is too old. Clearing it.", fullfilename); + memset(st, 0, size); + } + else if(st->last_updated.tv_sec > now + update_every) { + errno = 0; + error("File %s refers to the future. Clearing it.", fullfilename); + memset(st, 0, size); + } - // make sure the database is aligned - if(st->last_updated.tv_sec) - timeval_align(&st->last_updated, update_every); - } + // make sure the database is aligned + if(st->last_updated.tv_sec) + last_updated_time_align(&st->last_updated, update_every); - if(st) { - st->name = NULL; - st->type = NULL; - st->family = NULL; - st->context = NULL; - st->title = NULL; - st->units = NULL; - st->dimensions = NULL; - st->next = NULL; - st->rrd_memory_mode = host->rrd_memory_mode; - st->variables = NULL; - st->alarms = NULL; - memset(&st->rwlock, 0, sizeof(pthread_rwlock_t)); - memset(&st->avl, 0, sizeof(avl)); - memset(&st->avlname, 0, sizeof(avl)); - memset(&st->variables_root_index, 0, sizeof(avl_tree_lock)); - memset(&st->dimensions_index, 0, sizeof(avl_tree_lock)); + + // make sure we have the right memory mode + // even if we cleared the memory + st->rrd_memory_mode = host->rrd_memory_mode; + } } - else { + + if(unlikely(!st)) { st = callocz(1, size); - st->rrd_memory_mode = RRD_MEMORY_MODE_RAM; + st->rrd_memory_mode = (host->rrd_memory_mode == RRD_MEMORY_MODE_NONE) ? RRD_MEMORY_MODE_NONE : RRD_MEMORY_MODE_RAM; } + st->config_section = strdup(config_section); st->rrdhost = host; st->memsize = size; st->entries = entries; @@ -320,19 +465,23 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha st->cache_dir = cache_dir; - st->chart_type = rrdset_type_id(config_get(st->id, "chart type", rrdset_type_name(chart_type))); - st->type = config_get(st->id, "type", type); - st->family = config_get(st->id, "family", family?family:st->type); - st->units = config_get(st->id, "units", units?units:""); + st->chart_type = rrdset_type_id(config_get(st->config_section, "chart type", rrdset_type_name(chart_type))); + st->type = config_get(st->config_section, "type", type); + st->family = config_get(st->config_section, "family", family?family:st->type); + st->units = config_get(st->config_section, "units", units?units:""); - st->context = config_get(st->id, "context", context?context:st->id); + st->context = config_get(st->config_section, "context", context?context:st->id); st->hash_context = simple_hash(st->context); - st->priority = config_get_number(st->id, "priority", priority); - st->enabled = enabled; + st->priority = config_get_number(st->config_section, "priority", priority); + if(enabled) + rrdset_flag_set(st, RRDSET_FLAG_ENABLED); + else + rrdset_flag_clear(st, RRDSET_FLAG_ENABLED); - st->isdetail = 0; - st->debug = 0; + rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); + rrdset_flag_clear(st, RRDSET_FLAG_DEBUG); + rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE); // if(!strcmp(st->id, "disk_util.dm-0")) { // st->debug = 1; @@ -348,13 +497,13 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha st->counter_done = 0; st->gap_when_lost_iterations_above = (int) ( - config_get_number(st->id, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2); + config_get_number(st->config_section, "gap when lost iterations above", RRD_DEFAULT_GAP_INTERPOLATIONS) + 2); avl_init_lock(&st->dimensions_index, rrddim_compare); avl_init_lock(&st->variables_root_index, rrdvar_compare); - pthread_rwlock_init(&st->rwlock, NULL); - rrdhost_rwlock(host); + pthread_rwlock_init(&st->rrdset_rwlock, NULL); + rrdhost_wrlock(host); if(name && *name) rrdset_set_name(st, name); else rrdset_set_name(st, id); @@ -364,7 +513,7 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha char varvalue2[CONFIG_MAX_VALUE + 1]; snprintfz(varvalue, CONFIG_MAX_VALUE, "%s (%s)", title?title:"", st->name); json_escape_string(varvalue2, varvalue, sizeof(varvalue2)); - st->title = config_get(st->id, "title", varvalue2); + st->title = config_get(st->config_section, "title", varvalue2); } st->rrdfamily = rrdfamily_create(host, st->family); @@ -386,6 +535,8 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha rrdsetcalc_link_matching(st); rrdcalctemplate_link_matching(st); + rrdhost_cleanup(host); + rrdhost_unlock(host); return(st); @@ -395,16 +546,23 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha // ---------------------------------------------------------------------------- // RRDSET - data collection iteration control -void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) { - if(unlikely(!st->last_collected_time.tv_sec || !microseconds)) { +inline void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) { + + if(unlikely(!st->last_collected_time.tv_sec)) { // the first entry microseconds = st->update_every * USEC_PER_SEC; } + else if(unlikely(!microseconds)) { + // no dt given by the plugin + struct timeval now; + now_realtime_timeval(&now); + microseconds = dt_usec(&now, &st->last_collected_time); + } + st->usec_since_last_update = microseconds; } -void rrdset_next_usec(RRDSET *st, usec_t microseconds) -{ +inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) { struct timeval now; now_realtime_timeval(&now); @@ -418,35 +576,53 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds) } else { // microseconds has the time since the last collection -#ifdef NETDATA_INTERNAL_CHECKS - usec_t now_usec = timeval_usec(&now); - usec_t last_usec = timeval_usec(&st->last_collected_time); -#endif - usec_t since_last_usec = dt_usec(&now, &st->last_collected_time); +//#ifdef NETDATA_INTERNAL_CHECKS +// usec_t now_usec = timeval_usec(&now); +// usec_t last_usec = timeval_usec(&st->last_collected_time); +//#endif + susec_t since_last_usec = dt_usec_signed(&now, &st->last_collected_time); + + if(unlikely(since_last_usec < 0)) { + // oops! the database is in the future + error("Database for chart '%s' on host '%s' is %lld microseconds in the future. Adjusting it to current time.", st->id, st->rrdhost->hostname, -since_last_usec); + + st->last_collected_time.tv_sec = now.tv_sec - st->update_every; + st->last_collected_time.tv_usec = now.tv_usec; + last_collected_time_align(&st->last_collected_time, st->update_every); + + st->last_updated.tv_sec = now.tv_sec - st->update_every; + st->last_updated.tv_usec = now.tv_usec; + last_updated_time_align(&st->last_updated, st->update_every); + + microseconds = st->update_every * USEC_PER_SEC; + since_last_usec = st->update_every * USEC_PER_SEC; + } // verify the microseconds given is good - if(unlikely(microseconds > since_last_usec)) { - debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id); + if(unlikely(microseconds > (usec_t)since_last_usec)) { + debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id); -#ifdef NETDATA_INTERNAL_CHECKS - if(unlikely(last_usec + microseconds > now_usec + 1000)) - error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id); -#endif +//#ifdef NETDATA_INTERNAL_CHECKS +// if(unlikely(last_usec + microseconds > now_usec + 1000)) +// error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id); +//#endif - microseconds = since_last_usec; + microseconds = (usec_t)since_last_usec; } - else if(unlikely(microseconds < since_last_usec * 0.8)) { - debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id); + else if(unlikely(microseconds < (usec_t)since_last_usec * 0.8)) { + debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id); -#ifdef NETDATA_INTERNAL_CHECKS - error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id); -#endif - microseconds = since_last_usec; +//#ifdef NETDATA_INTERNAL_CHECKS +// error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id); +//#endif + microseconds = (usec_t)since_last_usec; } } debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds); - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds); + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s: NEXT: %llu microseconds", st->name, microseconds); + st->usec_since_last_update = microseconds; } @@ -454,8 +630,54 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds) // ---------------------------------------------------------------------------- // RRDSET - process the collected values for all dimensions of a chart -usec_t rrdset_done(RRDSET *st) { - if(unlikely(netdata_exit)) return 0; +static inline void rrdset_init_last_collected_time(RRDSET *st) { + now_realtime_timeval(&st->last_collected_time); + last_collected_time_align(&st->last_collected_time, st->update_every); +} + +static inline usec_t rrdset_update_last_collected_time(RRDSET *st) { + usec_t last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec; + usec_t ut = last_collect_ut + st->usec_since_last_update; + st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC); + st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC); + return last_collect_ut; +} + +static inline void rrdset_init_last_updated_time(RRDSET *st) { + // copy the last collected time to last updated time + st->last_updated.tv_sec = st->last_collected_time.tv_sec; + st->last_updated.tv_usec = st->last_collected_time.tv_usec; + last_updated_time_align(&st->last_updated, st->update_every); +} + +static inline void rrdset_done_push_exclusive(RRDSET *st) { + if(unlikely(!st->last_collected_time.tv_sec)) { + // it is the first entry + // set the last_collected_time to now + rrdset_init_last_collected_time(st); + } + else { + // it is not the first entry + // calculate the proper last_collected_time, using usec_since_last_update + rrdset_update_last_collected_time(st); + } + + st->counter_done++; + + rrdset_rdlock(st); + rrdset_done_push(st); + rrdset_unlock(st); +} + +void rrdset_done(RRDSET *st) { + if(unlikely(netdata_exit)) return; + + if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE)) { + if(unlikely(st->rrdhost->rrdpush_enabled)) + rrdset_done_push_exclusive(st); + + return; + } debug(D_RRD_CALLS, "rrdset_done() for chart %s", st->name); @@ -482,7 +704,7 @@ usec_t rrdset_done(RRDSET *st) { error("Cannot set pthread cancel state to DISABLE."); // a read lock is OK here - pthread_rwlock_rdlock(&st->rwlock); + rrdset_rdlock(st); /* // enable the chart, if it was disabled @@ -490,6 +712,11 @@ usec_t rrdset_done(RRDSET *st) { st->enabled = 1; */ + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) { + error("Chart '%s' has the OBSOLETE flag set, but it is collected.", st->id); + rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE); + } + // check if the chart has a long time to be updated if(unlikely(st->usec_since_last_update > st->entries * update_every_ut)) { info("%s: took too long to be updated (%0.3Lf secs). Resetting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0)); @@ -497,14 +724,15 @@ usec_t rrdset_done(RRDSET *st) { st->usec_since_last_update = update_every_ut; first_entry = 1; } - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update); + + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s: microseconds since last update: %llu", st->name, st->usec_since_last_update); // set last_collected_time if(unlikely(!st->last_collected_time.tv_sec)) { // it is the first entry // set the last_collected_time to now - now_realtime_timeval(&st->last_collected_time); - timeval_align(&st->last_collected_time, st->update_every); + rrdset_init_last_collected_time(st); last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - update_every_ut; @@ -512,15 +740,13 @@ usec_t rrdset_done(RRDSET *st) { store_this_entry = 0; first_entry = 1; - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: has not set last_collected_time. Setting it now. Will not store the next entry.", st->name); + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s: has not set last_collected_time. Setting it now. Will not store the next entry.", st->name); } else { // it is not the first entry // calculate the proper last_collected_time, using usec_since_last_update - last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec; - usec_t ut = last_collect_ut + st->usec_since_last_update; - st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC); - st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC); + last_collect_ut = rrdset_update_last_collected_time(st); } // if this set has not been updated in the past @@ -528,31 +754,24 @@ usec_t rrdset_done(RRDSET *st) { if(unlikely(!st->last_updated.tv_sec)) { // it has never been updated before // set a fake last_updated, in the past using usec_since_last_update - usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update; - st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC); - st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC); + rrdset_init_last_updated_time(st); // the first entry should not be stored store_this_entry = 0; first_entry = 1; - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0); + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s: initializing last_updated to last_collected_time - %llu microseconds. Will not store the next entry.", st->name, st->usec_since_last_update); } // check if we will re-write the entire data set if(unlikely(dt_usec(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut)) { info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec); rrdset_reset(st); + rrdset_init_last_updated_time(st); st->usec_since_last_update = update_every_ut; - now_realtime_timeval(&st->last_collected_time); - timeval_align(&st->last_collected_time, st->update_every); - - usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update; - st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC); - st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC); - // the first entry should not be stored store_this_entry = 0; first_entry = 1; @@ -566,7 +785,7 @@ usec_t rrdset_done(RRDSET *st) { now_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec; next_store_ut = (st->last_updated.tv_sec + st->update_every) * USEC_PER_SEC; - if(unlikely(st->debug)) { + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) { debug(D_RRD_STATS, "%s: last_collect_ut = %0.3Lf (last collection time)", st->name, (long double)last_collect_ut/1000000.0); debug(D_RRD_STATS, "%s: now_collect_ut = %0.3Lf (current collection time)", st->name, (long double)now_collect_ut/1000000.0); debug(D_RRD_STATS, "%s: last_stored_ut = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0); @@ -575,30 +794,37 @@ usec_t rrdset_done(RRDSET *st) { if(unlikely(!st->counter_done)) { store_this_entry = 0; - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name); + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s: Will not store the next entry.", st->name); } st->counter_done++; + if(unlikely(st->rrdhost->rrdpush_enabled)) + rrdset_done_push(st); + // calculate totals and count the dimensions - int dimensions; + int dimensions = 0; st->collected_total = 0; - for( rd = st->dimensions, dimensions = 0 ; rd ; rd = rd->next, dimensions++ ) - if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) + rrddim_foreach_read(rd, st) { + dimensions++; + if(likely(rd->updated)) st->collected_total += rd->collected_value; + } uint32_t storage_flags = SN_EXISTS; // process all dimensions to calculate their values // based on the collected figures only // at this stage we do not interpolate anything - for( rd = st->dimensions ; rd ; rd = rd->next ) { + rrddim_foreach_read(rd, st) { - if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) { + if(unlikely(!rd->updated)) { rd->calculated_value = 0; continue; } - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: START " + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s/%s: START " " last_collected_value = " COLLECTED_NUMBER_FORMAT " collected_value = " COLLECTED_NUMBER_FORMAT " last_calculated_value = " CALCULATED_NUMBER_FORMAT @@ -616,7 +842,7 @@ usec_t rrdset_done(RRDSET *st) { * (calculated_number)rd->multiplier / (calculated_number)rd->divisor; - if(unlikely(st->debug)) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_RRD_STATS, "%s/%s: CALC ABS/ABS-NO-IN " CALCULATED_NUMBER_FORMAT " = " COLLECTED_NUMBER_FORMAT @@ -641,7 +867,7 @@ usec_t rrdset_done(RRDSET *st) { * (calculated_number)rd->collected_value / (calculated_number)st->collected_total; - if(unlikely(st->debug)) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_RRD_STATS, "%s/%s: CALC PCENT-ROW " CALCULATED_NUMBER_FORMAT " = 100" " * " COLLECTED_NUMBER_FORMAT @@ -654,7 +880,7 @@ usec_t rrdset_done(RRDSET *st) { break; case RRD_ALGORITHM_INCREMENTAL: - if(unlikely(rd->counter <= 1)) { + if(unlikely(rd->collections_counter <= 1)) { rd->calculated_value = 0; continue; } @@ -678,7 +904,7 @@ usec_t rrdset_done(RRDSET *st) { * (calculated_number)rd->multiplier / (calculated_number)rd->divisor; - if(unlikely(st->debug)) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_RRD_STATS, "%s/%s: CALC INC PRE " CALCULATED_NUMBER_FORMAT " = (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT @@ -694,7 +920,7 @@ usec_t rrdset_done(RRDSET *st) { break; case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL: - if(unlikely(rd->counter <= 1)) { + if(unlikely(rd->collections_counter <= 1)) { rd->calculated_value = 0; continue; } @@ -723,7 +949,7 @@ usec_t rrdset_done(RRDSET *st) { * (calculated_number)(rd->collected_value - rd->last_collected_value) / (calculated_number)(st->collected_total - st->last_collected_total); - if(unlikely(st->debug)) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_RRD_STATS, "%s/%s: CALC PCENT-DIFF " CALCULATED_NUMBER_FORMAT " = 100" " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")" @@ -740,7 +966,7 @@ usec_t rrdset_done(RRDSET *st) { // it gets noticed when we add new types rd->calculated_value = 0; - if(unlikely(st->debug)) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_RRD_STATS, "%s/%s: CALC " CALCULATED_NUMBER_FORMAT " = 0" , st->id, rd->name @@ -749,7 +975,8 @@ usec_t rrdset_done(RRDSET *st) { break; } - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: PHASE2 " + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s/%s: PHASE2 " " last_collected_value = " COLLECTED_NUMBER_FORMAT " collected_value = " COLLECTED_NUMBER_FORMAT " last_calculated_value = " CALCULATED_NUMBER_FORMAT @@ -768,10 +995,13 @@ usec_t rrdset_done(RRDSET *st) { if(unlikely(now_collect_ut < next_store_ut)) { // this is collected in the same interpolation point - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name); -#ifdef NETDATA_INTERNAL_CHECKS - info("%s is collected in the same interpolation point: short by %llu microseconds", st->name, next_store_ut - now_collect_ut); -#endif + + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s: THIS IS IN THE SAME INTERPOLATION POINT", st->name); + +//#ifdef NETDATA_INTERNAL_CHECKS +// info("%s is collected in the same interpolation point: short by %llu microseconds", st->name, next_store_ut - now_collect_ut); +//#endif } usec_t first_ut = last_stored_ut; @@ -779,11 +1009,11 @@ usec_t rrdset_done(RRDSET *st) { if((now_collect_ut % (update_every_ut)) == 0) iterations++; for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) { -#ifdef NETDATA_INTERNAL_CHECKS - if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", st->name, first_ut, last_stored_ut, next_store_ut, now_collect_ut); } -#endif +//#ifdef NETDATA_INTERNAL_CHECKS +// if(iterations < 0) { error("%s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", st->name, first_ut, last_stored_ut, next_store_ut, now_collect_ut); } +//#endif - if(unlikely(st->debug)) { + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) { debug(D_RRD_STATS, "%s: last_stored_ut = %0.3Lf (last updated time)", st->name, (long double)last_stored_ut/1000000.0); debug(D_RRD_STATS, "%s: next_store_ut = %0.3Lf (next interpolation point)", st->name, (long double)next_store_ut/1000000.0); } @@ -791,7 +1021,7 @@ usec_t rrdset_done(RRDSET *st) { st->last_updated.tv_sec = (time_t) (next_store_ut / USEC_PER_SEC); st->last_updated.tv_usec = 0; - for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) { + rrddim_foreach_read(rd, st) { calculated_number new_value; switch(rd->algorithm) { @@ -802,7 +1032,7 @@ usec_t rrdset_done(RRDSET *st) { / (calculated_number)(now_collect_ut - last_collect_ut) ); - if(unlikely(st->debug)) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_RRD_STATS, "%s/%s: CALC2 INC " CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT @@ -821,7 +1051,7 @@ usec_t rrdset_done(RRDSET *st) { new_value /= (calculated_number)st->update_every; if(unlikely(next_store_ut - last_stored_ut < update_every_ut)) { - if(unlikely(st->debug)) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_RRD_STATS, "%s/%s: COLLECTION POINT IS SHORT " CALCULATED_NUMBER_FORMAT " - EXTRAPOLATING", st->id, rd->name , (calculated_number)(next_store_ut - last_stored_ut) @@ -853,7 +1083,7 @@ usec_t rrdset_done(RRDSET *st) { + rd->last_calculated_value ); - if(unlikely(st->debug)) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_RRD_STATS, "%s/%s: CALC2 DEF " CALCULATED_NUMBER_FORMAT " = (((" "(" CALCULATED_NUMBER_FORMAT " - " CALCULATED_NUMBER_FORMAT ")" @@ -874,11 +1104,11 @@ usec_t rrdset_done(RRDSET *st) { continue; } - if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) { + if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) { rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags ); rd->last_stored_value = new_value; - if(unlikely(st->debug)) + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_RRD_STATS, "%s/%s: STORE[%ld] " CALCULATED_NUMBER_FORMAT " = " CALCULATED_NUMBER_FORMAT , st->id, rd->name @@ -887,9 +1117,10 @@ usec_t rrdset_done(RRDSET *st) { ); } else { - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING " - , st->id, rd->name - , st->current_entry + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s/%s: STORE[%ld] = NON EXISTING " + , st->id, rd->name + , st->current_entry ); rd->values[st->current_entry] = pack_storage_number(0, SN_NOT_EXISTS); rd->last_stored_value = NAN; @@ -897,7 +1128,7 @@ usec_t rrdset_done(RRDSET *st) { stored_entries++; - if(unlikely(st->debug)) { + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) { calculated_number t1 = new_value * (calculated_number)rd->multiplier / (calculated_number)rd->divisor; calculated_number t2 = unpack_storage_number(rd->values[st->current_entry]); calculated_number accuracy = accuracy_loss(t1, t2); @@ -935,37 +1166,43 @@ usec_t rrdset_done(RRDSET *st) { st->last_collected_total = st->collected_total; - for( rd = st->dimensions; rd ; rd = rd->next ) { - if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) + rrddim_foreach_read(rd, st) { + if(unlikely(!rd->updated)) continue; - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value); + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s/%s: setting last_collected_value (old: " COLLECTED_NUMBER_FORMAT ") to last_collected_value (new: " COLLECTED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_collected_value, rd->collected_value); + rd->last_collected_value = rd->collected_value; switch(rd->algorithm) { case RRD_ALGORITHM_INCREMENTAL: if(unlikely(!first_entry)) { - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value); + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value + rd->calculated_value, rd->calculated_value); rd->last_calculated_value += rd->calculated_value; } else { - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s: THIS IS THE FIRST POINT", st->name); + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s: THIS IS THE FIRST POINT", st->name); } break; case RRD_ALGORITHM_ABSOLUTE: case RRD_ALGORITHM_PCENT_OVER_ROW_TOTAL: case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL: - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value); + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s/%s: setting last_calculated_value (old: " CALCULATED_NUMBER_FORMAT ") to last_calculated_value (new: " CALCULATED_NUMBER_FORMAT ")", st->id, rd->name, rd->last_calculated_value, rd->calculated_value); rd->last_calculated_value = rd->calculated_value; break; } rd->calculated_value = 0; rd->collected_value = 0; - rrddim_flag_clear(rd, RRDDIM_FLAG_UPDATED); + rd->updated = 0; - if(unlikely(st->debug)) debug(D_RRD_STATS, "%s/%s: END " + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) + debug(D_RRD_STATS, "%s/%s: END " " last_collected_value = " COLLECTED_NUMBER_FORMAT " collected_value = " COLLECTED_NUMBER_FORMAT " last_calculated_value = " CALCULATED_NUMBER_FORMAT @@ -993,8 +1230,8 @@ usec_t rrdset_done(RRDSET *st) { RRDDIM *last; // there is dimension to free // upgrade our read lock to a write lock - pthread_rwlock_unlock(&st->rwlock); - pthread_rwlock_wrlock(&st->rwlock); + pthread_rwlock_unlock(&st->rrdset_rwlock); + pthread_rwlock_wrlock(&st->rrdset_rwlock); for( rd = st->dimensions, last = NULL ; likely(rd) ; ) { // remove it only it is not updated in rrd_delete_unupdated_dimensions seconds @@ -1030,11 +1267,8 @@ usec_t rrdset_done(RRDSET *st) { } */ - pthread_rwlock_unlock(&st->rwlock); + rrdset_unlock(st); if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0)) error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate); - - return(st->usec_since_last_update); } -