1 # -*- coding: utf-8 -*-
2 # Description: prototypes for netdata python.d modules
3 # Author: Pawel Krupa (paulfantom)
10 from urllib.request import urlopen
12 from urllib2 import urlopen
14 # from subprocess import STDOUT, PIPE, Popen
19 class BaseService(threading.Thread):
21 Prototype of Service class.
22 Implemented basic functionality to run jobs by `python.d.plugin`
24 def __init__(self, configuration=None, name=None):
26 This needs to be initialized in child classes
27 :param configuration: dict
30 threading.Thread.__init__(self)
31 self._data_stream = ""
35 self.priority = 140000
38 self.override_name = None
42 if configuration is None:
43 self.error("BaseService: no configuration parameters supplied. Cannot create Service.")
46 self._extract_base_config(configuration)
48 self.create_timetable()
50 def _extract_base_config(self, config):
52 Get basic parameters to run service
54 config = {'update_every':1,
60 self.override_name = config.pop('override_name')
63 self.update_every = int(config.pop('update_every'))
64 self.priority = int(config.pop('priority'))
65 self.retries = int(config.pop('retries'))
66 self.retries_left = self.retries
67 self.configuration = config
69 def create_timetable(self, freq=None):
71 Create service timetable.
74 timetable = {'last': 1466370091.3767564,
80 freq = self.update_every
82 self.timetable = {'last': now,
83 'next': now - (now % freq) + freq,
88 Executes self.update(interval) and draws run time chart.
89 Return value presents exit status of update()
93 # check if it is time to execute job update() function
94 if self.timetable['next'] > t_start:
95 msg.debug(self.chart_name + " will be run in " +
96 str(int((self.timetable['next'] - t_start) * 1000)) + " ms")
99 since_last = int((t_start - self.timetable['last']) * 1000000)
100 msg.debug(self.chart_name +
101 " ready to run, after " + str(int((t_start - self.timetable['last']) * 1000)) +
102 " ms (update_every: " + str(self.timetable['freq'] * 1000) +
103 " ms, latency: " + str(int((t_start - self.timetable['next']) * 1000)) + " ms)")
104 if not self.update(since_last):
107 self.timetable['next'] = t_end - (t_end % self.timetable['freq']) + self.timetable['freq']
109 # draw performance graph
110 run_time = str(int((t_end - t_start) * 1000))
111 run_time_chart = "BEGIN netdata.plugin_pythond_" + self.chart_name + " " + str(since_last) + '\n'
112 run_time_chart += "SET run_time = " + run_time + '\n'
113 run_time_chart += "END\n"
114 sys.stdout.write(run_time_chart)
115 msg.debug(self.chart_name + " updated in " + str(run_time) + " ms")
116 self.timetable['last'] = t_start
121 Runs job in thread. Handles retries.
122 Exits when job failed or timed out.
125 self.timetable['last'] = time.time()
128 status = self._run_once()
129 except Exception as e:
130 msg.error("Something wrong: " + str(e))
133 time.sleep(self.timetable['next'] - time.time())
134 self.retries_left = self.retries
136 self.retries_left -= 1
137 if self.retries_left <= 0:
138 msg.error("no more retries. Exiting")
141 time.sleep(self.timetable['freq'])
143 def _line(self, instruction, *params):
145 Converts *params to string and joins them with one space between every one.
146 :param params: str/int/float
148 self._data_stream += instruction
158 self._data_stream += " " + p
159 self._data_stream += "\n"
161 def chart(self, type_id, name="", title="", units="", family="",
162 category="", charttype="line", priority="", update_every=""):
171 :param charttype: str
172 :param priority: int/str
173 :param update_every: int/str
175 self._charts.append(type_id)
176 self._line("CHART", type_id, name, title, units, family, category, charttype, priority, update_every)
178 def dimension(self, id, name=None, algorithm="absolute", multiplier=1, divisor=1, hidden=False):
180 Defines a new dimension for the chart
183 :param algorithm: str
184 :param multiplier: int/str
185 :param divisor: int/str
186 :param hidden: boolean
192 self.error("malformed dimension: multiplier is not a number:", multiplier)
197 self.error("malformed dimension: divisor is not a number:", divisor)
201 if algorithm not in ("absolute", "incremental", "percentage-of-absolute-row", "percentage-of-incremental-row"):
202 algorithm = "absolute"
204 self._dimensions.append(id)
206 self._line("DIMENSION", id, name, algorithm, multiplier, divisor, "hidden")
208 self._line("DIMENSION", id, name, algorithm, multiplier, divisor)
210 def begin(self, type_id, microseconds=0):
214 :param microseconds: int
217 if type_id not in self._charts:
218 self.error("wrong chart type_id:", type_id)
223 self.error("malformed begin statement: microseconds are not a number:", microseconds)
226 self._line("BEGIN", type_id, microseconds)
229 def set(self, id, value):
231 Set value to dimension
233 :param value: int/float
236 if id not in self._dimensions:
237 self.error("wrong dimension id:", id)
240 value = str(int(value))
242 self.error("cannot set non-numeric value:", value)
244 self._line("SET", id, "=", value)
252 Upload new data to netdata
254 print(self._data_stream)
255 self._data_stream = ""
257 def error(self, *params):
259 Show error message on stderr
261 msg.error(self.chart_name, *params)
263 def debug(self, *params):
265 Show debug message on stderr
267 msg.debug(self.chart_name, *params)
269 def info(self, *params):
271 Show information message on stderr
273 msg.info(self.chart_name, *params)
280 msg.error("Service " + str(self.__module__) + "doesn't implement check() function")
288 msg.error("Service " + str(self.__module__) + "doesn't implement create() function?")
291 def update(self, interval):
297 msg.error("Service " + str(self.__module__) + "doesn't implement update() function")
301 class SimpleService(BaseService):
302 def __init__(self, configuration=None, name=None):
304 self.definitions = {}
305 BaseService.__init__(self, configuration=configuration, name=name)
325 data = self._get_data()
330 for name in self.order:
331 options = self.definitions[name]['options'] + [self.priority + idx, self.update_every]
332 self.chart(self.__module__ + "_" + self.name + "." + name, *options)
333 # check if server has this datapoint
334 for line in self.definitions[name]['lines']:
336 self.dimension(*line)
342 def update(self, interval):
348 data = self._get_data()
353 for chart in self.order:
354 if self.begin(self.__module__ + "_" + str(self.name) + "." + chart, interval):
356 for dim in self.definitions[chart]['lines']:
358 self.set(dim[0], data[dim[0]])
368 class UrlService(SimpleService):
369 def __init__(self, configuration=None, name=None):
370 # definitions are created dynamically in create() method based on 'charts' dictionary. format:
372 # 'chart_name_in_netdata' : [ charts['chart_name_in_netdata']['lines']['name'] ]
375 SimpleService.__init__(self, configuration=configuration, name=name)
377 def _get_raw_data(self):
379 Get raw data from http request
384 f = urlopen(self.url, timeout=self.update_every)
385 raw = f.read().decode('utf-8')
386 except Exception as e:
387 msg.error(self.__module__, str(e))
397 Format configuration data and try to connect to server
400 if self.name is None or self.name == str(None):
403 self.name = str(self.name)
405 self.url = str(self.configuration['url'])
406 except (KeyError, TypeError):
409 if self._get_data() is not None:
415 class NetSocketService(SimpleService):
416 def __init__(self, configuration=None, name=None):
417 # definitions are created dynamically in create() method based on 'charts' dictionary. format:
419 # 'chart_name_in_netdata' : [ charts['chart_name_in_netdata']['lines']['name'] ]
421 self.host = "localhost"
425 SimpleService.__init__(self, configuration=configuration, name=name)
427 def _get_raw_data(self):
429 Get raw data with low-level "socket" module.
432 if self.sock is None:
434 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
435 sock.settimeout(self.update_every)
436 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
437 sock.connect((self.host, self.port))
438 except Exception as e:
443 sock.send(self.request)
453 data = sock.recv(1024)
456 buf = sock.recv(1024)
467 def _parse_config(self):
469 Format configuration data and try to connect to server
472 if self.name is not None or self.name != str(None):
475 self.name = str(self.name)
477 self.host = str(self.configuration['host'])
478 except (KeyError, TypeError):
479 self.error("No host specified. Using: '" + self.host + "'")
481 self.port = int(self.configuration['port'])
482 except (KeyError, TypeError):
483 self.error("No port specified. Using: '" + str(self.port) + "'")
485 self.port = int(self.configuration['request'])
486 except (KeyError, TypeError):
487 self.error("No request specified. Using: '" + str(self.request) + "'")
488 self.request = self.request.encode()
491 class LogService(SimpleService):
492 def __init__(self, configuration=None, name=None):
493 # definitions are created dynamically in create() method based on 'charts' dictionary. format:
495 # 'chart_name_in_netdata' : [ charts['chart_name_in_netdata']['lines']['name'] ]
498 self._last_position = 0
499 # self._log_reader = None
500 SimpleService.__init__(self, configuration=configuration, name=name)
501 self.retries = 100000 # basically always retry
503 def _get_raw_data(self):
506 if os.path.getsize(self.log_path) < self._last_position:
507 self._last_position = 0
508 elif os.path.getsize(self.log_path) == self._last_position:
510 with open(self.log_path, "r") as fp:
511 fp.seek(self._last_position)
512 for i, line in enumerate(fp):
514 self._last_position = fp.tell()
515 except Exception as e:
516 msg.error(self.__module__, str(e))
523 if self.name is not None or self.name != str(None):
526 self.name = str(self.name)
528 self.log_path = str(self.configuration['path'])
529 except (KeyError, TypeError):
530 self.error("No path to log specified. Using: '" + self.log_path + "'")
532 if os.access(self.log_path, os.R_OK):
535 self.error("Cannot access file: '" + self.log_path + "'")
539 status = SimpleService.create(self)
540 self._last_position = 0