From be49865d8da62cdf66f4ebbe2619ee072339fab0 Mon Sep 17 00:00:00 2001
From: nepython
Date: Fri, 17 Jul 2020 19:48:14 +0530
Subject: [PATCH] [timeseries] Add index lifecycle policy to elasticsearch

---
 .../db/backends/elasticsearch/client.py  | 84 +++++++++++--------
 .../db/backends/elasticsearch/index.py   | 61 ++++++++++----
 .../elasticsearch/retention_policies.py  | 41 +++++++++
 .../monitoring/tests/__init__.py         |  4 +-
 4 files changed, 140 insertions(+), 50 deletions(-)
 create mode 100644 openwisp_monitoring/db/backends/elasticsearch/retention_policies.py

diff --git a/openwisp_monitoring/db/backends/elasticsearch/client.py b/openwisp_monitoring/db/backends/elasticsearch/client.py
index 8b6ab5330..22eeadc74 100644
--- a/openwisp_monitoring/db/backends/elasticsearch/client.py
+++ b/openwisp_monitoring/db/backends/elasticsearch/client.py
@@ -17,6 +17,7 @@
 from .. import TIMESERIES_DB
 from .index import MetricIndex, Point, find_metric
 from .queries import default_chart_query, math_map, operator_lookup
+from .retention_policies import _make_policy, default_rp_policy
 
 logger = logging.getLogger(__name__)
 
@@ -70,7 +71,10 @@ def __init__(self, db_name='metric'):
     def create_database(self):
         """ creates connection to elasticsearch """
         connections.create_connection(hosts=[TIMESERIES_DB['HOST']])
-        self.get_db
+        db = self.get_db
+        # ILM calls are skipped if index lifecycle management is disabled or privileges are missing
+        self.ilm_enabled = db.ilm.start()['acknowledged']
+        self.create_or_alter_retention_policy(name='default')
 
     def drop_database(self):
         """ deletes all indices """
@@ -81,27 +85,48 @@ def drop_database(self):
     @cached_property
     def get_db(self):
         """ Returns an ``Elasticsearch Client`` instance """
-        # TODO: AUTHENTICATION remains see `SecurityClient`
         return Elasticsearch(
             [f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"],
             http_auth=(TIMESERIES_DB['USER'], TIMESERIES_DB['PASSWORD']),
             retry_on_timeout=True,
         )
 
-    def create_or_alter_retention_policy(self, name, duration):
-        """ creates or alters existing retention policy if necessary """
-        # TODO
-        pass
+    def create_or_alter_retention_policy(self, name, duration=None):
+        """
+        creates or alters an existing retention policy if necessary
+
+        Note: the default retention policy can't be altered with this function
+        """
+        if not self.ilm_enabled:
+            return
+        ilm = self.get_db.ilm
+        if not duration:
+            ilm.put_lifecycle(policy=name, body=default_rp_policy)
+            return
+        days = f'{int(duration.split("h")[0]) // 24}d'
+        duration_changed = False
+        try:
+            policy = ilm.get_lifecycle('default')
+            exists = True
+            current_duration = policy['default']['policy']['phases']['hot']['actions'][
+                'rollover'
+            ]['max_age']
+            duration_changed = current_duration != days
+        except NotFoundError:
+            exists = False
+        if not exists or duration_changed:
+            policy = _make_policy(days)
+            ilm.put_lifecycle(policy=name, body=policy)
 
     def query(self, query, precision=None):
         index = query.pop('key')
         return Search(index=index).from_dict(query).execute().to_dict()
 
     def write(self, name, values, **kwargs):
-        # TODO: Add support for retention policy
+        rp = kwargs.get('retention_policy')
         tags = kwargs.get('tags')
         timestamp = kwargs.get('timestamp')
-        metric_id = find_metric(name, tags, add=True)
+        metric_id = find_metric(self.get_db, name, tags, rp, add=True)
         metric_index = MetricIndex().get(metric_id, index=name)
         point = Point(fields=values, time=timestamp or datetime.now())
         metric_index.points.append(point)
@@ -111,13 +136,10 @@ def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
         extra_fields = kwargs.get('extra_fields')
         time_format = kwargs.get('time_format')
         # since = kwargs.get('since')
-        metric_id = find_metric(key, tags)
+        metric_id = find_metric(self.get_db, key, tags)
         if not metric_id:
-            return list()
-        try:
-            metric_index = MetricIndex().get(metric_id, index=key)
-        except NotFoundError:
             return []
+        metric_index = self.get_db.get(index=key, id=metric_id)
         if order == 'time':
             points = list(metric_index.points[0:limit])
         elif order == '-time':
@@ -127,33 +149,27 @@ def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
             f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get '
             'result sorted in ascending /descending order respectively.'
         )
-        if not points:
-            return list()
         # distinguish between traffic and clients
         for point in list(points):
             if fields not in point.fields.to_dict():
                 points.remove(point)
         if extra_fields and extra_fields != '*':
             assert isinstance(extra_fields, list)
-            _points = []
-            for point in points:
-                point = point.to_dict()
-                _point = {
+            for count, point in enumerate(points):
+                fields_dict = point.to_dict()['fields']
+                point = {
                     'time': self._format_time(point['time'], time_format),
-                    fields: point['fields'][fields],
+                    fields: fields_dict[fields],
                 }
                 for extra_field in extra_fields:
-                    if point['fields'].get(extra_field) is not None:
-                        _point.update({extra_field: point['fields'][extra_field]})
-                _points.append(_point)
-            points = _points
+                    if fields_dict.get(extra_field) is not None:
+                        point.update({extra_field: fields_dict[extra_field]})
+                points[count] = point
         elif extra_fields == '*':
-            points = [
-                deep_merge_dicts(
-                    p.fields.to_dict(), {'time': self._format_time(p.time, time_format)}
+            for count, point in enumerate(points):
+                points[count] = deep_merge_dicts(
+                    point.fields.to_dict(), {'time': self._format_time(point.time, time_format)}
                 )
-                for p in points
-            ]
         else:
             points = [
                 deep_merge_dicts(
@@ -210,12 +226,14 @@ def _fill_points(self, query, points):
 
     def delete_metric_data(self, key=None, tags=None):
         """
-        deletes a specific metric based on the key and tags
-        provided, you may also choose to delete all metrics
+        deletes a specific metric when both key and tags are given,
+        the whole index when only the key is given, all indices otherwise
         """
         if key and tags:
-            metric_id = find_metric(key, tags)
+            metric_id = find_metric(self.get_db, key, tags)
             self.get_db.delete(index=key, id=metric_id)
+        elif key:
+            self.get_db.indices.delete(index=key, ignore=[400, 404])
         else:
             self.get_db.indices.delete(index='*', ignore=[400, 404])
 
@@ -317,7 +335,7 @@ def default_chart_query(self, tags):
         return q
 
 
-# Old data - delete by query (inefficient) / retention policy - Index lifecycle management
+# TODO:
 # Fix Average - currently it's computing average over all fields!
 # Time Interval - fix range
 # Device query
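
Note: create_or_alter_retention_policy() converts the InfluxDB-style hour
durations used elsewhere in openwisp-monitoring into the day-based max_age
that ILM expects, e.g. '720h' becomes '30d'. A minimal sketch of the
equivalent direct client calls, assuming a reachable node on localhost:9200,
a hypothetical policy name 'short', and a user with the manage_ilm privilege:

from elasticsearch import Elasticsearch

client = Elasticsearch(['localhost:9200'])  # hypothetical host

duration = '720h'  # thirty days, expressed in hours
max_age = f'{int(duration.split("h")[0]) // 24}d'  # -> '30d'

# same reduced hot/delete policy that _make_policy() builds
client.ilm.put_lifecycle(
    policy='short',  # hypothetical policy name
    body={
        'policy': {
            'phases': {
                'hot': {
                    'actions': {
                        'rollover': {'max_age': max_age},
                        'set_priority': {'priority': 100},
                    }
                },
                'delete': {'actions': {'delete': {}}},
            }
        }
    },
)
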
diff --git a/openwisp_monitoring/db/backends/elasticsearch/index.py b/openwisp_monitoring/db/backends/elasticsearch/index.py
index cfa866a9f..d21705bb6 100644
--- a/openwisp_monitoring/db/backends/elasticsearch/index.py
+++ b/openwisp_monitoring/db/backends/elasticsearch/index.py
@@ -1,12 +1,14 @@
 import uuid
 
 from django.conf import settings
-from elasticsearch import Elasticsearch
 from elasticsearch.exceptions import NotFoundError
 from elasticsearch.helpers import bulk
 from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search
 from swapper import load_model
 
+# TODO: remove this note. Current structure:
+# Index Templates <-- Indexes <-- Documents <-- Points
+
 
 class Point(InnerDoc):
     time = Date(required=True, default_timezone=settings.TIME_ZONE)
@@ -15,20 +17,19 @@ class Point(InnerDoc):
 
 class MetricIndex(Document):
     tags = Nested(dynamic=True, required=True, multi=True)
-    # returns an empty list if not present
     points = Nested(Point)
 
     class Index:
-        # name gets replaced with metric's key
         name = 'metric'
         settings = {
-            "number_of_shards": 1,
-            "number_of_replicas": 0,
+            'number_of_shards': 1,
+            'number_of_replicas': 0,
+            'lifecycle.name': 'default',
+            'lifecycle.rollover_alias': 'metric',
         }
 
 
-def find_metric(index, tags, add=False):
-    client = Elasticsearch()
+def find_metric(client, index, tags, retention_policy=None, add=False):
     search = Search(using=client, index=index)
     if tags:
         tags_dict = dict()
@@ -40,28 +41,56 @@ def find_metric(index, tags, add=False):
     try:
         return list(search.query(q).execute())[0].meta['id']
     except (NotFoundError, AttributeError, IndexError):
-        return add_index(index, tags)['_id'] if add else None
+        return (
+            add_doc(client, index, tags, retention_policy=retention_policy)['_id']
+            if add
+            else None
+        )
 
 
-def add_index(key, tags, id=None):
+def add_doc(client, key, tags, _id=None, retention_policy=None):
     """
     Add index to elasticsearch using ``keys``, ``tags`` and ``id`` provided.
     If no ``id`` is provided a random ``uuid`` would be used.
     """
-    obj = MetricIndex(meta={'id': id or uuid.uuid1()}, tags=tags)
-    obj.meta.index = key
-    obj.save()
+    _id = str(_id or uuid.uuid1())
+    # if the index already exists, only add the document to it
+    if client.indices.exists(index=key):
+        client.create(index=key, id=_id, body={'tags': tags})
+        return {'_id': _id}
+    # otherwise create the first index of the rollover series
+    name = f'{key}-000001'
+    obj = MetricIndex()
+    obj._index = obj._index.clone(name)
+    # create a new index template if it doesn't exist
+    if not client.indices.exists_template(name=key):
+        obj._index.settings(**{'lifecycle.rollover_alias': key})
+        if retention_policy:
+            obj._index.settings(**{'lifecycle.name': retention_policy})
+        # the index pattern is required for Index Lifecycle Management
+        obj._index.as_template(key, f'{key}-*').save(using=client)
+    obj.init(using=client, index=name)
+    obj.meta = {'id': _id, 'index': name}
+    obj.tags = tags
+    obj.save(using=client, index=name)
+    if retention_policy:
+        client.indices.put_settings(
+            body={'lifecycle.name': retention_policy}, index=key
+        )
+    client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name)
+    client.indices.put_alias(index=name, name=key, body={'is_write_index': True})
     return obj.to_dict(include_meta=True)
 
 
 def bulk_indexing():
     """ Index all existing metrics """
+    from ... import timeseries_db
+
     Metric = load_model('monitoring', 'Metric')
-    MetricIndex.init()
-    es = Elasticsearch()
     bulk(
-        client=es,
+        client=timeseries_db.get_db,
         actions=(
-            add_index(m.key, m.tags, m.id) for m in Metric.objects.all().iterator()
+            add_doc(timeseries_db.get_db, m.key, m.tags, m.id)
+            for m in Metric.objects.all().distinct('key').iterator()
         ),
     )
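
Note: ILM rollover operates on a numbered index behind a write alias, which is
exactly what add_doc() bootstraps: points for a key such as 'ping' live in
ping-000001, ping-000002, ... while callers keep using the alias 'ping'. A
rough equivalent using the low-level client (host and key are hypothetical;
'default' is the policy installed by create_database()):

from elasticsearch import Elasticsearch

client = Elasticsearch(['localhost:9200'])  # hypothetical host
key = 'ping'  # hypothetical metric key

# first index of the rollover series, writable through the `key` alias
client.indices.create(
    index=f'{key}-000001',
    body={
        'settings': {
            'index.lifecycle.name': 'default',
            'index.lifecycle.rollover_alias': key,
        },
        'aliases': {key: {'is_write_index': True}},
    },
)
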
diff --git a/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py b/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py
new file mode 100644
index 000000000..d4269959e
--- /dev/null
+++ b/openwisp_monitoring/db/backends/elasticsearch/retention_policies.py
@@ -0,0 +1,41 @@
+# By default, age is calculated from the date the index is created, but if the
+# index has been rolled over, then the rollover date is used to calculate the age
+
+default_rp_policy = {
+    'policy': {
+        'phases': {
+            'hot': {
+                'actions': {
+                    'rollover': {'max_age': '30d', 'max_size': '90G'},
+                    'set_priority': {'priority': 100},
+                }
+            },
+            'warm': {
+                'min_age': '30d',
+                'actions': {
+                    'forcemerge': {'max_num_segments': 1},
+                    'allocate': {'number_of_replicas': 0},
+                    'set_priority': {'priority': 50},
+                },
+            },
+            'cold': {'min_age': '150d', 'actions': {'freeze': {}}},
+            'delete': {'min_age': '335d', 'actions': {'delete': {}}},
+        }
+    }
+}
+
+
+def _make_policy(max_age):
+    return {
+        'policy': {
+            'phases': {
+                'hot': {
+                    'actions': {
+                        'rollover': {'max_age': max_age},
+                        'set_priority': {'priority': 100},
+                    }
+                },
+                'delete': {'actions': {'delete': {}}},
+            }
+        }
+    }
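
Note: each min_age above is measured from the rollover date, so the default
policy keeps an index for roughly a year in total: up to 30 days (or 90G) in
the hot phase, warm 30 days after rollover, frozen in cold after 150 days, and
deleted 335 days past rollover. _make_policy() builds the reduced hot/delete
variant used for custom durations; a quick sanity check of its output (the
import path follows this patch's layout):

from openwisp_monitoring.db.backends.elasticsearch.retention_policies import (
    _make_policy,
)

policy = _make_policy(max_age='30d')
hot_actions = policy['policy']['phases']['hot']['actions']
assert hot_actions['rollover'] == {'max_age': '30d'}
assert 'delete' in policy['policy']['phases']
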
diff --git a/openwisp_monitoring/monitoring/tests/__init__.py b/openwisp_monitoring/monitoring/tests/__init__.py
index 1a96b9f1a..796b85c96 100644
--- a/openwisp_monitoring/monitoring/tests/__init__.py
+++ b/openwisp_monitoring/monitoring/tests/__init__.py
@@ -16,6 +16,7 @@
 Metric = load_model('monitoring', 'Metric')
 AlertSettings = load_model('monitoring', 'AlertSettings')
 
+# TODO: queries relating to top_fields aren't yet supported
 # this custom chart configuration is used for automated testing purposes
 charts = {
     'histogram': {
@@ -125,7 +126,8 @@
                 "SELECT {fields|MEAN} FROM {key} "
                 "WHERE time >= '{time}' AND content_type = "
                 "'{content_type}' AND object_id = '{object_id}'"
-            )
+            ),
+            'elasticsearch': _make_query(),
         },
     },
 }
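
For reference, a rough end-to-end sketch of the resulting flow, assuming a
running Elasticsearch node and this backend selected through TIMESERIES_DB;
the metric key, field and tag values below are placeholders:

from openwisp_monitoring.db import timeseries_db

# starts ILM and installs the 'default' retention policy
timeseries_db.create_database()
# a custom policy: '720h' is translated to a 30d rollover max_age
timeseries_db.create_or_alter_retention_policy(name='short', duration='720h')
# writes go through the 'ping' write alias, creating ping-000001 on first use
timeseries_db.write(
    'ping', {'reachable': 1}, tags={'device': 'test'}, retention_policy='short'
)
# reads resolve the metric document through its tags
points = timeseries_db.read(key='ping', fields='reachable', tags={'device': 'test'})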