diff --git a/Dockerfile b/Dockerfile
index 8cf5a1a1..12aafb97 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -27,4 +27,3 @@ ENV NAME=openwisp-monitoring \
     REDIS_HOST=redis
 CMD ["sh", "docker-entrypoint.sh"]
 EXPOSE 8000
-# Command to run the application
diff --git a/openwisp_monitoring/db/backends/influxdb2/client.py b/openwisp_monitoring/db/backends/influxdb2/client.py
index e0327a82..b2401548 100644
--- a/openwisp_monitoring/db/backends/influxdb2/client.py
+++ b/openwisp_monitoring/db/backends/influxdb2/client.py
@@ -214,20 +214,6 @@ def _parse_read_result(self, result):
             parsed_result.append(parsed_record)
         return parsed_result
 
-    def get_ping_data_query(self, bucket, start, stop, device_ids):
-        device_filter = ' or '.join([f'r["object_id"] == "{id}"' for id in device_ids])
-        query = f'''
-        from(bucket: "{bucket}")
-            |> range(start: {start}, stop: {stop})
-            |> filter(fn: (r) => r["_measurement"] == "ping")
-            |> filter(fn: (r) => r["_field"] == "loss" or r["_field"] == "reachable" or r["_field"] == "rtt_avg" or r["_field"] == "rtt_max" or r["_field"] == "rtt_min")
-            |> filter(fn: (r) => r["content_type"] == "config.device")
-            |> filter(fn: (r) => {device_filter})
-            |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
-            |> yield(name: "mean")
-        '''
-        return query
-
     def execute_query(self, query):
         try:
             result = self.query_api.query(query)
@@ -309,7 +295,7 @@ def get_query(
         timezone=settings.TIME_ZONE
     ):
         bucket = self.bucket
-        measurement = params.get('measurement')
+        measurement = params.get('key')
         if not measurement or measurement == 'None':
             logger.error("Invalid or missing measurement in params")
             return None
diff --git a/openwisp_monitoring/db/backends/influxdb2/queries.py b/openwisp_monitoring/db/backends/influxdb2/queries.py
index 2b62d03c..0af4588d 100644
--- a/openwisp_monitoring/db/backends/influxdb2/queries.py
+++ b/openwisp_monitoring/db/backends/influxdb2/queries.py
@@ -34,7 +34,8 @@
             ' |> filter(fn: (r) => r.content_type == "{content_type}")'
             ' |> filter(fn: (r) => r.object_id == "{object_id}")'
             ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
-            ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
+            ' |> rename(columns: {_time: "time"})'
+            ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
             ' |> yield(name: "rtt")'
         )
     },
@@ -82,7 +83,8 @@
             ' |> sum()'
             ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))'
             ' |> aggregateWindow(every: 1d, fn: sum, createEmpty: false)'
-            ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
+            ' |> rename(columns: {_time: "time"})'
+            ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
             ' |> rename(columns: {tx_bytes: "upload", rx_bytes: "download"})'
             ' |> yield(name: "traffic")'
         )
@@ -100,7 +102,8 @@
             ' |> sum()'
             ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))'
             ' |> aggregateWindow(every: 1d, fn: sum, createEmpty: false)'
-            ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
+            ' |> rename(columns: {_time: "time"})'
+            ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
             ' |> rename(columns: {tx_bytes: "upload", rx_bytes: "download"})'
             ' |> yield(name: "general_traffic")'
         )
@@ -151,7 +154,8 @@
             ' |> filter(fn: (r) => r.object_id == "{object_id}")'
             ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
             ' |> map(fn: (r) => ({ r with _value: float(v: int(v: r._value)) }))'
-            ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
+            ' |> rename(columns: {_time: "time"})'
+            ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
             ' |> yield(name: "signal_strength")'
         )
     },
@@ -165,7 +169,8 @@
             ' |> filter(fn: (r) => r.object_id == "{object_id}")'
             ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
             ' |> map(fn: (r) => ({ r with _value: float(v: int(v: r._value)) }))'
-            ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
+            ' |> rename(columns: {_time: "time"})'
+            ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
             ' |> yield(name: "signal_quality")'
         )
     },
@@ -191,7 +196,8 @@
             ' |> filter(fn: (r) => r.object_id == "{object_id}")'
             ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
             ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))'
-            ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
+            ' |> rename(columns: {_time: "time"})'
+            ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
             ' |> rename(columns: {sent_bps_tcp: "TCP", sent_bps_udp: "UDP"})'
             ' |> yield(name: "bandwidth")'
         )
@@ -206,7 +212,8 @@
             ' |> filter(fn: (r) => r.object_id == "{object_id}")'
             ' |> sum()'
             ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))'
-            ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
+            ' |> rename(columns: {_time: "time"})'
+            ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
             ' |> rename(columns: {sent_bytes_tcp: "TCP", sent_bytes_udp: "UDP"})'
             ' |> yield(name: "transfer")'
         )
@@ -244,7 +251,8 @@
             ' |> filter(fn: (r) => r.content_type == "{content_type}")'
             ' |> filter(fn: (r) => r.object_id == "{object_id}")'
             ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
-            ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
+            ' |> rename(columns: {_time: "time"})'
+            ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
             ' |> rename(columns: {lost_packets: "lost_datagram", total_packets: "total_datagram"})'
             ' |> yield(name: "datagram")'
         )
diff --git a/openwisp_monitoring/db/backends/influxdb2/tests.py b/openwisp_monitoring/db/backends/influxdb2/tests.py
index 5283bda8..77cd3001 100644
--- a/openwisp_monitoring/db/backends/influxdb2/tests.py
+++ b/openwisp_monitoring/db/backends/influxdb2/tests.py
@@ -231,12 +231,12 @@ def test_read_order(self, mock_influxdb_client):
         # Test ascending read order
         m.read(limit=2, order='time')
         query = mock_query_api.query.call_args[0][0]
-        self.assertIn('|> sort(columns: ["_time"], desc: false)', query)
+        self.assertIn('|> sort(columns: ["time"], desc: false)', query)
 
         # Test descending read order
         m.read(limit=2, order='-time')
         query = mock_query_api.query.call_args[0][0]
-        self.assertIn('|> sort(columns: ["_time"], desc: true)', query)
+        self.assertIn('|> sort(columns: ["time"], desc: true)', query)
 
         # Test invalid read order
         with self.assertRaises(ValueError):
@@ -258,4 +258,4 @@ def ping_write_microseconds_precision(self, mock_influxdb_client):
         self.assertEqual(call_args_2['record']['time'], '2020-07-31T22:05:47.235152')
 
 if __name__ == '__main__':
-    unittest.main()
\ No newline at end of file
+    unittest.main()
diff --git a/openwisp_monitoring/device/base/models.py b/openwisp_monitoring/device/base/models.py
index 52e45e74..cdb9ba8e 100644
--- a/openwisp_monitoring/device/base/models.py
+++ b/openwisp_monitoring/device/base/models.py
@@ -384,6 +384,7 @@ def update_status(self, value):
         if self.status == '' and app_settings.AUTO_CLEAR_MANAGEMENT_IP:
             self.device.management_ip = None
             self.device.save(update_fields=['management_ip'])
+        health_status_changed.send(sender=self.__class__, instance=self, status=value)
 
     @property
diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py
index 247815df..1cd15a71 100644
--- a/openwisp_monitoring/monitoring/base/models.py
+++ b/openwisp_monitoring/monitoring/base/models.py
@@ -678,8 +678,8 @@ def get_query(
         params.update({
             'start_date': start_date,
             'end_date': end_date,
-            'measurement': self.config_dict.get('measurement', self.metric.key),
-            'field_name': fields or self.config_dict.get('field_name'),
+            # 'measurement': self.config_dict.get('measurement', self.metric.key),
+            # 'field_name': fields or self.config_dict.get('field_name'),
         })
         if not params.get('organization_id') and self.config_dict.get('__all__', False):
             params['organization_id'] = ['__all__']
@@ -769,33 +769,22 @@ def read(
             )
             query_kwargs.update(additional_query_kwargs)
             if self.top_fields:
-                points = summary = timeseries_db._get_top_fields(
-                    default_query=self._default_query,
-                    chart_type=self.type,
-                    group_map=self.GROUP_MAP,
-                    number=self.top_fields,
-                    params=self._get_query_params(self.DEFAULT_TIME),
-                    time=time,
-                    query=self.query,
-                    get_fields=False,
+                fields = self.get_top_fields(self.top_fields)
+                data_query = self.get_query(fields=fields, **query_kwargs)
+                summary_query = self.get_query(
+                    fields=fields, summary=True, **query_kwargs
                 )
             else:
                 data_query = self.get_query(**query_kwargs)
                 summary_query = self.get_query(summary=True, **query_kwargs)
-            points = timeseries_db.get_list_query(data_query, key=self.metric.key)
-            summary = timeseries_db.get_list_query(
-                summary_query, key=self.metric.key
-            )
+            points = timeseries_db.get_list_query(data_query)
+            summary = timeseries_db.get_list_query(summary_query)
         except timeseries_db.client_error as e:
             logger.error(f"Error fetching data: {e}", exc_info=True)
-            raise
+            raise e
         for point in points:
-            time_value = point.get('time') or point.get('_time')
-            if not time_value:
-                logger.warning(f"Point missing time value: {point}")
-                continue
-
+            time_value = point.get('time')
             try:
                 formatted_time = self._parse_and_format_time(time_value, timezone)
             except ValueError as e:
@@ -803,10 +792,10 @@
                 continue
 
             for key, value in point.items():
-                if key in ('time', '_time', 'result', 'table', 'content_type', 'object_id'):
+                if key in ('time', 'result', 'table', 'content_type', 'object_id'):
                     continue
                 traces.setdefault(key, [])
-                if decimal_places is not None and value is not None:
+                if decimal_places and isinstance(value, (int, float)):
                     value = self._round(value, decimal_places)
                 traces[key].append(value)
@@ -821,7 +810,7 @@
         # Handle summary calculation
         if summary:
             for key, value in summary[0].items():
-                if key in ('time', '_time', 'result', 'table', 'content_type', 'object_id'):
+                if key in ('time', 'result', 'table', 'content_type', 'object_id'):
                     continue
                 if not timeseries_db.validate_query(self.query):
                     value = None
diff --git a/openwisp_monitoring/monitoring/tests/__init__.py b/openwisp_monitoring/monitoring/tests/__init__.py
index 2c155bce..3018ddd2 100644
--- a/openwisp_monitoring/monitoring/tests/__init__.py
+++ b/openwisp_monitoring/monitoring/tests/__init__.py
@@ -94,10 +94,14 @@
                 "'{content_type}' AND object_id = '{object_id}'"
             ),
            'influxdb2': (
-                'from(bucket: "{key}") |> range(start: {time}) '
-                '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
-                'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
-                '|> sum()'
+                'from(bucket: "mybucket")'
+                ' |> range(start: {time}, stop: {end_date})'
+                ' |> filter(fn: (r) => r._measurement == "{measurement}")'
+                ' |> filter(fn: (r) => r._field == "{field_name}")'
+                ' |> filter(fn: (r) => r.content_type == "{content_type}")'
+                ' |> filter(fn: (r) => r.object_id == "{object_id}")'
+                ' |> sum()'
+                ' |> yield(name: "histogram")'
             ),
         },
     },
@@ -129,9 +133,13 @@
                 "content_type = '{content_type}' AND object_id = '{object_id}'"
             ),
            'influxdb2': (
-                'from(bucket: "{key}") |> range(start: {time}) '
-                '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
-                'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")'
+                'from(bucket: "mybucket")'
+                ' |> range(start: {time}, stop: {end_date})'
+                ' |> filter(fn: (r) => r._measurement == "{measurement}")'
+                ' |> filter(fn: (r) => r._field == "{field_name}")'
+                ' |> filter(fn: (r) => r.content_type == "{content_type}")'
+                ' |> filter(fn: (r) => r.object_id == "{object_id}")'
+                ' |> yield(name: "default")'
             ),
         },
     },
@@ -147,11 +155,13 @@
                 "content_type = '{content_type}' AND object_id = '{object_id}'"
             ),
            'influxdb2': (
-                'from(bucket: "{key}") |> range(start: {time}) '
-                '|> filter(fn: (r) => r["_measurement"] == "{field_name}" or '
-                'r["_measurement"] == "value2" and '
-                'r["content_type"] == "{content_type}" and '
-                'r["object_id"] == "{object_id}")'
+                'from(bucket: "mybucket")'
+                ' |> range(start: {time}, stop: {end_date})'
+                ' |> filter(fn: (r) => r._measurement == "{measurement}")'
+                ' |> filter(fn: (r) => r._field == "{field_name}" or r._field == "value2")'
+                ' |> filter(fn: (r) => r.content_type == "{content_type}")'
+                ' |> filter(fn: (r) => r.object_id == "{object_id}")'
+                ' |> yield(name: "multiple_test")'
            ),
         },
     },
@@ -167,9 +177,15 @@
                " GROUP BY time(1d), metric_num"
            ),
            'influxdb2': (
-                'from(bucket: "{key}") |> range(start: {time}) '
-                '|> filter(fn: (r) => r["_measurement"] == "{field_name}") '
-                '|> group(columns: ["metric_num"]) |> sum() |> cumulativeSum() |> window(every: 1d)'
+                'from(bucket: "mybucket")'
+                ' |> range(start: {time}, stop: {end_date})'
+                ' |> filter(fn: (r) => r._measurement == "{measurement}")'
+                ' |> filter(fn: (r) => r._field == "{field_name}")'
+                ' |> group(columns: ["metric_num"])'
+                ' |> sum()'
+                ' |> cumulativeSum()'
+                ' |> window(every: 1d)'
+                ' |> yield(name: "group_by_tag")'
            ),
        },
        'summary_query': {
@@ -178,9 +194,14 @@
                " GROUP BY time(30d), metric_num"
            ),
            'influxdb2': (
-                'from(bucket: "{key}") |> range(start: {time}) '
-                '|> filter(fn: (r) => r["_measurement"] == "{field_name}") '
-                '|> group(columns: ["metric_num"]) |> sum() |> window(every: 30d)'
+                'from(bucket: "mybucket")'
+                ' |> range(start: {time}, stop: {end_date})'
+                ' |> filter(fn: (r) => r._measurement == "ping")'
+                ' |> filter(fn: (r) => r._field == "loss")'
+                ' |> filter(fn: (r) => r.content_type == "{content_type}")'
+                ' |> filter(fn: (r) => r.object_id == "{object_id}")'
+                ' |> mean()'
+                ' |> yield(name: "summary")'
            ),
        },
    },
@@ -196,10 +217,14 @@
                "content_type = '{content_type}' AND object_id = '{object_id}'"
            ),
            'influxdb2': (
-                'from(bucket: "{key}") |> range(start: {time}) '
-                '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
-                'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
-                '|> mean()'
+                'from(bucket: "mybucket")'
+                ' |> range(start: {time}, stop: {end_date})'
+                ' |> filter(fn: (r) => r._measurement == "{measurement}")'
+                ' |> filter(fn: (r) => r._field == "{field_name}")'
+                ' |> filter(fn: (r) => r.content_type == "{content_type}")'
+                ' |> filter(fn: (r) => r.object_id == "{object_id}")'
+                ' |> mean()'
+                ' |> yield(name: "mean_test")'
            ),
        },
    },
@@ -215,10 +240,14 @@
                "content_type = '{content_type}' AND object_id = '{object_id}'"
            ),
            'influxdb2': (
-                'from(bucket: "{key}") |> range(start: {time}) '
-                '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
-                'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
-                '|> sum()'
+                'from(bucket: "mybucket")'
+                ' |> range(start: {time}, stop: {end_date})'
+                ' |> filter(fn: (r) => r._measurement == "{measurement}")'
+                ' |> filter(fn: (r) => r._field == "{field_name}")'
+                ' |> filter(fn: (r) => r.content_type == "{content_type}")'
+                ' |> filter(fn: (r) => r.object_id == "{object_id}")'
+                ' |> sum()'
+                ' |> yield(name: "sum_test")'
            ),
        },
    },
@@ -235,10 +264,14 @@
                "'{content_type}' AND object_id = '{object_id}'"
            ),
            'influxdb2': (
-                'from(bucket: "{key}") |> range(start: {time}) '
-                '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
-                'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
-                '|> mean()'
+                'from(bucket: "mybucket")'
+                ' |> range(start: {time}, stop: {end_date})'
+                ' |> filter(fn: (r) => r._measurement == "{measurement}")'
+                ' |> filter(fn: (r) => r._field == "{field_name}")'
+                ' |> filter(fn: (r) => r.content_type == "{content_type}")'
+                ' |> filter(fn: (r) => r.object_id == "{object_id}")'
+                ' |> mean()'
+                ' |> yield(name: "top_fields_mean")'
            ),
        },
    },
diff --git a/openwisp_monitoring/monitoring/tests/test_configuration.py b/openwisp_monitoring/monitoring/tests/test_configuration.py
index bd276870..202c8d2e 100644
--- a/openwisp_monitoring/monitoring/tests/test_configuration.py
+++ b/openwisp_monitoring/monitoring/tests/test_configuration.py
@@ -35,10 +35,15 @@ def _get_new_metric(self):
                     "WHERE time >= '{time}' AND content_type = "
                     "'{content_type}' AND object_id = '{object_id}'"
                 ),
-                'influxdb2': (
-                    'from(bucket: "{key}") |> range(start: {time}) '
-                    '|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
-                    'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")'
+                'influxdb2': (
+                    'from(bucket: "mybucket")'
+                    ' |> range(start: {time}, stop: {end_date})'
+                    ' |> filter(fn: (r) => r._measurement == "{measurement}")'
+                    ' |> filter(fn: (r) => r._field == "{field_name}")'
+                    ' |> filter(fn: (r) => r.content_type == "{content_type}")'
+                    ' |> filter(fn: (r) => r.object_id == "{object_id}")'
+                    ' |> sum()'
+                    ' |> yield(name: "histogram")'
                 ),
             },
         }
diff --git a/openwisp_monitoring/views.py b/openwisp_monitoring/views.py
index 21ab811e..840042a5 100644
--- a/openwisp_monitoring/views.py
+++ b/openwisp_monitoring/views.py
@@ -135,11 +135,6 @@ def _get_charts_data(self, charts, time, timezone, start_date, end_date):
                 chart_dict['connect_points'] = chart.connect_points
             if chart.trace_labels:
                 chart_dict['trace_labels'] = chart.trace_labels
-            # Handle None values in summary
-            if 'summary' in chart_dict:
-                for key, value in chart_dict['summary'].items():
-                    if value is None:
-                        chart_dict['summary'][key] = 'N/A'
         except InvalidChartConfigException:
             logger.exception(f'Skipped chart for metric {chart.metric}')
             continue
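
Note on the recurring Flux pattern in this patch: every chart query now renames `_time` to `time` before pivoting, so downstream consumers (e.g. `Chart.read()`, which looks up `point.get('time')`) see a uniform `time` column. The following standalone sketch, not part of the patch, shows how such a query can be exercised with the official influxdb-client Python package; the URL, token and org are placeholder assumptions, while `mybucket`, `ping` and `loss` mirror the test queries above.

    # Minimal sketch: run one rename+pivot query and read the records back.
    # Connection details below are illustrative assumptions.
    from influxdb_client import InfluxDBClient

    FLUX = '''
    from(bucket: "mybucket")
      |> range(start: -1h)
      |> filter(fn: (r) => r._measurement == "ping")
      |> filter(fn: (r) => r._field == "loss")
      |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
      |> rename(columns: {_time: "time"})
      |> pivot(rowKey: ["time"], columnKey: ["_field"], valueColumn: "_value")
    '''

    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
    for table in client.query_api().query(FLUX):
        for record in table.records:
            # After rename+pivot each record carries a "time" column plus one
            # column per field (here "loss"): the shape Chart.read() expects.
            print(record.values["time"], record.values.get("loss"))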