Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto-triggering: Overwrite the payload using the auto_followup pipeline's output + priority order #291

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 51 additions & 71 deletions kowalski/alert_brokers/alert_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import base64
import datetime
import gzip
import inspect
import io
import os
import pathlib
Expand Down Expand Up @@ -1270,7 +1269,6 @@ def alert_filter__user_defined(
self,
filter_templates: Sequence,
alert: Mapping,
alert_history: list = [],
max_time_ms: int = 1000,
) -> list:
"""Evaluate user-defined filters
Expand Down Expand Up @@ -1357,6 +1355,7 @@ def alert_filter__user_defined(
"active", False
):
auto_followup_filter = deepcopy(_filter["auto_followup"])
priority = 5

# validate non-optional keys
if (
Expand All @@ -1379,58 +1378,13 @@ def alert_filter__user_defined(
continue

# there is also a priority key that is optional. If not present or if not a function, it defaults to 5 (lowest priority)
if auto_followup_filter.get("priority", None) is not None:
if isinstance(auto_followup_filter["priority"], str):
try:
auto_followup_filter["priority"] = eval(
auto_followup_filter["priority"]
)
except Exception:
log(
f'Filter {_filter["fid"]} has an invalid auto-followup priority (could not eval str), using default of 5'
)
continue
if isinstance(
auto_followup_filter["priority"], int
) or isinstance(auto_followup_filter["priority"], float):
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: auto_followup_filter[
"priority"
]
elif callable(auto_followup_filter["priority"]):
# verify that the function takes 3 arguments: alert, alert_history, data
if (
len(
inspect.signature(
auto_followup_filter["priority"]
).parameters
)
!= 3
):
log(
f'Filter {_filter["fid"]} has an invalid auto-followup priority (needs 3 arguments), using default of 5'
)
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: 5
elif (
if isinstance(
auto_followup_filter.get("payload", {}).get(
"priority", None
)
is not None
),
(int, float),
):
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: auto_followup_filter[
"payload"
][
"priority"
]
else:
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: 5
priority = auto_followup_filter["payload"]["priority"]

# validate the optional radius key, and set to 0.5 arcsec if not present
if auto_followup_filter.get("radius", None) is None:
Expand Down Expand Up @@ -1474,12 +1428,53 @@ def alert_filter__user_defined(
)

if len(auto_followup_filtered_data) == 1:
priority = auto_followup_filter["priority"](
alert, alert_history, auto_followup_filtered_data[0]
)
comment = auto_followup_filter.get("comment", None)
if "comment" in auto_followup_filtered_data[
0
] and isinstance(
auto_followup_filtered_data[0]["comment"], str
):
comment = auto_followup_filtered_data[0]["comment"]

payload = _filter["auto_followup"].get("payload", {})
payload["priority"] = priority
# if the auto_followup_filtered_data contains a payload key, merge it with the existing payload
# updating the existing keys with the new values, ignoring the rest
#
# we ignore the rest because the keys from the _filter["auto_followup"] payload
# have been validated by SkyPortal and contain the necessary keys, so we ignore
# those generated in the mongodb pipeline that are not present in the _filter["auto_followup"] payload
#
# PS: we added the priority from _filter["auto_followup"] before overwriting the payload
# so that we can replace the fixed priority with the dynamic one from the pipeline
if "payload" in auto_followup_filtered_data[
0
] and isinstance(
auto_followup_filtered_data[0]["payload"], dict
):
for key in auto_followup_filtered_data[0]["payload"]:
if key in payload:
payload[key] = auto_followup_filtered_data[0][
"payload"
][key]
payload["start_date"] = datetime.datetime.utcnow().strftime(
"%Y-%m-%dT%H:%M:%S.%f"
)
payload["end_date"] = (
datetime.datetime.utcnow()
+ datetime.timedelta(
days=_filter["auto_followup"].get(
"validity_days", 7
)
)
).strftime("%Y-%m-%dT%H:%M:%S.%f")

if comment is not None:
comment += f" (priority: {str(priority)})"
comment = (
str(comment.strip())
+ f" (priority: {str(payload['priority'])})"
)

passed_filter["auto_followup"] = {
"allocation_id": _filter["auto_followup"][
"allocation_id"
Expand All @@ -1498,22 +1493,7 @@ def alert_filter__user_defined(
)
)
),
"payload": {
**_filter["auto_followup"].get("payload", {}),
"priority": priority,
"start_date": datetime.datetime.utcnow().strftime(
"%Y-%m-%dT%H:%M:%S.%f"
),
"end_date": (
datetime.datetime.utcnow()
+ datetime.timedelta(
days=_filter["auto_followup"].get(
"validity_days", 7
)
)
).strftime("%Y-%m-%dT%H:%M:%S.%f"),
# one week validity window
},
"payload": payload,
# constraints
"source_group_ids": [_filter["group_id"]],
"not_if_classified": True,
Expand Down
2 changes: 1 addition & 1 deletion kowalski/alert_brokers/alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def process_alert(alert: Mapping, topic: str):
# execute user-defined alert filters
with timer(f"Filtering of {object_id} {candid}", alert_worker.verbose > 1):
passed_filters = alert_worker.alert_filter__user_defined(
alert_worker.filter_templates, alert, all_prv_candidates
alert_worker.filter_templates, alert
)
if alert_worker.verbose > 1:
log(
Expand Down
28 changes: 27 additions & 1 deletion kowalski/tests/test_alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ def test_alert_filter__user_defined_followup(self):
"validity_days": 3,
}
passed_filters = self.worker.alert_filter__user_defined([filter], self.alert)
delete_alert(self.worker, self.alert)

assert passed_filters is not None
assert len(passed_filters) == 1
Expand All @@ -515,6 +514,33 @@ def test_alert_filter__user_defined_followup(self):
)
assert (end_date - start_date).days == 3

# try the same but with a pipeline that overwrites the payload dynamically
filter["auto_followup"]["pipeline"].append(
{
"$addFields": {
"payload.observation_type": "imaging",
"payload.priority": 0,
"comment": "Overwritten by pipeline",
}
}
)

passed_filters = self.worker.alert_filter__user_defined([filter], self.alert)
delete_alert(self.worker, self.alert)

assert passed_filters is not None
assert len(passed_filters) == 1
assert "auto_followup" in passed_filters[0]
assert (
passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"]
== "imaging"
)
assert passed_filters[0]["auto_followup"]["data"]["payload"]["priority"] == 0
assert (
passed_filters[0]["auto_followup"]["comment"]
== "Overwritten by pipeline (priority: 0)"
)

def test_alert_filter__user_defined_followup_with_broker(self):
"""Test pushing an alert through a filter that also has auto follow-up activated, and broker mode activated"""
if not config["misc"].get("broker", False):
Expand Down
Loading