Skip to content

Commit

Permalink
update followup requests priority, prevent duplicates better, get ful…
Browse files Browse the repository at this point in the history
…l alert history for ML features
  • Loading branch information
Theodlz committed Oct 4, 2023
1 parent 63c1258 commit 928a275
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 22 deletions.
136 changes: 120 additions & 16 deletions kowalski/alert_brokers/alert_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
retry,
time_stamp,
timer,
compare_dicts,
)
from warnings import simplefilter

Expand Down Expand Up @@ -1843,7 +1844,14 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
]

if len(passed_filters_followup) > 0:
# first fetch the followup requests on SkyPortal for this alert
# first sort all the filters by priority (highest first)
passed_filters_followup = sorted(
passed_filters_followup,
key=lambda f: f["auto_followup"]["data"]["payload"]["priority"],
reverse=True,
)

# then, fetch the existing followup requests on SkyPortal for this alert
with timer(
f"Getting followup requests for {alert['objectId']} from SkyPortal",
self.verbose > 1,
Expand All @@ -1860,23 +1868,35 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
for r in existing_requests
if r["status"] in ["completed", "submitted"]
]
# sort by priority (highest first)
existing_requests = sorted(
existing_requests,
key=lambda r: r["payload"]["priority"],
reverse=True,
)
else:
log(f"Failed to get followup requests for {alert['objectId']}")
existing_requests = []

for passed_filter in passed_filters_followup:
# post a followup request with the payload and allocation_id
# if there isn't already a pending request for this alert and this allocation_id
if (
len(
[
r
for r in existing_requests
if r["allocation_id"]
== passed_filter["auto_followup"]["allocation_id"]
]
# look for existing requests with the same allocation, group, and payload
existing_requests_filtered = [
(i, r)
for (i, r) in enumerate(existing_requests)
if r["allocation_id"]
== passed_filter["auto_followup"]["allocation_id"]
and set([passed_filter["group_id"]]).issubset(
[g["id"] for g in r["target_groups"]]
)
== 0
):
and compare_dicts(
passed_filter["auto_followup"]["data"]["payload"],
r["payload"],
ignore_keys=["priority", "start_date", "end_date"],
)
is True
]
if len(existing_requests_filtered) == 0:
# if no existing request, post a new one
with timer(
f"Posting auto followup request for {alert['objectId']} to SkyPortal",
self.verbose > 1,
Expand All @@ -1899,6 +1919,24 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
log(
f"Posted followup request for {alert['objectId']} to SkyPortal"
)
# add it to the existing requests
existing_requests.append(
{
"allocation_id": passed_filter["auto_followup"][
"allocation_id"
],
"payload": passed_filter["auto_followup"][
"data"
]["payload"],
"target_groups": [
{
"id": passed_filter["group_id"],
}
],
"status": "submitted",
}
)

if (
passed_filter["auto_followup"].get("comment", None)
is not None
Expand Down Expand Up @@ -1943,6 +1981,72 @@ def alert_sentinel_skyportal(self, alert, prv_candidates, passed_filters):
f"Failed to post followup request for {alert['objectId']} to SkyPortal: {e}"
)
else:
log(
f"Pending Followup request for {alert['objectId']} and allocation_id {passed_filter['auto_followup']['allocation_id']} already exists on SkyPortal"
)
# if there is an existing request, but the priority is lower than the one we want to post,
# update the existing request with the new priority
request_to_update = existing_requests_filtered[0][1]
if (
passed_filter["auto_followup"]["data"]["payload"]["priority"]
> request_to_update["payload"]["priority"]
):
with timer(
f"Updating priority of auto followup request for {alert['objectId']} to SkyPortal",
self.verbose > 1,
):
# to update, the api needs to get the request id, target group id, and payload
# so we'll basically get that from the existing request, and simply update the priority
try:
data = {
"target_group_id": [
g["id"]
for g in request_to_update["target_groups"]
],
"payload": {
**request_to_update["payload"],
"priority": passed_filter["auto_followup"][
"data"
]["payload"]["priority"],
},
"obj_id": alert["objectId"],
"status": request_to_update["status"],
"allocation_id": request_to_update["allocation_id"],
}
response = self.api_skyportal(
"PUT",
f"/api/followup_request/{request_to_update['id']}",
data,
)
if (
response.json()["status"] == "success"
and response.json()
.get("data", {})
.get("ignored", False)
is False
):
log(
f"Updated priority of followup request for {alert['objectId']} to SkyPortal"
)
# update the existing_requests list
existing_requests[existing_requests_filtered[0][0]][
"priority"
] = passed_filter["auto_followup"]["data"][
"payload"
][
"priority"
]

# TODO: post a comment to the source to mention the update
else:
raise ValueError(
response.json().get(
"message",
"unknow error updating followup request",
)
)
except Exception as e:
log(
f"Failed to update priority of followup request for {alert['objectId']} to SkyPortal: {e}"
)
else:
log(
f"Pending Followup request for {alert['objectId']} and allocation_id {passed_filter['auto_followup']['allocation_id']} already exists on SkyPortal, no need for update"
)
13 changes: 12 additions & 1 deletion kowalski/alert_brokers/alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,18 @@ def process_alert(alert: Mapping, topic: str):
and len(existing_aux.get("prv_candidates", [])) > 0
):
all_prv_candidates += existing_aux["prv_candidates"]
del existing_aux

# get all alerts for this objectId:
existing_alerts = list(
alert_worker.mongo.db[alert_worker.collection_alerts].find(
{"objectId": object_id}, {"candidate": 1}
)
)
if len(existing_alerts) > 0:
all_prv_candidates += [
existing_alert["candidate"] for existing_alert in existing_alerts
]
del existing_aux, existing_alerts

# ML models:
with timer(f"MLing of {object_id} {candid}", alert_worker.verbose > 1):
Expand Down
69 changes: 64 additions & 5 deletions kowalski/tests/test_alert_broker_ztf.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,24 +298,36 @@ def test_alert_filter__user_defined_followup_with_broker(self):
"allocation_id": allocation_id,
"payload": { # example payload for SEDM
"observation_type": "IFU",
"priority": 3,
"priority": 2,
},
}
passed_filters = self.worker.alert_filter__user_defined([filter], self.alert)
# make a copy of that filter, but with priority 3
filter2 = deepcopy(filter)
filter2["auto_followup"]["payload"]["priority"] = 3
passed_filters = self.worker.alert_filter__user_defined(
[filter, filter2], self.alert
)

assert passed_filters is not None
assert len(passed_filters) == 1
assert len(passed_filters) == 2 # both filters should have passed
assert "auto_followup" in passed_filters[0]
assert (
passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"]
== "IFU"
)
assert passed_filters[0]["auto_followup"]["data"]["payload"]["priority"] == 3
assert passed_filters[0]["auto_followup"]["data"]["payload"]["priority"] == 2
assert "auto_followup" in passed_filters[1]
assert (
passed_filters[1]["auto_followup"]["data"]["payload"]["observation_type"]
== "IFU"
)
assert passed_filters[1]["auto_followup"]["data"]["payload"]["priority"] == 3

alert, prv_candidates = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the follow-up request from SP
# it should have deduplicated and used the highest priority
response = self.worker.api_skyportal(
"GET", f"/api/followup_request?sourceID={alert['objectId']}", None
)
Expand All @@ -328,7 +340,54 @@ def test_alert_filter__user_defined_followup_with_broker(self):
]
assert len(followup_requests) == 1
assert followup_requests[0]["payload"]["observation_type"] == "IFU"
assert followup_requests[0]["payload"]["priority"] == 3
assert (
followup_requests[0]["payload"]["priority"] == 3
) # it should have deduplicated and used the highest priority

# now run it once more, but with a higher priority to see if the update works
filter2["auto_followup"]["payload"]["priority"] = 4
passed_filters = self.worker.alert_filter__user_defined(
[filter, filter2], self.alert
)

assert passed_filters is not None
assert len(passed_filters) == 2
assert "auto_followup" in passed_filters[0]
assert (
passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"]
== "IFU"
)
assert passed_filters[0]["auto_followup"]["data"]["payload"]["priority"] == 2
assert "auto_followup" in passed_filters[1]
assert (
passed_filters[1]["auto_followup"]["data"]["payload"]["observation_type"]
== "IFU"
)
assert passed_filters[1]["auto_followup"]["data"]["payload"]["priority"] == 4

alert, prv_candidates = self.worker.alert_mongify(self.alert)
self.worker.alert_sentinel_skyportal(alert, prv_candidates, passed_filters)

# now fetch the follow-up request from SP
# it should have deduplicated and used the highest priority
response = self.worker.api_skyportal(
"GET", f"/api/followup_request?sourceID={alert['objectId']}", None
)
assert response.status_code == 200
followup_requests_updated = response.json()["data"].get("followup_requests", [])
followup_requests_updated = [
f
for f in followup_requests_updated
if (f["allocation_id"] == allocation_id and f["status"] == "submitted")
]
assert len(followup_requests_updated) == 1
assert followup_requests_updated[0]["payload"]["observation_type"] == "IFU"
assert (
followup_requests_updated[0]["payload"]["priority"] == 4
) # it should have deduplicated and used the highest priority
assert (
followup_requests_updated[0]["id"] == followup_requests[0]["id"]
) # the id should be the same

# delete the follow-up request
response = self.worker.api_skyportal(
Expand Down
20 changes: 20 additions & 0 deletions kowalski/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1168,6 +1168,26 @@ def str_to_numeric(s):
return float(s)


def compare_dicts(a: dict, b: dict, ignore_keys=[], same_keys=False):
"""Compare two followup payloads, making sure that a is the same as b or a subset of b, ignoring certain keys"""
if same_keys and len(a) != len(b):
return False
for k, v in a.items():
if k in ignore_keys:
continue
if k not in b:
return False
if isinstance(v, dict):
if not compare_dicts(v, b[k]):
return False
elif isinstance(v, list):
if not all([i in b[k] for i in v]):
return False
elif b[k] != v:
return False
return True


class ZTFAlert:
def __init__(self, alert, alert_history, models, label=None, **kwargs):
self.kwargs = kwargs
Expand Down

0 comments on commit 928a275

Please sign in to comment.