]> arthur.barton.de Git - netdata.git/blob - python.d/elasticsearch.chart.py
ab-debian 0.20170311.01-0ab1, upstream v1.5.0-573-g0fba967b
[netdata.git] / python.d / elasticsearch.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: elastic search node stats netdata python.d module
3 # Author: l2isbad
4
5 from base import UrlService
6 from requests import get
7 from socket import gethostbyname
8 try:
9         from queue import Queue
10 except ImportError:
11         from Queue import Queue
12 from threading import Thread
13 from collections import namedtuple
14
15 # default module values (can be overridden per job in `config`)
16 # update_every = 2
17 update_every = 5
18 priority = 60000
19 retries = 60
20
21 METHODS = namedtuple('METHODS', ['get_data_function', 'url'])
22
23 NODE_STATS = [
24     ('indices.search.fetch_current', None, None),
25     ('indices.search.fetch_total', None, None),
26     ('indices.search.query_current', None, None),
27     ('indices.search.query_total', None, None),
28     ('indices.search.query_time_in_millis', None, None),
29     ('indices.search.fetch_time_in_millis', None, None),
30     ('indices.indexing.index_total', 'indexing_index_total', None),
31     ('indices.indexing.index_current', 'indexing_index_current', None),
32     ('indices.indexing.index_time_in_millis', 'indexing_index_time_in_millis', None),
33     ('indices.refresh.total', 'refresh_total', None),
34     ('indices.refresh.total_time_in_millis', 'refresh_total_time_in_millis', None),
35     ('indices.flush.total', 'flush_total', None),
36     ('indices.flush.total_time_in_millis', 'flush_total_time_in_millis', None),
37     ('jvm.gc.collectors.young.collection_count', 'young_collection_count', None),
38     ('jvm.gc.collectors.old.collection_count', 'old_collection_count', None),
39     ('jvm.gc.collectors.young.collection_time_in_millis', 'young_collection_time_in_millis', None),
40     ('jvm.gc.collectors.old.collection_time_in_millis', 'old_collection_time_in_millis', None),
41     ('jvm.mem.heap_used_percent', 'jvm_heap_percent', None),
42     ('jvm.mem.heap_committed_in_bytes', 'jvm_heap_commit', None),
43     ('thread_pool.bulk.queue', 'bulk_queue', None),
44     ('thread_pool.bulk.rejected', 'bulk_rejected', None),
45     ('thread_pool.index.queue', 'index_queue', None),
46     ('thread_pool.index.rejected', 'index_rejected', None),
47     ('thread_pool.search.queue', 'search_queue', None),
48     ('thread_pool.search.rejected', 'search_rejected', None),
49     ('thread_pool.merge.queue', 'merge_queue', None),
50     ('thread_pool.merge.rejected', 'merge_rejected', None),
51     ('indices.fielddata.memory_size_in_bytes', 'index_fdata_memory', None),
52     ('indices.fielddata.evictions', None, None),
53     ('breakers.fielddata.tripped', None, None),
54     ('http.current_open', 'http_current_open', None),
55     ('transport.rx_size_in_bytes', 'transport_rx_size_in_bytes', None),
56     ('transport.tx_size_in_bytes', 'transport_tx_size_in_bytes', None),
57     ('process.max_file_descriptors', None, None),
58     ('process.open_file_descriptors', None, None)
59 ]
60
61 CLUSTER_STATS = [
62     ('nodes.count.data_only', 'count_data_only', None),
63     ('nodes.count.master_data', 'count_master_data', None),
64     ('nodes.count.total', 'count_total', None),
65     ('nodes.count.master_only', 'count_master_only', None),
66     ('nodes.count.client', 'count_client', None),
67     ('indices.docs.count', 'docs_count', None),
68     ('indices.query_cache.hit_count', 'query_cache_hit_count', None),
69     ('indices.query_cache.miss_count', 'query_cache_miss_count', None),
70     ('indices.store.size_in_bytes', 'store_size_in_bytes', None),
71     ('indices.count', 'indices_count', None),
72     ('indices.shards.total', 'shards_total', None)
73 ]
74
75 HEALTH_STATS = [
76     ('number_of_nodes', 'health_number_of_nodes', None),
77     ('number_of_data_nodes', 'health_number_of_data_nodes', None),
78     ('number_of_pending_tasks', 'health_number_of_pending_tasks', None),
79     ('number_of_in_flight_fetch', 'health_number_of_in_flight_fetch', None),
80     ('active_shards', 'health_active_shards', None),
81     ('relocating_shards', 'health_relocating_shards', None),
82     ('unassigned_shards', 'health_unassigned_shards', None),
83     ('delayed_unassigned_shards', 'health_delayed_unassigned_shards', None),
84     ('initializing_shards', 'health_initializing_shards', None),
85     ('active_shards_percent_as_number', 'health_active_shards_percent_as_number', None)
86 ]
87
88 # charts order (can be overridden if you want less charts, or different order)
89 ORDER = ['search_perf_total', 'search_perf_current', 'search_perf_time', 'search_latency', 'index_perf_total',
90          'index_perf_current', 'index_perf_time', 'index_latency', 'jvm_mem_heap', 'jvm_gc_count',
91          'jvm_gc_time', 'host_metrics_file_descriptors', 'host_metrics_http', 'host_metrics_transport',
92          'thread_pool_qr_q', 'thread_pool_qr_r', 'fdata_cache', 'fdata_ev_tr', 'cluster_health_status',
93          'cluster_health_nodes', 'cluster_health_shards', 'cluster_stats_nodes', 'cluster_stats_query_cache',
94          'cluster_stats_docs', 'cluster_stats_store', 'cluster_stats_indices_shards']
95
96 CHARTS = {
97     'search_perf_total': {
98         'options': [None, 'Total number of queries, fetches', 'number of', 'search performance',
99                     'es.search_query_total', 'stacked'],
100         'lines': [
101             ['query_total', 'queries', 'incremental'],
102             ['fetch_total', 'fetches', 'incremental']
103         ]},
104     'search_perf_current': {
105         'options': [None, 'Number of queries, fetches in progress', 'number of', 'search performance',
106                     'es.search_query_current', 'stacked'],
107         'lines': [
108             ['query_current', 'queries', 'absolute'],
109             ['fetch_current', 'fetches', 'absolute']
110         ]},
111     'search_perf_time': {
112         'options': [None, 'Time spent on queries, fetches', 'seconds', 'search performance',
113                     'es.search_time', 'stacked'],
114         'lines': [
115             ['query_time_in_millis', 'query', 'incremental', 1, 1000],
116             ['fetch_time_in_millis', 'fetch', 'incremental', 1, 1000]
117         ]},
118     'search_latency': {
119         'options': [None, 'Query and fetch latency', 'ms', 'search performance', 'es.search_latency', 'stacked'],
120         'lines': [
121             ['query_latency', 'query', 'absolute', 1, 1000],
122             ['fetch_latency', 'fetch', 'absolute', 1, 1000]
123         ]},
124     'index_perf_total': {
125         'options': [None, 'Total number of documents indexed, index refreshes, index flushes to disk', 'number of',
126                     'indexing performance', 'es.index_performance_total', 'stacked'],
127         'lines': [
128             ['indexing_index_total', 'indexed', 'incremental'],
129             ['refresh_total', 'refreshes', 'incremental'],
130             ['flush_total', 'flushes', 'incremental']
131         ]},
132     'index_perf_current': {
133         'options': [None, 'Number of documents currently being indexed', 'currently indexed',
134                     'indexing performance', 'es.index_performance_current', 'stacked'],
135         'lines': [
136             ['indexing_index_current', 'documents', 'absolute']
137         ]},
138     'index_perf_time': {
139         'options': [None, 'Time spent on indexing, refreshing, flushing', 'seconds', 'indexing performance',
140                     'es.search_time', 'stacked'],
141         'lines': [
142             ['indexing_index_time_in_millis', 'indexing', 'incremental', 1, 1000],
143             ['refresh_total_time_in_millis', 'refreshing', 'incremental', 1, 1000],
144             ['flush_total_time_in_millis', 'flushing', 'incremental', 1, 1000]
145         ]},
146     'index_latency': {
147         'options': [None, 'Indexing and flushing latency', 'ms', 'indexing performance',
148                     'es.index_latency', 'stacked'],
149         'lines': [
150             ['indexing_latency', 'indexing', 'absolute', 1, 1000],
151             ['flushing_latency', 'flushing', 'absolute', 1, 1000]
152         ]},
153     'jvm_mem_heap': {
154         'options': [None, 'JVM heap currently in use/committed', 'percent/MB', 'memory usage and gc',
155                     'es.jvm_heap', 'area'],
156         'lines': [
157             ['jvm_heap_percent', 'inuse', 'absolute'],
158             ['jvm_heap_commit', 'commit', 'absolute', -1, 1048576]
159         ]},
160     'jvm_gc_count': {
161         'options': [None, 'Count of garbage collections', 'counts', 'memory usage and gc', 'es.gc_count', 'stacked'],
162         'lines': [
163             ['young_collection_count', 'young', 'incremental'],
164             ['old_collection_count', 'old', 'incremental']
165         ]},
166     'jvm_gc_time': {
167         'options': [None, 'Time spent on garbage collections', 'ms', 'memory usage and gc', 'es.gc_time', 'stacked'],
168         'lines': [
169             ['young_collection_time_in_millis', 'young', 'incremental'],
170             ['old_collection_time_in_millis', 'old', 'incremental']
171         ]},
172     'thread_pool_qr_q': {
173         'options': [None, 'Number of queued threads in thread pool', 'queued threads', 'queues and rejections',
174                     'es.thread_pool_queued', 'stacked'],
175         'lines': [
176             ['bulk_queue', 'bulk', 'absolute'],
177             ['index_queue', 'index', 'absolute'],
178             ['search_queue', 'search', 'absolute'],
179             ['merge_queue', 'merge', 'absolute']
180         ]},
181     'thread_pool_qr_r': {
182         'options': [None, 'Number of rejected threads in thread pool', 'rejected threads', 'queues and rejections',
183                     'es.thread_pool_rejected', 'stacked'],
184         'lines': [
185             ['bulk_rejected', 'bulk', 'absolute'],
186             ['index_rejected', 'index', 'absolute'],
187             ['search_rejected', 'search', 'absolute'],
188             ['merge_rejected', 'merge', 'absolute']
189         ]},
190     'fdata_cache': {
191         'options': [None, 'Fielddata cache size', 'MB', 'fielddata cache', 'es.fdata_cache', 'line'],
192         'lines': [
193             ['index_fdata_memory', 'cache', 'absolute', 1, 1048576]
194         ]},
195     'fdata_ev_tr': {
196         'options': [None, 'Fielddata evictions and circuit breaker tripped count', 'number of events',
197                     'fielddata cache', 'es.evictions_tripped', 'line'],
198         'lines': [
199             ['evictions', None, 'incremental'],
200             ['tripped', None, 'incremental']
201         ]},
202     'cluster_health_nodes': {
203         'options': [None, 'Nodes and tasks statistics', 'units', 'cluster health API',
204                     'es.cluster_health_nodes', 'stacked'],
205         'lines': [
206             ['health_number_of_nodes', 'nodes', 'absolute'],
207             ['health_number_of_data_nodes', 'data_nodes', 'absolute'],
208             ['health_number_of_pending_tasks', 'pending_tasks', 'absolute'],
209             ['health_number_of_in_flight_fetch', 'in_flight_fetch', 'absolute']
210         ]},
211     'cluster_health_status': {
212         'options': [None, 'Cluster status', 'status', 'cluster health API',
213                     'es.cluster_health_status', 'area'],
214         'lines': [
215             ['status_green', 'green', 'absolute'],
216             ['status_red', 'red', 'absolute'],
217             ['status_foo1', None, 'absolute'],
218             ['status_foo2', None, 'absolute'],
219             ['status_foo3', None, 'absolute'],
220             ['status_yellow', 'yellow', 'absolute']
221         ]},
222     'cluster_health_shards': {
223         'options': [None, 'Shards statistics', 'shards', 'cluster health API',
224                     'es.cluster_health_shards', 'stacked'],
225         'lines': [
226             ['health_active_shards', 'active_shards', 'absolute'],
227             ['health_relocating_shards', 'relocating_shards', 'absolute'],
228             ['health_unassigned_shards', 'unassigned', 'absolute'],
229             ['health_delayed_unassigned_shards', 'delayed_unassigned', 'absolute'],
230             ['health_initializing_shards', 'initializing', 'absolute'],
231             ['health_active_shards_percent_as_number', 'active_percent', 'absolute']
232         ]},
233     'cluster_stats_nodes': {
234         'options': [None, 'Nodes statistics', 'nodes', 'cluster stats API',
235                     'es.cluster_nodes', 'stacked'],
236         'lines': [
237             ['count_data_only', 'data_only', 'absolute'],
238             ['count_master_data', 'master_data', 'absolute'],
239             ['count_total', 'total', 'absolute'],
240             ['count_master_only', 'master_only', 'absolute'],
241             ['count_client', 'client', 'absolute']
242         ]},
243     'cluster_stats_query_cache': {
244         'options': [None, 'Query cache statistics', 'queries', 'cluster stats API',
245                     'es.cluster_query_cache', 'stacked'],
246         'lines': [
247             ['query_cache_hit_count', 'hit', 'incremental'],
248             ['query_cache_miss_count', 'miss', 'incremental']
249         ]},
250     'cluster_stats_docs': {
251         'options': [None, 'Docs statistics', 'count', 'cluster stats API',
252                     'es.cluster_docs', 'line'],
253         'lines': [
254             ['docs_count', 'docs', 'absolute']
255         ]},
256     'cluster_stats_store': {
257         'options': [None, 'Store statistics', 'MB', 'cluster stats API',
258                     'es.cluster_store', 'line'],
259         'lines': [
260             ['store_size_in_bytes', 'size', 'absolute', 1, 1048567]
261         ]},
262     'cluster_stats_indices_shards': {
263         'options': [None, 'Indices and shards statistics', 'count', 'cluster stats API',
264                     'es.cluster_indices_shards', 'stacked'],
265         'lines': [
266             ['indices_count', 'indices', 'absolute'],
267             ['shards_total', 'shards', 'absolute']
268         ]},
269     'host_metrics_transport': {
270         'options': [None, 'Cluster communication transport metrics', 'kbit/s', 'host metrics',
271                     'es.host_transport', 'area'],
272         'lines': [
273             ['transport_rx_size_in_bytes', 'in', 'incremental', 8, 1000],
274             ['transport_tx_size_in_bytes', 'out', 'incremental', -8, 1000]
275         ]},
276     'host_metrics_file_descriptors': {
277         'options': [None, 'Available file descriptors in percent', 'percent', 'host metrics',
278                     'es.host_descriptors', 'area'],
279         'lines': [
280             ['file_descriptors_used', 'used', 'absolute', 1, 10]
281         ]},
282     'host_metrics_http': {
283         'options': [None, 'Opened HTTP connections', 'connections', 'host metrics',
284                     'es.host_http_connections', 'line'],
285         'lines': [
286             ['http_current_open', 'opened', 'absolute', 1, 1]
287         ]}
288 }
289
290
291 class Service(UrlService):
292     def __init__(self, configuration=None, name=None):
293         UrlService.__init__(self, configuration=configuration, name=name)
294         self.order = ORDER
295         self.definitions = CHARTS
296         self.host = self.configuration.get('host')
297         self.port = self.configuration.get('port')
298         self.user = self.configuration.get('user')
299         self.password = self.configuration.get('pass')
300         self.latency = dict()
301         self.methods = list()
302         self.auth = self.user and self.password
303
304     def check(self):
305         # We can't start if <host> AND <port> not specified
306         if not all([self.host, self.port]):
307             return False
308
309         # It as a bad idea to use hostname.
310         # Hostname -> ip address
311         try:
312             self.host = gethostbyname(self.host)
313         except Exception as error:
314             self.error(str(error))
315             return False
316
317         # Create URL for every Elasticsearch API
318         url_node_stats = 'http://%s:%s/_nodes/_local/stats' % (self.host, self.port)
319         url_cluster_health = 'http://%s:%s/_cluster/health' % (self.host, self.port)
320         url_cluster_stats = 'http://%s:%s/_cluster/stats' % (self.host, self.port)
321
322         # Create list of enabled API calls
323         user_choice = [bool(self.configuration.get('node_stats', True)),
324                        bool(self.configuration.get('cluster_health', True)),
325                        bool(self.configuration.get('cluster_stats', True))]
326
327         avail_methods = [METHODS(get_data_function=self._get_node_stats_, url=url_node_stats),
328                          METHODS(get_data_function=self._get_cluster_health_, url=url_cluster_health),
329                          METHODS(get_data_function=self._get_cluster_stats_, url=url_cluster_stats)]
330
331         # Remove disabled API calls from 'avail methods'
332         self.methods = [avail_methods[_] for _ in range(len(avail_methods)) if user_choice[_]]
333
334         # Run _get_data for ALL active API calls. 
335         api_check_result = dict()
336         for method in self.methods:
337             try:
338                 api_check_result[method.url] = (bool(method.get_data_function(None, method.url)))
339             except KeyError as error:
340                 self.error('Failed to parse %s. Error: %s'  % (method.url, str(error)))
341                 return False
342
343         # We can start ONLY if all active API calls returned NOT None
344         if not all(api_check_result.values()):
345             self.error('Plugin could not get data from all APIs')
346             self.error('%s' % api_check_result)
347             return False
348         else:
349             self.info('%s' % api_check_result)
350             self.info('Plugin was started successfully')
351
352             return True
353
354     def _get_raw_data(self, url):
355         try:
356             if not self.auth:
357                 raw_data = get(url)
358             else:
359                 raw_data = get(url, auth=(self.user, self.password))
360         except Exception:
361             return None
362
363         return raw_data
364
365     def _get_data(self):
366         threads = list()
367         queue = Queue()
368         result = dict()
369
370         for method in self.methods:
371             th = Thread(target=method.get_data_function, args=(queue, method.url))
372             th.start()
373             threads.append(th)
374
375         for thread in threads:
376             thread.join()
377             result.update(queue.get())
378
379         return result or None
380
381     def _get_cluster_health_(self, queue, url):
382         """
383         Format data received from http request
384         :return: dict
385         """
386
387         data = self._get_raw_data(url)
388
389         if not data:
390             return queue.put(dict()) if queue else None
391         else:
392             data = data.json() if hasattr(data.json, '__call__') else data.json
393
394             to_netdata = fetch_data_(raw_data=data, metrics_list=HEALTH_STATS)
395
396             to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
397                                'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
398             current_status = 'status_' + data['status']
399             to_netdata[current_status] = 1
400
401             return queue.put(to_netdata) if queue else to_netdata
402
403     def _get_cluster_stats_(self, queue, url):
404         """
405         Format data received from http request
406         :return: dict
407         """
408
409         data = self._get_raw_data(url)
410
411         if not data:
412             return queue.put(dict()) if queue else None
413         else:
414             data = data.json() if hasattr(data.json, '__call__') else data.json
415
416             to_netdata = fetch_data_(raw_data=data, metrics_list=CLUSTER_STATS)
417
418             return queue.put(to_netdata) if queue else to_netdata
419
420     def _get_node_stats_(self, queue, url):
421         """
422         Format data received from http request
423         :return: dict
424         """
425
426         data = self._get_raw_data(url)
427
428         if not data:
429             return queue.put(dict()) if queue else None
430         else:
431             data = data.json() if hasattr(data.json, '__call__') else data.json
432
433             node = list(data['nodes'].keys())[0]
434             to_netdata = fetch_data_(raw_data=data['nodes'][node], metrics_list=NODE_STATS)
435
436             # Search performance latency
437             to_netdata['query_latency'] = self.find_avg_(to_netdata['query_total'],
438                                                          to_netdata['query_time_in_millis'], 'query_latency')
439             to_netdata['fetch_latency'] = self.find_avg_(to_netdata['fetch_total'],
440                                                          to_netdata['fetch_time_in_millis'], 'fetch_latency')
441
442             # Indexing performance latency
443             to_netdata['indexing_latency'] = self.find_avg_(to_netdata['indexing_index_total'],
444                                                             to_netdata['indexing_index_time_in_millis'], 'index_latency')
445             to_netdata['flushing_latency'] = self.find_avg_(to_netdata['flush_total'],
446                                                             to_netdata['flush_total_time_in_millis'], 'flush_latency')
447
448             to_netdata['file_descriptors_used'] = round(float(to_netdata['open_file_descriptors'])
449                                                         / to_netdata['max_file_descriptors'] * 1000)
450
451             return queue.put(to_netdata) if queue else to_netdata
452
453     def find_avg_(self, value1, value2, key):
454         if key not in self.latency:
455             self.latency.update({key: [value1, value2]})
456             return 0
457         else:
458             if not self.latency[key][0] == value1:
459                 latency = round(float(value2 - self.latency[key][1]) / float(value1 - self.latency[key][0]) * 1000)
460                 self.latency.update({key: [value1, value2]})
461                 return latency
462             else:
463                 self.latency.update({key: [value1, value2]})
464                 return 0
465
466
467 def fetch_data_(raw_data, metrics_list):
468     to_netdata = dict()
469     for metric, new_name, function in metrics_list:
470         value = raw_data
471         for key in metric.split('.'):
472             try:
473                 value = value[key]
474             except KeyError:
475                 break
476         if not isinstance(value, dict) and key:
477             to_netdata[new_name or key] = value if not function else function(value)
478
479     return to_netdata
480