diff --git a/alab_management/resource_manager/resource_requester.py b/alab_management/resource_manager/resource_requester.py index 3c5de40b..2e40790c 100644 --- a/alab_management/resource_manager/resource_requester.py +++ b/alab_management/resource_manager/resource_requester.py @@ -28,6 +28,10 @@ ] # the raw request sent by task process +class RequestCanceledError(Exception): + """Request Canceled Error.""" + + class CombinedTimeoutError(TimeoutError, concurrent.futures.TimeoutError): """ Combined TimeoutError. @@ -240,7 +244,7 @@ def request_resources( _id: ObjectId = cast(ObjectId, result.inserted_id) self._waiting[_id] = {"f": f, "device_str_to_request": device_str_to_request} try: - result = f.result(timeout=timeout) + result = self.get_concurrent_result(f, timeout=timeout) except concurrent.futures.TimeoutError as e: # if the request is not fulfilled, cancel it to make sure the resources are released request = self._request_collection.find_one_and_update({ @@ -256,8 +260,7 @@ def request_resources( f"Request {result.inserted_id} timed out after {timeout} seconds." ) from e else: # if the request is fulfilled, return the result normally, wrong timeout - result = f.result(timeout=None) - + result = self.get_concurrent_result(f) return { **self._post_process_requested_resource( devices=result["devices"], @@ -267,6 +270,22 @@ def request_resources( "request_id": result["request_id"], } + @staticmethod + def get_concurrent_result(f: Future, timeout: float | None = None): + """ + Get the result of a future with a timeout. + If the request is canceled, we will catch a RequestCanceledError and hang the program. + The hanged program will be killed by the abort exception in the task actor, which will + be handled in the task actor to clean up the lab. + """ + try: + return f.result(timeout=timeout) + except RequestCanceledError: + # if there is an abort signal, we will just hang the program + while True: + # abort signal here. It should be handled in the task actor + time.sleep(1) + def release_resources(self, request_id: ObjectId): """Release a request by request_id.""" # For the requests that were CANCELED or ERROR, but have assigned resources, release them @@ -412,11 +431,7 @@ def _handle_canceled_request(self, request_id: ObjectId): # for the canceled request, we will return an empty result # and wait for the abort to be handled by the task actor - f.set_result({ - "devices": {}, - "sample_positions": {}, - "request_id": request_id, - }) + f.set_exception(RequestCanceledError("Abort signal received.")) @staticmethod def _post_process_requested_resource(