]> arthur.barton.de Git - netdata.git/blob - python.d/mongodb.chart.py
mongodb_plugin: replica set heartbeat latency chart added
[netdata.git] / python.d / mongodb.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: mongodb netdata python.d module
3 # Author: l2isbad
4
5 from base import SimpleService
6 from copy import deepcopy
7 from datetime import datetime
8 try:
9     from pymongo import MongoClient
10     from pymongo.errors import PyMongoError
11     PYMONGO = True
12 except ImportError:
13     PYMONGO = False
14
15 # default module values (can be overridden per job in `config`)
16 # update_every = 2
17 priority = 60000
18 retries = 60
19
20 REPLSET_STATES = [
21         ('1', 'primary'),
22         ('8', 'down'),
23         ('2', 'secondary'),
24         ('3', 'recovering'),
25         ('5', 'startup2'),
26         ('4', 'fatal'),
27         ('7', 'arbiter'),
28         ('6', 'unknown'),
29         ('9', 'rollback'),
30         ('10', 'removed'),
31         ('0', 'startup')]
32
33 # charts order (can be overridden if you want less charts, or different order)
34 ORDER = ['read_operations', 'write_operations', 'active_clients', 'journaling_transactions',
35          'journaling_volume', 'background_flush_average', 'background_flush_last', 'background_flush_rate',
36          'wiredtiger_read', 'wiredtiger_write', 'cursors', 'connections', 'memory', 'page_faults',
37          'queued_requests', 'record_moves', 'wiredtiger_cache', 'wiredtiger_pages_evicted', 'asserts',
38          'dbstats_objects', 'tcmalloc_generic', 'tcmalloc_metrics', 'command_total_rate', 'command_failed_rate']
39
40 CHARTS = {
41     'read_operations': {
42         'options': [None, 'Received read requests', 'requests/s', 'throughput metrics',
43                     'mongodb.read_operations', 'line'],
44         'lines': [
45             ['readWriteOper_query', 'query', 'incremental'],
46             ['readWriteOper_getmore', 'getmore', 'incremental']
47         ]},
48     'write_operations': {
49         'options': [None, 'Received write requests', 'requests/s', 'throughput metrics',
50                     'mongodb.write_operations', 'line'],
51         'lines': [
52             ['readWriteOper_insert', 'insert', 'incremental'],
53             ['readWriteOper_update', 'update', 'incremental'],
54             ['readWriteOper_delete', 'delete', 'incremental']
55         ]},
56     'active_clients': {
57         'options': [None, 'Clients with read or write operations in progress or queued', 'clients',
58                     'throughput metrics', 'mongodb.active_clients', 'line'],
59         'lines': [
60             ['activeClients_readers', 'readers', 'absolute'],
61             ['activeClients_writers', 'writers', 'absolute']
62             ]},
63     'journaling_transactions': {
64         'options': [None, 'Transactions that have been written to the journal', 'commits',
65                     'database performance', 'mongodb.journaling_transactions', 'line'],
66         'lines': [
67             ['journalTrans_commits', 'commits', 'absolute']
68             ]},
69     'journaling_volume': {
70         'options': [None, 'Volume of data written to the journal', 'MB', 'database performance',
71                     'mongodb.journaling_volume', 'line'],
72         'lines': [
73             ['journalTrans_journaled', 'volume', 'absolute', 1, 100]
74             ]},
75     'background_flush_average': {
76         'options': [None, 'Average time taken by flushes to execute', 'ms', 'database performance',
77                     'mongodb.background_flush_average', 'line'],
78         'lines': [
79             ['background_flush_average', 'time', 'absolute', 1, 100]
80             ]},
81     'background_flush_last': {
82         'options': [None, 'Time taken by the last flush operation to execute', 'ms', 'database performance',
83                     'mongodb.background_flush_last', 'line'],
84         'lines': [
85             ['background_flush_last', 'time', 'absolute', 1, 100]
86             ]},
87     'background_flush_rate': {
88         'options': [None, 'Flushes rate', 'flushes', 'database performance', 'mongodb.background_flush_rate', 'line'],
89         'lines': [
90             ['background_flush_rate', 'flushes', 'incremental', 1, 1]
91             ]},
92     'wiredtiger_read': {
93         'options': [None, 'Read tickets in use and remaining', 'tickets', 'database performance',
94                     'mongodb.wiredtiger_read', 'stacked'],
95         'lines': [
96             ['wiredTigerRead_available', 'available', 'absolute', 1, 1],
97             ['wiredTigerRead_out', 'inuse', 'absolute', 1, 1]
98             ]},
99     'wiredtiger_write': {
100         'options': [None, 'Write tickets in use and remaining', 'tickets', 'database performance',
101                     'mongodb.wiredtiger_write', 'stacked'],
102         'lines': [
103             ['wiredTigerWrite_available', 'available', 'absolute', 1, 1],
104             ['wiredTigerWrite_out', 'inuse', 'absolute', 1, 1]
105             ]},
106     'cursors': {
107         'options': [None, 'Currently openned cursors, cursors with timeout disabled and timed out cursors',
108                     'cursors', 'database performance', 'mongodb.cursors', 'stacked'],
109         'lines': [
110             ['cursor_total', 'openned', 'absolute', 1, 1],
111             ['cursor_noTimeout', 'notimeout', 'absolute', 1, 1],
112             ['cursor_timedOut', 'timedout', 'incremental', 1, 1]
113             ]},
114     'connections': {
115         'options': [None, 'Currently connected clients and unused connections', 'connections',
116                     'resource utilization', 'mongodb.connections', 'stacked'],
117         'lines': [
118             ['connections_available', 'unused', 'absolute', 1, 1],
119             ['connections_current', 'connected', 'absolute', 1, 1]
120             ]},
121     'memory': {
122         'options': [None, 'Memory metrics', 'MB', 'resource utilization', 'mongodb.memory', 'stacked'],
123         'lines': [
124             ['memory_virtual', 'virtual', 'absolute', 1, 1],
125             ['memory_resident', 'resident', 'absolute', 1, 1],
126             ['memory_mapped', 'mapped', 'absolute', 1, 1]
127             ]},
128     'page_faults': {
129         'options': [None, 'Number of times MongoDB had to fetch data from disk', 'request/s',
130                     'resource utilization', 'mongodb.page_faults', 'line'],
131         'lines': [
132             ['page_faults', 'page_faults', 'incremental', 1, 1]
133             ]},
134     'queued_requests': {
135         'options': [None, 'Currently queued read and wrire requests', 'requests', 'resource saturation',
136                     'mongodb.queued_requests', 'line'],
137         'lines': [
138             ['currentQueue_readers', 'readers', 'absolute', 1, 1],
139             ['currentQueue_writers', 'writers', 'absolute', 1, 1]
140             ]},
141     'record_moves': {
142         'options': [None, 'Number of times documents had to be moved on-disk', 'number',
143                     'resource saturation', 'mongodb.record_moves', 'line'],
144         'lines': [
145             ['record_moves', 'moves', 'incremental', 1, 1]
146             ]},
147     'asserts': {
148         'options': [None, 'Number of message, warning, regular, corresponding to errors generated'
149                           ' by users assertions raised', 'number', 'errors (asserts)', 'mongodb.asserts', 'line'],
150         'lines': [
151             ['errors_msg', 'msg', 'incremental', 1, 1],
152             ['errors_warning', 'warning', 'incremental', 1, 1],
153             ['errors_regular', 'regular', 'incremental', 1, 1],
154             ['errors_user', 'user', 'incremental', 1, 1]
155             ]},
156     'wiredtiger_cache': {
157         'options': [None, 'Amount of space taken by cached data and by dirty data in the cache',
158                     'KB', 'resource utilization', 'mongodb.wiredtiger_cache', 'stacked'],
159         'lines': [
160             ['wiredTiger_bytes_in_cache', 'cached', 'absolute', 1, 1024],
161             ['wiredTiger_dirty_in_cache', 'dirty', 'absolute', 1, 1024]
162             ]},
163     'wiredtiger_pages_evicted': {
164         'options': [None, 'Pages evicted from the cache',
165                     'pages', 'resource utilization', 'mongodb.wiredtiger_pages_evicted', 'stacked'],
166         'lines': [
167             ['wiredTiger_unmodified_pages_evicted', 'unmodified', 'absolute', 1, 1],
168             ['wiredTiger_modified_pages_evicted', 'modified', 'absolute', 1, 1]
169             ]},
170     'dbstats_objects': {
171         'options': [None, 'Number of documents in the database among all the collections', 'documents',
172                     'storage size metrics', 'mongodb.dbstats_objects', 'stacked'],
173         'lines': [
174             ]},
175     'tcmalloc_generic': {
176         'options': [None, 'Tcmalloc generic metrics', 'MB', 'tcmalloc', 'mongodb.tcmalloc_generic', 'stacked'],
177         'lines': [
178             ['current_allocated_bytes', 'allocated', 'absolute', 1, 1048576],
179             ['heap_size', 'heap_size', 'absolute', 1, 1048576]
180             ]},
181     'tcmalloc_metrics': {
182         'options': [None, 'Tcmalloc metrics', 'KB', 'tcmalloc', 'mongodb.tcmalloc_metrics', 'stacked'],
183         'lines': [
184             ['central_cache_free_bytes', 'central_cache_free', 'absolute', 1, 1024],
185             ['current_total_thread_cache_bytes', 'current_total_thread_cache', 'absolute', 1, 1024],
186             ['pageheap_free_bytes', 'pageheap_free', 'absolute', 1, 1024],
187             ['pageheap_unmapped_bytes', 'pageheap_unmapped', 'absolute', 1, 1024],
188             ['thread_cache_free_bytes', 'thread_cache_free', 'absolute', 1, 1024],
189             ['transfer_cache_free_bytes', 'transfer_cache_free', 'absolute', 1, 1024]
190             ]},
191     'command_total_rate': {
192         'options': [None, 'Commands total rate', 'commands/s', 'commands', 'mongodb.command_total_rate', 'stacked'],
193         'lines': [
194             ['count_total', 'count', 'incremental', 1, 1],
195             ['createIndexes_total', 'createIndexes', 'incremental', 1, 1],
196             ['delete_total', 'delete', 'incremental', 1, 1],
197             ['eval_total', 'eval', 'incremental', 1, 1],
198             ['findAndModify_total', 'findAndModify', 'incremental', 1, 1],
199             ['insert_total', 'insert', 'incremental', 1, 1],
200             ['update_total', 'update', 'incremental', 1, 1]
201             ]},
202     'command_failed_rate': {
203         'options': [None, 'Commands failed rate', 'commands/s', 'commands', 'mongodb.command_failed_rate', 'stacked'],
204         'lines': [
205             ['count_failed', 'count', 'incremental', 1, 1],
206             ['createIndexes_failed', 'createIndexes', 'incremental', 1, 1],
207             ['delete_dailed', 'delete', 'incremental', 1, 1],
208             ['eval_failed', 'eval', 'incremental', 1, 1],
209             ['findAndModify_failed', 'findAndModify', 'incremental', 1, 1],
210             ['insert_failed', 'insert', 'incremental', 1, 1],
211             ['update_failed', 'update', 'incremental', 1, 1]
212             ]}
213 }
214
215
216 class Service(SimpleService):
217     def __init__(self, configuration=None, name=None):
218         SimpleService.__init__(self, configuration=configuration, name=name)
219         self.user = self.configuration.get('user')
220         self.password = self.configuration.get('pass')
221         self.host = self.configuration.get('host', '127.0.0.1')
222         self.port = self.configuration.get('port', 27017)
223         self.timeout = self.configuration.get('timeout', 100)
224
225     def check(self):
226         if not PYMONGO:
227             self.error('Pymongo module is needed to use mongodb.chart.py')
228             return False
229         self.connection, server_status, error = self._create_connection()
230         if error:
231             self.error(error)
232             return False
233
234         self.repl = 'repl' in server_status
235         try:
236             self.databases = self.connection.database_names()
237         except PyMongoError as error:
238             self.databases = list()
239             self.info('Can\'t collect databases: %s' % str(error))
240
241         self.create_charts_(server_status)
242
243         return True
244
245     def create_charts_(self, server_status):
246
247         self.order = ORDER[:]
248         self.definitions = deepcopy(CHARTS)
249         self.ss = dict()
250
251         for elem in ['dur', 'backgroundFlushing', 'wiredTiger', 'tcmalloc', 'cursor', 'commands']:
252             self.ss[elem] = in_server_status(elem, server_status)
253
254         if not self.ss['dur']:
255             self.order.remove('journaling_transactions')
256             self.order.remove('journaling_volume')
257
258         if not self.ss['backgroundFlushing']:
259             self.order.remove('background_flush_average')
260             self.order.remove('background_flush_last')
261             self.order.remove('background_flush_rate')
262
263         if not self.ss['cursor']:
264             self.order.remove('cursors')
265
266         if not self.ss['wiredTiger']:
267             self.order.remove('wiredtiger_write')
268             self.order.remove('wiredtiger_read')
269             self.order.remove('wiredtiger_cache')
270
271         if not self.ss['tcmalloc']:
272             self.order.remove('tcmalloc_generic')
273             self.order.remove('tcmalloc_metrics')
274
275         if not self.ss['commands']:
276             self.order.remove('command_total_rate')
277             self.order.remove('command_failed_rate')
278
279         for dbase in self.databases:
280             self.order.append('_'.join([dbase, 'dbstats']))
281             self.definitions['_'.join([dbase, 'dbstats'])] = {
282                     'options': [None, '%s: size of all documents, indexes, extents' % dbase, 'KB',
283                                 'storage size metrics', 'mongodb.dbstats', 'line'],
284                     'lines': [
285                              ['_'.join([dbase, 'dataSize']), 'documents', 'absolute', 1, 1024],
286                              ['_'.join([dbase, 'indexSize']), 'indexes', 'absolute', 1, 1024],
287                              ['_'.join([dbase, 'storageSize']), 'extents', 'absolute', 1, 1024]
288                       ]}
289             self.definitions['dbstats_objects']['lines'].append(['_'.join([dbase, 'objects']), dbase, 'absolute'])
290
291         if server_status.get('repl'):
292             def create_heartbeat_lines(hosts):
293                 lines = list()
294                 for host in hosts:
295                     dim_id = '_'.join([host, 'heartbeat_lag'])
296                     lines.append([dim_id, host, 'absolute', 1, 1000])
297                 return lines
298
299             def create_state_lines(states):
300                 lines = list()
301                 for state, description in states:
302                     dim_id = '_'.join([host, 'state', state])
303                     lines.append([dim_id, description, 'absolute', 1, 1])
304                 return lines
305
306             all_hosts = server_status['repl']['hosts']
307             this_host = server_status['repl']['me']
308             other_hosts = [host for host in all_hosts if host != this_host]
309
310             # Create "heartbeat delay" charts
311             self.order.append('heartbeat_delay')
312             self.definitions['heartbeat_delay'] = {
313                        'options': [None, 'Latency between this node and replica set members (lastHeartbeatRecv)',
314                                    'seconds', 'replication', 'mongodb.replication_heartbeat_delay', 'stacked'],
315                        'lines': create_heartbeat_lines(other_hosts)}
316             # Create "replica set members state" chart
317             for host in all_hosts:
318                 chart_name = '_'.join([host, 'state'])
319                 self.order.append(chart_name)
320                 self.definitions[chart_name] = {
321                        'options': [None, '%s state' % host, 'state',
322                                    'replication', 'mongodb.replication_state', 'line'],
323                        'lines': create_state_lines(REPLSET_STATES)}
324
325     def _get_raw_data(self):
326         raw_data = dict()
327
328         raw_data.update(self.get_serverstatus_() or dict())
329         raw_data.update(self.get_dbstats_() or dict())
330         raw_data.update(self.get_replsetgetstatus_() or dict())
331
332         return raw_data or None
333
334     def get_serverstatus_(self):
335         raw_data = dict()
336         try:
337             raw_data['serverStatus'] = self.connection.admin.command('serverStatus')
338         except PyMongoError:
339             return None
340         else:
341             return raw_data
342
343     def get_dbstats_(self):
344         if not self.databases:
345             return None
346
347         raw_data = dict()
348         raw_data['dbStats'] = dict()
349         try:
350             for dbase in self.databases:
351                 raw_data['dbStats'][dbase] = self.connection[dbase].command('dbStats')
352         except PyMongoError:
353             return None
354         else:
355             return raw_data
356
357     def get_replsetgetstatus_(self):
358         if not self.repl:
359             return None
360
361         raw_data = dict()
362         try:
363             raw_data['replSetGetStatus'] = self.connection.admin.command('replSetGetStatus')
364         except PyMongoError:
365             return None
366         else:
367             return raw_data
368
369     def _get_data(self):
370         """
371         :return: dict
372         """
373         raw_data = self._get_raw_data()
374
375         if not raw_data:
376             return None
377
378         to_netdata = dict()
379         serverStatus = raw_data['serverStatus']
380         dbStats = raw_data.get('dbStats')
381         replSetGetStatus = raw_data.get('replSetGetStatus')
382         utc_now = datetime.utcnow()
383
384         # serverStatus
385         to_netdata.update(update_dict_key(serverStatus['opcounters'], 'readWriteOper'))
386         to_netdata.update(update_dict_key(serverStatus['globalLock']['activeClients'], 'activeClients'))
387         to_netdata.update(update_dict_key(serverStatus['connections'], 'connections'))
388         to_netdata.update(update_dict_key(serverStatus['mem'], 'memory'))
389         to_netdata.update(update_dict_key(serverStatus['globalLock']['currentQueue'], 'currentQueue'))
390         to_netdata.update(update_dict_key(serverStatus['asserts'], 'errors'))
391         to_netdata['page_faults'] = serverStatus['extra_info']['page_faults']
392         to_netdata['record_moves'] = serverStatus['metrics']['record']['moves']
393
394         if self.ss['dur']:
395             to_netdata['journalTrans_commits'] = serverStatus['dur']['commits']
396             to_netdata['journalTrans_journaled'] = int(serverStatus['dur']['journaledMB'] * 100)
397
398         if self.ss['backgroundFlushing']:
399             to_netdata['background_flush_average'] = int(serverStatus['backgroundFlushing']['average_ms'] * 100)
400             to_netdata['background_flush_last'] = int(serverStatus['backgroundFlushing']['last_ms'] * 100)
401             to_netdata['background_flush_rate'] = serverStatus['backgroundFlushing']['flushes']
402
403         if self.ss['cursor']:
404             to_netdata['cursor_timedOut'] = serverStatus['metrics']['cursor']['timedOut']
405             to_netdata.update(update_dict_key(serverStatus['metrics']['cursor']['open'], 'cursor'))
406
407         if self.ss['wiredTiger']:
408             wired_tiger = serverStatus['wiredTiger']
409             to_netdata.update(update_dict_key(serverStatus['wiredTiger']['concurrentTransactions']['read'],
410                                               'wiredTigerRead'))
411             to_netdata.update(update_dict_key(serverStatus['wiredTiger']['concurrentTransactions']['write'],
412                                               'wiredTigerWrite'))
413             to_netdata['wiredTiger_bytes_in_cache'] = wired_tiger['cache']['bytes currently in the cache']
414             to_netdata['wiredTiger_dirty_in_cache'] = wired_tiger['cache']['tracked dirty bytes in the cache']
415             to_netdata['wiredTiger_unmodified_pages_evicted'] = wired_tiger['cache']['unmodified pages evicted']
416             to_netdata['wiredTiger_modified_pages_evicted'] = wired_tiger['cache']['modified pages evicted']
417
418         if self.ss['tcmalloc']:
419             to_netdata.update(serverStatus['tcmalloc']['generic'])
420             to_netdata.update(dict([(k, v) for k, v in serverStatus['tcmalloc']['tcmalloc'].items()
421                                     if int_or_float(v)]))
422
423         if self.ss['commands']:
424             for elem in ['count', 'createIndexes', 'delete', 'eval', 'findAndModify', 'insert', 'update']:
425                 to_netdata.update(update_dict_key(serverStatus['metrics']['commands'][elem], elem))
426
427         # dbStats
428         if dbStats:
429             for dbase in dbStats:
430                 to_netdata.update(update_dict_key(dbStats[dbase], dbase))
431
432         # replSetGetStatus
433         if replSetGetStatus:
434             other_hosts = list()
435             members = replSetGetStatus['members']
436             for member in members:
437                 if not member.get('self'):
438                     other_hosts.append(member)
439                 # Replica set members state
440                 for elem in REPLSET_STATES:
441                     state = elem[0]
442                     to_netdata.update({'_'.join([member['name'], 'state', state]): 0})
443                 to_netdata.update({'_'.join([member['name'], 'state', str(member['state'])]): member['state']})
444             # Heartbeat lag calculation
445             for other in other_hosts:
446                 if other['lastHeartbeatRecv'] != datetime(1970, 1, 1, 0, 0):
447                     node = other['name'] + '_heartbeat_lag'
448                     to_netdata[node] = int(lag_calculation(utc_now - other['lastHeartbeatRecv']) * 1000)
449
450         return to_netdata
451
452     def _create_connection(self):
453         conn_vars = {'host': self.host, 'port': self.port}
454         if hasattr(MongoClient, 'server_selection_timeout'):
455             conn_vars.update({'serverselectiontimeoutms': self.timeout})
456         try:
457             connection = MongoClient(**conn_vars)
458             if self.user and self.password:
459                 connection.admin.authenticate(name=self.user, password=self.password)
460             # elif self.user:
461             #     connection.admin.authenticate(name=self.user, mechanism='MONGODB-X509')
462             server_status = connection.admin.command('serverStatus')
463         except PyMongoError as error:
464             return None, None, str(error)
465         else:
466             return connection, server_status, None
467
468
469 def update_dict_key(collection, string):
470     return dict([('_'.join([string, k]), int(round(v))) for k, v in collection.items() if int_or_float(v)])
471
472
473 def int_or_float(value):
474     return isinstance(value, (int, float))
475
476
477 def in_server_status(elem, server_status):
478     return elem in server_status or elem in server_status['metrics']
479
480
481 def lag_calculation(lag):
482     if hasattr(lag, 'total_seconds'):
483         return lag.total_seconds()
484     else:
485         return (lag.microseconds + (lag.seconds + lag.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6