]> arthur.barton.de Git - netdata.git/blob - python.d/elasticsearch.chart.py
ff841f17c0a6adc0a0129ec8ae8bee80709b2902
[netdata.git] / python.d / elasticsearch.chart.py
1 # -*- coding: utf-8 -*-
2 # Description: elastic search node stats netdata python.d module
3 # Author: l2isbad
4
5 from base import UrlService
6 from requests import get
7 from socket import gethostbyname
8 try:
9         from queue import Queue
10 except ImportError:
11         from Queue import Queue
12 from threading import Thread
13
14 # default module values (can be overridden per job in `config`)
15 # update_every = 2
16 update_every = 5
17 priority = 60000
18 retries = 60
19
20 # charts order (can be overridden if you want less charts, or different order)
21 ORDER = ['search_perf_total', 'search_perf_time', 'search_latency', 'index_perf_total', 'index_perf_time',
22          'index_latency', 'jvm_mem_heap', 'jvm_gc_count', 'jvm_gc_time', 'host_metrics_file_descriptors',
23          'host_metrics_http', 'host_metrics_transport', 'thread_pool_qr', 'fdata_cache', 'fdata_ev_tr',
24          'cluster_health_status', 'cluster_health_nodes', 'cluster_health_shards', 'cluster_stats_nodes',
25          'cluster_stats_query_cache', 'cluster_stats_docs', 'cluster_stats_store', 'cluster_stats_indices_shards']
26
27 CHARTS = {
28     'search_perf_total': {
29         'options': [None, 'Number of queries, fetches', 'queries', 'Search performance', 'es.search_query', 'stacked'],
30         'lines': [
31             ['query_total', 'search_total', 'incremental'],
32             ['fetch_total', 'fetch_total', 'incremental'],
33             ['query_current', 'search_current', 'absolute'],
34             ['fetch_current', 'fetch_current', 'absolute']
35         ]},
36     'search_perf_time': {
37         'options': [None, 'Time spent on queries, fetches', 'seconds', 'Search performance', 'es.search_time', 'stacked'],
38         'lines': [
39             ['query_time_in_millis', 'query', 'incremental', 1, 1000],
40             ['fetch_time_in_millis', 'fetch', 'incremental', 1, 1000]
41         ]},
42     'search_latency': {
43         'options': [None, 'Query and fetch latency', 'ms', 'Search performance', 'es.search_latency', 'stacked'],
44         'lines': [
45             ['query_latency', 'query', 'absolute', 1, 1000],
46             ['fetch_latency', 'fetch', 'absolute', 1, 1000]
47         ]},
48     'index_perf_total': {
49         'options': [None, 'Number of documents indexed, index refreshes, flushes', 'documents/indexes',
50                     'Indexing performance', 'es.index_doc', 'stacked'],
51         'lines': [
52             ['indexing_index_total', 'indexed', 'incremental'],
53             ['refresh_total', 'refreshes', 'incremental'],
54             ['flush_total', 'flushes', 'incremental'],
55             ['indexing_index_current', 'indexed_current', 'absolute'],
56         ]},
57     'index_perf_time': {
58         'options': [None, 'Time spent on indexing, refreshing, flushing', 'seconds', 'Indexing performance',
59                     'es.search_time', 'stacked'],
60         'lines': [
61             ['indexing_index_time_in_millis', 'indexing', 'incremental', 1, 1000],
62             ['refresh_total_time_in_millis', 'refreshing', 'incremental', 1, 1000],
63             ['flush_total_time_in_millis', 'flushing', 'incremental', 1, 1000]
64         ]},
65     'index_latency': {
66         'options': [None, 'Indexing and flushing latency', 'ms', 'Indexing performance',
67                     'es.index_latency', 'stacked'],
68         'lines': [
69             ['indexing_latency', 'indexing', 'absolute', 1, 1000],
70             ['flushing_latency', 'flushing', 'absolute', 1, 1000]
71         ]},
72     'jvm_mem_heap': {
73         'options': [None, 'JVM heap currently in use/committed', 'percent/MB', 'Memory usage and gc',
74                     'es.jvm_heap', 'area'],
75         'lines': [
76             ['jvm_heap_percent', 'inuse', 'absolute'],
77             ['jvm_heap_commit', 'commit', 'absolute', -1, 1048576]
78         ]},
79     'jvm_gc_count': {
80         'options': [None, 'Count of garbage collections', 'counts', 'Memory usage and gc', 'es.gc_count', 'stacked'],
81         'lines': [
82             ['young_collection_count', 'young', 'incremental'],
83             ['old_collection_count', 'old', 'incremental']
84         ]},
85     'jvm_gc_time': {
86         'options': [None, 'Time spent on garbage collections', 'ms', 'Memory usage and gc', 'es.gc_time', 'stacked'],
87         'lines': [
88             ['young_collection_time_in_millis', 'young', 'incremental'],
89             ['old_collection_time_in_millis', 'old', 'incremental']
90         ]},
91     'thread_pool_qr': {
92         'options': [None, 'Number of queued/rejected threads in thread pool', 'threads', 'Queues and rejections',
93                     'es.qr', 'stacked'],
94         'lines': [
95             ['bulk_queue', 'bulk_queue', 'absolute'],
96             ['index_queue', 'index_queue', 'absolute'],
97             ['search_queue', 'search_queue', 'absolute'],
98             ['merge_queue', 'merge_queue', 'absolute'],
99             ['bulk_rejected', 'bulk_rej', 'absolute'],
100             ['index_rejected', 'index_rej', 'absolute'],
101             ['search_rejected', 'search_rej', 'absolute'],
102             ['merge_rejected', 'merge_rej', 'absolute']
103         ]},
104     'fdata_cache': {
105         'options': [None, 'Fielddata cache size', 'MB', 'Fielddata cache', 'es.fdata_cache', 'line'],
106         'lines': [
107             ['index_fdata_mem', 'mem_size', 'absolute', 1, 1048576]
108         ]},
109     'fdata_ev_tr': {
110         'options': [None, 'Fielddata evictions and circuit breaker tripped count', 'number of events',
111                     'Fielddata cache', 'es.fdata_ev_tr', 'line'],
112         'lines': [
113             ['index_fdata_evic', 'evictions', 'incremental'],
114             ['breakers_fdata_trip', 'tripped', 'incremental']
115         ]},
116     'cluster_health_nodes': {
117         'options': [None, 'Nodes and tasks statistics', 'units', 'Cluster health API',
118                     'es.cluster_health', 'stacked'],
119         'lines': [
120             ['health_number_of_nodes', 'nodes', 'absolute'],
121             ['health_number_of_data_nodes', 'data_nodes', 'absolute'],
122             ['health_number_of_pending_tasks', 'pending_tasks', 'absolute'],
123             ['health_number_of_in_flight_fetch', 'inflight_fetch', 'absolute']
124         ]},
125     'cluster_health_status': {
126         'options': [None, 'Cluster status', 'status', 'Cluster health API',
127                     'es.cluster_health_status', 'area'],
128         'lines': [
129             ['status_green', 'green', 'absolute'],
130             ['status_red', 'red', 'absolute'],
131             ['status_foo1', None, 'absolute'],
132             ['status_foo2', None, 'absolute'],
133             ['status_foo3', None, 'absolute'],
134             ['status_yellow', 'yellow', 'absolute']
135         ]},
136     'cluster_health_shards': {
137         'options': [None, 'Shards statistics', 'shards', 'Cluster health API',
138                     'es.cluster_health_sharts', 'stacked'],
139         'lines': [
140             ['health_active_shards', 'active_shards', 'absolute'],
141             ['health_relocating_shards', 'relocating_shards', 'absolute'],
142             ['health_unassigned_shards', 'unassigned', 'absolute'],
143             ['health_delayed_unassigned_shards', 'delayed_unassigned', 'absolute'],
144             ['health_initializing_shards', 'initializing', 'absolute'],
145             ['health_active_shards_percent_as_number', 'active_percent', 'absolute']
146         ]},
147     'cluster_stats_nodes': {
148         'options': [None, 'Nodes statistics', 'nodes', 'Cluster stats API',
149                     'es.cluster_stats_nodes', 'stacked'],
150         'lines': [
151             ['count_data_only', 'data_only', 'absolute'],
152             ['count_master_data', 'master_data', 'absolute'],
153             ['count_total', 'total', 'absolute'],
154             ['count_master_only', 'master_only', 'absolute'],
155             ['count_client', 'client', 'absolute']
156         ]},
157     'cluster_stats_query_cache': {
158         'options': [None, 'Query cache statistics', 'queries', 'Cluster stats API',
159                     'es.cluster_stats_query_cache', 'stacked'],
160         'lines': [
161             ['query_cache_hit_count', 'hit', 'incremental'],
162             ['query_cache_miss_count', 'miss', 'incremental']
163         ]},
164     'cluster_stats_docs': {
165         'options': [None, 'Docs statistics', 'count', 'Cluster stats API',
166                     'es.cluster_stats_docs', 'line'],
167         'lines': [
168             ['docs_count', 'docs', 'absolute']
169         ]},
170     'cluster_stats_store': {
171         'options': [None, 'Store statistics', 'MB', 'Cluster stats API',
172                     'es.cluster_stats_store', 'line'],
173         'lines': [
174             ['store_size_in_bytes', 'size', 'absolute', 1, 1048567]
175         ]},
176     'cluster_stats_indices_shards': {
177         'options': [None, 'Indices and shards statistics', 'count', 'Cluster stats API',
178                     'es.cluster_stats_ind_sha', 'stacked'],
179         'lines': [
180             ['indices_count', 'indices', 'absolute'],
181             ['shards_total', 'shards', 'absolute']
182         ]},
183     'host_metrics_transport': {
184         'options': [None, 'Cluster communication transport metrics', 'kbit/s', 'Host metrics',
185                     'es.host_metrics_transport', 'area'],
186         'lines': [
187             ['transport_rx_size_in_bytes', 'in', 'incremental', 8, 1000],
188             ['transport_tx_size_in_bytes', 'out', 'incremental', -8, 1000]
189         ]},
190     'host_metrics_file_descriptors': {
191         'options': [None, 'Available file descriptors in percent', 'percent', 'Host metrics',
192                     'es.host_metrics_descriptors', 'area'],
193         'lines': [
194             ['file_descriptors_used', 'used', 'absolute', 1, 10]
195         ]},
196     'host_metrics_http': {
197         'options': [None, 'Opened HTTP connections', 'connections', 'Host metrics',
198                     'es.host_metrics_http', 'line'],
199         'lines': [
200             ['http_current_open', 'opened', 'absolute', 1, 1]
201         ]}
202 }
203
204
205 class Service(UrlService):
206     def __init__(self, configuration=None, name=None):
207         UrlService.__init__(self, configuration=configuration, name=name)
208         self.order = ORDER
209         self.definitions = CHARTS
210         self.host = self.configuration.get('host')
211         self.port = self.configuration.get('port')
212         self.user = self.configuration.get('user')
213         self.password = self.configuration.get('pass')
214         self.latency = dict()
215
216     def check(self):
217         # We can't start if <host> AND <port> not specified
218         if not all([self.host, self.port]):
219             return False
220
221         # It as a bad idea to use hostname.
222         # Hostname -> ipaddress
223         try:
224             self.host = gethostbyname(self.host)
225         except Exception as e:
226             self.error(str(e))
227             return False
228
229         # HTTP Auth? NOT TESTED
230         self.auth = self.user and self.password
231
232         # Create URL for every Elasticsearch API
233         url_node_stats = 'http://%s:%s/_nodes/_local/stats' % (self.host, self.port)
234         url_cluster_health = 'http://%s:%s/_cluster/health' % (self.host, self.port)
235         url_cluster_stats = 'http://%s:%s/_cluster/stats' % (self.host, self.port)
236
237         # Create list of enabled API calls
238         user_choice = [bool(self.configuration.get('node_stats', True)),
239                        bool(self.configuration.get('cluster_health', True)),
240                        bool(self.configuration.get('cluster_stats', True))]
241         
242         avail_methods = [(self._get_node_stats, url_node_stats), 
243                         (self._get_cluster_health, url_cluster_health),
244                         (self._get_cluster_stats, url_cluster_stats)]
245
246         # Remove disabled API calls from 'avail methods'
247         self.methods = [avail_methods[_] for _ in range(len(avail_methods)) if user_choice[_]]
248
249         # Run _get_data for ALL active API calls. 
250         api_result = {}
251         for method in self.methods:
252             api_result[method[1]] = (bool(self._get_raw_data(method[1])))
253
254         # We can start ONLY if all active API calls returned NOT None
255         if not all(api_result.values()):
256             self.error('Plugin could not get data from all APIs')
257             self.error('%s' % api_result)
258             return False
259         else:
260             self.info('%s' % api_result)
261             self.info('Plugin was started successfully')
262
263             return True
264
265     def _get_raw_data(self, url):
266         try:
267             if not self.auth:
268                 raw_data = get(url)
269             else:
270                 raw_data = get(url, auth=(self.user, self.password))
271         except Exception:
272             return None
273
274         return raw_data
275
276     def _get_data(self):
277         threads = list()
278         queue = Queue()
279         result = dict()
280
281         for method in self.methods:
282             th = Thread(target=method[0], args=(queue, method[1]))
283             th.start()
284             threads.append(th)
285
286         for thread in threads:
287             thread.join()
288             result.update(queue.get())
289
290         return result or None
291
292     def _get_cluster_health(self, queue, url):
293         """
294         Format data received from http request
295         :return: dict
296         """
297
298         data = self._get_raw_data(url)
299
300         if not data:
301             queue.put({})
302         else:
303             data = data.json()
304
305             to_netdata = dict()
306             to_netdata.update(update_key('health', data))
307             to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
308                                'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
309             to_netdata[''.join(['status_', to_netdata.get('health_status', '')])] = 1
310
311             queue.put(to_netdata)
312
313     def _get_cluster_stats(self, queue, url):
314         """
315         Format data received from http request
316         :return: dict
317         """
318
319         data = self._get_raw_data(url)
320
321         if not data:
322             queue.put({})
323         else:
324             data = data.json()
325
326             to_netdata = dict()
327             to_netdata.update(update_key('count', data['nodes']['count']))
328             to_netdata.update(update_key('query_cache', data['indices']['query_cache']))
329             to_netdata.update(update_key('docs', data['indices']['docs']))
330             to_netdata.update(update_key('store', data['indices']['store']))
331             to_netdata['indices_count'] = data['indices']['count']
332             to_netdata['shards_total'] = data['indices']['shards']['total']
333
334             queue.put(to_netdata)
335
336     def _get_node_stats(self, queue, url):
337         """
338         Format data received from http request
339         :return: dict
340         """
341
342         data = self._get_raw_data(url)
343
344         if not data:
345             queue.put({})
346         else:
347             data = data.json()
348             node = list(data['nodes'].keys())[0]
349             to_netdata = dict()
350             # Search performance metrics
351             to_netdata.update(data['nodes'][node]['indices']['search'])
352             to_netdata['query_latency'] = self.find_avg(to_netdata['query_total'],
353                                                to_netdata['query_time_in_millis'], 'query_latency')
354             to_netdata['fetch_latency'] = self.find_avg(to_netdata['fetch_total'],
355                                                to_netdata['fetch_time_in_millis'], 'fetch_latency')
356
357             # Indexing performance metrics
358             for key in ['indexing', 'refresh', 'flush']:
359                 to_netdata.update(update_key(key, data['nodes'][node]['indices'].get(key, {})))
360             to_netdata['indexing_latency'] = self.find_avg(to_netdata['indexing_index_total'],
361                                                to_netdata['indexing_index_time_in_millis'], 'index_latency')
362             to_netdata['flushing_latency'] = self.find_avg(to_netdata['flush_total'],
363                                                to_netdata['flush_total_time_in_millis'], 'flush_latency')
364             # Memory usage and garbage collection
365             to_netdata.update(update_key('young', data['nodes'][node]['jvm']['gc']['collectors']['young']))
366             to_netdata.update(update_key('old', data['nodes'][node]['jvm']['gc']['collectors']['old']))
367             to_netdata['jvm_heap_percent'] = data['nodes'][node]['jvm']['mem']['heap_used_percent']
368             to_netdata['jvm_heap_commit'] = data['nodes'][node]['jvm']['mem']['heap_committed_in_bytes']
369
370             # Thread pool queues and rejections
371             for key in ['bulk', 'index', 'search', 'merge']:
372                 to_netdata.update(update_key(key, data['nodes'][node]['thread_pool'].get(key, {})))
373
374             # Fielddata cache
375             to_netdata['index_fdata_mem'] = data['nodes'][node]['indices']['fielddata']['memory_size_in_bytes']
376             to_netdata['index_fdata_evic'] = data['nodes'][node]['indices']['fielddata']['evictions']
377             to_netdata['breakers_fdata_trip'] = data['nodes'][node]['breakers']['fielddata']['tripped']
378
379             # Host metrics
380             to_netdata.update(update_key('http', data['nodes'][node]['http']))
381             to_netdata.update(update_key('transport', data['nodes'][node]['transport']))
382             to_netdata['file_descriptors_used'] = round(float(data['nodes'][node]['process']['open_file_descriptors'])
383                                                         / data['nodes'][node]['process']['max_file_descriptors'] * 1000)
384             
385             queue.put(to_netdata)
386
387     def find_avg(self, value1, value2, key):
388         if key not in self.latency:
389             self.latency.update({key: [value1, value2]})
390             return 0
391         else:
392             if not self.latency[key][0] == value1:
393                 latency = round(float(value2 - self.latency[key][1]) / float(value1 - self.latency[key][0]) * 1000)
394                 self.latency.update({key: [value1, value2]})
395                 return latency
396             else:
397                 self.latency.update({key: [value1, value2]})
398                 return 0
399
400
401 def update_key(string, dictionary):
402     return {'_'.join([string, k]): v for k, v in dictionary.items()}