Skip to content

Commit

Permalink
Auto-triggering: Overwrite the payload using the auto_followup pipeli…
Browse files Browse the repository at this point in the history
…ne's output + priority order (#291)

* allow the filter itself to overwrite the follow-up request's payload and/or the comment being posted

* add the machinery needed to have Kowalski use the right priority order (ascending or descending) when figuring out if we need to update an existing follow-up request
  • Loading branch information
Theodlz authored May 15, 2024
1 parent e379a00 commit f8a430a
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 82 deletions.
2 changes: 1 addition & 1 deletion config.defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ kowalski:
zookeeper.test: "localhost:2181"
path: "kafka_2.13-3.4.1"
processes_per_topic:
ZTF: 2
ZTF: 1

database:
max_pool_size: 200
Expand Down
143 changes: 67 additions & 76 deletions kowalski/alert_brokers/alert_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import base64
import datetime
import gzip
import inspect
import io
import os
import pathlib
Expand Down Expand Up @@ -47,6 +46,7 @@
time_stamp,
timer,
compare_dicts,
priority_should_update,
)
from warnings import simplefilter

Expand Down Expand Up @@ -1270,7 +1270,6 @@ def alert_filter__user_defined(
self,
filter_templates: Sequence,
alert: Mapping,
alert_history: list = [],
max_time_ms: int = 1000,
) -> list:
"""Evaluate user-defined filters
Expand Down Expand Up @@ -1357,6 +1356,7 @@ def alert_filter__user_defined(
"active", False
):
auto_followup_filter = deepcopy(_filter["auto_followup"])
priority = 5

# validate non-optional keys
if (
Expand All @@ -1379,58 +1379,13 @@ def alert_filter__user_defined(
continue

# there is also a priority key that is optional. If not present or if not a function, it defaults to 5 (lowest priority)
if auto_followup_filter.get("priority", None) is not None:
if isinstance(auto_followup_filter["priority"], str):
try:
auto_followup_filter["priority"] = eval(
auto_followup_filter["priority"]
)
except Exception:
log(
f'Filter {_filter["fid"]} has an invalid auto-followup priority (could not eval str), using default of 5'
)
continue
if isinstance(
auto_followup_filter["priority"], int
) or isinstance(auto_followup_filter["priority"], float):
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: auto_followup_filter[
"priority"
]
elif callable(auto_followup_filter["priority"]):
# verify that the function takes 3 arguments: alert, alert_history, data
if (
len(
inspect.signature(
auto_followup_filter["priority"]
).parameters
)
!= 3
):
log(
f'Filter {_filter["fid"]} has an invalid auto-followup priority (needs 3 arguments), using default of 5'
)
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: 5
elif (
if isinstance(
auto_followup_filter.get("payload", {}).get(
"priority", None
)
is not None
),
(int, float),
):
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: auto_followup_filter[
"payload"
][
"priority"
]
else:
auto_followup_filter[
"priority"
] = lambda alert, alert_history, data: 5
priority = auto_followup_filter["payload"]["priority"]

# validate the optional radius key, and set to 0.5 arcsec if not present
if auto_followup_filter.get("radius", None) is None:
Expand Down Expand Up @@ -1474,17 +1429,61 @@ def alert_filter__user_defined(
)

if len(auto_followup_filtered_data) == 1:
priority = auto_followup_filter["priority"](
alert, alert_history, auto_followup_filtered_data[0]
)
comment = auto_followup_filter.get("comment", None)
if "comment" in auto_followup_filtered_data[
0
] and isinstance(
auto_followup_filtered_data[0]["comment"], str
):
comment = auto_followup_filtered_data[0]["comment"]

payload = _filter["auto_followup"].get("payload", {})
payload["priority"] = priority
# if the auto_followup_filtered_data contains a payload key, merge it with the existing payload
# updating the existing keys with the new values, ignoring the rest
#
# we ignore the rest because the keys from the _filter["auto_followup"] payload
# have been validated by SkyPortal and contain the necessary keys, so we ignore
# those generated in the mongodb pipeline that are not present in the _filter["auto_followup"] payload
#
# PS: we added the priority from _filter["auto_followup"] before overwriting the payload
# so that we can replace the fixed priority with the dynamic one from the pipeline
if "payload" in auto_followup_filtered_data[
0
] and isinstance(
auto_followup_filtered_data[0]["payload"], dict
):
for key in auto_followup_filtered_data[0]["payload"]:
if key in payload:
payload[key] = auto_followup_filtered_data[0][
"payload"
][key]
payload["start_date"] = datetime.datetime.utcnow().strftime(
"%Y-%m-%dT%H:%M:%S.%f"
)
payload["end_date"] = (
datetime.datetime.utcnow()
+ datetime.timedelta(
days=_filter["auto_followup"].get(
"validity_days", 7
)
)
).strftime("%Y-%m-%dT%H:%M:%S.%f")

if comment is not None:
comment += f" (priority: {str(priority)})"
comment = (
str(comment.strip())
+ f" (priority: {str(payload['priority'])})"
)

passed_filter["auto_followup"] = {
"allocation_id": _filter["auto_followup"][
"allocation_id"
],
"comment": comment,
"priority_order": _filter["auto_followup"].get(
"priority_order", "asc"
),
"data": {
"obj_id": alert["objectId"],
"allocation_id": _filter["auto_followup"][
Expand All @@ -1498,22 +1497,7 @@ def alert_filter__user_defined(
)
)
),
"payload": {
**_filter["auto_followup"].get("payload", {}),
"priority": priority,
"start_date": datetime.datetime.utcnow().strftime(
"%Y-%m-%dT%H:%M:%S.%f"
),
"end_date": (
datetime.datetime.utcnow()
+ datetime.timedelta(
days=_filter["auto_followup"].get(
"validity_days", 7
)
)
).strftime("%Y-%m-%dT%H:%M:%S.%f"),
# one week validity window
},
"payload": payload,
# constraints
"source_group_ids": [_filter["group_id"]],
"not_if_classified": True,
Expand Down Expand Up @@ -2241,8 +2225,11 @@ def alert_sentinel_skyportal(
f"Failed to post followup request for {alert['objectId']} to SkyPortal: {e}"
)
else:
# if there is an existing request, but the priority is lower than the one we want to post,
# if there is an existing request, but the priority is lower (or higher, depends of
# the priority system for that instrument/allocation) than the one we want to post,
# update the existing request with the new priority
# first we pick the operator to use, > or <, based on the priority system

request_to_update = existing_requests_filtered[0][1]
# if the status is completed, deleted, or failed do not update
if any(
Expand All @@ -2257,10 +2244,14 @@ def alert_sentinel_skyportal(
# if the status is submitted, and the new priority is higher, update
if "submitted" in str(
request_to_update["status"]
).lower() and float(
passed_filter["auto_followup"]["data"]["payload"]["priority"]
) > float(
request_to_update["payload"]["priority"]
).lower() and priority_should_update(
request_to_update["payload"]["priority"], # existing
passed_filter["auto_followup"]["data"]["payload"][
"priority"
], # new
passed_filter["auto_followup"].get(
"priority_order", "asc"
), # which order warrants an update (higher or lower)
):
with timer(
f"Updating priority of auto followup request for {alert['objectId']} to SkyPortal",
Expand Down
2 changes: 1 addition & 1 deletion kowalski/alert_brokers/alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def process_alert(alert: Mapping, topic: str):
# execute user-defined alert filters
with timer(f"Filtering of {object_id} {candid}", alert_worker.verbose > 1):
passed_filters = alert_worker.alert_filter__user_defined(
alert_worker.filter_templates, alert, all_prv_candidates
alert_worker.filter_templates, alert
)
if alert_worker.verbose > 1:
log(
Expand Down
57 changes: 56 additions & 1 deletion kowalski/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
"target_group_ids": list,
"radius": float,
"validity_days": int,
"priority_order": str,
}


Expand Down Expand Up @@ -2165,6 +2166,33 @@ async def post(self, request: web.Request) -> web.Response:
return self.error(
message=f"Cannot test {attribute} pipeline: no pipeline specified"
)
elif attribute == "auto_followup" and "priority_order" in getattr(
filter_new, attribute
):
if getattr(filter_new, attribute)["priority_order"] not in [
"asc",
"desc",
]:
return self.error(
message=f"Invalid priority_order specified for auto_followup: {getattr(filter_new, attribute)['priority_order']}"
)
new_priority_order = getattr(filter_new, attribute)["priority_order"]
new_allocation_id = getattr(filter_new, attribute)["allocation_id"]
# fetch all existing filters in the DB with auto_followup with the same allocation_id
filters_same_alloc = [
f
async for f in request.app["mongo"]["filters"].find(
{"auto_followup.allocation_id": int(new_allocation_id)}
)
]
for f in filters_same_alloc:
if (
"priority_order" in f["auto_followup"]
and f["auto_followup"]["priority_order"] != new_priority_order
):
return self.error(
message="Cannot add new filter with auto_followup: existing filters with the same allocation have priority_order set to a different value, which is unexpected"
)
pipeline = getattr(filter_new, attribute).get("pipeline")
if not isinstance(pipeline, str):
pipeline = dumps(pipeline)
Expand Down Expand Up @@ -2439,7 +2467,34 @@ async def patch(self, request: web.Request) -> web.Response:
return self.error(
message=f"Cannot update filter id {filter_id}: {modifiable_field} contains invalid keys"
)

elif (
modifiable_field == "auto_followup"
and isinstance(value, dict)
and "priority_order" in value
):
# fetch all existing filters in the DB with auto_followup with the same allocation_id
filters_same_alloc = [
f
async for f in request.app["mongo"]["filters"].find(
{
"auto_followup.allocation_id": int(
value["allocation_id"]
)
}
)
]
# if there is any filter with the same allocation_id, and a non null priority_order that is differet
# throw an error. This is to avoid having multiple filters with the same allocation_id and different
# priority_order, which should be fixed by a sys admin, as it should not happen
for f in filters_same_alloc:
if (
"priority_order" in f["auto_followup"]
and f["auto_followup"]["priority_order"]
!= value["priority_order"]
):
return self.error(
message=f"Cannot update filter id {filter_id}: {modifiable_field}, filters with the same allocation have priority_order set to a different value, which is unexpected"
)
if modifiable_field == "autosave" and isinstance(value, bool):
pass
elif isinstance(value, dict) and "pipeline" not in value:
Expand Down
Loading

0 comments on commit f8a430a

Please sign in to comment.