diff --git a/config.defaults.yaml b/config.defaults.yaml index 8a736f07..5ab6d135 100644 --- a/config.defaults.yaml +++ b/config.defaults.yaml @@ -40,7 +40,7 @@ kowalski: zookeeper.test: "localhost:2181" path: "kafka_2.13-3.4.1" processes_per_topic: - ZTF: 2 + ZTF: 1 database: max_pool_size: 200 diff --git a/kowalski/alert_brokers/alert_broker.py b/kowalski/alert_brokers/alert_broker.py index 14f3181d..f64c84fd 100644 --- a/kowalski/alert_brokers/alert_broker.py +++ b/kowalski/alert_brokers/alert_broker.py @@ -3,7 +3,6 @@ import base64 import datetime import gzip -import inspect import io import os import pathlib @@ -47,6 +46,7 @@ time_stamp, timer, compare_dicts, + priority_should_update, ) from warnings import simplefilter @@ -1270,7 +1270,6 @@ def alert_filter__user_defined( self, filter_templates: Sequence, alert: Mapping, - alert_history: list = [], max_time_ms: int = 1000, ) -> list: """Evaluate user-defined filters @@ -1357,6 +1356,7 @@ def alert_filter__user_defined( "active", False ): auto_followup_filter = deepcopy(_filter["auto_followup"]) + priority = 5 # validate non-optional keys if ( @@ -1379,58 +1379,13 @@ def alert_filter__user_defined( continue # there is also a priority key that is optional. 
If not present or if not a function, it defaults to 5 (lowest priority) - if auto_followup_filter.get("priority", None) is not None: - if isinstance(auto_followup_filter["priority"], str): - try: - auto_followup_filter["priority"] = eval( - auto_followup_filter["priority"] - ) - except Exception: - log( - f'Filter {_filter["fid"]} has an invalid auto-followup priority (could not eval str), using default of 5' - ) - continue - if isinstance( - auto_followup_filter["priority"], int - ) or isinstance(auto_followup_filter["priority"], float): - auto_followup_filter[ - "priority" - ] = lambda alert, alert_history, data: auto_followup_filter[ - "priority" - ] - elif callable(auto_followup_filter["priority"]): - # verify that the function takes 3 arguments: alert, alert_history, data - if ( - len( - inspect.signature( - auto_followup_filter["priority"] - ).parameters - ) - != 3 - ): - log( - f'Filter {_filter["fid"]} has an invalid auto-followup priority (needs 3 arguments), using default of 5' - ) - auto_followup_filter[ - "priority" - ] = lambda alert, alert_history, data: 5 - elif ( + if isinstance( auto_followup_filter.get("payload", {}).get( "priority", None - ) - is not None + ), + (int, float), ): - auto_followup_filter[ - "priority" - ] = lambda alert, alert_history, data: auto_followup_filter[ - "payload" - ][ - "priority" - ] - else: - auto_followup_filter[ - "priority" - ] = lambda alert, alert_history, data: 5 + priority = auto_followup_filter["payload"]["priority"] # validate the optional radius key, and set to 0.5 arcsec if not present if auto_followup_filter.get("radius", None) is None: @@ -1474,17 +1429,61 @@ def alert_filter__user_defined( ) if len(auto_followup_filtered_data) == 1: - priority = auto_followup_filter["priority"]( - alert, alert_history, auto_followup_filtered_data[0] - ) comment = auto_followup_filter.get("comment", None) + if "comment" in auto_followup_filtered_data[ + 0 + ] and isinstance( + auto_followup_filtered_data[0]["comment"], str 
+ ): + comment = auto_followup_filtered_data[0]["comment"] + + payload = _filter["auto_followup"].get("payload", {}) + payload["priority"] = priority + # if the auto_followup_filtered_data contains a payload key, merge it with the existing payload + # updating the existing keys with the new values, ignoring the rest + # + # we ignore the rest because the keys from the _filter["auto_followup"] payload + # have been validated by SkyPortal and contain the necessary keys, so we ignore + # those generated in the mongodb pipeline that are not present in the _filter["auto_followup"] payload + # + # PS: we added the priority from _filter["auto_followup"] before overwriting the payload + # so that we can replace the fixed priority with the dynamic one from the pipeline + if "payload" in auto_followup_filtered_data[ + 0 + ] and isinstance( + auto_followup_filtered_data[0]["payload"], dict + ): + for key in auto_followup_filtered_data[0]["payload"]: + if key in payload: + payload[key] = auto_followup_filtered_data[0][ + "payload" + ][key] + payload["start_date"] = datetime.datetime.utcnow().strftime( + "%Y-%m-%dT%H:%M:%S.%f" + ) + payload["end_date"] = ( + datetime.datetime.utcnow() + + datetime.timedelta( + days=_filter["auto_followup"].get( + "validity_days", 7 + ) + ) + ).strftime("%Y-%m-%dT%H:%M:%S.%f") + if comment is not None: - comment += f" (priority: {str(priority)})" + comment = ( + str(comment.strip()) + + f" (priority: {str(payload['priority'])})" + ) + passed_filter["auto_followup"] = { "allocation_id": _filter["auto_followup"][ "allocation_id" ], "comment": comment, + "priority_order": _filter["auto_followup"].get( + "priority_order", "asc" + ), "data": { "obj_id": alert["objectId"], "allocation_id": _filter["auto_followup"][ @@ -1498,22 +1497,7 @@ def alert_filter__user_defined( ) ) ), - "payload": { - **_filter["auto_followup"].get("payload", {}), - "priority": priority, - "start_date": datetime.datetime.utcnow().strftime( - "%Y-%m-%dT%H:%M:%S.%f" - ), - 
"end_date": ( - datetime.datetime.utcnow() - + datetime.timedelta( - days=_filter["auto_followup"].get( - "validity_days", 7 - ) - ) - ).strftime("%Y-%m-%dT%H:%M:%S.%f"), - # one week validity window - }, + "payload": payload, # constraints "source_group_ids": [_filter["group_id"]], "not_if_classified": True, @@ -2241,8 +2225,11 @@ def alert_sentinel_skyportal( f"Failed to post followup request for {alert['objectId']} to SkyPortal: {e}" ) else: - # if there is an existing request, but the priority is lower than the one we want to post, + # if there is an existing request, but the priority is lower (or higher, depends of + # the priority system for that instrument/allocation) than the one we want to post, # update the existing request with the new priority + # first we pick the operator to use, > or <, based on the priority system + request_to_update = existing_requests_filtered[0][1] # if the status is completed, deleted, or failed do not update if any( @@ -2257,10 +2244,14 @@ def alert_sentinel_skyportal( # if the status is submitted, and the new priority is higher, update if "submitted" in str( request_to_update["status"] - ).lower() and float( - passed_filter["auto_followup"]["data"]["payload"]["priority"] - ) > float( - request_to_update["payload"]["priority"] + ).lower() and priority_should_update( + request_to_update["payload"]["priority"], # existing + passed_filter["auto_followup"]["data"]["payload"][ + "priority" + ], # new + passed_filter["auto_followup"].get( + "priority_order", "asc" + ), # which order warrants an update (higher or lower) ): with timer( f"Updating priority of auto followup request for {alert['objectId']} to SkyPortal", diff --git a/kowalski/alert_brokers/alert_broker_ztf.py b/kowalski/alert_brokers/alert_broker_ztf.py index 3da1b8ac..7624c616 100644 --- a/kowalski/alert_brokers/alert_broker_ztf.py +++ b/kowalski/alert_brokers/alert_broker_ztf.py @@ -196,7 +196,7 @@ def process_alert(alert: Mapping, topic: str): # execute user-defined 
alert filters with timer(f"Filtering of {object_id} {candid}", alert_worker.verbose > 1): passed_filters = alert_worker.alert_filter__user_defined( - alert_worker.filter_templates, alert, all_prv_candidates + alert_worker.filter_templates, alert ) if alert_worker.verbose > 1: log( diff --git a/kowalski/api/api.py b/kowalski/api/api.py index b56b22c8..cd749a69 100644 --- a/kowalski/api/api.py +++ b/kowalski/api/api.py @@ -85,6 +85,7 @@ "target_group_ids": list, "radius": float, "validity_days": int, + "priority_order": str, } @@ -2165,6 +2166,33 @@ async def post(self, request: web.Request) -> web.Response: return self.error( message=f"Cannot test {attribute} pipeline: no pipeline specified" ) + elif attribute == "auto_followup" and "priority_order" in getattr( + filter_new, attribute + ): + if getattr(filter_new, attribute)["priority_order"] not in [ + "asc", + "desc", + ]: + return self.error( + message=f"Invalid priority_order specified for auto_followup: {getattr(filter_new, attribute)['priority_order']}" + ) + new_priority_order = getattr(filter_new, attribute)["priority_order"] + new_allocation_id = getattr(filter_new, attribute)["allocation_id"] + # fetch all existing filters in the DB with auto_followup with the same allocation_id + filters_same_alloc = [ + f + async for f in request.app["mongo"]["filters"].find( + {"auto_followup.allocation_id": int(new_allocation_id)} + ) + ] + for f in filters_same_alloc: + if ( + "priority_order" in f["auto_followup"] + and f["auto_followup"]["priority_order"] != new_priority_order + ): + return self.error( + message="Cannot add new filter with auto_followup: existing filters with the same allocation have priority_order set to a different value, which is unexpected" + ) pipeline = getattr(filter_new, attribute).get("pipeline") if not isinstance(pipeline, str): pipeline = dumps(pipeline) @@ -2439,7 +2467,34 @@ async def patch(self, request: web.Request) -> web.Response: return self.error( message=f"Cannot update filter id 
{filter_id}: {modifiable_field} contains invalid keys" ) - + elif ( + modifiable_field == "auto_followup" + and isinstance(value, dict) + and "priority_order" in value + ): + # fetch all existing filters in the DB with auto_followup with the same allocation_id + filters_same_alloc = [ + f + async for f in request.app["mongo"]["filters"].find( + { + "auto_followup.allocation_id": int( + value["allocation_id"] + ) + } + ) + ] + # if there is any filter with the same allocation_id, and a non null priority_order that is differet + # throw an error. This is to avoid having multiple filters with the same allocation_id and different + # priority_order, which should be fixed by a sys admin, as it should not happen + for f in filters_same_alloc: + if ( + "priority_order" in f["auto_followup"] + and f["auto_followup"]["priority_order"] + != value["priority_order"] + ): + return self.error( + message=f"Cannot update filter id {filter_id}: {modifiable_field}, filters with the same allocation have priority_order set to a different value, which is unexpected" + ) if modifiable_field == "autosave" and isinstance(value, bool): pass elif isinstance(value, dict) and "pipeline" not in value: diff --git a/kowalski/tests/test_alert_broker_ztf.py b/kowalski/tests/test_alert_broker_ztf.py index ec920842..453ccbed 100644 --- a/kowalski/tests/test_alert_broker_ztf.py +++ b/kowalski/tests/test_alert_broker_ztf.py @@ -494,7 +494,6 @@ def test_alert_filter__user_defined_followup(self): "validity_days": 3, } passed_filters = self.worker.alert_filter__user_defined([filter], self.alert) - delete_alert(self.worker, self.alert) assert passed_filters is not None assert len(passed_filters) == 1 @@ -515,6 +514,33 @@ def test_alert_filter__user_defined_followup(self): ) assert (end_date - start_date).days == 3 + # try the same but with a pipeline that overwrites the payload dynamically + filter["auto_followup"]["pipeline"].append( + { + "$addFields": { + "payload.observation_type": "imaging", + 
"payload.priority": 0, + "comment": "Overwritten by pipeline", + } + } + ) + + passed_filters = self.worker.alert_filter__user_defined([filter], self.alert) + delete_alert(self.worker, self.alert) + + assert passed_filters is not None + assert len(passed_filters) == 1 + assert "auto_followup" in passed_filters[0] + assert ( + passed_filters[0]["auto_followup"]["data"]["payload"]["observation_type"] + == "imaging" + ) + assert passed_filters[0]["auto_followup"]["data"]["payload"]["priority"] == 0 + assert ( + passed_filters[0]["auto_followup"]["comment"] + == "Overwritten by pipeline (priority: 0)" + ) + def test_alert_filter__user_defined_followup_with_broker(self): """Test pushing an alert through a filter that also has auto follow-up activated, and broker mode activated""" if not config["misc"].get("broker", False): @@ -681,6 +707,80 @@ def test_alert_filter__user_defined_followup_with_broker(self): followup_requests_updated[0]["id"] == followup_requests[0]["id"] ) # the id should be the same + # now try using a lower priority, and verify that it was not updated (still using priority 4) + filter2["auto_followup"]["payload"]["priority"] = 3 + passed_filters = self.worker.alert_filter__user_defined( + [filter, filter2], self.alert + ) + + assert passed_filters is not None + assert len(passed_filters) == 2 + assert "auto_followup" in passed_filters[0] + + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) + self.worker.alert_sentinel_skyportal( + alert, prv_candidates, passed_filters=passed_filters + ) + + # now fetch the follow-up request from SP + # it should still be at priority 4 + response = self.worker.api_skyportal( + "GET", f"/api/followup_request?sourceID={alert['objectId']}", None + ) + assert response.status_code == 200 + followup_requests_updated = response.json()["data"].get("followup_requests", []) + followup_requests_updated = [ + f + for f in followup_requests_updated + if (f["allocation_id"] == allocation_id and f["status"] == 
"submitted") + ] + assert len(followup_requests_updated) == 1 + assert followup_requests_updated[0]["payload"]["observation_type"] == "IFU" + assert followup_requests_updated[0]["payload"]["priority"] == 4 + assert followup_requests_updated[0]["id"] == followup_requests[0]["id"] + + # now we use a lower priority, but we specify for the auto_followup that lower is better for this specific allocation + # should be the same for a given allocation across filters, and that is set by SkyPortal when creating + # or editing the filter there. + # TODO: in the future, have kowalski fetch all of the allocations and have a mapper from allocation_id to priority_order + # enforced there, for all filters with the same allocation/instrument they need to trigger on. + filter["auto_followup"]["priority_order"] = "desc" + filter2["auto_followup"]["priority_order"] = "desc" + filter2["auto_followup"]["payload"]["priority"] = 1 + passed_filters = self.worker.alert_filter__user_defined( + [filter, filter2], self.alert + ) + + assert passed_filters is not None + assert len(passed_filters) == 2 + assert "auto_followup" in passed_filters[0] + + alert, prv_candidates, _ = self.worker.alert_mongify(self.alert) + self.worker.alert_sentinel_skyportal( + alert, prv_candidates, passed_filters=passed_filters + ) + + # now fetch the follow-up request from SP + # it should now be at priority 1 + response = self.worker.api_skyportal( + "GET", f"/api/followup_request?sourceID={alert['objectId']}", None + ) + assert response.status_code == 200 + followup_requests_updated = response.json()["data"].get("followup_requests", []) + followup_requests_updated = [ + f + for f in followup_requests_updated + if (f["allocation_id"] == allocation_id and f["status"] == "submitted") + ] + assert len(followup_requests_updated) == 1 + assert followup_requests_updated[0]["payload"]["observation_type"] == "IFU" + assert followup_requests_updated[0]["payload"]["priority"] == 1 + assert followup_requests_updated[0]["id"] 
== followup_requests[0]["id"] + + # set the priority_order back to normal (missing or asc, as asc should be the default) + filter["auto_followup"].pop("priority_order") + filter2["auto_followup"]["priority_order"] = "asc" + # now, we'll test the target_group_ids functionality for deduplication # that is, if a filter has a target_group_ids, then it should only trigger a follow-up request # if none of the existing requests' target groups have an overlap with the target_group_ids @@ -712,7 +812,7 @@ def test_alert_filter__user_defined_followup_with_broker(self): ) # now fetch the follow-up request from SP - # we should still have just one follow-up request, the exact same as before + # we should still have just one follow-up request, same as before but back to priority 2 response = self.worker.api_skyportal( "GET", f"/api/followup_request?sourceID={alert['objectId']}", None ) @@ -725,7 +825,7 @@ def test_alert_filter__user_defined_followup_with_broker(self): ] assert len(followup_requests_updated) == 1 assert followup_requests_updated[0]["payload"]["observation_type"] == "IFU" - assert followup_requests_updated[0]["payload"]["priority"] == 4 + assert followup_requests_updated[0]["payload"]["priority"] == 2 assert followup_requests_updated[0]["id"] == followup_requests[0]["id"] # delete the follow-up request @@ -746,6 +846,9 @@ def test_alert_filter__user_defined_followup_with_broker(self): **filter["auto_followup"], "comment": "SEDM triggered by BTSbot2", } + filter["auto_followup"]["payload"][ + "priority" + ] = 2 # just like the filter_multiple_groups passed_filters = self.worker.alert_filter__user_defined([filter], self.alert) assert passed_filters is not None diff --git a/kowalski/tests/test_api.py b/kowalski/tests/test_api.py index 83d27fee..88dc0ea0 100644 --- a/kowalski/tests/test_api.py +++ b/kowalski/tests/test_api.py @@ -348,6 +348,7 @@ async def test_filters(aiohttp_client): "active": True, "allocation_id": 1, "comment": "test auto_followup", + 
"priority_order": "desc", "payload": { # example payload for SEDM "observation_type": "IFU", }, @@ -371,12 +372,46 @@ async def test_filters(aiohttp_client): assert result["status"] == "success" assert "data" in result assert "auto_followup" in result["data"] + assert "priority_order" in result["data"]["auto_followup"] + assert result["data"]["auto_followup"]["priority_order"] == "desc" # pipeline has been saved as a string, so we need to do the same before comparing auto_followup[ "pipeline" ] = '[{"$match": {"candidate.drb": {"$gt": 0.9999}, "cross_matches.CLU_20190625.0": {"$exists": false}}}]' assert result["data"]["auto_followup"] == auto_followup + # verify that we can't post another filter with the same allocation_id but a different priority_order "asc" + asc_filter = make_filter(filter_id=filter_id) + asc_filter["autosave"] = autosave + asc_filter["auto_followup"] = { + "active": True, + "allocation_id": 1, + "comment": "test auto_followup asc", + "priority_order": "asc", + "payload": { # example payload for SEDM + "observation_type": "IFU", + }, + "pipeline": [ + { + "$match": { + "candidate.drb": {"$gt": 0.9999}, + "cross_matches.CLU_20190625.0": {"$exists": False}, + } + }, + ], + } + resp = await client.post( + "/api/filters", json=asc_filter, headers=headers, timeout=5 + ) + assert resp.status == 400 + result = await resp.json() + assert result["status"] == "error" + assert "message" in result + assert ( + "filters with the same allocation have priority_order set to a different value, which is unexpected" + in result["message"] + ) + # turn update_annotations on resp = await client.patch( "/api/filters", diff --git a/kowalski/utils.py b/kowalski/utils.py index 142aa00d..f9ebcdcb 100644 --- a/kowalski/utils.py +++ b/kowalski/utils.py @@ -33,6 +33,7 @@ "uid", "ZTFAlert", "retry", + "priority_should_update", ] import base64 @@ -1382,3 +1383,11 @@ def make_dmdt(self, up_to_candidate_jd=True, min_points=4): dmdt[:, :, i] = np.zeros((26, 26)) return dmdt + + 
def priority_should_update(
    existing_priority, new_priority, priority_order="asc"
) -> bool:
    """Decide whether a new priority warrants updating an existing one.

    Both priorities are converted with ``float()`` before being compared,
    so ints, floats and numeric strings are all accepted.

    Args:
        existing_priority: Priority currently set on the existing request.
        new_priority: Priority of the request we would like to post.
        priority_order: Which direction counts as an upgrade. ``"desc"``
            (matched case-insensitively, surrounding whitespace ignored)
            means a strictly lower new value warrants an update. Any other
            value, including ``"asc"``, ``None`` or an unrecognised string,
            falls back to ascending: a strictly higher new value warrants
            an update.

    Returns:
        True if the new priority is strictly "better" than the existing one
        under the selected order, False otherwise (equal priorities never
        trigger an update).

    Raises:
        TypeError, ValueError: Propagated from ``float()`` when either
            priority cannot be converted to a number.
    """
    # Normalise the order so that values such as "DESC" or " desc " are not
    # silently treated as ascending, which would invert the decision.
    # Non-string values (e.g. None) keep the ascending default.
    descending = (
        isinstance(priority_order, str)
        and priority_order.strip().lower() == "desc"
    )

    new_value = float(new_priority)
    existing_value = float(existing_priority)

    if descending:
        return new_value < existing_value

    # default to ascending
    return new_value > existing_value