[monitoring] Updated InfluxDB 2.x backend queries #274
Fixes #274
praptisharma28 committed Jul 4, 2024
1 parent bff44f1 commit 3859434
Showing 9 changed files with 106 additions and 90 deletions.
1 change: 0 additions & 1 deletion Dockerfile
@@ -27,4 +27,3 @@ ENV NAME=openwisp-monitoring \
REDIS_HOST=redis
CMD ["sh", "docker-entrypoint.sh"]
EXPOSE 8000
# Command to run the application
16 changes: 1 addition & 15 deletions openwisp_monitoring/db/backends/influxdb2/client.py
@@ -214,20 +214,6 @@ def _parse_read_result(self, result):
parsed_result.append(parsed_record)
return parsed_result

def get_ping_data_query(self, bucket, start, stop, device_ids):
device_filter = ' or '.join([f'r["object_id"] == "{id}"' for id in device_ids])
query = f'''
from(bucket: "{bucket}")
|> range(start: {start}, stop: {stop})
|> filter(fn: (r) => r["_measurement"] == "ping")
|> filter(fn: (r) => r["_field"] == "loss" or r["_field"] == "reachable" or r["_field"] == "rtt_avg" or r["_field"] == "rtt_max" or r["_field"] == "rtt_min")
|> filter(fn: (r) => r["content_type"] == "config.device")
|> filter(fn: (r) => {device_filter})
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: "mean")
'''
return query

def execute_query(self, query):
try:
result = self.query_api.query(query)
@@ -309,7 +295,7 @@ def get_query(
timezone=settings.TIME_ZONE
):
bucket = self.bucket
measurement = params.get('measurement')
measurement = params.get('key')
if not measurement or measurement == 'None':
logger.error("Invalid or missing measurement in params")
return None
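With this change, `get_query` reads the measurement from the `key` entry of `params` instead of a dedicated `measurement` entry. A minimal sketch of the assumed call shape (all param values below are hypothetical, not taken from the commit):

```python
import logging

logger = logging.getLogger(__name__)

# Hypothetical params dict illustrating the new contract:
# the Flux _measurement now comes from params['key'].
params = {
    'key': 'ping',                            # used as the measurement name
    'content_type': 'config.device',
    'object_id': 'hypothetical-device-uuid',
}

measurement = params.get('key')               # previously params.get('measurement')
if not measurement or measurement == 'None':
    logger.error("Invalid or missing measurement in params")
```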
24 changes: 16 additions & 8 deletions openwisp_monitoring/db/backends/influxdb2/queries.py
@@ -34,7 +34,8 @@
' |> filter(fn: (r) => r.content_type == "{content_type}")'
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {_time: "time"})'
' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
' |> yield(name: "rtt")'
)
},
@@ -82,7 +83,8 @@
' |> sum()'
' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))'
' |> aggregateWindow(every: 1d, fn: sum, createEmpty: false)'
' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {_time: "time"})'
' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {tx_bytes: "upload", rx_bytes: "download"})'
' |> yield(name: "traffic")'
)
@@ -100,7 +102,8 @@
' |> sum()'
' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))'
' |> aggregateWindow(every: 1d, fn: sum, createEmpty: false)'
' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {_time: "time"})'
' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {tx_bytes: "upload", rx_bytes: "download"})'
' |> yield(name: "general_traffic")'
)
@@ -151,7 +154,8 @@
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
' |> map(fn: (r) => ({ r with _value: float(v: int(v: r._value)) }))'
' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {_time: "time"})'
' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
' |> yield(name: "signal_strength")'
)
},
@@ -165,7 +169,8 @@
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
' |> map(fn: (r) => ({ r with _value: float(v: int(v: r._value)) }))'
' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {_time: "time"})'
' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
' |> yield(name: "signal_quality")'
)
},
@@ -191,7 +196,8 @@
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))'
' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {_time: "time"})'
' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {sent_bps_tcp: "TCP", sent_bps_udp: "UDP"})'
' |> yield(name: "bandwidth")'
)
@@ -206,7 +212,8 @@
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> sum()'
' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))'
' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {_time: "time"})'
' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {sent_bytes_tcp: "TCP", sent_bytes_udp: "UDP"})'
' |> yield(name: "transfer")'
)
@@ -244,7 +251,8 @@
' |> filter(fn: (r) => r.content_type == "{content_type}")'
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {_time: "time"})'
' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
' |> rename(columns: {lost_packets: "lost_datagram", total_packets: "total_datagram"})'
' |> yield(name: "datagram")'
)
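Every Flux query above now inserts `rename(columns: {_time: "time"})` before the pivot, so the pivot's rowKey and every downstream consumer see a plain `time` column. The shared pattern in isolation, written as a Python string constant in the same style as the file (bucket and measurement names are illustrative, not from the commit):

```python
# Sketch of the shared rename-then-pivot pattern (illustrative names):
# renaming _time to time first lets pivot key rows on ["time"], so
# result points expose point['time'] instead of point['_time'].
FLUX_RENAME_PIVOT_EXAMPLE = (
    'from(bucket: "mybucket")'
    ' |> range(start: -1d)'
    ' |> filter(fn: (r) => r._measurement == "ping")'
    ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)'
    ' |> rename(columns: {_time: "time"})'
    ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")'
    ' |> yield(name: "example")'
)
```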
6 changes: 3 additions & 3 deletions openwisp_monitoring/db/backends/influxdb2/tests.py
@@ -231,12 +231,12 @@ def test_read_order(self, mock_influxdb_client):
# Test ascending read order
m.read(limit=2, order='time')
query = mock_query_api.query.call_args[0][0]
self.assertIn('|> sort(columns: ["_time"], desc: false)', query)
self.assertIn('|> sort(columns: ["time"], desc: false)', query)

# Test descending read order
m.read(limit=2, order='-time')
query = mock_query_api.query.call_args[0][0]
self.assertIn('|> sort(columns: ["_time"], desc: true)', query)
self.assertIn('|> sort(columns: ["time"], desc: true)', query)

# Test invalid read order
with self.assertRaises(ValueError):
@@ -258,4 +258,4 @@ def ping_write_microseconds_precision(self, mock_influxdb_client):
self.assertEqual(call_args_2['record']['time'], '2020-07-31T22:05:47.235152')

if __name__ == '__main__':
unittest.main()
unittest.main()
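The updated assertions expect the backend to sort on the renamed `time` column. A sketch of the ordering clauses the generated Flux is assumed to contain:

```python
# Assumed ordering clauses after the _time -> time rename:
SORT_ASCENDING = '|> sort(columns: ["time"], desc: false)'   # order='time'
SORT_DESCENDING = '|> sort(columns: ["time"], desc: true)'   # order='-time'
```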
1 change: 1 addition & 0 deletions openwisp_monitoring/device/base/models.py
@@ -384,6 +384,7 @@ def update_status(self, value):
if self.status == '' and app_settings.AUTO_CLEAR_MANAGEMENT_IP:
self.device.management_ip = None
self.device.save(update_fields=['management_ip'])

health_status_changed.send(sender=self.__class__, instance=self, status=value)

@property
37 changes: 13 additions & 24 deletions openwisp_monitoring/monitoring/base/models.py
@@ -678,8 +678,8 @@ def get_query(
params.update({
'start_date': start_date,
'end_date': end_date,
'measurement': self.config_dict.get('measurement', self.metric.key),
'field_name': fields or self.config_dict.get('field_name'),
# 'measurement': self.config_dict.get('measurement', self.metric.key),
# 'field_name': fields or self.config_dict.get('field_name'),
})
if not params.get('organization_id') and self.config_dict.get('__all__', False):
params['organization_id'] = ['__all__']
@@ -769,44 +769,33 @@ def read(
)
query_kwargs.update(additional_query_kwargs)
if self.top_fields:
points = summary = timeseries_db._get_top_fields(
default_query=self._default_query,
chart_type=self.type,
group_map=self.GROUP_MAP,
number=self.top_fields,
params=self._get_query_params(self.DEFAULT_TIME),
time=time,
query=self.query,
get_fields=False,
fields = self.get_top_fields(self.top_fields)
data_query = self.get_query(fields=fields, **query_kwargs)
summary_query = self.get_query(
fields=fields, summary=True, **query_kwargs
)
else:
data_query = self.get_query(**query_kwargs)
summary_query = self.get_query(summary=True, **query_kwargs)
points = timeseries_db.get_list_query(data_query, key=self.metric.key)
summary = timeseries_db.get_list_query(
summary_query, key=self.metric.key
)
points = timeseries_db.get_list_query(data_query)
summary = timeseries_db.get_list_query(summary_query)
except timeseries_db.client_error as e:
logger.error(f"Error fetching data: {e}", exc_info=True)
raise
raise e

for point in points:
time_value = point.get('time') or point.get('_time')
if not time_value:
logger.warning(f"Point missing time value: {point}")
continue

time_value = point.get('time')
try:
formatted_time = self._parse_and_format_time(time_value, timezone)
except ValueError as e:
logger.warning(f"Error parsing time value: {time_value}. Error: {e}")
continue

for key, value in point.items():
if key in ('time', '_time', 'result', 'table', 'content_type', 'object_id'):
if key in ('time', 'result', 'table', 'content_type', 'object_id'):
continue
traces.setdefault(key, [])
if decimal_places is not None and value is not None:
if decimal_places and isinstance(value, (int, float)):
value = self._round(value, decimal_places)
traces[key].append(value)

Expand All @@ -821,7 +810,7 @@ def read(
# Handle summary calculation
if summary:
for key, value in summary[0].items():
if key in ('time', '_time', 'result', 'table', 'content_type', 'object_id'):
if key in ('time', 'result', 'table', 'content_type', 'object_id'):
continue
if not timeseries_db.validate_query(self.query):
value = None
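After the rename/pivot changes in the InfluxDB 2.x queries, each point returned by `get_list_query` is expected to carry a plain `time` column plus one column per pivoted field; the loop above skips the bookkeeping keys and keeps the rest as traces. A hypothetical point illustrating the assumed shape (all values invented for illustration):

```python
# Hypothetical point shape assumed by the read() loop above:
point = {
    'time': '2024-07-04T00:00:00Z',    # kept: parsed and formatted
    'result': 'rtt',                   # Flux bookkeeping, skipped
    'table': 0,                        # Flux bookkeeping, skipped
    'content_type': 'config.device',   # tag, skipped
    'object_id': 'hypothetical-uuid',  # tag, skipped
    'rtt_avg': 12.345,                 # pivoted field, becomes a trace
    'rtt_max': 20.1,                   # pivoted field, becomes a trace
}
```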
93 changes: 63 additions & 30 deletions openwisp_monitoring/monitoring/tests/__init__.py
@@ -94,10 +94,14 @@
"'{content_type}' AND object_id = '{object_id}'"
),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> sum()'
'from(bucket: "mybucket")'
' |> range(start: {time}, stop: {end_date})'
' |> filter(fn: (r) => r._measurement == "{measurement}")'
' |> filter(fn: (r) => r._field == "{field_name}")'
' |> filter(fn: (r) => r.content_type == "{content_type}")'
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> sum()'
' |> yield(name: "histogram")'
),
},
},
@@ -129,9 +133,13 @@
"content_type = '{content_type}' AND object_id = '{object_id}'"
),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")'
'from(bucket: "mybucket")'
' |> range(start: {time}, stop: {end_date})'
' |> filter(fn: (r) => r._measurement == "{measurement}")'
' |> filter(fn: (r) => r._field == "{field_name}")'
' |> filter(fn: (r) => r.content_type == "{content_type}")'
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> yield(name: "default")'
),
},
},
@@ -147,11 +155,13 @@
"content_type = '{content_type}' AND object_id = '{object_id}'"
),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" or '
'r["_measurement"] == "value2" and '
'r["content_type"] == "{content_type}" and '
'r["object_id"] == "{object_id}")'
'from(bucket: "mybucket")'
' |> range(start: {time}, stop: {end_date})'
' |> filter(fn: (r) => r._measurement == "{measurement}")'
' |> filter(fn: (r) => r._field == "{field_name}" or r._field == "value2")'
' |> filter(fn: (r) => r.content_type == "{content_type}")'
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> yield(name: "multiple_test")'
),
},
},
@@ -167,9 +177,15 @@
" GROUP BY time(1d), metric_num"
),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}") '
'|> group(columns: ["metric_num"]) |> sum() |> cumulativeSum() |> window(every: 1d)'
'from(bucket: "mybucket")'
' |> range(start: {time}, stop: {end_date})'
' |> filter(fn: (r) => r._measurement == "{measurement}")'
' |> filter(fn: (r) => r._field == "{field_name}")'
' |> group(columns: ["metric_num"])'
' |> sum()'
' |> cumulativeSum()'
' |> window(every: 1d)'
' |> yield(name: "group_by_tag")'
),
},
'summary_query': {
@@ -178,9 +194,14 @@
" GROUP BY time(30d), metric_num"
),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}") '
'|> group(columns: ["metric_num"]) |> sum() |> window(every: 30d)'
'from(bucket: "mybucket")'
' |> range(start: {time}, stop: {end_date})'
' |> filter(fn: (r) => r._measurement == "ping")'
' |> filter(fn: (r) => r._field == "loss")'
' |> filter(fn: (r) => r.content_type == "{content_type}")'
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> mean()'
' |> yield(name: "summary")'
),
},
},
@@ -196,10 +217,14 @@
"content_type = '{content_type}' AND object_id = '{object_id}'"
),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean()'
'from(bucket: "mybucket")'
' |> range(start: {time}, stop: {end_date})'
' |> filter(fn: (r) => r._measurement == "{measurement}")'
' |> filter(fn: (r) => r._field == "{field_name}")'
' |> filter(fn: (r) => r.content_type == "{content_type}")'
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> mean()'
' |> yield(name: "mean_test")'
),
},
},
@@ -215,10 +240,14 @@
"content_type = '{content_type}' AND object_id = '{object_id}'"
),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> sum()'
'from(bucket: "mybucket")'
' |> range(start: {time}, stop: {end_date})'
' |> filter(fn: (r) => r._measurement == "{measurement}")'
' |> filter(fn: (r) => r._field == "{field_name}")'
' |> filter(fn: (r) => r.content_type == "{content_type}")'
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> sum()'
' |> yield(name: "sum_test")'
),
},
},
@@ -235,10 +264,14 @@
"'{content_type}' AND object_id = '{object_id}'"
),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}") '
'|> mean()'
'from(bucket: "mybucket")'
' |> range(start: {time}, stop: {end_date})'
' |> filter(fn: (r) => r._measurement == "{measurement}")'
' |> filter(fn: (r) => r._field == "{field_name}")'
' |> filter(fn: (r) => r.content_type == "{content_type}")'
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> mean()'
' |> yield(name: "top_fields_mean")'
),
},
},
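These test queries are plain Python format strings; the backend is expected to fill the placeholders before execution. A hypothetical substitution (all values invented for illustration, not taken from the test suite):

```python
# Hypothetical placeholder substitution for the templates above:
template = (
    'from(bucket: "mybucket")'
    ' |> range(start: {time}, stop: {end_date})'
    ' |> filter(fn: (r) => r._measurement == "{measurement}")'
    ' |> filter(fn: (r) => r._field == "{field_name}")'
    ' |> filter(fn: (r) => r.content_type == "{content_type}")'
    ' |> filter(fn: (r) => r.object_id == "{object_id}")'
    ' |> sum()'
)
query = template.format(
    time='-30d',
    end_date='now()',
    measurement='ping',
    field_name='loss',
    content_type='config.device',
    object_id='hypothetical-device-uuid',
)
```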
13 changes: 9 additions & 4 deletions openwisp_monitoring/monitoring/tests/test_configuration.py
@@ -35,10 +35,15 @@ def _get_new_metric(self):
"WHERE time >= '{time}' AND content_type = "
"'{content_type}' AND object_id = '{object_id}'"
),
'influxdb2': (
'from(bucket: "{key}") |> range(start: {time}) '
'|> filter(fn: (r) => r["_measurement"] == "{field_name}" and '
'r["content_type"] == "{content_type}" and r["object_id"] == "{object_id}")'
'influxdb2': (
'from(bucket: "mybucket")'
' |> range(start: {time}, stop: {end_date})'
' |> filter(fn: (r) => r._measurement == "{measurement}")'
' |> filter(fn: (r) => r._field == "{field_name}")'
' |> filter(fn: (r) => r.content_type == "{content_type}")'
' |> filter(fn: (r) => r.object_id == "{object_id}")'
' |> sum()'
' |> yield(name: "histogram")'
),
},
}