]> arthur.barton.de Git - netdata.git/blob - plugins.d/python.d.plugin
4fdfae34725fb18d0f715ac07c8065fa877cd4ee
[netdata.git] / plugins.d / python.d.plugin
1 #!/usr/bin/env bash
2 '''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
3 # -*- coding: utf-8 -*-
4
5 # Description: netdata python modules supervisor
6 # Author: Pawel Krupa (paulfantom)
7
8 import os
9 import sys
10 import time
11 import threading
12 from re import sub
13
14 # -----------------------------------------------------------------------------
15 # globals & environment setup
16 # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
17 MODULE_EXTENSION = ".chart.py"
18 BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
19                'priority': 90000,
20                'retries': 10}
21
22 MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
23                                         os.path.dirname(__file__)) + "/../python.d") + "/"
24 CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
25 # directories should end with '/'
26 if CONFIG_DIR[-1] != "/":
27     CONFIG_DIR += "/"
28 sys.path.append(MODULES_DIR + "python_modules")
29
30 PROGRAM = os.path.basename(__file__).replace(".plugin", "")
31 DEBUG_FLAG = False
32 TRACE_FLAG = False
33 OVERRIDE_UPDATE_EVERY = False
34
35 # -----------------------------------------------------------------------------
36 # custom, third party and version specific python modules management
37 import msg
38
39 try:
40     assert sys.version_info >= (3, 1)
41     import importlib.machinery
42     PY_VERSION = 3
43     # change this hack below if we want PY_VERSION to be used in modules
44     # import builtins
45     # builtins.PY_VERSION = 3
46     msg.info('Using python v3')
47 except (AssertionError, ImportError):
48     try:
49         import imp
50
51         # change this hack below if we want PY_VERSION to be used in modules
52         # import __builtin__
53         # __builtin__.PY_VERSION = 2
54         PY_VERSION = 2
55         msg.info('Using python v2')
56     except ImportError:
57         msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
58 # try:
59 #     import yaml
60 # except ImportError:
61 #     msg.fatal('Cannot find yaml library')
62 try:
63     if PY_VERSION == 3:
64         import pyyaml3 as yaml
65     else:
66         import pyyaml2 as yaml
67 except ImportError:
68     msg.fatal('Cannot find yaml library')
69
70 try:
71     from collections import OrderedDict
72     ORDERED = True
73     DICT = OrderedDict
74     msg.info('YAML output is ordered')
75 except ImportError:
76     try:
77         from ordereddict import OrderedDict
78         ORDERED = True
79         DICT = OrderedDict
80         msg.info('YAML output is ordered')
81     except ImportError:
82         ORDERED = False
83         DICT = dict
84         msg.info('YAML output is unordered')
85 if ORDERED:
86     def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
87         class OrderedLoader(Loader):
88             pass
89         def construct_mapping(loader, node):
90            loader.flatten_mapping(node)
91            return object_pairs_hook(loader.construct_pairs(node))
92         OrderedLoader.add_constructor(
93             yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
94             construct_mapping)
95         return yaml.load(stream, OrderedLoader)
96
97 class PythonCharts(object):
98     """
99     Main class used to control every python module.
100     """
101
102     def __init__(self,
103                  modules=None,
104                  modules_path='../python.d/',
105                  modules_configs='../conf.d/',
106                  modules_disabled=None):
107         """
108         :param modules: list
109         :param modules_path: str
110         :param modules_configs: str
111         :param modules_disabled: list
112         """
113
114         if modules is None:
115             modules = []
116         if modules_disabled is None:
117             modules_disabled = []
118
119         self.first_run = True
120         # set configuration directory
121         self.configs = modules_configs
122
123         # load modules
124         loaded_modules = self._load_modules(modules_path, modules, modules_disabled)
125
126         # load configuration files
127         configured_modules = self._load_configs(loaded_modules)
128
129         # good economy and prosperity:
130         self.jobs = self._create_jobs(configured_modules)  # type: list
131
132         # enable timetable override like `python.d.plugin mysql debug 1`
133         if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
134             for job in self.jobs:
135                 job.create_timetable(BASE_CONFIG['update_every'])
136
137     @staticmethod
138     def _import_module(path, name=None):
139         """
140         Try to import module using only its path.
141         :param path: str
142         :param name: str
143         :return: object
144         """
145
146         if name is None:
147             name = path.split('/')[-1]
148             if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
149                 return None
150             name = name[:-len(MODULE_EXTENSION)]
151         try:
152             if PY_VERSION == 3:
153                 return importlib.machinery.SourceFileLoader(name, path).load_module()
154             else:
155                 return imp.load_source(name, path)
156         except Exception as e:
157             msg.error("Problem loading", name, str(e))
158             return None
159
160     def _load_modules(self, path, modules, disabled):
161         """
162         Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
163         :param path: str
164         :param modules: list
165         :param disabled: list
166         :return: list
167         """
168
169         # check if plugin directory exists
170         if not os.path.isdir(path):
171             msg.fatal("cannot find charts directory ", path)
172
173         # load modules
174         loaded = []
175         if len(modules) > 0:
176             for m in modules:
177                 if m in disabled:
178                     continue
179                 mod = self._import_module(path + m + MODULE_EXTENSION)
180                 if mod is not None:
181                     loaded.append(mod)
182                 else:  # exit if plugin is not found
183                     msg.fatal('no modules found.')
184         else:
185             # scan directory specified in path and load all modules from there
186             names = os.listdir(path)
187             for mod in names:
188                 if mod.replace(MODULE_EXTENSION, "") in disabled:
189                     msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
190                     continue
191                 m = self._import_module(path + mod)
192                 if m is not None:
193                     msg.debug(mod + ": loading module '" + path + mod + "'")
194                     loaded.append(m)
195         return loaded
196
197     def _load_configs(self, modules):
198         """
199         Append configuration in list named `config` to every module.
200         For multi-job modules `config` list is created in _parse_config,
201         otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
202         :param modules: list
203         :return: list
204         """
205         for mod in modules:
206             configfile = self.configs + mod.__name__ + ".conf"
207             if os.path.isfile(configfile):
208                 msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
209                 try:
210                     if not hasattr(mod, 'config'):
211                         mod.config = {}
212                     setattr(mod,
213                             'config',
214                             self._parse_config(mod, read_config(configfile)))
215                 except Exception as e:
216                     msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
217             else:
218                 msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
219                 # set config if not found
220                 if not hasattr(mod, 'config'):
221                     msg.debug(mod.__name__ + ": setting configuration for only one job")
222                     mod.config = {None: {}}
223                     for var in BASE_CONFIG:
224                         try:
225                             mod.config[None][var] = getattr(mod, var)
226                         except AttributeError:
227                             mod.config[None][var] = BASE_CONFIG[var]
228         return modules
229
230     @staticmethod
231     def _parse_config(module, config):
232         """
233         Parse configuration file or extract configuration from module file.
234         Example of returned dictionary:
235             config = {'name': {
236                             'update_every': 2,
237                             'retries': 3,
238                             'priority': 30000
239                             'other_val': 123}}
240         :param module: object
241         :param config: dict
242         :return: dict
243         """
244         if config is None:
245             config = {}
246         # get default values
247         defaults = {}
248         msg.debug(module.__name__ + ": reading configuration")
249         for key in BASE_CONFIG:
250             try:
251                 # get defaults from module config
252                 defaults[key] = int(config.pop(key))
253             except (KeyError, ValueError):
254                 try:
255                     # get defaults from module source code
256                     defaults[key] = getattr(module, key)
257                 except (KeyError, ValueError, AttributeError):
258                     # if above failed, get defaults from global dict
259                     defaults[key] = BASE_CONFIG[key]
260
261         # check if there are dict in config dict
262         many_jobs = False
263         for name in config:
264             if isinstance(config[name], DICT):
265                 many_jobs = True
266                 break
267
268         # assign variables needed by supervisor to every job configuration
269         if many_jobs:
270             for name in config:
271                 for key in defaults:
272                     if key not in config[name]:
273                         config[name][key] = defaults[key]
274         # if only one job is needed, values doesn't have to be in dict (in YAML)
275         else:
276             config = {None: config.copy()}
277             config[None].update(defaults)
278
279         # return dictionary of jobs where every job has BASE_CONFIG variables
280         return config
281
282     @staticmethod
283     def _create_jobs(modules):
284         """
285         Create jobs based on module.config dictionary and module.Service class definition.
286         :param modules: list
287         :return: list
288         """
289         jobs = []
290         for module in modules:
291             for name in module.config:
292                 # register a new job
293                 conf = module.config[name]
294                 try:
295                     job = module.Service(configuration=conf, name=name)
296                 except Exception as e:
297                     msg.error(module.__name__ +
298                               ("/" + str(name) if name is not None else "") +
299                               ": cannot start job: '" +
300                               str(e))
301                     return None
302                 else:
303                     # set chart_name (needed to plot run time graphs)
304                     job.chart_name = module.__name__
305                     if name is not None:
306                         job.chart_name += "_" + name
307                 jobs.append(job)
308                 msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
309
310         return [j for j in jobs if j is not None]
311
312     def _stop(self, job, reason=None):
313         """
314         Stop specified job and remove it from self.jobs list
315         Also notifies user about job failure if DEBUG_FLAG is set
316         :param job: object
317         :param reason: str
318         """
319         prefix = job.__module__
320         if job.name is not None and len(job.name) != 0:
321             prefix += "/" + job.name
322         try:
323             msg.error("DISABLED:", prefix)
324             self.jobs.remove(job)
325         except Exception as e:
326             msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
327
328         # TODO remove section below and remove `reason`.
329         prefix += ": "
330         if reason is None:
331             return
332         elif reason[:3] == "no ":
333             msg.error(prefix +
334                       "does not seem to have " +
335                       reason[3:] +
336                       "() function. Disabling it.")
337         elif reason[:7] == "failed ":
338             msg.error(prefix +
339                       reason[7:] +
340                       "() function reports failure.")
341         elif reason[:13] == "configuration":
342             msg.error(prefix +
343                       "configuration file '" +
344                       self.configs +
345                       job.__module__ +
346                       ".conf' not found. Using defaults.")
347         elif reason[:11] == "misbehaving":
348             msg.error(prefix + "is " + reason)
349
350     def check(self):
351         """
352         Tries to execute check() on every job.
353         This cannot fail thus it is catching every exception
354         If job.check() fails job is stopped
355         """
356         i = 0
357         overridden = []
358         msg.debug("all job objects", str(self.jobs))
359         while i < len(self.jobs):
360             job = self.jobs[i]
361             try:
362                 if not job.check():
363                     msg.error(job.chart_name, "check() failed - disabling job")
364                     self._stop(job)
365                 else:
366                     msg.info("CHECKED OK:", job.chart_name)
367                     i += 1
368                     try:
369                         if job.override_name is not None:
370                             new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name)
371                             if new_name in overridden:
372                                 msg.info("DROPPED:", job.name, ", job '" + job.override_name + "' is already served by another job.")
373                                 self._stop(job)
374                                 i -= 1
375                             else:
376                                 job.name = job.override_name
377                                 msg.info("RENAMED:", new_name, ", from " + job.chart_name)
378                                 job.chart_name = new_name
379                                 overridden.append(job.chart_name)
380                     except Exception:
381                         pass
382             except AttributeError as e:
383                 self._stop(job)
384                 msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
385                 msg.debug(str(e))
386             except (UnboundLocalError, Exception) as e:
387                 msg.error(job.chart_name, str(e))
388                 self._stop(job)
389         msg.debug("overridden job names:", str(overridden))
390         msg.debug("all remaining job objects:", str(self.jobs))
391
392     def create(self):
393         """
394         Tries to execute create() on every job.
395         This cannot fail thus it is catching every exception.
396         If job.create() fails job is stopped.
397         This is also creating job run time chart.
398         """
399         i = 0
400         while i < len(self.jobs):
401             job = self.jobs[i]
402             try:
403                 if not job.create():
404                     msg.error(job.chart_name, "create function failed.")
405                     self._stop(job)
406                 else:
407                     chart = job.chart_name
408                     sys.stdout.write(
409                         "CHART netdata.plugin_pythond_" +
410                         chart +
411                         " '' 'Execution time for " +
412                         chart +
413                         " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
414                         str(job.timetable['freq']) +
415                         '\n')
416                     sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
417                     msg.debug("created charts for", job.chart_name)
418                     # sys.stdout.flush()
419                     i += 1
420             except AttributeError:
421                 msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
422                 self._stop(job)
423             except (UnboundLocalError, Exception) as e:
424                 msg.error(job.chart_name, str(e))
425                 self._stop(job)
426
427     def update(self):
428         """
429         Creates and supervises every job thread.
430         This will stay forever and ever and ever forever and ever it'll be the one...
431         """
432         for job in self.jobs:
433             job.start()
434
435         while True:
436             if threading.active_count() <= 1:
437                 msg.fatal("no more jobs")
438             time.sleep(1)
439
440
441 def read_config(path):
442     """
443     Read YAML configuration from specified file
444     :param path: str
445     :return: dict
446     """
447     try:
448         with open(path, 'r') as stream:
449             if ORDERED:
450                 config = ordered_load(stream, yaml.SafeLoader)
451             else:
452                 config = yaml.load(stream)
453     except (OSError, IOError):
454         msg.error(str(path), "is not a valid configuration file")
455         return None
456     except yaml.YAMLError as e:
457         msg.error(str(path), "is malformed:", e)
458         return None
459     return config
460
461
462 def parse_cmdline(directory, *commands):
463     """
464     Parse parameters from command line.
465     :param directory: str
466     :param commands: list of str
467     :return: dict
468     """
469     global DEBUG_FLAG, TRACE_FLAG
470     global OVERRIDE_UPDATE_EVERY
471     global BASE_CONFIG
472
473     changed_update = False
474     mods = []
475     for cmd in commands[1:]:
476         if cmd == "check":
477             pass
478         elif cmd == "debug" or cmd == "all":
479             DEBUG_FLAG = True
480             # redirect stderr to stdout?
481         elif cmd == "trace" or cmd == "all":
482             TRACE_FLAG = True
483         elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
484             #DEBUG_FLAG = True
485             mods.append(cmd.replace(".chart.py", ""))
486         else:
487             try:
488                 BASE_CONFIG['update_every'] = int(cmd)
489                 changed_update = True
490             except ValueError:
491                 pass
492     if changed_update and DEBUG_FLAG:
493         OVERRIDE_UPDATE_EVERY = True
494         msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
495
496     msg.debug("started from", commands[0], "with options:", *commands[1:])
497
498     return mods
499
500
501 # if __name__ == '__main__':
502 def run():
503     """
504     Main program.
505     """
506     global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG
507
508     # read configuration file
509     disabled = ['nginx_log', 'gunicorn_log']
510     configfile = CONFIG_DIR + "python.d.conf"
511     msg.PROGRAM = PROGRAM
512     msg.info("reading configuration file:", configfile)
513     log_throttle = 200
514     log_interval = 3600
515
516     conf = read_config(configfile)
517     if conf is not None:
518         try:
519             # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
520             if conf['enabled'] is False:
521                 msg.fatal('disabled in configuration file.\n')
522         except (KeyError, TypeError):
523             pass
524
525         try:
526             for param in BASE_CONFIG:
527                 BASE_CONFIG[param] = conf[param]
528         except (KeyError, TypeError):
529             pass  # use default update_every from NETDATA_UPDATE_EVERY
530
531         try:
532             DEBUG_FLAG = conf['debug']
533         except (KeyError, TypeError):
534             pass
535
536         try:
537             TRACE_FLAG = conf['trace']
538         except (KeyError, TypeError):
539             pass
540
541         try:
542             log_throttle = conf['logs_per_interval']
543         except (KeyError, TypeError):
544             pass
545
546         try:
547             log_interval = conf['log_interval']
548         except (KeyError, TypeError):
549             pass
550
551         for k, v in conf.items():
552             if k in ("update_every", "debug", "enabled"):
553                 continue
554             if v is False:
555                 disabled.append(k)
556
557     # parse passed command line arguments
558     modules = parse_cmdline(MODULES_DIR, *sys.argv)
559     msg.DEBUG_FLAG = DEBUG_FLAG
560     msg.TRACE_FLAG = TRACE_FLAG
561     msg.LOG_THROTTLE = log_throttle
562     msg.LOG_INTERVAL = log_interval
563     msg.LOG_COUNTER = 0
564     msg.LOG_NEXT_CHECK = 0
565     msg.info("MODULES_DIR='" + MODULES_DIR +
566              "', CONFIG_DIR='" + CONFIG_DIR +
567              "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
568              ", ONLY_MODULES=" + str(modules))
569
570     # run plugins
571     charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
572     charts.check()
573     charts.create()
574     charts.update()
575     msg.fatal("finished")
576
577
578 if __name__ == '__main__':
579     run()