]> arthur.barton.de Git - netdata.git/blob - python.d/python_modules/base.py
remove debug
[netdata.git] / python.d / python_modules / base.py
1 # -*- coding: utf-8 -*-
2 # Description: prototypes for netdata python.d modules
3 # Author: Pawel Krupa (paulfantom)
4
5 import time
6 import sys
7 import os
8 import socket
9 import select
10 try:
11     import urllib.request as urllib2
12 except ImportError:
13     import urllib2
14
15 from subprocess import Popen, PIPE
16
17 import threading
18 import msg
19
20
21 class BaseService(threading.Thread):
22     """
23     Prototype of Service class.
24     Implemented basic functionality to run jobs by `python.d.plugin`
25     """
26     def __init__(self, configuration=None, name=None):
27         """
28         This needs to be initialized in child classes
29         :param configuration: dict
30         :param name: str
31         """
32         threading.Thread.__init__(self)
33         self._data_stream = ""
34         self.daemon = True
35         self.retries = 0
36         self.retries_left = 0
37         self.priority = 140000
38         self.update_every = 1
39         self.name = name
40         self.override_name = None
41         self.chart_name = ""
42         self._dimensions = []
43         self._charts = []
44         if configuration is None:
45             self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
46             raise RuntimeError
47         else:
48             self._extract_base_config(configuration)
49             self.timetable = {}
50             self.create_timetable()
51
52     def _extract_base_config(self, config):
53         """
54         Get basic parameters to run service
55         Minimum config:
56             config = {'update_every':1,
57                       'priority':100000,
58                       'retries':0}
59         :param config: dict
60         """
61         try:
62             self.override_name = config.pop('name')
63         except KeyError:
64             pass
65         self.update_every = int(config.pop('update_every'))
66         self.priority = int(config.pop('priority'))
67         self.retries = int(config.pop('retries'))
68         self.retries_left = self.retries
69         self.configuration = config
70
71     def create_timetable(self, freq=None):
72         """
73         Create service timetable.
74         `freq` is optional
75         Example:
76             timetable = {'last': 1466370091.3767564,
77                          'next': 1466370092,
78                          'freq': 1}
79         :param freq: int
80         """
81         if freq is None:
82             freq = self.update_every
83         now = time.time()
84         self.timetable = {'last': now,
85                           'next': now - (now % freq) + freq,
86                           'freq': freq}
87
88     def _run_once(self):
89         """
90         Executes self.update(interval) and draws run time chart.
91         Return value presents exit status of update()
92         :return: boolean
93         """
94         t_start = time.time()
95         # check if it is time to execute job update() function
96         if self.timetable['next'] > t_start:
97             #msg.debug(self.chart_name + " will be run in " +
98             #          str(int((self.timetable['next'] - t_start) * 1000)) + " ms")
99             msg.debug(self.chart_name, "will be run in", str(int((self.timetable['next'] - t_start) * 1000)), "ms")
100             return True
101
102         since_last = int((t_start - self.timetable['last']) * 1000000)
103         #msg.debug(self.chart_name +
104         #          " ready to run, after " + str(int((t_start - self.timetable['last']) * 1000)) +
105         #          " ms (update_every: " + str(self.timetable['freq'] * 1000) +
106         #          " ms, latency: " + str(int((t_start - self.timetable['next']) * 1000)) + " ms)")
107         msg.debug(self.chart_name,
108                   "ready to run, after", str(int((t_start - self.timetable['last']) * 1000)),
109                   "ms (update_every:", str(self.timetable['freq'] * 1000),
110                   "ms, latency:", str(int((t_start - self.timetable['next']) * 1000)), "ms")
111         if not self.update(since_last):
112             self.error("update function failed.")
113             return False
114         t_end = time.time()
115         self.timetable['next'] = t_end - (t_end % self.timetable['freq']) + self.timetable['freq']
116         # draw performance graph
117         run_time = str(int((t_end - t_start) * 1000))
118         #run_time_chart = "BEGIN netdata.plugin_pythond_" + self.chart_name + " " + str(since_last) + '\n'
119         #run_time_chart += "SET run_time = " + run_time + '\n'
120         #run_time_chart += "END\n"
121         #sys.stdout.write(run_time_chart)
122         sys.stdout.write("BEGIN netdata.plugin_pythond_%s %s\nSET run_time = %s\nEND\n" % \
123                          (self.chart_name, str(since_last), run_time))
124
125         #msg.debug(self.chart_name + " updated in " + str(run_time) + " ms")
126         msg.debug(self.chart_name, "updated in", str(run_time), "ms")
127         self.timetable['last'] = t_start
128         return True
129
130     def run(self):
131         """
132         Runs job in thread. Handles retries.
133         Exits when job failed or timed out.
134         :return: None
135         """
136         self.timetable['last'] = time.time()
137         while True:
138             try:
139                 status = self._run_once()
140             except Exception as e:
141                 msg.error("Something wrong: ", str(e))
142                 return
143             if status:
144                 time.sleep(self.timetable['next'] - time.time())
145                 self.retries_left = self.retries
146             else:
147                 self.retries_left -= 1
148                 if self.retries_left <= 0:
149                     msg.error("no more retries. Exiting")
150                     return
151                 else:
152                     time.sleep(self.timetable['freq'])
153
154     @staticmethod
155     def _format(*args):
156         params = []
157         append = params.append
158         for p in args:
159             if p is None:
160                 append(p)
161                 continue
162             if type(p) is not str:
163                 p = str(p)
164             if ' ' in p:
165                 p = "'" + p + "'"
166             append(p)
167         return params
168
169     def _line(self, instruction, *params):
170         """
171         Converts *params to string and joins them with one space between every one.
172         :param params: str/int/float
173         """
174         #self._data_stream += instruction
175         tmp = list(map((lambda x: "''" if x is None or len(x) == 0 else x), params))
176
177         self._data_stream += "%s %s\n" % (instruction, str(" ".join(tmp)))
178
179         # self.error(str(" ".join(tmp)))
180         # for p in params:
181         #     if p is None:
182         #         p = ""
183         #     else:
184         #         p = str(p)
185         #     if len(p) == 0:
186         #         p = "''"
187         #     if ' ' in p:
188         #         p = "'" + p + "'"
189         #     self._data_stream += " " + p
190         #self._data_stream += "\n"
191
192     def chart(self, type_id, name="", title="", units="", family="",
193               category="", charttype="line", priority="", update_every=""):
194         """
195         Defines a new chart.
196         :param type_id: str
197         :param name: str
198         :param title: str
199         :param units: str
200         :param family: str
201         :param category: str
202         :param charttype: str
203         :param priority: int/str
204         :param update_every: int/str
205         """
206         self._charts.append(type_id)
207         #self._line("CHART", type_id, name, title, units, family, category, charttype, priority, update_every)
208
209         p = self._format(type_id, name, title, units, family, category, charttype, priority, update_every)
210         self._line("CHART", *p)
211
212     def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
213         """
214         Defines a new dimension for the chart
215         :param id: str
216         :param name: str
217         :param algorithm: str
218         :param multiplier: int/str
219         :param divisor: int/str
220         :param hidden: boolean
221         :return:
222         """
223         try:
224             int(multiplier)
225         except TypeError:
226             self.error("malformed dimension: multiplier is not a number:", multiplier)
227             multiplier = 1
228         try:
229             int(divisor)
230         except TypeError:
231             self.error("malformed dimension: divisor is not a number:", divisor)
232             divisor = 1
233         if name is None:
234             name = id
235         if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
236             algorithm = "absolute"
237
238         self._dimensions.append(str(id))
239         if hidden:
240             p = self._format(id, name, algorithm, multiplier, divisor, "hidden")
241             #self._line("DIMENSION", id, name, algorithm, str(multiplier), str(divisor), "hidden")
242         else:
243             p = self._format(id, name, algorithm, multiplier, divisor)
244             #self._line("DIMENSION", id, name, algorithm, str(multiplier), str(divisor))
245
246         self._line("DIMENSION", *p)
247
248     def begin(self, type_id, microseconds=0):
249         """
250         Begin data set
251         :param type_id: str
252         :param microseconds: int
253         :return: boolean
254         """
255         if type_id not in self._charts:
256             self.error("wrong chart type_id:", type_id)
257             return False
258         try:
259             int(microseconds)
260         except TypeError:
261             self.error("malformed begin statement: microseconds are not a number:", microseconds)
262             microseconds = ""
263
264         self._line("BEGIN", type_id, str(microseconds))
265         return True
266
267     def set(self, id, value):
268         """
269         Set value to dimension
270         :param id: str
271         :param value: int/float
272         :return: boolean
273         """
274         if id not in self._dimensions:
275             self.error("wrong dimension id:", id, "Available dimensions are:", *self._dimensions)
276             return False
277         try:
278             value = str(int(value))
279         except TypeError:
280             self.error("cannot set non-numeric value:", value)
281             return False
282         self._line("SET", id, "=", str(value))
283         return True
284
285     def end(self):
286         self._line("END")
287
288     def commit(self):
289         """
290         Upload new data to netdata
291         """
292         print(self._data_stream)
293         self._data_stream = ""
294
295     def error(self, *params):
296         """
297         Show error message on stderr
298         """
299         msg.error(self.chart_name, *params)
300
301     def debug(self, *params):
302         """
303         Show debug message on stderr
304         """
305         msg.debug(self.chart_name, *params)
306
307     def info(self, *params):
308         """
309         Show information message on stderr
310         """
311         msg.info(self.chart_name, *params)
312
313     def check(self):
314         """
315         check() prototype
316         :return: boolean
317         """
318         msg.error("Service " + str(self.__module__) + "doesn't implement check() function")
319         return False
320
321     def create(self):
322         """
323         create() prototype
324         :return: boolean
325         """
326         msg.error("Service " + str(self.__module__) + "doesn't implement create() function?")
327         return False
328
329     def update(self, interval):
330         """
331         update() prototype
332         :param interval: int
333         :return: boolean
334         """
335         msg.error("Service " + str(self.__module__) + "doesn't implement update() function")
336         return False
337
338
339 class SimpleService(BaseService):
340     def __init__(self, configuration=None, name=None):
341         self.order = []
342         self.definitions = {}
343         BaseService.__init__(self, configuration=configuration, name=name)
344
345     def _get_data(self):
346         """
347         Get some data
348         :return: dict
349         """
350         return {}
351
352     def check(self):
353         """
354         :return:
355         """
356         return True
357
358     def create(self):
359         """
360         Create charts
361         :return: boolean
362         """
363         data = self._get_data()
364         if data is None:
365             return False
366
367         idx = 0
368         for name in self.order:
369             options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
370             self.chart(self.chart_name + "." + name, *options)
371             # check if server has this datapoint
372             for line in self.definitions[name]['lines']:
373                 if line[0] in data:
374                     self.dimension(*line)
375             idx += 1
376
377         self.commit()
378         return True
379
380     def update(self, interval):
381         """
382         Update charts
383         :param interval: int
384         :return: boolean
385         """
386         data = self._get_data()
387         if data is None:
388             self.debug("_get_data() returned no data")
389             return False
390
391         updated = False
392         for chart in self.order:
393             if self.begin(self.chart_name + "." + chart, interval):
394                 updated = True
395                 for dim in self.definitions[chart]['lines']:
396                     try:
397                         self.set(dim[0], data[dim[0]])
398                     except KeyError:
399                         pass
400                 self.end()
401
402         self.commit()
403         if not updated:
404             self.error("no charts to update")
405
406         return updated
407
408
409 class UrlService(SimpleService):
410     def __init__(self, configuration=None, name=None):
411         self.url = ""
412         self.user = None
413         self.password = None
414         SimpleService.__init__(self, configuration=configuration, name=name)
415
416     def __add_auth(self):
417         passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
418         passman.add_password(None, self.url, self.user, self.password)
419         authhandler = urllib2.HTTPBasicAuthHandler(passman)
420         opener = urllib2.build_opener(authhandler)
421         urllib2.install_opener(opener)
422
423     def _get_raw_data(self):
424         """
425         Get raw data from http request
426         :return: str
427         """
428         raw = None
429         try:
430             f = urllib2.urlopen(self.url, timeout=self.update_every)
431         except Exception as e:
432             self.error(str(e))
433             return None
434
435         try:
436             raw = f.read().decode('utf-8')
437         except Exception as e:
438             self.error(str(e))
439         finally:
440             f.close()
441         return raw
442
443     def check(self):
444         """
445         Format configuration data and try to connect to server
446         :return: boolean
447         """
448         if self.name is None or self.name == str(None):
449             self.name = 'local'
450             self.chart_name += "_" + self.name
451         else:
452             self.name = str(self.name)
453         try:
454             self.url = str(self.configuration['url'])
455         except (KeyError, TypeError):
456             pass
457         try:
458             self.user = str(self.configuration['user'])
459         except (KeyError, TypeError):
460             pass
461         try:
462             self.password = str(self.configuration['pass'])
463         except (KeyError, TypeError):
464             pass
465
466         if self.user is not None and self.password is not None:
467             self.__add_auth()
468
469         if self._get_data() is not None:
470             return True
471         else:
472             return False
473
474
475 class SocketService(SimpleService):
476     def __init__(self, configuration=None, name=None):
477         self._sock = None
478         self._keep_alive = False
479         self.host = "localhost"
480         self.port = None
481         self.unix_socket = None
482         self.request = ""
483         self.__socket_config = None
484         SimpleService.__init__(self, configuration=configuration, name=name)
485
486     def _connect(self):
487         """
488         Recreate socket and connect to it since sockets cannot be reused after closing
489         Available configurations are IPv6, IPv4 or UNIX socket
490         :return:
491         """
492         try:
493             if self.unix_socket is None:
494                 if self.__socket_config is None:
495                     # establish ipv6 or ipv4 connection.
496                     for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
497                         try:
498                             af, socktype, proto, canonname, sa = res
499                             self._sock = socket.socket(af, socktype, proto)
500                         except socket.error as msg:
501                             self._sock = None
502                             continue
503                         try:
504                             self._sock.connect(sa)
505                         except socket.error as msg:
506                             self._disconnect()
507                             continue
508                         self.__socket_config = res
509                         break
510                 else:
511                     # connect to socket with previously established configuration
512                     try:
513                         af, socktype, proto, canonname, sa = self.__socket_config
514                         self._sock = socket.socket(af, socktype, proto)
515                         self._sock.connect(sa)
516                     except socket.error as msg:
517                         self._disconnect()
518             else:
519                 # connect to unix socket
520                 self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
521                 self._sock.connect(self.unix_socket)
522         except Exception as e:
523             self.error(str(e),
524                        "Cannot create socket with following configuration: host:", str(self.host),
525                        "port:", str(self.port),
526                        "socket:", str(self.unix_socket))
527             self._sock = None
528         self._sock.setblocking(0)
529
530     def _disconnect(self):
531         """
532         Close socket connection
533         :return:
534         """
535         try:
536             self._sock.shutdown(2)  # 0 - read, 1 - write, 2 - all
537             self._sock.close()
538         except Exception:
539             pass
540         self._sock = None
541
542     def _send(self):
543         """
544         Send request.
545         :return: boolean
546         """
547         # Send request if it is needed
548         if self.request != "".encode():
549             try:
550                 self._sock.send(self.request)
551             except Exception as e:
552                 self._disconnect()
553                 self.error(str(e),
554                            "used configuration: host:", str(self.host),
555                            "port:", str(self.port),
556                            "socket:", str(self.unix_socket))
557                 return False
558         return True
559
560     def _receive(self):
561         """
562         Receive data from socket
563         :return: str
564         """
565         data = ""
566         while True:
567             try:
568                 ready_to_read, _, in_error = select.select([self._sock], [], [], 60)
569             except Exception as e:
570                 self.debug("SELECT", str(e))
571                 self._disconnect()
572                 break
573             if len(ready_to_read) > 0:
574                 buf = self._sock.recv(4096)
575                 if len(buf) == 0 or buf is None:  # handle server disconnect
576                     break
577                 data += buf.decode()
578                 if self._check_raw_data(data):
579                     break
580             else:
581                 self.error("Socket timed out.")
582                 self._disconnect()
583                 break
584
585         return data
586
587     def _get_raw_data(self):
588         """
589         Get raw data with low-level "socket" module.
590         :return: str
591         """
592         if self._sock is None:
593             self._connect()
594
595         # Send request if it is needed
596         if not self._send():
597             return None
598
599         data = self._receive()
600
601         if not self._keep_alive:
602             self._disconnect()
603
604         return data
605
606     def _check_raw_data(self, data):
607         """
608         Check if all data has been gathered from socket
609         :param data: str
610         :return: boolean
611         """
612         return True
613
614     def _parse_config(self):
615         """
616         Parse configuration data
617         :return: boolean
618         """
619         if self.name is None or self.name == str(None):
620             self.name = ""
621         else:
622             self.name = str(self.name)
623         try:
624             self.unix_socket = str(self.configuration['socket'])
625         except (KeyError, TypeError):
626             self.debug("No unix socket specified. Trying TCP/IP socket.")
627             try:
628                 self.host = str(self.configuration['host'])
629             except (KeyError, TypeError):
630                 self.debug("No host specified. Using: '" + self.host + "'")
631             try:
632                 self.port = int(self.configuration['port'])
633             except (KeyError, TypeError):
634                 self.debug("No port specified. Using: '" + str(self.port) + "'")
635         try:
636             self.request = str(self.configuration['request'])
637         except (KeyError, TypeError):
638             self.debug("No request specified. Using: '" + str(self.request) + "'")
639         self.request = self.request.encode()
640
641
642 class LogService(SimpleService):
643     def __init__(self, configuration=None, name=None):
644         self.log_path = ""
645         self._last_position = 0
646         # self._log_reader = None
647         SimpleService.__init__(self, configuration=configuration, name=name)
648         self.retries = 100000  # basically always retry
649
650     def _get_raw_data(self):
651         """
652         Get log lines since last poll
653         :return: list
654         """
655         lines = []
656         try:
657             if os.path.getsize(self.log_path) < self._last_position:
658                 self._last_position = 0
659             elif os.path.getsize(self.log_path) == self._last_position:
660                 self.debug("Log file hasn't changed. No new data.")
661                 return None
662             with open(self.log_path, "r") as fp:
663                 fp.seek(self._last_position)
664                 for i, line in enumerate(fp):
665                     lines.append(line)
666                 self._last_position = fp.tell()
667         except Exception as e:
668             self.error(str(e))
669
670         if len(lines) != 0:
671             return lines
672         else:
673             self.error("No data collected.")
674             return None
675
676     def check(self):
677         """
678         Parse basic configuration and check if log file exists
679         :return: boolean
680         """
681         if self.name is not None or self.name != str(None):
682             self.name = ""
683         else:
684             self.name = str(self.name)
685         try:
686             self.log_path = str(self.configuration['path'])
687         except (KeyError, TypeError):
688             self.error("No path to log specified. Using: '" + self.log_path + "'")
689
690         if os.access(self.log_path, os.R_OK):
691             return True
692         else:
693             self.error("Cannot access file: '" + self.log_path + "'")
694             return False
695
696     def create(self):
697         status = SimpleService.create(self)
698         self._last_position = 0
699         return status
700
701
702 class ExecutableService(SimpleService):
703     #command_whitelist = ['exim', 'postqueue']
704     bad_substrings = ('&', '|', ';', '>', '<')
705
706     def __init__(self, configuration=None, name=None):
707         self.command = ""
708         SimpleService.__init__(self, configuration=configuration, name=name)
709
710     def _get_raw_data(self):
711         """
712         Get raw data from executed command
713         :return: str
714         """
715         try:
716             p = Popen(self.command, stdout=PIPE, stderr=PIPE)
717         except Exception as e:
718             self.error("Executing command", self.command, "resulted in error:", str(e))
719             return None
720         data = []
721         for line in p.stdout.readlines():
722             data.append(str(line.decode()))
723
724         if len(data) == 0:
725             self.error("No data collected.")
726             return None
727
728         return data
729
730     def check(self):
731         """
732         Parse basic configuration, check if command is whitelisted and is returning values
733         :return: boolean
734         """
735         if self.name is not None or self.name != str(None):
736             self.name = ""
737         else:
738             self.name = str(self.name)
739         try:
740             self.command = str(self.configuration['command'])
741         except (KeyError, TypeError):
742             self.error("No command specified. Using: '" + self.command + "'")
743         self.command = self.command.split(' ')
744         #if self.command[0] not in self.command_whitelist:
745         #    self.error("Command is not whitelisted.")
746         #    return False
747
748         for arg in self.command[1:]:
749             if any(st in arg for st in self.bad_substrings):
750                 self.error("Bad command argument:" + " ".join(self.command[1:]))
751                 return False
752         # test command and search for it in /usr/sbin or /sbin when failed
753         base = self.command[0].split('/')[-1]
754         if self._get_raw_data() is None:
755             for prefix in ['/sbin/', '/usr/sbin/']:
756                 self.command[0] = prefix + base
757                 if os.path.isfile(self.command[0]):
758                     break
759                 #if self._get_raw_data() is not None:
760                 #    break
761
762         if self._get_data() is None or len(self._get_data()) == 0:
763             self.error("Command", self.command, "returned no data")
764             return False
765         return True