-#!/usr/bin/env python
+#!/usr/bin/env bash
+'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
# -*- coding: utf-8 -*-
# Description: netdata python modules supervisor
import os
import sys
import time
+import threading
+from re import sub
-# setup environment
+# -----------------------------------------------------------------------------
+# globals & environment setup
# https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
-MODULES_DIR = os.getenv('NETDATA_PLUGINS_DIR',
- os.path.abspath(__file__).strip("python.d.plugin.py").replace("plugins.d", ""))
+MODULE_EXTENSION = ".chart.py"
+BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
+ 'priority': 90000,
+ 'retries': 10}
+
+MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
+ os.path.dirname(__file__)) + "/../python.d") + "/"
CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
-INTERVAL = os.getenv('NETDATA_UPDATE_EVERY', None)
# directories should end with '/'
-if MODULES_DIR[-1] != "/":
- MODULES_DIR += "/"
-MODULES_DIR += "python.d/"
if CONFIG_DIR[-1] != "/":
CONFIG_DIR += "/"
sys.path.append(MODULES_DIR + "python_modules")
+PROGRAM = os.path.basename(__file__).replace(".plugin", "")
+DEBUG_FLAG = False
+TRACE_FLAG = False
+OVERRIDE_UPDATE_EVERY = False
+
+# -----------------------------------------------------------------------------
+# custom, third party and version specific python modules management
+import msg
try:
assert sys.version_info >= (3, 1)
import importlib.machinery
-
+ PY_VERSION = 3
# change this hack below if we want PY_VERSION to be used in modules
# import builtins
# builtins.PY_VERSION = 3
- PY_VERSION = 3
- sys.stderr.write('python.d.plugin: Using python 3\n')
+ msg.info('Using python v3')
except (AssertionError, ImportError):
try:
import imp
# import __builtin__
# __builtin__.PY_VERSION = 2
PY_VERSION = 2
- sys.stderr.write('python.d.plugin: Using python 2\n')
+ msg.info('Using python v2')
except ImportError:
- sys.stderr.write('python.d.plugin: Cannot start. No importlib.machinery on python3 or lack of imp on python2\n')
- sys.stdout.write('DISABLE\n')
- sys.exit(1)
+ msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
+# try:
+# import yaml
+# except ImportError:
+# msg.fatal('Cannot find yaml library')
try:
- import yaml
+ if PY_VERSION == 3:
+ import pyyaml3 as yaml
+ else:
+ import pyyaml2 as yaml
except ImportError:
- sys.stderr.write('python.d.plugin: Cannot find yaml library\n')
- sys.stdout.write('DISABLE\n')
- sys.exit(1)
-
-DEBUG_FLAG = False
-PROGRAM = "python.d.plugin"
-MODULE_EXTENSION = ".chart.py"
-BASE_CONFIG = {'update_every': 10,
- 'priority': 12345,
- 'retries': 0}
+ msg.fatal('Cannot find yaml library')
+try:
+ from collections import OrderedDict
+ ORDERED = True
+ DICT = OrderedDict
+ msg.info('YAML output is ordered')
+except ImportError:
+ ORDERED = False
+ DICT = dict
+ msg.info('YAML output is unordered')
+else:
+ def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
+ class OrderedLoader(Loader):
+ pass
+ def construct_mapping(loader, node):
+ loader.flatten_mapping(node)
+ return object_pairs_hook(loader.construct_pairs(node))
+ OrderedLoader.add_constructor(
+ yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
+ construct_mapping)
+ return yaml.load(stream, OrderedLoader)
class PythonCharts(object):
"""
Main class used to control every python module.
"""
+
def __init__(self,
- interval=None,
modules=None,
modules_path='../python.d/',
modules_configs='../conf.d/',
modules_disabled=None):
"""
- :param interval: int
:param modules: list
:param modules_path: str
:param modules_configs: str
# good economy and prosperity:
self.jobs = self._create_jobs(configured_modules) # type: list
- if DEBUG_FLAG and interval is not None:
+
+ # enable timetable override like `python.d.plugin mysql debug 1`
+ if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
for job in self.jobs:
- job.create_timetable(interval)
+ job.create_timetable(BASE_CONFIG['update_every'])
@staticmethod
def _import_module(path, name=None):
else:
return imp.load_source(name, path)
except Exception as e:
- debug(str(e))
+ msg.error("Problem loading", name, str(e))
return None
def _load_modules(self, path, modules, disabled):
# check if plugin directory exists
if not os.path.isdir(path):
- debug("cannot find charts directory ", path)
- sys.stdout.write("DISABLE\n")
- sys.exit(1)
+ msg.fatal("cannot find charts directory ", path)
# load modules
loaded = []
if mod is not None:
loaded.append(mod)
else: # exit if plugin is not found
- sys.stdout.write("DISABLE")
- sys.stdout.flush()
- sys.exit(1)
+ msg.fatal('no modules found.')
else:
# scan directory specified in path and load all modules from there
names = os.listdir(path)
for mod in names:
- if mod.strip(MODULE_EXTENSION) in disabled:
- debug("disabling:", mod.strip(MODULE_EXTENSION))
+ if mod.replace(MODULE_EXTENSION, "") in disabled:
+ msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
continue
m = self._import_module(path + mod)
if m is not None:
- debug("loading module: '" + path + mod + "'")
+ msg.debug(mod + ": loading module '" + path + mod + "'")
loaded.append(m)
return loaded
for mod in modules:
configfile = self.configs + mod.__name__ + ".conf"
if os.path.isfile(configfile):
- debug("loading module configuration: '" + configfile + "'")
+ msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
try:
+ if not hasattr(mod, 'config'):
+ mod.config = {}
setattr(mod,
'config',
self._parse_config(mod, read_config(configfile)))
except Exception as e:
- debug("something went wrong while loading configuration", e)
+ msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
else:
- debug(mod.__name__ +
- ": configuration file '" +
- configfile +
- "' not found. Using defaults.")
+ msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
# set config if not found
if not hasattr(mod, 'config'):
+ msg.debug(mod.__name__ + ": setting configuration for only one job")
mod.config = {None: {}}
for var in BASE_CONFIG:
try:
:param config: dict
:return: dict
"""
+ if config is None:
+ config = {}
# get default values
defaults = {}
+ msg.debug(module.__name__ + ": reading configuration")
for key in BASE_CONFIG:
try:
# get defaults from module config
try:
# get defaults from module source code
defaults[key] = getattr(module, key)
- except (KeyError, ValueError):
+ except (KeyError, ValueError, AttributeError):
# if above failed, get defaults from global dict
defaults[key] = BASE_CONFIG[key]
# check if there are dict in config dict
many_jobs = False
for name in config:
- if type(config[name]) is dict:
+ if isinstance(config[name], DICT):
many_jobs = True
break
try:
job = module.Service(configuration=conf, name=name)
except Exception as e:
- debug(module.__name__ +
- ": Couldn't start job named " +
- str(name) +
- ": " +
- str(e))
+ msg.error(module.__name__ +
+ ("/" + str(name) if name is not None else "") +
+ ": cannot start job: '" +
+ str(e))
return None
else:
# set chart_name (needed to plot run time graphs)
if name is not None:
job.chart_name += "_" + name
jobs.append(job)
+ msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
return [j for j in jobs if j is not None]
:param job: object
:param reason: str
"""
- prefix = ""
- if job.name is not None:
- prefix = "'" + job.name + "' in "
+ prefix = job.__module__
+ if job.name is not None and len(job.name) != 0:
+ prefix += "/" + job.name
+ try:
+ msg.error("DISABLED:", prefix)
+ self.jobs.remove(job)
+ except Exception as e:
+ msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
- prefix += "'" + job.__module__ + MODULE_EXTENSION + "' "
- self.jobs.remove(job)
+ # TODO remove section below and remove `reason`.
+ prefix += ": "
if reason is None:
return
elif reason[:3] == "no ":
- debug(prefix +
- "does not seem to have " +
- reason[3:] +
- "() function. Disabling it.")
+ msg.error(prefix +
+ "does not seem to have " +
+ reason[3:] +
+ "() function. Disabling it.")
elif reason[:7] == "failed ":
- debug(prefix +
- reason[7:] +
- "() function reports failure.")
+ msg.error(prefix +
+ reason[7:] +
+ "() function reports failure.")
elif reason[:13] == "configuration":
- debug(prefix +
- "configuration file '" +
- self.configs +
- job.__module__ +
- ".conf' not found. Using defaults.")
+ msg.error(prefix +
+ "configuration file '" +
+ self.configs +
+ job.__module__ +
+ ".conf' not found. Using defaults.")
elif reason[:11] == "misbehaving":
- debug(prefix + "is " + reason)
+ msg.error(prefix + "is " + reason)
def check(self):
"""
If job.check() fails job is stopped
"""
i = 0
+ overridden = []
+ msg.debug("all job objects", str(self.jobs))
while i < len(self.jobs):
job = self.jobs[i]
try:
if not job.check():
- self._stop(job, "failed check")
+ msg.error(job.chart_name, "check() failed - disabling job")
+ self._stop(job)
else:
+ msg.info("CHECKED OK:", job.chart_name)
i += 1
- except AttributeError:
- self._stop(job, "no check")
+ try:
+ if job.override_name is not None:
+ new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name)
+ if new_name in overridden:
+ msg.info("DROPPED:", job.name, ", job '" + job.override_name + "' is already served by another job.")
+ self._stop(job)
+ i -= 1
+ else:
+ job.name = job.override_name
+ msg.info("RENAMED:", new_name, ", from " + job.chart_name)
+ job.chart_name = new_name
+ overridden.append(job.chart_name)
+ except Exception:
+ pass
+ except AttributeError as e:
+ self._stop(job)
+ msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
+ msg.debug(str(e))
except (UnboundLocalError, Exception) as e:
- self._stop(job, "misbehaving. Reason: " + str(e))
+ msg.error(job.chart_name, str(e))
+ self._stop(job)
+ msg.debug("overridden job names:", str(overridden))
+ msg.debug("all remaining job objects:", str(self.jobs))
def create(self):
"""
job = self.jobs[i]
try:
if not job.create():
- self._stop(job, "failed create")
+ msg.error(job.chart_name, "create function failed.")
+ self._stop(job)
else:
chart = job.chart_name
sys.stdout.write(
str(job.timetable['freq']) +
'\n')
sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
- sys.stdout.flush()
+ msg.debug("created charts for", job.chart_name)
+ # sys.stdout.flush()
i += 1
except AttributeError:
- self._stop(job, "no create")
+ msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
+ self._stop(job)
except (UnboundLocalError, Exception) as e:
- self._stop(job, "misbehaving. Reason: " + str(e))
-
- def _update_job(self, job):
- """
- Tries to execute update() on specified job.
- This cannot fail thus it is catching every exception.
- If job.update() returns False, number of retries_left is decremented.
- If there are no more retries, job is stopped.
- Job is also stopped if it throws an exception.
- This is also updating job run time chart.
- Return False if job is stopped
- :param job: object
- :return: boolean
- """
- t_start = time.time()
- # check if it is time to execute job update() function
- if job.timetable['next'] > t_start:
- return True
- try:
- if self.first_run:
- since_last = 0
- else:
- since_last = int((t_start - job.timetable['last']) * 1000000)
- if not job.update(since_last):
- if job.retries_left <= 0:
- self._stop(job, "update failed")
- return False
- job.retries_left -= 1
- job.timetable['next'] += job.timetable['freq']
- return True
- except AttributeError:
- self._stop(job, "no update")
- return False
- except (UnboundLocalError, Exception) as e:
- self._stop(job, "misbehaving. Reason: " + str(e))
- return False
- t_end = time.time()
- job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq']
- # draw performance graph
- sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n')
- sys.stdout.write("SET run_time = " + str(int((t_end - t_start) * 1000)) + '\n')
- sys.stdout.write("END\n")
- sys.stdout.flush()
- job.timetable['last'] = t_start
- job.retries_left = job.retries
- self.first_run = False
- return True
+ msg.error(job.chart_name, str(e))
+ self._stop(job)
def update(self):
"""
- Tries to execute update() on every job by using _update_job()
+ Creates and supervises every job thread.
This will stay forever and ever and ever forever and ever it'll be the one...
"""
- self.first_run = True
+ for job in self.jobs:
+ job.start()
+
while True:
- next_runs = []
- i = 0
- while i < len(self.jobs):
- job = self.jobs[i]
- if self._update_job(job):
- try:
- next_runs.append(job.timetable['next'])
- except KeyError:
- pass
- i += 1
- if len(next_runs) == 0:
- debug("No plugins loaded")
- sys.stdout.write("DISABLE\n")
- sys.exit(1)
- time.sleep(min(next_runs) - time.time())
+ if threading.active_count() <= 1:
+ msg.fatal("no more jobs")
+ time.sleep(1)
def read_config(path):
"""
try:
with open(path, 'r') as stream:
- config = yaml.load(stream)
+ if ORDERED:
+ config = ordered_load(stream, yaml.SafeLoader)
+ else:
+ config = yaml.load(stream)
except (OSError, IOError):
- debug(str(path), "is not a valid configuration file")
+ msg.error(str(path), "is not a valid configuration file")
return None
except yaml.YAMLError as e:
- debug(str(path), "is malformed:", e)
+ msg.error(str(path), "is malformed:", e)
return None
return config
-def debug(*args):
- """
- Print message on stderr.
- """
- if not DEBUG_FLAG:
- return
- sys.stderr.write(PROGRAM + ":")
- for i in args:
- sys.stderr.write(" " + str(i))
- sys.stderr.write("\n")
- sys.stderr.flush()
-
-
def parse_cmdline(directory, *commands):
"""
Parse parameters from command line.
:param commands: list of str
:return: dict
"""
- global DEBUG_FLAG
- interval = None
+ global DEBUG_FLAG, TRACE_FLAG
+ global OVERRIDE_UPDATE_EVERY
+ global BASE_CONFIG
+ changed_update = False
mods = []
for cmd in commands[1:]:
if cmd == "check":
elif cmd == "debug" or cmd == "all":
DEBUG_FLAG = True
# redirect stderr to stdout?
+ elif cmd == "trace" or cmd == "all":
+ TRACE_FLAG = True
elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
- DEBUG_FLAG = True
+ #DEBUG_FLAG = True
mods.append(cmd.replace(".chart.py", ""))
else:
- DEBUG_FLAG = False
try:
- interval = int(cmd)
+ BASE_CONFIG['update_every'] = int(cmd)
+ changed_update = True
except ValueError:
pass
+ if changed_update and DEBUG_FLAG:
+ OVERRIDE_UPDATE_EVERY = True
+ msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
- debug("started from", commands[0], "with options:", *commands[1:])
- if len(mods) == 0 and DEBUG_FLAG is False:
- interval = None
+ msg.debug("started from", commands[0], "with options:", *commands[1:])
- return {'interval': interval,
- 'modules': mods}
+ return mods
# if __name__ == '__main__':
"""
Main program.
"""
- global PROGRAM, DEBUG_FLAG
- PROGRAM = sys.argv[0].split('/')[-1].split('.plugin')[0]
+ global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG
# read configuration file
- disabled = []
+ disabled = ['nginx_log', 'gunicorn_log']
configfile = CONFIG_DIR + "python.d.conf"
+ msg.PROGRAM = PROGRAM
+ msg.info("reading configuration file:", configfile)
+ log_throttle = 200
+ log_interval = 3600
- interval = INTERVAL
conf = read_config(configfile)
if conf is not None:
try:
- if str(conf['enable']) is False:
- debug("disabled in configuration file")
- sys.stdout.write("DISABLE\n")
- sys.exit(1)
+ # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
+ if conf['enabled'] is False:
+ msg.fatal('disabled in configuration file.\n')
except (KeyError, TypeError):
pass
+
try:
- interval = conf['interval']
+ for param in BASE_CONFIG:
+ BASE_CONFIG[param] = conf[param]
except (KeyError, TypeError):
- pass # use default interval from NETDATA_UPDATE_EVERY
+ pass # use default update_every from NETDATA_UPDATE_EVERY
+
try:
DEBUG_FLAG = conf['debug']
except (KeyError, TypeError):
pass
+
+ try:
+ TRACE_FLAG = conf['trace']
+ except (KeyError, TypeError):
+ pass
+
+ try:
+ log_throttle = conf['logs_per_interval']
+ except (KeyError, TypeError):
+ pass
+
+ try:
+ log_interval = conf['log_interval']
+ except (KeyError, TypeError):
+ pass
+
for k, v in conf.items():
- if k in ("interval", "debug", "enable"):
+ if k in ("update_every", "debug", "enabled"):
continue
if v is False:
disabled.append(k)
# parse passed command line arguments
- out = parse_cmdline(MODULES_DIR, *sys.argv)
- modules = out['modules']
- if out['interval'] is not None:
- interval = out['interval']
+ modules = parse_cmdline(MODULES_DIR, *sys.argv)
+ msg.DEBUG_FLAG = DEBUG_FLAG
+ msg.TRACE_FLAG = TRACE_FLAG
+ msg.LOG_THROTTLE = log_throttle
+ msg.LOG_INTERVAL = log_interval
+ msg.LOG_COUNTER = 0
+ msg.LOG_NEXT_CHECK = 0
+ msg.info("MODULES_DIR='" + MODULES_DIR +
+ "', CONFIG_DIR='" + CONFIG_DIR +
+ "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
+ ", ONLY_MODULES=" + str(modules))
# run plugins
- charts = PythonCharts(interval, modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
+ charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
charts.check()
charts.create()
charts.update()
- sys.stdout.write("DISABLE")
+ msg.fatal("finished")
if __name__ == '__main__':