From 7bad1b0e9d65c726da00d14bf765e858e7d46a6f Mon Sep 17 00:00:00 2001 From: nepython Date: Wed, 29 Jul 2020 21:50:41 +0530 Subject: [PATCH] [timeseries] Requested Changes --- README.rst | 4 +- .../db/backends/elasticsearch/client.py | 7 ++-- .../db/backends/elasticsearch/index.py | 39 +++++++++---------- .../db/backends/elasticsearch/queries.py | 4 +- .../elasticsearch/tests/client_tests.py | 4 +- .../monitoring/tests/__init__.py | 2 +- .../monitoring/tests/test_charts.py | 1 - 7 files changed, 30 insertions(+), 31 deletions(-) diff --git a/README.rst b/README.rst index 47746fea4..044c8e289 100644 --- a/README.rst +++ b/README.rst @@ -109,7 +109,7 @@ Follow the setup instructions of `openwisp-controller } In case, you wish to use ``Elasticsearch`` for timeseries data storage and retrieval, -make use i=of the following settings +make use of the following settings .. code-block:: python TIMESERIES_DATABASE = { @@ -387,7 +387,7 @@ MB (megabytes) instead of GB (Gigabytes) you can use: } } - # This needs to be declared separately but only for elasticsearch + # Please declare the operations separately in case you use elasticsearch as done below OPENWISP_MONITORING_ADDITIONAL_CHART_OPERATIONS = { 'upload': {'operator': '/', 'value': 1000000}, 'download': {'operator': '/', 'value': 1000000}, diff --git a/openwisp_monitoring/db/backends/elasticsearch/client.py b/openwisp_monitoring/db/backends/elasticsearch/client.py index bfa464188..3749936f1 100644 --- a/openwisp_monitoring/db/backends/elasticsearch/client.py +++ b/openwisp_monitoring/db/backends/elasticsearch/client.py @@ -17,7 +17,7 @@ from openwisp_utils.utils import deep_merge_dicts from .. import TIMESERIES_DB -from .index import MetricIndex, Point, find_metric +from .index import MetricDocument, Point, find_metric from .queries import default_chart_query, math_map, operator_lookup from .retention_policies import _make_policy, default_rp_policy @@ -151,7 +151,7 @@ def write(self, name, values, **kwargs): tags = kwargs.get('tags') timestamp = kwargs.get('timestamp') metric_id, index = find_metric(self.get_db, name, tags, rp, add=True) - metric_index = MetricIndex.get(metric_id, index=index, using=self.get_db) + metric_index = MetricDocument.get(metric_id, index=index, using=self.get_db) point = Point(fields=values, time=timestamp or datetime.now()) metric_index.points.append(point) metric_index.save() @@ -168,7 +168,7 @@ def read(self, key, fields, tags=None, limit=1, order='time', **kwargs): metric_id, index = find_metric(self.get_db, key, tags) except TypeError: return [] - metric_index = MetricIndex.get(index=index, id=metric_id, using=self.get_db) + metric_index = MetricDocument.get(index=index, id=metric_id, using=self.get_db) # distinguish between traffic and clients points = [] for point in list(metric_index.points): @@ -458,5 +458,4 @@ def _device_data(self, key, tags, fields, **kwargs): # TODO: -# _fill_points should not work when group_by not specified in the query (need an option to disable it) # _fill_points has a while which shouldn't be required diff --git a/openwisp_monitoring/db/backends/elasticsearch/index.py b/openwisp_monitoring/db/backends/elasticsearch/index.py index f0af887b3..ae6b6aad4 100644 --- a/openwisp_monitoring/db/backends/elasticsearch/index.py +++ b/openwisp_monitoring/db/backends/elasticsearch/index.py @@ -4,16 +4,13 @@ from elasticsearch.exceptions import NotFoundError from elasticsearch_dsl import Date, Document, InnerDoc, Nested, Q, Search -# TODO: Remove this. Current Structure -# Index Templates <-- Indexes <-- Documents <-- Points - class Point(InnerDoc): time = Date(required=True, default_timezone=settings.TIME_ZONE) fields = Nested(dynamic=True, required=True, multi=True) -class MetricIndex(Document): +class MetricDocument(Document): tags = Nested(dynamic=True, required=False, multi=True) points = Nested(Point) @@ -47,18 +44,20 @@ def find_metric(client, index, tags, retention_policy=None, add=False): return result['id'], result['index'] except (NotFoundError, AttributeError, IndexError): if add: - obj = add_doc(client, index, tags, retention_policy=retention_policy) - return obj['_id'], obj['_index'] + document = create_document( + client, index, tags, retention_policy=retention_policy + ) + return document['_id'], document['_index'] return None -def add_doc(client, key, tags, _id=None, retention_policy=None): +def create_document(client, key, tags, _id=None, retention_policy=None): """ - Add index to elasticsearch using ``keys``, ``tags`` and ``id`` provided. + Adds document to relevant index using ``keys``, ``tags`` and ``id`` provided. If no ``id`` is provided a random ``uuid`` would be used. """ _id = str(_id or uuid.uuid1()) - # If index exists, add the doc and return + # If index exists, create the document and return try: index_aliases = client.indices.get_alias(index=key) for k, v in index_aliases.items(): @@ -70,19 +69,19 @@ def add_doc(client, key, tags, _id=None, retention_policy=None): pass # Create a new index if it doesn't exist name = f'{key}-000001' - obj = MetricIndex(meta={'id': _id}) - obj._index = obj._index.clone(name) + document = MetricDocument(meta={'id': _id}) + document._index = document._index.clone(name) # Create a new index template if it doesn't exist if not client.indices.exists_template(name=key): - obj._index.settings(**{'lifecycle.rollover_alias': key}) + document._index.settings(**{'lifecycle.rollover_alias': key}) if retention_policy: - obj._index.settings(**{'lifecycle.name': retention_policy}) - # index pattern is added for Index Lifecycle Management - obj._index.as_template(key, f'{key}-*').save(using=client) - obj.init(using=client, index=name) - obj.meta.index = name - obj.tags = tags - obj.save(using=client, index=name) + document._index.settings(**{'lifecycle.name': retention_policy}) + # add index pattern is added for Index Lifecycle Management + document._index.as_template(key, f'{key}-*').save(using=client) + document.init(using=client, index=name) + document.meta.index = name + document.tags = tags + document.save(using=client, index=name) client.indices.put_alias(index=name, name=key, body={'is_write_index': True}) if retention_policy: client.indices.put_settings( @@ -90,4 +89,4 @@ def add_doc(client, key, tags, _id=None, retention_policy=None): ) client.indices.put_settings(body={'lifecycle.rollover_alias': key}, index=name) client.indices.refresh(index=key) - return obj.to_dict(include_meta=True) + return document.to_dict(include_meta=True) diff --git a/openwisp_monitoring/db/backends/elasticsearch/queries.py b/openwisp_monitoring/db/backends/elasticsearch/queries.py index 440aa2c06..bdae617eb 100644 --- a/openwisp_monitoring/db/backends/elasticsearch/queries.py +++ b/openwisp_monitoring/db/backends/elasticsearch/queries.py @@ -122,8 +122,8 @@ def _get_chart_query(): 'disk': {'disk_usage': {'avg': {'field': 'points.fields.used_disk'}}}, } query = {} - for k, v in aggregation_dict.items(): - query[k] = {'elasticsearch': _make_query(v)} + for key, value in aggregation_dict.items(): + query[key] = {'elasticsearch': _make_query(value)} return query diff --git a/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py b/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py index 597d1706f..42c65ea52 100644 --- a/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py +++ b/openwisp_monitoring/db/backends/elasticsearch/tests/client_tests.py @@ -248,5 +248,7 @@ def test_read_multiple(self): def test_ilm_disabled(self): with patch.object(timeseries_db, 'ilm_enabled', False): self.assertFalse(timeseries_db.ilm_enabled) - self.assertIsNone(timeseries_db.create_or_alter_retention_policy(name='default')) + self.assertIsNone( + timeseries_db.create_or_alter_retention_policy(name='default') + ) self.assertIsNone(timeseries_db.get_list_retention_policies()) diff --git a/openwisp_monitoring/monitoring/tests/__init__.py b/openwisp_monitoring/monitoring/tests/__init__.py index f85c2d70e..d23d462e2 100644 --- a/openwisp_monitoring/monitoring/tests/__init__.py +++ b/openwisp_monitoring/monitoring/tests/__init__.py @@ -7,8 +7,8 @@ from ...db import timeseries_db from ...db.backends import TIMESERIES_DB -from ..charts import register_chart, unregister_chart from ...db.backends.elasticsearch.queries import _make_query +from ..charts import register_chart, unregister_chart start_time = now() ten_minutes_ago = start_time - timedelta(minutes=10) diff --git a/openwisp_monitoring/monitoring/tests/test_charts.py b/openwisp_monitoring/monitoring/tests/test_charts.py index 53838d59f..9241e2dd4 100644 --- a/openwisp_monitoring/monitoring/tests/test_charts.py +++ b/openwisp_monitoring/monitoring/tests/test_charts.py @@ -46,7 +46,6 @@ def test_read_summary_not_aggregate(self): m = self._create_object_metric(name='summary_hidden') c = self._create_chart(metric=m) data = c.read() - # TODO: elasticsearch is returning {'value': 6.0} as default query is returned for summary self.assertEqual(data['summary'], {'value': None}) def test_read_summary_top_fields(self):