From 031ce8ea779fe61ba6373b07003e1959db8427eb Mon Sep 17 00:00:00 2001
From: Ben Clifford
Date: Tue, 4 Jul 2023 08:45:27 +0000
Subject: [PATCH] serializer plugin API

TODO: (broken) make a test that runs using a custom serializer against an
htex worker, to check that remote processes can load a custom serializer.

Prior to this PR, serializer registration happened in order of class
definition, which in the case of a single concretes file is the order of
the serializers inside that file. This does not work well when trying to
define other serializers in other files, because the import order of files
can change subtly and due to distant effects. So doing serializer
registration in a subclass hook is not a very nice thing to do here: this
PR should move to a model which defines the default de*serializers
explicitly, and allows the user to modify that list explicitly too, to add
in new de*serializers.

Limitations: this is part of the work needed to allow serializer plugins,
but not all of it, as it does not define how a remote worker will discover
plugged-in serializers - for that, perhaps, the header should become
something importable, instead of a registry-based ID bytestring.

Contrasting with that: proxystore doesn't need its own deserializer! It can
use the existing pickle/picklecode deserializers, because what it generates
is a pickled object... and that's a legitimate use case... which suggests
that serializers don't need to be indexed by "serializer" ID at all on the
serializing side, but only on the deserializing side.

This API also will not easily allow serializer methods to be defined for
values returned from workers to the submit side: the expected place of
registration, near the start of the user workflow, does not have a
corresponding place on the worker side.

This PR separates serialization behaviour (object -> bytestream) from
deserialization behaviour ((identifier, bytestream) -> object), because a
serializer might not (and in this prototype, *cannot*) implement custom
deserialization, due to non-registration on the remote side, and instead
can only generate dill/pickle-targeted bytestreams.

The motivating use case for this basic serializer plugin API is supporting
proxystore, prototyped in https://github.com/Parsl/parsl/pull/2718 -
https://labs.globus.org/projects/proxystore.html - and this PR could also
contain an optional proxystore plugin that would need to be activated by a
user.

Loading remote deserializers via importlib is a lot of stuff... perhaps I
should just pickle the deserializer and send it (and let pickle deal with
that?) - I guess there are two paths here: one is loading remote
deserializers, and the other is focusing on the proxystore use case, which,
differently, wants access to the pickle deserializer remotely - and so that
drives a clearer separation of serializers vs deserializers. That second
case is probably what I should concentrate on.

Right now this does module loading in order to work with the class-based
serializer design inherited from before... but maybe that's not how it
should end up... perhaps just serializer callables and deserializer
callables? Or something based around modules? It seems nice to be able to
configure the serializer (aka make a class, or a partially applied
callable) when configuring proxystore... but it is unclear how that would
work for remote-end configuration.
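For illustration, registration under this model looks roughly like the
following minimal sketch, using the facade functions added in this PR
('mypackage.mymodule' is a placeholder path - for remote loading to work,
it would have to be importable on the worker side):

    from parsl.serialize.base import SerializerBase
    from parsl.serialize.facade import register_serializer, unregister_serializer

    import dill

    class MySerializer(SerializerBase):
        # module name and class name, space-separated, so the remote side
        # can import this class when it sees the header
        _identifier = b'mypackage.mymodule MySerializer'
        _for_code = False
        _for_data = True

        def serialize(self, obj):
            return dill.dumps(obj)

        def deserialize(self, body):
            return dill.loads(body)

    s = MySerializer()
    register_serializer(s)  # takes priority over earlier-registered serializers
    try:
        pass  # ... run workflow ...
    finally:
        unregister_serializer(s)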
TODO: as an example plugin, also show 'serpent' as a serialiser/deserialiser pair.
---
 docs/userguide/plugins.rst                    | 16 ++++
 mypy.ini                                      |  9 ++
 parsl/serialize/base.py                       | 26 ++----
 parsl/serialize/facade.py                     | 87 +++++++++++++++----
 parsl/serialize/plugin_proxystore.py          | 49 +++++++++++
 parsl/serialize/plugin_serpent.py             | 23 +++++
 .../test_serialization/config_proxystore.py   | 67 ++++++++++++++
 .../test_serialization/config_serpent.py      | 70 +++++++++++++++
 parsl/tests/test_serialization/test_plugin.py | 57 ++++++++++++
 .../test_serialization/test_plugin_htex.py    | 61 +++++++++++++
 .../test_serialization/test_proxystore.py     | 10 +++
 .../test_proxystore_htex.py                   | 39 +++++++++
 .../test_serialization/test_serpent_htex.py   | 32 +++++++
 requirements.txt                              |  2 +
 14 files changed, 511 insertions(+), 37 deletions(-)
 create mode 100644 parsl/serialize/plugin_proxystore.py
 create mode 100644 parsl/serialize/plugin_serpent.py
 create mode 100644 parsl/tests/test_serialization/config_proxystore.py
 create mode 100644 parsl/tests/test_serialization/config_serpent.py
 create mode 100644 parsl/tests/test_serialization/test_plugin.py
 create mode 100644 parsl/tests/test_serialization/test_plugin_htex.py
 create mode 100644 parsl/tests/test_serialization/test_proxystore.py
 create mode 100644 parsl/tests/test_serialization/test_proxystore_htex.py
 create mode 100644 parsl/tests/test_serialization/test_serpent_htex.py

diff --git a/docs/userguide/plugins.rst b/docs/userguide/plugins.rst
index e3d4b18186..7e0340cb06 100644
--- a/docs/userguide/plugins.rst
+++ b/docs/userguide/plugins.rst
@@ -77,3 +77,19 @@ from invoking a Parsl app. This includes as the return value of a
 A specific example of this is integrating Globus Compute tasks into a
 Parsl task graph. See :ref:`label-join-globus-compute`
+
+
+Serialization
+-------------
+
+By default Parsl will serialize objects with either `pickle` or `dill`, in
+some cases applying an LRU cache.
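+
+For example, the default machinery round-trips values like this (a sketch;
+``serialize`` prefixes each byte stream with the identifier of the
+serializer that produced it):
+
+.. code-block:: python
+
+   from parsl.serialize import serialize, deserialize
+
+   buf = serialize([1, 2, 3])   # b'<identifier>\n<serialized body>'
+   assert deserialize(buf) == [1, 2, 3]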
+
+Parsl has an unstable API to register new serialization methods.
+
+Additional serialization methods can be registered by XXXXX TODO XXXXX
+
+Limitations:
+these mechanisms are not registered on the remote side, so custom
+deserialization cannot be implemented -- instead, custom serializers can
+only generate byte streams to be deserialized by pickle or dill
diff --git a/mypy.ini b/mypy.ini
index f8c3c05b8c..f5e3de5b0c 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -198,3 +198,12 @@ ignore_missing_imports = True

 [mypy-setproctitle.*]
 ignore_missing_imports = True
+
+
+# for serializer plugin demos:
+
+[mypy-serpent.*]
+ignore_missing_imports = True
+
+[mypy-proxystore.connectors.file]
+ignore_missing_imports = True
diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py
index cda2d39f1e..0dea791cfc 100644
--- a/parsl/serialize/base.py
+++ b/parsl/serialize/base.py
@@ -1,34 +1,20 @@
-from abc import abstractmethod
-import logging
+from abc import abstractmethod, ABCMeta

 from typing import Any

-logger = logging.getLogger(__name__)
-
-# GLOBALS
-METHODS_MAP_CODE = {}
-METHODS_MAP_DATA = {}
-
-
-class SerializerBase:
+class SerializerBase(metaclass=ABCMeta):
     """ Adds shared functionality for all serializer implementations
     """

-    def __init_subclass__(cls, **kwargs: Any) -> None:
-        """ This forces all child classes to register themselves as
-        methods for serializing code or data
-        """
-        super().__init_subclass__(**kwargs)
-
-        if cls._for_code:
-            METHODS_MAP_CODE[cls._identifier] = cls
-        if cls._for_data:
-            METHODS_MAP_DATA[cls._identifier] = cls
-
+    # For deserializer
     _identifier: bytes
+
+    # For serializer
     _for_code: bool
     _for_data: bool

+    # For deserializer
     @property
     def identifier(self) -> bytes:
         """Get that identifier that will be used to indicate in byte streams
diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py
index 0759349ac2..c294833792 100644
--- a/parsl/serialize/facade.py
+++ b/parsl/serialize/facade.py
@@ -1,22 +1,64 @@
-from parsl.serialize.concretes import *  # noqa: F403,F401
-from parsl.serialize.base import METHODS_MAP_DATA, METHODS_MAP_CODE
 import logging
+import parsl.serialize.concretes as c
+from parsl.serialize.base import SerializerBase

 from typing import Any, List, Union

 logger = logging.getLogger(__name__)

-""" Instantiate the appropriate classes
-"""
-methods_for_code = {}
-methods_for_data = {}
-
-for key in METHODS_MAP_CODE:
-    methods_for_code[key] = METHODS_MAP_CODE[key]()
-
-for key in METHODS_MAP_DATA:
-    methods_for_data[key] = METHODS_MAP_DATA[key]()
+# These lists are used in two directions:
+#
+# 1. to iterate over to find a valid serializer -
+#    for that, the ordering is important
+#
+# 2. to perform an ID -> deserializer lookup
+#
+# Serializers must be registered in reverse order of
+# importance: later-registered serializers will take
+# priority over earlier ones. This is to facilitate
+# user-registered serializers.
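+#
+# For example (illustrative): after
+#     register_serializer(A()); register_serializer(B())
+# methods_for_data would be [B, A, <the defaults>], so B is tried
+# first when serializing.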
+
+methods_for_code: List[SerializerBase]
+methods_for_code = []
+
+methods_for_data: List[SerializerBase]
+methods_for_data = []
+
+deserializers = {}
+
+
+def register_serializer(serializer: SerializerBase) -> None:
+    deserializers[serializer._identifier] = serializer
+
+    if serializer._for_code:
+        methods_for_code.insert(0, serializer)
+    if serializer._for_data:
+        methods_for_data.insert(0, serializer)
+
+
+def unregister_serializer(serializer: SerializerBase) -> None:
+    logger.info(f"BENC: deserializers {deserializers}, serializer {serializer}")
+    logger.info(f"BENC: unregistering serializer {serializer}")
+    if serializer._identifier in deserializers:
+        del deserializers[serializer._identifier]
+    else:
+        logger.warning("BENC: not found in deserializers list")
+    if serializer in methods_for_code:
+        logger.info("BENC: removing serializer from methods_for_code")
+        methods_for_code.remove(serializer)
+    else:
+        logger.warning("BENC: not found in methods for code")
+    if serializer in methods_for_data:
+        logger.info("BENC: removing serializer from methods_for_data")
+        methods_for_data.remove(serializer)
+    else:
+        logger.warning("BENC: not found in methods for data")
+
+
+register_serializer(c.DillSerializer())
+register_serializer(c.DillCallableSerializer())
+register_serializer(c.PickleSerializer())
+register_serializer(c.PickleCallableSerializer())


 def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes:
@@ -64,10 +106,12 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes:
     else:
         methods = methods_for_data

-    for method in methods.values():
+    for method in methods:
         try:
+            logger.info(f"BENC: trying serializer {method}")
             result = method._identifier + b'\n' + method.serialize(obj)
         except Exception as e:
+            logger.warning(f"BENC: serializer {method} skipping, with exception: {e}")
             result = e
             continue
         else:
@@ -91,13 +135,22 @@ def deserialize(payload: bytes) -> Any:
     """
     header, body = payload.split(b'\n', 1)

-    if header in methods_for_code:
-        result = methods_for_code[header].deserialize(body)
-    elif header in methods_for_data:
-        result = methods_for_data[header].deserialize(body)
+    if header in deserializers:
+        result = deserializers[header].deserialize(body)
     else:
-        raise TypeError("Invalid header: {!r} in data payload. Buffer is either corrupt or not created by ParslSerializer".format(header))
-
+        logger.warning("BENC: unknown serialization header: {!r} - trying to load dynamically".format(header))
+        import importlib
+        module_name, class_name = header.split(b' ', 1)
+        try:
+            decoded_module_name = module_name.decode('utf-8')
+        except UnicodeDecodeError as e:
+            raise RuntimeError(f"Got unicode error with string {module_name!r} exception is {e}")
+        module = importlib.import_module(decoded_module_name)
+        deserializer_class = getattr(module, class_name.decode('utf-8'))
+        deserializer = deserializer_class()
+        result = deserializer.deserialize(body)
+
+        # raise TypeError("Invalid serialization header: {!r}".format(header))

     return result
diff --git a/parsl/serialize/plugin_proxystore.py b/parsl/serialize/plugin_proxystore.py
new file mode 100644
index 0000000000..5c3eacced2
--- /dev/null
+++ b/parsl/serialize/plugin_proxystore.py
@@ -0,0 +1,49 @@
+# parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store"  [attr-defined]
+from proxystore.store import Store, register_store  # type: ignore
+from proxystore.connectors.file import FileConnector
+from parsl.serialize.facade import register_serializer
+
+from parsl.serialize.base import SerializerBase
+
+from typing import Any, Optional
+
+import pickle
+
+
+class ProxyStoreSerializer(SerializerBase):
+    # TODO: better enforcement of this being bytes, because that's a common error I'm making,
+    # and probably it should be autogenerated...
+    _identifier = b'parsl.serialize.plugin_proxystore ProxyStoreSerializer'  # must be '<module> <class>' so it can be imported remotely
+    _for_code = False
+    _for_data = True
+
+    def __init__(self, store: Optional[Store] = None) -> None:
+        """Because this class is used both for init-time configurable
+        serialization and for non-configurable remote deserializer loading,
+        the store field can be None... TODO: this would go away if serializer
+        and deserializer were split into different objects/classes/functions."""
+        self._store = store
+
+    def serialize(self, data: Any) -> bytes:
+        assert self._store is not None
+        assert data is not None
+        p = self._store.proxy(data)
+        return pickle.dumps(p)
+
+    def deserialize(self, body: bytes) -> Any:
+        return pickle.loads(body)
+
+
+def register_proxystore_serializer() -> None:
+    """Initializes proxystore and registers it as a serializer with parsl"""
+    serializer = create_proxystore_serializer()
+    register_serializer(serializer)
+
+
+def create_proxystore_serializer() -> ProxyStoreSerializer:
+    """Creates a serializer but does not register it with the global system -
+    so this can be used in testing."""
+
+    store = Store(name='parsl_store', connector=FileConnector(store_dir="/tmp"))
+    register_store(store)
+    return ProxyStoreSerializer(store)
diff --git a/parsl/serialize/plugin_serpent.py b/parsl/serialize/plugin_serpent.py
new file mode 100644
index 0000000000..5020267101
--- /dev/null
+++ b/parsl/serialize/plugin_serpent.py
@@ -0,0 +1,23 @@
+from parsl.serialize.base import SerializerBase
+import serpent
+
+from typing import Any
+
+
+class SerpentSerializer(SerializerBase):
+    _identifier = b'parsl.serialize.plugin_serpent SerpentSerializer'
+    _for_code = False
+    _for_data = True
+
+    def serialize(self, data: Any) -> bytes:
+        body = serpent.dumps(data)
+        roundtripped_data = serpent.loads(body)
+
+        # this round trip is needed because serpent will sometimes serialize
+        # objects as best as it can, which is not good enough...
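+        # (for example, serpent documents that it serializes otherwise
+        # unknown classes as dicts of their attributes, which would silently
+        # change the value's type on the deserializing side)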
+        if data != roundtripped_data:
+            raise ValueError(f"SerpentSerializer cannot roundtrip {data} -> {body} -> {roundtripped_data}")
+        return body
+
+    def deserialize(self, body: bytes) -> Any:
+        return serpent.loads(body)
diff --git a/parsl/tests/test_serialization/config_proxystore.py b/parsl/tests/test_serialization/config_proxystore.py
new file mode 100644
index 0000000000..b6536409d0
--- /dev/null
+++ b/parsl/tests/test_serialization/config_proxystore.py
@@ -0,0 +1,67 @@
+"""htex local, but using the proxystore serializer...
+
+DANGER! this will modify the global serializer environment, so any
+future parsl stuff done in the same process as this configuration
+will not see the default serializer environment...
+"""
+
+# imports for monitoring:
+from parsl.monitoring import MonitoringHub
+
+import os
+
+from parsl.providers import LocalProvider
+from parsl.channels import LocalChannel
+from parsl.launchers import SingleNodeLauncher
+
+from parsl.config import Config
+from parsl.executors import HighThroughputExecutor
+
+
+from parsl.data_provider.http import HTTPInTaskStaging
+from parsl.data_provider.ftp import FTPInTaskStaging
+from parsl.data_provider.file_noop import NoOpFileStaging
+
+working_dir = os.getcwd() + "/" + "test_htex_alternate"
+
+
+def fresh_config():
+    import parsl.serialize.plugin_proxystore as pspps
+    pspps.register_proxystore_serializer()
+
+    return Config(
+        executors=[
+            HighThroughputExecutor(
+                address="127.0.0.1",
+                label="htex_Local",
+                working_dir=working_dir,
+                storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()],
+                worker_debug=True,
+                cores_per_worker=1,
+                heartbeat_period=2,
+                heartbeat_threshold=5,
+                poll_period=1,
+                provider=LocalProvider(
+                    channel=LocalChannel(),
+                    init_blocks=0,
+                    min_blocks=0,
+                    max_blocks=5,
+                    launcher=SingleNodeLauncher(),
+                ),
+                block_error_handler=False
+            )
+        ],
+        strategy='simple',
+        app_cache=True, checkpoint_mode='task_exit',
+        retries=2,
+        monitoring=MonitoringHub(
+            hub_address="localhost",
+            hub_port=55055,
+            monitoring_debug=False,
+            resource_monitoring_interval=1,
+        ),
+        usage_tracking=True
+    )
+
+
+config = fresh_config()
diff --git a/parsl/tests/test_serialization/config_serpent.py b/parsl/tests/test_serialization/config_serpent.py
new file mode 100644
index 0000000000..5175fa03c8
--- /dev/null
+++ b/parsl/tests/test_serialization/config_serpent.py
@@ -0,0 +1,70 @@
+"""htex local, but using the serpent serializer...
+
+DANGER! this will modify the global serializer environment, so any
+future parsl stuff done in the same process as this configuration
+will not see the default serializer environment...
+""" + +# imports for monitoring: +from parsl.monitoring import MonitoringHub + +import os + +from parsl.providers import LocalProvider +from parsl.channels import LocalChannel +from parsl.launchers import SingleNodeLauncher + +from parsl.config import Config +from parsl.executors import HighThroughputExecutor + + +from parsl.data_provider.http import HTTPInTaskStaging +from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.data_provider.file_noop import NoOpFileStaging + +from parsl.serialize.facade import register_serializer # TODO: move this into parsl.serialize root as its user exposed + +from parsl.serialize.plugin_serpent import SerpentSerializer + +working_dir = os.getcwd() + "/" + "test_htex_alternate" + + +def fresh_config(): + register_serializer(SerpentSerializer()) + + return Config( + executors=[ + HighThroughputExecutor( + address="127.0.0.1", + label="htex_Local", + working_dir=working_dir, + storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()], + worker_debug=True, + cores_per_worker=1, + heartbeat_period=2, + heartbeat_threshold=5, + poll_period=1, + provider=LocalProvider( + channel=LocalChannel(), + init_blocks=0, + min_blocks=0, + max_blocks=5, + launcher=SingleNodeLauncher(), + ), + block_error_handler=False + ) + ], + strategy='simple', + app_cache=True, checkpoint_mode='task_exit', + retries=2, + monitoring=MonitoringHub( + hub_address="localhost", + hub_port=55055, + monitoring_debug=False, + resource_monitoring_interval=1, + ), + usage_tracking=True + ) + + +config = fresh_config() diff --git a/parsl/tests/test_serialization/test_plugin.py b/parsl/tests/test_serialization/test_plugin.py new file mode 100644 index 0000000000..bd546f5a5d --- /dev/null +++ b/parsl/tests/test_serialization/test_plugin.py @@ -0,0 +1,57 @@ +# test the serializer plugin API + +import pytest + +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer + +B_MAGIC = b'3626874628368432' # arbitrary const bytestring +V_MAGIC = 777 # arbitrary const object + + +class ConstSerializer(SerializerBase): + """This is a test deliberately misbehaving serializer. + It serializes all values to the same constant, and likewise for + deserialization. + """ + + _for_code = True + _for_data = True + + # TODO: should be enforcing/defaulting this to class name so that by default we can dynamically load the serializer remotely? + _identifier = b'parsl.tests.test_serializer.test_plugin ConstSerializer' + # note a space in the name here not final dot, to distinguish modules vs attribute in module + + # TODO: better enforcement of the presence of these values? tied into abstract base class? + + def serialize(self, o): + return B_MAGIC + + def deserialize(self, b): + assert b == B_MAGIC + return V_MAGIC + +# ugh... registering in globus state which will screw with other tests +# so should protect with a finally block: +# serializer registration is, generally, intended to be a not-undoable +# operation, though... 
+
+
+@pytest.mark.local
+def test_const_inprocess():
+    s = ConstSerializer()
+    register_serializer(s)
+
+    try:
+
+        # check that we aren't using one of the real serializers
+        # that really works
+        assert deserialize(serialize(1)) != 1
+
+        # and that the behaviour looks like ConstSerializer
+        assert serialize(1) == ConstSerializer._identifier + b'\n' + B_MAGIC
+        assert deserialize(serialize(1)) == V_MAGIC
+    finally:
+        unregister_serializer(s)
+
+    assert deserialize(serialize(1)) == 1  # check serialisation is basically working again
diff --git a/parsl/tests/test_serialization/test_plugin_htex.py b/parsl/tests/test_serialization/test_plugin_htex.py
new file mode 100644
index 0000000000..f486c4ae3c
--- /dev/null
+++ b/parsl/tests/test_serialization/test_plugin_htex.py
@@ -0,0 +1,61 @@
+# test the serializer plugin API
+import logging
+import pytest
+import parsl
+
+from parsl.tests.configs.htex_local import fresh_config as local_config
+
+from parsl.serialize.base import SerializerBase
+from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer
+
+logger = logging.getLogger(__name__)
+
+B_MAGIC = b'3626874628368432'  # arbitrary const bytestring
+V_MAGIC = 777  # arbitrary const object
+
+
+class XXXXSerializer(SerializerBase):
+    """This is a test serializer; the XXXX padding in its name helps
+    distinguish it in logs and byte streams...
+    """
+
+    _for_code = True
+    _for_data = True
+
+    # TODO: should be enforcing/defaulting this to the module and class name so that by default we can dynamically load the serializer remotely?
+    _identifier = b'parsl.tests.test_serialization.test_plugin_htex XXXXSerializer'
+    # note: a space separates the module and class name here, not a final dot,
+    # to distinguish the module from the attribute within the module
+
+    # TODO: better enforcement of the presence of these values? tied into the abstract base class?
+
+    def serialize(self, o):
+        import dill
+        logger.error(f"BENC: XXXX serializer serializing value {o} of type {type(o)}")
+        return dill.dumps(o)
+
+    def deserialize(self, b):
+        import dill
+        return dill.loads(b)
+
+# ugh... registering in global state, which will screw with other tests,
+# so should protect with a finally block:
+# serializer registration is, generally, intended to be a not-undoable
+# operation, though...
+
+
+@parsl.python_app
+def func(x):
+    return x
+
+
+@pytest.mark.local
+def test_const_inprocess():
+    s = XXXXSerializer()
+    register_serializer(s)
+
+    try:
+        assert func(100).result() == 100  # but how do we know this went through XXXXSerializer? (or not)
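+        # One way to verify (not done here) would be to grep the submit-side
+        # and worker logs for the "BENC: XXXX serializer serializing" line
+        # that XXXXSerializer.serialize emits.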
+    finally:
+        unregister_serializer(s)
+
+    assert deserialize(serialize(1)) == 1  # check serialisation is basically working again
diff --git a/parsl/tests/test_serialization/test_proxystore.py b/parsl/tests/test_serialization/test_proxystore.py
new file mode 100644
index 0000000000..08813c60db
--- /dev/null
+++ b/parsl/tests/test_serialization/test_proxystore.py
@@ -0,0 +1,10 @@
+import pytest
+from parsl.serialize.plugin_proxystore import create_proxystore_serializer
+
+
+@pytest.mark.local
+def test_proxystore_wrapper():
+    s = create_proxystore_serializer()
+    d = s.serialize(1)
+    assert isinstance(d, bytes)
+    assert s.deserialize(d) == 1
diff --git a/parsl/tests/test_serialization/test_proxystore_htex.py b/parsl/tests/test_serialization/test_proxystore_htex.py
new file mode 100644
index 0000000000..8187702bdb
--- /dev/null
+++ b/parsl/tests/test_serialization/test_proxystore_htex.py
@@ -0,0 +1,39 @@
+# test the serializer plugin API
+import logging
+import pytest
+import parsl
+
+from parsl.tests.configs.htex_local import fresh_config as local_config
+
+from parsl.serialize.base import SerializerBase
+from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer
+
+from parsl.serialize.plugin_proxystore import create_proxystore_serializer
+
+logger = logging.getLogger(__name__)
+
+
+@parsl.python_app
+def func(x):
+    return x + 1
+
+
+# TODO: for this kind of test, I'd like to be able to run the entire test
+# suite with the proxystore serializer configured, so as to see it tested
+# against many cases. A bodge is to do that in fresh_config - but that
+# doesn't work when there are several DFKs in a single process, because
+# there is nothing to unconfigure the process-wide state. Is this only a
+# test limitation? Or is it something that should be properly exposed to
+# users?

+@pytest.mark.local
+def test_proxystore_single_call():
+    s = create_proxystore_serializer()  # TODO: this isn't testing the register_proxystore_serializer function...
+    register_serializer(s)
+
+    try:
+        assert func(100).result() == 101  # but how do we know this went through proxystore?
+
+    finally:
+        unregister_serializer(s)
+
+    assert deserialize(serialize(1)) == 1  # check serialisation is basically working again
diff --git a/parsl/tests/test_serialization/test_serpent_htex.py b/parsl/tests/test_serialization/test_serpent_htex.py
new file mode 100644
index 0000000000..c69b53dbd2
--- /dev/null
+++ b/parsl/tests/test_serialization/test_serpent_htex.py
@@ -0,0 +1,32 @@
+# test the serializer plugin API
+import logging
+import pytest
+import parsl
+
+from parsl.tests.configs.htex_local import fresh_config as local_config
+
+from parsl.serialize.base import SerializerBase
+from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer
+
+from parsl.serialize.plugin_serpent import SerpentSerializer
+
+logger = logging.getLogger(__name__)
+
+
+@parsl.python_app
+def func(x):
+    return x + 1
+
+
+@pytest.mark.local
+def test_serpent_single_call():
+    s = SerpentSerializer()
+    register_serializer(s)
+
+    try:
+        assert func(100).result() == 101  # but how do we know this went through serpent? TODO
+
+    finally:
+        unregister_serializer(s)
+
+    assert deserialize(serialize(1)) == 1  # check serialisation is basically working again
diff --git a/requirements.txt b/requirements.txt
index fa1c5bf281..6a757cbd72 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,3 +12,5 @@ requests
 paramiko
 psutil>=5.5.1
 setproctitle
+proxystore  # for proxystore demo plugin
+serpent  # for serpent demo plugin