[timeseries] Requested Changes
nepython committed Jul 29, 2020
1 parent 75edfe3 commit 7bad1b0
Showing 7 changed files with 30 additions and 31 deletions.
4 changes: 2 additions & 2 deletions README.rst
@@ -109,7 +109,7 @@ Follow the setup instructions of `openwisp-controller
}
In case, you wish to use ``Elasticsearch`` for timeseries data storage and retrieval,
make use i=of the following settings
make use of the following settings

.. code-block:: python
TIMESERIES_DATABASE = {
@@ -387,7 +387,7 @@ MB (megabytes) instead of GB (Gigabytes) you can use:
}
}
# This needs to be declared separately but only for elasticsearch
# Please declare the operations separately in case you use elasticsearch as done below
OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS = {
'upload': {'operator': '/', 'value': 1000000},
'download': {'operator': '/', 'value': 1000000},
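For reference, a minimal sketch of the Elasticsearch configuration block referred to above, assuming the keys mirror the generic ``TIMESERIES_DATABASE`` layout (the host and port values are illustrative):

.. code-block:: python

    TIMESERIES_DATABASE = {
        'BACKEND': 'openwisp_monitoring.db.backends.elasticsearch',
        'USER': 'openwisp',
        'PASSWORD': 'openwisp',
        'NAME': 'openwisp2',
        'HOST': 'localhost',
        'PORT': '9200',
    }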
7 changes: 3 additions & 4 deletions openwisp_monitoring/db/backends/elasticsearch/client.py
@@ -17,7 +17,7 @@
from openwisp_utils.utils import deep_merge_dicts

from .. import TIMESERIES_DB
from .index import MetricIndex, Point, find_metric
from .index import MetricDocument, Point, find_metric
from .queries import default_chart_query, math_map, operator_lookup
from .retention_policies import _make_policy, default_rp_policy

@@ -151,7 +151,7 @@ def write(self, name, values, **kwargs):
tags = kwargs.get('tags')
timestamp = kwargs.get('timestamp')
metric_id, index = find_metric(self.get_db, name, tags, rp, add=True)
metric_index = MetricIndex.get(metric_id, index=index, using=self.get_db)
metric_index = MetricDocument.get(metric_id, index=index, using=self.get_db)
point = Point(fields=values, time=timestamp or datetime.now())
metric_index.points.append(point)
metric_index.save()
@@ -168,7 +168,7 @@ def read(self, key, fields, tags=None, limit=1, order='time', **kwargs):
metric_id, index = find_metric(self.get_db, key, tags)
except TypeError:
return []
metric_index = MetricIndex.get(index=index, id=metric_id, using=self.get_db)
metric_index = MetricDocument.get(index=index, id=metric_id, using=self.get_db)
# distinguish between traffic and clients
points = []
for point in list(metric_index.points):
@@ -458,5 +458,4 @@ def _device_data(self, key, tags, fields, **kwargs):


# TODO:
# _fill_points should not work when group_by not specified in the query (need an option to disable it)
# _fill_points has a while which shouldn't be required
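To illustrate the ``MetricIndex`` → ``MetricDocument`` rename in context, a minimal sketch of how the backend resolves a metric's document before appending a point; ``client`` is assumed to be an already configured low-level Elasticsearch connection, and the metric name, tags and field values are illustrative:

.. code-block:: python

    from datetime import datetime

    from openwisp_monitoring.db.backends.elasticsearch.index import (
        MetricDocument,
        Point,
        find_metric,
    )

    # resolve (or lazily create) the document holding this metric's points
    metric_id, index = find_metric(client, 'ping', {'object_id': 'device-uuid'}, add=True)
    document = MetricDocument.get(metric_id, index=index, using=client)
    # append the new point and persist it
    document.points.append(Point(fields={'loss': 0.0}, time=datetime.now()))
    document.save(using=client)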
39 changes: 19 additions & 20 deletions openwisp_monitoring/db/backends/elasticsearch/index.py
@@ -4,16 +4,13 @@
from elasticsearch.exceptions import NotFoundError
from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search

# TODO: Remove this. Current Structure
# Index Templates <-- Indexes <-- Documents <-- Points


class Point(InnerDoc):
time = Date(required=True, default_timezone=settings.TIME_ZONE)
fields = Nested(dynamic=True, required=True, multi=True)


class MetricIndex(Document):
class MetricDocument(Document):
tags = Nested(dynamic=True, required=False, multi=True)
points = Nested(Point)

@@ -47,18 +44,20 @@ def find_metric(client, index, tags, retention_policy=None, add=False):
return result['id'], result['index']
except (NotFoundError, AttributeError, IndexError):
if add:
obj = add_doc(client, index, tags, retention_policy=retention_policy)
return obj['_id'], obj['_index']
document = create_document(
client, index, tags, retention_policy=retention_policy
)
return document['_id'], document['_index']
return None


def add_doc(client, key, tags, _id=None, retention_policy=None):
def create_document(client, key, tags, _id=None, retention_policy=None):
"""
Add index to elasticsearch using ``keys``, ``tags`` and ``id`` provided.
Adds document to relevant index using ``keys``, ``tags`` and ``id`` provided.
If no ``id`` is provided a random ``uuid`` would be used.
"""
_id = str(_id or uuid.uuid1())
# If index exists, add the doc and return
# If index exists, create the document and return
try:
index_aliases = client.indices.get_alias(index=key)
for k, v in index_aliases.items():
@@ -70,24 +69,24 @@ def add_doc(client, key, tags, _id=None, retention_policy=None):
pass
# Create a new index if it doesn't exist
name = f'{key}-000001'
obj = MetricIndex(meta={'id': _id})
obj._index = obj._index.clone(name)
document = MetricDocument(meta={'id': _id})
document._index = document._index.clone(name)
# Create a new index template if it doesn't exist
if not client.indices.exists_template(name=key):
obj._index.settings(**{'lifecycle.rollover_alias': key})
document._index.settings(**{'lifecycle.rollover_alias': key})
if retention_policy:
obj._index.settings(**{'lifecycle.name': retention_policy})
# index pattern is added for Index Lifecycle Management
obj._index.as_template(key, f'{key}-*').save(using=client)
obj.init(using=client, index=name)
obj.meta.index = name
obj.tags = tags
obj.save(using=client, index=name)
document._index.settings(**{'lifecycle.name': retention_policy})
# add index pattern is added for Index Lifecycle Management
document._index.as_template(key, f'{key}-*').save(using=client)
document.init(using=client, index=name)
document.meta.index = name
document.tags = tags
document.save(using=client, index=name)
client.indices.put_alias(index=name, name=key, body={'is_write_index': True})
if retention_policy:
client.indices.put_settings(
body={'lifecycle.name': retention_policy}, index=name
)
client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name)
client.indices.refresh(index=key)
return obj.to_dict(include_meta=True)
return document.to_dict(include_meta=True)
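As a usage note, ``create_document`` returns the document metadata dict that ``find_metric`` unpacks; a minimal sketch, where ``client`` is again assumed to be a configured Elasticsearch connection and the key and tags are illustrative:

.. code-block:: python

    from openwisp_monitoring.db.backends.elasticsearch.index import create_document

    # on first use this creates the 'ping-000001' index (plus its template,
    # write alias and ILM settings), then stores the document for this metric
    doc_meta = create_document(client, 'ping', {'object_id': 'device-uuid'})
    print(doc_meta['_id'], doc_meta['_index'])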
4 changes: 2 additions & 2 deletions openwisp_monitoring/db/backends/elasticsearch/queries.py
@@ -122,8 +122,8 @@ def _get_chart_query():
'disk': {'disk_usage': {'avg': {'field': 'points.fields.used_disk'}}},
}
query = {}
for k, v in aggregation_dict.items():
query[k] = {'elasticsearch': _make_query(v)}
for key, value in aggregation_dict.items():
query[key] = {'elasticsearch': _make_query(value)}
return query


@@ -248,5 +248,7 @@ def test_read_multiple(self):
def test_ilm_disabled(self):
with patch.object(timeseries_db, 'ilm_enabled', False):
self.assertFalse(timeseries_db.ilm_enabled)
self.assertIsNone(timeseries_db.create_or_alter_retention_policy(name='default'))
self.assertIsNone(
timeseries_db.create_or_alter_retention_policy(name='default')
)
self.assertIsNone(timeseries_db.get_list_retention_policies())
2 changes: 1 addition & 1 deletion openwisp_monitoring/monitoring/tests/__init__.py
@@ -7,8 +7,8 @@

from ...db import timeseries_db
from ...db.backends import TIMESERIES_DB
from ..charts import register_chart, unregister_chart
from ...db.backends.elasticsearch.queries import _make_query
from ..charts import register_chart, unregister_chart

start_time = now()
ten_minutes_ago = start_time - timedelta(minutes=10)
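The ``_make_query`` import added here suggests that test chart configurations can build their Elasticsearch query the same way the default charts do; a hedged sketch of such a registration (the chart name, configuration keys and aggregation below are illustrative, not taken from the test suite):

.. code-block:: python

    from openwisp_monitoring.db.backends.elasticsearch.queries import _make_query
    from openwisp_monitoring.monitoring.charts import register_chart, unregister_chart

    dummy_chart = {
        'type': 'line',
        'title': 'Dummy chart',
        'description': 'A chart used only by the test suite.',
        'unit': 'bytes',
        'order': 999,
        'query': {
            'elasticsearch': _make_query(
                {'dummy': {'avg': {'field': 'points.fields.value'}}}
            )
        },
    }

    register_chart('dummy', dummy_chart)
    try:
        pass  # assertions against the registered chart would go here
    finally:
        unregister_chart('dummy')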
1 change: 0 additions & 1 deletion openwisp_monitoring/monitoring/tests/test_charts.py
@@ -46,7 +46,6 @@ def test_read_summary_not_aggregate(self):
m = self._create_object_metric(name='summary_hidden')
c = self._create_chart(metric=m)
data = c.read()
# TODO: elasticsearch is returning {'value': 6.0} as default query is returned for summary
self.assertEqual(data['summary'], {'value': None})

def test_read_summary_top_fields(self):
