1 # -*- coding: utf-8 -*-
2 # Description: elastic search node stats netdata python.d module
5 from base import UrlService
6 from socket import gethostbyname, gaierror
8 from queue import Queue
10 from Queue import Queue
11 from threading import Thread
12 from collections import namedtuple
13 from json import loads
15 # default module values (can be overridden per job in `config`)
# METHODS pairs a bound fetch/parse function with the API URL it polls;
# instances are built in the check logic below.
21 METHODS = namedtuple('METHODS', ['get_data_function', 'url'])
# NODE_STATS entries: (dotted JSON path into the node-stats response,
# netdata metric name or None, transform function or None).
# fetch_data_() (bottom of file) walks the dotted path; when the second
# field is None the last path segment is used as the metric name.
# NOTE(review): the list header line (original ~23, presumably
# `NODE_STATS = [`) is not visible in this chunk -- confirm against full file.
24 ('indices.search.fetch_current', None, None),
25 ('indices.search.fetch_total', None, None),
26 ('indices.search.query_current', None, None),
27 ('indices.search.query_total', None, None),
28 ('indices.search.query_time_in_millis', None, None),
29 ('indices.search.fetch_time_in_millis', None, None),
30 ('indices.indexing.index_total', 'indexing_index_total', None),
31 ('indices.indexing.index_current', 'indexing_index_current', None),
32 ('indices.indexing.index_time_in_millis', 'indexing_index_time_in_millis', None),
33 ('indices.refresh.total', 'refresh_total', None),
34 ('indices.refresh.total_time_in_millis', 'refresh_total_time_in_millis', None),
35 ('indices.flush.total', 'flush_total', None),
36 ('indices.flush.total_time_in_millis', 'flush_total_time_in_millis', None),
37 ('jvm.gc.collectors.young.collection_count', 'young_collection_count', None),
38 ('jvm.gc.collectors.old.collection_count', 'old_collection_count', None),
39 ('jvm.gc.collectors.young.collection_time_in_millis', 'young_collection_time_in_millis', None),
40 ('jvm.gc.collectors.old.collection_time_in_millis', 'old_collection_time_in_millis', None),
41 ('jvm.mem.heap_used_percent', 'jvm_heap_percent', None),
42 ('jvm.mem.heap_committed_in_bytes', 'jvm_heap_commit', None),
43 ('thread_pool.bulk.queue', 'bulk_queue', None),
44 ('thread_pool.bulk.rejected', 'bulk_rejected', None),
45 ('thread_pool.index.queue', 'index_queue', None),
46 ('thread_pool.index.rejected', 'index_rejected', None),
47 ('thread_pool.search.queue', 'search_queue', None),
48 ('thread_pool.search.rejected', 'search_rejected', None),
49 ('thread_pool.merge.queue', 'merge_queue', None),
50 ('thread_pool.merge.rejected', 'merge_rejected', None),
51 ('indices.fielddata.memory_size_in_bytes', 'index_fdata_memory', None),
52 ('indices.fielddata.evictions', None, None),
53 ('breakers.fielddata.tripped', None, None),
54 ('http.current_open', 'http_current_open', None),
55 ('transport.rx_size_in_bytes', 'transport_rx_size_in_bytes', None),
56 ('transport.tx_size_in_bytes', 'transport_tx_size_in_bytes', None),
57 ('process.max_file_descriptors', None, None),
58 ('process.open_file_descriptors', None, None)
# CLUSTER_STATS entries: same (json_path, metric_name, function) shape as
# NODE_STATS, applied to the /_cluster/stats response by fetch_data_().
# NOTE(review): the list header line (presumably `CLUSTER_STATS = [`) is not
# visible in this chunk.
62 ('nodes.count.data_only', 'count_data_only', None),
63 ('nodes.count.master_data', 'count_master_data', None),
64 ('nodes.count.total', 'count_total', None),
65 ('nodes.count.master_only', 'count_master_only', None),
66 ('nodes.count.client', 'count_client', None),
67 ('indices.docs.count', 'docs_count', None),
68 ('indices.query_cache.hit_count', 'query_cache_hit_count', None),
69 ('indices.query_cache.miss_count', 'query_cache_miss_count', None),
70 ('indices.store.size_in_bytes', 'store_size_in_bytes', None),
71 ('indices.count', 'indices_count', None),
72 ('indices.shards.total', 'shards_total', None)
# HEALTH_STATS entries: (top-level key of the /_cluster/health response,
# metric name prefixed 'health_', transform function or None).
# NOTE(review): the list header line (presumably `HEALTH_STATS = [`) is not
# visible in this chunk.
76 ('number_of_nodes', 'health_number_of_nodes', None),
77 ('number_of_data_nodes', 'health_number_of_data_nodes', None),
78 ('number_of_pending_tasks', 'health_number_of_pending_tasks', None),
79 ('number_of_in_flight_fetch', 'health_number_of_in_flight_fetch', None),
80 ('active_shards', 'health_active_shards', None),
81 ('relocating_shards', 'health_relocating_shards', None),
82 ('unassigned_shards', 'health_unassigned_shards', None),
83 ('delayed_unassigned_shards', 'health_delayed_unassigned_shards', None),
84 ('initializing_shards', 'health_initializing_shards', None),
85 ('active_shards_percent_as_number', 'health_active_shards_percent_as_number', None)
88 # charts order (can be overridden if you want less charts, or different order)
# Every name here must match a key in the CHARTS dict below.
89 ORDER = ['search_perf_total', 'search_perf_current', 'search_perf_time', 'search_latency', 'index_perf_total',
90 'index_perf_current', 'index_perf_time', 'index_latency', 'jvm_mem_heap', 'jvm_gc_count',
91 'jvm_gc_time', 'host_metrics_file_descriptors', 'host_metrics_http', 'host_metrics_transport',
92 'thread_pool_qr_q', 'thread_pool_qr_r', 'fdata_cache', 'fdata_ev_tr', 'cluster_health_status',
93 'cluster_health_nodes', 'cluster_health_shards', 'cluster_stats_nodes', 'cluster_stats_query_cache',
94 'cluster_stats_docs', 'cluster_stats_store', 'cluster_stats_indices_shards']
# Netdata chart definitions. Per chart:
#   'options': [name_override, title, units, family, context, chart_type]
#   'lines'  : [metric_id, dimension_name, algorithm(, multiplier, divisor)]
# NOTE(review): the dict header (presumably `CHARTS = {`), the per-chart
# `'lines': [` lines and the closing brackets are not visible in this chunk.
97 'search_perf_total': {
98 'options': [None, 'Total number of queries, fetches', 'number of', 'search performance',
99 'es.search_query_total', 'stacked'],
101 ['query_total', 'queries', 'incremental'],
102 ['fetch_total', 'fetches', 'incremental']
104 'search_perf_current': {
105 'options': [None, 'Number of queries, fetches in progress', 'number of', 'search performance',
106 'es.search_query_current', 'stacked'],
108 ['query_current', 'queries', 'absolute'],
109 ['fetch_current', 'fetches', 'absolute']
111 'search_perf_time': {
112 'options': [None, 'Time spent on queries, fetches', 'seconds', 'search performance',
113 'es.search_time', 'stacked'],
115 ['query_time_in_millis', 'query', 'incremental', 1, 1000],
116 ['fetch_time_in_millis', 'fetch', 'incremental', 1, 1000]
119 'options': [None, 'Query and fetch latency', 'ms', 'search performance', 'es.search_latency', 'stacked'],
121 ['query_latency', 'query', 'absolute', 1, 1000],
122 ['fetch_latency', 'fetch', 'absolute', 1, 1000]
124 'index_perf_total': {
125 'options': [None, 'Total number of documents indexed, index refreshes, index flushes to disk', 'number of',
126 'indexing performance', 'es.index_performance_total', 'stacked'],
128 ['indexing_index_total', 'indexed', 'incremental'],
129 ['refresh_total', 'refreshes', 'incremental'],
130 ['flush_total', 'flushes', 'incremental']
132 'index_perf_current': {
133 'options': [None, 'Number of documents currently being indexed', 'currently indexed',
134 'indexing performance', 'es.index_performance_current', 'stacked'],
136 ['indexing_index_current', 'documents', 'absolute']
# NOTE(review): this indexing-time chart reuses context 'es.search_time'
# (same as search_perf_time above) -- looks like a copy-paste slip; an
# 'es.index_time' context would be expected. Confirm before changing.
139 'options': [None, 'Time spent on indexing, refreshing, flushing', 'seconds', 'indexing performance',
140 'es.search_time', 'stacked'],
142 ['indexing_index_time_in_millis', 'indexing', 'incremental', 1, 1000],
143 ['refresh_total_time_in_millis', 'refreshing', 'incremental', 1, 1000],
144 ['flush_total_time_in_millis', 'flushing', 'incremental', 1, 1000]
147 'options': [None, 'Indexing and flushing latency', 'ms', 'indexing performance',
148 'es.index_latency', 'stacked'],
150 ['indexing_latency', 'indexing', 'absolute', 1, 1000],
151 ['flushing_latency', 'flushing', 'absolute', 1, 1000]
154 'options': [None, 'JVM heap currently in use/committed', 'percent/MB', 'memory usage and gc',
155 'es.jvm_heap', 'area'],
157 ['jvm_heap_percent', 'inuse', 'absolute'],
# multiplier -1 draws committed heap below the axis; divisor 1048576 = bytes -> MiB
158 ['jvm_heap_commit', 'commit', 'absolute', -1, 1048576]
161 'options': [None, 'Count of garbage collections', 'counts', 'memory usage and gc', 'es.gc_count', 'stacked'],
163 ['young_collection_count', 'young', 'incremental'],
164 ['old_collection_count', 'old', 'incremental']
167 'options': [None, 'Time spent on garbage collections', 'ms', 'memory usage and gc', 'es.gc_time', 'stacked'],
169 ['young_collection_time_in_millis', 'young', 'incremental'],
170 ['old_collection_time_in_millis', 'old', 'incremental']
172 'thread_pool_qr_q': {
173 'options': [None, 'Number of queued threads in thread pool', 'queued threads', 'queues and rejections',
174 'es.thread_pool_queued', 'stacked'],
176 ['bulk_queue', 'bulk', 'absolute'],
177 ['index_queue', 'index', 'absolute'],
178 ['search_queue', 'search', 'absolute'],
179 ['merge_queue', 'merge', 'absolute']
181 'thread_pool_qr_r': {
182 'options': [None, 'Number of rejected threads in thread pool', 'rejected threads', 'queues and rejections',
183 'es.thread_pool_rejected', 'stacked'],
185 ['bulk_rejected', 'bulk', 'absolute'],
186 ['index_rejected', 'index', 'absolute'],
187 ['search_rejected', 'search', 'absolute'],
188 ['merge_rejected', 'merge', 'absolute']
191 'options': [None, 'Fielddata cache size', 'MB', 'fielddata cache', 'es.fdata_cache', 'line'],
193 ['index_fdata_memory', 'cache', 'absolute', 1, 1048576]
196 'options': [None, 'Fielddata evictions and circuit breaker tripped count', 'number of events',
197 'fielddata cache', 'es.evictions_tripped', 'line'],
199 ['evictions', None, 'incremental'],
200 ['tripped', None, 'incremental']
202 'cluster_health_nodes': {
203 'options': [None, 'Nodes and tasks statistics', 'units', 'cluster health API',
204 'es.cluster_health_nodes', 'stacked'],
206 ['health_number_of_nodes', 'nodes', 'absolute'],
207 ['health_number_of_data_nodes', 'data_nodes', 'absolute'],
208 ['health_number_of_pending_tasks', 'pending_tasks', 'absolute'],
209 ['health_number_of_in_flight_fetch', 'in_flight_fetch', 'absolute']
211 'cluster_health_status': {
212 'options': [None, 'Cluster status', 'status', 'cluster health API',
213 'es.cluster_health_status', 'area'],
215 ['status_green', 'green', 'absolute'],
216 ['status_red', 'red', 'absolute'],
# status_foo1..3 have no dimension name; presumably placeholder dimensions
# (they are always zeroed in _get_cluster_health_) -- purpose not shown
# in this chunk, possibly to pin dimension colors. TODO confirm.
217 ['status_foo1', None, 'absolute'],
218 ['status_foo2', None, 'absolute'],
219 ['status_foo3', None, 'absolute'],
220 ['status_yellow', 'yellow', 'absolute']
222 'cluster_health_shards': {
223 'options': [None, 'Shards statistics', 'shards', 'cluster health API',
224 'es.cluster_health_shards', 'stacked'],
226 ['health_active_shards', 'active_shards', 'absolute'],
227 ['health_relocating_shards', 'relocating_shards', 'absolute'],
228 ['health_unassigned_shards', 'unassigned', 'absolute'],
229 ['health_delayed_unassigned_shards', 'delayed_unassigned', 'absolute'],
230 ['health_initializing_shards', 'initializing', 'absolute'],
231 ['health_active_shards_percent_as_number', 'active_percent', 'absolute']
233 'cluster_stats_nodes': {
234 'options': [None, 'Nodes statistics', 'nodes', 'cluster stats API',
235 'es.cluster_nodes', 'stacked'],
237 ['count_data_only', 'data_only', 'absolute'],
238 ['count_master_data', 'master_data', 'absolute'],
239 ['count_total', 'total', 'absolute'],
240 ['count_master_only', 'master_only', 'absolute'],
241 ['count_client', 'client', 'absolute']
243 'cluster_stats_query_cache': {
244 'options': [None, 'Query cache statistics', 'queries', 'cluster stats API',
245 'es.cluster_query_cache', 'stacked'],
247 ['query_cache_hit_count', 'hit', 'incremental'],
248 ['query_cache_miss_count', 'miss', 'incremental']
250 'cluster_stats_docs': {
251 'options': [None, 'Docs statistics', 'count', 'cluster stats API',
252 'es.cluster_docs', 'line'],
254 ['docs_count', 'docs', 'absolute']
256 'cluster_stats_store': {
257 'options': [None, 'Store statistics', 'MB', 'cluster stats API',
258 'es.cluster_store', 'line'],
# NOTE(review): divisor 1048567 below looks like a typo for 1048576
# (1 MiB, the value every other bytes->MB divisor in this file uses).
260 ['store_size_in_bytes', 'size', 'absolute', 1, 1048567]
262 'cluster_stats_indices_shards': {
263 'options': [None, 'Indices and shards statistics', 'count', 'cluster stats API',
264 'es.cluster_indices_shards', 'stacked'],
266 ['indices_count', 'indices', 'absolute'],
267 ['shards_total', 'shards', 'absolute']
269 'host_metrics_transport': {
270 'options': [None, 'Cluster communication transport metrics', 'kbit/s', 'host metrics',
271 'es.host_transport', 'area'],
# multiplier 8 / divisor 1000: bytes/s -> kbit/s; tx drawn negative ("out")
273 ['transport_rx_size_in_bytes', 'in', 'incremental', 8, 1000],
274 ['transport_tx_size_in_bytes', 'out', 'incremental', -8, 1000]
276 'host_metrics_file_descriptors': {
277 'options': [None, 'Available file descriptors in percent', 'percent', 'host metrics',
278 'es.host_descriptors', 'area'],
# value is pre-scaled by 1000 in _get_node_stats_; divisor 10 yields percent
280 ['file_descriptors_used', 'used', 'absolute', 1, 10]
282 'host_metrics_http': {
283 'options': [None, 'Opened HTTP connections', 'connections', 'host metrics',
284 'es.host_http_connections', 'line'],
286 ['http_current_open', 'opened', 'absolute', 1, 1]
# Netdata job class: collects Elasticsearch metrics over HTTP via the
# UrlService base class.
291 class Service(UrlService):
292 def __init__(self, configuration=None, name=None):
# Pull connection settings from the per-job configuration; port and scheme
# have defaults, host does not (its presence is validated in the check logic).
293 UrlService.__init__(self, configuration=configuration, name=name)
# NOTE(review): original line 294 is not visible in this chunk --
# presumably `self.order = ORDER`; confirm against the full file.
295 self.definitions = CHARTS
296 self.host = self.configuration.get('host')
297 self.port = self.configuration.get('port', 9200)
298 self.scheme = self.configuration.get('scheme', 'http')
# Previous (count, time) samples per latency key, used by find_avg_().
299 self.latency = dict()
# Filled later with METHODS tuples for the enabled API endpoints.
300 self.methods = list()
# --- body of the check() method; its `def` line (original ~302) and several
# `return False`/`return True`/`try:` lines are not visible in this chunk ---
303 # We can't start if <host> AND <port> not specified
304 if not all([self.host, self.port, isinstance(self.host, str), isinstance(self.port, (str, int))]):
305 self.error('Host is not defined in the module configuration file')
# (a `return False` on original line ~306 is not visible here)
308 # It as a bad idea to use hostname.
309 # Hostname -> ip address
# Resolve once up front so every later request uses the IP directly.
# (the enclosing `try:` on original line ~310 is not visible here)
311 self.host = gethostbyname(self.host)
312 except gaierror as error:
313 self.error(str(error))
# (a `return False` on original line ~314 is not visible here)
# NOTE(review): self.scheme defaults to the truthy string 'http', so this
# conditional always evaluates to 'http' and the 'https' branch is
# unreachable -- almost certainly meant `if self.scheme == 'http'`.
316 scheme = 'http' if self.scheme else 'https'
317 # Add handlers (auth, self signed cert accept)
318 self.url = '%s://%s:%s' % (scheme, self.host, self.port)
# Name-mangled call into the private UrlService.__add_openers helper.
319 self._UrlService__add_openers()
320 # Create URL for every Elasticsearch API
321 url_node_stats = '%s://%s:%s/_nodes/_local/stats' % (scheme, self.host, self.port)
322 url_cluster_health = '%s://%s:%s/_cluster/health' % (scheme, self.host, self.port)
323 url_cluster_stats = '%s://%s:%s/_cluster/stats' % (scheme, self.host, self.port)
325 # Create list of enabled API calls
# Each API defaults to enabled unless explicitly disabled in the job config.
326 user_choice = [bool(self.configuration.get('node_stats', True)),
327 bool(self.configuration.get('cluster_health', True)),
328 bool(self.configuration.get('cluster_stats', True))]
330 avail_methods = [METHODS(get_data_function=self._get_node_stats_, url=url_node_stats),
331 METHODS(get_data_function=self._get_cluster_health_, url=url_cluster_health),
332 METHODS(get_data_function=self._get_cluster_stats_, url=url_cluster_stats)]
334 # Remove disabled API calls from 'avail methods'
335 self.methods = [avail_methods[e[0]] for e in enumerate(avail_methods) if user_choice[e[0]]]
337 # Run _get_data for ALL active API calls.
338 api_check_result = dict()
339 data_from_check = dict()
340 for method in self.methods:
# Called with queue=None so each fetcher returns its dict directly.
342 api_check_result[method.url] = method.get_data_function(None, method.url)
343 data_from_check.update(api_check_result[method.url] or dict())
344 except KeyError as error:
345 self.error('Failed to parse %s. Error: %s' % (method.url, str(error)))
# (a `return False` on original line ~346 is not visible here)
348 # We can start ONLY if all active API calls returned NOT None
349 if not all(api_check_result.values()):
350 self.error('Plugin could not get data from all APIs')
# (a `return False` on original line ~351 is not visible here)
353 self._data_from_check = data_from_check
# --- fragment of the data-collection method: its `def` line, the creation
# of `queue`/`threads`/`result` and the thread start/join lines (original
# ~355-365, 367) are not visible in this chunk ---
# One worker thread per enabled API call; each fetcher puts its metrics
# dict on the shared queue.
361 for method in self.methods:
362 th = Thread(target=method.get_data_function, args=(queue, method.url))
# Merge every per-API dict into a single result for netdata.
366 for thread in threads:
368 result.update(queue.get())
# Empty result -> None, which netdata treats as a failed collection cycle.
370 return result or None
# Fetch and parse /_cluster/health. When `queue` is given (threaded path)
# the result dict is put on it; when queue is None (check path) the dict is
# returned directly. NOTE(review): the docstring quote lines, the
# `if not raw_data:` guard and the try/except around loads() (original
# ~373-386) are not visible in this chunk.
372 def _get_cluster_health_(self, queue, url):
374 Format data received from http request
378 raw_data = self._get_raw_data(url)
# On a failed request, put an empty dict so the consumer's queue.get()
# never blocks.
381 return queue.put(dict()) if queue else None
383 data = loads(raw_data)
385 to_netdata = fetch_data_(raw_data=data, metrics_list=HEALTH_STATS)
# Zero all status dimensions, then flag only the current cluster status.
387 to_netdata.update({'status_green': 0, 'status_red': 0, 'status_yellow': 0,
388 'status_foo1': 0, 'status_foo2': 0, 'status_foo3': 0})
389 current_status = 'status_' + data['status']
390 to_netdata[current_status] = 1
392 return queue.put(to_netdata) if queue else to_netdata
# Fetch and parse /_cluster/stats; same queue-or-return convention as
# _get_cluster_health_. NOTE(review): docstring quote lines and the
# `if not raw_data:` guard (original ~395-402) are not visible in this chunk.
394 def _get_cluster_stats_(self, queue, url):
396 Format data received from http request
400 raw_data = self._get_raw_data(url)
# Failed request: hand back an empty dict so queue consumers don't block.
403 return queue.put(dict()) if queue else None
405 data = loads(raw_data)
407 to_netdata = fetch_data_(raw_data=data, metrics_list=CLUSTER_STATS)
409 return queue.put(to_netdata) if queue else to_netdata
# Fetch and parse /_nodes/_local/stats, then derive latency and file
# descriptor metrics. Same queue-or-return convention as the other
# fetchers. NOTE(review): docstring quote lines and the `if not raw_data:`
# guard (original ~412-421) are not visible in this chunk.
411 def _get_node_stats_(self, queue, url):
413 Format data received from http request
417 raw_data = self._get_raw_data(url)
420 return queue.put(dict()) if queue else None
422 data = loads(raw_data)
# _local stats should contain exactly one node id; grab it.
424 node = list(data['nodes'].keys())[0]
425 to_netdata = fetch_data_(raw_data=data['nodes'][node], metrics_list=NODE_STATS)
427 # Search performance latency
# find_avg_ returns ms-per-operation deltas between successive polls.
428 to_netdata['query_latency'] = self.find_avg_(to_netdata['query_total'],
429 to_netdata['query_time_in_millis'], 'query_latency')
430 to_netdata['fetch_latency'] = self.find_avg_(to_netdata['fetch_total'],
431 to_netdata['fetch_time_in_millis'], 'fetch_latency')
433 # Indexing performance latency
# NOTE(review): the cache keys 'index_latency'/'flush_latency' differ from
# the output keys 'indexing_latency'/'flushing_latency'; harmless since the
# key only indexes self.latency, but worth unifying for readability.
434 to_netdata['indexing_latency'] = self.find_avg_(to_netdata['indexing_index_total'],
435 to_netdata['indexing_index_time_in_millis'], 'index_latency')
436 to_netdata['flushing_latency'] = self.find_avg_(to_netdata['flush_total'],
437 to_netdata['flush_total_time_in_millis'], 'flush_latency')
# Scale used/max by 1000; the chart divisor of 10 renders this as percent.
# NOTE(review): raises ZeroDivisionError if max_file_descriptors is 0 --
# presumably never happens in practice; confirm.
439 to_netdata['file_descriptors_used'] = round(float(to_netdata['open_file_descriptors'])
440 / to_netdata['max_file_descriptors'] * 1000)
442 return queue.put(to_netdata) if queue else to_netdata
# Per-interval average: delta(time_ms)/delta(count) * 1000 between this
# poll and the previous one stored in self.latency[key].
# value1 = cumulative operation count, value2 = cumulative time in millis.
# NOTE(review): the `return` lines (original ~447, ~452, ~455, presumably
# `return 0` / `return latency` / `return 0`) are not visible in this chunk.
444 def find_avg_(self, value1, value2, key):
# First sample for this key: just remember it (no delta available yet).
445 if key not in self.latency:
446 self.latency.update({key: [value1, value2]})
# Only compute when the operation count actually advanced, avoiding a
# division by zero below.
449 if not self.latency[key][0] == value1:
450 latency = round(float(value2 - self.latency[key][1]) / float(value1 - self.latency[key][0]) * 1000)
451 self.latency.update({key: [value1, value2]})
# Count unchanged: refresh the stored sample.
454 self.latency.update({key: [value1, value2]})
# Module-level helper: walk each dotted JSON path from metrics_list into the
# raw_data dict and collect leaf values into a flat metrics dict.
# NOTE(review): the initialization of `to_netdata`/`value`, the try/except
# KeyError around the key lookup and the final `return to_netdata`
# (original ~459, 461, 463-466, 469) are not visible in this chunk.
458 def fetch_data_(raw_data, metrics_list):
460 for metric, new_name, function in metrics_list:
# Descend one level per dotted-path segment.
462 for key in metric.split('.'):
# Store only leaf (non-dict) values; fall back to the last path segment
# as the metric name and optionally transform the value.
467 if not isinstance(value, dict) and key:
468 to_netdata[new_name or key] = value if not function else function(value)