#!/usr/bin/env bash '''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # ''' # -*- coding: utf-8 -*- # Description: netdata python modules supervisor # Author: Pawel Krupa (paulfantom) import os import sys import time import threading from re import sub # ----------------------------------------------------------------------------- # globals & environment setup # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables MODULE_EXTENSION = ".chart.py" BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1), 'priority': 90000, 'retries': 10} MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR', os.path.dirname(__file__)) + "/../python.d") + "/" CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/") # directories should end with '/' if CONFIG_DIR[-1] != "/": CONFIG_DIR += "/" sys.path.append(MODULES_DIR + "python_modules") PROGRAM = os.path.basename(__file__).replace(".plugin", "") DEBUG_FLAG = False TRACE_FLAG = False OVERRIDE_UPDATE_EVERY = False # ----------------------------------------------------------------------------- # custom, third party and version specific python modules management import msg try: assert sys.version_info >= (3, 1) import importlib.machinery PY_VERSION = 3 # change this hack below if we want PY_VERSION to be used in modules # import builtins # builtins.PY_VERSION = 3 msg.info('Using python v3') except (AssertionError, ImportError): try: import imp # change this hack below if we want PY_VERSION to be used in modules # import __builtin__ # __builtin__.PY_VERSION = 2 PY_VERSION = 2 msg.info('Using python v2') except ImportError: msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2') # try: # import yaml # except ImportError: # msg.fatal('Cannot find yaml library') try: if PY_VERSION == 3: import pyyaml3 as yaml else: import pyyaml2 as yaml except ImportError: msg.fatal('Cannot find yaml library') try: from collections import OrderedDict ORDERED = True DICT = OrderedDict msg.info('YAML output is ordered') except ImportError: try: from ordereddict import OrderedDict ORDERED = True DICT = OrderedDict msg.info('YAML output is ordered') except ImportError: ORDERED = False DICT = dict msg.info('YAML output is unordered') if ORDERED: def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict): class OrderedLoader(Loader): pass def construct_mapping(loader, node): loader.flatten_mapping(node) return object_pairs_hook(loader.construct_pairs(node)) OrderedLoader.add_constructor( yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, construct_mapping) return yaml.load(stream, OrderedLoader) class PythonCharts(object): """ Main class used to control every python module. """ def __init__(self, modules=None, modules_path='../python.d/', modules_configs='../conf.d/', modules_disabled=None, modules_enabled=None, default_run=None): """ :param modules: list :param modules_path: str :param modules_configs: str :param modules_disabled: list :param modules_enabled: list :param default_run: bool """ if modules is None: modules = [] if modules_disabled is None: modules_disabled = [] self.first_run = True # set configuration directory self.configs = modules_configs # load modules loaded_modules = self._load_modules(modules_path, modules, modules_disabled, modules_enabled, default_run) # load configuration files configured_modules = self._load_configs(loaded_modules) # good economy and prosperity: self.jobs = self._create_jobs(configured_modules) # type # enable timetable override like `python.d.plugin mysql debug 1` if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY: for job in self.jobs: job.create_timetable(BASE_CONFIG['update_every']) @staticmethod def _import_module(path, name=None): """ Try to import module using only its path. :param path: str :param name: str :return: object """ if name is None: name = path.split('/')[-1] if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION: return None name = name[:-len(MODULE_EXTENSION)] try: if PY_VERSION == 3: return importlib.machinery.SourceFileLoader(name, path).load_module() else: return imp.load_source(name, path) except Exception as e: msg.error("Problem loading", name, str(e)) return None def _load_modules(self, path, modules, disabled, enabled, default_run): """ Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files) :param path: str :param modules: list :param disabled: list :return: list """ # check if plugin directory exists if not os.path.isdir(path): msg.fatal("cannot find charts directory ", path) # load modules loaded = [] if len(modules) > 0: for m in modules: if m in disabled: continue mod = self._import_module(path + m + MODULE_EXTENSION) if mod is not None: loaded.append(mod) else: # exit if plugin is not found msg.fatal('no modules found.') else: # scan directory specified in path and load all modules from there if default_run is False: names = [module for module in os.listdir(path) if module[:-9] in enabled] else: names = os.listdir(path) for mod in names: if mod.replace(MODULE_EXTENSION, "") in disabled: msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, "")) continue m = self._import_module(path + mod) if m is not None: msg.debug(mod + ": loading module '" + path + mod + "'") loaded.append(m) return loaded def _load_configs(self, modules): """ Append configuration in list named `config` to every module. For multi-job modules `config` list is created in _parse_config, otherwise it is created here based on BASE_CONFIG prototype with None as identifier. :param modules: list :return: list """ for mod in modules: configfile = self.configs + mod.__name__ + ".conf" if os.path.isfile(configfile): msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'") try: if not hasattr(mod, 'config'): mod.config = {} setattr(mod, 'config', self._parse_config(mod, read_config(configfile))) except Exception as e: msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e)) else: msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.") # set config if not found if not hasattr(mod, 'config'): msg.debug(mod.__name__ + ": setting configuration for only one job") mod.config = {None: {}} for var in BASE_CONFIG: try: mod.config[None][var] = getattr(mod, var) except AttributeError: mod.config[None][var] = BASE_CONFIG[var] return modules @staticmethod def _parse_config(module, config): """ Parse configuration file or extract configuration from module file. Example of returned dictionary: config = {'name': { 'update_every': 2, 'retries': 3, 'priority': 30000 'other_val': 123}} :param module: object :param config: dict :return: dict """ if config is None: config = {} # get default values defaults = {} msg.debug(module.__name__ + ": reading configuration") for key in BASE_CONFIG: try: # get defaults from module config defaults[key] = int(config.pop(key)) except (KeyError, ValueError): try: # get defaults from module source code defaults[key] = getattr(module, key) except (KeyError, ValueError, AttributeError): # if above failed, get defaults from global dict defaults[key] = BASE_CONFIG[key] # check if there are dict in config dict many_jobs = False for name in config: if isinstance(config[name], DICT): many_jobs = True break # assign variables needed by supervisor to every job configuration if many_jobs: for name in config: for key in defaults: if key not in config[name]: config[name][key] = defaults[key] # if only one job is needed, values doesn't have to be in dict (in YAML) else: config = {None: config.copy()} config[None].update(defaults) # return dictionary of jobs where every job has BASE_CONFIG variables return config @staticmethod def _create_jobs(modules): """ Create jobs based on module.config dictionary and module.Service class definition. :param modules: list :return: list """ jobs = [] for module in modules: for name in module.config: # register a new job conf = module.config[name] try: job = module.Service(configuration=conf, name=name) except Exception as e: msg.error(module.__name__ + ("/" + str(name) if name is not None else "") + ": cannot start job: '" + str(e)) return None else: # set chart_name (needed to plot run time graphs) job.chart_name = module.__name__ if name is not None: job.chart_name += "_" + name jobs.append(job) msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added") return [j for j in jobs if j is not None] def _stop(self, job, reason=None): """ Stop specified job and remove it from self.jobs list Also notifies user about job failure if DEBUG_FLAG is set :param job: object :param reason: str """ prefix = job.__module__ if job.name is not None and len(job.name) != 0: prefix += "/" + job.name try: msg.error("DISABLED:", prefix) self.jobs.remove(job) except Exception as e: msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e)) # TODO remove section below and remove `reason`. prefix += ": " if reason is None: return elif reason[:3] == "no ": msg.error(prefix + "does not seem to have " + reason[3:] + "() function. Disabling it.") elif reason[:7] == "failed ": msg.error(prefix + reason[7:] + "() function reports failure.") elif reason[:13] == "configuration": msg.error(prefix + "configuration file '" + self.configs + job.__module__ + ".conf' not found. Using defaults.") elif reason[:11] == "misbehaving": msg.error(prefix + "is " + reason) def check(self): """ Tries to execute check() on every job. This cannot fail thus it is catching every exception If job.check() fails job is stopped """ i = 0 overridden = [] msg.debug("all job objects", str(self.jobs)) while i < len(self.jobs): job = self.jobs[i] try: if not job.check(): msg.error(job.chart_name, "check() failed - disabling job") self._stop(job) else: msg.info("CHECKED OK:", job.chart_name) i += 1 try: if job.override_name is not None: new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name) if new_name in overridden: msg.info("DROPPED:", job.name, ", job '" + job.override_name + "' is already served by another job.") self._stop(job) i -= 1 else: job.name = job.override_name msg.info("RENAMED:", new_name, ", from " + job.chart_name) job.chart_name = new_name overridden.append(job.chart_name) except Exception: pass except AttributeError as e: self._stop(job) msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.") msg.debug(str(e)) except (UnboundLocalError, Exception) as e: msg.error(job.chart_name, str(e)) self._stop(job) msg.debug("overridden job names:", str(overridden)) msg.debug("all remaining job objects:", str(self.jobs)) def create(self): """ Tries to execute create() on every job. This cannot fail thus it is catching every exception. If job.create() fails job is stopped. This is also creating job run time chart. """ i = 0 while i < len(self.jobs): job = self.jobs[i] try: if not job.create(): msg.error(job.chart_name, "create function failed.") self._stop(job) else: chart = job.chart_name sys.stdout.write( "CHART netdata.plugin_pythond_" + chart + " '' 'Execution time for " + chart + " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " + str(job.timetable['freq']) + '\n') sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n") msg.debug("created charts for", job.chart_name) # sys.stdout.flush() i += 1 except AttributeError: msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.") self._stop(job) except (UnboundLocalError, Exception) as e: msg.error(job.chart_name, str(e)) self._stop(job) def update(self): """ Creates and supervises every job thread. This will stay forever and ever and ever forever and ever it'll be the one... """ for job in self.jobs: job.start() while True: if threading.active_count() <= 1: msg.fatal("no more jobs") time.sleep(1) def read_config(path): """ Read YAML configuration from specified file :param path: str :return: dict """ try: with open(path, 'r') as stream: if ORDERED: config = ordered_load(stream, yaml.SafeLoader) else: config = yaml.load(stream) except (OSError, IOError): msg.error(str(path), "is not a valid configuration file") return None except yaml.YAMLError as e: msg.error(str(path), "is malformed:", e) return None return config def parse_cmdline(directory, *commands): """ Parse parameters from command line. :param directory: str :param commands: list of str :return: dict """ global DEBUG_FLAG, TRACE_FLAG global OVERRIDE_UPDATE_EVERY global BASE_CONFIG changed_update = False mods = [] for cmd in commands[1:]: if cmd == "check": pass elif cmd == "debug" or cmd == "all": DEBUG_FLAG = True # redirect stderr to stdout? elif cmd == "trace" or cmd == "all": TRACE_FLAG = True elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd): # DEBUG_FLAG = True mods.append(cmd.replace(".chart.py", "")) else: try: BASE_CONFIG['update_every'] = int(cmd) changed_update = True except ValueError: pass if changed_update and DEBUG_FLAG: OVERRIDE_UPDATE_EVERY = True msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every'])) msg.debug("started from", commands[0], "with options:", *commands[1:]) return mods # if __name__ == '__main__': def run(): """ Main program. """ global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG # read configuration file disabled = ['nginx_log', 'gunicorn_log'] enabled = list() default_run = True configfile = CONFIG_DIR + "python.d.conf" msg.PROGRAM = PROGRAM msg.info("reading configuration file:", configfile) log_throttle = 200 log_interval = 3600 conf = read_config(configfile) if conf is not None: try: # exit the whole plugin when 'enabled: no' is set in 'python.d.conf' if conf['enabled'] is False: msg.fatal('disabled in configuration file.\n') except (KeyError, TypeError): pass try: for param in BASE_CONFIG: BASE_CONFIG[param] = conf[param] except (KeyError, TypeError): pass # use default update_every from NETDATA_UPDATE_EVERY try: DEBUG_FLAG = conf['debug'] except (KeyError, TypeError): pass try: TRACE_FLAG = conf['trace'] except (KeyError, TypeError): pass try: log_throttle = conf['logs_per_interval'] except (KeyError, TypeError): pass try: log_interval = conf['log_interval'] except (KeyError, TypeError): pass default_run = True if ('default_run' not in conf or conf.get('default_run')) else False for k, v in conf.items(): if k in ("update_every", "debug", "enabled", "default_run"): continue if default_run: if v is False: disabled.append(k) else: if v is True: enabled.append(k) # parse passed command line arguments modules = parse_cmdline(MODULES_DIR, *sys.argv) msg.DEBUG_FLAG = DEBUG_FLAG msg.TRACE_FLAG = TRACE_FLAG msg.LOG_THROTTLE = log_throttle msg.LOG_INTERVAL = log_interval msg.LOG_COUNTER = 0 msg.LOG_NEXT_CHECK = 0 msg.info("MODULES_DIR='" + MODULES_DIR + "', CONFIG_DIR='" + CONFIG_DIR + "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) + ", ONLY_MODULES=" + str(modules)) # run plugins charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled, enabled, default_run) charts.check() charts.create() charts.update() msg.fatal("finished") if __name__ == '__main__': run()