]> arthur.barton.de Git - netdata.git/blob - python.d/postgres.chart.py
d359bb4f7a587039315d9b5d927785c72159b7c9
[netdata.git] / python.d / postgres.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: example netdata python.d module
3 # Authors: facetoe, dangtranhoang
4
5 from copy import deepcopy
6
7 try:
8     import psycopg2
9     from psycopg2 import extensions
10     from psycopg2.extras import DictCursor
11     from psycopg2 import OperationalError
12     PSYCOPG2 = True
13 except ImportError:
14     PSYCOPG2 = False
15
16 from base import SimpleService
17
18 # default module values
19 update_every = 1
20 priority = 90000
21 retries = 60
22
23 METRICS = dict(
24     DATABASE=['connections',
25               'xact_commit',
26               'xact_rollback',
27               'blks_read',
28               'blks_hit',
29               'tup_returned',
30               'tup_fetched',
31               'tup_inserted',
32               'tup_updated',
33               'tup_deleted',
34               'conflicts',
35               'size'],
36     BACKENDS=['backends_active',
37               'backends_idle'],
38     INDEX_STATS=['index_count',
39                  'index_size'],
40     TABLE_STATS=['table_size',
41                  'table_count'],
42     ARCHIVE=['ready_count',
43              'done_count',
44              'file_count'],
45     BGWRITER=['writer_scheduled',
46               'writer_requested'],
47     LOCKS=['ExclusiveLock',
48            'RowShareLock',
49            'SIReadLock',
50            'ShareUpdateExclusiveLock',
51            'AccessExclusiveLock',
52            'AccessShareLock',
53            'ShareRowExclusiveLock',
54            'ShareLock',
55            'RowExclusiveLock']
56 )
57
58 QUERIES = dict(
59     ARCHIVE="""
60 SELECT
61     CAST(COUNT(*) AS INT) AS file_count,
62     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
63     CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
64 FROM
65     pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
66 """,
67     BACKENDS="""
68 SELECT
69     count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
70     (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
71 FROM  pg_stat_activity;
72 """,
73     TABLE_STATS="""
74 SELECT
75   ((sum(relpages) * 8) * 1024) AS table_size,
76   count(1)                     AS table_count
77 FROM pg_class
78 WHERE relkind IN ('r', 't');
79 """,
80     INDEX_STATS="""
81 SELECT
82   ((sum(relpages) * 8) * 1024) AS index_size,
83   count(1)                     AS index_count
84 FROM pg_class
85 WHERE relkind = 'i';""",
86     DATABASE="""
87 SELECT
88   datname AS database_name,
89   sum(numbackends) AS connections,
90   sum(xact_commit) AS xact_commit,
91   sum(xact_rollback) AS xact_rollback,
92   sum(blks_read) AS blks_read,
93   sum(blks_hit) AS blks_hit,
94   sum(tup_returned) AS tup_returned,
95   sum(tup_fetched) AS tup_fetched,
96   sum(tup_inserted) AS tup_inserted,
97   sum(tup_updated) AS tup_updated,
98   sum(tup_deleted) AS tup_deleted,
99   sum(conflicts) AS conflicts,
100   pg_database_size(datname) AS size
101 FROM pg_stat_database
102 WHERE NOT datname ~* '^template\d+'
103 GROUP BY database_name;
104 """,
105     BGWRITER="""
106 SELECT
107   checkpoints_timed AS writer_scheduled,
108   checkpoints_req AS writer_requested
109 FROM pg_stat_bgwriter;""",
110    LOCKS="""
111 SELECT
112   pg_database.datname as database_name,
113   mode,
114   count(mode) AS locks_count
115 FROM pg_locks
116   INNER JOIN pg_database ON pg_database.oid = pg_locks.database
117 GROUP BY datname, mode
118 ORDER BY datname, mode;
119 """,
120     FIND_DATABASES="""
121 SELECT datname FROM pg_stat_database WHERE NOT datname ~* '^template\d+'
122 """,
123     IF_SUPERUSER="""
124 SELECT current_setting('is_superuser') = 'on' AS is_superuser;
125     """)
126
127 # REPLICATION = """
128 # SELECT
129 #    client_hostname,
130 #    client_addr,
131 #    state,
132 #    sent_offset - (
133 #        replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
134 # FROM (
135 #    SELECT
136 #        client_addr, client_hostname, state,
137 #        ('x' || lpad(split_part(sent_location::text,   '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
138 #        ('x' || lpad(split_part(replay_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
139 #        ('x' || lpad(split_part(sent_location::text,   '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
140 #        ('x' || lpad(split_part(replay_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
141 #    FROM pg_stat_replication
142 # ) AS s;
143 # """
144
145
146 QUERY_STATS = {
147     QUERIES['DATABASE']: METRICS['DATABASE'],
148     QUERIES['BACKENDS']: METRICS['BACKENDS'],
149     QUERIES['ARCHIVE']: METRICS['ARCHIVE'],
150     QUERIES['LOCKS']: METRICS['LOCKS']
151 }
152
153 ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write', 'database_size',
154          'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer']
155
156 CHARTS = {
157     'db_stat_transactions': {
158         'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions',
159                     'line'],
160         'lines': [
161             ['xact_commit', 'committed', 'incremental'],
162             ['xact_rollback', 'rolled back', 'incremental']
163         ]},
164     'db_stat_connections': {
165         'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections',
166                     'line'],
167         'lines': [
168             ['connections', 'connections', 'absolute']
169         ]},
170     'db_stat_tuple_read': {
171         'options': [None, 'Tuple reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_tuple_read', 'line'],
172         'lines': [
173             ['blks_read', 'disk', 'incremental'],
174             ['blks_hit', 'cache', 'incremental']
175         ]},
176     'db_stat_tuple_returned': {
177         'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned',
178                     'line'],
179         'lines': [
180             ['tup_returned', 'sequential', 'incremental'],
181             ['tup_fetched', 'bitmap', 'incremental']
182         ]},
183     'db_stat_tuple_write': {
184         'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'],
185         'lines': [
186             ['tup_inserted', 'inserted', 'incremental'],
187             ['tup_updated', 'updated', 'incremental'],
188             ['tup_deleted', 'deleted', 'incremental'],
189             ['conflicts', 'conflicts', 'incremental']
190         ]},
191     'database_size': {
192         'options': [None, 'Database size', 'MB', 'database size', 'postgres.db_size', 'stacked'],
193         'lines': [
194         ]},
195     'backend_process': {
196         'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process',
197                     'line'],
198         'lines': [
199             ['backends_active', 'active', 'absolute'],
200             ['backends_idle', 'idle', 'absolute']
201         ]},
202     'index_count': {
203         'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'],
204         'lines': [
205             ['index_count', 'total', 'absolute']
206         ]},
207     'index_size': {
208         'options': [None, 'Indexes size', 'MB', 'indexes', 'postgres.index_size', 'line'],
209         'lines': [
210             ['index_size', 'size', 'absolute', 1, 1024 * 1024]
211         ]},
212     'table_count': {
213         'options': [None, 'Total Tables', 'tables', 'tables', 'postgres.table_count', 'line'],
214         'lines': [
215             ['table_count', 'total', 'absolute']
216         ]},
217     'table_size': {
218         'options': [None, 'Tables size', 'MB', 'tables', 'postgres.table_size', 'line'],
219         'lines': [
220             ['table_size', 'size', 'absolute', 1, 1024 * 1024]
221         ]},
222     'wal': {
223         'options': [None, 'Write-Ahead Logging Statistics', 'files/s', 'write ahead log', 'postgres.wal', 'line'],
224         'lines': [
225             ['file_count', 'total', 'incremental'],
226             ['ready_count', 'ready', 'incremental'],
227             ['done_count', 'done', 'incremental']
228         ]},
229     'background_writer': {
230         'options': [None, 'Checkpoints', 'writes/s', 'background writer', 'postgres.background_writer', 'line'],
231         'lines': [
232             ['writer_scheduled', 'scheduled', 'incremental'],
233             ['writer_requested', 'requested', 'incremental']
234         ]}
235 }
236
237
238 class Service(SimpleService):
239     def __init__(self, configuration=None, name=None):
240         super(self.__class__, self).__init__(configuration=configuration, name=name)
241         self.order = ORDER[:]
242         self.definitions = deepcopy(CHARTS)
243         self.table_stats = configuration.pop('table_stats', False)
244         self.index_stats = configuration.pop('index_stats', False)
245         self.configuration = configuration
246         self.connection = False
247         self.is_superuser = False
248         self.data = dict()
249         self.locks_zeroed = dict()
250         self.databases = list()
251
252     def _connect(self):
253         params = dict(user='postgres',
254                       database=None,
255                       password=None,
256                       host=None,
257                       port=5432)
258         params.update(self.configuration)
259
260         if not self.connection:
261             try:
262                 self.connection = psycopg2.connect(**params)
263                 self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
264                 self.connection.set_session(readonly=True)
265             except OperationalError as error:
266                 return False, str(error)
267         return True, True
268
269     def check(self):
270         if not PSYCOPG2:
271             self.error('\'python-psycopg2\' module is needed to use postgres.chart.py')
272             return False
273         result, error = self._connect()
274         if not result:
275             conf = dict([(k, (lambda k, v: v if k != 'password' else '*****')(k, v)) for k, v in self.configuration.items()])
276             self.error('Failed to connect to %s. Error: %s' % (str(conf), error))
277             return False
278         try:
279             cursor = self.connection.cursor()
280             self.databases = discover_databases_(cursor, QUERIES['FIND_DATABASES'])
281             is_superuser = check_if_superuser_(cursor, QUERIES['IF_SUPERUSER'])
282             cursor.close()
283
284             self.locks_zeroed = populate_lock_types(self.databases)
285             self.add_additional_queries_(is_superuser)
286             self.create_dynamic_charts_()
287             return True
288         except Exception as error:
289             self.error(str(error))
290             return False
291
292     def add_additional_queries_(self, is_superuser):
293         if self.index_stats:
294             QUERY_STATS[QUERIES['INDEX_STATS']] = METRICS['INDEX_STATS']
295         if self.table_stats:
296             QUERY_STATS[QUERIES['TABLE_STATS']] = METRICS['TABLE_STATS']
297         if is_superuser:
298             QUERY_STATS[QUERIES['BGWRITER']] = METRICS['BGWRITER']
299
300     def create_dynamic_charts_(self):
301
302         for database_name in self.databases[::-1]:
303             self.definitions['database_size']['lines'].append([database_name + '_size',
304                                                                database_name, 'absolute', 1, 1024 * 1024])
305             for chart_name in [name for name in CHARTS if name.startswith('db_stat')]:
306                     add_database_stat_chart_(order=self.order, definitions=self.definitions,
307                                              name=chart_name, database_name=database_name)
308
309             add_database_lock_chart_(order=self.order, definitions=self.definitions, database_name=database_name)
310
311     def _get_data(self):
312         result, error = self._connect()
313         if result:
314             cursor = self.connection.cursor(cursor_factory=DictCursor)
315             try:
316                 self.data.update(self.locks_zeroed)
317                 for query, metrics in QUERY_STATS.items():
318                     self.query_stats_(cursor, query, metrics)
319
320             except OperationalError:
321                 self.connection = False
322                 cursor.close()
323                 return None
324             else:
325                 cursor.close()
326                 return self.data
327         else:
328             return None
329
330     def query_stats_(self, cursor, query, metrics):
331         cursor.execute(query)
332         for row in cursor:
333             for metric in metrics:
334                 dimension_id = '_'.join([row['database_name'], metric]) if 'database_name' in row else metric
335                 if metric in row:
336                     self.data[dimension_id] = int(row[metric])
337                 elif 'locks_count' in row:
338                     self.data[dimension_id] = row['locks_count'] if metric == row['mode'] else 0
339
340
341 def discover_databases_(cursor, query):
342     cursor.execute(query)
343     result = list()
344     for db in [database[0] for database in cursor]:
345         if db not in result:
346             result.append(db)
347     return result
348
349
350 def check_if_superuser_(cursor, query):
351     cursor.execute(query)
352     return cursor.fetchone()[0]
353
354
355 def populate_lock_types(databases):
356     result = dict()
357     for database in databases:
358         for lock_type in METRICS['LOCKS']:
359             key = '_'.join([database, lock_type])
360             result[key] = 0
361
362     return result
363
364
365 def add_database_lock_chart_(order, definitions, database_name):
366     def create_lines(database):
367         result = list()
368         for lock_type in METRICS['LOCKS']:
369             dimension_id = '_'.join([database, lock_type])
370             result.append([dimension_id, lock_type, 'absolute'])
371         return result
372
373     chart_name = database_name + '_locks'
374     order.insert(-1, chart_name)
375     definitions[chart_name] = {
376             'options':
377             [None, 'Locks on db: ' + database_name, 'locks', 'db ' + database_name, 'postgres.db_locks', 'line'],
378             'lines': create_lines(database_name)
379             }
380
381
382 def add_database_stat_chart_(order, definitions, name, database_name):
383     def create_lines(database, lines):
384         result = list()
385         for line in lines:
386             new_line = ['_'.join([database, line[0]])] + line[1:]
387             result.append(new_line)
388         return result
389
390     chart_template = CHARTS[name]
391     chart_name = '_'.join([database_name, name])
392     order.insert(0, chart_name)
393     name, title, units, family, context, chart_type = chart_template['options']
394     definitions[chart_name] = {
395                'options': [name, title + ': ' + database_name,  units, 'db ' + database_name, context,  chart_type],
396                'lines': create_lines(database_name, chart_template['lines'])}
397
398
399 #
400 #    def add_replication_stats(self, cursor):
401 #        cursor.execute(REPLICATION)
402 #        temp = cursor.fetchall()
403 #        for row in temp:
404 #            self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
405 #                                 'byte_lag',
406 #                                 int(row.get('byte_lag', 0)))