]> arthur.barton.de Git - netdata.git/blob - python.d/postgres.chart.py
postgres_plugin: database size chart added, index and table stats disabled by default...
[netdata.git] / python.d / postgres.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: example netdata python.d module
3 # Authors: facetoe, dangtranhoang
4
5 from copy import deepcopy
6
7 try:
8     import psycopg2
9     from psycopg2 import extensions
10     from psycopg2.extras import DictCursor
11     from psycopg2 import OperationalError
12     PSYCOPG2 = True
13 except ImportError:
14     PSYCOPG2 = False
15
16 from base import SimpleService
17
18 # default module values
19 update_every = 1
20 priority = 90000
21 retries = 60
22
23 METRICS = dict(
24     DATABASE=['connections',
25               'xact_commit',
26               'xact_rollback',
27               'blks_read',
28               'blks_hit',
29               'tup_returned',
30               'tup_fetched',
31               'tup_inserted',
32               'tup_updated',
33               'tup_deleted',
34               'conflicts',
35               'size'],
36     BACKENDS=['backends_active',
37               'backends_idle'],
38     INDEX_STATS=['index_count',
39                  'index_size'],
40     TABLE_STATS=['table_size',
41                  'table_count'],
42     ARCHIVE=['ready_count',
43              'done_count',
44              'file_count'],
45     BGWRITER=['writer_scheduled',
46               'writer_requested'],
47     LOCKS=['ExclusiveLock',
48            'RowShareLock',
49            'SIReadLock',
50            'ShareUpdateExclusiveLock',
51            'AccessExclusiveLock',
52            'AccessShareLock',
53            'ShareRowExclusiveLock',
54            'ShareLock',
55            'RowExclusiveLock']
56 )
57
58 QUERIES = dict(
59     ARCHIVE="""
60 SELECT
61     CAST(COUNT(*) AS INT) AS file_count,
62     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
63     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
64 FROM
65     pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
66 """,
67     BACKENDS="""
68 SELECT
69     count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
70     (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
71 FROM  pg_stat_activity;
72 """,
73     TABLE_STATS="""
74 SELECT
75   ((sum(relpages) * 8) * 1024) AS table_size,
76   count(1)                     AS table_count
77 FROM pg_class
78 WHERE relkind IN ('r', 't');
79 """,
80     INDEX_STATS="""
81 SELECT
82   ((sum(relpages) * 8) * 1024) AS index_size,
83   count(1)                     AS index_count
84 FROM pg_class
85 WHERE relkind = 'i';""",
86     DATABASE="""
87 SELECT
88   datname AS database_name,
89   sum(numbackends) AS connections,
90   sum(xact_commit) AS xact_commit,
91   sum(xact_rollback) AS xact_rollback,
92   sum(blks_read) AS blks_read,
93   sum(blks_hit) AS blks_hit,
94   sum(tup_returned) AS tup_returned,
95   sum(tup_fetched) AS tup_fetched,
96   sum(tup_inserted) AS tup_inserted,
97   sum(tup_updated) AS tup_updated,
98   sum(tup_deleted) AS tup_deleted,
99   sum(conflicts) AS conflicts,
100   pg_database_size(datname) AS size
101 FROM pg_stat_database
102 WHERE NOT datname ~* '^template\d+'
103 GROUP BY database_name;
104 """,
105     BGWRITER="""
106 SELECT
107   checkpoints_timed AS writer_scheduled,
108   checkpoints_req AS writer_requested
109 FROM pg_stat_bgwriter;""",
110    LOCKS="""
111 SELECT
112   pg_database.datname as database_name,
113   mode,
114   count(mode) AS locks_count
115 FROM pg_locks
116   INNER JOIN pg_database ON pg_database.oid = pg_locks.database
117 GROUP BY datname, mode
118 ORDER BY datname, mode;
119 """,
120     FIND_DATABASES="""
121 SELECT datname FROM pg_stat_database WHERE NOT datname ~* '^template\d+'
122 """,
123     IF_SUPERUSER="""
124 SELECT current_setting('is_superuser') = 'on' AS is_superuser;
125     """)
126
127 # REPLICATION = """
128 # SELECT
129 #    client_hostname,
130 #    client_addr,
131 #    state,
132 #    sent_offset - (
133 #        replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
134 # FROM (
135 #    SELECT
136 #        client_addr, client_hostname, state,
137 #        ('x' || lpad(split_part(sent_location::text,   '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
138 #        ('x' || lpad(split_part(replay_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
139 #        ('x' || lpad(split_part(sent_location::text,   '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
140 #        ('x' || lpad(split_part(replay_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
141 #    FROM pg_stat_replication
142 # ) AS s;
143 # """
144
145
146 QUERY_STATS = {
147     QUERIES['DATABASE']: METRICS['DATABASE'],
148     QUERIES['BACKENDS']: METRICS['BACKENDS'],
149     QUERIES['ARCHIVE']: METRICS['ARCHIVE'],
150     QUERIES['LOCKS']: METRICS['LOCKS']
151 }
152
153 ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write', 'database_size',
154          'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer']
155
156 CHARTS = {
157     'db_stat_transactions': {
158         'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions',
159                     'line'],
160         'lines': [
161             ['xact_commit', 'committed', 'incremental'],
162             ['xact_rollback', 'rolled back', 'incremental']
163         ]},
164     'db_stat_connections': {
165         'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections',
166                     'line'],
167         'lines': [
168             ['connections', 'connections', 'absolute']
169         ]},
170     'db_stat_tuple_read': {
171         'options': [None, 'Tuple reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_tuple_read', 'line'],
172         'lines': [
173             ['blks_read', 'disk', 'incremental'],
174             ['blks_hit', 'cache', 'incremental']
175         ]},
176     'db_stat_tuple_returned': {
177         'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned',
178                     'line'],
179         'lines': [
180             ['tup_returned', 'sequential', 'incremental'],
181             ['tup_fetched', 'bitmap', 'incremental']
182         ]},
183     'db_stat_tuple_write': {
184         'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'],
185         'lines': [
186             ['tup_inserted', 'inserted', 'incremental'],
187             ['tup_updated', 'updated', 'incremental'],
188             ['tup_deleted', 'deleted', 'incremental'],
189             ['conflicts', 'conflicts', 'incremental']
190         ]},
191     'database_size': {
192         'options': [None, 'Database size', 'MB', 'database size', 'postgres.db_size', 'stacked'],
193         'lines': [
194         ]},
195     'backend_process': {
196         'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process',
197                     'line'],
198         'lines': [
199             ['backends_active', 'active', 'absolute'],
200             ['backends_idle', 'idle', 'absolute']
201         ]},
202     'index_count': {
203         'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'],
204         'lines': [
205             ['index_count', 'total', 'absolute']
206         ]},
207     'index_size': {
208         'options': [None, 'Indexes size', 'MB', 'indexes', 'postgres.index_size', 'line'],
209         'lines': [
210             ['index_size', 'size', 'absolute', 1, 1024 * 1024]
211         ]},
212     'table_count': {
213         'options': [None, 'Total Tables', 'tables', 'tables', 'postgres.table_count', 'line'],
214         'lines': [
215             ['table_count', 'total', 'absolute']
216         ]},
217     'table_size': {
218         'options': [None, 'Tables size', 'MB', 'tables', 'postgres.table_size', 'line'],
219         'lines': [
220             ['table_size', 'size', 'absolute', 1, 1024 * 1024]
221         ]},
222     'wal': {
223         'options': [None, 'Write-Ahead Logging Statistics', 'files/s', 'write ahead log', 'postgres.wal', 'line'],
224         'lines': [
225             ['file_count', 'total', 'incremental'],
226             ['ready_count', 'ready', 'incremental'],
227             ['done_count', 'done', 'incremental']
228         ]},
229     'background_writer': {
230         'options': [None, 'Checkpoints', 'writes/s', 'background writer', 'postgres.background_writer', 'line'],
231         'lines': [
232             ['writer_scheduled', 'scheduled', 'incremental'],
233             ['writer_requested', 'requested', 'incremental']
234         ]}
235 }
236
237
238 class Service(SimpleService):
239     def __init__(self, configuration=None, name=None):
240         super(self.__class__, self).__init__(configuration=configuration, name=name)
241         self.order = ORDER[:]
242         self.definitions = deepcopy(CHARTS)
243         self.table_stats = configuration.pop('table_stats', False)
244         self.index_stats = configuration.pop('index_stats', False)
245         self.configuration = configuration
246         self.connection = False
247         self.is_superuser = False
248         self.data = dict()
249         self.locks_zeroed = dict()
250         self.databases = list()
251
252     def _connect(self):
253         params = dict(user='postgres',
254                       database=None,
255                       password=None,
256                       host=None,
257                       port=5432)
258         params.update(self.configuration)
259
260         if not self.connection:
261             try:
262                 self.connection = psycopg2.connect(**params)
263                 self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
264                 self.connection.set_session(readonly=True)
265             except OperationalError:
266                 return False
267         return True
268
269     def check(self):
270         if not PSYCOPG2:
271             self.error('\'python-psycopg2\' module is needed to use postgres.chart.py')
272             return False
273         if not self._connect():
274             self.error('Can\'t connect to %s' % str(self.configuration))
275             return False
276         try:
277             cursor = self.connection.cursor()
278             self.databases = discover_databases_(cursor, QUERIES['FIND_DATABASES'])
279             is_superuser = check_if_superuser_(cursor, QUERIES['IF_SUPERUSER'])
280             cursor.close()
281
282             self.locks_zeroed = populate_lock_types(self.databases)
283             self.add_additional_queries_(is_superuser)
284             self.create_dynamic_charts_()
285             return True
286         except Exception as error:
287             self.error(str(error))
288             return False
289
290     def add_additional_queries_(self, is_superuser):
291         if self.index_stats:
292             QUERY_STATS[QUERIES['INDEX_STATS']] = METRICS['INDEX_STATS']
293         if self.table_stats:
294             QUERY_STATS[QUERIES['TABLE_STATS']] = METRICS['TABLE_STATS']
295         if is_superuser:
296             QUERY_STATS[QUERIES['BGWRITER']] = METRICS['BGWRITER']
297
298     def create_dynamic_charts_(self):
299
300         for database_name in self.databases[::-1]:
301             self.definitions['database_size']['lines'].append([database_name + '_size',
302                                                                database_name, 'absolute', 1, 1024 * 1024])
303             for chart_name in [name for name in CHARTS if name.startswith('db_stat')]:
304                     add_database_stat_chart_(order=self.order, definitions=self.definitions,
305                                              name=chart_name, database_name=database_name)
306
307             add_database_lock_chart_(order=self.order, definitions=self.definitions, database_name=database_name)
308
309     def _get_data(self):
310         if self._connect():
311             cursor = self.connection.cursor(cursor_factory=DictCursor)
312             try:
313                 self.data.update(self.locks_zeroed)
314                 for query, metrics in QUERY_STATS.items():
315                     self.query_stats_(cursor, query, metrics)
316
317             except OperationalError:
318                 self.connection = False
319                 cursor.close()
320                 return None
321             else:
322                 cursor.close()
323                 return self.data
324         else:
325             return None
326
327     def query_stats_(self, cursor, query, metrics):
328         cursor.execute(query)
329         for row in cursor:
330             for metric in metrics:
331                 dimension_id = '_'.join([row['database_name'], metric]) if 'database_name' in row else metric
332                 if metric in row:
333                     self.data[dimension_id] = int(row[metric])
334                 elif 'locks_count' in row:
335                     self.data[dimension_id] = row['locks_count'] if metric == row['mode'] else 0
336
337
338 def discover_databases_(cursor, query):
339     cursor.execute(query)
340     result = list()
341     for db in [database[0] for database in cursor]:
342         if db not in result:
343             result.append(db)
344     return result
345
346
347 def check_if_superuser_(cursor, query):
348     cursor.execute(query)
349     return cursor.fetchone()[0]
350
351
352 def populate_lock_types(databases):
353     result = dict()
354     for database in databases:
355         for lock_type in METRICS['LOCKS']:
356             key = '_'.join([database, lock_type])
357             result[key] = 0
358
359     return result
360
361
362 def add_database_lock_chart_(order, definitions, database_name):
363     def create_lines(database):
364         result = list()
365         for lock_type in METRICS['LOCKS']:
366             dimension_id = '_'.join([database, lock_type])
367             result.append([dimension_id, lock_type, 'absolute'])
368         return result
369
370     chart_name = database_name + '_locks'
371     order.insert(-1, chart_name)
372     definitions[chart_name] = {
373             'options':
374             [None, 'Locks on db: ' + database_name, 'locks', 'db ' + database_name, 'postgres.db_locks', 'line'],
375             'lines': create_lines(database_name)
376             }
377
378
379 def add_database_stat_chart_(order, definitions, name, database_name):
380     def create_lines(database, lines):
381         result = list()
382         for line in lines:
383             new_line = ['_'.join([database, line[0]])] + line[1:]
384             result.append(new_line)
385         return result
386
387     chart_template = CHARTS[name]
388     chart_name = '_'.join([database_name, name])
389     order.insert(0, chart_name)
390     name, title, units, family, context, chart_type = chart_template['options']
391     definitions[chart_name] = {
392                'options': [name, title + ': ' + database_name,  units, 'db ' + database_name, context,  chart_type],
393                'lines': create_lines(database_name, chart_template['lines'])}
394
395
396 #
397 #    def add_replication_stats(self, cursor):
398 #        cursor.execute(REPLICATION)
399 #        temp = cursor.fetchall()
400 #        for row in temp:
401 #            self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
402 #                                 'byte_lag',
403 #                                 int(row.get('byte_lag', 0)))