]> arthur.barton.de Git - netdata.git/blob - python.d/mongodb.chart.py
mongodb_plugin: code optimization
[netdata.git] / python.d / mongodb.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: mongodb netdata python.d module
3 # Author: l2isbad
4
5 from base import SimpleService
6 from copy import deepcopy
7 from datetime import datetime
8 from sys import exc_info
9
10 try:
11     from pymongo import MongoClient, ASCENDING, DESCENDING
12     from pymongo.errors import PyMongoError
13     PYMONGO = True
14 except ImportError:
15     PYMONGO = False
16
17 # default module values (can be overridden per job in `config`)
18 # update_every = 2
19 priority = 60000
20 retries = 60
21
22 REPLSET_STATES = [
23     ('1', 'primary'),
24     ('8', 'down'),
25     ('2', 'secondary'),
26     ('3', 'recovering'),
27     ('5', 'startup2'),
28     ('4', 'fatal'),
29     ('7', 'arbiter'),
30     ('6', 'unknown'),
31     ('9', 'rollback'),
32     ('10', 'removed'),
33     ('0', 'startup')]
34
35
36 def multiply_by_100(value):
37     return value * 100
38
39 DEFAULT_METRICS = [
40     ('opcounters.delete', None, None),
41     ('opcounters.update', None, None),
42     ('opcounters.insert', None, None),
43     ('opcounters.query', None, None),
44     ('opcounters.getmore', None, None),
45     ('globalLock.activeClients.readers', 'activeClients_readers', None),
46     ('globalLock.activeClients.writers', 'activeClients_writers', None),
47     ('connections.available', 'connections_available', None),
48     ('connections.current', 'connections_current', None),
49     ('mem.mapped', None, None),
50     ('mem.resident', None, None),
51     ('mem.virtual', None, None),
52     ('globalLock.currentQueue.readers', 'currentQueue_readers', None),
53     ('globalLock.currentQueue.writers', 'currentQueue_writers', None),
54     ('asserts.msg', None, None),
55     ('asserts.regular', None, None),
56     ('asserts.user', None, None),
57     ('asserts.warning', None, None),
58     ('extra_info.page_faults', None, None),
59     ('metrics.record.moves', None, None),
60     ('backgroundFlushing.average_ms', None, multiply_by_100),
61     ('backgroundFlushing.last_ms', None, multiply_by_100),
62     ('backgroundFlushing.flushes', None, multiply_by_100),
63     ('metrics.cursor.timedOut', None, None),
64     ('metrics.cursor.open.total', 'cursor_total', None),
65     ('metrics.cursor.open.noTimeout', None, None),
66     ('cursors.timedOut', None, None),
67     ('cursors.totalOpen', 'cursor_total', None)
68 ]
69
70 DUR = [
71     ('dur.commits', None, None),
72     ('dur.journaledMB', None, multiply_by_100)
73 ]
74
75 WIREDTIGER = [
76     ('wiredTiger.concurrentTransactions.read.available', 'wiredTigerRead_available', None),
77     ('wiredTiger.concurrentTransactions.read.out', 'wiredTigerRead_out', None),
78     ('wiredTiger.concurrentTransactions.write.available', 'wiredTigerWrite_available', None),
79     ('wiredTiger.concurrentTransactions.write.out', 'wiredTigerWrite_out', None),
80     ('wiredTiger.cache.bytes currently in the cache', None, None),
81     ('wiredTiger.cache.tracked dirty bytes in the cache', None, None),
82     ('wiredTiger.cache.maximum bytes configured', None, None),
83     ('wiredTiger.cache.unmodified pages evicted', 'unmodified', None),
84     ('wiredTiger.cache.modified pages evicted', 'modified', None)
85 ]
86
87 TCMALLOC = [
88     ('tcmalloc.generic.current_allocated_bytes', None, None),
89     ('tcmalloc.generic.heap_size', None, None),
90     ('tcmalloc.tcmalloc.central_cache_free_bytes', None, None),
91     ('tcmalloc.tcmalloc.current_total_thread_cache_bytes', None, None),
92     ('tcmalloc.tcmalloc.pageheap_free_bytes', None, None),
93     ('tcmalloc.tcmalloc.pageheap_unmapped_bytes', None, None),
94     ('tcmalloc.tcmalloc.thread_cache_free_bytes', None, None),
95     ('tcmalloc.tcmalloc.transfer_cache_free_bytes', None, None)
96 ]
97
98 COMMANDS = [
99     ('metrics.commands.count.total', 'count_total', None),
100     ('metrics.commands.createIndexes.total', 'createIndexes_total', None),
101     ('metrics.commands.delete.total', 'delete_total', None),
102     ('metrics.commands.eval.total', 'eval_total', None),
103     ('metrics.commands.findAndModify.total', 'findAndModify_total', None),
104     ('metrics.commands.insert.total', 'insert_total', None),
105     ('metrics.commands.delete.total', 'delete_total', None),
106     ('metrics.commands.count.failed', 'count_failed', None),
107     ('metrics.commands.createIndexes.failed', 'createIndexes_failed', None),
108     ('metrics.commands.delete.failed', 'delete_failed', None),
109     ('metrics.commands.eval.failed', 'eval_failed', None),
110     ('metrics.commands.findAndModify.failed', 'findAndModify_failed', None),
111     ('metrics.commands.insert.failed', 'insert_failed', None),
112     ('metrics.commands.delete.failed', 'delete_failed', None)
113 ]
114
115 DBSTATS = [
116         'dataSize',
117         'indexSize',
118         'storageSize',
119         'objects'
120 ]
121
122 # charts order (can be overridden if you want less charts, or different order)
123 ORDER = ['read_operations', 'write_operations', 'active_clients', 'journaling_transactions',
124          'journaling_volume', 'background_flush_average', 'background_flush_last', 'background_flush_rate',
125          'wiredtiger_read', 'wiredtiger_write', 'cursors', 'connections', 'memory', 'page_faults',
126          'queued_requests', 'record_moves', 'wiredtiger_cache', 'wiredtiger_pages_evicted', 'asserts',
127          'dbstats_objects', 'tcmalloc_generic', 'tcmalloc_metrics', 'command_total_rate', 'command_failed_rate']
128
129 CHARTS = {
130     'read_operations': {
131         'options': [None, 'Received read requests', 'requests/s', 'throughput metrics',
132                     'mongodb.read_operations', 'line'],
133         'lines': [
134             ['query', None, 'incremental'],
135             ['getmore', None, 'incremental']
136         ]},
137     'write_operations': {
138         'options': [None, 'Received write requests', 'requests/s', 'throughput metrics',
139                     'mongodb.write_operations', 'line'],
140         'lines': [
141             ['insert', None, 'incremental'],
142             ['update', None, 'incremental'],
143             ['delete', None, 'incremental']
144         ]},
145     'active_clients': {
146         'options': [None, 'Clients with read or write operations in progress or queued', 'clients',
147                     'throughput metrics', 'mongodb.active_clients', 'line'],
148         'lines': [
149             ['activeClients_readers', 'readers', 'absolute'],
150             ['activeClients_writers', 'writers', 'absolute']
151         ]},
152     'journaling_transactions': {
153         'options': [None, 'Transactions that have been written to the journal', 'commits',
154                     'database performance', 'mongodb.journaling_transactions', 'line'],
155         'lines': [
156             ['commits', None, 'absolute']
157         ]},
158     'journaling_volume': {
159         'options': [None, 'Volume of data written to the journal', 'MB', 'database performance',
160                     'mongodb.journaling_volume', 'line'],
161         'lines': [
162             ['journaledMB', 'volume', 'absolute', 1, 100]
163         ]},
164     'background_flush_average': {
165         'options': [None, 'Average time taken by flushes to execute', 'ms', 'database performance',
166                     'mongodb.background_flush_average', 'line'],
167         'lines': [
168             ['average_ms', 'time', 'absolute', 1, 100]
169         ]},
170     'background_flush_last': {
171         'options': [None, 'Time taken by the last flush operation to execute', 'ms', 'database performance',
172                     'mongodb.background_flush_last', 'line'],
173         'lines': [
174             ['last_ms', 'time', 'absolute', 1, 100]
175         ]},
176     'background_flush_rate': {
177         'options': [None, 'Flushes rate', 'flushes', 'database performance', 'mongodb.background_flush_rate', 'line'],
178         'lines': [
179             ['flushes', 'flushes', 'incremental', 1, 1]
180         ]},
181     'wiredtiger_read': {
182         'options': [None, 'Read tickets in use and remaining', 'tickets', 'database performance',
183                     'mongodb.wiredtiger_read', 'stacked'],
184         'lines': [
185             ['wiredTigerRead_available', 'available', 'absolute', 1, 1],
186             ['wiredTigerRead_out', 'inuse', 'absolute', 1, 1]
187         ]},
188     'wiredtiger_write': {
189         'options': [None, 'Write tickets in use and remaining', 'tickets', 'database performance',
190                     'mongodb.wiredtiger_write', 'stacked'],
191         'lines': [
192             ['wiredTigerWrite_available', 'available', 'absolute', 1, 1],
193             ['wiredTigerWrite_out', 'inuse', 'absolute', 1, 1]
194         ]},
195     'cursors': {
196         'options': [None, 'Currently openned cursors, cursors with timeout disabled and timed out cursors',
197                     'cursors', 'database performance', 'mongodb.cursors', 'stacked'],
198         'lines': [
199             ['cursor_total', 'openned', 'absolute', 1, 1],
200             ['noTimeout', None, 'absolute', 1, 1],
201             ['timedOut', None, 'incremental', 1, 1]
202         ]},
203     'connections': {
204         'options': [None, 'Currently connected clients and unused connections', 'connections',
205                     'resource utilization', 'mongodb.connections', 'stacked'],
206         'lines': [
207             ['connections_available', 'unused', 'absolute', 1, 1],
208             ['connections_current', 'connected', 'absolute', 1, 1]
209         ]},
210     'memory': {
211         'options': [None, 'Memory metrics', 'MB', 'resource utilization', 'mongodb.memory', 'stacked'],
212         'lines': [
213             ['virtual', None, 'absolute', 1, 1],
214             ['resident', None, 'absolute', 1, 1],
215             ['nonmapped', None, 'absolute', 1, 1],
216             ['mapped', None, 'absolute', 1, 1]
217         ]},
218     'page_faults': {
219         'options': [None, 'Number of times MongoDB had to fetch data from disk', 'request/s',
220                     'resource utilization', 'mongodb.page_faults', 'line'],
221         'lines': [
222             ['page_faults', None, 'incremental', 1, 1]
223         ]},
224     'queued_requests': {
225         'options': [None, 'Currently queued read and wrire requests', 'requests', 'resource saturation',
226                     'mongodb.queued_requests', 'line'],
227         'lines': [
228             ['currentQueue_readers', 'readers', 'absolute', 1, 1],
229             ['currentQueue_writers', 'writers', 'absolute', 1, 1]
230         ]},
231     'record_moves': {
232         'options': [None, 'Number of times documents had to be moved on-disk', 'number',
233                     'resource saturation', 'mongodb.record_moves', 'line'],
234         'lines': [
235             ['moves', None, 'incremental', 1, 1]
236         ]},
237     'asserts': {
238         'options': [None, 'Number of message, warning, regular, corresponding to errors generated'
239                           ' by users assertions raised', 'number', 'errors (asserts)', 'mongodb.asserts', 'line'],
240         'lines': [
241             ['msg', None, 'incremental', 1, 1],
242             ['warning', None, 'incremental', 1, 1],
243             ['regular', None, 'incremental', 1, 1],
244             ['user', None, 'incremental', 1, 1]
245         ]},
246     'wiredtiger_cache': {
247         'options': [None, 'The percentage of the wiredTiger cache that is in use and cache with dirty bytes',
248                     'percent', 'resource utilization', 'mongodb.wiredtiger_cache', 'stacked'],
249         'lines': [
250             ['wiredTiger_percent_clean', 'inuse', 'absolute', 1, 1000],
251             ['wiredTiger_percent_dirty', 'dirty', 'absolute', 1, 1000]
252         ]},
253     'wiredtiger_pages_evicted': {
254         'options': [None, 'Pages evicted from the cache',
255                     'pages', 'resource utilization', 'mongodb.wiredtiger_pages_evicted', 'stacked'],
256         'lines': [
257             ['unmodified', None, 'absolute', 1, 1],
258             ['modified', None, 'absolute', 1, 1]
259         ]},
260     'dbstats_objects': {
261         'options': [None, 'Number of documents in the database among all the collections', 'documents',
262                     'storage size metrics', 'mongodb.dbstats_objects', 'stacked'],
263         'lines': [
264         ]},
265     'tcmalloc_generic': {
266         'options': [None, 'Tcmalloc generic metrics', 'MB', 'tcmalloc', 'mongodb.tcmalloc_generic', 'stacked'],
267         'lines': [
268             ['current_allocated_bytes', 'allocated', 'absolute', 1, 1048576],
269             ['heap_size', 'heap_size', 'absolute', 1, 1048576]
270         ]},
271     'tcmalloc_metrics': {
272         'options': [None, 'Tcmalloc metrics', 'KB', 'tcmalloc', 'mongodb.tcmalloc_metrics', 'stacked'],
273         'lines': [
274             ['central_cache_free_bytes', 'central_cache_free', 'absolute', 1, 1024],
275             ['current_total_thread_cache_bytes', 'current_total_thread_cache', 'absolute', 1, 1024],
276             ['pageheap_free_bytes', 'pageheap_free', 'absolute', 1, 1024],
277             ['pageheap_unmapped_bytes', 'pageheap_unmapped', 'absolute', 1, 1024],
278             ['thread_cache_free_bytes', 'thread_cache_free', 'absolute', 1, 1024],
279             ['transfer_cache_free_bytes', 'transfer_cache_free', 'absolute', 1, 1024]
280         ]},
281     'command_total_rate': {
282         'options': [None, 'Commands total rate', 'commands/s', 'commands', 'mongodb.command_total_rate', 'stacked'],
283         'lines': [
284             ['count_total', 'count', 'incremental', 1, 1],
285             ['createIndexes_total', 'createIndexes', 'incremental', 1, 1],
286             ['delete_total', 'delete', 'incremental', 1, 1],
287             ['eval_total', 'eval', 'incremental', 1, 1],
288             ['findAndModify_total', 'findAndModify', 'incremental', 1, 1],
289             ['insert_total', 'insert', 'incremental', 1, 1],
290             ['update_total', 'update', 'incremental', 1, 1]
291         ]},
292     'command_failed_rate': {
293         'options': [None, 'Commands failed rate', 'commands/s', 'commands', 'mongodb.command_failed_rate', 'stacked'],
294         'lines': [
295             ['count_failed', 'count', 'incremental', 1, 1],
296             ['createIndexes_failed', 'createIndexes', 'incremental', 1, 1],
297             ['delete_failed', 'delete', 'incremental', 1, 1],
298             ['eval_failed', 'eval', 'incremental', 1, 1],
299             ['findAndModify_failed', 'findAndModify', 'incremental', 1, 1],
300             ['insert_failed', 'insert', 'incremental', 1, 1],
301             ['update_failed', 'update', 'incremental', 1, 1]
302         ]}
303 }
304
305
306 class Service(SimpleService):
307     def __init__(self, configuration=None, name=None):
308         SimpleService.__init__(self, configuration=configuration, name=name)
309         self.order = ORDER[:]
310         self.definitions = deepcopy(CHARTS)
311         self.user = self.configuration.get('user')
312         self.password = self.configuration.get('pass')
313         self.host = self.configuration.get('host', '127.0.0.1')
314         self.port = self.configuration.get('port', 27017)
315         self.timeout = self.configuration.get('timeout', 100)
316         self.metrics_to_collect = deepcopy(DEFAULT_METRICS)
317         self.connection = None
318         self.do_replica = None
319         self.databases = list()
320
321     def check(self):
322         if not PYMONGO:
323             self.error('Pymongo module is needed to use mongodb.chart.py')
324             return False
325         self.connection, server_status, error = self._create_connection()
326         if error:
327             self.error(error)
328             return False
329
330         self.build_metrics_to_collect_(server_status)
331
332         try:
333             self._get_data()
334         except (LookupError, SyntaxError, AttributeError):
335             self.error('Type: %s, error: %s' % (str(exc_info()[0]), str(exc_info()[1])))
336             return False
337         else:
338             self.create_charts_(server_status)
339             return True
340
341     def build_metrics_to_collect_(self, server_status):
342
343         self.do_replica = 'repl' in server_status
344         if 'dur' in server_status:
345             self.metrics_to_collect.extend(DUR)
346         if 'tcmalloc' in server_status:
347             self.metrics_to_collect.extend(TCMALLOC)
348         if 'commands' in server_status['metrics']:
349             self.metrics_to_collect.extend(COMMANDS)
350         if 'wiredTiger' in server_status:
351             self.metrics_to_collect.extend(WIREDTIGER)
352
353     def create_charts_(self, server_status):
354
355         if 'dur' not in server_status:
356             self.order.remove('journaling_transactions')
357             self.order.remove('journaling_volume')
358
359         if 'backgroundFlushing' not in server_status:
360             self.order.remove('background_flush_average')
361             self.order.remove('background_flush_last')
362             self.order.remove('background_flush_rate')
363
364         if 'wiredTiger' not in server_status:
365             self.order.remove('wiredtiger_write')
366             self.order.remove('wiredtiger_read')
367             self.order.remove('wiredtiger_cache')
368
369         if 'tcmalloc' not in server_status:
370             self.order.remove('tcmalloc_generic')
371             self.order.remove('tcmalloc_metrics')
372
373         if 'commands' not in server_status['metrics']:
374             self.order.remove('command_total_rate')
375             self.order.remove('command_failed_rate')
376
377         for dbase in self.databases:
378             self.order.append('_'.join([dbase, 'dbstats']))
379             self.definitions['_'.join([dbase, 'dbstats'])] = {
380                 'options': [None, '%s: size of all documents, indexes, extents' % dbase, 'KB',
381                             'storage size metrics', 'mongodb.dbstats', 'line'],
382                 'lines': [
383                     ['_'.join([dbase, 'dataSize']), 'documents', 'absolute', 1, 1024],
384                     ['_'.join([dbase, 'indexSize']), 'indexes', 'absolute', 1, 1024],
385                     ['_'.join([dbase, 'storageSize']), 'extents', 'absolute', 1, 1024]
386                 ]}
387             self.definitions['dbstats_objects']['lines'].append(['_'.join([dbase, 'objects']), dbase, 'absolute'])
388
389         if self.do_replica:
390             def create_lines(hosts, string):
391                 lines = list()
392                 for host in hosts:
393                     dim_id = '_'.join([host, string])
394                     lines.append([dim_id, host, 'absolute', 1, 1000])
395                 return lines
396
397             def create_state_lines(states):
398                 lines = list()
399                 for state, description in states:
400                     dim_id = '_'.join([host, 'state', state])
401                     lines.append([dim_id, description, 'absolute', 1, 1])
402                 return lines
403
404             all_hosts = server_status['repl']['hosts']
405             this_host = server_status['repl']['me']
406             other_hosts = [host for host in all_hosts if host != this_host]
407
408             if 'local' in self.databases:
409                 self.order.append('oplog_window')
410                 self.definitions['oplog_window'] = {
411                     'options': [None, 'Interval of time between the oldest and the latest entries in the oplog',
412                                 'seconds', 'replication', 'mongodb.oplog_window', 'line'],
413                     'lines': [['timeDiff', 'window', 'absolute', 1, 1000]]}
414             # Create "heartbeat delay" chart
415             self.order.append('heartbeat_delay')
416             self.definitions['heartbeat_delay'] = {
417                 'options': [None, 'Latency between this node and replica set members (lastHeartbeatRecv)',
418                             'seconds', 'replication', 'mongodb.replication_heartbeat_delay', 'stacked'],
419                 'lines': create_lines(other_hosts, 'heartbeat_lag')}
420             # Create "optimedate delay" chart
421             self.order.append('optimedate_delay')
422             self.definitions['optimedate_delay'] = {
423                 'options': [None, '"optimeDate"(time when last entry from the oplog was applied)'
424                                   ' diff between all nodes',
425                             'seconds', 'replication', 'mongodb.replication_optimedate_delay', 'stacked'],
426                 'lines': create_lines(all_hosts, 'optimedate')}
427             # Create "replica set members state" chart
428             for host in all_hosts:
429                 chart_name = '_'.join([host, 'state'])
430                 self.order.append(chart_name)
431                 self.definitions[chart_name] = {
432                     'options': [None, '%s state' % host, 'state',
433                                 'replication', 'mongodb.replication_state', 'line'],
434                     'lines': create_state_lines(REPLSET_STATES)}
435
436     def _get_raw_data(self):
437         raw_data = dict()
438
439         raw_data.update(self.get_serverstatus_() or dict())
440         raw_data.update(self.get_dbstats_() or dict())
441         raw_data.update(self.get_replsetgetstatus_() or dict())
442         raw_data.update(self.get_getreplicationinfo_() or dict())
443
444         return raw_data or None
445
446     def get_serverstatus_(self):
447         raw_data = dict()
448         try:
449             raw_data['serverStatus'] = self.connection.admin.command('serverStatus')
450         except PyMongoError:
451             return None
452         else:
453             return raw_data
454
455     def get_dbstats_(self):
456         if not self.databases:
457             return None
458
459         raw_data = dict()
460         raw_data['dbStats'] = dict()
461         try:
462             for dbase in self.databases:
463                 raw_data['dbStats'][dbase] = self.connection[dbase].command('dbStats')
464         except PyMongoError:
465             return None
466         else:
467             return raw_data
468
469     def get_replsetgetstatus_(self):
470         if not self.do_replica:
471             return None
472
473         raw_data = dict()
474         try:
475             raw_data['replSetGetStatus'] = self.connection.admin.command('replSetGetStatus')
476         except PyMongoError:
477             return None
478         else:
479             return raw_data
480
481     def get_getreplicationinfo_(self):
482         if not (self.do_replica and 'local' in self.databases):
483             return None
484
485         raw_data = dict()
486         raw_data['getReplicationInfo'] = dict()
487         try:
488             raw_data['getReplicationInfo']['ASCENDING'] = self.connection.local.oplog.rs.find().sort(
489                 "$natural", ASCENDING).limit(1)[0]
490             raw_data['getReplicationInfo']['DESCENDING'] = self.connection.local.oplog.rs.find().sort(
491                 "$natural", DESCENDING).limit(1)[0]
492         except PyMongoError:
493             return None
494         else:
495             return raw_data
496
497     def _get_data(self):
498         """
499         :return: dict
500         """
501         raw_data = self._get_raw_data()
502
503         if not raw_data:
504             return None
505
506         to_netdata = dict()
507         serverStatus = raw_data['serverStatus']
508         dbStats = raw_data.get('dbStats')
509         replSetGetStatus = raw_data.get('replSetGetStatus')
510         getReplicationInfo = raw_data.get('getReplicationInfo')
511         utc_now = datetime.utcnow()
512
513         # serverStatus
514         for metric, new_name, function in self.metrics_to_collect:
515             temp = serverStatus
516             for key in metric.split('.'):
517                 try:
518                     temp = temp[key]
519                 except KeyError:
520                     break
521
522             if not isinstance(temp, dict):
523                 to_netdata[new_name or key] = temp if not function else function(temp)
524
525         to_netdata['nonmapped'] = to_netdata['virtual'] - serverStatus['mem'].get('mappedWithJournal',
526                                                                                   to_netdata['mapped'])
527         if to_netdata.get('maximum bytes configured'):
528             maximum = to_netdata['maximum bytes configured']
529             to_netdata['wiredTiger_percent_clean'] = int(to_netdata['bytes currently in the cache']
530                                                          * 100 / maximum * 1000)
531             to_netdata['wiredTiger_percent_dirty'] = int(to_netdata['tracked dirty bytes in the cache']
532                                                          * 100 / maximum * 1000)
533
534         # dbStats
535         if dbStats:
536             for dbase in dbStats:
537                 for metric in DBSTATS:
538                     key = '_'.join([dbase, metric])
539                     to_netdata[key] = dbStats[dbase][metric]
540
541         # replSetGetStatus
542         if replSetGetStatus:
543             other_hosts = list()
544             members = replSetGetStatus['members']
545             unix_epoch = datetime(1970, 1, 1, 0, 0)
546
547             for member in members:
548                 if not member.get('self'):
549                     other_hosts.append(member)
550                 # Replica set time diff between current time and time when last entry from the oplog was applied
551                 if member['optimeDate'] != unix_epoch:
552                     member_optimedate = member['name'] + '_optimedate'
553                     to_netdata.update({member_optimedate: int(delta_calculation(delta=utc_now - member['optimeDate'],
554                                                                                 multiplier=1000))})
555                 # Replica set members state
556                 member_state = member['name'] + '_state'
557                 for elem in REPLSET_STATES:
558                     state = elem[0]
559                     to_netdata.update({'_'.join([member_state, state]): 0})
560                 to_netdata.update({'_'.join([member_state, str(member['state'])]): member['state']})
561             # Heartbeat lag calculation
562             for other in other_hosts:
563                 if other['lastHeartbeatRecv'] != unix_epoch:
564                     node = other['name'] + '_heartbeat_lag'
565                     to_netdata[node] = int(delta_calculation(delta=utc_now - other['lastHeartbeatRecv'],
566                                                              multiplier=1000))
567
568         if getReplicationInfo:
569             first_event = getReplicationInfo['ASCENDING']['ts'].as_datetime()
570             last_event = getReplicationInfo['DESCENDING']['ts'].as_datetime()
571             to_netdata['timeDiff'] = int(delta_calculation(delta=last_event - first_event, multiplier=1000))
572
573         return to_netdata
574
575     def _create_connection(self):
576         conn_vars = {'host': self.host, 'port': self.port}
577         if hasattr(MongoClient, 'server_selection_timeout'):
578             conn_vars.update({'serverselectiontimeoutms': self.timeout})
579         try:
580             connection = MongoClient(**conn_vars)
581             if self.user and self.password:
582                 connection.admin.authenticate(name=self.user, password=self.password)
583             # elif self.user:
584             #     connection.admin.authenticate(name=self.user, mechanism='MONGODB-X509')
585             server_status = connection.admin.command('serverStatus')
586         except PyMongoError as error:
587             return None, None, str(error)
588         else:
589             try:
590                 self.databases = connection.database_names()
591             except PyMongoError as error:
592                 self.info('Can\'t collect databases: %s' % str(error))
593             return connection, server_status, None
594
595
596 def delta_calculation(delta, multiplier=1):
597     if hasattr(delta, 'total_seconds'):
598         return delta.total_seconds() * multiplier
599     else:
600         return (delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6 * multiplier