]> arthur.barton.de Git - netdata.git/blob - python.d/python_modules/base.py
remove some old comments
[netdata.git] / python.d / python_modules / base.py
1 # -*- coding: utf-8 -*-
2 # Description: prototypes for netdata python.d modules
3 # Author: Pawel Krupa (paulfantom)
4
5 import time
6 import sys
7 import os
8 import socket
9 try:
10     from urllib.request import urlopen
11 except ImportError:
12     from urllib2 import urlopen
13
14 # from subprocess import STDOUT, PIPE, Popen
15 import threading
16 import msg
17
18
19 class BaseService(threading.Thread):
20     """
21     Prototype of Service class.
22     Implemented basic functionality to run jobs by `python.d.plugin`
23     """
24     def __init__(self, configuration=None, name=None):
25         """
26         This needs to be initialized in child classes
27         :param configuration: dict
28         :param name: str
29         """
30         threading.Thread.__init__(self)
31         self._data_stream = ""
32         self.daemon = True
33         self.retries = 0
34         self.retries_left = 0
35         self.priority = 140000
36         self.update_every = 1
37         self.name = name
38         self.override_name = None
39         self.chart_name = ""
40         self._dimensions = []
41         self._charts = []
42         if configuration is None:
43             self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
44             raise RuntimeError
45         else:
46             self._extract_base_config(configuration)
47             self.timetable = {}
48             self.create_timetable()
49
50     def _extract_base_config(self, config):
51         """
52         Get basic parameters to run service
53         Minimum config:
54             config = {'update_every':1,
55                       'priority':100000,
56                       'retries':0}
57         :param config: dict
58         """
59         try:
60             self.override_name = config.pop('override_name')
61         except KeyError:
62             pass
63         self.update_every = int(config.pop('update_every'))
64         self.priority = int(config.pop('priority'))
65         self.retries = int(config.pop('retries'))
66         self.retries_left = self.retries
67         self.configuration = config
68
69     def create_timetable(self, freq=None):
70         """
71         Create service timetable.
72         `freq` is optional
73         Example:
74             timetable = {'last': 1466370091.3767564,
75                          'next': 1466370092,
76                          'freq': 1}
77         :param freq: int
78         """
79         if freq is None:
80             freq = self.update_every
81         now = time.time()
82         self.timetable = {'last': now,
83                           'next': now - (now % freq) + freq,
84                           'freq': freq}
85
86     def _run_once(self):
87         """
88         Executes self.update(interval) and draws run time chart.
89         Return value presents exit status of update()
90         :return: boolean
91         """
92         t_start = time.time()
93         # check if it is time to execute job update() function
94         if self.timetable['next'] > t_start:
95             msg.debug(self.chart_name + " will be run in " +
96                       str(int((self.timetable['next'] - t_start) * 1000)) + " ms")
97             return True
98
99         since_last = int((t_start - self.timetable['last']) * 1000000)
100         msg.debug(self.chart_name +
101                   " ready to run, after " + str(int((t_start - self.timetable['last']) * 1000)) +
102                   " ms (update_every: " + str(self.timetable['freq'] * 1000) +
103                   " ms, latency: " + str(int((t_start - self.timetable['next']) * 1000)) + " ms)")
104         if not self.update(since_last):
105             return False
106         t_end = time.time()
107         self.timetable['next'] = t_end - (t_end % self.timetable['freq']) + self.timetable['freq']
108
109         # draw performance graph
110         run_time = str(int((t_end - t_start) * 1000))
111         run_time_chart = "BEGIN netdata.plugin_pythond_" + self.chart_name + " " + str(since_last) + '\n'
112         run_time_chart += "SET run_time = " + run_time + '\n'
113         run_time_chart += "END\n"
114         sys.stdout.write(run_time_chart)
115         msg.debug(self.chart_name + " updated in " + str(run_time) + " ms")
116         self.timetable['last'] = t_start
117         return True
118
119     def run(self):
120         """
121         Runs job in thread. Handles retries.
122         Exits when job failed or timed out.
123         :return: None
124         """
125         self.timetable['last'] = time.time()
126         while True:
127             try:
128                 status = self._run_once()
129             except Exception as e:
130                 msg.error("Something wrong: " + str(e))
131                 return
132             if status:
133                 time.sleep(self.timetable['next'] - time.time())
134                 self.retries_left = self.retries
135             else:
136                 self.retries_left -= 1
137                 if self.retries_left <= 0:
138                     msg.error("no more retries. Exiting")
139                     return
140                 else:
141                     time.sleep(self.timetable['freq'])
142
143     def _line(self, instruction, *params):
144         """
145         Converts *params to string and joins them with one space between every one.
146         :param params: str/int/float
147         """
148         self._data_stream += instruction
149         for p in params:
150             if p is None:
151                 p = ""
152             else:
153                 p = str(p)
154             if len(p) == 0:
155                 p = "''"
156             if ' ' in p:
157                 p = "'" + p + "'"
158             self._data_stream += " " + p
159         self._data_stream += "\n"
160
161     def chart(self, type_id, name="", title="", units="", family="",
162               category="", charttype="line", priority="", update_every=""):
163         """
164         Defines a new chart.
165         :param type_id: str
166         :param name: str
167         :param title: str
168         :param units: str
169         :param family: str
170         :param category: str
171         :param charttype: str
172         :param priority: int/str
173         :param update_every: int/str
174         """
175         self._charts.append(type_id)
176         self._line("CHART", type_id, name, title, units, family, category, charttype, priority, update_every)
177
178     def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
179         """
180         Defines a new dimension for the chart
181         :param id: str
182         :param name: str
183         :param algorithm: str
184         :param multiplier: int/str
185         :param divisor: int/str
186         :param hidden: boolean
187         :return:
188         """
189         try:
190             int(multiplier)
191         except TypeError:
192             self.error("malformed dimension: multiplier is not a number:", multiplier)
193             multiplier = 1
194         try:
195             int(divisor)
196         except TypeError:
197             self.error("malformed dimension: divisor is not a number:", divisor)
198             divisor = 1
199         if name is None:
200             name = id
201         if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
202             algorithm = "absolute"
203
204         self._dimensions.append(id)
205         if hidden:
206             self._line("DIMENSION", id, name, algorithm, multiplier, divisor, "hidden")
207         else:
208             self._line("DIMENSION", id, name, algorithm, multiplier, divisor)
209
210     def begin(self, type_id, microseconds=0):
211         """
212         Begin data set
213         :param type_id: str
214         :param microseconds: int
215         :return: boolean
216         """
217         if type_id not in self._charts:
218             self.error("wrong chart type_id:", type_id)
219             return False
220         try:
221             int(microseconds)
222         except TypeError:
223             self.error("malformed begin statement: microseconds are not a number:", microseconds)
224             microseconds = ""
225
226         self._line("BEGIN", type_id, microseconds)
227         return True
228
229     def set(self, id, value):
230         """
231         Set value to dimension
232         :param id: str
233         :param value: int/float
234         :return: boolean
235         """
236         if id not in self._dimensions:
237             self.error("wrong dimension id:", id)
238             return False
239         try:
240             value = str(int(value))
241         except TypeError:
242             self.error("cannot set non-numeric value:", value)
243             return False
244         self._line("SET", id, "=", value)
245         return True
246
247     def end(self):
248         self._line("END")
249
250     def commit(self):
251         """
252         Upload new data to netdata
253         """
254         print(self._data_stream)
255         self._data_stream = ""
256
257     def error(self, *params):
258         """
259         Show error message on stderr
260         """
261         msg.error(self.chart_name, *params)
262
263     def debug(self, *params):
264         """
265         Show debug message on stderr
266         """
267         msg.debug(self.chart_name, *params)
268
269     def info(self, *params):
270         """
271         Show information message on stderr
272         """
273         msg.info(self.chart_name, *params)
274
275     def check(self):
276         """
277         check() prototype
278         :return: boolean
279         """
280         msg.error("Service " + str(self.__module__) + "doesn't implement check() function")
281         return False
282
283     def create(self):
284         """
285         create() prototype
286         :return: boolean
287         """
288         msg.error("Service " + str(self.__module__) + "doesn't implement create() function?")
289         return False
290
291     def update(self, interval):
292         """
293         update() prototype
294         :param interval: int
295         :return: boolean
296         """
297         msg.error("Service " + str(self.__module__) + "doesn't implement update() function")
298         return False
299
300
301 class SimpleService(BaseService):
302     def __init__(self, configuration=None, name=None):
303         self.order = []
304         self.definitions = {}
305         BaseService.__init__(self, configuration=configuration, name=name)
306
307     def _get_data(self):
308         """
309         Get some data
310         :return: dict
311         """
312         return {}
313
314     def check(self):
315         """
316         :return:
317         """
318         return True
319
320     def create(self):
321         """
322         Create charts
323         :return: boolean
324         """
325         data = self._get_data()
326         if data is None:
327             return False
328
329         idx = 0
330         for name in self.order:
331             options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
332             self.chart(self.__module__ + "_" + self.name + "." + name, *options)
333             # check if server has this datapoint
334             for line in self.definitions[name]['lines']:
335                 if line[0] in data:
336                     self.dimension(*line)
337             idx += 1
338
339         self.commit()
340         return True
341
342     def update(self, interval):
343         """
344         Update charts
345         :param interval: int
346         :return: boolean
347         """
348         data = self._get_data()
349         if data is None:
350             return False
351
352         updated = False
353         for chart in self.order:
354             if self.begin(self.__module__ + "_" + str(self.name) + "." + chart, interval):
355                 updated = True
356                 for dim in self.definitions[chart]['lines']:
357                     try:
358                         self.set(dim[0], data[dim[0]])
359                     except KeyError:
360                         pass
361                 self.end()
362
363         self.commit()
364
365         return updated
366
367
368 class UrlService(SimpleService):
369     def __init__(self, configuration=None, name=None):
370         self.url = ""
371         SimpleService.__init__(self, configuration=configuration, name=name)
372
373     def _get_raw_data(self):
374         """
375         Get raw data from http request
376         :return: str
377         """
378         raw = None
379         try:
380             f = urlopen(self.url, timeout=self.update_every)
381             raw = f.read().decode('utf-8')
382         except Exception as e:
383             msg.error(self.__module__, str(e))
384         finally:
385             try:
386                 f.close()
387             except:
388                 pass
389         return raw
390
391     def check(self):
392         """
393         Format configuration data and try to connect to server
394         :return: boolean
395         """
396         if self.name is None or self.name == str(None):
397             self.name = 'local'
398         else:
399             self.name = str(self.name)
400         try:
401             self.url = str(self.configuration['url'])
402         except (KeyError, TypeError):
403             pass
404
405         if self._get_data() is not None:
406             return True
407         else:
408             return False
409
410
411 class NetSocketService(SimpleService):
412     def __init__(self, configuration=None, name=None):
413         self.host = "localhost"
414         self.port = None
415         self.sock = None
416         self.request = ""
417         SimpleService.__init__(self, configuration=configuration, name=name)
418
419     def _get_raw_data(self):
420         """
421         Get raw data with low-level "socket" module.
422         :return: str
423         """
424         if self.sock is None:
425             try:
426                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
427                 sock.settimeout(self.update_every)
428                 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
429                 sock.connect((self.host, self.port))
430             except Exception as e:
431                 self.sock = None
432                 return None
433
434         try:
435             sock.send(self.request)
436         except Exception:
437             try:
438                 sock.shutdown(1)
439                 sock.close()
440             except:
441                 pass
442             self.sock = None
443             return None
444
445         data = sock.recv(1024)
446         try:
447             while True:
448                 buf = sock.recv(1024)
449                 if not buf:
450                     break
451                 else:
452                     data += buf
453         except:
454             sock.close()
455             return None
456
457         return data.decode()
458
459     def _parse_config(self):
460         """
461         Format configuration data and try to connect to server
462         :return: boolean
463         """
464         if self.name is not None or self.name != str(None):
465             self.name = ""
466         else:
467             self.name = str(self.name)
468         try:
469             self.host = str(self.configuration['host'])
470         except (KeyError, TypeError):
471             self.error("No host specified. Using: '" + self.host + "'")
472         try:
473             self.port = int(self.configuration['port'])
474         except (KeyError, TypeError):
475             self.error("No port specified. Using: '" + str(self.port) + "'")
476         try:
477             self.port = int(self.configuration['request'])
478         except (KeyError, TypeError):
479             self.error("No request specified. Using: '" + str(self.request) + "'")
480         self.request = self.request.encode()
481
482
483 class LogService(SimpleService):
484     def __init__(self, configuration=None, name=None):
485         self.log_path = ""
486         self._last_position = 0
487         # self._log_reader = None
488         SimpleService.__init__(self, configuration=configuration, name=name)
489         self.retries = 100000  # basically always retry
490
491     def _get_raw_data(self):
492         lines = []
493         try:
494             if os.path.getsize(self.log_path) < self._last_position:
495                 self._last_position = 0
496             elif os.path.getsize(self.log_path) == self._last_position:
497                 return None
498             with open(self.log_path, "r") as fp:
499                 fp.seek(self._last_position)
500                 for i, line in enumerate(fp):
501                     lines.append(line)
502                 self._last_position = fp.tell()
503         except Exception as e:
504             msg.error(self.__module__, str(e))
505
506         if len(lines) != 0:
507             return lines
508         return None
509
510     def check(self):
511         if self.name is not None or self.name != str(None):
512             self.name = ""
513         else:
514             self.name = str(self.name)
515         try:
516             self.log_path = str(self.configuration['path'])
517         except (KeyError, TypeError):
518             self.error("No path to log specified. Using: '" + self.log_path + "'")
519
520         if os.access(self.log_path, os.R_OK):
521             return True
522         else:
523             self.error("Cannot access file: '" + self.log_path + "'")
524             return False
525
526     def create(self):
527         status = SimpleService.create(self)
528         self._last_position = 0
529         return status
530