]> arthur.barton.de Git - netdata.git/blob - plugins.d/python.d.plugin
Merge pull request #1555 from pjz/ipfs_health
[netdata.git] / plugins.d / python.d.plugin
1 #!/usr/bin/env bash
2 '''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
3 # -*- coding: utf-8 -*-
4
5 # Description: netdata python modules supervisor
6 # Author: Pawel Krupa (paulfantom)
7
8 import os
9 import sys
10 import time
11 import threading
12 from re import sub
13
14 # -----------------------------------------------------------------------------
15 # globals & environment setup
16 # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
17 MODULE_EXTENSION = ".chart.py"
18 BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
19                'priority': 90000,
20                'retries': 10}
21
22 MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
23                                         os.path.dirname(__file__)) + "/../python.d") + "/"
24 CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
25 # directories should end with '/'
26 if CONFIG_DIR[-1] != "/":
27     CONFIG_DIR += "/"
28 sys.path.append(MODULES_DIR + "python_modules")
29
30 PROGRAM = os.path.basename(__file__).replace(".plugin", "")
31 DEBUG_FLAG = False
32 TRACE_FLAG = False
33 OVERRIDE_UPDATE_EVERY = False
34
35 # -----------------------------------------------------------------------------
36 # custom, third party and version specific python modules management
37 import msg
38
39 try:
40     assert sys.version_info >= (3, 1)
41     import importlib.machinery
42     PY_VERSION = 3
43     # change this hack below if we want PY_VERSION to be used in modules
44     # import builtins
45     # builtins.PY_VERSION = 3
46     msg.info('Using python v3')
47 except (AssertionError, ImportError):
48     try:
49         import imp
50
51         # change this hack below if we want PY_VERSION to be used in modules
52         # import __builtin__
53         # __builtin__.PY_VERSION = 2
54         PY_VERSION = 2
55         msg.info('Using python v2')
56     except ImportError:
57         msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
58 # try:
59 #     import yaml
60 # except ImportError:
61 #     msg.fatal('Cannot find yaml library')
62 try:
63     if PY_VERSION == 3:
64         import pyyaml3 as yaml
65     else:
66         import pyyaml2 as yaml
67 except ImportError:
68     msg.fatal('Cannot find yaml library')
69
70
71 class PythonCharts(object):
72     """
73     Main class used to control every python module.
74     """
75
76     def __init__(self,
77                  modules=None,
78                  modules_path='../python.d/',
79                  modules_configs='../conf.d/',
80                  modules_disabled=None):
81         """
82         :param modules: list
83         :param modules_path: str
84         :param modules_configs: str
85         :param modules_disabled: list
86         """
87
88         if modules is None:
89             modules = []
90         if modules_disabled is None:
91             modules_disabled = []
92
93         self.first_run = True
94         # set configuration directory
95         self.configs = modules_configs
96
97         # load modules
98         loaded_modules = self._load_modules(modules_path, modules, modules_disabled)
99
100         # load configuration files
101         configured_modules = self._load_configs(loaded_modules)
102
103         # good economy and prosperity:
104         self.jobs = self._create_jobs(configured_modules)  # type: list
105
106         # enable timetable override like `python.d.plugin mysql debug 1`
107         if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
108             for job in self.jobs:
109                 job.create_timetable(BASE_CONFIG['update_every'])
110
111     @staticmethod
112     def _import_module(path, name=None):
113         """
114         Try to import module using only its path.
115         :param path: str
116         :param name: str
117         :return: object
118         """
119
120         if name is None:
121             name = path.split('/')[-1]
122             if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
123                 return None
124             name = name[:-len(MODULE_EXTENSION)]
125         try:
126             if PY_VERSION == 3:
127                 return importlib.machinery.SourceFileLoader(name, path).load_module()
128             else:
129                 return imp.load_source(name, path)
130         except Exception as e:
131             msg.error("Problem loading", name, str(e))
132             return None
133
134     def _load_modules(self, path, modules, disabled):
135         """
136         Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
137         :param path: str
138         :param modules: list
139         :param disabled: list
140         :return: list
141         """
142
143         # check if plugin directory exists
144         if not os.path.isdir(path):
145             msg.fatal("cannot find charts directory ", path)
146
147         # load modules
148         loaded = []
149         if len(modules) > 0:
150             for m in modules:
151                 if m in disabled:
152                     continue
153                 mod = self._import_module(path + m + MODULE_EXTENSION)
154                 if mod is not None:
155                     loaded.append(mod)
156                 else:  # exit if plugin is not found
157                     msg.fatal('no modules found.')
158         else:
159             # scan directory specified in path and load all modules from there
160             names = os.listdir(path)
161             for mod in names:
162                 if mod.replace(MODULE_EXTENSION, "") in disabled:
163                     msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
164                     continue
165                 m = self._import_module(path + mod)
166                 if m is not None:
167                     msg.debug(mod + ": loading module '" + path + mod + "'")
168                     loaded.append(m)
169         return loaded
170
171     def _load_configs(self, modules):
172         """
173         Append configuration in list named `config` to every module.
174         For multi-job modules `config` list is created in _parse_config,
175         otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
176         :param modules: list
177         :return: list
178         """
179         for mod in modules:
180             configfile = self.configs + mod.__name__ + ".conf"
181             if os.path.isfile(configfile):
182                 msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
183                 try:
184                     if not hasattr(mod, 'config'):
185                         mod.config = {}
186                     setattr(mod,
187                             'config',
188                             self._parse_config(mod, read_config(configfile)))
189                 except Exception as e:
190                     msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
191             else:
192                 msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
193                 # set config if not found
194                 if not hasattr(mod, 'config'):
195                     msg.debug(mod.__name__ + ": setting configuration for only one job")
196                     mod.config = {None: {}}
197                     for var in BASE_CONFIG:
198                         try:
199                             mod.config[None][var] = getattr(mod, var)
200                         except AttributeError:
201                             mod.config[None][var] = BASE_CONFIG[var]
202         return modules
203
204     @staticmethod
205     def _parse_config(module, config):
206         """
207         Parse configuration file or extract configuration from module file.
208         Example of returned dictionary:
209             config = {'name': {
210                             'update_every': 2,
211                             'retries': 3,
212                             'priority': 30000
213                             'other_val': 123}}
214         :param module: object
215         :param config: dict
216         :return: dict
217         """
218         if config is None:
219             config = {}
220         # get default values
221         defaults = {}
222         msg.debug(module.__name__ + ": reading configuration")
223         for key in BASE_CONFIG:
224             try:
225                 # get defaults from module config
226                 defaults[key] = int(config.pop(key))
227             except (KeyError, ValueError):
228                 try:
229                     # get defaults from module source code
230                     defaults[key] = getattr(module, key)
231                 except (KeyError, ValueError, AttributeError):
232                     # if above failed, get defaults from global dict
233                     defaults[key] = BASE_CONFIG[key]
234
235         # check if there are dict in config dict
236         many_jobs = False
237         for name in config:
238             if type(config[name]) is dict:
239                 many_jobs = True
240                 break
241
242         # assign variables needed by supervisor to every job configuration
243         if many_jobs:
244             for name in config:
245                 for key in defaults:
246                     if key not in config[name]:
247                         config[name][key] = defaults[key]
248         # if only one job is needed, values doesn't have to be in dict (in YAML)
249         else:
250             config = {None: config.copy()}
251             config[None].update(defaults)
252
253         # return dictionary of jobs where every job has BASE_CONFIG variables
254         return config
255
256     @staticmethod
257     def _create_jobs(modules):
258         """
259         Create jobs based on module.config dictionary and module.Service class definition.
260         :param modules: list
261         :return: list
262         """
263         jobs = []
264         for module in modules:
265             for name in module.config:
266                 # register a new job
267                 conf = module.config[name]
268                 try:
269                     job = module.Service(configuration=conf, name=name)
270                 except Exception as e:
271                     msg.error(module.__name__ +
272                               ("/" + str(name) if name is not None else "") +
273                               ": cannot start job: '" +
274                               str(e))
275                     return None
276                 else:
277                     # set chart_name (needed to plot run time graphs)
278                     job.chart_name = module.__name__
279                     if name is not None:
280                         job.chart_name += "_" + name
281                 jobs.append(job)
282                 msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
283
284         return [j for j in jobs if j is not None]
285
286     def _stop(self, job, reason=None):
287         """
288         Stop specified job and remove it from self.jobs list
289         Also notifies user about job failure if DEBUG_FLAG is set
290         :param job: object
291         :param reason: str
292         """
293         prefix = job.__module__
294         if job.name is not None and len(job.name) != 0:
295             prefix += "/" + job.name
296         try:
297             msg.error("DISABLED:", prefix)
298             self.jobs.remove(job)
299         except Exception as e:
300             msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
301
302         # TODO remove section below and remove `reason`.
303         prefix += ": "
304         if reason is None:
305             return
306         elif reason[:3] == "no ":
307             msg.error(prefix +
308                       "does not seem to have " +
309                       reason[3:] +
310                       "() function. Disabling it.")
311         elif reason[:7] == "failed ":
312             msg.error(prefix +
313                       reason[7:] +
314                       "() function reports failure.")
315         elif reason[:13] == "configuration":
316             msg.error(prefix +
317                       "configuration file '" +
318                       self.configs +
319                       job.__module__ +
320                       ".conf' not found. Using defaults.")
321         elif reason[:11] == "misbehaving":
322             msg.error(prefix + "is " + reason)
323
324     def check(self):
325         """
326         Tries to execute check() on every job.
327         This cannot fail thus it is catching every exception
328         If job.check() fails job is stopped
329         """
330         i = 0
331         overridden = []
332         msg.debug("all job objects", str(self.jobs))
333         while i < len(self.jobs):
334             job = self.jobs[i]
335             try:
336                 if not job.check():
337                     msg.error(job.chart_name, "check() failed - disabling job")
338                     self._stop(job)
339                 else:
340                     msg.info("CHECKED OK:", job.chart_name)
341                     i += 1
342                     try:
343                         if job.override_name is not None:
344                             new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name)
345                             if new_name in overridden:
346                                 msg.info("DROPPED:", job.name, ", job '" + job.override_name + "' is already served by another job.")
347                                 self._stop(job)
348                                 i -= 1
349                             else:
350                                 job.name = job.override_name
351                                 msg.info("RENAMED:", new_name, ", from " + job.chart_name)
352                                 job.chart_name = new_name
353                                 overridden.append(job.chart_name)
354                     except Exception:
355                         pass
356             except AttributeError as e:
357                 self._stop(job)
358                 msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
359                 msg.debug(str(e))
360             except (UnboundLocalError, Exception) as e:
361                 msg.error(job.chart_name, str(e))
362                 self._stop(job)
363         msg.debug("overridden job names:", str(overridden))
364         msg.debug("all remaining job objects:", str(self.jobs))
365
366     def create(self):
367         """
368         Tries to execute create() on every job.
369         This cannot fail thus it is catching every exception.
370         If job.create() fails job is stopped.
371         This is also creating job run time chart.
372         """
373         i = 0
374         while i < len(self.jobs):
375             job = self.jobs[i]
376             try:
377                 if not job.create():
378                     msg.error(job.chart_name, "create function failed.")
379                     self._stop(job)
380                 else:
381                     chart = job.chart_name
382                     sys.stdout.write(
383                         "CHART netdata.plugin_pythond_" +
384                         chart +
385                         " '' 'Execution time for " +
386                         chart +
387                         " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
388                         str(job.timetable['freq']) +
389                         '\n')
390                     sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
391                     msg.debug("created charts for", job.chart_name)
392                     # sys.stdout.flush()
393                     i += 1
394             except AttributeError:
395                 msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
396                 self._stop(job)
397             except (UnboundLocalError, Exception) as e:
398                 msg.error(job.chart_name, str(e))
399                 self._stop(job)
400
401     def update(self):
402         """
403         Creates and supervises every job thread.
404         This will stay forever and ever and ever forever and ever it'll be the one...
405         """
406         for job in self.jobs:
407             job.start()
408
409         while True:
410             if threading.active_count() <= 1:
411                 msg.fatal("no more jobs")
412             time.sleep(1)
413
414
415 def read_config(path):
416     """
417     Read YAML configuration from specified file
418     :param path: str
419     :return: dict
420     """
421     try:
422         with open(path, 'r') as stream:
423             config = yaml.load(stream)
424     except (OSError, IOError):
425         msg.error(str(path), "is not a valid configuration file")
426         return None
427     except yaml.YAMLError as e:
428         msg.error(str(path), "is malformed:", e)
429         return None
430     return config
431
432
433 def parse_cmdline(directory, *commands):
434     """
435     Parse parameters from command line.
436     :param directory: str
437     :param commands: list of str
438     :return: dict
439     """
440     global DEBUG_FLAG, TRACE_FLAG
441     global OVERRIDE_UPDATE_EVERY
442     global BASE_CONFIG
443
444     changed_update = False
445     mods = []
446     for cmd in commands[1:]:
447         if cmd == "check":
448             pass
449         elif cmd == "debug" or cmd == "all":
450             DEBUG_FLAG = True
451             # redirect stderr to stdout?
452         elif cmd == "trace" or cmd == "all":
453             TRACE_FLAG = True
454         elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
455             #DEBUG_FLAG = True
456             mods.append(cmd.replace(".chart.py", ""))
457         else:
458             try:
459                 BASE_CONFIG['update_every'] = int(cmd)
460                 changed_update = True
461             except ValueError:
462                 pass
463     if changed_update and DEBUG_FLAG:
464         OVERRIDE_UPDATE_EVERY = True
465         msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
466
467     msg.debug("started from", commands[0], "with options:", *commands[1:])
468
469     return mods
470
471
472 # if __name__ == '__main__':
473 def run():
474     """
475     Main program.
476     """
477     global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG
478
479     # read configuration file
480     disabled = []
481     configfile = CONFIG_DIR + "python.d.conf"
482     msg.PROGRAM = PROGRAM
483     msg.info("reading configuration file:", configfile)
484     log_throttle = 200
485     log_interval = 3600
486
487     conf = read_config(configfile)
488     if conf is not None:
489         try:
490             # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
491             if conf['enabled'] is False:
492                 msg.fatal('disabled in configuration file.\n')
493         except (KeyError, TypeError):
494             pass
495
496         try:
497             for param in BASE_CONFIG:
498                 BASE_CONFIG[param] = conf[param]
499         except (KeyError, TypeError):
500             pass  # use default update_every from NETDATA_UPDATE_EVERY
501
502         try:
503             DEBUG_FLAG = conf['debug']
504         except (KeyError, TypeError):
505             pass
506
507         try:
508             TRACE_FLAG = conf['trace']
509         except (KeyError, TypeError):
510             pass
511
512         try:
513             log_throttle = conf['logs_per_interval']
514         except (KeyError, TypeError):
515             pass
516
517         try:
518             log_interval = conf['log_interval']
519         except (KeyError, TypeError):
520             pass
521
522         for k, v in conf.items():
523             if k in ("update_every", "debug", "enabled"):
524                 continue
525             if v is False:
526                 disabled.append(k)
527
528     # parse passed command line arguments
529     modules = parse_cmdline(MODULES_DIR, *sys.argv)
530     msg.DEBUG_FLAG = DEBUG_FLAG
531     msg.TRACE_FLAG = TRACE_FLAG
532     msg.LOG_THROTTLE = log_throttle
533     msg.LOG_INTERVAL = log_interval
534     msg.LOG_COUNTER = 0
535     msg.LOG_NEXT_CHECK = 0
536     msg.info("MODULES_DIR='" + MODULES_DIR +
537              "', CONFIG_DIR='" + CONFIG_DIR +
538              "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
539              ", ONLY_MODULES=" + str(modules))
540
541     # run plugins
542     charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled)
543     charts.check()
544     charts.create()
545     charts.update()
546     msg.fatal("finished")
547
548
549 if __name__ == '__main__':
550     run()