X-Git-Url: https://arthur.barton.de/gitweb/?a=blobdiff_plain;f=src%2Frrdset.c;h=63aeacebd04fb42d0e3ac06fc110f398616295be;hb=73212e3a8731012fee476e373f4a5b647f5126e5;hp=fea3d727523f3f1fd2da6eda1ff8a3e38e3196e7;hpb=489f4dfccb2259103175c01c4b51d76946248629;p=netdata.git diff --git a/src/rrdset.c b/src/rrdset.c index fea3d727..63aeaceb 100644 --- a/src/rrdset.c +++ b/src/rrdset.c @@ -3,6 +3,23 @@ #define RRD_DEFAULT_GAP_INTERPOLATIONS 1 +void rrdset_check_rdlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line) { + debug(D_RRD_CALLS, "Checking read lock on chart '%s'", st->id); + + int ret = pthread_rwlock_trywrlock(&st->rrdset_rwlock); + if(ret == 0) + fatal("RRDSET '%s' should be read-locked, but it is not, at function %s() at line %lu of file '%s'", st->id, function, line, file); +} + +void rrdset_check_wrlock_int(RRDSET *st, const char *file, const char *function, const unsigned long line) { + debug(D_RRD_CALLS, "Checking write lock on chart '%s'", st->id); + + int ret = pthread_rwlock_tryrdlock(&st->rrdset_rwlock); + if(ret == 0) + fatal("RRDSET '%s' should be write-locked, but it is not, at function %s() at line %lu of file '%s'", st->id, function, line, file); +} + + // ---------------------------------------------------------------------------- // RRDSET index @@ -143,7 +160,7 @@ void rrdset_set_name(RRDSET *st, const char *name) { rrdset_wrlock(st); RRDDIM *rd; - for(rd = st->dimensions; rd ;rd = rd->next) + rrddim_foreach_write(rd, st) rrddimvar_rename_all(rd); rrdset_unlock(st); @@ -167,10 +184,10 @@ void rrdset_reset(RRDSET *st) { st->counter_done = 0; RRDDIM *rd; - for(rd = st->dimensions; rd ; rd = rd->next) { + rrddim_foreach_read(rd, st) { rd->last_collected_time.tv_sec = 0; rd->last_collected_time.tv_usec = 0; - rd->counter = 0; + rd->collections_counter = 0; memset(rd->values, 0, rd->entries * sizeof(storage_number)); } } @@ -178,16 +195,16 @@ void rrdset_reset(RRDSET *st) { // ---------------------------------------------------------------------------- // RRDSET - helpers for rrdset_create() -inline long align_entries_to_pagesize(long entries) { - if(central_netdata_to_push_data) - return entries; +inline long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries) { + if(unlikely(entries < 5)) entries = 5; + if(unlikely(entries > RRD_HISTORY_ENTRIES_MAX)) entries = RRD_HISTORY_ENTRIES_MAX; - if(entries < 5) entries = 5; - if(entries > RRD_HISTORY_ENTRIES_MAX) entries = RRD_HISTORY_ENTRIES_MAX; + if(unlikely(mode == RRD_MEMORY_MODE_NONE || mode == RRD_MEMORY_MODE_RAM)) + return entries; long page = (size_t)sysconf(_SC_PAGESIZE); long size = sizeof(RRDDIM) + entries * sizeof(storage_number); - if(size % page) { + if(unlikely(size % page)) { size -= (size % page); size += page; @@ -198,11 +215,16 @@ inline long align_entries_to_pagesize(long entries) { return entries; } -static inline void timeval_align(struct timeval *tv, int update_every) { +static inline void last_collected_time_align(struct timeval *tv, int update_every) { tv->tv_sec -= tv->tv_sec % update_every; tv->tv_usec = 500000; } +static inline void last_updated_time_align(struct timeval *tv, int update_every) { + tv->tv_sec -= tv->tv_sec % update_every; + tv->tv_usec = 0; +} + // ---------------------------------------------------------------------------- // RRDSET - free a chart @@ -212,14 +234,7 @@ void rrdset_free(RRDSET *st) { rrdhost_check_wrlock(st->rrdhost); // make sure we have a write lock on the host rrdset_wrlock(st); // lock this RRDSET - // ------------------------------------------------------------------------ - // free its children structures - - while(st->variables) rrdsetvar_free(st->variables); - while(st->alarms) rrdsetcalc_unlink(st->alarms); - while(st->dimensions) rrddim_free(st, st->dimensions); - - rrdfamily_free(st->rrdhost, st->rrdfamily); + // info("Removing chart '%s' ('%s')", st->id, st->name); // ------------------------------------------------------------------------ // remove it from the indexes @@ -229,6 +244,15 @@ void rrdset_free(RRDSET *st) { rrdset_index_del_name(st->rrdhost, st); + // ------------------------------------------------------------------------ + // free its children structures + + while(st->variables) rrdsetvar_free(st->variables); + while(st->alarms) rrdsetcalc_unlink(st->alarms); + while(st->dimensions) rrddim_free(st, st->dimensions); + + rrdfamily_free(st->rrdhost, st->rrdfamily); + // ------------------------------------------------------------------------ // unlink it from the host @@ -261,13 +285,42 @@ void rrdset_free(RRDSET *st) { freez(st); } +void rrdset_save(RRDSET *st) { + RRDDIM *rd; + + rrdset_check_rdlock(st); + + // info("Saving chart '%s' ('%s')", st->id, st->name); + + if(st->rrd_memory_mode == RRD_MEMORY_MODE_SAVE) { + debug(D_RRD_STATS, "Saving stats '%s' to '%s'.", st->name, st->cache_filename); + savememory(st->cache_filename, st, st->memsize); + } + + rrddim_foreach_read(rd, st) { + if(likely(rd->rrd_memory_mode == RRD_MEMORY_MODE_SAVE)) { + debug(D_RRD_STATS, "Saving dimension '%s' to '%s'.", rd->name, rd->cache_filename); + savememory(rd->cache_filename, rd, rd->memsize); + } + } +} + // ---------------------------------------------------------------------------- // RRDSET - create a chart -RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const char *name, const char *family - , const char *context, const char *title, const char *units, long priority - , int update_every, RRDSET_TYPE chart_type) { - +RRDSET *rrdset_create( + RRDHOST *host + , const char *type + , const char *id + , const char *name + , const char *family + , const char *context + , const char *title + , const char *units + , long priority + , int update_every + , RRDSET_TYPE chart_type +) { if(!type || !type[0]) { fatal("Cannot create rrd stats without a type."); return NULL; @@ -286,6 +339,7 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha RRDSET *st = rrdset_find(host, fullid); if(st) { + rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE); debug(D_RRD_CALLS, "RRDSET '%s', already exists.", fullid); return st; } @@ -305,22 +359,27 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha // get the options from the config, we need to create it long rentries = config_get_number(config_section, "history", host->rrd_history_entries); - long entries = align_entries_to_pagesize(rentries); + long entries = align_entries_to_pagesize(host->rrd_memory_mode, rentries); if(entries != rentries) entries = config_set_number(config_section, "history", entries); + if(host->rrd_memory_mode == RRD_MEMORY_MODE_NONE && entries != rentries) + entries = config_set_number(config_section, "history", 10); + int enabled = config_get_boolean(config_section, "enabled", 1); if(!enabled) entries = 5; unsigned long size = sizeof(RRDSET); char *cache_dir = rrdset_cache_dir(host, fullid, config_section); + time_t now = now_realtime_sec(); + // ------------------------------------------------------------------------ // load it or allocate it debug(D_RRD_CALLS, "Creating RRD_STATS for '%s.%s'.", type, id); snprintfz(fullfilename, FILENAME_MAX, "%s/main.db", cache_dir); - if(host->rrd_memory_mode != RRD_MEMORY_MODE_RAM) { + if(host->rrd_memory_mode == RRD_MEMORY_MODE_SAVE || host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) { st = (RRDSET *) mymmap(fullfilename, size, ((host->rrd_memory_mode == RRD_MEMORY_MODE_MAP) ? MAP_SHARED : MAP_PRIVATE), 0); if(st) { memset(&st->avl, 0, sizeof(avl)); @@ -363,15 +422,21 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha error("File %s does not have the desired update frequency. Clearing it.", fullfilename); memset(st, 0, size); } - else if((now_realtime_sec() - st->last_updated.tv_sec) > update_every * entries) { + else if((now - st->last_updated.tv_sec) > update_every * entries) { errno = 0; error("File %s is too old. Clearing it.", fullfilename); memset(st, 0, size); } + else if(st->last_updated.tv_sec > now + update_every) { + errno = 0; + error("File %s refers to the future. Clearing it.", fullfilename); + memset(st, 0, size); + } // make sure the database is aligned if(st->last_updated.tv_sec) - timeval_align(&st->last_updated, update_every); + last_updated_time_align(&st->last_updated, update_every); + // make sure we have the right memory mode // even if we cleared the memory @@ -381,7 +446,7 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha if(unlikely(!st)) { st = callocz(1, size); - st->rrd_memory_mode = RRD_MEMORY_MODE_RAM; + st->rrd_memory_mode = (host->rrd_memory_mode == RRD_MEMORY_MODE_NONE) ? RRD_MEMORY_MODE_NONE : RRD_MEMORY_MODE_RAM; } st->config_section = strdup(config_section); @@ -416,6 +481,7 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); rrdset_flag_clear(st, RRDSET_FLAG_DEBUG); + rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE); // if(!strcmp(st->id, "disk_util.dm-0")) { // st->debug = 1; @@ -469,6 +535,8 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha rrdsetcalc_link_matching(st); rrdcalctemplate_link_matching(st); + rrdhost_cleanup(host); + rrdhost_unlock(host); return(st); @@ -478,16 +546,23 @@ RRDSET *rrdset_create(RRDHOST *host, const char *type, const char *id, const cha // ---------------------------------------------------------------------------- // RRDSET - data collection iteration control -void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) { - if(unlikely(!st->last_collected_time.tv_sec || !microseconds)) { +inline void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) { + + if(unlikely(!st->last_collected_time.tv_sec)) { // the first entry microseconds = st->update_every * USEC_PER_SEC; } + else if(unlikely(!microseconds)) { + // no dt given by the plugin + struct timeval now; + now_realtime_timeval(&now); + microseconds = dt_usec(&now, &st->last_collected_time); + } + st->usec_since_last_update = microseconds; } -void rrdset_next_usec(RRDSET *st, usec_t microseconds) -{ +inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) { struct timeval now; now_realtime_timeval(&now); @@ -505,26 +580,42 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds) // usec_t now_usec = timeval_usec(&now); // usec_t last_usec = timeval_usec(&st->last_collected_time); //#endif - usec_t since_last_usec = dt_usec(&now, &st->last_collected_time); + susec_t since_last_usec = dt_usec_signed(&now, &st->last_collected_time); + + if(unlikely(since_last_usec < 0)) { + // oops! the database is in the future + error("Database for chart '%s' on host '%s' is %lld microseconds in the future. Adjusting it to current time.", st->id, st->rrdhost->hostname, -since_last_usec); + + st->last_collected_time.tv_sec = now.tv_sec - st->update_every; + st->last_collected_time.tv_usec = now.tv_usec; + last_collected_time_align(&st->last_collected_time, st->update_every); + + st->last_updated.tv_sec = now.tv_sec - st->update_every; + st->last_updated.tv_usec = now.tv_usec; + last_updated_time_align(&st->last_updated, st->update_every); + + microseconds = st->update_every * USEC_PER_SEC; + since_last_usec = st->update_every * USEC_PER_SEC; + } // verify the microseconds given is good - if(unlikely(microseconds > since_last_usec)) { - debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id); + if(unlikely(microseconds > (usec_t)since_last_usec)) { + debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id); //#ifdef NETDATA_INTERNAL_CHECKS // if(unlikely(last_usec + microseconds > now_usec + 1000)) -// error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id); +// error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id); //#endif - microseconds = since_last_usec; + microseconds = (usec_t)since_last_usec; } - else if(unlikely(microseconds < since_last_usec * 0.8)) { - debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id); + else if(unlikely(microseconds < (usec_t)since_last_usec * 0.8)) { + debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id); //#ifdef NETDATA_INTERNAL_CHECKS -// error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id); +// error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id); //#endif - microseconds = since_last_usec; + microseconds = (usec_t)since_last_usec; } } debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds); @@ -539,10 +630,52 @@ void rrdset_next_usec(RRDSET *st, usec_t microseconds) // ---------------------------------------------------------------------------- // RRDSET - process the collected values for all dimensions of a chart +static inline void rrdset_init_last_collected_time(RRDSET *st) { + now_realtime_timeval(&st->last_collected_time); + last_collected_time_align(&st->last_collected_time, st->update_every); +} + +static inline usec_t rrdset_update_last_collected_time(RRDSET *st) { + usec_t last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec; + usec_t ut = last_collect_ut + st->usec_since_last_update; + st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC); + st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC); + return last_collect_ut; +} + +static inline void rrdset_init_last_updated_time(RRDSET *st) { + // copy the last collected time to last updated time + st->last_updated.tv_sec = st->last_collected_time.tv_sec; + st->last_updated.tv_usec = st->last_collected_time.tv_usec; + last_updated_time_align(&st->last_updated, st->update_every); +} + +static inline void rrdset_done_push_exclusive(RRDSET *st) { + if(unlikely(!st->last_collected_time.tv_sec)) { + // it is the first entry + // set the last_collected_time to now + rrdset_init_last_collected_time(st); + } + else { + // it is not the first entry + // calculate the proper last_collected_time, using usec_since_last_update + rrdset_update_last_collected_time(st); + } + + st->counter_done++; + + rrdset_rdlock(st); + rrdset_done_push(st); + rrdset_unlock(st); +} + void rrdset_done(RRDSET *st) { if(unlikely(netdata_exit)) return; - if(unlikely(central_netdata_to_push_data)) { - rrdset_done_push(st); + + if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE)) { + if(unlikely(st->rrdhost->rrdpush_enabled)) + rrdset_done_push_exclusive(st); + return; } @@ -579,6 +712,11 @@ void rrdset_done(RRDSET *st) { st->enabled = 1; */ + if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) { + error("Chart '%s' has the OBSOLETE flag set, but it is collected.", st->id); + rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE); + } + // check if the chart has a long time to be updated if(unlikely(st->usec_since_last_update > st->entries * update_every_ut)) { info("%s: took too long to be updated (%0.3Lf secs). Resetting it.", st->name, (long double)(st->usec_since_last_update / 1000000.0)); @@ -594,8 +732,7 @@ void rrdset_done(RRDSET *st) { if(unlikely(!st->last_collected_time.tv_sec)) { // it is the first entry // set the last_collected_time to now - now_realtime_timeval(&st->last_collected_time); - timeval_align(&st->last_collected_time, st->update_every); + rrdset_init_last_collected_time(st); last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - update_every_ut; @@ -609,10 +746,7 @@ void rrdset_done(RRDSET *st) { else { // it is not the first entry // calculate the proper last_collected_time, using usec_since_last_update - last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec; - usec_t ut = last_collect_ut + st->usec_since_last_update; - st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC); - st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC); + last_collect_ut = rrdset_update_last_collected_time(st); } // if this set has not been updated in the past @@ -620,32 +754,24 @@ void rrdset_done(RRDSET *st) { if(unlikely(!st->last_updated.tv_sec)) { // it has never been updated before // set a fake last_updated, in the past using usec_since_last_update - usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update; - st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC); - st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC); + rrdset_init_last_updated_time(st); // the first entry should not be stored store_this_entry = 0; first_entry = 1; if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0); + debug(D_RRD_STATS, "%s: initializing last_updated to last_collected_time - %llu microseconds. Will not store the next entry.", st->name, st->usec_since_last_update); } // check if we will re-write the entire data set if(unlikely(dt_usec(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut)) { info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec); rrdset_reset(st); + rrdset_init_last_updated_time(st); st->usec_since_last_update = update_every_ut; - now_realtime_timeval(&st->last_collected_time); - timeval_align(&st->last_collected_time, st->update_every); - - usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update; - st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC); - st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC); - // the first entry should not be stored store_this_entry = 0; first_entry = 1; @@ -673,21 +799,26 @@ void rrdset_done(RRDSET *st) { } st->counter_done++; + if(unlikely(st->rrdhost->rrdpush_enabled)) + rrdset_done_push(st); + // calculate totals and count the dimensions - int dimensions; + int dimensions = 0; st->collected_total = 0; - for( rd = st->dimensions, dimensions = 0 ; rd ; rd = rd->next, dimensions++ ) - if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) + rrddim_foreach_read(rd, st) { + dimensions++; + if(likely(rd->updated)) st->collected_total += rd->collected_value; + } uint32_t storage_flags = SN_EXISTS; // process all dimensions to calculate their values // based on the collected figures only // at this stage we do not interpolate anything - for( rd = st->dimensions ; rd ; rd = rd->next ) { + rrddim_foreach_read(rd, st) { - if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) { + if(unlikely(!rd->updated)) { rd->calculated_value = 0; continue; } @@ -749,7 +880,7 @@ void rrdset_done(RRDSET *st) { break; case RRD_ALGORITHM_INCREMENTAL: - if(unlikely(rd->counter <= 1)) { + if(unlikely(rd->collections_counter <= 1)) { rd->calculated_value = 0; continue; } @@ -789,7 +920,7 @@ void rrdset_done(RRDSET *st) { break; case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL: - if(unlikely(rd->counter <= 1)) { + if(unlikely(rd->collections_counter <= 1)) { rd->calculated_value = 0; continue; } @@ -890,7 +1021,7 @@ void rrdset_done(RRDSET *st) { st->last_updated.tv_sec = (time_t) (next_store_ut / USEC_PER_SEC); st->last_updated.tv_usec = 0; - for( rd = st->dimensions ; likely(rd) ; rd = rd->next ) { + rrddim_foreach_read(rd, st) { calculated_number new_value; switch(rd->algorithm) { @@ -973,7 +1104,7 @@ void rrdset_done(RRDSET *st) { continue; } - if(likely(rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED) && rd->counter > 1 && iterations < st->gap_when_lost_iterations_above)) { + if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) { rd->values[st->current_entry] = pack_storage_number(new_value, storage_flags ); rd->last_stored_value = new_value; @@ -1035,8 +1166,8 @@ void rrdset_done(RRDSET *st) { st->last_collected_total = st->collected_total; - for( rd = st->dimensions; rd ; rd = rd->next ) { - if(unlikely(!rrddim_flag_check(rd, RRDDIM_FLAG_UPDATED))) + rrddim_foreach_read(rd, st) { + if(unlikely(!rd->updated)) continue; if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) @@ -1068,7 +1199,7 @@ void rrdset_done(RRDSET *st) { rd->calculated_value = 0; rd->collected_value = 0; - rrddim_flag_clear(rd, RRDDIM_FLAG_UPDATED); + rd->updated = 0; if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) debug(D_RRD_STATS, "%s/%s: END " @@ -1141,4 +1272,3 @@ void rrdset_done(RRDSET *st) { if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0)) error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate); } -