Skip to content

Commit

Permalink
allow the filter itself to overwrite the follow-up request's payload …
Browse files Browse the repository at this point in the history
…and/or the comment being posted
  • Loading branch information
Theodlz committed May 8, 2024
1 parent cd67e6d commit 1bef41a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 73 deletions.
122 changes: 51 additions & 71 deletions kowalski/alert_brokers/alert_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import base64
import datetime
import gzip
import inspect
import io
import os
import pathlib
Expand Down Expand Up @@ -1270,7 +1269,6 @@ def alert_filter__user_defined(
self,
filter_templates: Sequence,
alert: Mapping,
alert_history: list = [],
max_time_ms: int = 1000,
) -> list:
"""Evaluate user-defined filters
Expand Down Expand Up @@ -1357,6 +1355,7 @@ def alert_filter__user_defined(
"active", False
):
auto_followup_filter = deepcopy(_filter["auto_followup"])
priority = 5

# validate non-optional keys
if (
Expand All @@ -1379,58 +1378,13 @@ def alert_filter__user_defined(
continue

# there is also a priority key that is optional. If not present or if not a function, it defaults to 5 (lowest priority)
if auto_followup_filter.get("priority", None) is not None:
if isinstance(auto_followup_filter["priority"], str):
try:
auto_followup_filter["priority"] = eval(
auto_followup_filter["priority"]
)
except Exception:
log(
f'Filter {_filter["fid"]} has an invalid auto-followup priority (could not eval str), using default of 5'
)
continue
if isinstance(
auto_followup_filter["priority"], int
) or isinstance(auto_followup_filter["priority"], float):
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: auto_followup_filter[
"priority"
]
elif callable(auto_followup_filter["priority"]):
# verify that the function takes 3 arguments: alert, alert_history, data
if (
len(
inspect.signature(
auto_followup_filter["priority"]
).parameters
)
!= 3
):
log(
f'Filter {_filter["fid"]} has an invalid auto-followup priority (needs 3 arguments), using default of 5'
)
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: 5
elif (
if isinstance(
auto_followup_filter.get("payload", {}).get(
"priority", None
)
is not None
),
(int, float),
):
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: auto_followup_filter[
"payload"
][
"priority"
]
else:
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: 5
priority = auto_followup_filter["payload"]["priority"]

# validate the optional radius key, and set to 0.5 arcsec if not present
if auto_followup_filter.get("radius", None) is None:
Expand Down Expand Up @@ -1474,12 +1428,53 @@ def alert_filter__user_defined(
)

if len(auto_followup_filtered_data) == 1:
priority = auto_followup_filter["priority"](
alert, alert_history, auto_followup_filtered_data[0]
)
comment = auto_followup_filter.get("comment", None)
if "comment" in auto_followup_filtered_data[
0
] and isinstance(
auto_followup_filtered_data[0]["comment"], str
):
comment = auto_followup_filtered_data[0]["comment"]

payload = _filter["auto_followup"].get("payload", {})
payload["priority"] = priority
# if the auto_followup_filtered_data contains a payload key, merge it with the existing payload
# updating the existing keys with the new values, ignoring the rest
#
# we ignore the rest because the keys from the _filter["auto_followup"] payload
# have been validated by SkyPortal and contain the necessary keys, so we ignore
# those generated in the mongodb pipeline that are not present in the _filter["auto_followup"] payload
#
# PS: we added the priority from _filter["auto_followup"] before overwriting the payload
# so that we can replace the fixed priority with the dynamic one from the pipeline
if "payload" in auto_followup_filtered_data[
0
] and isinstance(
auto_followup_filtered_data[0]["payload"], dict
):
for key in auto_followup_filtered_data[0]["payload"]:
if key in payload:
payload[key] = auto_followup_filtered_data[0][
"payload"
][key]
payload["start_date"] = datetime.datetime.utcnow().strftime(
"%Y-%m-%dT%H:%M:%S.%f"
)
payload["end_date"] = (
datetime.datetime.utcnow()
+ datetime.timedelta(
days=_filter["auto_followup"].get(
"validity_days", 7
)
)
).strftime("%Y-%m-%dT%H:%M:%S.%f")

if comment is not None:
comment += f" (priority: {str(priority)})"
comment = (
str(comment.strip())
+ f" (priority: {str(payload['priority'])})"
)

passed_filter["auto_followup"] = {
"allocation_id": _filter["auto_followup"][
"allocation_id"
Expand All @@ -1498,22 +1493,7 @@ def alert_filter__user_defined(
)
)
),
"payload": {
**_filter["auto_followup"].get("payload", {}),
"priority": priority,
"start_date": datetime.datetime.utcnow().strftime(
"%Y-%m-%dT%H:%M:%S.%f"
),
"end_date": (
datetime.datetime.utcnow()
+ datetime.timedelta(
days=_filter["auto_followup"].get(
"validity_days", 7
)
)
).strftime("%Y-%m-%dT%H:%M:%S.%f"),
# one week validity window
},
"payload": payload,
# constraints
"source_group_ids": [_filter["group_id"]],
"not_if_classified": True,
Expand Down
2 changes: 1 addition & 1 deletion kowalski/alert_brokers/alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def process_alert(alert: Mapping, topic: str):
# execute user-defined alert filters
with timer(f"Filtering of {object_id} {candid}", alert_worker.verbose > 1):
passed_filters = alert_worker.alert_filter__user_defined(
alert_worker.filter_templates, alert, all_prv_candidates
alert_worker.filter_templates, alert
)
if alert_worker.verbose > 1:
log(
Expand Down
28 changes: 27 additions & 1 deletion kowalski/tests/test_alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ def test_alert_filter__user_defined_followup(self):
"validity_days": 3,
}
passed_filters = self.worker.alert_filter__user_defined([filter], self.alert)
delete_alert(self.worker, self.alert)

assert passed_filters is not None
assert len(passed_filters) == 1
Expand All @@ -515,6 +514,33 @@ def test_alert_filter__user_defined_followup(self):
)
assert (end_date - start_date).days == 3

# try the same but with a pipeline that overwrites the payload dynamically
filter["auto_followup"]["pipeline"].append(
{
"$addFields": {
"payload.observation_type": "imaging",
"payload.priority": 0,
"comment": "Overwritten by pipeline",
}
}
)

passed_filters = self.worker.alert_filter__user_defined([filter], self.alert)
delete_alert(self.worker, self.alert)

assert passed_filters is not None
assert len(passed_filters) == 1
assert "auto_followup" in passed_filters[0]
assert (
passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"]
== "imaging"
)
assert passed_filters[0]["auto_followup"]["data"]["payload"]["priority"] == 0
assert (
passed_filters[0]["auto_followup"]["comment"]
== "Overwritten by pipeline (priority: 0)"
)

def test_alert_filter__user_defined_followup_with_broker(self):
"""Test pushing an alert through a filter that also has auto follow-up activated, and broker mode activated"""
if not config["misc"].get("broker", False):
Expand Down

0 comments on commit 1bef41a

Please sign in to comment.