]> arthur.barton.de Git - netdata.git/blob - python.d/postgres.chart.py
Merge pull request #1175 from ktsaou/master
[netdata.git] / python.d / postgres.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: example netdata python.d module
3 # Authors: facetoe, dangtranhoang
4
5 import re
6 from copy import deepcopy
7
8 import psycopg2
9 from psycopg2 import extensions
10 from psycopg2.extras import DictCursor
11
12 from base import SimpleService
13
14 # default module values
15 update_every = 1
16 priority = 90000
17 retries = 60
18
19 ARCHIVE = """
20 SELECT
21     CAST(COUNT(*) AS INT) AS file_count,
22     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
23     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
24 FROM
25     pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
26 """
27
28 BACKENDS = """
29 SELECT
30     count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
31     (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
32 FROM
33     pg_stat_activity;
34 """
35
36 TABLE_STATS = """
37 SELECT
38   ((sum(relpages) * 8) * 1024) AS size_relations,
39   count(1)                     AS relations
40 FROM pg_class
41 WHERE relkind IN ('r', 't');
42 """
43
44 INDEX_STATS = """
45 SELECT
46   ((sum(relpages) * 8) * 1024) AS size_indexes,
47   count(1)                     AS indexes
48 FROM pg_class
49 WHERE relkind = 'i';"""
50
51 DATABASE = """
52 SELECT
53   datname AS database_name,
54   sum(numbackends) AS connections,
55   sum(xact_commit) AS xact_commit,
56   sum(xact_rollback) AS xact_rollback,
57   sum(blks_read) AS blks_read,
58   sum(blks_hit) AS blks_hit,
59   sum(tup_returned) AS tup_returned,
60   sum(tup_fetched) AS tup_fetched,
61   sum(tup_inserted) AS tup_inserted,
62   sum(tup_updated) AS tup_updated,
63   sum(tup_deleted) AS tup_deleted,
64   sum(conflicts) AS conflicts
65 FROM pg_stat_database
66 WHERE NOT datname ~* '^template\d+'
67 GROUP BY database_name;
68 """
69
70 BGWRITER = 'SELECT * FROM pg_stat_bgwriter;'
71 DATABASE_LOCKS = """
72 SELECT
73   pg_database.datname as database_name,
74   mode,
75   count(mode) AS count
76 FROM pg_locks
77   INNER JOIN pg_database ON pg_database.oid = pg_locks.database
78 GROUP BY datname, mode
79 ORDER BY datname, mode;
80 """
81 REPLICATION = """
82 SELECT
83     client_hostname,
84     client_addr,
85     state,
86     sent_offset - (
87         replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
88 FROM (
89     SELECT
90         client_addr, client_hostname, state,
91         ('x' || lpad(split_part(sent_location,   '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
92         ('x' || lpad(split_part(replay_location, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
93         ('x' || lpad(split_part(sent_location,   '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
94         ('x' || lpad(split_part(replay_location, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
95     FROM pg_stat_replication
96 ) AS s;
97 """
98
99 LOCK_TYPES = [
100     'ExclusiveLock',
101     'RowShareLock',
102     'SIReadLock',
103     'ShareUpdateExclusiveLock',
104     'AccessExclusiveLock',
105     'AccessShareLock',
106     'ShareRowExclusiveLock',
107     'ShareLock',
108     'RowExclusiveLock'
109 ]
110
111 ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write',
112          'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer']
113
114 CHARTS = {
115     'db_stat_transactions': {
116         'options': [None, ' Transactions', 'Count', ' database statistics', '.db_stat_transactions', 'line'],
117         'lines': [
118             ['db_stat_xact_commit', 'Committed', 'absolute'],
119             ['db_stat_xact_rollback', 'Rolled Back', 'absolute']
120         ]},
121     'db_stat_connections': {
122         'options': [None, ' Connections', 'Count', ' database statistics', '.db_stat_connections', 'line'],
123         'lines': [
124             ['db_stat_connections', 'Connections', 'absolute']
125         ]},
126     'db_stat_tuple_read': {
127         'options': [None, ' Tuple read', 'Count', ' database statistics', '.db_stat_tuple_read', 'line'],
128         'lines': [
129             ['db_stat_blks_read', 'Disk', 'absolute'],
130             ['db_stat_blks_hit', 'Cache', 'absolute']
131         ]},
132     'db_stat_tuple_returned': {
133         'options': [None, ' Tuple returned', 'Count', ' database statistics', '.db_stat_tuple_returned', 'line'],
134         'lines': [
135             ['db_stat_tup_returned', 'Sequential', 'absolute'],
136             ['db_stat_tup_fetched', 'Bitmap', 'absolute']
137         ]},
138     'db_stat_tuple_write': {
139         'options': [None, ' Tuple write', 'Count', ' database statistics', '.db_stat_tuple_write', 'line'],
140         'lines': [
141             ['db_stat_tup_inserted', 'Inserted', 'absolute'],
142             ['db_stat_tup_updated', 'Updated', 'absolute'],
143             ['db_stat_tup_deleted', 'Deleted', 'absolute'],
144             ['db_stat_conflicts', 'Conflicts', 'absolute']
145         ]},
146     'backend_process': {
147         'options': [None, 'Backend processes', 'Count', 'Backend processes', 'postgres.backend_process', 'line'],
148         'lines': [
149             ['backend_process_active', 'Active', 'absolute'],
150             ['backend_process_idle', 'Idle', 'absolute']
151         ]},
152     'index_count': {
153         'options': [None, 'Total index', 'Count', 'Index', 'postgres.index_count', 'line'],
154         'lines': [
155             ['index_count', 'Total index', 'absolute']
156         ]},
157     'index_size': {
158         'options': [None, 'Index size', 'MB', 'Index', 'postgres.index_size', 'line'],
159         'lines': [
160             ['index_size', 'Size', 'absolute', 1, 1024 * 1024]
161         ]},
162     'table_count': {
163         'options': [None, 'Total table', 'Count', 'Table', 'postgres.table_count', 'line'],
164         'lines': [
165             ['table_count', 'Total table', 'absolute']
166         ]},
167     'table_size': {
168         'options': [None, 'Table size', 'MB', 'Table', 'postgres.table_size', 'line'],
169         'lines': [
170             ['table_size', 'Size', 'absolute', 1, 1024 * 1024]
171         ]},
172     'wal': {
173         'options': [None, 'WAL stats', 'Files', 'WAL', 'postgres.wal', 'line'],
174         'lines': [
175             ['wal_total', 'Total', 'absolute'],
176             ['wal_ready', 'Ready', 'absolute'],
177             ['wal_done', 'Done', 'absolute']
178         ]},
179     'background_writer': {
180         'options': [None, 'Checkpoints', 'Count', 'Background Writer', 'postgres.background_writer', 'line'],
181         'lines': [
182             ['background_writer_scheduled', 'Scheduled', 'absolute'],
183             ['background_writer_requested', 'Requested', 'absolute']
184         ]}
185 }
186
187
188 class Service(SimpleService):
189     def __init__(self, configuration=None, name=None):
190         super(self.__class__, self).__init__(configuration=configuration, name=name)
191         self.order = ORDER
192         self.definitions = CHARTS
193         self.table_stats = configuration.pop('table_stats', True)
194         self.index_stats = configuration.pop('index_stats', True)
195         self.configuration = configuration
196         self.connection = None
197         self.is_superuser = False
198         self.data = {}
199         self.old_data = {}
200         self.databases = set()
201
202     def _connect(self):
203         params = dict(user='postgres',
204                       database=None,
205                       password=None,
206                       host='localhost',
207                       port=5432)
208         params.update(self.configuration)
209
210         if not self.connection:
211             self.connection = psycopg2.connect(**params)
212             self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
213             self.connection.set_session(readonly=True)
214
215     def check(self):
216         try:
217             self._connect()
218             cursor = self.connection.cursor()
219             self._discover_databases(cursor)
220             self._check_if_superuser(cursor)
221             cursor.close()
222
223             self._create_definitions()
224             return True
225         except Exception as e:
226             self.error(e)
227             return False
228
229     def _discover_databases(self, cursor):
230         cursor.execute("""
231             SELECT datname
232             FROM pg_stat_database
233             WHERE NOT datname ~* '^template\d+'
234         """)
235         self.databases = set(r[0] for r in cursor)
236
237     def _check_if_superuser(self, cursor):
238         cursor.execute("""
239             SELECT current_setting('is_superuser') = 'on' AS is_superuser;
240         """)
241         self.is_superuser = cursor.fetchone()[0]
242
243     def _create_definitions(self):
244         for database_name in self.databases:
245             for chart_template_name in list(CHARTS):
246                 if chart_template_name.startswith('db_stat'):
247                     self._add_database_stat_chart(chart_template_name, database_name)
248             self._add_database_lock_chart(database_name)
249
250     def _add_database_stat_chart(self, chart_template_name, database_name):
251         chart_template = CHARTS[chart_template_name]
252         chart_name = "{}_{}".format(database_name, chart_template_name)
253         if chart_name not in self.order:
254             self.order.insert(0, chart_name)
255             name, title, units, family, context, chart_type = chart_template['options']
256             self.definitions[chart_name] = {
257                 'options': [
258                     name,
259                     database_name + title,
260                     units,
261                     database_name + family,
262                     database_name + context,
263                     chart_type
264                 ]
265             }
266
267             self.definitions[chart_name]['lines'] = []
268             for line in deepcopy(chart_template['lines']):
269                 line[0] = "{}_{}".format(database_name, line[0])
270                 self.definitions[chart_name]['lines'].append(line)
271
272     def _add_database_lock_chart(self, database_name):
273         chart_name = "{}_locks".format(database_name)
274         if chart_name not in self.order:
275             self.order.insert(0, chart_name)
276             self.definitions[chart_name] = dict(
277                 options=
278                 [
279                     None,
280                     database_name + ' locks',
281                     'Count',
282                     database_name + ' database statistics',
283                     database_name + '.locks',
284                     'line'
285                 ],
286                 lines=[]
287             )
288
289             for lock_type in LOCK_TYPES:
290                 lock_id = "{}_{}".format(database_name, lock_type.lower())
291                 label = re.sub("([a-z])([A-Z])", "\g<1> \g<2>", lock_type)
292                 self.definitions[chart_name]['lines'].append([lock_id, label, 'absolute'])
293
294     def _get_data(self):
295         self._connect()
296
297         cursor = self.connection.cursor(cursor_factory=DictCursor)
298         self.add_stats(cursor)
299
300         cursor.close()
301         return self.data
302
303     def add_stats(self, cursor):
304         self.add_database_stats(cursor)
305         self.add_backend_stats(cursor)
306         if self.index_stats:
307             self.add_index_stats(cursor)
308         if self.table_stats:
309             self.add_table_stats(cursor)
310         self.add_lock_stats(cursor)
311         self.add_bgwriter_stats(cursor)
312
313         # self.add_replication_stats(cursor)
314
315         if self.is_superuser:
316             self.add_wal_stats(cursor)
317
318     def add_database_stats(self, cursor):
319         cursor.execute(DATABASE)
320         for row in cursor:
321             database_name = row.get('database_name')
322             self.add_derive_value('db_stat_xact_commit', prefix=database_name, value=int(row.get('xact_commit', 0)))
323             self.add_derive_value('db_stat_xact_rollback', prefix=database_name, value=int(row.get('xact_rollback', 0)))
324             self.add_derive_value('db_stat_blks_read', prefix=database_name, value=int(row.get('blks_read', 0)))
325             self.add_derive_value('db_stat_blks_hit', prefix=database_name, value=int(row.get('blks_hit', 0)))
326             self.add_derive_value('db_stat_tup_returned', prefix=database_name, value=int(row.get('tup_returned', 0)))
327             self.add_derive_value('db_stat_tup_fetched', prefix=database_name, value=int(row.get('tup_fetched', 0)))
328             self.add_derive_value('db_stat_tup_inserted', prefix=database_name, value=int(row.get('tup_inserted', 0)))
329             self.add_derive_value('db_stat_tup_updated', prefix=database_name, value=int(row.get('tup_updated', 0)))
330             self.add_derive_value('db_stat_tup_deleted', prefix=database_name, value=int(row.get('tup_deleted', 0)))
331             self.add_derive_value('db_stat_conflicts', prefix=database_name, value=int(row.get('conflicts', 0)))
332             conn_key = "{}_{}".format(database_name, 'db_stat_connections')
333             self.data[conn_key] = int(row.get('connections', 0))
334
335     def add_backend_stats(self, cursor):
336         cursor.execute(BACKENDS)
337         temp = cursor.fetchone()
338
339         self.data['backend_process_active'] = int(temp.get('backends_active', 0))
340         self.data['backend_process_idle'] = int(temp.get('backends_idle', 0))
341
342     def add_index_stats(self, cursor):
343         cursor.execute(INDEX_STATS)
344         temp = cursor.fetchone()
345         self.data['index_count'] = int(temp.get('indexes', 0))
346         self.data['index_size'] = int(temp.get('size_indexes', 0))
347
348     def add_table_stats(self, cursor):
349         cursor.execute(TABLE_STATS)
350         temp = cursor.fetchone()
351         self.data['table_count'] = int(temp.get('relations', 0))
352         self.data['table_size'] = int(temp.get('size_relations', 0))
353
354     def add_lock_stats(self, cursor):
355         cursor.execute(DATABASE_LOCKS)
356         # First zero out all current lock values.
357         for database_name in self.databases:
358             for lock_type in LOCK_TYPES:
359                 lock_id = "{}_{}".format(database_name, lock_type.lower())
360                 self.data[lock_id] = 0
361
362         # Now populate those that have current locks
363         for row in cursor:
364             database_name, lock_type, lock_count = row
365             lock_id = "{}_{}".format(database_name, lock_type.lower())
366             self.data[lock_id] = lock_count
367
368     def add_wal_stats(self, cursor):
369         cursor.execute(ARCHIVE)
370         temp = cursor.fetchone()
371         self.add_derive_value('wal_total', int(temp.get('file_count', 0)))
372         self.add_derive_value('wal_ready', int(temp.get('ready_count', 0)))
373         self.add_derive_value('wal_done', int(temp.get('done_count', 0)))
374
375     def add_bgwriter_stats(self, cursor):
376         cursor.execute(BGWRITER)
377         temp = cursor.fetchone()
378
379         self.add_derive_value('background_writer_scheduled', temp.get('checkpoints_timed', 0))
380         self.add_derive_value('background_writer_requested', temp.get('checkpoints_requests', 0))
381
382     def add_derive_value(self, key, value, prefix=None):
383         if prefix:
384             key = "{}_{}".format(prefix, key)
385         if key not in self.old_data.keys():
386             self.data[key] = 0
387         else:
388             self.data[key] = value - self.old_data[key]
389
390         self.old_data[key] = value
391
392
393 '''
394     def add_replication_stats(self, cursor):
395         cursor.execute(REPLICATION)
396         temp = cursor.fetchall()
397         for row in temp:
398             self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
399                                  'byte_lag',
400                                  int(row.get('byte_lag', 0)))
401 '''