1 # -*- coding: utf-8 -*-
2 # Description: elastic search node stats netdata python.d module
# stdlib
from socket import gethostbyname
from threading import Thread
try:
    from queue import Queue   # Python 3
except ImportError:
    from Queue import Queue   # Python 2 fallback

# third-party
from requests import get

# netdata python.d framework
from base import UrlService
14 # default module values (can be overridden per job in `config`)
# Chart drawing order on the dashboard.  A job configuration may override
# this list to drop charts or change their order.
ORDER = [
    # search performance
    'search_perf_total', 'search_perf_current', 'search_perf_time', 'search_latency',
    # indexing performance
    'index_perf_total', 'index_perf_current', 'index_perf_time', 'index_latency',
    # JVM memory usage and garbage collection
    'jvm_mem_heap', 'jvm_gc_count', 'jvm_gc_time',
    # host metrics
    'host_metrics_file_descriptors', 'host_metrics_http', 'host_metrics_transport',
    # thread pool queues and rejections
    'thread_pool_qr_q', 'thread_pool_qr_r',
    # fielddata cache
    'fdata_cache', 'fdata_ev_tr',
    # cluster health API
    'cluster_health_status', 'cluster_health_nodes', 'cluster_health_shards',
    # cluster stats API
    'cluster_stats_nodes', 'cluster_stats_query_cache', 'cluster_stats_docs',
    'cluster_stats_store', 'cluster_stats_indices_shards',
]
# Chart definitions, one entry per id in ORDER.
# Format: 'chart_id': {'options': [name, title, units, family, context, chart_type],
#                      'lines': [[dimension_id, name, algorithm, multiplier, divisor], ...]}
# Fix: the cluster_stats_store divisor was 1048567 (a typo); bytes -> MB is
# 1048576 (2**20), matching every other MB conversion in this dict.
CHARTS = {
    'search_perf_total': {
        'options': [None, 'Total number of queries, fetches', 'number of', 'search performance',
                    'es.search_query_total', 'stacked'],
        'lines': [
            ['query_total', 'queries', 'incremental'],
            ['fetch_total', 'fetches', 'incremental']
        ]},
    'search_perf_current': {
        'options': [None, 'Number of queries, fetches in progress', 'number of', 'search performance',
                    'es.search_query_current', 'stacked'],
        'lines': [
            ['query_current', 'queries', 'absolute'],
            ['fetch_current', 'fetches', 'absolute']
        ]},
    'search_perf_time': {
        'options': [None, 'Time spent on queries, fetches', 'seconds', 'search performance',
                    'es.search_time', 'stacked'],
        'lines': [
            ['query_time_in_millis', 'query', 'incremental', 1, 1000],
            ['fetch_time_in_millis', 'fetch', 'incremental', 1, 1000]
        ]},
    'search_latency': {
        'options': [None, 'Query and fetch latency', 'ms', 'search performance',
                    'es.search_latency', 'stacked'],
        'lines': [
            ['query_latency', 'query', 'absolute', 1, 1000],
            ['fetch_latency', 'fetch', 'absolute', 1, 1000]
        ]},
    'index_perf_total': {
        'options': [None, 'Total number of documents indexed, index refreshes, index flushes to disk', 'number of',
                    'indexing performance', 'es.index_performance_total', 'stacked'],
        'lines': [
            ['indexing_index_total', 'indexed', 'incremental'],
            ['refresh_total', 'refreshes', 'incremental'],
            ['flush_total', 'flushes', 'incremental']
        ]},
    'index_perf_current': {
        'options': [None, 'Number of documents currently being indexed', 'currently indexed',
                    'indexing performance', 'es.index_performance_current', 'stacked'],
        'lines': [
            ['indexing_index_current', 'documents', 'absolute']
        ]},
    'index_perf_time': {
        # NOTE(review): context 'es.search_time' duplicates the search chart's
        # context — looks like a copy/paste slip, but it is a published id, so
        # it is kept as-is; confirm before renaming.
        'options': [None, 'Time spent on indexing, refreshing, flushing', 'seconds', 'indexing performance',
                    'es.search_time', 'stacked'],
        'lines': [
            ['indexing_index_time_in_millis', 'indexing', 'incremental', 1, 1000],
            ['refresh_total_time_in_millis', 'refreshing', 'incremental', 1, 1000],
            ['flush_total_time_in_millis', 'flushing', 'incremental', 1, 1000]
        ]},
    'index_latency': {
        'options': [None, 'Indexing and flushing latency', 'ms', 'indexing performance',
                    'es.index_latency', 'stacked'],
        'lines': [
            ['indexing_latency', 'indexing', 'absolute', 1, 1000],
            ['flushing_latency', 'flushing', 'absolute', 1, 1000]
        ]},
    'jvm_mem_heap': {
        'options': [None, 'JVM heap currently in use/committed', 'percent/MB', 'memory usage and gc',
                    'es.jvm_heap', 'area'],
        'lines': [
            ['jvm_heap_percent', 'inuse', 'absolute'],
            # negative multiplier draws the committed MB below the axis
            ['jvm_heap_commit', 'commit', 'absolute', -1, 1048576]
        ]},
    'jvm_gc_count': {
        'options': [None, 'Count of garbage collections', 'counts', 'memory usage and gc',
                    'es.gc_count', 'stacked'],
        'lines': [
            ['young_collection_count', 'young', 'incremental'],
            ['old_collection_count', 'old', 'incremental']
        ]},
    'jvm_gc_time': {
        'options': [None, 'Time spent on garbage collections', 'ms', 'memory usage and gc',
                    'es.gc_time', 'stacked'],
        'lines': [
            ['young_collection_time_in_millis', 'young', 'incremental'],
            ['old_collection_time_in_millis', 'old', 'incremental']
        ]},
    'thread_pool_qr_q': {
        'options': [None, 'Number of queued threads in thread pool', 'queued threads', 'queues and rejections',
                    'es.qr_queued', 'stacked'],
        'lines': [
            ['bulk_queue', 'bulk', 'absolute'],
            ['index_queue', 'index', 'absolute'],
            ['search_queue', 'search', 'absolute'],
            ['merge_queue', 'merge', 'absolute']
        ]},
    'thread_pool_qr_r': {
        'options': [None, 'Number of rejected threads in thread pool', 'rejected threads', 'queues and rejections',
                    'es.qr_rejected', 'stacked'],
        'lines': [
            ['bulk_rejected', 'bulk', 'absolute'],
            ['index_rejected', 'index', 'absolute'],
            ['search_rejected', 'search', 'absolute'],
            ['merge_rejected', 'merge', 'absolute']
        ]},
    'fdata_cache': {
        'options': [None, 'Fielddata cache size', 'MB', 'fielddata cache',
                    'es.fdata_cache', 'line'],
        'lines': [
            ['index_fdata_mem', 'mem_size', 'absolute', 1, 1048576]
        ]},
    'fdata_ev_tr': {
        'options': [None, 'Fielddata evictions and circuit breaker tripped count', 'number of events',
                    'fielddata cache', 'es.fdata_ev_tr', 'line'],
        'lines': [
            ['index_fdata_evic', 'evictions', 'incremental'],
            ['breakers_fdata_trip', 'tripped', 'incremental']
        ]},
    'cluster_health_nodes': {
        'options': [None, 'Nodes and tasks statistics', 'units', 'cluster health API',
                    'es.cluster_health', 'stacked'],
        'lines': [
            ['health_number_of_nodes', 'nodes', 'absolute'],
            ['health_number_of_data_nodes', 'data_nodes', 'absolute'],
            ['health_number_of_pending_tasks', 'pending_tasks', 'absolute'],
            ['health_number_of_in_flight_fetch', 'inflight_fetch', 'absolute']
        ]},
    'cluster_health_status': {
        'options': [None, 'Cluster status', 'status', 'cluster health API',
                    'es.cluster_health_status', 'area'],
        'lines': [
            ['status_green', 'green', 'absolute'],
            ['status_red', 'red', 'absolute'],
            # hidden placeholder dimensions (name None) keep the stacking
            # order/colors stable between green and yellow
            ['status_foo1', None, 'absolute'],
            ['status_foo2', None, 'absolute'],
            ['status_foo3', None, 'absolute'],
            ['status_yellow', 'yellow', 'absolute']
        ]},
    'cluster_health_shards': {
        # NOTE(review): 'sharts' typo is a published context id — kept for
        # backward compatibility with existing dashboards/alarms.
        'options': [None, 'Shards statistics', 'shards', 'cluster health API',
                    'es.cluster_health_sharts', 'stacked'],
        'lines': [
            ['health_active_shards', 'active_shards', 'absolute'],
            ['health_relocating_shards', 'relocating_shards', 'absolute'],
            ['health_unassigned_shards', 'unassigned', 'absolute'],
            ['health_delayed_unassigned_shards', 'delayed_unassigned', 'absolute'],
            ['health_initializing_shards', 'initializing', 'absolute'],
            ['health_active_shards_percent_as_number', 'active_percent', 'absolute']
        ]},
    'cluster_stats_nodes': {
        'options': [None, 'Nodes statistics', 'nodes', 'cluster stats API',
                    'es.cluster_stats_nodes', 'stacked'],
        'lines': [
            ['count_data_only', 'data_only', 'absolute'],
            ['count_master_data', 'master_data', 'absolute'],
            ['count_total', 'total', 'absolute'],
            ['count_master_only', 'master_only', 'absolute'],
            ['count_client', 'client', 'absolute']
        ]},
    'cluster_stats_query_cache': {
        'options': [None, 'Query cache statistics', 'queries', 'cluster stats API',
                    'es.cluster_stats_query_cache', 'stacked'],
        'lines': [
            ['query_cache_hit_count', 'hit', 'incremental'],
            ['query_cache_miss_count', 'miss', 'incremental']
        ]},
    'cluster_stats_docs': {
        'options': [None, 'Docs statistics', 'count', 'cluster stats API',
                    'es.cluster_stats_docs', 'line'],
        'lines': [
            ['docs_count', 'docs', 'absolute']
        ]},
    'cluster_stats_store': {
        'options': [None, 'Store statistics', 'MB', 'cluster stats API',
                    'es.cluster_stats_store', 'line'],
        'lines': [
            ['store_size_in_bytes', 'size', 'absolute', 1, 1048576]
        ]},
    'cluster_stats_indices_shards': {
        'options': [None, 'Indices and shards statistics', 'count', 'cluster stats API',
                    'es.cluster_stats_ind_sha', 'stacked'],
        'lines': [
            ['indices_count', 'indices', 'absolute'],
            ['shards_total', 'shards', 'absolute']
        ]},
    'host_metrics_transport': {
        'options': [None, 'Cluster communication transport metrics', 'kbit/s', 'host metrics',
                    'es.host_metrics_transport', 'area'],
        'lines': [
            # x8 converts bytes to bits; negative multiplier mirrors tx below the axis
            ['transport_rx_size_in_bytes', 'in', 'incremental', 8, 1000],
            ['transport_tx_size_in_bytes', 'out', 'incremental', -8, 1000]
        ]},
    'host_metrics_file_descriptors': {
        # collector reports used/max * 1000; divisor 10 renders a percentage
        # with one implicit decimal place
        'options': [None, 'Available file descriptors in percent', 'percent', 'host metrics',
                    'es.host_metrics_descriptors', 'area'],
        'lines': [
            ['file_descriptors_used', 'used', 'absolute', 1, 10]
        ]},
    'host_metrics_http': {
        'options': [None, 'Opened HTTP connections', 'connections', 'host metrics',
                    'es.host_metrics_http', 'line'],
        'lines': [
            ['http_current_open', 'opened', 'absolute', 1, 1]
        ]}
}
class Service(UrlService):
    # netdata python.d service that polls the Elasticsearch node-stats,
    # cluster-health and cluster-stats HTTP APIs and flattens the JSON
    # responses into netdata dimensions.
    def __init__(self, configuration=None, name=None):
        """Read per-job connection settings; no I/O happens here."""
        UrlService.__init__(self, configuration=configuration, name=name)
        # NOTE(review): `self.order = ORDER` is expected alongside
        # `self.definitions` but is missing from this excerpt — confirm
        # against the full file.
        self.definitions = CHARTS
        self.host = self.configuration.get('host')
        self.port = self.configuration.get('port')
        self.user = self.configuration.get('user')
        self.password = self.configuration.get('pass')  # config key is 'pass'
        # find_avg() cache: key -> [last_total_counter, last_time_in_millis]
        self.latency = dict()
        # NOTE(review): this span is the body of check(); the `def check(self):`
        # line and several statements (`return False`, the `try:` opener,
        # `api_result = dict()`, the final returns) are missing from this
        # excerpt — confirm against the full file.
        # We can't start if <host> AND <port> not specified.
        if not all([self.host, self.port]):

        # It is a bad idea to keep using a hostname: resolve it once up front.
        # Hostname -> ip address.
            self.host = gethostbyname(self.host)
        except Exception as e:

        # Enable HTTP basic auth only when both user and pass are configured. NOT TESTED.
        self.auth = self.user and self.password

        # Build one URL per Elasticsearch API endpoint.
        url_node_stats = 'http://%s:%s/_nodes/_local/stats' % (self.host, self.port)
        url_cluster_health = 'http://%s:%s/_cluster/health' % (self.host, self.port)
        url_cluster_stats = 'http://%s:%s/_cluster/stats' % (self.host, self.port)

        # Which API calls did the user enable?  All default to True.
        user_choice = [bool(self.configuration.get('node_stats', True)),
                       bool(self.configuration.get('cluster_health', True)),
                       bool(self.configuration.get('cluster_stats', True))]

        avail_methods = [(self._get_node_stats, url_node_stats),
                         (self._get_cluster_health, url_cluster_health),
                         (self._get_cluster_stats, url_cluster_stats)]

        # Keep only the (collector, url) pairs the user enabled.
        self.methods = [avail_methods[_] for _ in range(len(avail_methods)) if user_choice[_]]

        # Probe every active API once; record True/False per URL.
        for method in self.methods:
            api_result[method[1]] = (bool(self._get_raw_data(method[1])))

        # We can start ONLY if all active API calls returned data.
        if not all(api_result.values()):
            self.error('Plugin could not get data from all APIs')
            self.error('%s' % api_result)

            self.info('%s' % api_result)
            self.info('Plugin was started successfully')
    def _get_raw_data(self, url):
        """HTTP GET *url*, sending basic auth credentials.

        NOTE(review): the surrounding try/except, the unauthenticated branch
        (self.auth is computed in check()) and the return statement are
        missing from this excerpt — confirm against the full file.
        """
        raw_data = get(url, auth=(self.user, self.password))
        # NOTE(review): this span is the body of _get_data(); the `def` line,
        # the threads/queue/result initialisation and the th.start() /
        # thread.join() calls are missing from this excerpt.
        # One worker thread per enabled API call; each worker puts its
        # metrics dict into `queue`.
        for method in self.methods:
            # method is a (collector callable, url) pair built in check()
            th = Thread(target=method[0], args=(queue, method[1]))

        # Merge one result dict per worker into a single flat dict.
        for thread in threads:
            result.update(queue.get())

        # An empty dict becomes None: tells the framework "no data this cycle".
        return result or None
    def _get_cluster_health(self, queue, url):
        """Fetch the /_cluster/health API, flatten it, and put the dict into *queue*.

        NOTE(review): the `to_netdata` initialisation and the error-handling /
        early-exit lines are missing from this excerpt — confirm against the
        full file.
        """
        data = self._get_raw_data(url)

        # Old `requests` exposed .json as a property, newer versions as a
        # method — support both.
        data = data.json() if '__call__' in dir(data.json) else data.json

        # Prefix every health key with 'health_'.
        to_netdata.update(update_key('health', data))
        # Zero all status dimensions; status_foo1..3 are the hidden
        # placeholder dimensions declared (with name None) in CHARTS.
        to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
                           'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
        # Raise only the dimension matching the reported cluster status.
        to_netdata[''.join(['status_', to_netdata.get('health_status', '')])] = 1

        queue.put(to_netdata)
    def _get_cluster_stats(self, queue, url):
        """Fetch the /_cluster/stats API, flatten it, and put the dict into *queue*.

        NOTE(review): the `to_netdata` initialisation and the error-handling /
        early-exit lines are missing from this excerpt — confirm against the
        full file.
        """
        data = self._get_raw_data(url)

        # Old `requests` exposed .json as a property, newer versions as a
        # method — support both.
        data = data.json() if '__call__' in dir(data.json) else data.json

        # Flatten the interesting sub-dicts, prefixing keys per section.
        to_netdata.update(update_key('count', data['nodes']['count']))
        to_netdata.update(update_key('query_cache', data['indices']['query_cache']))
        to_netdata.update(update_key('docs', data['indices']['docs']))
        to_netdata.update(update_key('store', data['indices']['store']))
        to_netdata['indices_count'] = data['indices']['count']
        # .get chain tolerates a missing 'shards' section (None dimension value)
        to_netdata['shards_total'] = data['indices'].get('shards', {}).get('total')

        queue.put(to_netdata)
    def _get_node_stats(self, queue, url):
        """Fetch the local-node stats API, flatten it, and put the dict into *queue*.

        NOTE(review): the `to_netdata` initialisation and the error-handling /
        early-exit lines are missing from this excerpt — confirm against the
        full file.
        """
        data = self._get_raw_data(url)

        # Old `requests` exposed .json as a property, newer versions as a
        # method — support both.
        data = data.json() if '__call__' in dir(data.json) else data.json

        # /_nodes/_local/stats keys the payload by node id; take the single one.
        node = list(data['nodes'].keys())[0]

        # Search performance metrics
        to_netdata.update(data['nodes'][node]['indices']['search'])
        to_netdata['query_latency'] = self.find_avg(to_netdata['query_total'],
                                                    to_netdata['query_time_in_millis'], 'query_latency')
        to_netdata['fetch_latency'] = self.find_avg(to_netdata['fetch_total'],
                                                    to_netdata['fetch_time_in_millis'], 'fetch_latency')

        # Indexing performance metrics; .get(key, {}) tolerates absent sections
        for key in ['indexing', 'refresh', 'flush']:
            to_netdata.update(update_key(key, data['nodes'][node]['indices'].get(key, {})))
        to_netdata['indexing_latency'] = self.find_avg(to_netdata['indexing_index_total'],
                                                       to_netdata['indexing_index_time_in_millis'], 'index_latency')
        to_netdata['flushing_latency'] = self.find_avg(to_netdata['flush_total'],
                                                       to_netdata['flush_total_time_in_millis'], 'flush_latency')

        # Memory usage and garbage collection
        to_netdata.update(update_key('young', data['nodes'][node]['jvm']['gc']['collectors']['young']))
        to_netdata.update(update_key('old', data['nodes'][node]['jvm']['gc']['collectors']['old']))
        to_netdata['jvm_heap_percent'] = data['nodes'][node]['jvm']['mem']['heap_used_percent']
        to_netdata['jvm_heap_commit'] = data['nodes'][node]['jvm']['mem']['heap_committed_in_bytes']

        # Thread pool queues and rejections
        for key in ['bulk', 'index', 'search', 'merge']:
            to_netdata.update(update_key(key, data['nodes'][node]['thread_pool'].get(key, {})))

        # Fielddata cache and circuit breaker
        to_netdata['index_fdata_mem'] = data['nodes'][node]['indices']['fielddata']['memory_size_in_bytes']
        to_netdata['index_fdata_evic'] = data['nodes'][node]['indices']['fielddata']['evictions']
        to_netdata['breakers_fdata_trip'] = data['nodes'][node]['breakers']['fielddata']['tripped']

        # Host metrics: HTTP connections, transport traffic, file descriptors
        to_netdata.update(update_key('http', data['nodes'][node]['http']))
        to_netdata.update(update_key('transport', data['nodes'][node]['transport']))
        # used/max scaled by 1000; the chart's divisor of 10 renders percent
        # with one implicit decimal place.
        to_netdata['file_descriptors_used'] = round(float(data['nodes'][node]['process']['open_file_descriptors'])
                                                    / data['nodes'][node]['process']['max_file_descriptors'] * 1000)

        queue.put(to_netdata)
403 def find_avg(self, value1, value2, key):
404 if key not in self.latency:
405 self.latency.update({key: [value1, value2]})
408 if not self.latency[key][0] == value1:
409 latency = round(float(value2 - self.latency[key][1]) / float(value1 - self.latency[key][0]) * 1000)
410 self.latency.update({key: [value1, value2]})
413 self.latency.update({key: [value1, value2]})
def update_key(string, dictionary):
    """Return a copy of *dictionary* with every key prefixed by '<string>_'.

    e.g. update_key('health', {'status': 'green'}) -> {'health_status': 'green'}

    Fix (idiom): replaced dict() around a list comprehension with element
    indexing by a dict comprehension with tuple unpacking — same behavior.
    """
    return {'_'.join([string, key]): value for key, value in dictionary.items()}