]> arthur.barton.de Git - netdata.git/blob - plugins.d/python.d.plugin
ab-debian 0.20170327.01-0ab1, upstream v1.6.0-42-gaa6b96fc
[netdata.git] / plugins.d / python.d.plugin
1 #!/usr/bin/env bash
2 '''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
3 # -*- coding: utf-8 -*-
4
5 # Description: netdata python modules supervisor
6 # Author: Pawel Krupa (paulfantom)
7
8 import os
9 import sys
10 import time
11 import threading
12 from re import sub
13
14 # -----------------------------------------------------------------------------
15 # globals & environment setup
16 # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
17 MODULE_EXTENSION = ".chart.py"
18 BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
19                'priority': 90000,
20                'retries': 10}
21
22 MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
23                                         os.path.dirname(__file__)) + "/../python.d") + "/"
24 CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
25 # directories should end with '/'
26 if CONFIG_DIR[-1] != "/":
27     CONFIG_DIR += "/"
28 sys.path.append(MODULES_DIR + "python_modules")
29
30 PROGRAM = os.path.basename(__file__).replace(".plugin", "")
31 DEBUG_FLAG = False
32 TRACE_FLAG = False
33 OVERRIDE_UPDATE_EVERY = False
34
35 # -----------------------------------------------------------------------------
36 # custom, third party and version specific python modules management
37 import msg
38
39 try:
40     assert sys.version_info >= (3, 1)
41     import importlib.machinery
42     PY_VERSION = 3
43     # change this hack below if we want PY_VERSION to be used in modules
44     # import builtins
45     # builtins.PY_VERSION = 3
46     msg.info('Using python v3')
47 except (AssertionError, ImportError):
48     try:
49         import imp
50
51         # change this hack below if we want PY_VERSION to be used in modules
52         # import __builtin__
53         # __builtin__.PY_VERSION = 2
54         PY_VERSION = 2
55         msg.info('Using python v2')
56     except ImportError:
57         msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
58 # try:
59 #     import yaml
60 # except ImportError:
61 #     msg.fatal('Cannot find yaml library')
62 try:
63     if PY_VERSION == 3:
64         import pyyaml3 as yaml
65     else:
66         import pyyaml2 as yaml
67 except ImportError:
68     msg.fatal('Cannot find yaml library')
69
70 try:
71     from collections import OrderedDict
72     ORDERED = True
73     DICT = OrderedDict
74     msg.info('YAML output is ordered')
75 except ImportError:
76     try:
77         from ordereddict import OrderedDict
78         ORDERED = True
79         DICT = OrderedDict
80         msg.info('YAML output is ordered')
81     except ImportError:
82         ORDERED = False
83         DICT = dict
84         msg.info('YAML output is unordered')
85 if ORDERED:
86     def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
87         class OrderedLoader(Loader):
88             pass
89
90         def construct_mapping(loader, node):
91             loader.flatten_mapping(node)
92             return object_pairs_hook(loader.construct_pairs(node))
93         OrderedLoader.add_constructor(
94             yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
95             construct_mapping)
96         return yaml.load(stream, OrderedLoader)
97
98
99 class PythonCharts(object):
100     """
101     Main class used to control every python module.
102     """
103
104     def __init__(self,
105                  modules=None,
106                  modules_path='../python.d/',
107                  modules_configs='../conf.d/',
108                  modules_disabled=None,
109                  modules_enabled=None,
110                  default_run=None):
111         """
112         :param modules: list
113         :param modules_path: str
114         :param modules_configs: str
115         :param modules_disabled: list
116         :param modules_enabled: list
117         :param default_run: bool
118         """
119
120         if modules is None:
121             modules = []
122         if modules_disabled is None:
123             modules_disabled = []
124
125         self.first_run = True
126         # set configuration directory
127         self.configs = modules_configs
128
129         # load modules
130         loaded_modules = self._load_modules(modules_path, modules, modules_disabled, modules_enabled, default_run)
131
132         # load configuration files
133         configured_modules = self._load_configs(loaded_modules)
134
135         # good economy and prosperity:
136         self.jobs = self._create_jobs(configured_modules)  # type <list>
137
138         # enable timetable override like `python.d.plugin mysql debug 1`
139         if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
140             for job in self.jobs:
141                 job.create_timetable(BASE_CONFIG['update_every'])
142
143     @staticmethod
144     def _import_module(path, name=None):
145         """
146         Try to import module using only its path.
147         :param path: str
148         :param name: str
149         :return: object
150         """
151
152         if name is None:
153             name = path.split('/')[-1]
154             if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
155                 return None
156             name = name[:-len(MODULE_EXTENSION)]
157         try:
158             if PY_VERSION == 3:
159                 return importlib.machinery.SourceFileLoader(name, path).load_module()
160             else:
161                 return imp.load_source(name, path)
162         except Exception as e:
163             msg.error("Problem loading", name, str(e))
164             return None
165
166     def _load_modules(self, path, modules, disabled, enabled, default_run):
167         """
168         Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
169         :param path: str
170         :param modules: list
171         :param disabled: list
172         :return: list
173         """
174
175         # check if plugin directory exists
176         if not os.path.isdir(path):
177             msg.fatal("cannot find charts directory ", path)
178
179         # load modules
180         loaded = []
181         if len(modules) > 0:
182             for m in modules:
183                 if m in disabled:
184                     continue
185                 mod = self._import_module(path + m + MODULE_EXTENSION)
186                 if mod is not None:
187                     loaded.append(mod)
188                 else:  # exit if plugin is not found
189                     msg.fatal('no modules found.')
190         else:
191             # scan directory specified in path and load all modules from there
192             if default_run is False:
193                 names = [module for module in os.listdir(path) if module[:-9] in enabled]
194             else:
195                 names = os.listdir(path)
196             for mod in names:
197                 if mod.replace(MODULE_EXTENSION, "") in disabled:
198                     msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
199                     continue
200                 m = self._import_module(path + mod)
201                 if m is not None:
202                     msg.debug(mod + ": loading module '" + path + mod + "'")
203                     loaded.append(m)
204         return loaded
205
206     def _load_configs(self, modules):
207         """
208         Append configuration in list named `config` to every module.
209         For multi-job modules `config` list is created in _parse_config,
210         otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
211         :param modules: list
212         :return: list
213         """
214         for mod in modules:
215             configfile = self.configs + mod.__name__ + ".conf"
216             if os.path.isfile(configfile):
217                 msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
218                 try:
219                     if not hasattr(mod, 'config'):
220                         mod.config = {}
221                     setattr(mod,
222                             'config',
223                             self._parse_config(mod, read_config(configfile)))
224                 except Exception as e:
225                     msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
226             else:
227                 msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
228                 # set config if not found
229                 if not hasattr(mod, 'config'):
230                     msg.debug(mod.__name__ + ": setting configuration for only one job")
231                     mod.config = {None: {}}
232                     for var in BASE_CONFIG:
233                         try:
234                             mod.config[None][var] = getattr(mod, var)
235                         except AttributeError:
236                             mod.config[None][var] = BASE_CONFIG[var]
237         return modules
238
239     @staticmethod
240     def _parse_config(module, config):
241         """
242         Parse configuration file or extract configuration from module file.
243         Example of returned dictionary:
244             config = {'name': {
245                             'update_every': 2,
246                             'retries': 3,
247                             'priority': 30000
248                             'other_val': 123}}
249         :param module: object
250         :param config: dict
251         :return: dict
252         """
253         if config is None:
254             config = {}
255         # get default values
256         defaults = {}
257         msg.debug(module.__name__ + ": reading configuration")
258         for key in BASE_CONFIG:
259             try:
260                 # get defaults from module config
261                 defaults[key] = int(config.pop(key))
262             except (KeyError, ValueError):
263                 try:
264                     # get defaults from module source code
265                     defaults[key] = getattr(module, key)
266                 except (KeyError, ValueError, AttributeError):
267                     # if above failed, get defaults from global dict
268                     defaults[key] = BASE_CONFIG[key]
269
270         # check if there are dict in config dict
271         many_jobs = False
272         for name in config:
273             if isinstance(config[name], DICT):
274                 many_jobs = True
275                 break
276
277         # assign variables needed by supervisor to every job configuration
278         if many_jobs:
279             for name in config:
280                 for key in defaults:
281                     if key not in config[name]:
282                         config[name][key] = defaults[key]
283         # if only one job is needed, values doesn't have to be in dict (in YAML)
284         else:
285             config = {None: config.copy()}
286             config[None].update(defaults)
287
288         # return dictionary of jobs where every job has BASE_CONFIG variables
289         return config
290
291     @staticmethod
292     def _create_jobs(modules):
293         """
294         Create jobs based on module.config dictionary and module.Service class definition.
295         :param modules: list
296         :return: list
297         """
298         jobs = []
299         for module in modules:
300             for name in module.config:
301                 # register a new job
302                 conf = module.config[name]
303                 try:
304                     job = module.Service(configuration=conf, name=name)
305                 except Exception as e:
306                     msg.error(module.__name__ +
307                               ("/" + str(name) if name is not None else "") +
308                               ": cannot start job: '" +
309                               str(e))
310                     return None
311                 else:
312                     # set chart_name (needed to plot run time graphs)
313                     job.chart_name = module.__name__
314                     if name is not None:
315                         job.chart_name += "_" + name
316                 jobs.append(job)
317                 msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
318
319         return [j for j in jobs if j is not None]
320
321     def _stop(self, job, reason=None):
322         """
323         Stop specified job and remove it from self.jobs list
324         Also notifies user about job failure if DEBUG_FLAG is set
325         :param job: object
326         :param reason: str
327         """
328         prefix = job.__module__
329         if job.name is not None and len(job.name) != 0:
330             prefix += "/" + job.name
331         try:
332             msg.error("DISABLED:", prefix)
333             self.jobs.remove(job)
334         except Exception as e:
335             msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
336
337         # TODO remove section below and remove `reason`.
338         prefix += ": "
339         if reason is None:
340             return
341         elif reason[:3] == "no ":
342             msg.error(prefix +
343                       "does not seem to have " +
344                       reason[3:] +
345                       "() function. Disabling it.")
346         elif reason[:7] == "failed ":
347             msg.error(prefix +
348                       reason[7:] +
349                       "() function reports failure.")
350         elif reason[:13] == "configuration":
351             msg.error(prefix +
352                       "configuration file '" +
353                       self.configs +
354                       job.__module__ +
355                       ".conf' not found. Using defaults.")
356         elif reason[:11] == "misbehaving":
357             msg.error(prefix + "is " + reason)
358
359     def check(self):
360         """
361         Tries to execute check() on every job.
362         This cannot fail thus it is catching every exception
363         If job.check() fails job is stopped
364         """
365         i = 0
366         overridden = []
367         msg.debug("all job objects", str(self.jobs))
368         while i < len(self.jobs):
369             job = self.jobs[i]
370             try:
371                 if not job.check():
372                     msg.error(job.chart_name, "check() failed - disabling job")
373                     self._stop(job)
374                 else:
375                     msg.info("CHECKED OK:", job.chart_name)
376                     i += 1
377                     try:
378                         if job.override_name is not None:
379                             new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name)
380                             if new_name in overridden:
381                                 msg.info("DROPPED:", job.name, ", job '" + job.override_name +
382                                          "' is already served by another job.")
383                                 self._stop(job)
384                                 i -= 1
385                             else:
386                                 job.name = job.override_name
387                                 msg.info("RENAMED:", new_name, ", from " + job.chart_name)
388                                 job.chart_name = new_name
389                                 overridden.append(job.chart_name)
390                     except Exception:
391                         pass
392             except AttributeError as e:
393                 self._stop(job)
394                 msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
395                 msg.debug(str(e))
396             except (UnboundLocalError, Exception) as e:
397                 msg.error(job.chart_name, str(e))
398                 self._stop(job)
399         msg.debug("overridden job names:", str(overridden))
400         msg.debug("all remaining job objects:", str(self.jobs))
401
402     def create(self):
403         """
404         Tries to execute create() on every job.
405         This cannot fail thus it is catching every exception.
406         If job.create() fails job is stopped.
407         This is also creating job run time chart.
408         """
409         i = 0
410         while i < len(self.jobs):
411             job = self.jobs[i]
412             try:
413                 if not job.create():
414                     msg.error(job.chart_name, "create function failed.")
415                     self._stop(job)
416                 else:
417                     chart = job.chart_name
418                     sys.stdout.write(
419                         "CHART netdata.plugin_pythond_" +
420                         chart +
421                         " '' 'Execution time for " +
422                         chart +
423                         " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
424                         str(job.timetable['freq']) +
425                         '\n')
426                     sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
427                     msg.debug("created charts for", job.chart_name)
428                     # sys.stdout.flush()
429                     i += 1
430             except AttributeError:
431                 msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
432                 self._stop(job)
433             except (UnboundLocalError, Exception) as e:
434                 msg.error(job.chart_name, str(e))
435                 self._stop(job)
436
437     def update(self):
438         """
439         Creates and supervises every job thread.
440         This will stay forever and ever and ever forever and ever it'll be the one...
441         """
442         for job in self.jobs:
443             job.start()
444
445         while True:
446             if threading.active_count() <= 1:
447                 msg.fatal("no more jobs")
448             time.sleep(1)
449
450
451 def read_config(path):
452     """
453     Read YAML configuration from specified file
454     :param path: str
455     :return: dict
456     """
457     try:
458         with open(path, 'r') as stream:
459             if ORDERED:
460                 config = ordered_load(stream, yaml.SafeLoader)
461             else:
462                 config = yaml.load(stream)
463     except (OSError, IOError):
464         msg.error(str(path), "is not a valid configuration file")
465         return None
466     except yaml.YAMLError as e:
467         msg.error(str(path), "is malformed:", e)
468         return None
469     return config
470
471
472 def parse_cmdline(directory, *commands):
473     """
474     Parse parameters from command line.
475     :param directory: str
476     :param commands: list of str
477     :return: dict
478     """
479     global DEBUG_FLAG, TRACE_FLAG
480     global OVERRIDE_UPDATE_EVERY
481     global BASE_CONFIG
482
483     changed_update = False
484     mods = []
485     for cmd in commands[1:]:
486         if cmd == "check":
487             pass
488         elif cmd == "debug" or cmd == "all":
489             DEBUG_FLAG = True
490             # redirect stderr to stdout?
491         elif cmd == "trace" or cmd == "all":
492             TRACE_FLAG = True
493         elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
494             # DEBUG_FLAG = True
495             mods.append(cmd.replace(".chart.py", ""))
496         else:
497             try:
498                 BASE_CONFIG['update_every'] = int(cmd)
499                 changed_update = True
500             except ValueError:
501                 pass
502     if changed_update and DEBUG_FLAG:
503         OVERRIDE_UPDATE_EVERY = True
504         msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
505
506     msg.debug("started from", commands[0], "with options:", *commands[1:])
507
508     return mods
509
510
511 # if __name__ == '__main__':
512 def run():
513     """
514     Main program.
515     """
516     global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG
517
518     # read configuration file
519     disabled = ['nginx_log', 'gunicorn_log']
520     enabled = list()
521     default_run = True
522     configfile = CONFIG_DIR + "python.d.conf"
523     msg.PROGRAM = PROGRAM
524     msg.info("reading configuration file:", configfile)
525     log_throttle = 200
526     log_interval = 3600
527
528     conf = read_config(configfile)
529     if conf is not None:
530         try:
531             # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
532             if conf['enabled'] is False:
533                 msg.fatal('disabled in configuration file.\n')
534         except (KeyError, TypeError):
535             pass
536
537         try:
538             for param in BASE_CONFIG:
539                 BASE_CONFIG[param] = conf[param]
540         except (KeyError, TypeError):
541             pass  # use default update_every from NETDATA_UPDATE_EVERY
542
543         try:
544             DEBUG_FLAG = conf['debug']
545         except (KeyError, TypeError):
546             pass
547
548         try:
549             TRACE_FLAG = conf['trace']
550         except (KeyError, TypeError):
551             pass
552
553         try:
554             log_throttle = conf['logs_per_interval']
555         except (KeyError, TypeError):
556             pass
557
558         try:
559             log_interval = conf['log_interval']
560         except (KeyError, TypeError):
561             pass
562
563         default_run = True if ('default_run' not in conf or conf.get('default_run')) else False
564
565         for k, v in conf.items():
566             if k in ("update_every", "debug", "enabled", "default_run"):
567                 continue
568             if default_run:
569                 if v is False:
570                     disabled.append(k)
571             else:
572                 if v is True:
573                     enabled.append(k)
574     # parse passed command line arguments
575     modules = parse_cmdline(MODULES_DIR, *sys.argv)
576     msg.DEBUG_FLAG = DEBUG_FLAG
577     msg.TRACE_FLAG = TRACE_FLAG
578     msg.LOG_THROTTLE = log_throttle
579     msg.LOG_INTERVAL = log_interval
580     msg.LOG_COUNTER = 0
581     msg.LOG_NEXT_CHECK = 0
582     msg.info("MODULES_DIR='" + MODULES_DIR +
583              "', CONFIG_DIR='" + CONFIG_DIR +
584              "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
585              ", ONLY_MODULES=" + str(modules))
586
587     # run plugins
588     charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled, enabled, default_run)
589     charts.check()
590     charts.create()
591     charts.update()
592     msg.fatal("finished")
593
594
595 if __name__ == '__main__':
596     run()