]> arthur.barton.de Git - netdata.git/blob - python.d/python_modules/base.py
Merge pull request #661 from Flums/patch-5
[netdata.git] / python.d / python_modules / base.py
1 # -*- coding: utf-8 -*-
2 # Description: prototypes for netdata python.d modules
3 # Author: Pawel Krupa (paulfantom)
4
5 import time
6 import sys
7 import os
8 try:
9     from urllib.request import urlopen
10 except ImportError:
11     from urllib2 import urlopen
12
13 # from subprocess import STDOUT, PIPE, Popen
14 import threading
15 import msg
16
17
18 class BaseService(threading.Thread):
19     """
20     Prototype of Service class.
21     Implemented basic functionality to run jobs by `python.d.plugin`
22     """
23     def __init__(self, configuration=None, name=None):
24         """
25         This needs to be initialized in child classes
26         :param configuration: dict
27         :param name: str
28         """
29         threading.Thread.__init__(self)
30         self._data_stream = ""
31         self.daemon = True
32         self.retries = 0
33         self.retries_left = 0
34         self.priority = 140000
35         self.update_every = 1
36         self.name = name
37         self.override_name = None
38         self.chart_name = ""
39         self._dimensions = []
40         self._charts = []
41         if configuration is None:
42             self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
43             raise RuntimeError
44         else:
45             self._extract_base_config(configuration)
46             self.timetable = {}
47             self.create_timetable()
48
49     def _extract_base_config(self, config):
50         """
51         Get basic parameters to run service
52         Minimum config:
53             config = {'update_every':1,
54                       'priority':100000,
55                       'retries':0}
56         :param config: dict
57         """
58         try:
59             self.override_name = config.pop('override_name')
60         except KeyError:
61             pass
62         self.update_every = int(config.pop('update_every'))
63         self.priority = int(config.pop('priority'))
64         self.retries = int(config.pop('retries'))
65         self.retries_left = self.retries
66         self.configuration = config
67
68     def create_timetable(self, freq=None):
69         """
70         Create service timetable.
71         `freq` is optional
72         Example:
73             timetable = {'last': 1466370091.3767564,
74                          'next': 1466370092,
75                          'freq': 1}
76         :param freq: int
77         """
78         if freq is None:
79             freq = self.update_every
80         now = time.time()
81         self.timetable = {'last': now,
82                           'next': now - (now % freq) + freq,
83                           'freq': freq}
84
85     def _run_once(self):
86         """
87         Executes self.update(interval) and draws run time chart.
88         Return value presents exit status of update()
89         :return: boolean
90         """
91         t_start = time.time()
92         # check if it is time to execute job update() function
93         if self.timetable['next'] > t_start:
94             msg.debug(self.chart_name + " will be run in " +
95                       str(int((self.timetable['next'] - t_start) * 1000)) + " ms")
96             return True
97
98         since_last = int((t_start - self.timetable['last']) * 1000000)
99         msg.debug(self.chart_name +
100                   " ready to run, after " + str(int((t_start - self.timetable['last']) * 1000)) +
101                   " ms (update_every: " + str(self.timetable['freq'] * 1000) +
102                   " ms, latency: " + str(int((t_start - self.timetable['next']) * 1000)) + " ms)")
103         if not self.update(since_last):
104             return False
105         t_end = time.time()
106         self.timetable['next'] = t_end - (t_end % self.timetable['freq']) + self.timetable['freq']
107
108         # draw performance graph
109         run_time = str(int((t_end - t_start) * 1000))
110         run_time_chart = "BEGIN netdata.plugin_pythond_" + self.chart_name + " " + str(since_last) + '\n'
111         run_time_chart += "SET run_time = " + run_time + '\n'
112         run_time_chart += "END\n"
113         sys.stdout.write(run_time_chart)
114         msg.debug(self.chart_name + " updated in " + str(run_time) + " ms")
115         self.timetable['last'] = t_start
116         return True
117
118     def run(self):
119         """
120         Runs job in thread. Handles retries.
121         Exits when job failed or timed out.
122         :return: None
123         """
124         self.timetable['last'] = time.time()
125         while True:
126             try:
127                 status = self._run_once()
128             except Exception as e:
129                 msg.error("Something wrong: " + str(e))
130                 return
131             if status:
132                 time.sleep(self.timetable['next'] - time.time())
133                 self.retries_left = self.retries
134             else:
135                 self.retries -= 1
136                 if self.retries_left <= 0:
137                     msg.error("no more retries. Exiting")
138                     return
139                 else:
140                     time.sleep(self.timetable['freq'])
141
142     def _line(self, instruction, *params):
143         """
144         Converts *params to string and joins them with one space between every one.
145         :param params: str/int/float
146         """
147         self._data_stream += instruction
148         for p in params:
149             if p is None:
150                 p = ""
151             else:
152                 p = str(p)
153             if len(p) == 0:
154                 p = "''"
155             if ' ' in p:
156                 p = "'" + p + "'"
157             self._data_stream += " " + p
158         self._data_stream += "\n"
159
160     def chart(self, type_id, name="", title="", units="", family="",
161               category="", charttype="line", priority="", update_every=""):
162         """
163         Defines a new chart.
164         :param type_id: str
165         :param name: str
166         :param title: str
167         :param units: str
168         :param family: str
169         :param category: str
170         :param charttype: str
171         :param priority: int/str
172         :param update_every: int/str
173         """
174         self._charts.append(type_id)
175         self._line("CHART", type_id, name, title, units, family, category, charttype, priority, update_every)
176
177     def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
178         """
179         Defines a new dimension for the chart
180         :param id: str
181         :param name: str
182         :param algorithm: str
183         :param multiplier: int/str
184         :param divisor: int/str
185         :param hidden: boolean
186         :return:
187         """
188         try:
189             int(multiplier)
190         except TypeError:
191             self.error("malformed dimension: multiplier is not a number:", multiplier)
192             multiplier = 1
193         try:
194             int(divisor)
195         except TypeError:
196             self.error("malformed dimension: divisor is not a number:", divisor)
197             divisor = 1
198         if name is None:
199             name = id
200         if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
201             algorithm = "absolute"
202
203         self._dimensions.append(id)
204         if hidden:
205             self._line("DIMENSION", id, name, algorithm, multiplier, divisor, "hidden")
206         else:
207             self._line("DIMENSION", id, name, algorithm, multiplier, divisor)
208
209     def begin(self, type_id, microseconds=0):
210         """
211         Begin data set
212         :param type_id: str
213         :param microseconds: int
214         :return: boolean
215         """
216         if type_id not in self._charts:
217             self.error("wrong chart type_id:", type_id)
218             return False
219         try:
220             int(microseconds)
221         except TypeError:
222             self.error("malformed begin statement: microseconds are not a number:", microseconds)
223             microseconds = ""
224
225         self._line("BEGIN", type_id, microseconds)
226         return True
227
228     def set(self, id, value):
229         """
230         Set value to dimension
231         :param id: str
232         :param value: int/float
233         :return: boolean
234         """
235         if id not in self._dimensions:
236             self.error("wrong dimension id:", id)
237             return False
238         try:
239             value = str(int(value))
240         except TypeError:
241             self.error("cannot set non-numeric value:", value)
242             return False
243         self._line("SET", id, "=", value)
244         return True
245
246     def end(self):
247         self._line("END")
248
249     def commit(self):
250         """
251         Upload new data to netdata
252         """
253         print(self._data_stream)
254         self._data_stream = ""
255
256     def error(self, *params):
257         """
258         Show error message on stderr
259         """
260         msg.error(self.chart_name, *params)
261
262     def debug(self, *params):
263         """
264         Show debug message on stderr
265         """
266         msg.debug(self.chart_name, *params)
267
268     def info(self, *params):
269         """
270         Show information message on stderr
271         """
272         msg.info(self.chart_name, *params)
273
274     def check(self):
275         """
276         check() prototype
277         :return: boolean
278         """
279         msg.error("Service " + str(self.__module__) + "doesn't implement check() function")
280         return False
281
282     def create(self):
283         """
284         create() prototype
285         :return: boolean
286         """
287         msg.error("Service " + str(self.__module__) + "doesn't implement create() function?")
288         return False
289
290     def update(self, interval):
291         """
292         update() prototype
293         :param interval: int
294         :return: boolean
295         """
296         msg.error("Service " + str(self.__module__) + "doesn't implement update() function")
297         return False
298
299
300 class SimpleService(BaseService):
301     def __init__(self, configuration=None, name=None):
302         self.order = []
303         self.definitions = {}
304         BaseService.__init__(self, configuration=configuration, name=name)
305
306     def _get_data(self):
307         """
308         Get raw data from http request
309         :return: str
310         """
311         return ""
312
313     def _format_data(self):
314         """
315         Format data received from http request
316         :return: dict
317         """
318         return {}
319
320     def check(self):
321         """
322         :return:
323         """
324         return True
325
326     def create(self):
327         """
328         Create charts
329         :return: boolean
330         """
331         data = self._format_data()
332         if data is None:
333             return False
334
335         idx = 0
336         for name in self.order:
337             options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
338             self.chart(self.__module__ + "_" + self.name + "." + name, *options)
339             # check if server has this datapoint
340             for line in self.definitions[name]['lines']:
341                 if line[0] in data:
342                     self.dimension(*line)
343             idx += 1
344
345         self.commit()
346         return True
347
348     def update(self, interval):
349         """
350         Update charts
351         :param interval: int
352         :return: boolean
353         """
354         data = self._format_data()
355         if data is None:
356             return False
357
358         updated = False
359         for chart in self.order:
360             if self.begin(self.__module__ + "_" + str(self.name) + "." + chart, interval):
361                 updated = True
362                 for dim in self.definitions[chart]['lines']:
363                     try:
364                         self.set(dim[0], data[dim[0]])
365                     except KeyError:
366                         pass
367                 self.end()
368
369         self.commit()
370
371         return updated
372
373
374 class UrlService(SimpleService):
375     def __init__(self, configuration=None, name=None):
376         # definitions are created dynamically in create() method based on 'charts' dictionary. format:
377         # definitions = {
378         #     'chart_name_in_netdata' : [ charts['chart_name_in_netdata']['lines']['name'] ]
379         # }
380         self.url = ""
381         SimpleService.__init__(self, configuration=configuration, name=name)
382
383     def _get_data(self):
384         """
385         Get raw data from http request
386         :return: str
387         """
388         raw = None
389         try:
390             f = urlopen(self.url, timeout=self.update_every)
391             raw = f.read().decode('utf-8')
392         except Exception as e:
393             msg.error(self.__module__, str(e))
394         finally:
395             try:
396                 f.close()
397             except:
398                 pass
399         return raw
400
401     def check(self):
402         """
403         Format configuration data and try to connect to server
404         :return: boolean
405         """
406         if self.name is None or self.name == str(None):
407             self.name = 'local'
408         else:
409             self.name = str(self.name)
410         try:
411             self.url = str(self.configuration['url'])
412         except (KeyError, TypeError):
413             pass
414
415         if self._format_data() is not None:
416             return True
417         else:
418             return False
419
420
421 class LogService(SimpleService):
422     def __init__(self, configuration=None, name=None):
423         # definitions are created dynamically in create() method based on 'charts' dictionary. format:
424         # definitions = {
425         #     'chart_name_in_netdata' : [ charts['chart_name_in_netdata']['lines']['name'] ]
426         # }
427         self.log_path = ""
428         self._last_position = 0
429         # self._log_reader = None
430         SimpleService.__init__(self, configuration=configuration, name=name)
431         self.retries = 100000  # basically always retry
432
433     def _get_data(self):
434         lines = []
435         try:
436             if os.path.getsize(self.log_path) < self._last_position:
437                 self._last_position = 0
438             elif os.path.getsize(self.log_path) == self._last_position:
439                 return None
440             with open(self.log_path, "r") as fp:
441                 fp.seek(self._last_position)
442                 for i, line in enumerate(fp):
443                     lines.append(line)
444                 self._last_position = fp.tell()
445         except Exception as e:
446             msg.error(self.__module__, str(e))
447
448         if len(lines) != 0:
449             return lines
450         return None
451
452     def check(self):
453         if self.name is not None or self.name != str(None):
454             self.name = ""
455         else:
456             self.name = str(self.name)
457         try:
458             self.log_path = str(self.configuration['path'])
459         except (KeyError, TypeError):
460             self.error("No path to log specified. Using: '" + self.log_path + "'")
461
462         if os.access(self.log_path, os.R_OK):
463             return True
464         else:
465             self.error("Cannot access file: '" + self.log_path + "'")
466             return False
467
468     def create(self):
469         status = SimpleService.create(self)
470         self._last_position = 0
471         return status
472