[timeseries] Add index lifecycle policy to elasticsearch
nepython committed Jul 21, 2020
1 parent abbc818 commit f4fae65
Showing 6 changed files with 168 additions and 58 deletions.
104 changes: 65 additions & 39 deletions openwisp_monitoring/db/backends/elasticsearch/client.py
@@ -17,6 +17,7 @@
from .. import TIMESERIES_DB
from .index import MetricIndex, Point, find_metric
from .queries import default_chart_query, math_map, operator_lookup
from .retention_policies import _make_policy, default_rp_policy

logger = logging.getLogger(__name__)

@@ -69,8 +70,11 @@ def __init__(self, db_name='metric'):

def create_database(self):
""" creates connection to elasticsearch """
connections.create_connection(hosts=[TIMESERIES_DB['HOST']])
self.get_db
connections.create_connection(hosts=[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"])
db = self.get_db
# ILM operations are skipped if Index Lifecycle Management is disabled or the user lacks privileges
self.ilm_enabled = db.ilm.start()['acknowledged']
self.create_or_alter_retention_policy(name='default')

def drop_database(self):
""" deletes all indices """
@@ -81,43 +85,65 @@ def drop_database(self):
@cached_property
def get_db(self):
""" Returns an ``Elasticsearch Client`` instance """
# TODO: authentication remains to be handled, see `SecurityClient`
return Elasticsearch(
[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"],
http_auth=(TIMESERIES_DB['USER'], TIMESERIES_DB['PASSWORD']),
retry_on_timeout=True,
)

def create_or_alter_retention_policy(self, name, duration):
""" creates or alters existing retention policy if necessary """
# TODO
pass
def create_or_alter_retention_policy(self, name, duration=None):
"""
creates or alters existing retention policy if necessary
Note: default retention policy can't be altered with this function
"""
if not self.ilm_enabled:
return
ilm = self.get_db.ilm
if not duration:
try:
ilm.get_lifecycle(policy='default')
except NotFoundError:
ilm.put_lifecycle(policy='default', body=default_rp_policy)
return
days = f'{int(duration.split("h")[0]) // 24}d'
duration_changed = False
try:
policy = ilm.get_lifecycle(policy=name)
exists = True
current_duration = policy[name]['policy']['phases']['hot']['actions'][
'rollover'
]['max_age']
duration_changed = current_duration != days
except NotFoundError:
exists = False
if not exists or duration_changed:
policy = _make_policy(days)
ilm.put_lifecycle(policy=name, body=policy)

def query(self, query, precision=None):
index = query.pop('key')
return Search(index=index).from_dict(query).execute().to_dict()
return Search(using=self.get_db, index=index).from_dict(query).execute().to_dict()

def write(self, name, values, **kwargs):
# TODO: Add support for retention policy
rp = kwargs.get('retention_policy')
tags = kwargs.get('tags')
timestamp = kwargs.get('timestamp')
metric_id = find_metric(name, tags, add=True)
metric_index = MetricIndex().get(metric_id, index=name)
metric_id = find_metric(self.get_db, name, tags, rp, add=True)
metric_index = MetricIndex.get(metric_id, index=name, using=self.get_db)
point = Point(fields=values, time=timestamp or datetime.now())
metric_index.points.append(point)
metric_index.save()

def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
extra_fields = kwargs.get('extra_fields')
time_format = kwargs.get('time_format')
# TODO: It will be of the form 'now() - <int>s'
# since = kwargs.get('since')
metric_id = find_metric(key, tags)
metric_id = find_metric(self.get_db, key, tags)
if not metric_id:
return list()
try:
metric_index = MetricIndex().get(metric_id, index=key)
except NotFoundError:
return []
metric_index = MetricIndex.get(index=key, id=metric_id, using=self.get_db)
if order == 'time':
points = list(metric_index.points[0:limit])
elif order == '-time':
@@ -127,33 +153,28 @@ def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get '
'result sorted in ascending /descending order respectively.'
)
if not points:
return list()
# distinguish between traffic and clients
for point in list(points):
if fields not in point.fields.to_dict():
points.remove(point)
if extra_fields and extra_fields != '*':
assert isinstance(extra_fields, list)
_points = []
for point in points:
point = point.to_dict()
_point = {
for count, point in enumerate(points):
fields_dict = point.to_dict()['fields']
point = {
'time': self._format_time(point['time'], time_format),
fields: point['fields'][fields],
fields: fields_dict[fields],
}
for extra_field in extra_fields:
if point['fields'].get(extra_field) is not None:
_point.update({extra_field: point['fields'][extra_field]})
_points.append(_point)
points = _points
if fields_dict.get(extra_field) is not None:
point.update({extra_field: fields_dict[extra_field]})
points[count] = point
elif extra_fields == '*':
points = [
deep_merge_dicts(
p.fields.to_dict(), {'time': self._format_time(p.time, time_format)}
for count, point in enumerate(points):
points[count] = deep_merge_dicts(
point.fields.to_dict(),
{'time': self._format_time(point.time, time_format)},
)
for p in points
]
else:
points = [
deep_merge_dicts(
@@ -190,7 +211,10 @@ def _fill_points(self, query, points):
start_time = datetime.now()
end_time = start_time - timedelta(days=days) # include today
dummy_point = deepcopy(points[0])
interval = points[0]['time'] - points[1]['time']
if len(points) > 2:
interval = points[0]['time'] - points[1]['time']
else:
interval = 600
start_ts = points[0]['time'] + interval
end_ts = points[-1]['time'] - interval
for field in dummy_point.keys():
@@ -210,12 +234,14 @@

def delete_metric_data(self, key=None, tags=None):
"""
deletes a specific metric based on the key and tags
provided, you may also choose to delete all metrics
deletes a specific metric based on given key and tags;
deletes all metrics if neither provided
"""
if key and tags:
metric_id = find_metric(key, tags)
metric_id = find_metric(self.get_db, key, tags)
self.get_db.delete(index=key, id=metric_id)
elif key:
self.get_db.indices.delete(index=key, ignore=[400, 404])
else:
self.get_db.indices.delete(index='*', ignore=[400, 404])

@@ -226,10 +252,10 @@ def validate_query(self, query):
query = json.loads(query)
# Elasticsearch currently supports validation of only query section,
# aggs, size, _source etc. are not supported
valid_check = self.get_db.indices.validate_query(body={'query': query['query']})
valid_check = self.get_db.indices.validate_query(body={'query': query['query']}, explain=True)
# Show a helpful message for failure
if not valid_check['valid']:
raise ValidationError(valid_check['error'])
raise ValidationError(valid_check['explanations'])
return self._is_aggregate(query)

def _is_aggregate(self, q):
@@ -317,7 +343,7 @@ def default_chart_query(self, tags):
return q


# Old data - delete by query (inefficient) / retention policy - Index lifecycle management
# TODO:
# Fix Average - currently it's computing average over all fields!
# Time Interval - fix range
# Device query
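
For reference, here is a minimal standalone sketch of the retention-policy flow implemented above: start ILM, convert an hours-based duration such as '48h' into days, compare it with any existing policy's rollover max_age, and put a new policy only when needed. This assumes elasticsearch-py 7.x and a locally reachable node; the policy name 'short' and the duration are illustrative, not part of the commit.

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError

es = Elasticsearch(['localhost:9200'])
# ILM must be running; 'acknowledged' is False when it cannot be started
if es.ilm.start()['acknowledged']:
    name, duration = 'short', '48h'
    days = f'{int(duration.split("h")[0]) // 24}d'  # '48h' -> '2d'
    try:
        current = es.ilm.get_lifecycle(policy=name)[name]['policy']['phases'][
            'hot'
        ]['actions']['rollover']['max_age']
    except NotFoundError:
        current = None
    if current != days:
        # roll the write index over after `days`, then delete rolled-over data
        es.ilm.put_lifecycle(
            policy=name,
            body={
                'policy': {
                    'phases': {
                        'hot': {'actions': {'rollover': {'max_age': days}}},
                        'delete': {'actions': {'delete': {}}},
                    }
                }
            },
        )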
68 changes: 51 additions & 17 deletions openwisp_monitoring/db/backends/elasticsearch/index.py
@@ -1,12 +1,14 @@
import uuid

from django.conf import settings
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from elasticsearch.helpers import bulk
from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search
from swapper import load_model

# TODO: Remove this. Current Structure
# Index Templates <-- Indexes <-- Documents <-- Points


class Point(InnerDoc):
time = Date(required=True, default_timezone=settings.TIME_ZONE)
@@ -15,53 +17,85 @@ class Point(InnerDoc):

class MetricIndex(Document):
tags = Nested(dynamic=True, required=True, multi=True)
# returns an empty list if not present
points = Nested(Point)

class Index:
# name gets replaced with metric's key
name = 'metric'
settings = {
"number_of_shards": 1,
"number_of_replicas": 0,
'number_of_shards': 1,
'number_of_replicas': 0,
'lifecycle.name': 'default',
'lifecycle.rollover_alias': 'metric',
}


def find_metric(index, tags, add=False):
client = Elasticsearch()
def find_metric(client, index, tags, retention_policy=None, add=False):
search = Search(using=client, index=index)
if tags:
tags_dict = dict()
for key, value in tags.items():
tags_dict[f'tags.{key}'] = value
q = Q('bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()])
q = Q(
'bool', must=[Q('match', **{k: str(v)}) for k, v in tags_dict.items()]
)
# if index == 'device_data':
# q = Q('nested', path='tags', query=q)
else:
q = Q()
try:
return list(search.query(q).execute())[0].meta['id']
except (NotFoundError, AttributeError, IndexError):
return add_index(index, tags)['_id'] if add else None
return (
add_doc(client, index, tags, retention_policy=retention_policy)['_id']
if add
else None
)


def add_index(key, tags, id=None):
def add_doc(client, key, tags, _id=None, retention_policy=None):
"""
Adds a document to Elasticsearch using the ``key``, ``tags`` and ``_id`` provided.
If no ``_id`` is provided, a random ``uuid`` is used.
"""
obj = MetricIndex(meta={'id': id or uuid.uuid1()}, tags=tags)
obj.meta.index = key
obj.save()
_id = str(_id or uuid.uuid1())
# Check if index exists
if client.indices.exists(index=key):
client.create(index=key, id=_id, body={'tags': tags})
return {'_id': _id}
# Create a new index if it doesn't exist
name = f'{key}-000001'
obj = MetricIndex(meta={'id': _id})
obj.meta.index = name
obj.tags = tags
# TODO: If mappings are put then aggregations don't work, find why.
# Issue similar to https://discuss.elastic.co/t/aggregations-do-not-work-any-more-index-corrupt-resolved/24947/5
obj.save(using=client, index=name)
client.indices.put_alias(index=name, name=key, body={'is_write_index': True})
client.indices.put_settings(body={
'number_of_replicas': 0,
'lifecycle.name': retention_policy or 'default',
'lifecycle.rollover_alias': key,
}, index=name)
obj._index = obj._index.clone(name)
# Create a new index template if it doesn't exist
if not client.indices.exists_template(name=key):
obj._index.settings(**{'lifecycle.rollover_alias': key})
if retention_policy:
obj._index.settings(**{'lifecycle.name': retention_policy})
# index pattern is added for Index Lifecycle Management
obj._index.as_template(key, f'{key}-*').save(using=client)
return obj.to_dict(include_meta=True)


def bulk_indexing():
""" Index all existing metrics """
from ... import timeseries_db

Metric = load_model('monitoring', 'Metric')
MetricIndex.init()
es = Elasticsearch()
bulk(
client=es,
client=timeseries_db.get_db,
actions=(
add_index(m.key, m.tags, m.id) for m in Metric.objects.all().iterator()
add_doc(timeseries_db.get_db, m.key, m.tags, m.id)
for m in Metric.objects.all().distinct('key').iterator()
),
)
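
To make the indexing scheme in add_doc above concrete: the first document written for a metric key creates a backing index named <key>-000001, an alias <key> flagged as the write index, and lifecycle settings so ILM can later roll the alias over to <key>-000002 and so on. A hypothetical bootstrap performing the same steps directly, assuming elasticsearch-py 7.x and a metric key 'ping':

from elasticsearch import Elasticsearch

client = Elasticsearch(['localhost:9200'])
key = 'ping'  # metric key, also used as the alias name

if not client.indices.exists(index=key):
    first_index = f'{key}-000001'
    client.indices.create(
        index=first_index,
        body={
            'settings': {
                'index.number_of_replicas': 0,
                'index.lifecycle.name': 'default',
                'index.lifecycle.rollover_alias': key,
            }
        },
    )
    # readers and writers use the alias; ILM rolls the concrete
    # ping-00000N indices over behind it
    client.indices.put_alias(
        index=first_index, name=key, body={'is_write_index': True}
    )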
8 changes: 8 additions & 0 deletions openwisp_monitoring/db/backends/elasticsearch/queries.py
@@ -11,6 +11,14 @@
]
}
},
# 'nested': {
# 'path': 'points',
# 'query': {
# 'bool': {
# 'must': [{'range': {'points.time': {'from': 'now-1d/d', 'to': 'now/d'}}}]
# }
# }
# }
'_source': False,
'size': 0,
'aggs': {
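
The commented-out block above sketches a nested filter that would restrict points to the last day. For clarity, a hypothetical elasticsearch_dsl equivalent of that filter (not part of the commit; it uses gte/lte rather than the older from/to keys):

from elasticsearch_dsl import Q, Search

time_filter = Q(
    'nested',
    path='points',
    query=Q('range', **{'points.time': {'gte': 'now-1d/d', 'lte': 'now/d'}}),
)
search = Search(index='ping').query(time_filter).extra(size=0)
print(search.to_dict())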
41 changes: 41 additions & 0 deletions openwisp_monitoring/db/backends/elasticsearch/retention_policies.py
@@ -0,0 +1,41 @@
# By default, age is calculated from the date the index was created, but if the
# index has been rolled over, then the rollover date is used to calculate the age

default_rp_policy = {
'policy': {
'phases': {
'hot': {
'actions': {
'rollover': {'max_age': '30d', 'max_size': '90G'},
'set_priority': {'priority': 100},
}
},
'warm': {
'min_age': '30d',
'actions': {
'forcemerge': {'max_num_segments': 1},
'allocate': {'number_of_replicas': 0},
'set_priority': {'priority': 50},
},
},
'cold': {'min_age': '150d', 'actions': {'freeze': {}}},
'delete': {'min_age': '335d', 'actions': {'delete': {}}},
}
}
}


def _make_policy(max_age):
return {
'policy': {
'phases': {
'hot': {
'actions': {
'rollover': {'max_age': max_age},
'set_priority': {'priority': 100},
}
},
'delete': {'actions': {'delete': {}}},
}
}
}
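
A short usage sketch for these helpers (hypothetical, assuming the module path implied by the import in client.py): default_rp_policy moves indices through hot -> warm (30d) -> cold (150d) -> delete (335d), while _make_policy builds a simpler hot -> delete policy from a max_age string.

from elasticsearch import Elasticsearch

from openwisp_monitoring.db.backends.elasticsearch.retention_policies import (
    _make_policy,
    default_rp_policy,
)

es = Elasticsearch(['localhost:9200'])
# register the policy applied when no explicit retention policy is given
es.ilm.put_lifecycle(policy='default', body=default_rp_policy)
# register a custom policy that rolls the write index over after 2 days
# and deletes rolled-over indices
es.ilm.put_lifecycle(policy='short', body=_make_policy('2d'))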
1 change: 0 additions & 1 deletion openwisp_monitoring/monitoring/base/models.py
@@ -509,7 +509,6 @@ def _is_crossed_by(self, current_value, time=None):
continue
if not self._value_crossed(point[self.metric.field_name]):
return False
print(point['time'])
if self._time_crossed(
make_aware(datetime.fromtimestamp(point['time']))
):
4 changes: 3 additions & 1 deletion openwisp_monitoring/monitoring/tests/__init__.py
@@ -16,6 +16,7 @@
Metric = load_model('monitoring', 'Metric')
AlertSettings = load_model('monitoring', 'AlertSettings')

# TODO: queries relating to top_fields aren't supported yet
# this custom chart configuration is used for automated testing purposes
charts = {
'histogram': {
@@ -125,7 +126,8 @@
"SELECT {fields|MEAN} FROM {key} "
"WHERE time >= '{time}' AND content_type = "
"'{content_type}' AND object_id = '{object_id}'"
)
),
'elasticsearch': _make_query(),
},
},
}
