]> arthur.barton.de Git - netdata.git/blob - plugins.d/python.d.plugin
ability to retry failed job
[netdata.git] / plugins.d / python.d.plugin
1 #!/usr/bin/env python3
2
3 import os
4 import sys
5 import time
6
7 try:
8     assert sys.version_info >= (3, 1)
9     import importlib.machinery
10
11     # change this hack below if we want PY_VERSION to be used in modules
12     # import builtins
13     # builtins.PY_VERSION = 3
14     PY_VERSION = 3
15     sys.stderr.write('python.d.plugin: Using python 3\n')
16 except (AssertionError, ImportError):
17     try:
18         import imp
19
20         # change this hack below if we want PY_VERSION to be used in modules
21         # import __builtin__
22         # __builtin__.PY_VERSION = 2
23         PY_VERSION = 2
24         sys.stderr.write('python.d.plugin: Using python 2\n')
25     except (AssertionError, ImportError):
26         sys.stderr.write('python.d.plugin: Cannot start. No importlib.machinery on python3 or lack of imp on python2\n')
27         sys.stdout.write('DISABLE\n')
28         sys.exit(1)
29 try:
30     import yaml
31 except ImportError:
32     sys.stderr.write('python.d.plugin: Cannot find yaml library\n')
33     sys.stdout.write('DISABLE\n')
34     sys.exit(1)
35
36 DEBUG_FLAG = False
37 PROGRAM = "python.d.plugin"
38 MODULE_EXTENSION = ".chart.py"
39 BASE_CONFIG = {'update_every': 10,
40                'priority': 12345,
41                'retries': 0}
42
43
44 class PythonCharts(object):
45     """
46     Main class used to control every python module.
47     """
48     def __init__(self,
49                  interval=None,
50                  modules=None,
51                  modules_path='../python.d/',
52                  modules_configs='../conf.d/',
53                  modules_disabled=None):
54         """
55         :param interval: int
56         :param modules: list
57         :param modules_path: str
58         :param modules_configs: str
59         :param modules_disabled: list
60         """
61
62         if modules is None:
63             modules = []
64         if modules_disabled is None:
65             modules_disabled = []
66
67         self.first_run = True
68         # set configuration directory
69         self.configs = modules_configs
70
71         # load modules
72         loaded_modules = self._load_modules(modules_path, modules, modules_disabled)
73
74         # load configuration files
75         configured_modules = self._load_configs(loaded_modules)
76
77         # good economy and prosperity:
78         self.jobs = self._create_jobs(configured_modules)  # type: list
79         if DEBUG_FLAG and interval is not None:
80             for job in self.jobs:
81                 job.create_timetable(interval)
82
83     @staticmethod
84     def _import_module(path, name=None):
85         """
86         Try to import module using only its path.
87         :param path: str
88         :param name: str
89         :return: object
90         """
91
92         if name is None:
93             name = path.split('/')[-1]
94             if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
95                 return None
96             name = name[:-len(MODULE_EXTENSION)]
97         try:
98             if PY_VERSION == 3:
99                 return importlib.machinery.SourceFileLoader(name, path).load_module()
100             else:
101                 return imp.load_source(name, path)
102         except Exception as e:
103             debug(str(e))
104             return None
105
106     def _load_modules(self, path, modules, disabled):
107         """
108         Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
109         :param path: str
110         :param modules: list
111         :param disabled: list
112         :return: list
113         """
114
115         # check if plugin directory exists
116         if not os.path.isdir(path):
117             debug("cannot find charts directory ", path)
118             sys.stdout.write("DISABLE\n")
119             sys.exit(1)
120
121         # load modules
122         loaded = []
123         if len(modules) > 0:
124             for m in modules:
125                 if m in disabled:
126                     continue
127                 mod = self._import_module(path + m + MODULE_EXTENSION)
128                 if mod is not None:
129                     loaded.append(mod)
130                 else:  # exit if plugin is not found
131                     sys.stdout.write("DISABLE")
132                     sys.stdout.flush()
133                     sys.exit(1)
134         else:
135             # scan directory specified in path and load all modules from there
136             names = os.listdir(path)
137             for mod in names:
138                 if mod.strip(MODULE_EXTENSION) in disabled:
139                     debug("disabling:", mod.strip(MODULE_EXTENSION))
140                     continue
141                 m = self._import_module(path + mod)
142                 if m is not None:
143                     debug("loading chart: '" + path + mod + "'")
144                     loaded.append(m)
145         return loaded
146
147     def _load_configs(self, modules):
148         """
149         Append configuration in list named `config` to every module.
150         For multi-job modules `config` list is created in _parse_config,
151         otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
152         :param modules: list
153         :return: list
154         """
155
156         for mod in modules:
157             configfile = self.configs + mod.__name__ + ".conf"
158             if os.path.isfile(configfile):
159                 debug("loading chart options: '" + configfile + "'")
160                 try:
161                     setattr(mod,
162                             'config',
163                             self._parse_config(mod, read_config(configfile)))
164                 except Exception as e:
165                     debug("something went wrong while loading configuration", e)
166             else:
167                 debug(mod.__name__ +
168                       ": configuration file '" +
169                       configfile +
170                       "' not found. Using defaults.")
171                 # set config if not found
172                 if not hasattr(mod, 'config'):
173                     mod.config = {None: {}}
174                     for var in BASE_CONFIG:
175                         try:
176                             mod.config[None][var] = getattr(mod, var)
177                         except AttributeError:
178                             mod.config[None][var] = BASE_CONFIG[var]
179         return modules
180
181     @staticmethod
182     def _parse_config(module, config):
183         """
184         Parse configuration file or extract configuration from module file.
185         Example of returned dictionary:
186             config = {'name': {
187                             'update_every': 2,
188                             'retries': 3,
189                             'priority': 30000
190                             'other_val': 123}}
191         :param module: object
192         :param config: dict
193         :return: dict
194         """
195
196         # get default values
197         defaults = {}
198         for key in BASE_CONFIG:
199             try:
200                 # get defaults from module config
201                 defaults[key] = int(config.pop(key))
202             except (KeyError, ValueError):
203                 try:
204                     # get defaults from module source code
205                     defaults[key] = getattr(module, key)
206                 except (KeyError, ValueError):
207                     # if above failed, get defaults from global dict
208                     defaults[key] = BASE_CONFIG[key]
209
210         # check if there are dict in config dict
211         many_jobs = False
212         for name in config:
213             if type(config[name]) is dict:
214                 many_jobs = True
215                 break
216
217         # assign variables needed by supervisor to every job configuration
218         if many_jobs:
219             for name in config:
220                 for key in defaults:
221                     if key not in config[name]:
222                         config[name][key] = defaults[key]
223         # if only one job is needed, values doesn't have to be in dict (in YAML)
224         else:
225             config = {None: config.copy()}
226             config[None].update(defaults)
227
228         # return dictionary of jobs where every job has BASE_CONFIG variables
229         return config
230
231     @staticmethod
232     def _create_jobs(modules):
233         """
234         Create jobs based on module.config dictionary and module.Service class definition.
235         :param modules: list
236         :return: list
237         """
238         jobs = []
239         for module in modules:
240             for name in module.config:
241                 # register a new job
242                 conf = module.config[name]
243                 try:
244                     job = module.Service(configuration=conf, name=name)
245                 except Exception as e:
246                     debug(module.__name__ +
247                           ": Couldn't start job named " +
248                           str(name) +
249                           ": " +
250                           str(e))
251                     return None
252                 else:
253                     # set execution_name (needed to plot run time graphs)
254                     job.execution_name = module.__name__
255                     if name is not None:
256                         job.execution_name += "_" + name
257                 jobs.append(job)
258
259         return [j for j in jobs if j is not None]
260
261     def _stop(self, job, reason=None):
262         """
263         Stop specified job and remove it from self.jobs list
264         Also notifies user about job failure if DEBUG_FLAG is set
265         :param job: object
266         :param reason: str
267         """
268         self.jobs.remove(job)
269         if reason is None:
270             return
271         elif reason[:3] == "no ":
272             debug("chart '" +
273                   job.execution_name,
274                   "' does not seem to have " +
275                   reason[3:] +
276                   "() function. Disabling it.")
277         elif reason[:7] == "failed ":
278             debug("chart '" +
279                   job.execution_name + "' " +
280                   reason[7:] +
281                   "() function reports failure.")
282         elif reason[:13] == "configuration":
283             debug(job.execution_name,
284                   "configuration file '" +
285                   self.configs +
286                   job.execution_name +
287                   ".conf' not found. Using defaults.")
288         elif reason[:11] == "misbehaving":
289             debug(job.execution_name, "is " + reason)
290
291     def check(self):
292         """
293         Tries to execute check() on every job.
294         This cannot fail thus it is catching every exception
295         If job.check() fails job is stopped
296         """
297         for job in self.jobs:
298             try:
299                 if not job.check():
300                     self._stop(job, "failed check")
301             except AttributeError:
302                 self._stop(job, "no check")
303             except (UnboundLocalError, Exception) as e:
304                 self._stop(job, "misbehaving. Reason: " + str(e))
305
306     def create(self):
307         """
308         Tries to execute create() on every job.
309         This cannot fail thus it is catching every exception.
310         If job.create() fails job is stopped.
311         This is also creating job run time chart.
312         """
313         for job in self.jobs:
314             try:
315                 if not job.create():
316                     self._stop(job, "failed create")
317                 else:
318                     chart = job.execution_name
319                     sys.stdout.write(
320                         "CHART netdata.plugin_pythond_" +
321                         chart +
322                         " '' 'Execution time for " +
323                         chart +
324                         " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
325                         str(job.timetable['freq']) +
326                         '\n')
327                     sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
328                     sys.stdout.flush()
329             except AttributeError:
330                 self._stop(job, "no create")
331             except (UnboundLocalError, Exception) as e:
332                 self._stop(job, "misbehaving. Reason: " + str(e))
333
334     def _update_job(self, job):
335         """
336         Tries to execute update() on specified job.
337         This cannot fail thus it is catching every exception.
338         If job.update() returns False, number of retries_left is decremented.
339         If there are no more retries, job is stopped.
340         Job is also stopped if it throws an exception.
341         This is also updating job run time chart.
342         :param job: object
343         """
344         t_start = time.time()
345         # check if it is time to execute job update() function
346         if job.timetable['next'] > t_start:
347             return
348         try:
349             if self.first_run:
350                 since_last = 0
351             else:
352                 since_last = int((t_start - job.timetable['last']) * 1000000)
353             if not job.update(since_last):
354                 if job.retries_left <= 0:
355                     self._stop(job, "update failed")
356                 job.retries_left -= 1
357                 job.timetable['next'] += job.timetable['freq']
358                 return
359         except AttributeError:
360             self._stop(job, "no update")
361             return
362         except (UnboundLocalError, Exception) as e:
363             self._stop(job, "misbehaving. Reason: " + str(e))
364             return
365         t_end = time.time()
366         job.timetable['next'] = t_end - (t_end % job.timetable['freq']) + job.timetable['freq']
367         # draw performance graph
368         sys.stdout.write("BEGIN netdata.plugin_pythond_" + job.execution_name + " " + str(since_last) + '\n')
369         sys.stdout.write("SET run_time = " + str(int((t_end - t_start) * 1000)) + '\n')
370         sys.stdout.write("END\n")
371         sys.stdout.flush()
372         job.timetable['last'] = t_start
373         job.retries_left = job.retries
374         self.first_run = False
375
376     def update(self):
377         """
378         Tries to execute update() on every job by using _update_job()
379         This will stay forever and ever and ever forever and ever it'll be the one...
380         """
381         self.first_run = True
382         while True:
383             next_runs = []
384             for job in self.jobs:
385                 self._update_job(job)
386                 try:
387                     next_runs.append(job.timetable['next'])
388                 except KeyError:
389                     pass
390             if len(next_runs) == 0:
391                 debug("No plugins loaded")
392                 sys.stdout.write("DISABLE\n")
393                 sys.exit(1)
394             time.sleep(min(next_runs) - time.time())
395
396
397 def read_config(path):
398     """
399     Read YAML configuration from specified file
400     :param path: str
401     :return: dict
402     """
403     try:
404         with open(path, 'r') as stream:
405             config = yaml.load(stream)
406     except (OSError, IOError):
407         debug(str(path), "is not a valid configuration file")
408         return None
409     except yaml.YAMLError as e:
410         debug(str(path), "is malformed:", e)
411         return None
412     return config
413
414
415 def debug(*args):
416     """
417     Print message on stderr.
418     """
419     if not DEBUG_FLAG:
420         return
421     sys.stderr.write(PROGRAM + ":")
422     for i in args:
423         sys.stderr.write(" " + str(i))
424     sys.stderr.write("\n")
425     sys.stderr.flush()
426
427
428 def parse_cmdline(directory, *commands):
429     """
430     Parse parameters from command line.
431     :param directory: str
432     :param commands: list of str
433     :return: dict
434     """
435     global DEBUG_FLAG
436     interval = None
437
438     mods = []
439     for cmd in commands[1:]:
440         if cmd == "check":
441             pass
442         elif cmd == "debug" or cmd == "all":
443             DEBUG_FLAG = True
444             # redirect stderr to stdout?
445         elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
446             DEBUG_FLAG = True
447             mods.append(cmd.replace(".chart.py", ""))
448         else:
449             DEBUG_FLAG = False
450             try:
451                 interval = int(cmd)
452             except ValueError:
453                 pass
454
455     debug("started from", commands[0], "with options:", *commands[1:])
456     if len(mods) == 0 and DEBUG_FLAG is False:
457         interval = None
458
459     return {'interval': interval,
460             'modules': mods}
461
462
463 # if __name__ == '__main__':
464 def run():
465     """
466     Main program.
467     """
468     global PROGRAM, DEBUG_FLAG
469     PROGRAM = sys.argv[0].split('/')[-1].split('.plugin')[0]
470     # parse env variables
471     # https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
472     main_dir = os.getenv('NETDATA_PLUGINS_DIR',
473                          os.path.abspath(__file__).strip("python.d.plugin.py"))
474     config_dir = os.getenv('NETDATA_CONFIG_DIR', "/etc/netdata/")
475     interval = os.getenv('NETDATA_UPDATE_EVERY', None)
476
477     # read configuration file
478     disabled = []
479     if config_dir[-1] != '/':
480         config_dir += '/'
481     configfile = config_dir + "python.d.conf"
482
483     conf = read_config(configfile)
484     if conf is not None:
485         try:
486             if str(conf['enable']) is False:
487                 debug("disabled in configuration file")
488                 sys.stdout.write("DISABLE\n")
489                 sys.exit(1)
490         except (KeyError, TypeError):
491             pass
492         try:
493             modules_conf = conf['plugins_config_dir']
494         except (KeyError, TypeError):
495             modules_conf = config_dir + "python.d/"  # default configuration directory
496         try:
497             modules_dir = conf['plugins_dir']
498         except (KeyError, TypeError):
499             modules_dir = main_dir.replace("plugins.d", "python.d")
500         try:
501             interval = conf['interval']
502         except (KeyError, TypeError):
503             pass  # use default interval from NETDATA_UPDATE_EVERY
504         try:
505             DEBUG_FLAG = conf['debug']
506         except (KeyError, TypeError):
507             pass
508         for k, v in conf.items():
509             if k in ("plugins_config_dir", "plugins_dir", "interval", "debug"):
510                 continue
511             if v is False:
512                 disabled.append(k)
513     else:
514         modules_conf = config_dir + "python.d/"
515         modules_dir = main_dir.replace("plugins.d", "python.d")
516
517     # directories should end with '/'
518     if modules_dir[-1] != '/':
519         modules_dir += "/"
520     if modules_conf[-1] != '/':
521         modules_conf += "/"
522
523     # parse passed command line arguments
524     out = parse_cmdline(modules_dir, *sys.argv)
525     modules = out['modules']
526     if out['interval'] is not None:
527         interval = out['interval']
528
529     # configure environment to run modules
530     sys.path.append(modules_dir + "python_modules")  # append path to directory with modules dependencies
531
532     # run plugins
533     charts = PythonCharts(interval, modules, modules_dir, modules_conf, disabled)
534     charts.check()
535     charts.create()
536     charts.update()
537     sys.stdout.write("DISABLE")
538
539
540 if __name__ == '__main__':
541     run()