[timeseries] Add initial support for elasticsearch #99
nepython committed Jul 21, 2020
1 parent c2f6ae8 commit abbc818
Showing 15 changed files with 546 additions and 27 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
@@ -25,14 +25,16 @@ addons:
services:
- docker
- redis-server
- elasticsearch

branches:
only:
- master
- dev

before_install:
- docker run -d --name influxdb -e INFLUXDB_DB=openwisp2 -p 8086:8086 influxdb:alpine
# - docker run -d --name influxdb -e INFLUXDB_DB=openwisp2 -p 8086:8086 influxdb:alpine
# - docker run -p 9200:9200 docker.elastic.co/elasticsearch/elasticsearch:7.8.0
- pip install -U pip wheel setuptools
- pip install $DJANGO
- pip install -U -r requirements-test.txt
24 changes: 23 additions & 1 deletion docker-compose.yml
@@ -22,7 +22,25 @@ services:
INFLUXDB_DB: openwisp2
INFLUXDB_USER: openwisp
INFLUXDB_USER_PASSWORD: openwisp

# the clustered version of elasticsearch is used because that is what is likely to be used in production
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
container_name: es01
environment:
- node.name=openwisp2
- cluster.name=openwisp2
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data01:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- elastic
redis:
image: redis:5.0-alpine
ports:
@@ -31,3 +49,7 @@

volumes:
influxdb-data: {}

networks:
elastic:
driver: bridge
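
Not part of the commit, but a quick way to verify that the Elasticsearch service defined above is reachable, using the elasticsearch-py client this backend depends on (a minimal sketch; host and port are taken from the compose file):

from elasticsearch import Elasticsearch

# connect to the node exposed by docker-compose on localhost:9200
client = Elasticsearch(['localhost:9200'], retry_on_timeout=True)
assert client.ping()  # True when the node answers
print(client.info()['version']['number'])  # expected to be 7.8.0 for the image above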
3 changes: 1 addition & 2 deletions openwisp_monitoring/db/__init__.py
@@ -1,7 +1,6 @@
from .backends import timeseries_db

chart_query = timeseries_db.queries.chart_query
default_chart_query = timeseries_db.queries.default_chart_query
device_data_query = timeseries_db.queries.device_data_query

__all__ = ['timeseries_db', 'chart_query', 'default_chart_query', 'device_data_query']
__all__ = ['timeseries_db', 'chart_query', 'device_data_query']
2 changes: 1 addition & 1 deletion openwisp_monitoring/db/backends/__init__.py
@@ -48,7 +48,7 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None):
except ImportError as e:
# The database backend wasn't found. Display a helpful error message
# listing all built-in database backends.
builtin_backends = ['influxdb']
builtin_backends = ['influxdb', 'elasticsearch']
if backend_name not in [
f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends
]:
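For context (not part of this diff hunk): the backend is selected via the TIMESERIES_DB setting, and the elasticsearch client added below reads HOST, PORT, USER, PASSWORD and NAME from it, so a settings entry along the following lines would wire it up; the values shown are only illustrative:

TIMESERIES_DB = {
    'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
    'USER': 'openwisp',
    'PASSWORD': 'openwisp',
    'NAME': 'openwisp2',
    'HOST': 'localhost',
    'PORT': '9200',
}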
Empty file.
323 changes: 323 additions & 0 deletions openwisp_monitoring/db/backends/elasticsearch/client.py
@@ -0,0 +1,323 @@
import json
import logging
from copy import deepcopy
from datetime import datetime, timedelta

from django.conf import settings
from django.core.exceptions import ValidationError
from django.utils.functional import cached_property
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ElasticsearchException, NotFoundError
from elasticsearch_dsl import Search
from elasticsearch_dsl.connections import connections
from pytz import timezone as tz

from openwisp_utils.utils import deep_merge_dicts

from .. import TIMESERIES_DB
from .index import MetricIndex, Point, find_metric
from .queries import default_chart_query, math_map, operator_lookup

logger = logging.getLogger(__name__)


class DatabaseClient(object):
_AGGREGATE = [
'filters',
'children',
'parent',
'date_histogram',
'auto_date_histogram',
'date_range',
'geo_distance',
'geohash_grid',
'geotile_grid',
'global',
'geo_centroid',
'ip_range',
'missing',
'nested',
'range',
'reverse_nested',
'significant_terms',
'significant_text',
'sampler',
'terms',
'diversified_sampler',
'composite',
'top_hits',
'avg',
'weighted_avg',
'cardinality',
'extended_stats',
'geo_bounds',
'max',
'min',
'percentiles',
'percentile_ranks',
'scripted_metric',
'stats',
'sum',
'value_count',
]
backend_name = 'elasticsearch'

def __init__(self, db_name='metric'):
self.db_name = db_name or TIMESERIES_DB['NAME']
self.client_error = ElasticsearchException

def create_database(self):
""" creates connection to elasticsearch """
connections.create_connection(hosts=[TIMESERIES_DB['HOST']])
self.get_db  # accessing the cached property instantiates and caches the client

def drop_database(self):
""" deletes all indices """
self.delete_metric_data()
self.get_db.close()
logger.debug('Deleted all indices from Elasticsearch')

@cached_property
def get_db(self):
""" Returns an ``Elasticsearch Client`` instance """
# TODO: AUTHENTICATION remains see `SecurityClient`
return Elasticsearch(
[f"{TIMESERIES_DB['HOST']}:{TIMESERIES_DB['PORT']}"],
http_auth=(TIMESERIES_DB['USER'], TIMESERIES_DB['PASSWORD']),
retry_on_timeout=True,
)

def create_or_alter_retention_policy(self, name, duration):
""" creates or alters existing retention policy if necessary """
# TODO
pass

def query(self, query, precision=None):
index = query.pop('key')
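# Search() uses the default connection registered by connections.create_connection() in create_database()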
return Search(index=index).from_dict(query).execute().to_dict()

def write(self, name, values, **kwargs):
# TODO: Add support for retention policy
tags = kwargs.get('tags')
timestamp = kwargs.get('timestamp')
metric_id = find_metric(name, tags, add=True)
metric_index = MetricIndex().get(metric_id, index=name)
point = Point(fields=values, time=timestamp or datetime.now())
metric_index.points.append(point)
metric_index.save()

def read(self, key, fields, tags, limit=1, order='-time', **kwargs):
extra_fields = kwargs.get('extra_fields')
time_format = kwargs.get('time_format')
# since = kwargs.get('since')
metric_id = find_metric(key, tags)
if not metric_id:
return list()
try:
metric_index = MetricIndex().get(metric_id, index=key)
except NotFoundError:
return []
if order == 'time':
points = list(metric_index.points[0:limit])
elif order == '-time':
points = list(reversed(metric_index.points))[0:limit]
else:
raise self.client_error(
f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get '
'result sorted in ascending/descending order respectively.'
)
if not points:
return list()
# distinguish between traffic and clients
for point in list(points):
if fields not in point.fields.to_dict():
points.remove(point)
if extra_fields and extra_fields != '*':
assert isinstance(extra_fields, list)
_points = []
for point in points:
point = point.to_dict()
_point = {
'time': self._format_time(point['time'], time_format),
fields: point['fields'][fields],
}
for extra_field in extra_fields:
if point['fields'].get(extra_field) is not None:
_point.update({extra_field: point['fields'][extra_field]})
_points.append(_point)
points = _points
elif extra_fields == '*':
points = [
deep_merge_dicts(
p.fields.to_dict(), {'time': self._format_time(p.time, time_format)}
)
for p in points
]
else:
points = [
deep_merge_dicts(
{fields: p.fields.to_dict()[fields]},
{'time': self._format_time(p.time, time_format)},
)
for p in points
]
# if since:
# TODO:
return points

def _format_time(self, obj, time_format=None):
""" returns datetime object in isoformat / unix timestamp and UTC timezone """
if time_format == 'isoformat':
return obj.astimezone(tz=tz('UTC')).isoformat(timespec='seconds')
return int(obj.astimezone(tz=tz('UTC')).timestamp())

def get_list_query(self, query, precision='s'):
response = self.query(query, precision)
points = response['aggregations']['GroupByTime']['buckets']
list_points = self._fill_points(
query, [self._format(point) for point in points]
)
return list_points

def _fill_points(self, query, points):
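""" fills the series with empty (None) points so it spans the whole requested time range """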
_range = next(
(item for item in query['query']['bool']['must'] if item.get('range')), None
)
if not _range or not points:
return points
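# the range filter value is expected to look like 'now-7d/d' (see _group_by); strip 'now-' and the trailing 'd/d' to obtain the number of days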
days = int(_range['range']['points.time']['from'][4:-3])
start_time = datetime.now()
end_time = start_time - timedelta(days=days) # include today
dummy_point = deepcopy(points[0])
interval = points[0]['time'] - points[1]['time']
start_ts = points[0]['time'] + interval
end_ts = points[-1]['time'] - interval
for field in dummy_point.keys():
dummy_point[field] = None
while start_ts < start_time.timestamp():
dummy_point['time'] = start_ts
points.insert(0, deepcopy(dummy_point))
start_ts += interval
# TODO: This needs to be fixed and shouldn't be required since intervals are set
while points[-1]['time'] < end_time.timestamp():
points.pop(-1)
while end_ts > end_time.timestamp():
dummy_point['time'] = end_ts
points.append(deepcopy(dummy_point))
end_ts -= interval
return points

def delete_metric_data(self, key=None, tags=None):
"""
deletes a specific metric document based on the key and tags
provided; if they are omitted, all indices are deleted
"""
if key and tags:
metric_id = find_metric(key, tags)
self.get_db.delete(index=key, id=metric_id)
else:
self.get_db.indices.delete(index='*', ignore=[400, 404])

# Chart related functions below

def validate_query(self, query):
if isinstance(query, str):
query = json.loads(query)
# Elasticsearch currently supports validation of only query section,
# aggs, size, _source etc. are not supported
valid_check = self.get_db.indices.validate_query(body={'query': query['query']})
# Show a helpful message for failure
if not valid_check['valid']:
raise ValidationError(valid_check['error'])
return self._is_aggregate(query)

def _is_aggregate(self, q):
agg_dict = q['aggs']['GroupByTime']['aggs'].values()
agg = []
for item in agg_dict:
agg.append(next(iter(item)))
return set(agg) <= set(self._AGGREGATE)

def get_query(
self,
chart_type,
params,
time,
group_map,
summary=False,
fields=None,
query=None,
timezone=settings.TIME_ZONE,
):
query['key'] = params.pop('key')
query = json.dumps(query)
for k, v in params.items():
query = query.replace('{' + k + '}', v)
query = self._group_by(query, time, chart_type, group_map, strip=summary)
query = json.loads(query)
if summary:
_range = next(
(item for item in query['query']['bool']['must'] if item.get('range')),
None,
)
if _range:
query['query']['bool']['must'].remove(_range)
query['aggs']['GroupByTime']['date_histogram']['time_zone'] = timezone
return query

def _group_by(self, query, time, chart_type, group_map, strip=False):
if not self.validate_query(query):
return query
if not strip and chart_type != 'histogram':
value = group_map[time]
query = query.replace('1d/d', f'{time}/d')
query = query.replace('10m', value)
if strip:
query = query.replace('10m', time)
return query

# TODO:
def _get_top_fields(
self,
query,
params,
chart_type,
group_map,
number,
time,
timezone=settings.TIME_ZONE,
):
pass

def _format(self, point):
pt = {}
# Convert time from milliseconds -> seconds precision
pt['time'] = int(point['key'] / 1000)
for key, value in point.items():
if isinstance(value, dict):
pt[key] = self._transform_field(key, value['value'])
return pt

def _transform_field(self, field, value):
""" Performs arithmetic operations on the field if required """
if value is None:
return value
if field in math_map:
op = operator_lookup.get(math_map[field]['operator'])
if op is not None:
value = op(value, math_map[field]['value'])
return value

def default_chart_query(self, tags):
q = deepcopy(default_chart_query)
if not tags:
q['query']['bool']['must'].pop(0)
q['query']['bool']['must'].pop(1)
return q


# TODO (remaining work):
# - old data: delete by query is inefficient, use a retention policy via Index Lifecycle Management
# - fix average: currently it is computed over all fields!
# - time interval: fix the range handling
# - device query
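
A rough usage sketch (not included in the commit) of how the client is expected to be driven through the timeseries_db shortcut exported by openwisp_monitoring.db; the metric name, fields and tags below are made up for illustration:

from openwisp_monitoring.db import timeseries_db

timeseries_db.create_database()  # registers the default elasticsearch connection
# write one point to a hypothetical 'ping' index for a given device
timeseries_db.write('ping', {'reachable': 1, 'rtt_avg': 13.2}, tags={'object_id': 'device-uuid'})
# read the latest 'rtt_avg' values back, newest first
points = timeseries_db.read(key='ping', fields='rtt_avg', tags={'object_id': 'device-uuid'}, limit=3)
print(points)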
