// RRDSET - helpers for rrdset_create()
inline long align_entries_to_pagesize(long entries) {
- if(central_netdata_to_push_data)
+ if(rrdpush_exclusive)
return entries;
if(entries < 5) entries = 5;
// ----------------------------------------------------------------------------
// RRDSET - data collection iteration control
-void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) {
- if(unlikely(!st->last_collected_time.tv_sec || !microseconds)) {
+inline void rrdset_next_usec_unfiltered(RRDSET *st, usec_t microseconds) {
+
+ if(unlikely(!st->last_collected_time.tv_sec)) {
// the first entry
microseconds = st->update_every * USEC_PER_SEC;
}
+ else if(unlikely(!microseconds)) {
+ // no dt given by the plugin
+ struct timeval now;
+ now_realtime_timeval(&now);
+ microseconds = dt_usec(&now, &st->last_collected_time);
+ }
+
st->usec_since_last_update = microseconds;
}
-void rrdset_next_usec(RRDSET *st, usec_t microseconds)
-{
+inline void rrdset_next_usec(RRDSET *st, usec_t microseconds) {
struct timeval now;
now_realtime_timeval(&now);
// usec_t now_usec = timeval_usec(&now);
// usec_t last_usec = timeval_usec(&st->last_collected_time);
//#endif
- usec_t since_last_usec = dt_usec(&now, &st->last_collected_time);
+ susec_t since_last_usec = dt_usec_signed(&now, &st->last_collected_time);
+
+ if(unlikely(since_last_usec < 0)) {
+ // oops! the database is in the future
+ error("Database for chart '%s' on host '%s' is %lld microseconds in the future.", st->id, st->rrdhost->hostname, -since_last_usec);
+ memcpy(&st->last_collected_time, &now, sizeof(struct timeval));
+ st->last_collected_time.tv_sec -= st->update_every;
+
+ memcpy(&st->last_updated, &now, sizeof(struct timeval));
+ timeval_align(&st->last_updated, st->update_every);
+ st->last_updated.tv_sec -= st->update_every;
+
+ microseconds = st->update_every * USEC_PER_SEC;
+ since_last_usec = st->update_every * USEC_PER_SEC;
+ }
// verify the microseconds given is good
- if(unlikely(microseconds > since_last_usec)) {
- debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id);
+ if(unlikely(microseconds > (usec_t)since_last_usec)) {
+ debug(D_RRD_CALLS, "dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id);
//#ifdef NETDATA_INTERNAL_CHECKS
// if(unlikely(last_usec + microseconds > now_usec + 1000))
-// error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - since_last_usec, st->name, st->id);
+// error("dt %llu usec given is too big - it leads %llu usec to the future, for chart '%s' (%s).", microseconds, microseconds - (usec_t)since_last_usec, st->name, st->id);
//#endif
- microseconds = since_last_usec;
+ microseconds = (usec_t)since_last_usec;
}
- else if(unlikely(microseconds < since_last_usec * 0.8)) {
- debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id);
+ else if(unlikely(microseconds < (usec_t)since_last_usec * 0.8)) {
+ debug(D_RRD_CALLS, "dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id);
//#ifdef NETDATA_INTERNAL_CHECKS
-// error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, since_last_usec, st->name, st->id);
+// error("dt %llu usec given is too small - expected %llu usec up to -20%%, for chart '%s' (%s).", microseconds, (usec_t)since_last_usec, st->name, st->id);
//#endif
- microseconds = since_last_usec;
+ microseconds = (usec_t)since_last_usec;
}
}
debug(D_RRD_CALLS, "rrdset_next_usec() for chart %s with microseconds %llu", st->name, microseconds);
// ----------------------------------------------------------------------------
// RRDSET - process the collected values for all dimensions of a chart
+static inline void rrdset_init_last_collected_time(RRDSET *st) {
+ now_realtime_timeval(&st->last_collected_time);
+ timeval_align(&st->last_collected_time, st->update_every);
+}
+
+static inline usec_t rrdset_update_last_collected_time(RRDSET *st) {
+ usec_t last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
+ usec_t ut = last_collect_ut + st->usec_since_last_update;
+ st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC);
+ st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+ return last_collect_ut;
+}
+
+static inline void rrdset_init_last_updated_time(RRDSET *st) {
+ // copy the last collected time to last updated time
+ memcpy(&st->last_updated, &st->last_collected_time, sizeof(struct timeval));
+ timeval_align(&st->last_updated, st->update_every);
+ st->last_updated.tv_usec = 0;
+}
+
+static inline void rrdset_done_push_int(RRDSET *st) {
+ if(unlikely(!st->last_collected_time.tv_sec)) {
+ // it is the first entry
+ // set the last_collected_time to now
+ rrdset_init_last_collected_time(st);
+ }
+ else {
+ // it is not the first entry
+ // calculate the proper last_collected_time, using usec_since_last_update
+ rrdset_update_last_collected_time(st);
+ }
+
+ st->counter++;
+ st->counter_done++;
+
+ rrdset_done_push(st);
+}
+
void rrdset_done(RRDSET *st) {
if(unlikely(netdata_exit)) return;
- if(unlikely(central_netdata_to_push_data)) {
- rrdset_done_push(st);
+
+ if(unlikely(rrdpush_exclusive)) {
+ rrdset_done_push_int(st);
return;
}
if(unlikely(!st->last_collected_time.tv_sec)) {
// it is the first entry
// set the last_collected_time to now
- now_realtime_timeval(&st->last_collected_time);
- timeval_align(&st->last_collected_time, st->update_every);
+ rrdset_init_last_collected_time(st);
last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - update_every_ut;
else {
// it is not the first entry
// calculate the proper last_collected_time, using usec_since_last_update
- last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec;
- usec_t ut = last_collect_ut + st->usec_since_last_update;
- st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC);
- st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+ last_collect_ut = rrdset_update_last_collected_time(st);
}
// if this set has not been updated in the past
if(unlikely(!st->last_updated.tv_sec)) {
// it has never been updated before
// set a fake last_updated, in the past using usec_since_last_update
- usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update;
- st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC);
- st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
+ rrdset_init_last_updated_time(st);
// the first entry should not be stored
store_this_entry = 0;
first_entry = 1;
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(D_RRD_STATS, "%s: initializing last_updated to now - %llu microseconds (%0.3Lf). Will not store the next entry.", st->name, st->usec_since_last_update, (long double)ut/1000000.0);
+ debug(D_RRD_STATS, "%s: initializing last_updated to last_collected_time - %llu microseconds. Will not store the next entry.", st->name, st->usec_since_last_update);
}
// check if we will re-write the entire data set
if(unlikely(dt_usec(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut)) {
info("%s: too old data (last updated at %ld.%ld, last collected at %ld.%ld). Resetting it. Will not store the next entry.", st->name, st->last_updated.tv_sec, st->last_updated.tv_usec, st->last_collected_time.tv_sec, st->last_collected_time.tv_usec);
rrdset_reset(st);
+ rrdset_init_last_updated_time(st);
st->usec_since_last_update = update_every_ut;
- now_realtime_timeval(&st->last_collected_time);
- timeval_align(&st->last_collected_time, st->update_every);
-
- usec_t ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec - st->usec_since_last_update;
- st->last_updated.tv_sec = (time_t) (ut / USEC_PER_SEC);
- st->last_updated.tv_usec = (suseconds_t) (ut % USEC_PER_SEC);
-
// the first entry should not be stored
store_this_entry = 0;
first_entry = 1;
if(unlikely(pthread_setcancelstate(pthreadoldcancelstate, NULL) != 0))
error("Cannot set pthread cancel state to RESTORE (%d).", pthreadoldcancelstate);
+
+ if(unlikely(rrdpush_enabled))
+ rrdset_done_push_int(st);
}