]> arthur.barton.de Git - netdata.git/blob - python.d/postgres.chart.py
Enable WAL chart when connected as superuser.
[netdata.git] / python.d / postgres.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: example netdata python.d module
3 # Authors: facetoe, dangtranhoang
4
5 import re
6 from copy import deepcopy
7
8 import psycopg2
9 from psycopg2 import extensions
10 from psycopg2.extras import DictCursor
11
12 from base import SimpleService
13
14 # default module values
15 update_every = 1
16 priority = 90000
17 retries = 60
18
19 # Default Config options.
20 # {
21 #    'database': None,
22 #    'user': 'postgres',
23 #    'password': None,
24 #    'host': 'localhost',
25 #    'port': 5432
26 # }
27
28 ARCHIVE = """
29 SELECT
30     CAST(COUNT(*) AS INT) AS file_count,
31     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
32     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
33 FROM
34     pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
35 """
36
37 BACKENDS = """
38 SELECT
39     count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
40     (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
41 FROM
42     pg_stat_activity;
43 """
44
45 TABLE_STATS = """
46 SELECT
47   ((sum(relpages) * 8) * 1024) AS size_relations,
48   count(1)                     AS relations
49 FROM pg_class
50 WHERE relkind IN ('r', 't');
51 """
52
53 INDEX_STATS = """
54 SELECT
55   ((sum(relpages) * 8) * 1024) AS size_indexes,
56   count(1)                     AS indexes
57 FROM pg_class
58 WHERE relkind = 'i';"""
59
60 DATABASE = """
61 SELECT
62   datname AS database_name,
63   sum(numbackends) AS connections,
64   sum(xact_commit) AS xact_commit,
65   sum(xact_rollback) AS xact_rollback,
66   sum(blks_read) AS blks_read,
67   sum(blks_hit) AS blks_hit,
68   sum(tup_returned) AS tup_returned,
69   sum(tup_fetched) AS tup_fetched,
70   sum(tup_inserted) AS tup_inserted,
71   sum(tup_updated) AS tup_updated,
72   sum(tup_deleted) AS tup_deleted,
73   sum(conflicts) AS conflicts
74 FROM pg_stat_database
75 WHERE NOT datname ~* '^template\d+'
76 GROUP BY database_name;
77 """
78
79 BGWRITER = 'SELECT * FROM pg_stat_bgwriter;'
80 DATABASE_LOCKS = """
81 SELECT
82   pg_database.datname as database_name,
83   mode,
84   count(mode) AS count
85 FROM pg_locks
86   INNER JOIN pg_database ON pg_database.oid = pg_locks.database
87 GROUP BY datname, mode
88 ORDER BY datname, mode;
89 """
90 REPLICATION = """
91 SELECT
92     client_hostname,
93     client_addr,
94     state,
95     sent_offset - (
96         replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
97 FROM (
98     SELECT
99         client_addr, client_hostname, state,
100         ('x' || lpad(split_part(sent_location,   '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
101         ('x' || lpad(split_part(replay_location, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
102         ('x' || lpad(split_part(sent_location,   '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
103         ('x' || lpad(split_part(replay_location, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
104     FROM pg_stat_replication
105 ) AS s;
106 """
107
108 LOCK_TYPES = [
109     'ExclusiveLock',
110     'RowShareLock',
111     'SIReadLock',
112     'ShareUpdateExclusiveLock',
113     'AccessExclusiveLock',
114     'AccessShareLock',
115     'ShareRowExclusiveLock',
116     'ShareLock',
117     'RowExclusiveLock'
118 ]
119
120 ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write',
121          'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer']
122
123 CHARTS = {
124     'db_stat_transactions': {
125         'options': [None, ' Transactions', 'Count', ' database statistics', '.db_stat_transactions', 'line'],
126         'lines': [
127             ['db_stat_xact_commit', 'Committed', 'absolute'],
128             ['db_stat_xact_rollback', 'Rolled Back', 'absolute']
129         ]},
130     'db_stat_connections': {
131         'options': [None, ' Connections', 'Count', ' database statistics', '.db_stat_connections', 'line'],
132         'lines': [
133             ['db_stat_connections', 'Connections', 'absolute']
134         ]},
135     'db_stat_tuple_read': {
136         'options': [None, ' Tuple read', 'Count', ' database statistics', '.db_stat_tuple_read', 'line'],
137         'lines': [
138             ['db_stat_blks_read', 'Disk', 'absolute'],
139             ['db_stat_blks_hit', 'Cache', 'absolute']
140         ]},
141     'db_stat_tuple_returned': {
142         'options': [None, ' Tuple returned', 'Count', ' database statistics', '.db_stat_tuple_returned', 'line'],
143         'lines': [
144             ['db_stat_tup_returned', 'Sequential', 'absolute'],
145             ['db_stat_tup_fetched', 'Bitmap', 'absolute']
146         ]},
147     'db_stat_tuple_write': {
148         'options': [None, ' Tuple write', 'Count', ' database statistics', '.db_stat_tuple_write', 'line'],
149         'lines': [
150             ['db_stat_tup_inserted', 'Inserted', 'absolute'],
151             ['db_stat_tup_updated', 'Updated', 'absolute'],
152             ['db_stat_tup_deleted', 'Deleted', 'absolute'],
153             ['db_stat_conflicts', 'Conflicts', 'absolute']
154         ]},
155     'backend_process': {
156         'options': [None, 'Backend processes', 'Count', 'Backend processes', 'postgres.backend_process', 'line'],
157         'lines': [
158             ['backend_process_active', 'Active', 'absolute'],
159             ['backend_process_idle', 'Idle', 'absolute']
160         ]},
161     'index_count': {
162         'options': [None, 'Total index', 'Count', 'Index', 'postgres.index_count', 'line'],
163         'lines': [
164             ['index_count', 'Total index', 'absolute']
165         ]},
166     'index_size': {
167         'options': [None, 'Index size', 'MB', 'Index', 'postgres.index_size', 'line'],
168         'lines': [
169             ['index_size', 'Size', 'absolute', 1, 1024 * 1024]
170         ]},
171     'table_count': {
172         'options': [None, 'Total table', 'Count', 'Table', 'postgres.table_count', 'line'],
173         'lines': [
174             ['table_count', 'Total table', 'absolute']
175         ]},
176     'table_size': {
177         'options': [None, 'Table size', 'MB', 'Table', 'postgres.table_size', 'line'],
178         'lines': [
179             ['table_size', 'Size', 'absolute', 1, 1024 * 1024]
180         ]},
181     'wal': {
182         'options': [None, 'WAL stats', 'Files', 'WAL', 'postgres.wal', 'line'],
183         'lines': [
184             ['wal_total', 'Total', 'absolute'],
185             ['wal_ready', 'Ready', 'absolute'],
186             ['wal_done', 'Done', 'absolute']
187         ]},
188     'background_writer': {
189         'options': [None, 'Checkpoints', 'Count', 'Background Writer', 'postgres.background_writer', 'line'],
190         'lines': [
191             ['background_writer_scheduled', 'Scheduled', 'absolute'],
192             ['background_writer_requested', 'Requested', 'absolute']
193         ]}
194 }
195
196
197 class Service(SimpleService):
198     def __init__(self, configuration=None, name=None):
199         super(self.__class__, self).__init__(configuration=configuration, name=name)
200         self.order = ORDER
201         self.definitions = CHARTS
202         self.table_stats = configuration.pop('table_stats', True)
203         self.index_stats = configuration.pop('index_stats', True)
204         self.configuration = configuration
205         self.connection = None
206         self.is_superuser = False
207         self.data = {}
208         self.old_data = {}
209         self.databases = set()
210
211     def _connect(self):
212         params = dict(user='postgres',
213                       database=None,
214                       password=None,
215                       host='localhost',
216                       port=5432)
217         params.update(self.configuration)
218
219         if not self.connection:
220             self.connection = psycopg2.connect(**params)
221             self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
222             self.connection.set_session(readonly=True)
223
224     def check(self):
225         try:
226             self._connect()
227             cursor = self.connection.cursor()
228             self._discover_databases(cursor)
229             self._check_if_superuser(cursor)
230             cursor.close()
231
232             self._create_definitions()
233             return True
234         except Exception as e:
235             self.error(e)
236             return False
237
238     def _discover_databases(self, cursor):
239         cursor.execute("""
240             SELECT datname
241             FROM pg_stat_database
242             WHERE NOT datname ~* '^template\d+'
243         """)
244         self.databases = set(r[0] for r in cursor)
245
246     def _check_if_superuser(self, cursor):
247         cursor.execute("""
248             SELECT current_setting('is_superuser') = 'on' AS is_superuser;
249         """)
250         self.is_superuser = cursor.fetchone()[0]
251
252
253     def _create_definitions(self):
254         for database_name in self.databases:
255             self.databases.add(database_name)
256             for chart_template_name in list(CHARTS):
257                 if chart_template_name.startswith('db_stat'):
258                     self._add_database_stat_chart(chart_template_name, database_name)
259             self._add_database_lock_chart(database_name)
260
261     def _add_database_stat_chart(self, chart_template_name, database_name):
262         chart_template = CHARTS[chart_template_name]
263         chart_name = "{}_{}".format(database_name, chart_template_name)
264         if chart_name not in self.order:
265             self.order.insert(0, chart_name)
266             name, title, units, family, context, chart_type = chart_template['options']
267             self.definitions[chart_name] = {
268                 'options': [
269                     name,
270                     database_name + title,
271                     units,
272                     database_name + family,
273                     database_name + context,
274                     chart_type
275                 ]
276             }
277
278             self.definitions[chart_name]['lines'] = []
279             for line in deepcopy(chart_template['lines']):
280                 line[0] = "{}_{}".format(database_name, line[0])
281                 self.definitions[chart_name]['lines'].append(line)
282
283     def _add_database_lock_chart(self, database_name):
284         chart_name = "{}_locks".format(database_name)
285         if chart_name not in self.order:
286             self.order.insert(0, chart_name)
287             self.definitions[chart_name] = dict(
288                 options=
289                 [
290                     None,
291                     database_name + ' locks',
292                     'Count',
293                     database_name + ' database statistics',
294                     database_name + '.locks',
295                     'line'
296                 ],
297                 lines=[]
298             )
299
300             for lock_type in LOCK_TYPES:
301                 lock_id = "{}_{}".format(database_name, lock_type.lower())
302                 label = re.sub("([a-z])([A-Z])", "\g<1> \g<2>", lock_type)
303                 self.definitions[chart_name]['lines'].append([lock_id, label, 'absolute'])
304
305     def _get_data(self):
306         self._connect()
307
308         cursor = self.connection.cursor(cursor_factory=DictCursor)
309         self.add_stats(cursor)
310
311         cursor.close()
312         return self.data
313
314     def add_stats(self, cursor):
315         self.add_database_stats(cursor)
316         self.add_backend_stats(cursor)
317         if self.index_stats:
318             self.add_index_stats(cursor)
319         if self.table_stats:
320             self.add_table_stats(cursor)
321         self.add_lock_stats(cursor)
322         self.add_bgwriter_stats(cursor)
323
324         # self.add_replication_stats(cursor)
325
326         if self.is_superuser:
327             self.add_wal_stats(cursor)
328
329     def add_database_stats(self, cursor):
330         cursor.execute(DATABASE)
331         for row in cursor:
332             database_name = row.get('database_name')
333             self.add_derive_value('db_stat_xact_commit', prefix=database_name, value=int(row.get('xact_commit', 0)))
334             self.add_derive_value('db_stat_xact_rollback', prefix=database_name, value=int(row.get('xact_rollback', 0)))
335             self.add_derive_value('db_stat_blks_read', prefix=database_name, value=int(row.get('blks_read', 0)))
336             self.add_derive_value('db_stat_blks_hit', prefix=database_name, value=int(row.get('blks_hit', 0)))
337             self.add_derive_value('db_stat_tup_returned', prefix=database_name, value=int(row.get('tup_returned', 0)))
338             self.add_derive_value('db_stat_tup_fetched', prefix=database_name, value=int(row.get('tup_fetched', 0)))
339             self.add_derive_value('db_stat_tup_inserted', prefix=database_name, value=int(row.get('tup_inserted', 0)))
340             self.add_derive_value('db_stat_tup_updated', prefix=database_name, value=int(row.get('tup_updated', 0)))
341             self.add_derive_value('db_stat_tup_deleted', prefix=database_name, value=int(row.get('tup_deleted', 0)))
342             self.add_derive_value('db_stat_conflicts', prefix=database_name, value=int(row.get('conflicts', 0)))
343             conn_key = "{}_{}".format(database_name, 'db_stat_connections')
344             self.data[conn_key] = int(row.get('connections', 0))
345
346     def add_backend_stats(self, cursor):
347         cursor.execute(BACKENDS)
348         temp = cursor.fetchone()
349
350         self.data['backend_process_active'] = int(temp.get('backends_active', 0))
351         self.data['backend_process_idle'] = int(temp.get('backends_idle', 0))
352
353     def add_index_stats(self, cursor):
354         cursor.execute(INDEX_STATS)
355         temp = cursor.fetchone()
356         self.data['index_count'] = int(temp.get('indexes', 0))
357         self.data['index_size'] = int(temp.get('size_indexes', 0))
358
359     def add_table_stats(self, cursor):
360         cursor.execute(TABLE_STATS)
361         temp = cursor.fetchone()
362         self.data['table_count'] = int(temp.get('relations', 0))
363         self.data['table_size'] = int(temp.get('size_relations', 0))
364
365     def add_lock_stats(self, cursor):
366         cursor.execute(DATABASE_LOCKS)
367         # First zero out all current lock values.
368         for database_name in self.databases:
369             for lock_type in LOCK_TYPES:
370                 lock_id = "{}_{}".format(database_name, lock_type.lower())
371                 self.data[lock_id] = 0
372
373         # Now populate those that have current locks
374         for row in cursor:
375             database_name, lock_type, lock_count = row
376             lock_id = "{}_{}".format(database_name, lock_type.lower())
377             self.data[lock_id] = lock_count
378
379     def add_wal_stats(self, cursor):
380         cursor.execute(ARCHIVE)
381         temp = cursor.fetchone()
382         self.add_derive_value('wal_total', int(temp.get('file_count', 0)))
383         self.add_derive_value('wal_ready', int(temp.get('ready_count', 0)))
384         self.add_derive_value('wal_done', int(temp.get('done_count', 0)))
385
386     def add_bgwriter_stats(self, cursor):
387         cursor.execute(BGWRITER)
388         temp = cursor.fetchone()
389
390         self.add_derive_value('background_writer_scheduled', temp.get('checkpoints_timed', 0))
391         self.add_derive_value('background_writer_requested', temp.get('checkpoints_requests', 0))
392
393     def add_derive_value(self, key, value, prefix=None):
394         if prefix:
395             key = "{}_{}".format(prefix, key)
396         if key not in self.old_data.keys():
397             self.data[key] = 0
398         else:
399             self.data[key] = value - self.old_data[key]
400
401         self.old_data[key] = value
402
403
404 '''
405     def add_replication_stats(self, cursor):
406         cursor.execute(REPLICATION)
407         temp = cursor.fetchall()
408         for row in temp:
409             self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
410                                  'byte_lag',
411                                  int(row.get('byte_lag', 0)))
412 '''