1 # -*- coding: utf-8 -*-
2 # Description: example netdata python.d module
3 # Authors: facetoe, dangtranhoang
6 from copy import deepcopy
9 from psycopg2 import extensions
10 from psycopg2.extras import DictCursor
11 from psycopg2 import OperationalError
13 from base import SimpleService
15 # default module values
22 CAST(COUNT(*) AS INT) AS file_count,
23 CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count,
24 CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count
26 pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
31 count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active,
32 (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle
39 ((sum(relpages) * 8) * 1024) AS size_relations,
42 WHERE relkind IN ('r', 't');
47 ((sum(relpages) * 8) * 1024) AS size_indexes,
50 WHERE relkind = 'i';"""
54 datname AS database_name,
55 sum(numbackends) AS connections,
56 sum(xact_commit) AS xact_commit,
57 sum(xact_rollback) AS xact_rollback,
58 sum(blks_read) AS blks_read,
59 sum(blks_hit) AS blks_hit,
60 sum(tup_returned) AS tup_returned,
61 sum(tup_fetched) AS tup_fetched,
62 sum(tup_inserted) AS tup_inserted,
63 sum(tup_updated) AS tup_updated,
64 sum(tup_deleted) AS tup_deleted,
65 sum(conflicts) AS conflicts
67 WHERE NOT datname ~* '^template\d+'
68 GROUP BY database_name;
# Cluster-wide background-writer / checkpoint counters.
BGWRITER = 'SELECT * FROM pg_stat_bgwriter;'
74 pg_database.datname as database_name,
78 INNER JOIN pg_database ON pg_database.oid = pg_locks.database
79 GROUP BY datname, mode
80 ORDER BY datname, mode;
88 replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag
91 client_addr, client_hostname, state,
92 ('x' || lpad(split_part(sent_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog,
93 ('x' || lpad(split_part(replay_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog,
94 ('x' || lpad(split_part(sent_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset,
95 ('x' || lpad(split_part(replay_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset
96 FROM pg_stat_replication
104 'ShareUpdateExclusiveLock',
105 'AccessExclusiveLock',
107 'ShareRowExclusiveLock',
# Chart display order; per-database charts are inserted at runtime by
# _add_database_stat_chart / _add_database_lock_chart.
ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write',
'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer']
# 'db_stat*' entries are per-database templates: they are cloned once per
# discovered database with dimension ids prefixed by the database name.
'db_stat_transactions': {
'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions', 'line'],
['db_stat_xact_commit', 'committed', 'incremental'],
['db_stat_xact_rollback', 'rolled back', 'incremental']
'db_stat_connections': {
'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections', 'line'],
['db_stat_connections', 'connections', 'absolute']
'db_stat_tuple_read': {
'options': [None, 'Tuple reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_tuple_read', 'line'],
['db_stat_blks_read', 'disk', 'incremental'],
['db_stat_blks_hit', 'cache', 'incremental']
'db_stat_tuple_returned': {
'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned', 'line'],
['db_stat_tup_returned', 'sequential', 'incremental'],
['db_stat_tup_fetched', 'bitmap', 'incremental']
'db_stat_tuple_write': {
'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'],
['db_stat_tup_inserted', 'inserted', 'incremental'],
['db_stat_tup_updated', 'updated', 'incremental'],
['db_stat_tup_deleted', 'deleted', 'incremental'],
['db_stat_conflicts', 'conflicts', 'incremental']
# Cluster-wide charts (not cloned per database) follow.
'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process', 'line'],
['backend_process_active', 'active', 'absolute'],
['backend_process_idle', 'idle', 'absolute']
'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'],
['index_count', 'total', 'absolute']
'options': [None, 'Indexes size', 'MB', 'indexes', 'postgres.index_size', 'line'],
['index_size', 'size', 'absolute', 1, 1024 * 1024]
'options': [None, 'Total Tables', 'tables', 'tables', 'postgres.table_count', 'line'],
['table_count', 'total', 'absolute']
'options': [None, 'Tables size', 'MB', 'tables', 'postgres.table_size', 'line'],
['table_size', 'size', 'absolute', 1, 1024 * 1024]
'options': [None, 'Write-Ahead Logging Statistics', 'files/s', 'write ahead log', 'postgres.wal', 'line'],
['wal_total', 'total', 'incremental'],
['wal_ready', 'ready', 'incremental'],
['wal_done', 'done', 'incremental']
'background_writer': {
'options': [None, 'Checkpoints', 'writes/s', 'background writer', 'postgres.background_writer', 'line'],
['background_writer_scheduled', 'scheduled', 'incremental'],
['background_writer_requested', 'requested', 'incremental']
189 class Service(SimpleService):
def __init__(self, configuration=None, name=None):
# Initialize the service from the user configuration.
super(self.__class__, self).__init__(configuration=configuration, name=name)
# NOTE(review): super(self.__class__, ...) recurses infinitely if this
# class is ever subclassed; prefer naming the class explicitly.
# self.definitions aliases the module-level CHARTS dict, so per-database
# charts added later are written into CHARTS itself.
self.definitions = CHARTS
# Optional collectors (enabled by default); popped so these keys are not
# forwarded as psycopg2 connection parameters.
self.table_stats = configuration.pop('table_stats', True)
self.index_stats = configuration.pop('index_stats', True)
self.configuration = configuration
self.connection = False
self.is_superuser = False
self.databases = set()
# Connection defaults; any user-supplied configuration overrides them.
params = dict(user='postgres',
params.update(self.configuration)
# Open the connection lazily, only if one is not already cached.
if not self.connection:
self.connection = psycopg2.connect(**params)
# Autocommit so the collector never holds a transaction open, and
# read-only as a safety net against accidental writes.
self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
self.connection.set_session(readonly=True)
except OperationalError:
cursor = self.connection.cursor()
# Probe the server once at startup: cache the database list and the
# privilege level, then build the chart definitions from them.
self._discover_databases(cursor)
self._check_if_superuser(cursor)
self._create_definitions()
except Exception as e:
def _discover_databases(self, cursor):
# Populate self.databases with every non-template database name.
FROM pg_stat_database
WHERE NOT datname ~* '^template\d+'
# First column of each result row is the database name.
self.databases = set(r[0] for r in cursor)
def _check_if_superuser(self, cursor):
# Superuser is needed for pg_ls_dir() in the WAL archive-status query,
# so add_wal_stats is only run when this flag is true.
SELECT current_setting('is_superuser') = 'on' AS is_superuser;
self.is_superuser = cursor.fetchone()[0]
def _create_definitions(self):
    """Build chart definitions for every discovered database.

    Every 'db_stat*' chart template is cloned once per database, then a
    per-database locks chart is appended.
    """
    for database_name in self.databases:
        # Snapshot the template names first: the helper below inserts new
        # per-database entries into the same CHARTS dict while we iterate.
        stat_templates = [name for name in list(CHARTS) if name.startswith('db_stat')]
        for template_name in stat_templates:
            self._add_database_stat_chart(template_name, database_name)
        self._add_database_lock_chart(database_name)
def _add_database_stat_chart(self, chart_template_name, database_name):
# Clone one 'db_stat*' chart template for a specific database.
chart_template = CHARTS[chart_template_name]
chart_name = "{0}_{1}".format(database_name, chart_template_name)
if chart_name not in self.order:
# Per-database stat charts are displayed before the cluster-wide ones.
self.order.insert(0, chart_name)
name, title, units, family, context, chart_type = chart_template['options']
self.definitions[chart_name] = {
title + ': ' + database_name,
'db ' + database_name,
# deepcopy so the shared template's line lists are never mutated;
# dimension ids are prefixed with the database name to stay unique.
self.definitions[chart_name]['lines'] = []
for line in deepcopy(chart_template['lines']):
line[0] = "{0}_{1}".format(database_name, line[0])
self.definitions[chart_name]['lines'].append(line)
def _add_database_lock_chart(self, database_name):
# Create the per-database lock-count chart with one dimension per
# PostgreSQL lock mode.
chart_name = "{0}_locks".format(database_name)
if chart_name not in self.order:
# insert(-1, ...) places the chart just before the last entry.
self.order.insert(-1, chart_name)
self.definitions[chart_name] = dict(
'Locks on db: ' + database_name,
'db ' + database_name,
for lock_type in LOCK_TYPES:
lock_id = "{0}_{1}".format(database_name, lock_type)
# Split the CamelCase lock mode into words for the dimension label,
# e.g. 'AccessShareLock' -> 'Access Share Lock'.
label = re.sub("([a-z])([A-Z])", "\g<1> \g<2>", lock_type)
self.definitions[chart_name]['lines'].append([lock_id, label, 'absolute'])
# DictCursor lets the add_*_stats helpers read columns by name.
cursor = self.connection.cursor(cursor_factory=DictCursor)
self.add_stats(cursor)
except OperationalError:
# NOTE(review): psycopg2's connection.closed is nonzero once the
# connection is closed/broken; 2 presumably marks a broken connection —
# drop it so the next run reconnects from scratch. Verify against the
# psycopg2 docs.
if self.connection.closed == 2:
self.connection = False
def add_stats(self, cursor):
# Run every enabled collector; each one fills self.data in place.
self.add_database_stats(cursor)
self.add_backend_stats(cursor)
self.add_index_stats(cursor)
self.add_table_stats(cursor)
self.add_lock_stats(cursor)
self.add_bgwriter_stats(cursor)
# self.add_replication_stats(cursor)
# NOTE(review): replication stats are disabled above; the method also
# calls add_gauge_value, which is not defined in this file's visible
# code — confirm before re-enabling.
# WAL archive status needs pg_ls_dir(), which requires superuser.
if self.is_superuser:
self.add_wal_stats(cursor)
def add_database_stats(self, cursor):
# Per-database activity counters; one self.data entry per
# database/metric pair, keyed "<database>_<metric>".
cursor.execute(DATABASE)
database_name = row.get('database_name')
self.data["{0}_{1}".format(database_name, 'db_stat_xact_commit')] = int(row.get('xact_commit', 0))
self.data["{0}_{1}".format(database_name, 'db_stat_xact_rollback')] = int(row.get('xact_rollback', 0))
self.data["{0}_{1}".format(database_name, 'db_stat_blks_read')] = int(row.get('blks_read', 0))
self.data["{0}_{1}".format(database_name, 'db_stat_blks_hit')] = int(row.get('blks_hit', 0))
self.data["{0}_{1}".format(database_name, 'db_stat_tup_returned')] = int(row.get('tup_returned', 0))
self.data["{0}_{1}".format(database_name, 'db_stat_tup_fetched')] = int(row.get('tup_fetched', 0))
self.data["{0}_{1}".format(database_name, 'db_stat_tup_inserted')] = int(row.get('tup_inserted', 0))
self.data["{0}_{1}".format(database_name, 'db_stat_tup_updated')] = int(row.get('tup_updated', 0))
self.data["{0}_{1}".format(database_name, 'db_stat_tup_deleted')] = int(row.get('tup_deleted', 0))
self.data["{0}_{1}".format(database_name, 'db_stat_conflicts')] = int(row.get('conflicts', 0))
self.data["{0}_{1}".format(database_name, 'db_stat_connections')] = int(row.get('connections', 0))
def add_backend_stats(self, cursor):
# Split the current backends into active vs idle counts.
cursor.execute(BACKENDS)
temp = cursor.fetchone()
self.data['backend_process_active'] = int(temp.get('backends_active', 0))
self.data['backend_process_idle'] = int(temp.get('backends_idle', 0))
def add_index_stats(self, cursor):
    """Collect the total number of indexes and their on-disk size."""
    cursor.execute(INDEX_STATS)
    row = cursor.fetchone()
    self.data.update({
        'index_count': int(row.get('indexes', 0)),
        'index_size': int(row.get('size_indexes', 0)),
    })
def add_table_stats(self, cursor):
    """Collect the total number of tables and their on-disk size."""
    cursor.execute(TABLE_STATS)
    row = cursor.fetchone()
    self.data.update({
        'table_count': int(row.get('relations', 0)),
        'table_size': int(row.get('size_relations', 0)),
    })
def add_lock_stats(self, cursor):
# Per-database, per-mode lock counts.
cursor.execute(DATABASE_LOCKS)
# zero out all current lock values
# (ensures lock modes with no current locks report 0 instead of a
# stale value from the previous collection run)
for database_name in self.databases:
for lock_type in LOCK_TYPES:
self.data["{0}_{1}".format(database_name, lock_type)] = 0
# populate those that have current locks
database_name, lock_type, lock_count = row
self.data["{0}_{1}".format(database_name, lock_type)] = lock_count
def add_wal_stats(self, cursor):
    """Collect WAL archive-status file counts (superuser only)."""
    cursor.execute(ARCHIVE)
    row = cursor.fetchone()
    # Map each chart dimension to its query column.
    for key, column in (('wal_total', 'file_count'),
                        ('wal_ready', 'ready_count'),
                        ('wal_done', 'done_count')):
        self.data[key] = int(row.get(column, 0))
def add_bgwriter_stats(self, cursor):
    """Collect checkpoint counters from pg_stat_bgwriter.

    Bug fix: pg_stat_bgwriter exposes requested checkpoints as
    ``checkpoints_req``, not ``checkpoints_requests``; the old key never
    matched, so the 'requested' dimension was always 0. Values are also
    cast to int for consistency with the other collectors.
    """
    cursor.execute(BGWRITER)
    row = cursor.fetchone()
    self.data['background_writer_scheduled'] = int(row.get('checkpoints_timed', 0))
    self.data['background_writer_requested'] = int(row.get('checkpoints_req', 0))
def add_replication_stats(self, cursor):
# Per-replica byte lag; currently disabled (commented out in add_stats).
cursor.execute(REPLICATION)
temp = cursor.fetchall()
# NOTE(review): add_gauge_value is not defined anywhere in this file's
# visible code and does not match the self.data pattern used by every
# other collector — confirm before enabling this method.
self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'),
int(row.get('byte_lag', 0)))