diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index cc3731fae0..6cec168b5d 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -752,13 +752,6 @@ def launch_task(self, task_record: TaskRecord) -> Future: monitor_resources=executor.monitor_resources(), run_dir=self.run_dir) - # hint executors that this function will be monitored - if task_record['resource_specification']: - task_record['resource_specification'].update({'_is_monitoring_enabled': True}) - else: - if task_record['resource_specification']: - task_record['resource_specification'].update({'_is_monitoring_enabled': False}) - with self.submitter_lock: exec_fu = executor.submit(function, task_record['resource_specification'], *args, **kwargs) self.update_task_state(task_record, States.launched) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 572729a6c8..d7a7bfe2ad 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -341,11 +341,15 @@ def submit(self, func, resource_specification, *args, **kwargs): logger.debug(f'Got resource specification: {resource_specification}') - is_monitoring_enabled = resource_specification.get('_is_monitoring_enabled', False) + # If `_parsl_monitoring_task_id` is in kwargs, Parsl monitoring code is enabled. + is_monitoring_enabled = '_parsl_monitoring_task_id' in kwargs # Default execution mode of apps is regular exec_mode = resource_specification.get('exec_mode', self.function_exec_mode) + # Fall back to regular execution if a function is Parsl-monitored as a monitored function is invocation-specific. + # Note that it is possible to get the wrapped function by calling the `__wrapped__` attribute when monitoring is enabled. + # It will disable the monitoring wrapper code however. if exec_mode == 'serverless' and is_monitoring_enabled: logger.warning("A serverless task cannot run with Parsl monitoring enabled. Falling back to execute this task as a regular task.") exec_mode = 'regular'