]> arthur.barton.de Git - netdata.git/blob - python.d/postgres.chart.py
Merge pull request #1615 from lowfive/discord_notify
[netdata.git] / python.d / postgres.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: example netdata python.d module
3 # Authors: facetoe, dangtranhoang
4
5 import re
6 from copy import deepcopy
7
8 import psycopg2
9 from psycopg2 import extensions
10 from psycopg2.extras import DictCursor
11 from psycopg2 import OperationalError
12
13 from base import SimpleService
14
15 # default module values
16 update_every = 1
17 priority = 90000
18 retries = 60
19
20 ARCHIVE = """
21 SELECT
22     CAST(COUNT(*) AS INT) AS file_count,
23     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
24     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
25 FROM
26     pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
27 """
28
29 BACKENDS = """
30 SELECT
31     count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
32     (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
33 FROM
34     pg_stat_activity;
35 """
36
37 TABLE_STATS = """
38 SELECT
39   ((sum(relpages) * 8) * 1024) AS size_relations,
40   count(1)                     AS relations
41 FROM pg_class
42 WHERE relkind IN ('r', 't');
43 """
44
45 INDEX_STATS = """
46 SELECT
47   ((sum(relpages) * 8) * 1024) AS size_indexes,
48   count(1)                     AS indexes
49 FROM pg_class
50 WHERE relkind = 'i';"""
51
52 DATABASE = """
53 SELECT
54   datname AS database_name,
55   sum(numbackends) AS connections,
56   sum(xact_commit) AS xact_commit,
57   sum(xact_rollback) AS xact_rollback,
58   sum(blks_read) AS blks_read,
59   sum(blks_hit) AS blks_hit,
60   sum(tup_returned) AS tup_returned,
61   sum(tup_fetched) AS tup_fetched,
62   sum(tup_inserted) AS tup_inserted,
63   sum(tup_updated) AS tup_updated,
64   sum(tup_deleted) AS tup_deleted,
65   sum(conflicts) AS conflicts
66 FROM pg_stat_database
67 WHERE NOT datname ~* '^template\d+'
68 GROUP BY database_name;
69 """
70
71 BGWRITER = 'SELECT * FROM pg_stat_bgwriter;'
72 DATABASE_LOCKS = """
73 SELECT
74   pg_database.datname as database_name,
75   mode,
76   count(mode) AS count
77 FROM pg_locks
78   INNER JOIN pg_database ON pg_database.oid = pg_locks.database
79 GROUP BY datname, mode
80 ORDER BY datname, mode;
81 """
82 REPLICATION = """
83 SELECT
84     client_hostname,
85     client_addr,
86     state,
87     sent_offset - (
88         replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
89 FROM (
90     SELECT
91         client_addr, client_hostname, state,
92         ('x' || lpad(split_part(sent_location::text,   '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
93         ('x' || lpad(split_part(replay_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
94         ('x' || lpad(split_part(sent_location::text,   '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
95         ('x' || lpad(split_part(replay_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
96     FROM pg_stat_replication
97 ) AS s;
98 """
99
100 LOCK_TYPES = [
101     'ExclusiveLock',
102     'RowShareLock',
103     'SIReadLock',
104     'ShareUpdateExclusiveLock',
105     'AccessExclusiveLock',
106     'AccessShareLock',
107     'ShareRowExclusiveLock',
108     'ShareLock',
109     'RowExclusiveLock'
110 ]
111
112 ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write',
113          'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer']
114
115 CHARTS = {
116     'db_stat_transactions': {
117         'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions', 'line'],
118         'lines': [
119             ['db_stat_xact_commit',   'committed',   'incremental'],
120             ['db_stat_xact_rollback', 'rolled back', 'incremental']
121         ]},
122     'db_stat_connections': {
123         'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections', 'line'],
124         'lines': [
125             ['db_stat_connections', 'connections', 'absolute']
126         ]},
127     'db_stat_tuple_read': {
128         'options': [None, 'Tuple reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_tuple_read', 'line'],
129         'lines': [
130             ['db_stat_blks_read', 'disk',  'incremental'],
131             ['db_stat_blks_hit',  'cache', 'incremental']
132         ]},
133     'db_stat_tuple_returned': {
134         'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned', 'line'],
135         'lines': [
136             ['db_stat_tup_returned', 'sequential', 'incremental'],
137             ['db_stat_tup_fetched',  'bitmap',     'incremental']
138         ]},
139     'db_stat_tuple_write': {
140         'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'],
141         'lines': [
142             ['db_stat_tup_inserted', 'inserted',  'incremental'],
143             ['db_stat_tup_updated',  'updated',   'incremental'],
144             ['db_stat_tup_deleted',  'deleted',   'incremental'],
145             ['db_stat_conflicts',    'conflicts', 'incremental']
146         ]},
147     'backend_process': {
148         'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process', 'line'],
149         'lines': [
150             ['backend_process_active', 'active', 'absolute'],
151             ['backend_process_idle',   'idle',   'absolute']
152         ]},
153     'index_count': {
154         'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'],
155         'lines': [
156             ['index_count', 'total', 'absolute']
157         ]},
158     'index_size': {
159         'options': [None, 'Indexes size', 'MB', 'indexes', 'postgres.index_size', 'line'],
160         'lines': [
161             ['index_size', 'size', 'absolute', 1, 1024 * 1024]
162         ]},
163     'table_count': {
164         'options': [None, 'Total Tables', 'tables', 'tables', 'postgres.table_count', 'line'],
165         'lines': [
166             ['table_count', 'total', 'absolute']
167         ]},
168     'table_size': {
169         'options': [None, 'Tables size', 'MB', 'tables', 'postgres.table_size', 'line'],
170         'lines': [
171             ['table_size', 'size', 'absolute', 1, 1024 * 1024]
172         ]},
173     'wal': {
174         'options': [None, 'Write-Ahead Logging Statistics', 'files/s', 'write ahead log', 'postgres.wal', 'line'],
175         'lines': [
176             ['wal_total', 'total', 'incremental'],
177             ['wal_ready', 'ready', 'incremental'],
178             ['wal_done',  'done',  'incremental']
179         ]},
180     'background_writer': {
181         'options': [None, 'Checkpoints', 'writes/s', 'background writer', 'postgres.background_writer', 'line'],
182         'lines': [
183             ['background_writer_scheduled', 'scheduled', 'incremental'],
184             ['background_writer_requested', 'requested', 'incremental']
185         ]}
186 }
187
188
189 class Service(SimpleService):
190     def __init__(self, configuration=None, name=None):
191         super(self.__class__, self).__init__(configuration=configuration, name=name)
192         self.order = ORDER
193         self.definitions = CHARTS
194         self.table_stats = configuration.pop('table_stats', True)
195         self.index_stats = configuration.pop('index_stats', True)
196         self.configuration = configuration
197         self.connection = False
198         self.is_superuser = False
199         self.data = {}
200         self.databases = set()
201
202     def _connect(self):
203         params = dict(user='postgres',
204                       database=None,
205                       password=None,
206                       host='localhost',
207                       port=5432)
208         params.update(self.configuration)
209
210         if not self.connection:
211             try:
212                 self.connection = psycopg2.connect(**params)
213                 self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
214                 self.connection.set_session(readonly=True)
215             except OperationalError:
216                 return False
217         return True
218
219     def check(self):
220         try:
221             self._connect()
222             cursor = self.connection.cursor()
223             self._discover_databases(cursor)
224             self._check_if_superuser(cursor)
225             cursor.close()
226
227             self._create_definitions()
228             return True
229         except Exception as e:
230             self.error(str(e))
231             return False
232
233     def _discover_databases(self, cursor):
234         cursor.execute("""
235             SELECT datname
236             FROM pg_stat_database
237             WHERE NOT datname ~* '^template\d+'
238         """)
239         self.databases = set(r[0] for r in cursor)
240
241     def _check_if_superuser(self, cursor):
242         cursor.execute("""
243             SELECT current_setting('is_superuser') = 'on' AS is_superuser;
244         """)
245         self.is_superuser = cursor.fetchone()[0]
246
247     def _create_definitions(self):
248         for database_name in self.databases:
249             for chart_template_name in list(CHARTS):
250                 if chart_template_name.startswith('db_stat'):
251                     self._add_database_stat_chart(chart_template_name, database_name)
252             self._add_database_lock_chart(database_name)
253
254     def _add_database_stat_chart(self, chart_template_name, database_name):
255         chart_template = CHARTS[chart_template_name]
256         chart_name = "{0}_{1}".format(database_name, chart_template_name)
257         if chart_name not in self.order:
258             self.order.insert(0, chart_name)
259             name, title, units, family, context, chart_type = chart_template['options']
260             self.definitions[chart_name] = {
261                 'options': [
262                     name,
263                     title + ': ' + database_name,
264                     units,
265                     'db ' + database_name,
266                     context,
267                     chart_type
268                 ]
269             }
270
271             self.definitions[chart_name]['lines'] = []
272             for line in deepcopy(chart_template['lines']):
273                 line[0] = "{0}_{1}".format(database_name, line[0])
274                 self.definitions[chart_name]['lines'].append(line)
275
276     def _add_database_lock_chart(self, database_name):
277         chart_name = "{0}_locks".format(database_name)
278         if chart_name not in self.order:
279             self.order.insert(-1, chart_name)
280             self.definitions[chart_name] = dict(
281                 options=
282                 [
283                     None,
284                     'Locks on db: ' + database_name,
285                     'locks',
286                     'db ' + database_name,
287                     'postgres.db_locks',
288                     'line'
289                 ],
290                 lines=[]
291             )
292
293             for lock_type in LOCK_TYPES:
294                 lock_id = "{0}_{1}".format(database_name, lock_type)
295                 label = re.sub("([a-z])([A-Z])", "\g<1> \g<2>", lock_type)
296                 self.definitions[chart_name]['lines'].append([lock_id, label, 'absolute'])
297
298     def _get_data(self):
299         if self._connect():
300             cursor = self.connection.cursor(cursor_factory=DictCursor)
301             try:
302                 self.add_stats(cursor)
303             except OperationalError:
304                 if self.connection.closed == 2:
305                     self.connection = False
306                 cursor.close()
307                 return None
308             else:
309                 cursor.close()
310                 return self.data
311         else:
312             return None
313
314     def add_stats(self, cursor):
315         self.add_database_stats(cursor)
316         self.add_backend_stats(cursor)
317         if self.index_stats:
318             self.add_index_stats(cursor)
319         if self.table_stats:
320             self.add_table_stats(cursor)
321         self.add_lock_stats(cursor)
322         self.add_bgwriter_stats(cursor)
323
324         # self.add_replication_stats(cursor)
325
326         if self.is_superuser:
327             self.add_wal_stats(cursor)
328
329     def add_database_stats(self, cursor):
330         cursor.execute(DATABASE)
331         for row in cursor:
332             database_name = row.get('database_name')
333             self.data["{0}_{1}".format(database_name, 'db_stat_xact_commit')]   = int(row.get('xact_commit',   0))
334             self.data["{0}_{1}".format(database_name, 'db_stat_xact_rollback')] = int(row.get('xact_rollback', 0))
335             self.data["{0}_{1}".format(database_name, 'db_stat_blks_read')]     = int(row.get('blks_read',     0))
336             self.data["{0}_{1}".format(database_name, 'db_stat_blks_hit')]      = int(row.get('blks_hit',      0))
337             self.data["{0}_{1}".format(database_name, 'db_stat_tup_returned')]  = int(row.get('tup_returned',  0))
338             self.data["{0}_{1}".format(database_name, 'db_stat_tup_fetched')]   = int(row.get('tup_fetched',   0))
339             self.data["{0}_{1}".format(database_name, 'db_stat_tup_inserted')]  = int(row.get('tup_inserted',  0))
340             self.data["{0}_{1}".format(database_name, 'db_stat_tup_updated')]   = int(row.get('tup_updated',   0))
341             self.data["{0}_{1}".format(database_name, 'db_stat_tup_deleted')]   = int(row.get('tup_deleted',   0))
342             self.data["{0}_{1}".format(database_name, 'db_stat_conflicts')]     = int(row.get('conflicts',     0))
343             self.data["{0}_{1}".format(database_name, 'db_stat_connections')]   = int(row.get('connections',   0))
344
345     def add_backend_stats(self, cursor):
346         cursor.execute(BACKENDS)
347         temp = cursor.fetchone()
348
349         self.data['backend_process_active'] = int(temp.get('backends_active', 0))
350         self.data['backend_process_idle']   = int(temp.get('backends_idle',   0))
351
352     def add_index_stats(self, cursor):
353         cursor.execute(INDEX_STATS)
354         temp = cursor.fetchone()
355         self.data['index_count'] = int(temp.get('indexes',      0))
356         self.data['index_size']  = int(temp.get('size_indexes', 0))
357
358     def add_table_stats(self, cursor):
359         cursor.execute(TABLE_STATS)
360         temp = cursor.fetchone()
361         self.data['table_count'] = int(temp.get('relations',      0))
362         self.data['table_size']  = int(temp.get('size_relations', 0))
363
364     def add_lock_stats(self, cursor):
365         cursor.execute(DATABASE_LOCKS)
366
367         # zero out all current lock values
368         for database_name in self.databases:
369             for lock_type in LOCK_TYPES:
370                 self.data["{0}_{1}".format(database_name, lock_type)] = 0
371
372         # populate those that have current locks
373         for row in cursor:
374             database_name, lock_type, lock_count = row
375             self.data["{0}_{1}".format(database_name, lock_type)] = lock_count
376
377     def add_wal_stats(self, cursor):
378         cursor.execute(ARCHIVE)
379         temp = cursor.fetchone()
380         self.data['wal_total'] = int(temp.get('file_count',  0))
381         self.data['wal_ready'] = int(temp.get('ready_count', 0))
382         self.data['wal_done']  = int(temp.get('done_count',  0))
383
384     def add_bgwriter_stats(self, cursor):
385         cursor.execute(BGWRITER)
386         temp = cursor.fetchone()
387         self.data['background_writer_scheduled'] = temp.get('checkpoints_timed',    0)
388         self.data['background_writer_requested'] = temp.get('checkpoints_requests', 0)
389
390 '''
391     def add_replication_stats(self, cursor):
392         cursor.execute(REPLICATION)
393         temp = cursor.fetchall()
394         for row in temp:
395             self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
396                                  'byte_lag',
397                                  int(row.get('byte_lag', 0)))
398 '''