1 # -*- coding: utf-8 -*-
2 # Description: example netdata python.d module
3 # Authors: facetoe, dangtranhoang
6 from copy import deepcopy
9 from psycopg2 import extensions
10 from psycopg2.extras import DictCursor
12 from base import SimpleService
14 # default module values
19 # Default Config options.
24 # 'host': 'localhost',
30 CAST(COUNT(*) AS INT) AS file_count,
31 CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
32 CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
34 pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
39 count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
40 (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
47 ((sum(relpages) * 8) * 1024) AS size_relations,
50 WHERE relkind IN ('r', 't');
55 ((sum(relpages) * 8) * 1024) AS size_indexes,
58 WHERE relkind = 'i';"""
62 datname AS database_name,
63 sum(numbackends) AS connections,
64 sum(xact_commit) AS xact_commit,
65 sum(xact_rollback) AS xact_rollback,
66 sum(blks_read) AS blks_read,
67 sum(blks_hit) AS blks_hit,
68 sum(tup_returned) AS tup_returned,
69 sum(tup_fetched) AS tup_fetched,
70 sum(tup_inserted) AS tup_inserted,
71 sum(tup_updated) AS tup_updated,
72 sum(tup_deleted) AS tup_deleted,
73 sum(conflicts) AS conflicts
75 WHERE NOT datname ~* '^template\d+'
76 GROUP BY database_name;
81 sum(heap_blks_read) AS heap_blocks_read,
82 sum(heap_blks_hit) AS heap_blocks_hit,
83 sum(idx_blks_read) AS index_blocks_read,
84 sum(idx_blks_hit) AS index_blocks_hit,
85 sum(toast_blks_read) AS toast_blocks_read,
86 sum(toast_blks_hit) AS toast_blocks_hit,
87 sum(tidx_blks_read) AS toastindex_blocks_read,
88 sum(tidx_blks_hit) AS toastindex_blocks_hit
92 schemaname <> 'pg_catalog';
94 BGWRITER = 'SELECT * FROM pg_stat_bgwriter;'
97 pg_database.datname as database_name,
101 INNER JOIN pg_database ON pg_database.oid = pg_locks.database
102 GROUP BY datname, mode
103 ORDER BY datname, mode;
111 replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
114 client_addr, client_hostname, state,
115 ('x' || lpad(split_part(sent_location, '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
116 ('x' || lpad(split_part(replay_location, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
117 ('x' || lpad(split_part(sent_location, '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
118 ('x' || lpad(split_part(replay_location, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
119 FROM pg_stat_replication
127 'ShareUpdateExclusiveLock',
128 'AccessExclusiveLock',
130 'ShareRowExclusiveLock',
135 ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write',
136 'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'operations_heap',
137 'operations_index', 'operations_toast', 'operations_toast_index', 'background_writer']
140 'db_stat_transactions': {
141 'options': [None, ' Transactions', 'Count', ' database statistics', '.db_stat_transactions', 'line'],
143 ['db_stat_xact_commit', 'Committed', 'absolute'],
144 ['db_stat_xact_rollback', 'Rolled Back', 'absolute']
146 'db_stat_connections': {
147 'options': [None, ' Connections', 'Count', ' database statistics', '.db_stat_connections', 'line'],
149 ['db_stat_connections', 'Connections', 'absolute']
151 'db_stat_tuple_read': {
152 'options': [None, ' Tuple read', 'Count', ' database statistics', '.db_stat_tuple_read', 'line'],
154 ['db_stat_blks_read', 'Disk', 'absolute'],
155 ['db_stat_blks_hit', 'Cache', 'absolute']
157 'db_stat_tuple_returned': {
158 'options': [None, ' Tuple returned', 'Count', ' database statistics', '.db_stat_tuple_returned', 'line'],
160 ['db_stat_tup_returned', 'Sequential', 'absolute'],
161 ['db_stat_tup_fetched', 'Bitmap', 'absolute']
163 'db_stat_tuple_write': {
164 'options': [None, ' Tuple write', 'Count', ' database statistics', '.db_stat_tuple_write', 'line'],
166 ['db_stat_tup_inserted', 'Inserted', 'absolute'],
167 ['db_stat_tup_updated', 'Updated', 'absolute'],
168 ['db_stat_tup_deleted', 'Deleted', 'absolute'],
169 ['db_stat_conflicts', 'Conflicts', 'absolute']
172 'options': [None, 'Backend processes', 'Count', 'Backend processes', 'postgres.backend_process', 'line'],
174 ['backend_process_active', 'Active', 'absolute'],
175 ['backend_process_idle', 'Idle', 'absolute']
178 'options': [None, 'Total index', 'Count', 'Index', 'postgres.index_count', 'line'],
180 ['index_count', 'Total index', 'absolute']
183 'options': [None, 'Index size', 'MB', 'Index', 'postgres.index_size', 'line'],
185 ['index_size', 'Size', 'absolute', 1, 1024 * 1024]
188 'options': [None, 'Total table', 'Count', 'Table', 'postgres.table_count', 'line'],
190 ['table_count', 'Total table', 'absolute']
193 'options': [None, 'Table size', 'MB', 'Table', 'postgres.table_size', 'line'],
195 ['table_size', 'Size', 'absolute', 1, 1024 * 1024]
198 'options': [None, 'WAL stats', 'Files', 'WAL', 'postgres.wal', 'line'],
200 ['wal_total', 'Total', 'absolute'],
201 ['wal_ready', 'Ready', 'absolute'],
202 ['wal_done', 'Done', 'absolute']
205 'options': [None, 'Heap', 'iops', 'IO Operations', 'postgres.operations_heap', 'line'],
207 ['operations_heap_blocks_read', 'Read', 'absolute'],
208 ['operations_heap_blocks_hit', 'Hit', 'absolute']
210 'operations_index': {
211 'options': [None, 'Index', 'iops', 'IO Operations', 'postgres.operations_index', 'line'],
213 ['operations_index_blocks_read', 'Read', 'absolute'],
214 ['operations_index_blocks_hit', 'Hit', 'absolute']
216 'operations_toast': {
217 'options': [None, 'Toast', 'iops', 'IO Operations', 'postgres.operations_toast', 'line'],
219 ['operations_toast_blocks_read', 'Read', 'absolute'],
220 ['operations_toast_blocks_hit', 'Hit', 'absolute']
222 'operations_toast_index': {
223 'options': [None, 'Toast index', 'iops', 'IO Operations', 'postgres.operations_toast_index', 'line'],
225 ['operations_toastindex_blocks_read', 'Read', 'absolute'],
226 ['operations_toastindex_blocks_hit', 'Hit', 'absolute']
228 'background_writer': {
229 'options': [None, 'Checkpoints', 'Count', 'Background Writer', 'postgres.background_writer', 'line'],
231 ['background_writer_scheduled', 'Scheduled', 'absolute'],
232 ['background_writer_requested', 'Requested', 'absolute']
237 class Service(SimpleService):
238 def __init__(self, configuration=None, name=None):
239 super(self.__class__, self).__init__(configuration=configuration, name=name)
241 self.definitions = CHARTS
242 self.configuration = configuration
243 self.connection = None
246 self.databases = set()
249 params = dict(user='postgres',
254 params.update(self.configuration)
255 if not self.connection:
256 self.connection = psycopg2.connect(**params)
257 self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
258 self.connection.set_session(readonly=True)
263 self.discover_databases()
264 self._create_definitions()
266 except Exception as e:
270 def _create_definitions(self):
271 for database_name in self.databases:
272 self.databases.add(database_name)
273 for chart_template_name in list(CHARTS):
274 if chart_template_name.startswith('db_stat'):
275 self._add_database_stat_chart(chart_template_name, database_name)
276 self._add_database_lock_chart(database_name)
278 def discover_databases(self):
279 cursor = self.connection.cursor()
282 FROM pg_stat_database
283 WHERE NOT datname ~* '^template\d+'
285 self.databases = set(r[0] for r in cursor)
288 def _add_database_stat_chart(self, chart_template_name, database_name):
289 chart_template = CHARTS[chart_template_name]
290 chart_name = "{}_{}".format(database_name, chart_template_name)
291 if chart_name not in self.order:
292 self.order.insert(0, chart_name)
293 name, title, units, family, context, chart_type = chart_template['options']
294 self.definitions[chart_name] = {
297 database_name + title,
299 database_name + family,
300 database_name + context,
305 self.definitions[chart_name]['lines'] = []
306 for line in deepcopy(chart_template['lines']):
307 line[0] = "{}_{}".format(database_name, line[0])
308 self.definitions[chart_name]['lines'].append(line)
310 def _add_database_lock_chart(self, database_name):
311 chart_name = "{}_locks".format(database_name)
312 if chart_name not in self.order:
313 self.order.insert(0, chart_name)
314 self.definitions[chart_name] = dict(
318 database_name + ' locks',
320 database_name + ' database statistics',
321 database_name + '.locks',
327 for lock_type in LOCK_TYPES:
328 lock_id = "{}_{}".format(database_name, lock_type.lower())
329 label = re.sub("([a-z])([A-Z])", "\g<1> \g<2>", lock_type)
330 self.definitions[chart_name]['lines'].append([lock_id, label, 'absolute'])
335 cursor = self.connection.cursor(cursor_factory=DictCursor)
336 self.add_stats(cursor)
341 def add_stats(self, cursor):
342 self.add_database_stats(cursor)
343 self.add_backend_stats(cursor)
344 self.add_index_stats(cursor)
345 self.add_table_stats(cursor)
346 self.add_lock_stats(cursor)
347 self.add_statio_stats(cursor)
348 self.add_bgwriter_stats(cursor)
350 # self.add_replication_stats(cursor)
352 # add_wal_metrics needs superuser to get directory listings
353 # if self.config.get('superuser', True):
354 # self.add_wal_stats(cursor)
356 def add_database_stats(self, cursor):
357 cursor.execute(DATABASE)
359 database_name = row.get('database_name')
360 self.add_derive_value('db_stat_xact_commit', prefix=database_name, value=int(row.get('xact_commit', 0)))
361 self.add_derive_value('db_stat_xact_rollback', prefix=database_name, value=int(row.get('xact_rollback', 0)))
362 self.add_derive_value('db_stat_blks_read', prefix=database_name, value=int(row.get('blks_read', 0)))
363 self.add_derive_value('db_stat_blks_hit', prefix=database_name, value=int(row.get('blks_hit', 0)))
364 self.add_derive_value('db_stat_tup_returned', prefix=database_name, value=int(row.get('tup_returned', 0)))
365 self.add_derive_value('db_stat_tup_fetched', prefix=database_name, value=int(row.get('tup_fetched', 0)))
366 self.add_derive_value('db_stat_tup_inserted', prefix=database_name, value=int(row.get('tup_inserted', 0)))
367 self.add_derive_value('db_stat_tup_updated', prefix=database_name, value=int(row.get('tup_updated', 0)))
368 self.add_derive_value('db_stat_tup_deleted', prefix=database_name, value=int(row.get('tup_deleted', 0)))
369 self.add_derive_value('db_stat_conflicts', prefix=database_name, value=int(row.get('conflicts', 0)))
370 conn_key = "{}_{}".format(database_name, 'db_stat_connections')
371 self.data[conn_key] = int(row.get('connections', 0))
373 def add_backend_stats(self, cursor):
374 cursor.execute(BACKENDS)
375 temp = cursor.fetchone()
377 self.data['backend_process_active'] = int(temp.get('backends_active', 0))
378 self.data['backend_process_idle'] = int(temp.get('backends_idle', 0))
380 def add_index_stats(self, cursor):
381 cursor.execute(INDEX_STATS)
382 temp = cursor.fetchone()
383 self.data['index_count'] = int(temp.get('indexes', 0))
384 self.data['index_size'] = int(temp.get('size_indexes', 0))
386 def add_table_stats(self, cursor):
387 cursor.execute(TABLE_STATS)
388 temp = cursor.fetchone()
389 self.data['table_count'] = int(temp.get('relations', 0))
390 self.data['table_size'] = int(temp.get('size_relations', 0))
392 def add_lock_stats(self, cursor):
393 cursor.execute(DATABASE_LOCKS)
394 # First zero out all current lock values.
395 for database_name in self.databases:
396 for lock_type in LOCK_TYPES:
397 lock_id = "{}_{}".format(database_name, lock_type.lower())
398 self.data[lock_id] = 0
400 # Now populate those that have current locks
402 database_name, lock_type, lock_count = row
403 lock_id = "{}_{}".format(database_name, lock_type.lower())
404 self.data[lock_id] = lock_count
406 def add_wal_stats(self, cursor):
407 cursor.execute(ARCHIVE)
408 temp = cursor.fetchone()
409 self.add_derive_value('wal_total', int(temp.get('file_count', 0)))
410 self.add_derive_value('wal_ready', int(temp.get('ready_count', 0)))
411 self.add_derive_value('wal_done', int(temp.get('done_count', 0)))
413 def add_statio_stats(self, cursor):
414 cursor.execute(STATIO)
415 temp = cursor.fetchone()
416 self.add_derive_value('operations_heap_blocks_read', int(temp.get('heap_blocks_read', 0)))
417 self.add_derive_value('operations_heap_blocks_hit', int(temp.get('heap_blocks_hit', 0)))
418 self.add_derive_value('operations_index_blocks_read', int(temp.get('index_blocks_read', 0)))
419 self.add_derive_value('operations_index_blocks_hit', int(temp.get('index_blocks_hit', 0)))
420 self.add_derive_value('operations_toast_blocks_read', int(temp.get('toast_blocks_read', 0)))
421 self.add_derive_value('operations_toast_blocks_hit', int(temp.get('toast_blocks_hit', 0)))
422 self.add_derive_value('operations_toastindex_blocks_read', int(temp.get('toastindex_blocks_read', 0)))
423 self.add_derive_value('operations_toastindex_blocks_hit', int(temp.get('toastindex_blocks_hit', 0)))
425 def add_bgwriter_stats(self, cursor):
426 cursor.execute(BGWRITER)
427 temp = cursor.fetchone()
429 self.add_derive_value('background_writer_scheduled', temp.get('checkpoints_timed', 0))
430 self.add_derive_value('background_writer_requested', temp.get('checkpoints_requests', 0))
432 def add_derive_value(self, key, value, prefix=None):
434 key = "{}_{}".format(prefix, key)
435 if key not in self.old_data.keys():
438 self.data[key] = value - self.old_data[key]
440 self.old_data[key] = value
444 def add_replication_stats(self, cursor):
445 cursor.execute(REPLICATION)
446 temp = cursor.fetchall()
448 self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
450 int(row.get('byte_lag', 0)))