# -*- coding: utf-8 -*-
# python.d/postgres.chart.py
# Description: postgres netdata python.d module (per-database statistics and locks)
# Authors: facetoe, dangtranhoang
4
5 import re
6 from copy import deepcopy
7
8 import psycopg2
9 from psycopg2 import extensions
10 from psycopg2.extras import DictCursor
11
12 from base import SimpleService
13
14 # default module values
15 update_every = 1
16 priority = 90000
17 retries = 60
18
19 # Default Config options.
20 # {
21 #    'database': None,
22 #    'user': 'postgres',
23 #    'password': None,
24 #    'host': 'localhost',
25 #    'port': 5432
26 # }
27
# All multi-line SQL below is kept in raw strings: the queries contain
# backslashes (\., \d) which are invalid escape sequences in ordinary
# string literals (DeprecationWarning today, SyntaxError in future Pythons).
# The SQL text itself is unchanged.

# WAL archive status file counts; pg_ls_dir() requires superuser.
ARCHIVE = r"""
SELECT
    CAST(COUNT(*) AS INT) AS file_count,
    CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
    CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
FROM
    pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
"""

# Active vs idle backend counts from pg_stat_activity.
BACKENDS = r"""
SELECT
    count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
    (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
FROM
    pg_stat_activity;
"""

# Number and on-disk size (8 kB pages) of ordinary + TOAST tables.
TABLE_STATS = r"""
SELECT
  ((sum(relpages) * 8) * 1024) AS size_relations,
  count(1)                     AS relations
FROM pg_class
WHERE relkind IN ('r', 't');
"""

# Number and on-disk size (8 kB pages) of indexes.
INDEX_STATS = r"""
SELECT
  ((sum(relpages) * 8) * 1024) AS size_indexes,
  count(1)                     AS indexes
FROM pg_class
WHERE relkind = 'i';"""

# Per-database transaction/tuple/block counters, template databases excluded.
DATABASE = r"""
SELECT
  datname AS database_name,
  sum(xact_commit) AS xact_commit,
  sum(xact_rollback) AS xact_rollback,
  sum(blks_read) AS blks_read,
  sum(blks_hit) AS blks_hit,
  sum(tup_returned) AS tup_returned,
  sum(tup_fetched) AS tup_fetched,
  sum(tup_inserted) AS tup_inserted,
  sum(tup_updated) AS tup_updated,
  sum(tup_deleted) AS tup_deleted,
  sum(conflicts) AS conflicts
FROM pg_stat_database
WHERE NOT datname ~* '^template\d+'
GROUP BY database_name;
"""

# Block-level I/O (read vs cache hit) for user tables, indexes and TOAST.
STATIO = r"""
SELECT
    sum(heap_blks_read) AS heap_blocks_read,
    sum(heap_blks_hit) AS heap_blocks_hit,
    sum(idx_blks_read) AS index_blocks_read,
    sum(idx_blks_hit) AS index_blocks_hit,
    sum(toast_blks_read) AS toast_blocks_read,
    sum(toast_blks_hit) AS toast_blocks_hit,
    sum(tidx_blks_read) AS toastindex_blocks_read,
    sum(tidx_blks_hit) AS toastindex_blocks_hit
FROM
    pg_statio_all_tables
WHERE
    schemaname <> 'pg_catalog';
"""

# Background writer / checkpoint counters.
BGWRITER = 'SELECT * FROM pg_stat_bgwriter;'

# Current lock counts per database and lock mode.
DATABASE_LOCKS = r"""
SELECT
  pg_database.datname as database_name,
  mode,
  count(mode) AS count
FROM pg_locks
  INNER JOIN pg_database ON pg_database.oid = pg_locks.database
GROUP BY datname, mode
ORDER BY datname, mode;
"""

# Replication byte lag per standby (xlog location arithmetic); currently
# unused — see the disabled add_replication_stats at the bottom of the file.
REPLICATION = r"""
SELECT
    client_hostname,
    client_addr,
    state,
    sent_offset - (
        replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
FROM (
    SELECT
        client_addr, client_hostname, state,
        ('x' || lpad(split_part(sent_location,   '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
        ('x' || lpad(split_part(replay_location, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
        ('x' || lpad(split_part(sent_location,   '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
        ('x' || lpad(split_part(replay_location, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
    FROM pg_stat_replication
) AS s;
"""
121
# pg_locks lock modes charted per database; the list order fixes the
# dimension order in the per-database lock chart (_add_database_lock_chart).
# (A stale commented-out LOCK_MAP dict that duplicated this information
# was removed.)
LOCK_TYPES = [
    'ExclusiveLock',
    'RowShareLock',
    'SIReadLock',
    'ShareUpdateExclusiveLock',
    'AccessExclusiveLock',
    'AccessShareLock',
    'ShareRowExclusiveLock',
    'ShareLock',
    'RowExclusiveLock'
]
142
# Initial chart order; per-database charts are inserted at the front of this
# list by _create_definitions() when the service starts.
ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write',
         'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'operations_heap',
         'operations_index', 'operations_toast', 'operations_toast_index', 'background_writer']
146
# Chart templates. 'options' is [name, title, units, family, context, type];
# 'lines' entries are [dimension_id, label, algorithm, (multiplier, divisor)].
# NOTE: the 'db_stat_*' templates deliberately start their title/family with a
# space and their context with a dot — _add_database_stat_chart prepends the
# database name to those three fields when cloning a template per database.
CHARTS = {
    'db_stat_transactions': {
        'options': [None, ' Transactions', 'Count', ' database statistics', '.db_stat_transactions', 'line'],
        'lines': [
            ['db_stat_xact_commit', 'Committed', 'absolute'],
            ['db_stat_xact_rollback', 'Rolled Back', 'absolute']
        ]},
    'db_stat_tuple_read': {
        'options': [None, ' Tuple read', 'Count', ' database statistics', '.db_stat_tuple_read', 'line'],
        'lines': [
            ['db_stat_blks_read', 'Disk', 'absolute'],
            ['db_stat_blks_hit', 'Cache', 'absolute']
        ]},
    'db_stat_tuple_returned': {
        'options': [None, ' Tuple returned', 'Count', ' database statistics', '.db_stat_tuple_returned', 'line'],
        'lines': [
            ['db_stat_tup_returned', 'Sequential', 'absolute'],
            ['db_stat_tup_fetched', 'Bitmap', 'absolute']
        ]},
    'db_stat_tuple_write': {
        'options': [None, ' Tuple write', 'Count', ' database statistics', '.db_stat_tuple_write', 'line'],
        'lines': [
            ['db_stat_tup_inserted', 'Inserted', 'absolute'],
            ['db_stat_tup_updated', 'Updated', 'absolute'],
            ['db_stat_tup_deleted', 'Deleted', 'absolute'],
            ['db_stat_conflicts', 'Conflicts', 'absolute']
        ]},
    'backend_process': {
        'options': [None, 'Backend processes', 'Count', 'Backend processes', 'postgres.backend_process', 'line'],
        'lines': [
            ['backend_process_active', 'Active', 'absolute'],
            ['backend_process_idle', 'Idle', 'absolute']
        ]},
    'index_count': {
        'options': [None, 'Total index', 'Count', 'Index', 'postgres.index_count', 'line'],
        'lines': [
            ['index_count', 'Total index', 'absolute']
        ]},
    'index_size': {
        'options': [None, 'Index size', 'MB', 'Index', 'postgres.index_size', 'line'],
        'lines': [
            # raw value is bytes; divisor converts to MB
            ['index_size', 'Size', 'absolute', 1, 1024 * 1024]
        ]},
    'table_count': {
        'options': [None, 'Total table', 'Count', 'Table', 'postgres.table_count', 'line'],
        'lines': [
            ['table_count', 'Total table', 'absolute']
        ]},
    'table_size': {
        'options': [None, 'Table size', 'MB', 'Table', 'postgres.table_size', 'line'],
        'lines': [
            # raw value is bytes; divisor converts to MB
            ['table_size', 'Size', 'absolute', 1, 1024 * 1024]
        ]},
    'wal': {
        'options': [None, 'WAL stats', 'Files', 'WAL', 'postgres.wal', 'line'],
        'lines': [
            ['wal_total', 'Total', 'absolute'],
            ['wal_ready', 'Ready', 'absolute'],
            ['wal_done', 'Done', 'absolute']
        ]},
    'operations_heap': {
        'options': [None, 'Heap', 'iops', 'IO Operations', 'postgres.operations_heap', 'line'],
        'lines': [
            ['operations_heap_blocks_read', 'Read', 'absolute'],
            ['operations_heap_blocks_hit', 'Hit', 'absolute']
        ]},
    'operations_index': {
        'options': [None, 'Index', 'iops', 'IO Operations', 'postgres.operations_index', 'line'],
        'lines': [
            ['operations_index_blocks_read', 'Read', 'absolute'],
            ['operations_index_blocks_hit', 'Hit', 'absolute']
        ]},
    'operations_toast': {
        'options': [None, 'Toast', 'iops', 'IO Operations', 'postgres.operations_toast', 'line'],
        'lines': [
            ['operations_toast_blocks_read', 'Read', 'absolute'],
            ['operations_toast_blocks_hit', 'Hit', 'absolute']
        ]},
    'operations_toast_index': {
        'options': [None, 'Toast index', 'iops', 'IO Operations', 'postgres.operations_toast_index', 'line'],
        'lines': [
            ['operations_toastindex_blocks_read', 'Read', 'absolute'],
            ['operations_toastindex_blocks_hit', 'Hit', 'absolute']
        ]},
    'background_writer': {
        'options': [None, 'Checkpoints', 'Count', 'Background Writer', 'postgres.background_writer', 'line'],
        'lines': [
            ['background_writer_scheduled', 'Scheduled', 'absolute'],
            ['background_writer_requested', 'Requested', 'absolute']
        ]}
}
238
239
class Service(SimpleService):
    """Netdata service collecting PostgreSQL server statistics.

    check() connects, discovers the databases and clones the per-database
    chart templates; each iteration _get_data() opens a fresh read-only
    connection, runs the statistic queries and returns the metric dict.
    """

    def __init__(self, configuration=None, name=None):
        # Name the base class explicitly: ``super(self.__class__, ...)``
        # recurses forever as soon as this class is subclassed.
        super(Service, self).__init__(configuration=configuration, name=name)
        self.order = ORDER
        self.definitions = CHARTS
        self.configuration = configuration
        self.connection = None  # psycopg2 connection, (re)opened by connect()
        self.data = {}          # metrics reported for the current iteration
        self.old_data = {}      # previous raw values used by add_derive_value
        self.databases = set()  # names found by discover_databases()

    def connect(self):
        """Open an autocommit, read-only connection using the job config."""
        params = dict(user='postgres',
                      database=None,
                      password=None,
                      host='localhost',
                      port=5432)
        params.update(self.configuration)
        self.connection = psycopg2.connect(**params)
        self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.connection.set_session(readonly=True)

    def check(self):
        """Return True when the server is reachable and charts are built."""
        try:
            self.connect()
            self.discover_databases()
            self._create_definitions()
            return True
        except Exception as error:
            self.error(error)
            return False

    def _create_definitions(self):
        # Clone every 'db_stat_*' chart template plus one lock chart per
        # discovered database.  (The old redundant ``self.databases.add``
        # inside this loop was removed — the names are already members.)
        for database_name in self.databases:
            for chart_template_name in list(CHARTS):
                if chart_template_name.startswith('db_stat'):
                    self._add_database_stat_chart(chart_template_name, database_name)
            self._add_database_lock_chart(database_name)

    def discover_databases(self):
        """Populate self.databases with all non-template database names."""
        cursor = self.connection.cursor()
        cursor.execute(r"""
            SELECT datname
            FROM pg_stat_database
            WHERE NOT datname ~* '^template\d+'
        """)
        self.databases = set(row[0] for row in cursor)
        cursor.close()

    def _add_database_stat_chart(self, chart_template_name, database_name):
        # Create a per-database copy of a 'db_stat_*' template; the database
        # name is prepended to title/family/context and to every dimension id.
        chart_template = CHARTS[chart_template_name]
        chart_name = "{}_{}".format(database_name, chart_template_name)
        if chart_name not in self.order:
            self.order.insert(0, chart_name)
            name, title, units, family, context, chart_type = chart_template['options']
            self.definitions[chart_name] = {
                'options': [
                    name,
                    database_name + title,
                    units,
                    database_name + family,
                    database_name + context,
                    chart_type
                ]
            }

            self.definitions[chart_name]['lines'] = []
            for line in deepcopy(chart_template['lines']):
                line[0] = "{}_{}".format(database_name, line[0])
                self.definitions[chart_name]['lines'].append(line)

    def _add_database_lock_chart(self, database_name):
        # One chart per database with a dimension for every lock type.
        chart_name = "{}_locks".format(database_name)
        if chart_name not in self.order:
            self.order.insert(0, chart_name)
            self.definitions[chart_name] = dict(
                options=[
                    None,
                    database_name + ' locks',
                    'Count',
                    database_name + ' database statistics',
                    database_name + '.locks',
                    'line'
                ],
                lines=[]
            )

            for lock_type in LOCK_TYPES:
                lock_id = "{}_{}".format(database_name, lock_type.lower())
                # 'AccessShareLock' -> 'Access Share Lock'
                label = re.sub(r"([a-z])([A-Z])", r"\g<1> \g<2>", lock_type)
                self.definitions[chart_name]['lines'].append([lock_id, label, 'absolute'])

    def _get_data(self):
        # A fresh connection per iteration keeps the collector robust
        # against server restarts between samples.
        self.connect()

        cursor = self.connection.cursor(cursor_factory=DictCursor)
        self.add_stats(cursor)

        cursor.close()
        self.connection.close()

        return self.data

    def add_stats(self, cursor):
        """Run every statistics query and merge the results into self.data."""
        self.add_database_stats(cursor)
        self.add_backend_stats(cursor)
        self.add_index_stats(cursor)
        self.add_table_stats(cursor)
        self.add_lock_stats(cursor)
        self.add_statio_stats(cursor)
        self.add_bgwriter_stats(cursor)

        # self.add_replication_stats(cursor)

        # add_wal_metrics needs superuser to get directory listings
        # if self.config.get('superuser', True):
        # self.add_wal_stats(cursor)

    def add_database_stats(self, cursor):
        """Collect per-database transaction/tuple/block derive metrics."""
        cursor.execute(DATABASE)
        for row in cursor:
            database_name = row.get('database_name')
            for column in ('xact_commit', 'xact_rollback', 'blks_read',
                           'blks_hit', 'tup_returned', 'tup_fetched',
                           'tup_inserted', 'tup_updated', 'tup_deleted',
                           'conflicts'):
                # 'or 0' guards int() against SQL NULL (sum over no rows)
                self.add_derive_value('db_stat_' + column,
                                      prefix=database_name,
                                      value=int(row.get(column) or 0))

    def add_backend_stats(self, cursor):
        """Collect active/idle backend process counts."""
        cursor.execute(BACKENDS)
        temp = cursor.fetchone()

        self.data['backend_process_active'] = int(temp.get('backends_active') or 0)
        self.data['backend_process_idle'] = int(temp.get('backends_idle') or 0)

    def add_index_stats(self, cursor):
        """Collect index count and total index size (bytes)."""
        cursor.execute(INDEX_STATS)
        temp = cursor.fetchone()
        # sum() yields NULL when pg_class has no matching rows; guard int().
        self.data['index_count'] = int(temp.get('indexes') or 0)
        self.data['index_size'] = int(temp.get('size_indexes') or 0)

    def add_table_stats(self, cursor):
        """Collect table count and total table size (bytes)."""
        cursor.execute(TABLE_STATS)
        temp = cursor.fetchone()
        self.data['table_count'] = int(temp.get('relations') or 0)
        self.data['table_size'] = int(temp.get('size_relations') or 0)

    def add_lock_stats(self, cursor):
        """Collect current lock counts per database and lock type."""
        cursor.execute(DATABASE_LOCKS)
        # Zero every lock dimension first so released locks drop back to 0.
        for database_name in self.databases:
            for lock_type in LOCK_TYPES:
                self.data["{}_{}".format(database_name, lock_type.lower())] = 0

        # Overwrite the dimensions that currently hold locks.
        for row in cursor:
            database_name, lock_type, lock_count = row
            self.data["{}_{}".format(database_name, lock_type.lower())] = lock_count

    def add_wal_stats(self, cursor):
        """Collect WAL archive file counters (requires superuser)."""
        cursor.execute(ARCHIVE)
        temp = cursor.fetchone()
        self.add_derive_value('wal_total', int(temp.get('file_count') or 0))
        self.add_derive_value('wal_ready', int(temp.get('ready_count') or 0))
        self.add_derive_value('wal_done', int(temp.get('done_count') or 0))

    def add_statio_stats(self, cursor):
        """Collect block I/O (disk read vs cache hit) derive metrics."""
        cursor.execute(STATIO)
        temp = cursor.fetchone()
        for column in ('heap_blocks_read', 'heap_blocks_hit',
                       'index_blocks_read', 'index_blocks_hit',
                       'toast_blocks_read', 'toast_blocks_hit',
                       'toastindex_blocks_read', 'toastindex_blocks_hit'):
            # sum() yields NULL when no user tables exist; guard int().
            self.add_derive_value('operations_' + column, int(temp.get(column) or 0))

    def add_bgwriter_stats(self, cursor):
        """Collect scheduled/requested checkpoint derive metrics."""
        cursor.execute(BGWRITER)
        temp = cursor.fetchone()

        self.add_derive_value('background_writer_scheduled', temp.get('checkpoints_timed', 0))
        # BUGFIX: the pg_stat_bgwriter column is 'checkpoints_req', not
        # 'checkpoints_requests' — the old key never existed, so the
        # 'Requested' dimension was permanently 0.
        self.add_derive_value('background_writer_requested', temp.get('checkpoints_req', 0))

    def add_derive_value(self, key, value, prefix=None):
        """Store ``value - previous_value`` under key (0 on the first sample).

        prefix, when given, is prepended to the key ("<prefix>_<key>").
        """
        if prefix:
            key = "{}_{}".format(prefix, key)
        if key not in self.old_data:
            self.data[key] = 0
        else:
            self.data[key] = value - self.old_data[key]

        self.old_data[key] = value
443
444
# Dead code kept for reference: replication byte-lag collection using the
# REPLICATION query above; disabled (see the commented-out call in add_stats).
'''
    def add_replication_stats(self, cursor):
        cursor.execute(REPLICATION)
        temp = cursor.fetchall()
        for row in temp:
            self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
                                 'byte_lag',
                                 int(row.get('byte_lag', 0)))
'''