From f4fae6575d736d88957c2bd1f3d311ac596d7c91 Mon Sep 17 00:00:00 2001
From: nepython
Date: Fri, 17 Jul 2020 19:48:14 +0530
Subject: [PATCH] [timeseries] Add index lifecycle policy to elasticsearch

---
 .../db/backends/elasticsearch/client.py       | 104 +++++++++++-------
 .../db/backends/elasticsearch/index.py        |  68 +++++++++---
 .../db/backends/elasticsearch/queries.py      |   8 ++
 .../elasticsearch/retention_policies.py       |  41 +++++++
 openwisp_monitoring/monitoring/base/models.py |   1 -
 .../monitoring/tests/__init__.py              |   4 +-
 6 files changed, 168 insertions(+), 58 deletions(-)
 create mode 100644 openwisp_monitoring/db/backends/elasticsearch/retention_policies.py

diff --git a/openwisp_monitoring/db/backends/elasticsearch/client.py b/openwisp_monitoring/db/backends/elasticsearch/client.py
index 8b6ab5330..3e07bd4c5 100644
--- a/openwisp_monitoring/db/backends/elasticsearch/client.py
+++ b/openwisp_monitoring/db/backends/elasticsearch/client.py
@@ -17,6 +17,7 @@
 from .. import TIMESERIES_DB
 from .index import MetricIndex, Point, find_metric
 from .queries import default_chart_query, math_map, operator_lookup
+from .retention_policies import _make_policy, default_rp_policy
 
 logger = logging.getLogger(__name__)
 
@@ -69,8 +70,11 @@ def __init__(self, db_name='metric'):
 
     def create_database(self):
         """ creates connection to elasticsearch """
-        connections.create_connection(hosts=[TIMESERIES_DB['HOST']])
-        self.get_db
+        connections.create_connection(
+            hosts=[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"]
+        )
+        db = self.get_db
+        # ilm_enabled is False when support for Index Lifecycle Management
+        # is disabled or the user lacks the required privileges
+        self.ilm_enabled = db.ilm.start()['acknowledged']
+        self.create_or_alter_retention_policy(name='default')
 
     def drop_database(self):
         """ deletes all indices """
@@ -81,28 +85,52 @@ def drop_database(self):
     @cached_property
     def get_db(self):
         """ Returns an ``Elasticsearch Client`` instance """
-        # TODO: AUTHENTICATION remains see `SecurityClient`
         return Elasticsearch(
             [f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"],
             http_auth=(TIMESERIES_DB['USER'], TIMESERIES_DB['PASSWORD']),
             retry_on_timeout=True,
         )
 
-    def create_or_alter_retention_policy(self, name, duration):
-        """ creates or alters existing retention policy if necessary """
-        # TODO
-        pass
+    def create_or_alter_retention_policy(self, name, duration=None):
+        """
+        creates a retention policy or alters an existing one if necessary
+
+        Note: the default retention policy can't be altered with this function
+        """
+        if not self.ilm_enabled:
+            return
+        ilm = self.get_db.ilm
+        if not duration:
+            try:
+                ilm.get_lifecycle(policy='default')
+            except NotFoundError:
+                ilm.put_lifecycle(policy='default', body=default_rp_policy)
+            return
+        days = f'{int(duration.split("h")[0]) // 24}d'
+        duration_changed = False
+        try:
+            policy = ilm.get_lifecycle(policy=name)
+            exists = True
+            current_duration = policy[name]['policy']['phases']['hot']['actions'][
+                'rollover'
+            ]['max_age']
+            duration_changed = current_duration != days
+        except NotFoundError:
+            exists = False
+        if not exists or duration_changed:
+            policy = _make_policy(days)
+            ilm.put_lifecycle(policy=name, body=policy)
 
     def query(self, query, precision=None):
         index = query.pop('key')
-        return Search(index=index).from_dict(query).execute().to_dict()
+        return (
+            Search(using=self.get_db, index=index).from_dict(query).execute().to_dict()
+        )
 
     def write(self, name, values, **kwargs):
-        # TODO: Add support for retention policy
+        rp = kwargs.get('retention_policy')
         tags = kwargs.get('tags')
         timestamp = kwargs.get('timestamp')
-        metric_id = find_metric(name, tags, add=True)
-        metric_index = MetricIndex().get(metric_id, index=name)
+        metric_id = find_metric(self.get_db, name, tags, rp, add=True)
+        metric_index = MetricIndex.get(metric_id, index=name, using=self.get_db)
         point = Point(fields=values, time=timestamp or datetime.now())
         metric_index.points.append(point)
         metric_index.save()
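
For reference, the hour-to-day conversion performed by
create_or_alter_retention_policy above can be illustrated in isolation; a
minimal sketch, assuming retention durations arrive in the InfluxDB-style
format used elsewhere in openwisp-monitoring (sample values are illustrative):

    def hours_to_days(duration):
        # '26280h0m0s' -> 26280 hours -> '1095d'
        return f'{int(duration.split("h")[0]) // 24}d'

    assert hours_to_days('26280h0m0s') == '1095d'
    assert hours_to_days('168h0m0s') == '7d'
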
@@ -110,14 +138,12 @@ def write(self, name, values, **kwargs):
     def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
         extra_fields = kwargs.get('extra_fields')
         time_format = kwargs.get('time_format')
+        # TODO: parse 'since'; it is expected in the form 'now() - <int>s'
         # since = kwargs.get('since')
-        metric_id = find_metric(key, tags)
+        metric_id = find_metric(self.get_db, key, tags)
         if not metric_id:
-            return list()
-        try:
-            metric_index = MetricIndex().get(metric_id, index=key)
-        except NotFoundError:
             return []
+        metric_index = MetricIndex.get(index=key, id=metric_id, using=self.get_db)
         if order == 'time':
             points = list(metric_index.points[0:limit])
         elif order == '-time':
@@ -127,33 +153,28 @@ def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
             f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get '
             'result sorted in ascending / descending order respectively.'
         )
-        if not points:
-            return list()
         # distinguish between traffic and clients
         for point in list(points):
             if fields not in point.fields.to_dict():
                 points.remove(point)
         if extra_fields and extra_fields != '*':
             assert isinstance(extra_fields, list)
-            _points = []
-            for point in points:
-                point = point.to_dict()
-                _point = {
+            for count, point in enumerate(points):
+                # convert to a plain dict first, documents don't support item access
+                point = point.to_dict()
+                fields_dict = point['fields']
+                point = {
                     'time': self._format_time(point['time'], time_format),
-                    fields: point['fields'][fields],
+                    fields: fields_dict[fields],
                 }
                 for extra_field in extra_fields:
-                    if point['fields'].get(extra_field) is not None:
-                        _point.update({extra_field: point['fields'][extra_field]})
-                _points.append(_point)
-            points = _points
+                    if fields_dict.get(extra_field) is not None:
+                        point.update({extra_field: fields_dict[extra_field]})
+                points[count] = point
         elif extra_fields == '*':
-            points = [
-                deep_merge_dicts(
-                    p.fields.to_dict(), {'time': self._format_time(p.time, time_format)}
+            for count, point in enumerate(points):
+                points[count] = deep_merge_dicts(
+                    point.fields.to_dict(),
+                    {'time': self._format_time(point.time, time_format)},
                 )
-                for p in points
-            ]
         else:
             points = [
                 deep_merge_dicts(
@@ -190,7 +211,10 @@ def _fill_points(self, query, points):
         start_time = datetime.now()
         end_time = start_time - timedelta(days=days)  # include today
         dummy_point = deepcopy(points[0])
-        interval = points[0]['time'] - points[1]['time']
+        if len(points) > 1:
+            interval = points[0]['time'] - points[1]['time']
+        else:
+            # a single point carries no spacing information, assume 10 minutes
+            interval = 600
         start_ts = points[0]['time'] + interval
         end_ts = points[-1]['time'] - interval
         for field in dummy_point.keys():
@@ -210,12 +234,14 @@
     def delete_metric_data(self, key=None, tags=None):
         """
-        deletes a specific metric based on the key and tags
-        provided, you may also choose to delete all metrics
+        deletes a specific metric based on the given key and tags;
+        deletes all metrics if neither is provided
         """
         if key and tags:
-            metric_id = find_metric(key, tags)
+            metric_id = find_metric(self.get_db, key, tags)
             self.get_db.delete(index=key, id=metric_id)
+        elif key:
+            self.get_db.indices.delete(index=key, ignore=[400, 404])
         else:
             self.get_db.indices.delete(index='*', ignore=[400, 404])
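
For context, a usage sketch of the three deletion paths above
(`timeseries_db` is the shared backend instance also imported in
bulk_indexing below; the key and tag values are placeholders):

    from openwisp_monitoring.db import timeseries_db

    # deletes the single metric document matching both key and tags
    timeseries_db.delete_metric_data(key='ping', tags={'object_id': 'd1'})
    # deletes the whole 'ping' index
    timeseries_db.delete_metric_data(key='ping')
    # deletes every index; note the '*' wildcard would be rejected on
    # clusters where 'action.destructive_requires_name' is enabled
    timeseries_db.delete_metric_data()
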
@@ -226,10 +252,10 @@ def validate_query(self, query):
         query = json.loads(query)
         # Elasticsearch currently supports validation of only the query section;
         # aggs, size, _source etc. are not supported
-        valid_check = self.get_db.indices.validate_query(body={'query': query['query']})
+        valid_check = self.get_db.indices.validate_query(
+            body={'query': query['query']}, explain=True
+        )
         # Show a helpful message for failure
         if not valid_check['valid']:
-            raise ValidationError(valid_check['error'])
+            raise ValidationError(valid_check['explanations'])
         return self._is_aggregate(query)
 
     def _is_aggregate(self, q):
@@ -317,7 +343,7 @@ def default_chart_query(self, tags):
         return q
 
 
-# Old data - delete by query (inefficient) / retention policy - Index lifecycle management
+# TODO:
 # Fix Average - currently it's computing average over all fields!
 # Time Interval - fix range
 # Device query
diff --git a/openwisp_monitoring/db/backends/elasticsearch/index.py b/openwisp_monitoring/db/backends/elasticsearch/index.py
index cfa866a9f..ac212c582 100644
--- a/openwisp_monitoring/db/backends/elasticsearch/index.py
+++ b/openwisp_monitoring/db/backends/elasticsearch/index.py
@@ -1,12 +1,14 @@
 import uuid
 
 from django.conf import settings
-from elasticsearch import Elasticsearch
 from elasticsearch.exceptions import NotFoundError
 from elasticsearch.helpers import bulk
 from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search
 from swapper import load_model
 
+# TODO: remove this note before merging. Current structure:
+# Index Templates <-- Indices <-- Documents <-- Points
+
 
 class Point(InnerDoc):
     time = Date(required=True, default_timezone=settings.TIME_ZONE)
@@ -15,53 +17,85 @@ class Point(InnerDoc):
 class MetricIndex(Document):
     tags = Nested(dynamic=True, required=True, multi=True)
-    # returns an empty list if not present
     points = Nested(Point)
 
     class Index:
-        # name gets replaced with metric's key
         name = 'metric'
         settings = {
-            "number_of_shards": 1,
-            "number_of_replicas": 0,
+            'number_of_shards': 1,
+            'number_of_replicas': 0,
+            'lifecycle.name': 'default',
+            'lifecycle.rollover_alias': 'metric',
         }
 
 
-def find_metric(index, tags, add=False):
-    client = Elasticsearch()
+def find_metric(client, index, tags, retention_policy=None, add=False):
     search = Search(using=client, index=index)
     if tags:
         tags_dict = dict()
         for key, value in tags.items():
             tags_dict[f'tags.{key}'] = value
-        q = Q('bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()])
+        q = Q(
+            'bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()]
+        )
+        # if index == 'device_data':
+        #     q = Q('nested', path='tags', query=q)
     else:
         q = Q()
     try:
         return list(search.query(q).execute())[0].meta['id']
     except (NotFoundError, AttributeError, IndexError):
-        return add_index(index, tags)['_id'] if add else None
+        return (
+            add_doc(client, index, tags, retention_policy=retention_policy)['_id']
+            if add
+            else None
+        )
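
For reference, the bool query built by find_metric for, e.g.,
tags={'object_id': 'd1', 'content_type': 'device'} serializes to the
following (the tag names here are illustrative, not mandated by the schema):

    {
        'bool': {
            'must': [
                {'match': {'tags.object_id': 'd1'}},
                {'match': {'tags.content_type': 'device'}},
            ]
        }
    }
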
""" - obj = MetricIndex(meta={'id': id or uuid.uuid1()}, tags=tags) - obj.meta.index = key - obj.save() + _id = str(_id or uuid.uuid1()) + # Check if index exists + if client.indices.exists(index=key): + client.create(index=key, id=_id, body={'tags': tags}) + return {'_id': _id} + # Create a new index if it doesn't exist + name = f'{key}-000001' + obj = MetricIndex(meta={'id': _id}) + obj.meta.index = name + obj.tags = tags + # TODO: If mappings are put then aggregations don't work, find why. + # Issue similar to https://discuss.elastic.co/t/aggregations-do-not-work-any-more-index-corrupt-resolved/24947/5 + obj.save(using=client, index=name) + client.indices.put_alias(index=name, name=key, body={'is_write_index': True}) + client.indices.put_settings(body={ + 'number_of_replicas': 0, + 'lifecycle.name': retention_policy or 'default', + 'lifecycle.rollover_alias': key, + }, index=name) + obj._index = obj._index.clone(name) + # Create a new index template if it doesn't exist + if not client.indices.exists_template(name=key): + obj._index.settings(**{'lifecycle.rollover_alias': key}) + if retention_policy: + obj._index.settings(**{'lifecycle.name': retention_policy}) + # index pattern is added for Index Lifecycle Management + obj._index.as_template(key, f'{key}-*').save(using=client) return obj.to_dict(include_meta=True) def bulk_indexing(): """ Index all existing metrics """ + from ... import timeseries_db + Metric = load_model('monitoring', 'Metric') - MetricIndex.init() - es = Elasticsearch() bulk( - client=es, + client=timeseries_db.get_db, actions=( - add_index(m.key, m.tags, m.id) for m in Metric.objects.all().iterator() + add_doc(timeseries_db.get_db, m.key, m.tags, m.id) + for m in Metric.objects.all().distinct('key').iterator() ), ) diff --git a/openwisp_monitoring/db/backends/elasticsearch/queries.py b/openwisp_monitoring/db/backends/elasticsearch/queries.py index ec1c81689..002e46c32 100644 --- a/openwisp_monitoring/db/backends/elasticsearch/queries.py +++ b/openwisp_monitoring/db/backends/elasticsearch/queries.py @@ -11,6 +11,14 @@ ] } }, + # 'nested': { + # 'path': 'points', + # 'query': { + # 'bool': { + # 'must': [{'range': {'points.time': {'from': 'now-1d/d', 'to': 'now/d'}}}] + # } + # } + # } '_source': False, 'size': 0, 'aggs': { diff --git a/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py b/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py new file mode 100644 index 000000000..d4269959e --- /dev/null +++ b/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py @@ -0,0 +1,41 @@ +# By default age is calculated from the date the index is created but if the +# index has been rolled over than the rollover date is used to calculate the age + +default_rp_policy = { + 'policy': { + 'phases': { + 'hot': { + 'actions': { + 'rollover': {'max_age': '30d', 'max_size': '90G'}, + 'set_priority': {'priority': 100}, + } + }, + 'warm': { + 'min_age': '30d', + 'actions': { + 'forcemerge': {'max_num_segments': 1}, + 'allocate': {'number_of_replicas': 0}, + 'set_priority': {'priority': 50}, + }, + }, + 'cold': {'min_age': '150d', 'actions': {'freeze': {}}}, + 'delete': {'min_age': '335d', 'actions': {'delete': {}}}, + } + } +} + + +def _make_policy(max_age): + return { + 'policy': { + 'phases': { + 'hot': { + 'actions': { + 'rollover': {'max_age': max_age}, + 'set_priority': {'priority': 100}, + } + }, + 'delete': {'actions': {'delete': {}}}, + } + } + } diff --git a/openwisp_monitoring/monitoring/base/models.py 
diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py
index 73c39a2f7..6670c5685 100644
--- a/openwisp_monitoring/monitoring/base/models.py
+++ b/openwisp_monitoring/monitoring/base/models.py
@@ -509,7 +509,6 @@ def _is_crossed_by(self, current_value, time=None):
                 continue
             if not self._value_crossed(point[self.metric.field_name]):
                 return False
-            print(point['time'])
             if self._time_crossed(
                 make_aware(datetime.fromtimestamp(point['time']))
             ):
diff --git a/openwisp_monitoring/monitoring/tests/__init__.py b/openwisp_monitoring/monitoring/tests/__init__.py
index 1a96b9f1a..796b85c96 100644
--- a/openwisp_monitoring/monitoring/tests/__init__.py
+++ b/openwisp_monitoring/monitoring/tests/__init__.py
@@ -16,6 +16,7 @@
 Metric = load_model('monitoring', 'Metric')
 AlertSettings = load_model('monitoring', 'AlertSettings')
 
+# TODO: queries relating to top_fields aren't yet supported by this backend
 # this custom chart configuration is used for automated testing purposes
 charts = {
     'histogram': {
@@ -125,7 +126,8 @@
                 "SELECT {fields|MEAN} FROM {key} "
                 "WHERE time >= '{time}' AND content_type = "
                 "'{content_type}' AND object_id = '{object_id}'"
-            )
+            ),
+            'elasticsearch': _make_query(),
         },
     },
 }
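
Finally, a usage sketch of the feature this patch adds; the metric name, tag
values and policy name below are assumptions for illustration, not part of
the patch:

    from openwisp_monitoring.db import timeseries_db

    # '168h0m0s' is converted to a '7d' rollover window by the backend
    timeseries_db.create_or_alter_retention_policy(name='short', duration='168h0m0s')
    # documents written with this retention policy land in indices managed
    # by the 'short' ILM policy and are dropped after they roll over
    timeseries_db.write(
        'ping', {'reachable': 1}, tags={'object_id': 'd1'}, retention_policy='short'
    )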