TaskVineExecutor: Improve serverless (#2894)
This PR brings changes that address concerns raised in PR #2849.

* Added and edited comments and debug messages so they are accurate or more meaningful.
* Fixed the issue where the function and its arguments were serialized twice; they are now serialized once.
* Added stronger type hints and checking in several places.
* Added a default function execution mode option to the Executor interface to support applications that want to run functions exclusively in one mode. Users can still override this global default by specifying the execution mode at the function level via parsl_resource_specification (see the configuration sketch after this list).
* Used multiprocessing.Event instead of multiprocessing.Value to signal the stopping condition.
* Other minor fixes.
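
For illustration only (not part of this commit), a minimal configuration sketch of the new executor-level default and the per-app override; the app, its values, and the serverless prerequisites are assumptions, and a working TaskVine (ndcctools) install is assumed:

```python
# Minimal sketch, not from this commit: executor-wide default execution mode
# plus a per-app override via the resource specification.
import parsl
from parsl.config import Config
from parsl.executors.taskvine import TaskVineExecutor

config = Config(executors=[
    TaskVineExecutor(
        label="TaskVineExecutor",
        function_exec_mode='serverless',  # new executor-level default from this PR;
                                          # serverless mode may need extra manager
                                          # configuration (e.g. a packed environment)
    )
])

@parsl.python_app
def double(x, parsl_resource_specification={'exec_mode': 'regular'}):
    # 'exec_mode' in the per-app resource specification overrides the executor default.
    return 2 * x

parsl.load(config)
print(double(21).result())  # 42
parsl.dfk().cleanup()
```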
tphung3 authored Oct 9, 2023
1 parent 3badf9d commit 6b963ba
Showing 4 changed files with 54 additions and 57 deletions.
45 changes: 20 additions & 25 deletions parsl/executors/taskvine/exec_parsl_function.py
@@ -1,20 +1,24 @@
from parsl.app.errors import RemoteExceptionWrapper
from parsl.data_provider.files import File
from parsl.utils import get_std_fname_mode
import traceback
import sys

import pickle
from parsl.app.errors import RemoteExceptionWrapper
from parsl.data_provider.files import File
from parsl.utils import get_std_fname_mode
from parsl.serialize import deserialize

# This script executes a parsl function which is pickled in 3 files:
# This script executes a parsl function which is pickled in 4 files:
#
# exec_parsl_function.py map_file function_file result_file
# exec_parsl_function.py map_file function_file argument_file result_file
#
# map_file: Contains a pickled dictionary that indicates which local_paths the
# parsl Files should take.
#
# function_file: Contains a pickled parsl function. The function might be serialized in advance.
# See @parsl.serialize.concretes.py
#
# argument_file: Contains the serialized arguments to the function call.
#
# result_file: A file path, whose content will contain the result of the function, including any
# exception generated. Exceptions will be wrapped with RemoteExceptionWrapper.
#
@@ -26,12 +30,6 @@
#


def load_pickled_file(filename: str):
""" Load a pickled file and return its pickled object."""
with open(filename, "rb") as f_in:
return pickle.load(f_in)


def dump_result_to_file(result_file: str, result_package):
""" Dump a result to the given result file."""
with open(result_file, "wb") as f_out:
@@ -78,9 +76,9 @@ def remap_all_files(mapping, fn_args, fn_kwargs):
remap_location(mapping, maybe_file)


def unpack_object(serialized_obj, user_namespace):
from parsl.serialize import deserialize
obj = deserialize(serialized_obj)
def unpack_object_from_file(path):
with open(path, 'rb') as f:
obj = deserialize(f.read())
return obj


@@ -115,26 +113,23 @@ def encode_byte_code_function(user_namespace, fn, fn_name, args_name, kwargs_nam
def load_function(map_file, function_file, argument_file):
# Decodes the function and its file arguments to be executed into
# function_code, and updates a user namespace with the function name and
# the variable named result_name. When the function is executed, its result
# the variable named `result_name`. When the function is executed, its result
# will be stored in this variable in the user namespace.
# Returns (namespace, function_code, result_name)

# Create the namespace to isolate the function execution.
user_ns = locals()
user_ns.update({'__builtins__': __builtins__})

packed_function = load_pickled_file(function_file)
packed_argument = load_pickled_file(argument_file)

fn = unpack_object(packed_function, user_ns)
args_dict = unpack_object(packed_argument, user_ns)
fn = unpack_object_from_file(function_file)
args_dict = unpack_object_from_file(argument_file)
fn_args = args_dict['args']
fn_kwargs = args_dict['kwargs']
fn_name = 'parsl_tmp_func_name'

mapping = load_pickled_file(map_file)
mapping = unpack_object_from_file(map_file)
remap_all_files(mapping, fn_args, fn_kwargs)

# Create the namespace to isolate the function execution.
user_ns = locals()
user_ns.update({'__builtins__': __builtins__})

(code, result_name) = encode_function(user_ns, fn, fn_name, fn_args, fn_kwargs)

return (user_ns, code, result_name)
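
For context on the serialization change in this file (paired with the executor-side change below), here is a minimal round-trip sketch — not code from this commit; the path and values are illustrative — of serializing once on the submit side and deserializing once on the worker side:

```python
# Sketch only: one serialization layer instead of serialize() followed by pickle.dump().
from parsl.serialize import serialize, deserialize

def write_serialized(path, obj):
    # Submit side: serialize once and write the raw bytes to disk.
    # (The commit itself loops over write() to guard against partial writes.)
    with open(path, 'wb') as f_out:
        f_out.write(serialize(obj, buffer_threshold=1024 * 1024))

def read_serialized(path):
    # Worker side: read the bytes back and deserialize once.
    with open(path, 'rb') as f_in:
        return deserialize(f_in.read())

args_dict = {'args': (1, 2), 'kwargs': {'power': 3}}
write_serialized('argument_file', args_dict)
assert read_serialized('argument_file') == args_dict
```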
46 changes: 25 additions & 21 deletions parsl/executors/taskvine/executor.py
@@ -11,15 +11,13 @@
import hashlib
import subprocess
import os
import pickle
import queue
import inspect
import shutil
import itertools
import uuid
from ctypes import c_bool
from concurrent.futures import Future
from typing import List, Optional, Union
from typing import List, Optional, Union, Literal

# Import Parsl constructs
import parsl.utils as putils
@@ -71,12 +69,17 @@ class TaskVineExecutor(BlockProviderExecutor, putils.RepresentationMixin):
with respect to other executors.
Default is "TaskVineExecutor".
worker_launch_method: str
worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']]
Choose to use Parsl provider, TaskVine factory, or
manual user-provided workers to scale workers.
Options are among {'provider', 'factory', 'manual'}.
Default is 'factory'.
function_exec_mode: Union[Literal['regular'], Literal['serverless']]
Choose to execute functions with a regular fresh python process or a
pre-warmed forked python process.
Default is 'regular'.
manager_config: TaskVineManagerConfig
Configuration for the TaskVine manager. Default
@@ -98,7 +101,8 @@ class TaskVineExecutor(BlockProviderExecutor, putils.RepresentationMixin):
@typeguard.typechecked
def __init__(self,
label: str = "TaskVineExecutor",
worker_launch_method: str = 'factory',
worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory',
function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular',
manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
@@ -107,9 +111,6 @@ def __init__(self,
# Set worker launch option for this executor
if worker_launch_method == 'factory' or worker_launch_method == 'manual':
provider = None
elif worker_launch_method != 'provider':
raise ExecutorError(self, "Worker launch option '{worker_launch_method}' \
is not supported.")

# Initialize the parent class with the execution provider and block error handling enabled.
# If provider is None, then no worker is launched via the provider method.
@@ -126,6 +127,7 @@ def __init__(self,
# Executor configurations
self.label = label
self.worker_launch_method = worker_launch_method
self.function_exec_mode = function_exec_mode
self.manager_config = manager_config
self.factory_config = factory_config
self.storage_access = storage_access
@@ -136,8 +138,8 @@ def __init__(self,
# Queue to send finished tasks from TaskVine manager process to TaskVine executor process
self._finished_task_queue: multiprocessing.Queue = multiprocessing.Queue()

# Value to signal whether the manager and factory processes should stop running
self._should_stop = multiprocessing.Value(c_bool, False)
# Event to signal whether the manager and factory processes should stop running
self._should_stop = multiprocessing.Event()

# TaskVine manager process
self._submit_process = None
@@ -267,10 +269,11 @@ def start(self):
# Begin work
self._submit_process.start()

# Run worker scaler either with Parsl provider or TaskVine factory
# Run worker scaler either with Parsl provider or TaskVine factory.
# Skip if workers are launched manually.
if self.worker_launch_method == 'factory':
self._factory_process.start()
else:
elif self.worker_launch_method == 'provider':
self.initialize_scaling()

self._collector_thread.start()
@@ -314,7 +317,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
logger.debug(f'Got resource specification: {resource_specification}')

# Default execution mode of apps is regular
exec_mode = resource_specification.get('exec_mode', 'regular')
exec_mode = resource_specification.get('exec_mode', self.function_exec_mode)

# Detect resources and features of a submitted Parsl app
cores = None
@@ -391,9 +394,9 @@ def submit(self, func, resource_specification, *args, **kwargs):
and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file))

# Serialize function object and arguments, separately
self._serialize_object(function_file, func)
self._serialize_object_to_file(function_file, func)
args_dict = {'args': args, 'kwargs': kwargs}
self._serialize_object(argument_file, args_dict)
self._serialize_object_to_file(argument_file, args_dict)

# Construct the map file of local filenames at worker
self._construct_map_file(map_file, input_files, output_files)
@@ -466,11 +469,13 @@ def _patch_providers(self):
if self.project_password_file:
self.provider.transfer_input_files.append(self.project_password_file)

def _serialize_object(self, path, obj):
def _serialize_object_to_file(self, path, obj):
"""Takes any object and serializes it to the file path."""
serialized_obj = serialize(obj, buffer_threshold=1024 * 1024)
with open(path, 'wb') as f_out:
pickle.dump(serialized_obj, f_out)
written = 0
while written < len(serialized_obj):
written += f_out.write(serialized_obj[written:])

def _construct_map_file(self, map_file, input_files, output_files):
""" Map local filepath of parsl files to the filenames at the execution worker.
Expand All @@ -485,8 +490,7 @@ def _construct_map_file(self, map_file, input_files, output_files):
else:
remote_name = local_name
file_translation_map[local_name] = remote_name
with open(map_file, "wb") as f_out:
pickle.dump(file_translation_map, f_out)
self._serialize_object_to_file(map_file, file_translation_map)

def _register_file(self, parsl_file):
"""Generates a tuple (parsl_file.filepath, stage, cache) to give to
@@ -592,7 +596,7 @@ def shutdown(self, *args, **kwargs):
collector thread, which shuts down the TaskVine system submission.
"""
logger.debug("TaskVine shutdown started")
self._should_stop.value = True
self._should_stop.set()

# Remove the workers that are still going
kill_ids = [self.blocks[block] for block in self.blocks.keys()]
@@ -618,7 +622,7 @@ def _collect_taskvine_results(self):
"""
logger.debug("Starting Collector Thread")
try:
while not self._should_stop.value:
while not self._should_stop.is_set():
if not self._submit_process.is_alive():
raise ExecutorError(self, "taskvine Submit Process is not alive")

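
For context on the Value-to-Event switch threaded through the hunks above and through factory.py and manager.py below, here is a self-contained sketch of the stop-signal pattern (standalone example, not parsl code; names are illustrative):

```python
# Standalone sketch of the stop-signal pattern: Event.is_set() replaces reading a
# shared Value in worker loops, and Event.wait() can block without busy-sleeping.
import multiprocessing
import time

def worker_loop(should_stop):
    while not should_stop.is_set():   # replaces `while not should_stop.value:`
        time.sleep(0.1)               # placeholder for real work

if __name__ == '__main__':
    should_stop = multiprocessing.Event()
    proc = multiprocessing.Process(target=worker_loop, args=(should_stop,))
    proc.start()
    should_stop.set()                 # replaces `should_stop.value = True`
    proc.join()
```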
8 changes: 2 additions & 6 deletions parsl/executors/taskvine/factory.py
@@ -1,5 +1,4 @@
import logging
import time

from parsl.process_loggers import wrap_with_logs
from parsl.executors.taskvine.errors import TaskVineFactoryFailure
@@ -54,11 +53,8 @@ def _taskvine_factory(should_stop, factory_config):
if factory_config.batch_options:
factory.batch_options = factory_config.batch_options

# setup factory context and sleep for a second in every loop to
# avoid wasting CPU
# run factory through Python context and wait for signal to stop.
with factory:
while not should_stop.value:
time.sleep(1)
should_stop.wait()

logger.debug("Exiting TaskVine factory process")
return 0
12 changes: 7 additions & 5 deletions parsl/executors/taskvine/manager.py
@@ -106,6 +106,8 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file,
env_tarball = str(uuid.uuid4()) + '.tar.gz'
logger.debug(f'Creating a poncho environment at {env_tarball} from conda environment {manager_config.env_pack}')
subprocess.run([poncho_create_script, manager_config.env_pack, env_tarball], stdout=subprocess.DEVNULL, check=True)
else:
env_tarball = manager_config.env_pack
poncho_env_file = m.declare_poncho(env_tarball, cache=True, peer_transfer=True)
poncho_env_to_file[manager_config.env_pack] = poncho_env_file
else:
@@ -194,15 +196,15 @@ def _taskvine_submit_wait(ready_task_queue=None,

logger.debug("Entering main loop of TaskVine manager")

while not should_stop.value:
# Monitor the task queue
while not should_stop.is_set():
# Check if executor process is still running
ppid = os.getppid()
if ppid != orig_ppid:
logger.debug("new Process")
logger.debug("Executor process is detected to have exited. Exiting..")
break

# Submit tasks
while ready_task_queue.qsize() > 0 and not should_stop.value:
while ready_task_queue.qsize() > 0 and not should_stop.is_set():
# Obtain task from ready_task_queue
try:
task = ready_task_queue.get(timeout=1)
@@ -377,7 +379,7 @@ def _taskvine_submit_wait(ready_task_queue=None,
# If the queue is not empty wait on the TaskVine queue for a task
task_found = True
if not m.empty():
while task_found and not should_stop.value:
while task_found and not should_stop.is_set():
# Obtain the task from the queue
t = m.wait(1)
if t is None:
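
One detail from the manager loop above: the parent-exit check compares the current parent pid against the one recorded at startup. A standalone sketch of that check (not parsl code; names are illustrative):

```python
# Sketch of the parent-exit check: when the parent (executor) process dies, the
# child is re-parented, so os.getppid() no longer matches the value seen at startup.
import os

orig_ppid = os.getppid()  # recorded when the manager process starts

def executor_has_exited():
    return os.getppid() != orig_ppid
```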
