#!/usr/bin/env python # Description: netdata python modules supervisor # Author: Pawel Krupa (paulfantom) import os import sys import time # setup environment # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables MODULES_DIR = os.getenv('NETDATA_PLUGINS_DIR', os.path.abspath(__file__).strip("python.d.plugin.py").replace("plugins.d", "")) CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/") INTERVAL = os.getenv('NETDATA_UPDATE_EVERY', None) # directories should end with '/' if MODULES_DIR[-1] != "/": MODULES_DIR += "/" MODULES_DIR += "python.d/" if CONFIG_DIR[-1] != "/": CONFIG_DIR += "/" sys.path.append(MODULES_DIR + "python_modules") try: assert sys.version_info >= (3, 1) import importlib.machinery # change this hack below if we want PY_VERSION to be used in modules # import builtins # builtins.PY_VERSION = 3 PY_VERSION = 3 sys.stderr.write('python.d.plugin: Using python 3\n') except (AssertionError, ImportError): try: import imp # change this hack below if we want PY_VERSION to be used in modules # import __builtin__ # __builtin__.PY_VERSION = 2 PY_VERSION = 2 sys.stderr.write('python.d.plugin: Using python 2\n') except ImportError: sys.stderr.write('python.d.plugin: Cannot start. No importlib.machinery on python3 or lack of imp on python2\n') sys.stdout.write('DISABLE\n') sys.exit(1) try: import yaml except ImportError: sys.stderr.write('python.d.plugin: Cannot find yaml library\n') sys.stdout.write('DISABLE\n') sys.exit(1) DEBUG_FLAG = False PROGRAM = "python.d.plugin" MODULE_EXTENSION = ".chart.py" BASE_CONFIG = {'update_every': 10, 'priority': 12345, 'retries': 0} class PythonCharts(object): """ Main class used to control every python module. """ def __init__(self, interval=None, modules=None, modules_path='../python.d/', modules_configs='../conf.d/', modules_disabled=None): """ :param interval: int :param modules: list :param modules_path: str :param modules_configs: str :param modules_disabled: list """ if modules is None: modules = [] if modules_disabled is None: modules_disabled = [] self.first_run = True # set configuration directory self.configs = modules_configs # load modules loaded_modules = self._load_modules(modules_path, modules, modules_disabled) # load configuration files configured_modules = self._load_configs(loaded_modules) # good economy and prosperity: self.jobs = self._create_jobs(configured_modules) # type: list if DEBUG_FLAG and interval is not None: for job in self.jobs: job.create_timetable(interval) @staticmethod def _import_module(path, name=None): """ Try to import module using only its path. :param path: str :param name: str :return: object """ if name is None: name = path.split('/')[-1] if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION: return None name = name[:-len(MODULE_EXTENSION)] try: if PY_VERSION == 3: return importlib.machinery.SourceFileLoader(name, path).load_module() else: return imp.load_source(name, path) except Exception as e: debug(str(e)) return None def _load_modules(self, path, modules, disabled): """ Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files) :param path: str :param modules: list :param disabled: list :return: list """ # check if plugin directory exists if not os.path.isdir(path): debug("cannot find charts directory ", path) sys.stdout.write("DISABLE\n") sys.exit(1) # load modules loaded = [] if len(modules) > 0: for m in modules: if m in disabled: continue mod = self._import_module(path + m + MODULE_EXTENSION) if mod is not None: loaded.append(mod) else: # exit if plugin is not found sys.stdout.write("DISABLE") sys.stdout.flush() sys.exit(1) else: # scan directory specified in path and load all modules from there names = os.listdir(path) for mod in names: if mod.strip(MODULE_EXTENSION) in disabled: debug("disabling:", mod.strip(MODULE_EXTENSION)) continue m = self._import_module(path + mod) if m is not None: debug("loading module: '" + path + mod + "'") loaded.append(m) return loaded def _load_configs(self, modules): """ Append configuration in list named `config` to every module. For multi-job modules `config` list is created in _parse_config, otherwise it is created here based on BASE_CONFIG prototype with None as identifier. :param modules: list :return: list """ for mod in modules: configfile = self.configs + mod.__name__ + ".conf" if os.path.isfile(configfile): debug("loading module configuration: '" + configfile + "'") try: setattr(mod, 'config', self._parse_config(mod, read_config(configfile))) except Exception as e: debug("something went wrong while loading configuration", e) else: debug(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.") # set config if not found if not hasattr(mod, 'config'): mod.config = {None: {}} for var in BASE_CONFIG: try: mod.config[None][var] = getattr(mod, var) except AttributeError: mod.config[None][var] = BASE_CONFIG[var] return modules @staticmethod def _parse_config(module, config): """ Parse configuration file or extract configuration from module file. Example of returned dictionary: config = {'name': { 'update_every': 2, 'retries': 3, 'priority': 30000 'other_val': 123}} :param module: object :param config: dict :return: dict """ # get default values defaults = {} for key in BASE_CONFIG: try: # get defaults from module config defaults[key] = int(config.pop(key)) except (KeyError, ValueError): try: # get defaults from module source code defaults[key] = getattr(module, key) except (KeyError, ValueError): # if above failed, get defaults from global dict defaults[key] = BASE_CONFIG[key] # check if there are dict in config dict many_jobs = False for name in config: if type(config[name]) is dict: many_jobs = True break # assign variables needed by supervisor to every job configuration if many_jobs: for name in config: for key in defaults: if key not in config[name]: config[name][key] = defaults[key] # if only one job is needed, values doesn't have to be in dict (in YAML) else: config = {None: config.copy()} config[None].update(defaults) # return dictionary of jobs where every job has BASE_CONFIG variables return config @staticmethod def _create_jobs(modules): """ Create jobs based on module.config dictionary and module.Service class definition. :param modules: list :return: list """ jobs = [] for module in modules: for name in module.config: # register a new job conf = module.config[name] try: job = module.Service(configuration=conf, name=name) except Exception as e: debug(module.__name__ + ": Couldn't start job named " + str(name) + ": " + str(e)) return None else: # set chart_name (needed to plot run time graphs) job.chart_name = module.__name__ if name is not None: job.chart_name += "_" + name jobs.append(job) return [j for j in jobs if j is not None] def _stop(self, job, reason=None): """ Stop specified job and remove it from self.jobs list Also notifies user about job failure if DEBUG_FLAG is set :param job: object :param reason: str """ prefix = "" if job.name is not None: prefix = "'" + job.name + "' in " prefix += "'" + job.__module__ + MODULE_EXTENSION + "' " self.jobs.remove(job) if reason is None: return elif reason[:3] == "no ": debug(prefix + "does not seem to have " + reason[3:] + "() function. Disabling it.") elif reason[:7] == "failed ": debug(prefix + reason[7:] + "() function reports failure.") elif reason[:13] == "configuration": debug(prefix + "configuration file '" + self.configs + job.__module__ + ".conf' not found. Using defaults.") elif reason[:11] == "misbehaving": debug(prefix + "is " + reason) def check(self): """ Tries to execute check() on every job. This cannot fail thus it is catching every exception If job.check() fails job is stopped """ i = 0 while i < len(self.jobs): job = self.jobs[i] try: if not job.check(): self._stop(job, "failed check") else: i += 1 except AttributeError: self._stop(job, "no check") except (UnboundLocalError, Exception) as e: self._stop(job, "misbehaving. Reason: " + str(e)) def create(self): """ Tries to execute create() on every job. This cannot fail thus it is catching every exception. If job.create() fails job is stopped. This is also creating job run time chart. """ i = 0 while i < len(self.jobs): job = self.jobs[i] try: if not job.create(): self._stop(job, "failed create") else: chart = job.chart_name sys.stdout.write( "CHART netdata.plugin_pythond_" + chart + " '' 'Execution time for " + chart + " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " + str(job.timetable['freq']) + '\n') sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n") sys.stdout.flush() i += 1 except AttributeError: self._stop(job, "no create") except (UnboundLocalError, Exception) as e: self._stop(job, "misbehaving. Reason: " + str(e)) def _update_job(self, job): """ Tries to execute update() on specified job. This cannot fail thus it is catching every exception. If job.update() returns False, number of retries_left is decremented. If there are no more retries, job is stopped. Job is also stopped if it throws an exception. This is also updating job run time chart. Return False if job is stopped :param job: object :return: boolean """ t_start = time.time() # check if it is time to execute job update() function if job.timetable['next'] > t_start: return True try: if self.first_run: since_last = 0 else: since_last = int((t_start - job.timetable['last']) * 1000000) if not job.update(since_last): if job.retries_left <= 0: self._stop(job, "update failed") return False job.retries_left -= 1 job.timetable['next'] += job.timetable['freq'] return True except AttributeError: self._stop(job, "no update") return False except (UnboundLocalError, Exception) as e: self._stop(job, "misbehaving. Reason: " + str(e)) return False t_end = time.time() job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq'] # draw performance graph sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n') sys.stdout.write("SET run_time = " + str(int((t_end - t_start) * 1000)) + '\n') sys.stdout.write("END\n") sys.stdout.flush() job.timetable['last'] = t_start job.retries_left = job.retries self.first_run = False return True def update(self): """ Tries to execute update() on every job by using _update_job() This will stay forever and ever and ever forever and ever it'll be the one... """ self.first_run = True while True: next_runs = [] i = 0 while i < len(self.jobs): job = self.jobs[i] if self._update_job(job): try: next_runs.append(job.timetable['next']) except KeyError: pass i += 1 if len(next_runs) == 0: debug("No plugins loaded") sys.stdout.write("DISABLE\n") sys.exit(1) time.sleep(min(next_runs) - time.time()) def read_config(path): """ Read YAML configuration from specified file :param path: str :return: dict """ try: with open(path, 'r') as stream: config = yaml.load(stream) except (OSError, IOError): debug(str(path), "is not a valid configuration file") return None except yaml.YAMLError as e: debug(str(path), "is malformed:", e) return None return config def debug(*args): """ Print message on stderr. """ if not DEBUG_FLAG: return sys.stderr.write(PROGRAM + ":") for i in args: sys.stderr.write(" " + str(i)) sys.stderr.write("\n") sys.stderr.flush() def parse_cmdline(directory, *commands): """ Parse parameters from command line. :param directory: str :param commands: list of str :return: dict """ global DEBUG_FLAG interval = None mods = [] for cmd in commands[1:]: if cmd == "check": pass elif cmd == "debug" or cmd == "all": DEBUG_FLAG = True # redirect stderr to stdout? elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd): DEBUG_FLAG = True mods.append(cmd.replace(".chart.py", "")) else: DEBUG_FLAG = False try: interval = int(cmd) except ValueError: pass debug("started from", commands[0], "with options:", *commands[1:]) if len(mods) == 0 and DEBUG_FLAG is False: interval = None return {'interval': interval, 'modules': mods} # if __name__ == '__main__': def run(): """ Main program. """ global PROGRAM, DEBUG_FLAG PROGRAM = sys.argv[0].split('/')[-1].split('.plugin')[0] # read configuration file disabled = [] configfile = CONFIG_DIR + "python.d.conf" interval = INTERVAL conf = read_config(configfile) if conf is not None: try: if str(conf['enable']) is False: debug("disabled in configuration file") sys.stdout.write("DISABLE\n") sys.exit(1) except (KeyError, TypeError): pass try: interval = conf['interval'] except (KeyError, TypeError): pass # use default interval from NETDATA_UPDATE_EVERY try: DEBUG_FLAG = conf['debug'] except (KeyError, TypeError): pass for k, v in conf.items(): if k in ("interval", "debug", "enable"): continue if v is False: disabled.append(k) # parse passed command line arguments out = parse_cmdline(MODULES_DIR, *sys.argv) modules = out['modules'] if out['interval'] is not None: interval = out['interval'] # run plugins charts = PythonCharts(interval, modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled) charts.check() charts.create() charts.update() sys.stdout.write("DISABLE") if __name__ == '__main__': run()