-#!/usr/bin/env python
+#!/usr/bin/env bash
+'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
# -*- coding: utf-8 -*-
# Description: netdata python modules supervisor
import os
import sys
import time
+import threading
+from re import sub
# -----------------------------------------------------------------------------
# globals & environment setup
'retries': 10}
MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
- os.path.dirname(__file__)) + "/../python.d") + "/"
+ os.path.dirname(__file__)) + "/../python.d") + "/"
CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
# directories should end with '/'
if CONFIG_DIR[-1] != "/":
PROGRAM = os.path.basename(__file__).replace(".plugin", "")
DEBUG_FLAG = False
+TRACE_FLAG = False
OVERRIDE_UPDATE_EVERY = False
-
# -----------------------------------------------------------------------------
-# logging functions
-def debug(*args):
- """
- Print message on stderr.
- """
- if not DEBUG_FLAG:
- return
- sys.stderr.write(PROGRAM + " DEBUG :")
- for i in args:
- sys.stderr.write(" " + str(i))
- sys.stderr.write("\n")
- sys.stderr.flush()
-
-
-def error(*args):
- """
- Print message on stderr.
- """
- sys.stderr.write(PROGRAM + " ERROR :")
- for i in args:
- sys.stderr.write(" " + str(i))
- sys.stderr.write("\n")
- sys.stderr.flush()
-
-
-def info(*args):
- """
- Print message on stderr.
- """
- sys.stderr.write(PROGRAM + " INFO :")
- for i in args:
- sys.stderr.write(" " + str(i))
- sys.stderr.write("\n")
- sys.stderr.flush()
-
-
-def fatal(*args):
- """
- Print message on stderr and exit.
- """
- sys.stderr.write(PROGRAM + " FATAL :")
- for i in args:
- sys.stderr.write(" " + str(i))
- sys.stderr.write("\n")
- sys.stderr.flush()
- sys.stdout.write('DISABLE\n')
- sys.exit(1)
+# custom, third party and version specific python modules management
+import msg
-
-# -----------------------------------------------------------------------------
-# third party and version specific python modules management
try:
assert sys.version_info >= (3, 1)
import importlib.machinery
-
+ PY_VERSION = 3
# change this hack below if we want PY_VERSION to be used in modules
# import builtins
# builtins.PY_VERSION = 3
- PY_VERSION = 3
- info('Using python v3')
+ msg.info('Using python v3')
except (AssertionError, ImportError):
try:
import imp
# import __builtin__
# __builtin__.PY_VERSION = 2
PY_VERSION = 2
- info('Using python v2')
+ msg.info('Using python v2')
except ImportError:
- fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
+ msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
+# try:
+# import yaml
+# except ImportError:
+# msg.fatal('Cannot find yaml library')
try:
- import yaml
+ if PY_VERSION == 3:
+ import pyyaml3 as yaml
+ else:
+ import pyyaml2 as yaml
except ImportError:
- fatal('Cannot find yaml library')
+ msg.fatal('Cannot find yaml library')
+try:
+ from collections import OrderedDict
+ ORDERED = True
+ DICT = OrderedDict
+ msg.info('YAML output is ordered')
+except ImportError:
+ ORDERED = False
+ DICT = dict
+ msg.info('YAML output is unordered')
+else:
+ def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
+ class OrderedLoader(Loader):
+ pass
+ def construct_mapping(loader, node):
+ loader.flatten_mapping(node)
+ return object_pairs_hook(loader.construct_pairs(node))
+ OrderedLoader.add_constructor(
+ yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
+ construct_mapping)
+ return yaml.load(stream, OrderedLoader)
class PythonCharts(object):
"""
Main class used to control every python module.
"""
+
def __init__(self,
modules=None,
modules_path='../python.d/',
else:
return imp.load_source(name, path)
except Exception as e:
- debug(str(e))
+ msg.error("Problem loading", name, str(e))
return None
def _load_modules(self, path, modules, disabled):
# check if plugin directory exists
if not os.path.isdir(path):
- fatal("cannot find charts directory ", path)
+ msg.fatal("cannot find charts directory ", path)
# load modules
loaded = []
if mod is not None:
loaded.append(mod)
else: # exit if plugin is not found
- fatal('no modules found.')
+ msg.fatal('no modules found.')
else:
# scan directory specified in path and load all modules from there
names = os.listdir(path)
for mod in names:
- if mod.strip(MODULE_EXTENSION) in disabled:
- error(mod + ": disabled module ", mod.strip(MODULE_EXTENSION))
+ if mod.replace(MODULE_EXTENSION, "") in disabled:
+ msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
continue
m = self._import_module(path + mod)
if m is not None:
- debug(mod + ": loading module '" + path + mod + "'")
+ msg.debug(mod + ": loading module '" + path + mod + "'")
loaded.append(m)
return loaded
for mod in modules:
configfile = self.configs + mod.__name__ + ".conf"
if os.path.isfile(configfile):
- debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
+ msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
try:
+ if not hasattr(mod, 'config'):
+ mod.config = {}
setattr(mod,
'config',
self._parse_config(mod, read_config(configfile)))
except Exception as e:
- error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
+ msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
else:
- error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
+ msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
# set config if not found
if not hasattr(mod, 'config'):
+ msg.debug(mod.__name__ + ": setting configuration for only one job")
mod.config = {None: {}}
for var in BASE_CONFIG:
try:
:param config: dict
:return: dict
"""
+ if config is None:
+ config = {}
# get default values
defaults = {}
+ msg.debug(module.__name__ + ": reading configuration")
for key in BASE_CONFIG:
try:
# get defaults from module config
try:
# get defaults from module source code
defaults[key] = getattr(module, key)
- except (KeyError, ValueError):
+ except (KeyError, ValueError, AttributeError):
# if above failed, get defaults from global dict
defaults[key] = BASE_CONFIG[key]
# check if there are dict in config dict
many_jobs = False
for name in config:
- if type(config[name]) is dict:
+ if isinstance(config[name], DICT):
many_jobs = True
break
try:
job = module.Service(configuration=conf, name=name)
except Exception as e:
- error(module.__name__ +
- ("/" + str(name) if name is not None else "") +
- ": cannot start job: '" +
- str(e))
+ msg.error(module.__name__ +
+ ("/" + str(name) if name is not None else "") +
+ ": cannot start job: '" +
+ str(e))
return None
else:
# set chart_name (needed to plot run time graphs)
if name is not None:
job.chart_name += "_" + name
jobs.append(job)
- debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
+ msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
return [j for j in jobs if j is not None]
:param reason: str
"""
prefix = job.__module__
- if job.name is not None:
+ if job.name is not None and len(job.name) != 0:
prefix += "/" + job.name
- prefix += ": "
+ try:
+ msg.error("DISABLED:", prefix)
+ self.jobs.remove(job)
+ except Exception as e:
+ msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
- self.jobs.remove(job)
+ # TODO remove section below and remove `reason`.
+ prefix += ": "
if reason is None:
return
elif reason[:3] == "no ":
- error(prefix +
- "does not seem to have " +
- reason[3:] +
- "() function. Disabling it.")
+ msg.error(prefix +
+ "does not seem to have " +
+ reason[3:] +
+ "() function. Disabling it.")
elif reason[:7] == "failed ":
- error(prefix +
- reason[7:] +
- "() function reports failure.")
+ msg.error(prefix +
+ reason[7:] +
+ "() function reports failure.")
elif reason[:13] == "configuration":
- error(prefix +
- "configuration file '" +
- self.configs +
- job.__module__ +
- ".conf' not found. Using defaults.")
+ msg.error(prefix +
+ "configuration file '" +
+ self.configs +
+ job.__module__ +
+ ".conf' not found. Using defaults.")
elif reason[:11] == "misbehaving":
- error(prefix + "is " + reason)
+ msg.error(prefix + "is " + reason)
def check(self):
"""
If job.check() fails job is stopped
"""
i = 0
+ overridden = []
+ msg.debug("all job objects", str(self.jobs))
while i < len(self.jobs):
job = self.jobs[i]
try:
if not job.check():
- self._stop(job, "failed check")
+ msg.error(job.chart_name, "check() failed - disabling job")
+ self._stop(job)
else:
+ msg.info("CHECKED OK:", job.chart_name)
i += 1
- except AttributeError:
- self._stop(job, "no check")
+ try:
+ if job.override_name is not None:
+ new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name)
+ if new_name in overridden:
+ msg.info("DROPPED:", job.name, ", job '" + job.override_name + "' is already served by another job.")
+ self._stop(job)
+ i -= 1
+ else:
+ job.name = job.override_name
+ msg.info("RENAMED:", new_name, ", from " + job.chart_name)
+ job.chart_name = new_name
+ overridden.append(job.chart_name)
+ except Exception:
+ pass
+ except AttributeError as e:
+ self._stop(job)
+ msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
+ msg.debug(str(e))
except (UnboundLocalError, Exception) as e:
- self._stop(job, "misbehaving. Reason: " + str(e))
+ msg.error(job.chart_name, str(e))
+ self._stop(job)
+ msg.debug("overridden job names:", str(overridden))
+ msg.debug("all remaining job objects:", str(self.jobs))
def create(self):
"""
job = self.jobs[i]
try:
if not job.create():
- self._stop(job, "failed create")
+ msg.error(job.chart_name, "create function failed.")
+ self._stop(job)
else:
chart = job.chart_name
sys.stdout.write(
str(job.timetable['freq']) +
'\n')
sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
+ msg.debug("created charts for", job.chart_name)
# sys.stdout.flush()
i += 1
except AttributeError:
- self._stop(job, "no create")
+ msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
+ self._stop(job)
except (UnboundLocalError, Exception) as e:
- self._stop(job, "misbehaving. Reason: " + str(e))
-
- def _update_job(self, job):
- """
- Tries to execute update() on specified job.
- This cannot fail thus it is catching every exception.
- If job.update() returns False, number of retries_left is decremented.
- If there are no more retries, job is stopped.
- Job is also stopped if it throws an exception.
- This is also updating job run time chart.
- Return False if job is stopped
- :param job: object
- :return: boolean
- """
- t_start = time.time()
- # check if it is time to execute job update() function
- if job.timetable['next'] > t_start:
- return True
- try:
- if self.first_run:
- since_last = 0
- else:
- since_last = int((t_start - job.timetable['last']) * 1000000)
- if not job.update(since_last):
- if job.retries_left <= 0:
- self._stop(job, "update failed")
- return False
- job.retries_left -= 1
- job.timetable['next'] += job.timetable['freq']
- return True
- except AttributeError:
- self._stop(job, "no update")
- return False
- except (UnboundLocalError, Exception) as e:
- self._stop(job, "misbehaving. Reason: " + str(e))
- return False
- t_end = time.time()
- job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq']
- # draw performance graph
- sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.chart_name + " " + str(since_last) + '\n')
- sys.stdout.write("SET run_time = " + str(int((t_end - t_start) * 1000)) + '\n')
- sys.stdout.write("END\n")
- # sys.stdout.flush()
- job.timetable['last'] = t_start
- job.retries_left = job.retries
- self.first_run = False
- return True
+ msg.error(job.chart_name, str(e))
+ self._stop(job)
def update(self):
"""
- Tries to execute update() on every job by using _update_job()
+ Creates and supervises every job thread.
This will stay forever and ever and ever forever and ever it'll be the one...
"""
- self.first_run = True
+ for job in self.jobs:
+ job.start()
+
while True:
- next_runs = []
- i = 0
- while i < len(self.jobs):
- job = self.jobs[i]
- if self._update_job(job):
- try:
- next_runs.append(job.timetable['next'])
- except KeyError:
- pass
- i += 1
- if len(next_runs) == 0:
- fatal('no python.d modules loaded.')
- time.sleep(min(next_runs) - time.time())
+ if threading.active_count() <= 1:
+ msg.fatal("no more jobs")
+ time.sleep(1)
def read_config(path):
"""
try:
with open(path, 'r') as stream:
- config = yaml.load(stream)
+ if ORDERED:
+ config = ordered_load(stream, yaml.SafeLoader)
+ else:
+ config = yaml.load(stream)
except (OSError, IOError):
- error(str(path), "is not a valid configuration file")
+ msg.error(str(path), "is not a valid configuration file")
return None
except yaml.YAMLError as e:
- error(str(path), "is malformed:", e)
+ msg.error(str(path), "is malformed:", e)
return None
return config
:param commands: list of str
:return: dict
"""
- global DEBUG_FLAG
+ global DEBUG_FLAG, TRACE_FLAG
global OVERRIDE_UPDATE_EVERY
global BASE_CONFIG
+ changed_update = False
mods = []
for cmd in commands[1:]:
if cmd == "check":
elif cmd == "debug" or cmd == "all":
DEBUG_FLAG = True
# redirect stderr to stdout?
+ elif cmd == "trace" or cmd == "all":
+ TRACE_FLAG = True
elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
- DEBUG_FLAG = True
+ #DEBUG_FLAG = True
mods.append(cmd.replace(".chart.py", ""))
else:
try:
BASE_CONFIG['update_every'] = int(cmd)
- OVERRIDE_UPDATE_EVERY = True
+ changed_update = True
except ValueError:
pass
+ if changed_update and DEBUG_FLAG:
+ OVERRIDE_UPDATE_EVERY = True
+ msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
- debug("started from", commands[0], "with options:", *commands[1:])
+ msg.debug("started from", commands[0], "with options:", *commands[1:])
return mods
"""
Main program.
"""
- global DEBUG_FLAG, BASE_CONFIG
+ global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG
# read configuration file
- disabled = []
+ disabled = ['nginx_log', 'gunicorn_log']
configfile = CONFIG_DIR + "python.d.conf"
+ msg.PROGRAM = PROGRAM
+ msg.info("reading configuration file:", configfile)
+ log_throttle = 200
+ log_interval = 3600
conf = read_config(configfile)
if conf is not None:
try:
# exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
- if str(conf['enabled']) is False:
- fatal('disabled in configuration file.\n')
+ if conf['enabled'] is False:
+ msg.fatal('disabled in configuration file.\n')
except (KeyError, TypeError):
pass
+
try:
for param in BASE_CONFIG:
BASE_CONFIG[param] = conf[param]
except (KeyError, TypeError):
pass # use default update_every from NETDATA_UPDATE_EVERY
+
try:
DEBUG_FLAG = conf['debug']
except (KeyError, TypeError):
pass
+
+ try:
+ TRACE_FLAG = conf['trace']
+ except (KeyError, TypeError):
+ pass
+
+ try:
+ log_throttle = conf['logs_per_interval']
+ except (KeyError, TypeError):
+ pass
+
+ try:
+ log_interval = conf['log_interval']
+ except (KeyError, TypeError):
+ pass
+
for k, v in conf.items():
- if k in ("update_every", "debug", "enable"):
+ if k in ("update_every", "debug", "enabled"):
continue
if v is False:
disabled.append(k)
# parse passed command line arguments
modules = parse_cmdline(MODULES_DIR, *sys.argv)
- info("MODULES_DIR='" + MODULES_DIR +
- "', CONFIG_DIR='" + CONFIG_DIR +
- "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
- ", ONLY_MODULES=" + str(modules))
+ msg.DEBUG_FLAG = DEBUG_FLAG
+ msg.TRACE_FLAG = TRACE_FLAG
+ msg.LOG_THROTTLE = log_throttle
+ msg.LOG_INTERVAL = log_interval
+ msg.LOG_COUNTER = 0
+ msg.LOG_NEXT_CHECK = 0
+ msg.info("MODULES_DIR='" + MODULES_DIR +
+ "', CONFIG_DIR='" + CONFIG_DIR +
+ "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
+ ", ONLY_MODULES=" + str(modules))
# run plugins
charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
charts.check()
charts.create()
charts.update()
- fatal("finished")
+ msg.fatal("finished")
if __name__ == '__main__':