# -*- coding: utf-8 -*-
# Description: postgres netdata python.d module
# Authors: facetoe, dangtranhoang

from copy import deepcopy

import psycopg2
from psycopg2 import extensions
from psycopg2.extras import DictCursor

from base import SimpleService

# default module values
update_every = 1
priority = 90000
retries = 60

# Default Config options.
# {
#    'database': None,
#    'user': 'postgres',
#    'password': None,
#    'host': 'localhost',
#    'port': 5432
# }

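# A minimal job entry for python.d/postgres.conf (a sketch, assuming the
# stock python.d YAML job format; the job name and credentials below are
# placeholders):
#
#   local:
#       name     : 'local'
#       user     : 'postgres'
#       database : 'postgres'
#       host     : 'localhost'
#       port     : 5432
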
# Raw string: the \. regex escapes must reach the server verbatim.
ARCHIVE = r"""
SELECT
    CAST(COUNT(*) AS INT) AS file_count,
    CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ AS INT)), 0) AS INT) AS ready_count,
    CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
FROM
    pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
"""

BACKENDS = """
SELECT
    count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
    (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_idle
FROM
    pg_stat_activity;
"""

TABLE_STATS = """
SELECT
  ((sum(relpages) * 8) * 1024) AS size_relations,
  count(1)                     AS relations
FROM pg_class
WHERE relkind IN ('r', 't');
"""

INDEX_STATS = """
SELECT
  ((sum(relpages) * 8) * 1024) AS size_indexes,
  count(1)                     AS indexes
FROM pg_class
WHERE relkind = 'i';
"""

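# TABLE_STATS and INDEX_STATS size relations from pg_class.relpages, which is
# counted in 8 kB blocks (the default BLCKSZ), hence * 8 * 1024 to get bytes;
# relkind 'r' is an ordinary table, 't' a TOAST table, 'i' an index.
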
# Raw string for the \d+ regex below.
DATABASE = r"""
SELECT
  datname AS database_name,
  sum(xact_commit) AS xact_commit,
  sum(xact_rollback) AS xact_rollback,
  sum(blks_read) AS blks_read,
  sum(blks_hit) AS blks_hit,
  sum(tup_returned) AS tup_returned,
  sum(tup_fetched) AS tup_fetched,
  sum(tup_inserted) AS tup_inserted,
  sum(tup_updated) AS tup_updated,
  sum(tup_deleted) AS tup_deleted,
  sum(conflicts) AS conflicts
FROM pg_stat_database
WHERE NOT datname ~* '^template\d+'
GROUP BY database_name;
"""

STATIO = """
SELECT
    sum(heap_blks_read) AS heap_blocks_read,
    sum(heap_blks_hit) AS heap_blocks_hit,
    sum(idx_blks_read) AS index_blocks_read,
    sum(idx_blks_hit) AS index_blocks_hit,
    sum(toast_blks_read) AS toast_blocks_read,
    sum(toast_blks_hit) AS toast_blocks_hit,
    sum(tidx_blks_read) AS toastindex_blocks_read,
    sum(tidx_blks_hit) AS toastindex_blocks_hit
FROM
    pg_statio_all_tables
WHERE
    schemaname <> 'pg_catalog';
"""

BGWRITER = 'SELECT * FROM pg_stat_bgwriter;'
LOCKS = 'SELECT mode, count(mode) AS count FROM pg_locks GROUP BY mode ORDER BY mode;'
REPLICATION = """
SELECT
    client_hostname,
    client_addr,
    state,
    sent_offset - (
        replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
FROM (
    SELECT
        client_addr, client_hostname, state,
        ('x' || lpad(split_part(sent_location,   '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
        ('x' || lpad(split_part(replay_location, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
        ('x' || lpad(split_part(sent_location,   '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
        ('x' || lpad(split_part(replay_location, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
    FROM pg_stat_replication
) AS s;
"""

# Map pg_locks.mode values to chart dimension ids.
LOCK_MAP = {'AccessExclusiveLock': 'lock_access_exclusive',
            'AccessShareLock': 'lock_access_share',
            'ExclusiveLock': 'lock_exclusive',
            'RowExclusiveLock': 'lock_row_exclusive',
            'RowShareLock': 'lock_row_share',
            'ShareUpdateExclusiveLock': 'lock_update_exclusive_lock',
            'ShareLock': 'lock_share',
            'ShareRowExclusiveLock': 'lock_share_row_exclusive',
            'SIReadLock': 'lock_si_read'}

ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write',
         'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'locks', 'wal', 'operations_heap',
         'operations_index', 'operations_toast', 'operations_toast_index', 'background_writer']
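# The db_stat_* names above and the db_stat_* entries in CHARTS are templates:
# _create_definitions() clones one copy per database at startup, prefixing the
# chart name, title, family, context and dimension ids with the database name
# (which is why those option strings begin with a space or a dot).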

CHARTS = {
    'db_stat_transactions': {
        'options': [None, ' Transactions', 'Count', ' database statistics', '.db_stat_transactions', 'line'],
        'lines': [
            ['db_stat_xact_commit', 'Committed', 'absolute'],
            ['db_stat_xact_rollback', 'Rolled Back', 'absolute']
        ]},
    'db_stat_tuple_read': {
        'options': [None, ' Tuple read', 'Count', ' database statistics', '.db_stat_tuple_read', 'line'],
        'lines': [
            ['db_stat_blks_read', 'Disk', 'absolute'],
            ['db_stat_blks_hit', 'Cache', 'absolute']
        ]},
    'db_stat_tuple_returned': {
        'options': [None, ' Tuple returned', 'Count', ' database statistics', '.db_stat_tuple_returned', 'line'],
        'lines': [
            ['db_stat_tup_returned', 'Sequential', 'absolute'],
            ['db_stat_tup_fetched', 'Bitmap', 'absolute']
        ]},
    'db_stat_tuple_write': {
        'options': [None, ' Tuple write', 'Count', ' database statistics', '.db_stat_tuple_write', 'line'],
        'lines': [
            ['db_stat_tup_inserted', 'Inserted', 'absolute'],
            ['db_stat_tup_updated', 'Updated', 'absolute'],
            ['db_stat_tup_deleted', 'Deleted', 'absolute'],
            ['db_stat_conflicts', 'Conflicts', 'absolute']
        ]},
    'backend_process': {
        'options': [None, 'Backend processes', 'Count', 'Backend processes', 'postgres.backend_process', 'line'],
        'lines': [
            ['backend_process_active', 'Active', 'absolute'],
            ['backend_process_idle', 'Idle', 'absolute']
        ]},
    'index_count': {
        'options': [None, 'Total indexes', 'Count', 'Index', 'postgres.index_count', 'line'],
        'lines': [
            ['index_count', 'Total indexes', 'absolute']
        ]},
    'index_size': {
        'options': [None, 'Index size', 'MB', 'Index', 'postgres.index_size', 'line'],
        'lines': [
            ['index_size', 'Size', 'absolute', 1, 1024 * 1024]
        ]},
    'table_count': {
        'options': [None, 'Total tables', 'Count', 'Table', 'postgres.table_count', 'line'],
        'lines': [
            ['table_count', 'Total tables', 'absolute']
        ]},
    'table_size': {
        'options': [None, 'Table size', 'MB', 'Table', 'postgres.table_size', 'line'],
        'lines': [
            ['table_size', 'Size', 'absolute', 1, 1024 * 1024]
        ]},
    'locks': {
        'options': [None, 'Locks', 'Count', 'Locks', 'postgres.locks', 'line'],
        'lines': [
            ['lock_access_exclusive', 'Access Exclusive', 'absolute'],
            ['lock_access_share', 'Access Share', 'absolute'],
            ['lock_exclusive', 'Exclusive', 'absolute'],
            ['lock_row_exclusive', 'Row Exclusive', 'absolute'],
            ['lock_row_share', 'Row Share', 'absolute'],
            ['lock_update_exclusive_lock', 'Share Update Exclusive', 'absolute'],
            ['lock_share', 'Share', 'absolute'],
            ['lock_share_row_exclusive', 'Share Row Exclusive', 'absolute'],
            ['lock_si_read', 'SI Read', 'absolute']
        ]},
    'wal': {
        'options': [None, 'WAL stats', 'Files', 'WAL', 'postgres.wal', 'line'],
        'lines': [
            ['wal_total', 'Total', 'absolute'],
            ['wal_ready', 'Ready', 'absolute'],
            ['wal_done', 'Done', 'absolute']
        ]},
    'operations_heap': {
        'options': [None, 'Heap', 'iops', 'IO Operations', 'postgres.operations_heap', 'line'],
        'lines': [
            ['operations_heap_blocks_read', 'Read', 'absolute'],
            ['operations_heap_blocks_hit', 'Hit', 'absolute']
        ]},
    'operations_index': {
        'options': [None, 'Index', 'iops', 'IO Operations', 'postgres.operations_index', 'line'],
        'lines': [
            ['operations_index_blocks_read', 'Read', 'absolute'],
            ['operations_index_blocks_hit', 'Hit', 'absolute']
        ]},
    'operations_toast': {
        'options': [None, 'Toast', 'iops', 'IO Operations', 'postgres.operations_toast', 'line'],
        'lines': [
            ['operations_toast_blocks_read', 'Read', 'absolute'],
            ['operations_toast_blocks_hit', 'Hit', 'absolute']
        ]},
    'operations_toast_index': {
        'options': [None, 'Toast index', 'iops', 'IO Operations', 'postgres.operations_toast_index', 'line'],
        'lines': [
            ['operations_toastindex_blocks_read', 'Read', 'absolute'],
            ['operations_toastindex_blocks_hit', 'Hit', 'absolute']
        ]},
    'background_writer': {
        'options': [None, 'Checkpoints', 'Count', 'Background Writer', 'postgres.background_writer', 'line'],
        'lines': [
            ['background_writer_scheduled', 'Scheduled', 'absolute'],
            ['background_writer_requested', 'Requested', 'absolute']
        ]}
}
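# In each CHARTS entry, 'options' is [name, title, units, family, context,
# chart type] and each line is [dimension id, name, algorithm, multiplier,
# divisor]; a divisor of 1024 * 1024 renders the byte counters above in MB.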


class Service(SimpleService):
    def __init__(self, configuration=None, name=None):
        # super(self.__class__, ...) recurses forever if this class is ever
        # subclassed; name the class explicitly.
        super(Service, self).__init__(configuration=configuration, name=name)
        self.order = ORDER
        self.definitions = CHARTS
        self.configuration = configuration
        self.connection = None
        self.data = {}
        self.old_data = {}

    def connect(self):
        params = dict(user='postgres',
                      database=None,
                      password=None,
                      host='localhost',
                      port=5432)
        params.update(self.configuration)
        self.connection = psycopg2.connect(**params)
        # Monitoring queries should never hold a transaction open or write:
        # run autocommit and read-only.
        self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.connection.set_session(readonly=True)

    def check(self):
        try:
            self.connect()
            self._create_definitions()
            return True
        except Exception as e:
            self.error(e)
            return False

    def _create_definitions(self):
        cursor = self.connection.cursor()
        cursor.execute(r"""
            SELECT datname
            FROM pg_stat_database
            WHERE NOT datname ~* '^template\d+'
        """)

        for row in cursor:
            database_name = row[0]
            for chart_template_name in list(CHARTS):
                if not chart_template_name.startswith('db_stat'):
                    continue

                chart_template = CHARTS[chart_template_name]
                chart_name = "{}_{}".format(database_name, chart_template_name)
                if chart_name not in self.order:
                    self.order.insert(0, chart_name)
                    name, title, units, family, context, chart_type = chart_template['options']
                    self.definitions[chart_name] = {
                        'options': [
                            name,
                            database_name + title,
                            units,
                            database_name + family,
                            database_name + context,
                            chart_type
                        ]
                    }

                    self.definitions[chart_name]['lines'] = []
                    for line in deepcopy(chart_template['lines']):
                        line[0] = "{}_{}".format(database_name, line[0])
                        self.definitions[chart_name]['lines'].append(line)

        cursor.close()

    def _get_data(self):
        # Reconnect on every iteration; the connection is closed again below.
        self.connect()

        cursor = self.connection.cursor(cursor_factory=DictCursor)
        self.add_stats(cursor)

        cursor.close()
        self.connection.close()

        return self.data

    def add_stats(self, cursor):
        self.add_database_stats(cursor)
        self.add_backend_stats(cursor)
        self.add_index_stats(cursor)
        self.add_table_stats(cursor)
        self.add_lock_stats(cursor)
        self.add_statio_stats(cursor)
        self.add_bgwriter_stats(cursor)

        # self.add_replication_stats(cursor)

        # add_wal_stats needs superuser to get directory listings:
        # if self.configuration.get('superuser', True):
        #     self.add_wal_stats(cursor)

    def add_database_stats(self, cursor):
        cursor.execute(DATABASE)
        for row in cursor:
            database_name = row.get('database_name')
            self.add_derive_value('db_stat_xact_commit', prefix=database_name, value=int(row.get('xact_commit', 0)))
            self.add_derive_value('db_stat_xact_rollback', prefix=database_name, value=int(row.get('xact_rollback', 0)))
            self.add_derive_value('db_stat_blks_read', prefix=database_name, value=int(row.get('blks_read', 0)))
            self.add_derive_value('db_stat_blks_hit', prefix=database_name, value=int(row.get('blks_hit', 0)))
            self.add_derive_value('db_stat_tup_returned', prefix=database_name, value=int(row.get('tup_returned', 0)))
            self.add_derive_value('db_stat_tup_fetched', prefix=database_name, value=int(row.get('tup_fetched', 0)))
            self.add_derive_value('db_stat_tup_inserted', prefix=database_name, value=int(row.get('tup_inserted', 0)))
            self.add_derive_value('db_stat_tup_updated', prefix=database_name, value=int(row.get('tup_updated', 0)))
            self.add_derive_value('db_stat_tup_deleted', prefix=database_name, value=int(row.get('tup_deleted', 0)))
            self.add_derive_value('db_stat_conflicts', prefix=database_name, value=int(row.get('conflicts', 0)))

    def add_backend_stats(self, cursor):
        cursor.execute(BACKENDS)
        temp = cursor.fetchone()

        self.data['backend_process_active'] = int(temp.get('backends_active', 0))
        self.data['backend_process_idle'] = int(temp.get('backends_idle', 0))

    def add_index_stats(self, cursor):
        cursor.execute(INDEX_STATS)
        temp = cursor.fetchone()
        # sum() yields NULL when no rows match, so guard before int().
        self.data['index_count'] = int(temp.get('indexes', 0) or 0)
        self.data['index_size'] = int(temp.get('size_indexes', 0) or 0)

    def add_table_stats(self, cursor):
        cursor.execute(TABLE_STATS)
        temp = cursor.fetchone()
        self.data['table_count'] = int(temp.get('relations', 0) or 0)
        self.data['table_size'] = int(temp.get('size_relations', 0) or 0)

    def add_lock_stats(self, cursor):
        cursor.execute(LOCKS)
        rows = cursor.fetchall()
        # Report zero for lock modes absent from pg_locks right now.
        for key in LOCK_MAP:
            self.data[LOCK_MAP[key]] = 0
            for row in rows:
                if row['mode'] == key:
                    self.data[LOCK_MAP[key]] = int(row['count'])
                    break

    def add_wal_stats(self, cursor):
        cursor.execute(ARCHIVE)
        temp = cursor.fetchone()
        self.add_derive_value('wal_total', int(temp.get('file_count', 0)))
        self.add_derive_value('wal_ready', int(temp.get('ready_count', 0)))
        self.add_derive_value('wal_done', int(temp.get('done_count', 0)))

    def add_statio_stats(self, cursor):
        cursor.execute(STATIO)
        temp = cursor.fetchone()
        # The sums are NULL when no non-catalog tables exist; guard before int().
        self.add_derive_value('operations_heap_blocks_read', int(temp.get('heap_blocks_read', 0) or 0))
        self.add_derive_value('operations_heap_blocks_hit', int(temp.get('heap_blocks_hit', 0) or 0))
        self.add_derive_value('operations_index_blocks_read', int(temp.get('index_blocks_read', 0) or 0))
        self.add_derive_value('operations_index_blocks_hit', int(temp.get('index_blocks_hit', 0) or 0))
        self.add_derive_value('operations_toast_blocks_read', int(temp.get('toast_blocks_read', 0) or 0))
        self.add_derive_value('operations_toast_blocks_hit', int(temp.get('toast_blocks_hit', 0) or 0))
        self.add_derive_value('operations_toastindex_blocks_read', int(temp.get('toastindex_blocks_read', 0) or 0))
        self.add_derive_value('operations_toastindex_blocks_hit', int(temp.get('toastindex_blocks_hit', 0) or 0))

    def add_bgwriter_stats(self, cursor):
        cursor.execute(BGWRITER)
        temp = cursor.fetchone()

        self.add_derive_value('background_writer_scheduled', int(temp.get('checkpoints_timed', 0)))
        # pg_stat_bgwriter names the column checkpoints_req, not checkpoints_requests.
        self.add_derive_value('background_writer_requested', int(temp.get('checkpoints_req', 0)))

    def add_derive_value(self, key, value, prefix=None):
        """Store the delta since the previous reading under key, so the chart
        shows a per-interval rate; the first reading reports 0. E.g. two
        consecutive xact_commit counters of 1000 and 1015 produce 15."""
        if prefix:
            key = "{}_{}".format(prefix, key)
        if key not in self.old_data:
            self.data[key] = 0
        else:
            self.data[key] = value - self.old_data[key]

        self.old_data[key] = value


'''
    def add_replication_stats(self, cursor):
        cursor.execute(REPLICATION)
        temp = cursor.fetchall()
        for row in temp:
            self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
                                 'byte_lag',
                                 int(row.get('byte_lag', 0)))
'''
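# The block above stays disabled: add_gauge_value is not defined in this
# class, so re-enabling add_replication_stats would first need a gauge
# counterpart to add_derive_value.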