]> arthur.barton.de Git - netdata.git/blob - python.d/python_modules/base.py
New `NetSocketService` class + squid.chart.py + fix infinite retries loop
[netdata.git] / python.d / python_modules / base.py
1 # -*- coding: utf-8 -*-
2 # Description: prototypes for netdata python.d modules
3 # Author: Pawel Krupa (paulfantom)
4
5 import time
6 import sys
7 import os
8 import socket
9 try:
10     from urllib.request import urlopen
11 except ImportError:
12     from urllib2 import urlopen
13
14 # from subprocess import STDOUT, PIPE, Popen
15 import threading
16 import msg
17
18
19 class BaseService(threading.Thread):
20     """
21     Prototype of Service class.
22     Implemented basic functionality to run jobs by `python.d.plugin`
23     """
24     def __init__(self, configuration=None, name=None):
25         """
26         This needs to be initialized in child classes
27         :param configuration: dict
28         :param name: str
29         """
30         threading.Thread.__init__(self)
31         self._data_stream = ""
32         self.daemon = True
33         self.retries = 0
34         self.retries_left = 0
35         self.priority = 140000
36         self.update_every = 1
37         self.name = name
38         self.override_name = None
39         self.chart_name = ""
40         self._dimensions = []
41         self._charts = []
42         if configuration is None:
43             self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
44             raise RuntimeError
45         else:
46             self._extract_base_config(configuration)
47             self.timetable = {}
48             self.create_timetable()
49
50     def _extract_base_config(self, config):
51         """
52         Get basic parameters to run service
53         Minimum config:
54             config = {'update_every':1,
55                       'priority':100000,
56                       'retries':0}
57         :param config: dict
58         """
59         try:
60             self.override_name = config.pop('override_name')
61         except KeyError:
62             pass
63         self.update_every = int(config.pop('update_every'))
64         self.priority = int(config.pop('priority'))
65         self.retries = int(config.pop('retries'))
66         self.retries_left = self.retries
67         self.configuration = config
68
69     def create_timetable(self, freq=None):
70         """
71         Create service timetable.
72         `freq` is optional
73         Example:
74             timetable = {'last': 1466370091.3767564,
75                          'next': 1466370092,
76                          'freq': 1}
77         :param freq: int
78         """
79         if freq is None:
80             freq = self.update_every
81         now = time.time()
82         self.timetable = {'last': now,
83                           'next': now - (now % freq) + freq,
84                           'freq': freq}
85
86     def _run_once(self):
87         """
88         Executes self.update(interval) and draws run time chart.
89         Return value presents exit status of update()
90         :return: boolean
91         """
92         t_start = time.time()
93         # check if it is time to execute job update() function
94         if self.timetable['next'] > t_start:
95             msg.debug(self.chart_name + " will be run in " +
96                       str(int((self.timetable['next'] - t_start) * 1000)) + " ms")
97             return True
98
99         since_last = int((t_start - self.timetable['last']) * 1000000)
100         msg.debug(self.chart_name +
101                   " ready to run, after " + str(int((t_start - self.timetable['last']) * 1000)) +
102                   " ms (update_every: " + str(self.timetable['freq'] * 1000) +
103                   " ms, latency: " + str(int((t_start - self.timetable['next']) * 1000)) + " ms)")
104         if not self.update(since_last):
105             return False
106         t_end = time.time()
107         self.timetable['next'] = t_end - (t_end % self.timetable['freq']) + self.timetable['freq']
108
109         # draw performance graph
110         run_time = str(int((t_end - t_start) * 1000))
111         run_time_chart = "BEGIN netdata.plugin_pythond_" + self.chart_name + " " + str(since_last) + '\n'
112         run_time_chart += "SET run_time = " + run_time + '\n'
113         run_time_chart += "END\n"
114         sys.stdout.write(run_time_chart)
115         msg.debug(self.chart_name + " updated in " + str(run_time) + " ms")
116         self.timetable['last'] = t_start
117         return True
118
119     def run(self):
120         """
121         Runs job in thread. Handles retries.
122         Exits when job failed or timed out.
123         :return: None
124         """
125         self.timetable['last'] = time.time()
126         while True:
127             try:
128                 status = self._run_once()
129             except Exception as e:
130                 msg.error("Something wrong: " + str(e))
131                 return
132             if status:
133                 time.sleep(self.timetable['next'] - time.time())
134                 self.retries_left = self.retries
135             else:
136                 self.retries_left -= 1
137                 if self.retries_left <= 0:
138                     msg.error("no more retries. Exiting")
139                     return
140                 else:
141                     time.sleep(self.timetable['freq'])
142
143     def _line(self, instruction, *params):
144         """
145         Converts *params to string and joins them with one space between every one.
146         :param params: str/int/float
147         """
148         self._data_stream += instruction
149         for p in params:
150             if p is None:
151                 p = ""
152             else:
153                 p = str(p)
154             if len(p) == 0:
155                 p = "''"
156             if ' ' in p:
157                 p = "'" + p + "'"
158             self._data_stream += " " + p
159         self._data_stream += "\n"
160
161     def chart(self, type_id, name="", title="", units="", family="",
162               category="", charttype="line", priority="", update_every=""):
163         """
164         Defines a new chart.
165         :param type_id: str
166         :param name: str
167         :param title: str
168         :param units: str
169         :param family: str
170         :param category: str
171         :param charttype: str
172         :param priority: int/str
173         :param update_every: int/str
174         """
175         self._charts.append(type_id)
176         self._line("CHART", type_id, name, title, units, family, category, charttype, priority, update_every)
177
178     def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
179         """
180         Defines a new dimension for the chart
181         :param id: str
182         :param name: str
183         :param algorithm: str
184         :param multiplier: int/str
185         :param divisor: int/str
186         :param hidden: boolean
187         :return:
188         """
189         try:
190             int(multiplier)
191         except TypeError:
192             self.error("malformed dimension: multiplier is not a number:", multiplier)
193             multiplier = 1
194         try:
195             int(divisor)
196         except TypeError:
197             self.error("malformed dimension: divisor is not a number:", divisor)
198             divisor = 1
199         if name is None:
200             name = id
201         if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
202             algorithm = "absolute"
203
204         self._dimensions.append(id)
205         if hidden:
206             self._line("DIMENSION", id, name, algorithm, multiplier, divisor, "hidden")
207         else:
208             self._line("DIMENSION", id, name, algorithm, multiplier, divisor)
209
210     def begin(self, type_id, microseconds=0):
211         """
212         Begin data set
213         :param type_id: str
214         :param microseconds: int
215         :return: boolean
216         """
217         if type_id not in self._charts:
218             self.error("wrong chart type_id:", type_id)
219             return False
220         try:
221             int(microseconds)
222         except TypeError:
223             self.error("malformed begin statement: microseconds are not a number:", microseconds)
224             microseconds = ""
225
226         self._line("BEGIN", type_id, microseconds)
227         return True
228
229     def set(self, id, value):
230         """
231         Set value to dimension
232         :param id: str
233         :param value: int/float
234         :return: boolean
235         """
236         if id not in self._dimensions:
237             self.error("wrong dimension id:", id)
238             return False
239         try:
240             value = str(int(value))
241         except TypeError:
242             self.error("cannot set non-numeric value:", value)
243             return False
244         self._line("SET", id, "=", value)
245         return True
246
247     def end(self):
248         self._line("END")
249
250     def commit(self):
251         """
252         Upload new data to netdata
253         """
254         print(self._data_stream)
255         self._data_stream = ""
256
257     def error(self, *params):
258         """
259         Show error message on stderr
260         """
261         msg.error(self.chart_name, *params)
262
263     def debug(self, *params):
264         """
265         Show debug message on stderr
266         """
267         msg.debug(self.chart_name, *params)
268
269     def info(self, *params):
270         """
271         Show information message on stderr
272         """
273         msg.info(self.chart_name, *params)
274
275     def check(self):
276         """
277         check() prototype
278         :return: boolean
279         """
280         msg.error("Service " + str(self.__module__) + "doesn't implement check() function")
281         return False
282
283     def create(self):
284         """
285         create() prototype
286         :return: boolean
287         """
288         msg.error("Service " + str(self.__module__) + "doesn't implement create() function?")
289         return False
290
291     def update(self, interval):
292         """
293         update() prototype
294         :param interval: int
295         :return: boolean
296         """
297         msg.error("Service " + str(self.__module__) + "doesn't implement update() function")
298         return False
299
300
301 class SimpleService(BaseService):
302     def __init__(self, configuration=None, name=None):
303         self.order = []
304         self.definitions = {}
305         BaseService.__init__(self, configuration=configuration, name=name)
306
307     def _get_data(self):
308         """
309         Get some data
310         :return: dict
311         """
312         return {}
313
314     def check(self):
315         """
316         :return:
317         """
318         return True
319
320     def create(self):
321         """
322         Create charts
323         :return: boolean
324         """
325         data = self._get_data()
326         if data is None:
327             return False
328
329         idx = 0
330         for name in self.order:
331             options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
332             self.chart(self.__module__ + "_" + self.name + "." + name, *options)
333             # check if server has this datapoint
334             for line in self.definitions[name]['lines']:
335                 if line[0] in data:
336                     self.dimension(*line)
337             idx += 1
338
339         self.commit()
340         return True
341
342     def update(self, interval):
343         """
344         Update charts
345         :param interval: int
346         :return: boolean
347         """
348         data = self._get_data()
349         if data is None:
350             return False
351
352         updated = False
353         for chart in self.order:
354             if self.begin(self.__module__ + "_" + str(self.name) + "." + chart, interval):
355                 updated = True
356                 for dim in self.definitions[chart]['lines']:
357                     try:
358                         self.set(dim[0], data[dim[0]])
359                     except KeyError:
360                         pass
361                 self.end()
362
363         self.commit()
364
365         return updated
366
367
368 class UrlService(SimpleService):
369     def __init__(self, configuration=None, name=None):
370         # definitions are created dynamically in create() method based on 'charts' dictionary. format:
371         # definitions = {
372         #     'chart_name_in_netdata' : [ charts['chart_name_in_netdata']['lines']['name'] ]
373         # }
374         self.url = ""
375         SimpleService.__init__(self, configuration=configuration, name=name)
376
377     def _get_raw_data(self):
378         """
379         Get raw data from http request
380         :return: str
381         """
382         raw = None
383         try:
384             f = urlopen(self.url, timeout=self.update_every)
385             raw = f.read().decode('utf-8')
386         except Exception as e:
387             msg.error(self.__module__, str(e))
388         finally:
389             try:
390                 f.close()
391             except:
392                 pass
393         return raw
394
395     def check(self):
396         """
397         Format configuration data and try to connect to server
398         :return: boolean
399         """
400         if self.name is None or self.name == str(None):
401             self.name = 'local'
402         else:
403             self.name = str(self.name)
404         try:
405             self.url = str(self.configuration['url'])
406         except (KeyError, TypeError):
407             pass
408
409         if self._get_data() is not None:
410             return True
411         else:
412             return False
413
414
415 class NetSocketService(SimpleService):
416     def __init__(self, configuration=None, name=None):
417         # definitions are created dynamically in create() method based on 'charts' dictionary. format:
418         # definitions = {
419         #     'chart_name_in_netdata' : [ charts['chart_name_in_netdata']['lines']['name'] ]
420         # }
421         self.host = "localhost"
422         self.port = None
423         self.sock = None
424         self.request = ""
425         SimpleService.__init__(self, configuration=configuration, name=name)
426
427     def _get_raw_data(self):
428         """
429         Get raw data with low-level "socket" module.
430         :return: str
431         """
432         if self.sock is None:
433             try:
434                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
435                 sock.settimeout(self.update_every)
436                 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
437                 sock.connect((self.host, self.port))
438             except Exception as e:
439                 self.sock = None
440                 return None
441
442         try:
443             sock.send(self.request)
444         except Exception:
445             try:
446                 sock.shutdown(1)
447                 sock.close()
448             except:
449                 pass
450             self.sock = None
451             return None
452
453         data = sock.recv(1024)
454         try:
455             while True:
456                 buf = sock.recv(1024)
457                 if not buf:
458                     break
459                 else:
460                     data += buf
461         except:
462             sock.close()
463             return None
464
465         return data.decode()
466
467     def _parse_config(self):
468         """
469         Format configuration data and try to connect to server
470         :return: boolean
471         """
472         if self.name is not None or self.name != str(None):
473             self.name = ""
474         else:
475             self.name = str(self.name)
476         try:
477             self.host = str(self.configuration['host'])
478         except (KeyError, TypeError):
479             self.error("No host specified. Using: '" + self.host + "'")
480         try:
481             self.port = int(self.configuration['port'])
482         except (KeyError, TypeError):
483             self.error("No port specified. Using: '" + str(self.port) + "'")
484         try:
485             self.port = int(self.configuration['request'])
486         except (KeyError, TypeError):
487             self.error("No request specified. Using: '" + str(self.request) + "'")
488         self.request = self.request.encode()
489
490
491 class LogService(SimpleService):
492     def __init__(self, configuration=None, name=None):
493         # definitions are created dynamically in create() method based on 'charts' dictionary. format:
494         # definitions = {
495         #     'chart_name_in_netdata' : [ charts['chart_name_in_netdata']['lines']['name'] ]
496         # }
497         self.log_path = ""
498         self._last_position = 0
499         # self._log_reader = None
500         SimpleService.__init__(self, configuration=configuration, name=name)
501         self.retries = 100000  # basically always retry
502
503     def _get_raw_data(self):
504         lines = []
505         try:
506             if os.path.getsize(self.log_path) < self._last_position:
507                 self._last_position = 0
508             elif os.path.getsize(self.log_path) == self._last_position:
509                 return None
510             with open(self.log_path, "r") as fp:
511                 fp.seek(self._last_position)
512                 for i, line in enumerate(fp):
513                     lines.append(line)
514                 self._last_position = fp.tell()
515         except Exception as e:
516             msg.error(self.__module__, str(e))
517
518         if len(lines) != 0:
519             return lines
520         return None
521
522     def check(self):
523         if self.name is not None or self.name != str(None):
524             self.name = ""
525         else:
526             self.name = str(self.name)
527         try:
528             self.log_path = str(self.configuration['path'])
529         except (KeyError, TypeError):
530             self.error("No path to log specified. Using: '" + self.log_path + "'")
531
532         if os.access(self.log_path, os.R_OK):
533             return True
534         else:
535             self.error("Cannot access file: '" + self.log_path + "'")
536             return False
537
538     def create(self):
539         status = SimpleService.create(self)
540         self._last_position = 0
541         return status
542