]> arthur.barton.de Git - netdata.git/blob - python.d/postgres.chart.py
mongodb_plugin: replica set heartbeat latency chart added
[netdata.git] / python.d / postgres.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: example netdata python.d module
3 # Authors: facetoe, dangtranhoang
4
5 import re
6 from copy import deepcopy
7
8 import psycopg2
9 from psycopg2 import extensions
10 from psycopg2.extras import DictCursor
11 from psycopg2 import OperationalError
12
13 from base import SimpleService
14
15 # default module values
16 update_every = 1
17 priority = 90000
18 retries = 60
19
20 ARCHIVE = """
21 SELECT
22     CAST(COUNT(*) AS INT) AS file_count,
23     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
24     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
25 FROM
26     pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
27 """
28
29 BACKENDS = """
30 SELECT
31     count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
32     (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
33 FROM
34     pg_stat_activity;
35 """
36
37 TABLE_STATS = """
38 SELECT
39   ((sum(relpages) * 8) * 1024) AS size_relations,
40   count(1)                     AS relations
41 FROM pg_class
42 WHERE relkind IN ('r', 't');
43 """
44
45 INDEX_STATS = """
46 SELECT
47   ((sum(relpages) * 8) * 1024) AS size_indexes,
48   count(1)                     AS indexes
49 FROM pg_class
50 WHERE relkind = 'i';"""
51
52 DATABASE = """
53 SELECT
54   datname AS database_name,
55   sum(numbackends) AS connections,
56   sum(xact_commit) AS xact_commit,
57   sum(xact_rollback) AS xact_rollback,
58   sum(blks_read) AS blks_read,
59   sum(blks_hit) AS blks_hit,
60   sum(tup_returned) AS tup_returned,
61   sum(tup_fetched) AS tup_fetched,
62   sum(tup_inserted) AS tup_inserted,
63   sum(tup_updated) AS tup_updated,
64   sum(tup_deleted) AS tup_deleted,
65   sum(conflicts) AS conflicts
66 FROM pg_stat_database
67 WHERE NOT datname ~* '^template\d+'
68 GROUP BY database_name;
69 """
70
71 BGWRITER = 'SELECT * FROM pg_stat_bgwriter;'
72 DATABASE_LOCKS = """
73 SELECT
74   pg_database.datname as database_name,
75   mode,
76   count(mode) AS count
77 FROM pg_locks
78   INNER JOIN pg_database ON pg_database.oid = pg_locks.database
79 GROUP BY datname, mode
80 ORDER BY datname, mode;
81 """
82 REPLICATION = """
83 SELECT
84     client_hostname,
85     client_addr,
86     state,
87     sent_offset - (
88         replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
89 FROM (
90     SELECT
91         client_addr, client_hostname, state,
92         ('x' || lpad(split_part(sent_location::text,   '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
93         ('x' || lpad(split_part(replay_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
94         ('x' || lpad(split_part(sent_location::text,   '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
95         ('x' || lpad(split_part(replay_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
96     FROM pg_stat_replication
97 ) AS s;
98 """
99
100 LOCK_TYPES = [
101     'ExclusiveLock',
102     'RowShareLock',
103     'SIReadLock',
104     'ShareUpdateExclusiveLock',
105     'AccessExclusiveLock',
106     'AccessShareLock',
107     'ShareRowExclusiveLock',
108     'ShareLock',
109     'RowExclusiveLock'
110 ]
111
112 ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write',
113          'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer']
114
115 CHARTS = {
116     'db_stat_transactions': {
117         'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions', 'line'],
118         'lines': [
119             ['db_stat_xact_commit',   'committed',   'incremental'],
120             ['db_stat_xact_rollback', 'rolled back', 'incremental']
121         ]},
122     'db_stat_connections': {
123         'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections', 'line'],
124         'lines': [
125             ['db_stat_connections', 'connections', 'absolute']
126         ]},
127     'db_stat_tuple_read': {
128         'options': [None, 'Tuple reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_tuple_read', 'line'],
129         'lines': [
130             ['db_stat_blks_read', 'disk',  'incremental'],
131             ['db_stat_blks_hit',  'cache', 'incremental']
132         ]},
133     'db_stat_tuple_returned': {
134         'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned', 'line'],
135         'lines': [
136             ['db_stat_tup_returned', 'sequential', 'incremental'],
137             ['db_stat_tup_fetched',  'bitmap',     'incremental']
138         ]},
139     'db_stat_tuple_write': {
140         'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'],
141         'lines': [
142             ['db_stat_tup_inserted', 'inserted',  'incremental'],
143             ['db_stat_tup_updated',  'updated',   'incremental'],
144             ['db_stat_tup_deleted',  'deleted',   'incremental'],
145             ['db_stat_conflicts',    'conflicts', 'incremental']
146         ]},
147     'backend_process': {
148         'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process', 'line'],
149         'lines': [
150             ['backend_process_active', 'active', 'absolute'],
151             ['backend_process_idle',   'idle',   'absolute']
152         ]},
153     'index_count': {
154         'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'],
155         'lines': [
156             ['index_count', 'total', 'absolute']
157         ]},
158     'index_size': {
159         'options': [None, 'Indexes size', 'MB', 'indexes', 'postgres.index_size', 'line'],
160         'lines': [
161             ['index_size', 'size', 'absolute', 1, 1024 * 1024]
162         ]},
163     'table_count': {
164         'options': [None, 'Total Tables', 'tables', 'tables', 'postgres.table_count', 'line'],
165         'lines': [
166             ['table_count', 'total', 'absolute']
167         ]},
168     'table_size': {
169         'options': [None, 'Tables size', 'MB', 'tables', 'postgres.table_size', 'line'],
170         'lines': [
171             ['table_size', 'size', 'absolute', 1, 1024 * 1024]
172         ]},
173     'wal': {
174         'options': [None, 'Write-Ahead Logging Statistics', 'files/s', 'write ahead log', 'postgres.wal', 'line'],
175         'lines': [
176             ['wal_total', 'total', 'incremental'],
177             ['wal_ready', 'ready', 'incremental'],
178             ['wal_done',  'done',  'incremental']
179         ]},
180     'background_writer': {
181         'options': [None, 'Checkpoints', 'writes/s', 'background writer', 'postgres.background_writer', 'line'],
182         'lines': [
183             ['background_writer_scheduled', 'scheduled', 'incremental'],
184             ['background_writer_requested', 'requested', 'incremental']
185         ]}
186 }
187
188
189 class Service(SimpleService):
190     def __init__(self, configuration=None, name=None):
191         super(self.__class__, self).__init__(configuration=configuration, name=name)
192         self.order = ORDER
193         self.definitions = CHARTS
194         self.table_stats = configuration.pop('table_stats', True)
195         self.index_stats = configuration.pop('index_stats', True)
196         self.configuration = configuration
197         self.connection = False
198         self.is_superuser = False
199         self.data = {}
200         self.databases = set()
201
202     def _connect(self):
203         params = dict(user='postgres',
204                       database=None,
205                       password=None,
206                       host=None,
207                       port=5432)
208         params.update(self.configuration)
209
210         if not self.connection:
211             try:
212                 self.connection = psycopg2.connect(**params)
213                 self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
214                 self.connection.set_session(readonly=True)
215             except OperationalError:
216                 return False
217         return True
218
219     def check(self):
220         try:
221             if not self._connect():
222                 self.error('Can\'t connect to %s' % str(self.configuration))
223                 return False
224             cursor = self.connection.cursor()
225             self._discover_databases(cursor)
226             self._check_if_superuser(cursor)
227             cursor.close()
228
229             self._create_definitions()
230             return True
231         except Exception as e:
232             self.error(str(e))
233             return False
234
235     def _discover_databases(self, cursor):
236         cursor.execute("""
237             SELECT datname
238             FROM pg_stat_database
239             WHERE NOT datname ~* '^template\d+'
240         """)
241         self.databases = set(r[0] for r in cursor)
242
243     def _check_if_superuser(self, cursor):
244         cursor.execute("""
245             SELECT current_setting('is_superuser') = 'on' AS is_superuser;
246         """)
247         self.is_superuser = cursor.fetchone()[0]
248
249     def _create_definitions(self):
250         for database_name in self.databases:
251             for chart_template_name in list(CHARTS):
252                 if chart_template_name.startswith('db_stat'):
253                     self._add_database_stat_chart(chart_template_name, database_name)
254             self._add_database_lock_chart(database_name)
255
256     def _add_database_stat_chart(self, chart_template_name, database_name):
257         chart_template = CHARTS[chart_template_name]
258         chart_name = "{0}_{1}".format(database_name, chart_template_name)
259         if chart_name not in self.order:
260             self.order.insert(0, chart_name)
261             name, title, units, family, context, chart_type = chart_template['options']
262             self.definitions[chart_name] = {
263                 'options': [
264                     name,
265                     title + ': ' + database_name,
266                     units,
267                     'db ' + database_name,
268                     context,
269                     chart_type
270                 ]
271             }
272
273             self.definitions[chart_name]['lines'] = []
274             for line in deepcopy(chart_template['lines']):
275                 line[0] = "{0}_{1}".format(database_name, line[0])
276                 self.definitions[chart_name]['lines'].append(line)
277
278     def _add_database_lock_chart(self, database_name):
279         chart_name = "{0}_locks".format(database_name)
280         if chart_name not in self.order:
281             self.order.insert(-1, chart_name)
282             self.definitions[chart_name] = dict(
283                 options=
284                 [
285                     None,
286                     'Locks on db: ' + database_name,
287                     'locks',
288                     'db ' + database_name,
289                     'postgres.db_locks',
290                     'line'
291                 ],
292                 lines=[]
293             )
294
295             for lock_type in LOCK_TYPES:
296                 lock_id = "{0}_{1}".format(database_name, lock_type)
297                 label = re.sub("([a-z])([A-Z])", "\g<1> \g<2>", lock_type)
298                 self.definitions[chart_name]['lines'].append([lock_id, label, 'absolute'])
299
300     def _get_data(self):
301         if self._connect():
302             cursor = self.connection.cursor(cursor_factory=DictCursor)
303             try:
304                 self.add_stats(cursor)
305             except OperationalError:
306                 self.connection = False
307                 cursor.close()
308                 return None
309             else:
310                 cursor.close()
311                 return self.data
312         else:
313             return None
314
315     def add_stats(self, cursor):
316         self.add_database_stats(cursor)
317         self.add_backend_stats(cursor)
318         if self.index_stats:
319             self.add_index_stats(cursor)
320         if self.table_stats:
321             self.add_table_stats(cursor)
322         self.add_lock_stats(cursor)
323         self.add_bgwriter_stats(cursor)
324
325         # self.add_replication_stats(cursor)
326
327         if self.is_superuser:
328             self.add_wal_stats(cursor)
329
330     def add_database_stats(self, cursor):
331         cursor.execute(DATABASE)
332         for row in cursor:
333             database_name = row.get('database_name')
334             self.data["{0}_{1}".format(database_name, 'db_stat_xact_commit')]   = int(row.get('xact_commit',   0))
335             self.data["{0}_{1}".format(database_name, 'db_stat_xact_rollback')] = int(row.get('xact_rollback', 0))
336             self.data["{0}_{1}".format(database_name, 'db_stat_blks_read')]     = int(row.get('blks_read',     0))
337             self.data["{0}_{1}".format(database_name, 'db_stat_blks_hit')]      = int(row.get('blks_hit',      0))
338             self.data["{0}_{1}".format(database_name, 'db_stat_tup_returned')]  = int(row.get('tup_returned',  0))
339             self.data["{0}_{1}".format(database_name, 'db_stat_tup_fetched')]   = int(row.get('tup_fetched',   0))
340             self.data["{0}_{1}".format(database_name, 'db_stat_tup_inserted')]  = int(row.get('tup_inserted',  0))
341             self.data["{0}_{1}".format(database_name, 'db_stat_tup_updated')]   = int(row.get('tup_updated',   0))
342             self.data["{0}_{1}".format(database_name, 'db_stat_tup_deleted')]   = int(row.get('tup_deleted',   0))
343             self.data["{0}_{1}".format(database_name, 'db_stat_conflicts')]     = int(row.get('conflicts',     0))
344             self.data["{0}_{1}".format(database_name, 'db_stat_connections')]   = int(row.get('connections',   0))
345
346     def add_backend_stats(self, cursor):
347         cursor.execute(BACKENDS)
348         temp = cursor.fetchone()
349
350         self.data['backend_process_active'] = int(temp.get('backends_active', 0))
351         self.data['backend_process_idle']   = int(temp.get('backends_idle',   0))
352
353     def add_index_stats(self, cursor):
354         cursor.execute(INDEX_STATS)
355         temp = cursor.fetchone()
356         self.data['index_count'] = int(temp.get('indexes',      0))
357         self.data['index_size']  = int(temp.get('size_indexes', 0))
358
359     def add_table_stats(self, cursor):
360         cursor.execute(TABLE_STATS)
361         temp = cursor.fetchone()
362         self.data['table_count'] = int(temp.get('relations',      0))
363         self.data['table_size']  = int(temp.get('size_relations', 0))
364
365     def add_lock_stats(self, cursor):
366         cursor.execute(DATABASE_LOCKS)
367
368         # zero out all current lock values
369         for database_name in self.databases:
370             for lock_type in LOCK_TYPES:
371                 self.data["{0}_{1}".format(database_name, lock_type)] = 0
372
373         # populate those that have current locks
374         for row in cursor:
375             database_name, lock_type, lock_count = row
376             self.data["{0}_{1}".format(database_name, lock_type)] = lock_count
377
378     def add_wal_stats(self, cursor):
379         cursor.execute(ARCHIVE)
380         temp = cursor.fetchone()
381         self.data['wal_total'] = int(temp.get('file_count',  0))
382         self.data['wal_ready'] = int(temp.get('ready_count', 0))
383         self.data['wal_done']  = int(temp.get('done_count',  0))
384
385     def add_bgwriter_stats(self, cursor):
386         cursor.execute(BGWRITER)
387         temp = cursor.fetchone()
388         self.data['background_writer_scheduled'] = temp.get('checkpoints_timed',    0)
389         self.data['background_writer_requested'] = temp.get('checkpoints_requests', 0)
390
391 '''
392     def add_replication_stats(self, cursor):
393         cursor.execute(REPLICATION)
394         temp = cursor.fetchall()
395         for row in temp:
396             self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
397                                  'byte_lag',
398                                  int(row.get('byte_lag', 0)))
399 '''