diff --git a/Makefile b/Makefile index 90f20601e9..a758f5fa6e 100644 --- a/Makefile +++ b/Makefile @@ -84,7 +84,7 @@ radical_local_test: .PHONY: config_local_test config_local_test: $(CCTOOLS_INSTALL) - pip3 install ".[monitoring,visualization,proxystore]" + pip3 install ".[monitoring,visualization,proxystore,serializer_demos]" PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10 .PHONY: site_test diff --git a/docs/userguide/plugins.rst b/docs/userguide/plugins.rst index 4ecff86cfe..1488b6bfd9 100644 --- a/docs/userguide/plugins.rst +++ b/docs/userguide/plugins.rst @@ -87,6 +87,20 @@ from invoking a Parsl app. This includes as the return value of a A specific example of this is integrating Globus Compute tasks into a Parsl task graph. See :ref:`label-join-globus-compute` + +Serialization +------------- + +By default Parsl will serialize objects with either ``pickle`` or ``dill``, in +some cases applying an LRU cache. + +Parsl has an unstable API to register new serialization methods. + +Additional serialization methods can be registered by calling ``parsl.serialize.facade.register_method_for_data`` or ``parsl.serialize.facade.register_method_for_code`` with an instance of a ``SerializerBase`` subclass, and removed again with ``unregister_serializer``. + +Limitations: +TODO + Dependency resolution --------------------- diff --git a/mypy.ini b/mypy.ini index 604fa4d07a..57c7ee850b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -69,6 +69,12 @@ check_untyped_defs = True disallow_subclassing_any = True disallow_untyped_defs = True +[mypy-parsl.serialize.plugin_proxystore_deep_pickle.*] +disallow_subclassing_any = False + +[mypy-parsl.serialize.plugin_codeprotector.*] +disallow_subclassing_any = False + [mypy-parsl.serialize.proxystore.*] # parsl/serialize/proxystore.py:9: error: Class cannot subclass "Pickler" (has type "Any") disallow_subclassing_any = False @@ -183,6 +189,9 @@ ignore_missing_imports = True #[mypy-multiprocessing.synchronization.*] #ignore_missing_imports = True +[mypy-proxystore.connectors.file] +ignore_missing_imports = True + [mypy-pandas.*] ignore_missing_imports = True @@ -204,5 +213,11 @@ ignore_missing_imports = True [mypy-setproctitle.*] ignore_missing_imports = True + +# for serializer plugin demos: + +[mypy-serpent.*] +ignore_missing_imports = True + [mypy-proxystore.*] ignore_missing_imports = True diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index ad88702744..72e0256744 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -493,6 +493,9 @@ def _queue_management_worker(self): if 'result' in msg: result = deserialize(msg['result']) + # ^ If this raises an exception, the queue management worker fails and + # Parsl hangs. This is more relevant now that serialization is user-pluggable, + # because exceptions from user deserialization code can be raised here.
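For illustration of the unstable registration API described in the plugins.rst "Serialization" section above: this sketch is not part of the patch, and the JsonSerializer name is hypothetical; it only uses the facade functions added in this change.

from parsl.serialize.base import SerializerBase
from parsl.serialize.facade import register_method_for_data, unregister_serializer


class JsonSerializer(SerializerBase):
    """Hypothetical example: serialize plain data with the stdlib json module."""

    def serialize(self, data):
        import json
        return json.dumps(data).encode('utf-8')

    def deserialize(self, body):
        import json
        return json.loads(body.decode('utf-8'))


serializer = JsonSerializer()
register_method_for_data(serializer)  # inserted at the front, so it is tried before pickle/dill
try:
    pass  # run Parsl apps here; non-JSON-serializable arguments fall back to the default methods
finally:
    unregister_serializer(serializer)  # restore the default behaviour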
task_fut.set_result(result) elif 'exception' in msg: diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index 188c68db14..577af37d82 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -1,12 +1,9 @@ -import logging -from abc import abstractmethod +from abc import ABCMeta, abstractmethod from functools import cached_property from typing import Any -logger = logging.getLogger(__name__) - -class SerializerBase: +class SerializerBase(metaclass=ABCMeta): """ Adds shared functionality for all serializer implementations """ diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index f8e76f174b..90d4c07aba 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -8,34 +8,80 @@ logger = logging.getLogger(__name__) +methods_for_code: List[SerializerBase] +methods_for_code = [] + +methods_for_data: List[SerializerBase] +methods_for_data = [] + +deserializers: Dict[bytes, SerializerBase] +deserializers = {} + + +# maybe it's weird to want to clear away all serializers rather than just the +# data or just code ones? eg in proxystore test, clearing serializers means +# there is no code serializer... so just the data serializers should be cleared +# (or if only having one kind of serializer, no code/data distinction, then +# this is more consistent, but clearing serializers is still a bit weird in +# that case (maybe for security?) - with inserting one near the start more +# usual?) +# def clear_serializers() -> None: +# # does not clear deserializers because the remote side sending back will have a +# # different serializer list (and wants to send back results with one of +# # the default 4) so clearing all the deserializers means we cannot receive +# # results... +# global methods_for_data, methods_for_code +# methods_for_code = [] +# methods_for_data = [] + + +def unregister_serializer(serializer: SerializerBase) -> None: + logger.info(f"BENC: deserializers {deserializers}, serializer {serializer}") + logger.info(f"BENC: unregistering serializer {serializer}") + if serializer.identifier in deserializers: + del deserializers[serializer.identifier] + else: + logger.warning("BENC: not found in deserializers list") + if serializer in methods_for_code: + logger.info("BENC: removing serializer from methods_for_code") + methods_for_code.remove(serializer) + else: + logger.warning("BENC: not found in methods for code") + if serializer in methods_for_data: + logger.info("BENC: removing serializer from methods_for_data") + methods_for_data.remove(serializer) + else: + logger.warning("BENC: not found in methods for data") -methods_for_code: Dict[bytes, SerializerBase] = {} +# structuring it this way is probably wrong - should perhaps be a single +# Pickle-variant (or dill-variant) that is used, with all pluggable behaviour hooked +# in - eg proxystore should hook into the same Pickle/Dill subclass as +# CodeProtector? def register_method_for_code(s: SerializerBase) -> None: - methods_for_code[s.identifier] = s + deserializers[s.identifier] = s + methods_for_code.insert(0, s) register_method_for_code(concretes.DillCallableSerializer()) - - -methods_for_data: Dict[bytes, SerializerBase] = {} +# register_method_for_code(CodeProtectorSerializer()) def register_method_for_data(s: SerializerBase) -> None: - methods_for_data[s.identifier] = s + deserializers[s.identifier] = s + methods_for_data.insert(0, s) + +# These must be registered in reverse order of +# importance: later registered serializers +# will take priority over earlier ones.
This is +# to facilitate user registered serializers + register_method_for_data(concretes.PickleSerializer()) register_method_for_data(concretes.DillSerializer()) -# When deserialize dynamically loads a deserializer, it will be stored here, -# rather than in the methods_for_* dictionaries, so that loading does not -# cause it to be used for future serializations. -additional_methods_for_deserialization: Dict[bytes, SerializerBase] = {} - - def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes: """Serialize and pack function and parameters @@ -106,24 +152,35 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes: Individual serialization methods might raise a TypeError (eg. if objects are non serializable) This method will raise the exception from the last method that was tried, if all methods fail. """ - result: Union[bytes, Exception] + logger.info(f"BENC: Trying to serialize {obj}") + result: Union[bytes, Exception, None] + result = None if callable(obj): methods = methods_for_code else: methods = methods_for_data - for method in methods.values(): + if methods == []: + raise RuntimeError("There are no configured serializers") + + for method in methods: try: + logger.info(f"BENC: trying serializer {method}") result = method.identifier + b'\n' + method.serialize(obj) except Exception as e: + logger.warning(f"BENC: serializer {method} skipped, with exception: {e}") result = e continue else: break - if isinstance(result, BaseException): + if result is None: + raise RuntimeError("No serializer returned a result") + elif isinstance(result, BaseException): + logger.error("Serializer returned an exception, re-raising") raise result else: + logger.debug("Serialization complete") if len(result) > buffer_threshold: logger.warning(f"Serialized object exceeds buffer threshold of {buffer_threshold} bytes, this could cause overflows") return result @@ -139,12 +196,8 @@ def deserialize(payload: bytes) -> Any: """ header, body = payload.split(b'\n', 1) - if header in methods_for_code: - deserializer = methods_for_code[header] - elif header in methods_for_data: - deserializer = methods_for_data[header] - elif header in additional_methods_for_deserialization: - deserializer = additional_methods_for_deserialization[header] + if header in deserializers: + deserializer = deserializers[header] else: logger.info("Trying to dynamically load deserializer: {!r}".format(header)) # This is a user plugin point, so expect exceptions to happen.
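A short illustrative sketch (not part of the patch) of the ordering and framing that the facade code above implements, assuming only the two default data registrations have run:

from parsl.serialize.facade import deserialize, methods_for_data, serialize

# register_method_for_data() inserts at position 0, so the most recently
# registered method is tried first; with the defaults above that means the
# DillSerializer is tried before the PickleSerializer fallback.
assert [type(m).__name__ for m in methods_for_data] == ['DillSerializer', 'PickleSerializer']

# The framing that serialize() and deserialize() agree on: an identifier
# header, a newline, then the method-specific body. The header selects an
# entry in `deserializers`, or triggers the dynamic module/class import in
# deserialize() if that identifier has not been registered locally.
payload = serialize([1, 2, 3])
header, body = payload.split(b'\n', 1)
assert header == methods_for_data[0].identifier
assert deserialize(payload) == [1, 2, 3]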
@@ -154,7 +207,7 @@ def deserialize(payload: bytes) -> Any: module = importlib.import_module(decoded_module_name) deserializer_class = getattr(module, class_name.decode('utf-8')) deserializer = deserializer_class() - additional_methods_for_deserialization[header] = deserializer + deserializers[header] = deserializer except Exception as e: raise DeserializerPluginError(header) from e diff --git a/parsl/serialize/plugin_codeprotector.py b/parsl/serialize/plugin_codeprotector.py new file mode 100644 index 0000000000..75f6966f22 --- /dev/null +++ b/parsl/serialize/plugin_codeprotector.py @@ -0,0 +1,44 @@ +import io +import logging +import sys +import types +from typing import Any + +import dill + +from parsl.serialize.base import SerializerBase + +logger = logging.getLogger(__name__) + + +def _deprotectCode(major: int, minor: int, b: bytes) -> Any: + if sys.version_info.major != major: + raise RuntimeError("Major version mismatch deserializing code") + if sys.version_info.minor != minor: + raise RuntimeError("Minor version mismatch deserializing code") + + return dill.loads(b) + + +class CodeProtectorPickler(dill.Pickler): + + def reducer_override(self, o: Any) -> Any: + logger.info(f"BENC: reducing object {o!r} of type {type(o)}") + if isinstance(o, types.CodeType): + logger.info(f"BENC: special casing code object {o!r} of type {type(o)}") + return (_deprotectCode, (sys.version_info.major, sys.version_info.minor, dill.dumps(o))) + + return NotImplemented + + +class CodeProtectorSerializer(SerializerBase): + + def serialize(self, data: Any) -> bytes: + + f = io.BytesIO() + pickler = CodeProtectorPickler(file=f) + pickler.dump(data) + return f.getvalue() + + def deserialize(self, body: bytes) -> Any: + return dill.loads(body) diff --git a/parsl/serialize/plugin_proxystore.py b/parsl/serialize/plugin_proxystore.py new file mode 100644 index 0000000000..ea76de8ec1 --- /dev/null +++ b/parsl/serialize/plugin_proxystore.py @@ -0,0 +1,44 @@ +# parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] +import pickle +from typing import Any, Optional + +from proxystore.connectors.file import FileConnector +from proxystore.store import Store, register_store + +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import register_method_for_data + + +class ProxyStoreSerializer(SerializerBase): + + def __init__(self, store: Optional[Store] = None) -> None: + """Because of jumbled use of this class for init-time configurable + serialization, and non-configurable remote deserializer loading, the + store field can be None...
TODO: this would go away if serializer and + deserializer were split into different objects/classes/functions.""" + self._store = store + + def serialize(self, data: Any) -> bytes: + assert self._store is not None + assert data is not None + p = self._store.proxy(data) + return pickle.dumps(p) + + def deserialize(self, body: bytes) -> Any: + return pickle.loads(body) + + +def register_proxystore_serializer() -> None: + """Initializes proxystore and registers it as a serializer with parsl""" + serializer = create_proxystore_serializer() + register_method_for_data(serializer) + + +def create_proxystore_serializer() -> ProxyStoreSerializer: + """Creates a serializer but does not register with global system - so this + can be used in testing.""" + + import uuid + store = Store(name='parsl_store_' + str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp")) + register_store(store) + return ProxyStoreSerializer(store) diff --git a/parsl/serialize/plugin_proxystore_deep_pickle.py b/parsl/serialize/plugin_proxystore_deep_pickle.py new file mode 100644 index 0000000000..45a2c9d43f --- /dev/null +++ b/parsl/serialize/plugin_proxystore_deep_pickle.py @@ -0,0 +1,74 @@ +# parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] +import io +import logging +from typing import Any, Optional, Type + +import dill +from proxystore.connectors.file import FileConnector +from proxystore.store import Store, register_store + +from parsl.serialize.base import SerializerBase + +logger = logging.getLogger(__name__) + + +class ProxyStoreDeepPickler(dill.Pickler): + + def __init__(self, *args: Any, policy: Type, store: Store, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self._store = store + self._policy = policy + + def reducer_override(self, o: Any) -> Any: + logger.info(f"BENC: reducing object {o!r}") + + if type(o) is self._policy: # not isinstance, because want exact class match + logger.info("BENC: Policy class detected") + proxy = self._store.proxy(o) + return proxy.__reduce__() + else: + # fall through to pickle... + return NotImplemented + + +class ProxyStoreDeepSerializer(SerializerBase): + + def __init__(self, *, policy: Optional[Type] = None, store: Optional[Store] = None) -> None: + """Because of jumbled use of this class for init-time configurable + serialization, and non-configurable remote deserializer loading, the + store and policy fields can be None... TODO: this would go away if serializer and + deserializer were split into different objects/classes/functions, like Pickler and + Unpickler are""" + self._store = store + self._policy = policy + + def serialize(self, data: Any) -> bytes: + assert self._store is not None + assert self._policy is not None + + assert data is not None + + # TODO: pluggable policy should go here... what does that look like? + # TODO: this policy belongs in the pickler plugin, not top level parsl serializer plugin + # if not isinstance(data, int): + # raise RuntimeError(f"explicit policy will only proxy ints, not {type(data)}") + + f = io.BytesIO() + pickler = ProxyStoreDeepPickler(file=f, store=self._store, policy=self._policy) + pickler.dump(data) + return f.getvalue() + + def deserialize(self, body: bytes) -> Any: + # because we aren't customising deserialization, use regular + # dill for deserialization; but otherwise could create a + # custom Unpickler here... 
+ return dill.loads(body) + + +def create_deep_proxystore_serializer(*, policy: Type) -> ProxyStoreDeepSerializer: + """Creates a serializer but does not register with global system - so this + can be used in testing.""" + + store = Store(name='parsl_store', connector=FileConnector(store_dir="/tmp")) + register_store(store) + return ProxyStoreDeepSerializer(store=store, policy=policy) diff --git a/parsl/serialize/plugin_serpent.py b/parsl/serialize/plugin_serpent.py new file mode 100644 index 0000000000..338dc91cac --- /dev/null +++ b/parsl/serialize/plugin_serpent.py @@ -0,0 +1,21 @@ +from typing import Any + +import serpent + +from parsl.serialize.base import SerializerBase + + +class SerpentSerializer(SerializerBase): + + def serialize(self, data: Any) -> bytes: + body = serpent.dumps(data) + roundtripped_data = serpent.loads(body) + + # this round trip is because serpent will sometimes serialize objects + # as best as it can, which is not good enough... + if data != roundtripped_data: + raise ValueError(f"SerpentSerializer cannot roundtrip {data} -> {body} -> {roundtripped_data}") + return body + + def deserialize(self, body: bytes) -> Any: + return serpent.loads(body) diff --git a/parsl/tests/test_serialization/config_proxystore.py b/parsl/tests/test_serialization/config_proxystore.py new file mode 100644 index 0000000000..4752b2800d --- /dev/null +++ b/parsl/tests/test_serialization/config_proxystore.py @@ -0,0 +1,64 @@ +"""htex local, but using the proxy store serializer... + +DANGER! this will modify the global serializer environment, so any +future parsl stuff done in the same process as this configuration +will not see the default serializer environment... +""" + +import os + +from parsl.channels import LocalChannel +from parsl.config import Config +from parsl.data_provider.file_noop import NoOpFileStaging +from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.data_provider.http import HTTPInTaskStaging +from parsl.executors import HighThroughputExecutor +from parsl.launchers import SingleNodeLauncher + +# imports for monitoring: +from parsl.monitoring import MonitoringHub +from parsl.providers import LocalProvider + +working_dir = os.getcwd() + "/" + "test_htex_alternate" + + +def fresh_config(): + import parsl.serialize.plugin_proxystore as pspps + pspps.register_proxystore_serializer() + + return Config( + executors=[ + HighThroughputExecutor( + address="127.0.0.1", + label="htex_Local", + working_dir=working_dir, + storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()], + worker_debug=True, + cores_per_worker=1, + heartbeat_period=2, + heartbeat_threshold=5, + poll_period=1, + provider=LocalProvider( + channel=LocalChannel(), + init_blocks=0, + min_blocks=0, + max_blocks=5, + launcher=SingleNodeLauncher(), + ), + block_error_handler=False + ) + ], + strategy='simple', + app_cache=True, checkpoint_mode='task_exit', + retries=2, + monitoring=MonitoringHub( + hub_address="localhost", + hub_port=55055, + monitoring_debug=False, + resource_monitoring_interval=1, + ), + usage_tracking=True + ) + + +config = fresh_config() diff --git a/parsl/tests/test_serialization/config_serpent.py b/parsl/tests/test_serialization/config_serpent.py new file mode 100644 index 0000000000..36a9e40035 --- /dev/null +++ b/parsl/tests/test_serialization/config_serpent.py @@ -0,0 +1,79 @@ +"""htex local, but using the serpent serializer... + +DANGER!
this will modify the global serializer environment, so any +future parsl stuff done in the same process as this configuration +will not see the default serializer environment... +""" + +import os + +from parsl.channels import LocalChannel +from parsl.config import Config +from parsl.data_provider.file_noop import NoOpFileStaging +from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.data_provider.http import HTTPInTaskStaging +from parsl.executors import HighThroughputExecutor +from parsl.launchers import SingleNodeLauncher + +# imports for monitoring: +from parsl.monitoring import MonitoringHub +from parsl.providers import LocalProvider +from parsl.serialize.facade import ( # TODO: move this into parsl.serialize root as it's user-exposed + methods_for_data, + register_method_for_data, ) +from parsl.serialize.plugin_serpent import SerpentSerializer + +working_dir = os.getcwd() + "/" + "test_htex_alternate" + +import logging + +logger = logging.getLogger(__name__) + + +def fresh_config(): + # get rid of the default serializers so that only serpent will be used in + # data mode. + global methods_for_data + logger.error(f"BENC: before reset, methods_for_data = {methods_for_data}") + methods_for_data.clear() + logger.error(f"BENC: after reset, methods_for_data = {methods_for_data}") + register_method_for_data(SerpentSerializer()) + logger.error(f"BENC: after config, methods_for_data = {methods_for_data}") + + return Config( + executors=[ + HighThroughputExecutor( + address="127.0.0.1", + label="htex_Local", + working_dir=working_dir, + storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()], + worker_debug=True, + cores_per_worker=1, + heartbeat_period=2, + heartbeat_threshold=5, + poll_period=1, + provider=LocalProvider( + channel=LocalChannel(), + init_blocks=0, + min_blocks=0, + max_blocks=5, + launcher=SingleNodeLauncher(), + ), + block_error_handler=False + ) + ], + strategy='simple', + app_cache=True, checkpoint_mode='task_exit', + retries=2, + monitoring=MonitoringHub( + hub_address="localhost", + hub_port=55055, + monitoring_debug=False, + resource_monitoring_interval=1, + ), + usage_tracking=True + ) + + +config = fresh_config() diff --git a/parsl/tests/test_serialization/test_htex_code_cache.py b/parsl/tests/test_serialization/test_htex_code_cache.py index 65cb72527a..85c2348368 100644 --- a/parsl/tests/test_serialization/test_htex_code_cache.py +++ b/parsl/tests/test_serialization/test_htex_code_cache.py @@ -17,9 +17,9 @@ def test_caching() -> None: # for future serializer devs: if this is failing because you added another # code serializer, you'll also probably need to re-think what is being tested # about serialization caching here.
- assert len(methods_for_code) == 1 - - serializer = methods_for_code[b'C2'] + assert len(methods_for_code) == 1, "This test expects one default code serializer" + serializer = methods_for_code[0] + assert serializer.identifier == b'C2', "This test expects the default code serializer to have identifier C2" # force type to Any here because a serializer method coming from # methods_for_code doesn't statically have any cache management diff --git a/parsl/tests/test_serialization/test_plugin.py b/parsl/tests/test_serialization/test_plugin.py new file mode 100644 index 0000000000..e2099299bd --- /dev/null +++ b/parsl/tests/test_serialization/test_plugin.py @@ -0,0 +1,55 @@ +# test the serializer plugin API + +import pytest + +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import ( + deserialize, + register_method_for_data, + serialize, + unregister_serializer, +) + +B_MAGIC = b'3626874628368432' # arbitrary const bytestring +V_MAGIC = 777 # arbitrary const object + + +class ConstSerializer(SerializerBase): + """This is a deliberately misbehaving test serializer. + It serializes all values to the same constant, and likewise for + deserialization. + """ + + # TODO: better enforcement of the presence of these values? tied into abstract base class? + + def serialize(self, o): + return B_MAGIC + + def deserialize(self, b): + assert b == B_MAGIC + return V_MAGIC + +# ugh... registering in global state which will screw with other tests +# so should protect with a finally block: +# serializer registration is, generally, intended to be a not-undoable +# operation, though... + + +@pytest.mark.local +def test_const_inprocess(): + s = ConstSerializer() + register_method_for_data(s) + + try: + + # check that we aren't using one of the real serializers + # that really works + assert deserialize(serialize(1)) != 1 + + # and that the behaviour looks like ConstSerializer + assert serialize(1) == s.identifier + b'\n' + B_MAGIC + assert deserialize(serialize(1)) == V_MAGIC + finally: + unregister_serializer(s) + + assert deserialize(serialize(1)) == 1 # check serialisation is basically working again diff --git a/parsl/tests/test_serialization/test_plugin_htex.py b/parsl/tests/test_serialization/test_plugin_htex.py new file mode 100644 index 0000000000..8f8fe46b04 --- /dev/null +++ b/parsl/tests/test_serialization/test_plugin_htex.py @@ -0,0 +1,57 @@ +# test the serializer plugin API +import logging + +import pytest + +import parsl +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import ( + deserialize, + register_method_for_data, + serialize, + unregister_serializer, +) +from parsl.tests.configs.htex_local import fresh_config as local_config + +logger = logging.getLogger(__name__) + +B_MAGIC = b'3626874628368432' # arbitrary const bytestring +V_MAGIC = 777 # arbitrary const object + + +class XXXXSerializer(SerializerBase): + """This is a test deserializer but puts some padding round to help distinguish... + """ + + def serialize(self, o): + import dill + logger.error(f"BENC: XXXX serializer serializing value {o} of type {type(o)}") + return dill.dumps(o) + + def deserialize(self, b): + import dill + return dill.loads(b) + +# ugh... registering in global state which will screw with other tests +# so should protect with a finally block: +# serializer registration is, generally, intended to be a not-undoable +# operation, though...
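One possible answer (a sketch only, not part of this patch) to the recurring "how do we know this went through the plugin serializer?" question in these htex tests: count serialize() calls on the submit side. The CountingSerializer name is hypothetical; the sketch reuses the imports already at the top of this test module and the func app defined just below, and assumes the class is importable on the workers, as with XXXXSerializer.

class CountingSerializer(SerializerBase):
    """Hypothetical test serializer that records how often it is used."""

    def __init__(self):
        super().__init__()
        self.serialize_count = 0

    def serialize(self, o):
        import dill
        self.serialize_count += 1
        return dill.dumps(o)

    def deserialize(self, b):
        import dill
        return dill.loads(b)

# In a test body: after registering an instance and running an app, the
# counter shows that at least the task arguments were serialized by this
# plugin in the submitting process (results coming back are still handled
# by the default deserializers).
#
#     s = CountingSerializer()
#     register_method_for_data(s)
#     try:
#         assert func(100).result() == 100
#         assert s.serialize_count > 0
#     finally:
#         unregister_serializer(s)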
+ + +@parsl.python_app +def func(x): + return x + + +@pytest.mark.local +def test_const_inprocess(): + s = XXXXSerializer() + register_method_for_data(s) + + try: + assert func(100).result() == 100 # but how do we know this went through XXXXSerializer? (or not) + + finally: + unregister_serializer(s) + + assert deserialize(serialize(1)) == 1 # check serialisation is basically working again diff --git a/parsl/tests/test_serialization/test_proxystore.py b/parsl/tests/test_serialization/test_proxystore.py new file mode 100644 index 0000000000..4aa59800c6 --- /dev/null +++ b/parsl/tests/test_serialization/test_proxystore.py @@ -0,0 +1,11 @@ +import pytest + +from parsl.serialize.plugin_proxystore import create_proxystore_serializer + + +@pytest.mark.local +def test_proxystore_wrapper(): + s = create_proxystore_serializer() + d = s.serialize(1) + assert isinstance(d, bytes) + assert s.deserialize(d) == 1 diff --git a/parsl/tests/test_serialization/test_proxystore_configured.py b/parsl/tests/test_serialization/test_proxystore_configured.py index 90bea12ac2..6e51729c77 100644 --- a/parsl/tests/test_serialization/test_proxystore_configured.py +++ b/parsl/tests/test_serialization/test_proxystore_configured.py @@ -5,7 +5,7 @@ import parsl from parsl.serialize.facade import ( - additional_methods_for_deserialization, + deserializers, methods_for_data, register_method_for_data, ) @@ -15,6 +15,7 @@ def local_setup(): + global s from proxystore.connectors.file import FileConnector from proxystore.store import Store, register_store @@ -30,11 +31,6 @@ def local_setup(): global previous_methods previous_methods = methods_for_data.copy() - # get rid of all data serialization methods, in preparation for using only - # proxystore. put all the old methods as additional methods used only for - # deserialization, because those will be needed to deserialize the results, - # which will be serialized using the default serializer set. - additional_methods_for_deserialization.update(previous_methods) methods_for_data.clear() register_method_for_data(s) @@ -42,12 +38,13 @@ def local_setup(): def local_teardown(): + global s parsl.dfk().cleanup() methods_for_data.clear() - methods_for_data.update(previous_methods) + methods_for_data.extend(previous_methods) - additional_methods_for_deserialization.clear() + del deserializers[s.identifier] @parsl.python_app diff --git a/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py new file mode 100644 index 0000000000..a454d3f44e --- /dev/null +++ b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py @@ -0,0 +1,72 @@ +# test the serializer plugin API +import logging + +import pytest + +import parsl +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import ( + deserialize, + register_method_for_data, + serialize, + unregister_serializer, +) +from parsl.serialize.plugin_proxystore_deep_pickle import ( + create_deep_proxystore_serializer, +) +from parsl.tests.configs.htex_local import fresh_config as local_config + +logger = logging.getLogger(__name__) + + +@parsl.python_app +def func(x, y): + return x + y + 1 + + +@parsl.python_app +def func2(v): + return str(v) + + +class MyDemo: + def __init__(self, v): + self._v = v + + def __str__(self): + return str(self._v) + + +# TODO: this kind of test, I'd like to be able to run the entire test suite with the +# proxystore serializer configured, so as to see it tested against many cases. 
+# A bodge is to do that in fresh_config - but that doesn't work in the case of several +# DFKs in a single process, because there is nothing to unconfigure the process-wide state. +# Is this only a test limit? or is it something that should be properly exposed to +# users? + +@pytest.mark.local +def test_proxystore_single_call(): + later_c = [v for v in parsl.serialize.facade.methods_for_code] + later_d = [v for v in parsl.serialize.facade.methods_for_data] + + # clear the data serializers, leaving the code serializers in place + parsl.serialize.facade.methods_for_data = [] + + s = create_deep_proxystore_serializer(policy=MyDemo) + + register_method_for_data(s) + + try: + assert func(100, 4).result() == 105 # but how do we know this went through proxystore? + + m = MyDemo([1, 2, 3, 4]) + + assert func2(m).result() == "[1, 2, 3, 4]" + + finally: + unregister_serializer(s) + + parsl.serialize.facade.methods_for_code = later_c + parsl.serialize.facade.methods_for_data = later_d + + assert deserialize(serialize(1)) == 1 # check serialisation is basically working again diff --git a/parsl/tests/test_serialization/test_proxystore_htex.py b/parsl/tests/test_serialization/test_proxystore_htex.py new file mode 100644 index 0000000000..9f71ea3b85 --- /dev/null +++ b/parsl/tests/test_serialization/test_proxystore_htex.py @@ -0,0 +1,43 @@ +# test the serializer plugin API +import logging + +import pytest + +import parsl +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import ( + deserialize, + register_method_for_data, + serialize, + unregister_serializer, +) +from parsl.serialize.plugin_proxystore import create_proxystore_serializer +from parsl.tests.configs.htex_local import fresh_config as local_config + +logger = logging.getLogger(__name__) + + +@parsl.python_app +def func(x, y): + return x + y + 1 + + +# TODO: this kind of test, I'd like to be able to run the entire test suite with the +# proxystore serializer configured, so as to see it tested against many cases. +# A bodge is to do that in fresh_config - but that doesn't work in the case of several +# DFKs in a single process, because there is nothing to unconfigure the process-wide state. +# Is this only a test limit? or is it something that should be properly exposed to +# users? + +@pytest.mark.local +def test_proxystore_single_call(): + s = create_proxystore_serializer() # TODO this isn't testing the register_proxystore_serializer function... + register_method_for_data(s) + + try: + assert func(100, 4).result() == 105 # but how do we know this went through proxystore?
+ + finally: + unregister_serializer(s) + + assert deserialize(serialize(1)) == 1 # check serialisation is basically working again diff --git a/parsl/tests/test_serialization/test_serpent_htex.py b/parsl/tests/test_serialization/test_serpent_htex.py new file mode 100644 index 0000000000..395f6dc8de --- /dev/null +++ b/parsl/tests/test_serialization/test_serpent_htex.py @@ -0,0 +1,36 @@ +# test the serializer plugin API +import logging + +import pytest + +import parsl +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import ( + deserialize, + register_method_for_data, + serialize, + unregister_serializer, +) +from parsl.serialize.plugin_serpent import SerpentSerializer +from parsl.tests.configs.htex_local import fresh_config as local_config + +logger = logging.getLogger(__name__) + + +@parsl.python_app +def func(x): + return x + 1 + + +@pytest.mark.local +def test_serpent_single_call(): + s = SerpentSerializer() + register_method_for_data(s) + + try: + assert func(100).result() == 101 # but how do we know this went through serpent? TODO + + finally: + unregister_serializer(s) + + assert deserialize(serialize(1)) == 1 # check serialisation is basically working again diff --git a/setup.py b/setup.py index 85e014dc18..2b6dffad15 100755 --- a/setup.py +++ b/setup.py @@ -33,6 +33,7 @@ 'azure' : ['azure<=4', 'msrestazure'], 'workqueue': ['work_queue'], 'flux': ['pyyaml', 'cffi', 'jsonschema'], + 'serializer_demos': ['proxystore', 'serpent'], 'proxystore': ['proxystore'], 'radical-pilot': ['radical.pilot==1.60', 'radical.utils==1.60'], # Disabling psi-j since github direct links are not allowed by pypi
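A usage note on the serpent plugin exercised above (an illustrative sketch, not part of the patch): the round-trip check in SerpentSerializer.serialize() means values that serpent can only approximate are rejected at serialization time rather than arriving at the worker in a lossy form.

from parsl.serialize.plugin_serpent import SerpentSerializer

s = SerpentSerializer()
payload = s.serialize([1, 2, {"three": 4}])
assert isinstance(payload, bytes)
assert s.deserialize(payload) == [1, 2, {"three": 4}]

# Values that serpent cannot represent exactly (for example, instances of
# arbitrary user-defined classes, which serpent downgrades to dict-like
# structures) fail the equality check inside serialize() and raise
# ValueError, so serialize() either falls back to another registered
# method or the failure surfaces at submit time.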