diff --git a/parsl/executors/globus_compute.py b/parsl/executors/globus_compute.py index a6fd8221d0..a7920783a2 100644 --- a/parsl/executors/globus_compute.py +++ b/parsl/executors/globus_compute.py @@ -98,6 +98,7 @@ def __init__( self.batch_size = batch_size self.amqp_port = amqp_port self.client = client + self.executor_kwargs = kwargs if not _globus_compute_enabled: raise OptionalModuleMissing( @@ -105,21 +106,21 @@ def __init__( "GlobusComputeExecutor requires globus-compute-sdk installed" ) + def start(self) -> None: + """ Start the Globus Compute Executor """ + self._executor: Executor = Executor( - endpoint_id=endpoint_id, - task_group_id=task_group_id, - resource_specification=resource_specification, - user_endpoint_config=user_endpoint_config, - label=label, - batch_size=batch_size, - amqp_port=amqp_port, + endpoint_id=self.endpoint_id, + task_group_id=self.task_group_id, + resource_specification=self.resource_specification, + user_endpoint_config=self.user_endpoint_config, + label=self.label, + batch_size=self.batch_size, + amqp_port=self.amqp_port, client=self.client, - **kwargs + **self.executor_kwargs ) - def start(self) -> None: - pass - def submit(self, func: Callable, resource_specification: Dict[str, Any], *args: Any, **kwargs: Any) -> Future: """ Submit func to globus-compute @@ -164,4 +165,4 @@ def shutdown(self): GCE.shutdown will cancel all futures that have not yet registered with Globus Compute and will not wait for the launched futures to complete. """ - return self._executor.shutdown(wait=False, cancel_futures=True) + self._executor.shutdown(wait=False, cancel_futures=True)