X-Git-Url: https://arthur.barton.de/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=python.d%2Fpostgres.chart.py;h=1976e2a61c49a0bb025677ac4c347f47ec3829a1;hb=HEAD;hp=189717511a13072284ded7cf829c52450ecac8ce;hpb=d9d00ace4820c917acfaaeedea714f78003ec01e;p=netdata.git diff --git a/python.d/postgres.chart.py b/python.d/postgres.chart.py index 18971751..1976e2a6 100644 --- a/python.d/postgres.chart.py +++ b/python.d/postgres.chart.py @@ -1,101 +1,410 @@ # -*- coding: utf-8 -*- +# Description: example netdata python.d module +# Authors: facetoe, dangtranhoang + +from copy import deepcopy + +try: + import psycopg2 + from psycopg2 import extensions + from psycopg2.extras import DictCursor + from psycopg2 import OperationalError + PSYCOPG2 = True +except ImportError: + PSYCOPG2 = False -import psycopg2 from base import SimpleService -from psycopg2.extras import DictCursor # default module values update_every = 1 priority = 90000 retries = 60 -# Default Config options. -# { -# 'database': None, -# 'user': 'postgres', -# 'password': None, -# 'host': 'localhost', -# 'port': 5432 -# } +METRICS = dict( + DATABASE=['connections', + 'xact_commit', + 'xact_rollback', + 'blks_read', + 'blks_hit', + 'tup_returned', + 'tup_fetched', + 'tup_inserted', + 'tup_updated', + 'tup_deleted', + 'conflicts', + 'size'], + BACKENDS=['backends_active', + 'backends_idle'], + INDEX_STATS=['index_count', + 'index_size'], + TABLE_STATS=['table_size', + 'table_count'], + ARCHIVE=['ready_count', + 'done_count', + 'file_count'], + BGWRITER=['writer_scheduled', + 'writer_requested'], + LOCKS=['ExclusiveLock', + 'RowShareLock', + 'SIReadLock', + 'ShareUpdateExclusiveLock', + 'AccessExclusiveLock', + 'AccessShareLock', + 'ShareRowExclusiveLock', + 'ShareLock', + 'RowExclusiveLock'] +) + +QUERIES = dict( + ARCHIVE=""" +SELECT + CAST(COUNT(*) AS INT) AS file_count, + CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)), 0) AS INT) AS ready_count, + CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)), 0) AS INT) AS done_count +FROM + pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file); +""", + BACKENDS=""" +SELECT + count(*) - (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle') AS backends_active, + (SELECT count(*) FROM pg_stat_activity WHERE state = 'idle' ) AS backends_idle +FROM pg_stat_activity; +""", + TABLE_STATS=""" +SELECT + ((sum(relpages) * 8) * 1024) AS table_size, + count(1) AS table_count +FROM pg_class +WHERE relkind IN ('r', 't'); +""", + INDEX_STATS=""" +SELECT + ((sum(relpages) * 8) * 1024) AS index_size, + count(1) AS index_count +FROM pg_class +WHERE relkind = 'i';""", + DATABASE=""" +SELECT + datname AS database_name, + sum(numbackends) AS connections, + sum(xact_commit) AS xact_commit, + sum(xact_rollback) AS xact_rollback, + sum(blks_read) AS blks_read, + sum(blks_hit) AS blks_hit, + sum(tup_returned) AS tup_returned, + sum(tup_fetched) AS tup_fetched, + sum(tup_inserted) AS tup_inserted, + sum(tup_updated) AS tup_updated, + sum(tup_deleted) AS tup_deleted, + sum(conflicts) AS conflicts, + pg_database_size(datname) AS size +FROM pg_stat_database +WHERE NOT datname ~* '^template\d+' +GROUP BY database_name; +""", + BGWRITER=""" +SELECT + checkpoints_timed AS writer_scheduled, + checkpoints_req AS writer_requested +FROM pg_stat_bgwriter;""", + LOCKS=""" +SELECT + pg_database.datname as database_name, + mode, + count(mode) AS locks_count +FROM pg_locks + INNER JOIN pg_database ON pg_database.oid = pg_locks.database +GROUP BY datname, mode +ORDER BY datname, mode; +""", + FIND_DATABASES=""" +SELECT datname FROM pg_stat_database WHERE NOT datname ~* '^template\d+' +""", + IF_SUPERUSER=""" +SELECT current_setting('is_superuser') = 'on' AS is_superuser; + """) + +# REPLICATION = """ +# SELECT +# client_hostname, +# client_addr, +# state, +# sent_offset - ( +# replay_offset - (sent_xlog - replay_xlog) * 255 * 16 ^ 6 ) AS byte_lag +# FROM ( +# SELECT +# client_addr, client_hostname, state, +# ('x' || lpad(split_part(sent_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS sent_xlog, +# ('x' || lpad(split_part(replay_location::text, '/', 1), 8, '0'))::bit(32)::bigint AS replay_xlog, +# ('x' || lpad(split_part(sent_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS sent_offset, +# ('x' || lpad(split_part(replay_location::text, '/', 2), 8, '0'))::bit(32)::bigint AS replay_offset +# FROM pg_stat_replication +# ) AS s; +# """ + + +QUERY_STATS = { + QUERIES['DATABASE']: METRICS['DATABASE'], + QUERIES['BACKENDS']: METRICS['BACKENDS'], + QUERIES['ARCHIVE']: METRICS['ARCHIVE'], + QUERIES['LOCKS']: METRICS['LOCKS'] +} + +ORDER = ['db_stat_transactions', 'db_stat_tuple_read', 'db_stat_tuple_returned', 'db_stat_tuple_write', 'database_size', + 'backend_process', 'index_count', 'index_size', 'table_count', 'table_size', 'wal', 'background_writer'] CHARTS = { - "Tuples": { - 'options': ["tuples", "PostgreSQL tuple access", "Tuples / sec", "tuples", "postgres.tuples", "line"], + 'db_stat_transactions': { + 'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions', + 'line'], + 'lines': [ + ['xact_commit', 'committed', 'incremental'], + ['xact_rollback', 'rolled back', 'incremental'] + ]}, + 'db_stat_connections': { + 'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections', + 'line'], 'lines': [ - ["tup_inserted", "tup_inserted", "incremental", 1, 1], - ["tup_inserted", "tup_inserted", "incremental", 1, 1], - ["tup_fetched", "tup_fetched", "incremental", 1, 1], - ["tup_updated", "tup_updated", "incremental", 1, 1], - ["tup_deleted", "tup_deleted", "incremental", 1, 1], + ['connections', 'connections', 'absolute'] ]}, - "Transactions": { - 'options': ["transactions", "Transactions", "transactions / sec", "transactions", "postgres.transactions", "line"], + 'db_stat_tuple_read': { + 'options': [None, 'Tuple reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_tuple_read', 'line'], 'lines': [ - ["xact_commit", "xact_commit", "incremental", 1, 1], - ["xact_rollback", "xact_rollback", "incremental", 1, 1], + ['blks_read', 'disk', 'incremental'], + ['blks_hit', 'cache', 'incremental'] ]}, - "BlockAccess": { - 'options': ["block_access", "block_access", "Block / sec ", "block_access", "postgres.block_access", "line"], + 'db_stat_tuple_returned': { + 'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned', + 'line'], 'lines': [ - ["blks_read", "blks_read", "incremental", 1, 1], - ["blks_hit", "blks_hit", "incremental", 1, 1], + ['tup_returned', 'sequential', 'incremental'], + ['tup_fetched', 'bitmap', 'incremental'] ]}, - "Checkpoints": { - 'options': ["checkpoints", "Checkpoints", "Checkpoints", "checkpoints", "postgres.checkpoints", "line"], + 'db_stat_tuple_write': { + 'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'], 'lines': [ - ["bg_checkpoint_time", "bg_checkpoint_time", "absolute", 1, 1], - ["bg_checkpoint_requested", "bg_checkpoint_requested", "absolute", 1, 1], + ['tup_inserted', 'inserted', 'incremental'], + ['tup_updated', 'updated', 'incremental'], + ['tup_deleted', 'deleted', 'incremental'], + ['conflicts', 'conflicts', 'incremental'] ]}, - "Buffers": { - 'options': ["buffers", "buffers", "Buffer/ sec", "buffers", "postgres.buffers", "line"], + 'database_size': { + 'options': [None, 'Database size', 'MB', 'database size', 'postgres.db_size', 'stacked'], 'lines': [ - ["buffers_written", "buffers_written", "incremental", 1, 1], - ["buffers_allocated", "buffers_allocated", "incremental", 1, 1], ]}, + 'backend_process': { + 'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process', + 'line'], + 'lines': [ + ['backends_active', 'active', 'absolute'], + ['backends_idle', 'idle', 'absolute'] + ]}, + 'index_count': { + 'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'], + 'lines': [ + ['index_count', 'total', 'absolute'] + ]}, + 'index_size': { + 'options': [None, 'Indexes size', 'MB', 'indexes', 'postgres.index_size', 'line'], + 'lines': [ + ['index_size', 'size', 'absolute', 1, 1024 * 1024] + ]}, + 'table_count': { + 'options': [None, 'Total Tables', 'tables', 'tables', 'postgres.table_count', 'line'], + 'lines': [ + ['table_count', 'total', 'absolute'] + ]}, + 'table_size': { + 'options': [None, 'Tables size', 'MB', 'tables', 'postgres.table_size', 'line'], + 'lines': [ + ['table_size', 'size', 'absolute', 1, 1024 * 1024] + ]}, + 'wal': { + 'options': [None, 'Write-Ahead Logging Statistics', 'files/s', 'write ahead log', 'postgres.wal', 'line'], + 'lines': [ + ['file_count', 'total', 'incremental'], + ['ready_count', 'ready', 'incremental'], + ['done_count', 'done', 'incremental'] + ]}, + 'background_writer': { + 'options': [None, 'Checkpoints', 'writes/s', 'background writer', 'postgres.background_writer', 'line'], + 'lines': [ + ['writer_scheduled', 'scheduled', 'incremental'], + ['writer_requested', 'requested', 'incremental'] + ]} } -ORDER = ["Tuples", "Transactions", "BlockAccess", "Checkpoints", "Buffers"] class Service(SimpleService): def __init__(self, configuration=None, name=None): super(self.__class__, self).__init__(configuration=configuration, name=name) - self.order = ORDER - self.definitions = CHARTS + self.order = ORDER[:] + self.definitions = deepcopy(CHARTS) + self.table_stats = configuration.pop('table_stats', False) + self.index_stats = configuration.pop('index_stats', False) + self.database_poll = configuration.pop('database_poll', None) self.configuration = configuration - self.connection = None + self.connection = False + self.is_superuser = False + self.data = dict() + self.locks_zeroed = dict() + self.databases = list() - def connect(self): + def _connect(self): params = dict(user='postgres', - database='postgres', + database=None, password=None, host=None, port=5432) params.update(self.configuration) - if self.connection is None: - self.connection = psycopg2.connect(**params) - self.connection.set_session(readonly=True) + + if not self.connection: + try: + self.connection = psycopg2.connect(**params) + self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT) + self.connection.set_session(readonly=True) + except OperationalError as error: + return False, str(error) + return True, True def check(self): + if not PSYCOPG2: + self.error('\'python-psycopg2\' module is needed to use postgres.chart.py') + return False + result, error = self._connect() + if not result: + conf = dict([(k, (lambda k, v: v if k != 'password' else '*****')(k, v)) for k, v in self.configuration.items()]) + self.error('Failed to connect to %s. Error: %s' % (str(conf), error)) + return False try: - self.connect() + cursor = self.connection.cursor() + self.databases = discover_databases_(cursor, QUERIES['FIND_DATABASES']) + is_superuser = check_if_superuser_(cursor, QUERIES['IF_SUPERUSER']) + cursor.close() + + if (self.database_poll and isinstance(self.database_poll, str)): + self.databases = [dbase for dbase in self.databases if dbase in self.database_poll.split()] or self.databases + + self.locks_zeroed = populate_lock_types(self.databases) + self.add_additional_queries_(is_superuser) + self.create_dynamic_charts_() return True - except Exception as e: - self.error(e) + except Exception as error: + self.error(str(error)) return False + def add_additional_queries_(self, is_superuser): + if self.index_stats: + QUERY_STATS[QUERIES['INDEX_STATS']] = METRICS['INDEX_STATS'] + if self.table_stats: + QUERY_STATS[QUERIES['TABLE_STATS']] = METRICS['TABLE_STATS'] + if is_superuser: + QUERY_STATS[QUERIES['BGWRITER']] = METRICS['BGWRITER'] + + def create_dynamic_charts_(self): + + for database_name in self.databases[::-1]: + self.definitions['database_size']['lines'].append([database_name + '_size', + database_name, 'absolute', 1, 1024 * 1024]) + for chart_name in [name for name in CHARTS if name.startswith('db_stat')]: + add_database_stat_chart_(order=self.order, definitions=self.definitions, + name=chart_name, database_name=database_name) + + add_database_lock_chart_(order=self.order, definitions=self.definitions, database_name=database_name) + def _get_data(self): - cursor = self.connection.cursor(cursor_factory=DictCursor) - cursor.execute(""" - SELECT - pg_stat_database.*, - pg_stat_get_bgwriter_timed_checkpoints() AS bg_checkpoint_time, - pg_stat_get_bgwriter_requested_checkpoints() AS bg_checkpoint_requested, - pg_stat_get_buf_written_backend() AS buffers_written, - pg_stat_get_buf_alloc() AS buffers_allocated - FROM pg_stat_database - WHERE datname = %(database)s - """, self.configuration) - graph_data = dict(cursor.fetchone()) - self.connection.commit() - cursor.close() - return graph_data + result, error = self._connect() + if result: + cursor = self.connection.cursor(cursor_factory=DictCursor) + try: + self.data.update(self.locks_zeroed) + for query, metrics in QUERY_STATS.items(): + self.query_stats_(cursor, query, metrics) + + except OperationalError: + self.connection = False + cursor.close() + return None + else: + cursor.close() + return self.data + else: + return None + + def query_stats_(self, cursor, query, metrics): + cursor.execute(query) + for row in cursor: + for metric in metrics: + dimension_id = '_'.join([row['database_name'], metric]) if 'database_name' in row else metric + if metric in row: + self.data[dimension_id] = int(row[metric]) + elif 'locks_count' in row: + self.data[dimension_id] = row['locks_count'] if metric == row['mode'] else 0 + + +def discover_databases_(cursor, query): + cursor.execute(query) + result = list() + for db in [database[0] for database in cursor]: + if db not in result: + result.append(db) + return result + + +def check_if_superuser_(cursor, query): + cursor.execute(query) + return cursor.fetchone()[0] + + +def populate_lock_types(databases): + result = dict() + for database in databases: + for lock_type in METRICS['LOCKS']: + key = '_'.join([database, lock_type]) + result[key] = 0 + + return result + + +def add_database_lock_chart_(order, definitions, database_name): + def create_lines(database): + result = list() + for lock_type in METRICS['LOCKS']: + dimension_id = '_'.join([database, lock_type]) + result.append([dimension_id, lock_type, 'absolute']) + return result + + chart_name = database_name + '_locks' + order.insert(-1, chart_name) + definitions[chart_name] = { + 'options': + [None, 'Locks on db: ' + database_name, 'locks', 'db ' + database_name, 'postgres.db_locks', 'line'], + 'lines': create_lines(database_name) + } + + +def add_database_stat_chart_(order, definitions, name, database_name): + def create_lines(database, lines): + result = list() + for line in lines: + new_line = ['_'.join([database, line[0]])] + line[1:] + result.append(new_line) + return result + + chart_template = CHARTS[name] + chart_name = '_'.join([database_name, name]) + order.insert(0, chart_name) + name, title, units, family, context, chart_type = chart_template['options'] + definitions[chart_name] = { + 'options': [name, title + ': ' + database_name, units, 'db ' + database_name, context, chart_type], + 'lines': create_lines(database_name, chart_template['lines'])} + + +# +# def add_replication_stats(self, cursor): +# cursor.execute(REPLICATION) +# temp = cursor.fetchall() +# for row in temp: +# self.add_gauge_value('Replication/%s' % row.get('client_addr', 'Unknown'), +# 'byte_lag', +# int(row.get('byte_lag', 0)))