From 6932e96a68a4ab3fc81cebb0970c5cbe5c769f01 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 19 May 2023 11:22:25 +0000 Subject: [PATCH 01/39] Passes many tests but fails at test_memoize.py --- parsl/serialize/concretes.py | 26 ++++++++++++++++++++++++++ parsl/serialize/facade.py | 3 +++ 2 files changed, 29 insertions(+) diff --git a/parsl/serialize/concretes.py b/parsl/serialize/concretes.py index e66ab17a73..a9cafa24bc 100644 --- a/parsl/serialize/concretes.py +++ b/parsl/serialize/concretes.py @@ -7,6 +7,32 @@ from typing import Any +from proxystore.store import Store, get_store, register_store +from proxystore.connectors.file import FileConnector +store = Store(name='parsl_store', connector=FileConnector(store_dir="/tmp")) +register_store(store) + +class ProxyStoreSerializer(SerializerBase): + _identifier = b'99\n' + _for_code = False + _for_data = True + + def serialize(self, data: Any) -> bytes: + + store = get_store("parsl_store") + assert store is not None, "Could not find store" + + p = store.proxy(data) + + d = pickle.dumps(p) + + return self.identifier + d + + def deserialize(self, payload: bytes) -> Any: + chomped = self.chomp(payload) + proxy = pickle.loads(chomped) + return proxy + class PickleSerializer(SerializerBase): """ Pickle serialization covers most python objects, with some notable exceptions: diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 4ff9e08c1d..39767e08dd 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -78,9 +78,12 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes: break else: for method in methods_for_data.values(): + logger.error(f"BENC: trying serializer {method}") try: result = method.serialize(obj) + logger.error(f"BENC: successful with serializer {method}") except Exception as e: + logger.error(f"BENC: failed with with serializer {method}, error: {e}", exc_info=True) result = e continue else: From 5e84f9f3a672d28703a80604a171b2676a8345dc Mon Sep 17 
00:00:00 2001 From: Ben Clifford Date: Fri, 19 May 2023 13:04:42 +0000 Subject: [PATCH 02/39] fix flake8, mypy --- parsl/serialize/concretes.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/parsl/serialize/concretes.py b/parsl/serialize/concretes.py index a9cafa24bc..0fac693ef5 100644 --- a/parsl/serialize/concretes.py +++ b/parsl/serialize/concretes.py @@ -7,18 +7,21 @@ from typing import Any -from proxystore.store import Store, get_store, register_store +# parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] +from proxystore.store import Store, get_store, register_store # type: ignore + from proxystore.connectors.file import FileConnector store = Store(name='parsl_store', connector=FileConnector(store_dir="/tmp")) register_store(store) + class ProxyStoreSerializer(SerializerBase): _identifier = b'99\n' _for_code = False _for_data = True def serialize(self, data: Any) -> bytes: - + store = get_store("parsl_store") assert store is not None, "Could not find store" From af42d3a1723324a82bbaca1d3e98ce5bf96765d4 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 19 May 2023 13:11:36 +0000 Subject: [PATCH 03/39] install proxystore --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 07cdba8ba5..e640b1cd85 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ requests paramiko psutil>=5.5.1 setproctitle +proxystore From f29054e8fa9e05e84386be23254d78186f2bc583 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 19 May 2023 13:31:32 +0000 Subject: [PATCH 04/39] Can't use pluggable serializers for memoization... 
i) they aren't required to return consistent results ii) changing serialiser order/configuration between runs would mean memoisation get different values between runs for the same input structure --- parsl/dataflow/memoization.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/parsl/dataflow/memoization.py b/parsl/dataflow/memoization.py index 45c6e51d23..7623f76992 100644 --- a/parsl/dataflow/memoization.py +++ b/parsl/dataflow/memoization.py @@ -11,7 +11,13 @@ from concurrent.futures import Future -from parsl.serialize import serialize +# this serialize doesn't work right for memoisation when using Interesting Serializers such as proxystore +# from parsl.serialize import serialize + +def serialize(d: Any) -> bytes: + import pickle + return pickle.dumps(d) + import types logger = logging.getLogger(__name__) From 0acdbcf5e545213f570c56cc0443aea682c7443f Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 19 May 2023 13:33:37 +0000 Subject: [PATCH 05/39] fix flake8 --- parsl/dataflow/memoization.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/parsl/dataflow/memoization.py b/parsl/dataflow/memoization.py index 7623f76992..1a08fef395 100644 --- a/parsl/dataflow/memoization.py +++ b/parsl/dataflow/memoization.py @@ -14,10 +14,12 @@ # this serialize doesn't work right for memoisation when using Interesting Serializers such as proxystore # from parsl.serialize import serialize + def serialize(d: Any) -> bytes: import pickle return pickle.dumps(d) + import types logger = logging.getLogger(__name__) From b15565c5fcdcc2e1a31f6fbbc56d62042bfd4eda Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 23 May 2023 13:03:44 +0000 Subject: [PATCH 06/39] Isolate one test from parsl/tests/test_python_apps/test_memoize_bad_id_for_memo.py that is still failing, for reasons i do not understand --- parsl/tests/test_proxystore.py | 41 ++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 parsl/tests/test_proxystore.py 
diff --git a/parsl/tests/test_proxystore.py b/parsl/tests/test_proxystore.py new file mode 100644 index 0000000000..4f71540043 --- /dev/null +++ b/parsl/tests/test_proxystore.py @@ -0,0 +1,41 @@ +import pytest +from parsl import python_app +from parsl.dataflow.memoization import id_for_memo + + +# this class should not have a memoizer registered for it +class Unmemoizable: + pass + + +# this class should have a memoizer that always raises an +# exception +class FailingMemoizable: + pass + + +class FailingMemoizerTestError(ValueError): + pass + + +@id_for_memo.register(FailingMemoizable) +def failing_memoizer(v, output_ref=False): + raise FailingMemoizerTestError("Deliberate memoizer failure") + + +@python_app(cache=True) +def noop_app(x, inputs=[], cache=True): + return None + + +@python_app +def sleep(t): + import time + time.sleep(t) + + +def test_python_unmemoizable_after_dep(): + sleep_fut = sleep(1) + fut = noop_app(Unmemoizable(), inputs=[sleep_fut]) + with pytest.raises(ValueError): + fut.result() From c075a36541926bc9297413d8d973dda89100c327 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 23 May 2023 13:30:51 +0000 Subject: [PATCH 07/39] I think proxystore cannot store objects of type None --- parsl/serialize/concretes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/serialize/concretes.py b/parsl/serialize/concretes.py index 0fac693ef5..8ff7159fd2 100644 --- a/parsl/serialize/concretes.py +++ b/parsl/serialize/concretes.py @@ -21,7 +21,7 @@ class ProxyStoreSerializer(SerializerBase): _for_data = True def serialize(self, data: Any) -> bytes: - + assert data is not None store = get_store("parsl_store") assert store is not None, "Could not find store" From 501b065e4cb3baaeda8b4bdd7b6ba1bf453b6c5e Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 30 Jun 2023 17:29:53 +0000 Subject: [PATCH 08/39] Remove unused internal serializer _list_methods method This mirrors a removal in Globus Compute, which originally contributed 
this code as a fork of the funcX serializer: PR https://github.com/funcx-faas/funcX/pull/1153 commit (in funcX) 955e666fe6279571bf6024c239fc789595d8e206 --- parsl/serialize/facade.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index ff55d94f19..14de96fd10 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -1,8 +1,8 @@ from parsl.serialize.concretes import * # noqa: F403,F401 -from parsl.serialize.base import METHODS_MAP_DATA, METHODS_MAP_CODE, SerializerBase +from parsl.serialize.base import METHODS_MAP_DATA, METHODS_MAP_CODE import logging -from typing import Any, Dict, List, Tuple, Union +from typing import Any, List, Union logger = logging.getLogger(__name__) @@ -23,10 +23,6 @@ methods_for_data[key] = METHODS_MAP_DATA[key]() -def _list_methods() -> Tuple[Dict[bytes, SerializerBase], Dict[bytes, SerializerBase]]: - return methods_for_code, methods_for_data - - def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes: """Serialize and pack function and parameters From 66920e72d34346072e33eebe21291a564c1efaa6 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 30 Jun 2023 17:19:39 +0000 Subject: [PATCH 09/39] Add some basic unit tests of serializer components --- parsl/tests/test_serialization/test_basic.py | 24 ++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 parsl/tests/test_serialization/test_basic.py diff --git a/parsl/tests/test_serialization/test_basic.py b/parsl/tests/test_serialization/test_basic.py new file mode 100644 index 0000000000..ad89d1eccc --- /dev/null +++ b/parsl/tests/test_serialization/test_basic.py @@ -0,0 +1,24 @@ +import pytest +from parsl.serialize import serialize, deserialize +from parsl.serialize.concretes import DillSerializer, PickleSerializer + + +@pytest.mark.local +def test_serialize(): + assert deserialize(serialize(1)) == 1 + + +@pytest.mark.local 
+def test_pickle_wrapper(): + s = PickleSerializer() + d = s.serialize(1) + assert isinstance(d, bytes) + assert s.deserialize(d) == 1 + + +@pytest.mark.local +def test_dill_wrapper(): + s = DillSerializer() + d = s.serialize(1) + assert isinstance(d, bytes) + assert s.deserialize(d) == 1 From 035bf98e7a3b84c6827b67ebdaa37f250adbfcda Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 30 Jun 2023 18:07:26 +0000 Subject: [PATCH 10/39] Add __init__ --- parsl/tests/test_serialization/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 parsl/tests/test_serialization/__init__.py diff --git a/parsl/tests/test_serialization/__init__.py b/parsl/tests/test_serialization/__init__.py new file mode 100644 index 0000000000..e69de29bb2 From a039bb3ff66d2a3f43166c6ab678b0252f403bc1 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 29 Jun 2023 07:23:35 +0000 Subject: [PATCH 11/39] Parse serialization header only once Prior to this PR, the serialization header was parsed twice during deserialization, and in two different ways, leading to an awkward header format: the header must be exactly three bytes long for one (length based) parser, and the third byte must be a \n for the other (new-line based) parser. This PR removes the length based parser, instead allowing an arbitrary length \n-terminated header. Backwards/forwards compatibility: The parsl serialization wire format is not a user exposed interface, as parsl requires the same version of parsl to be installed at all locations in a parsl deployment. This PR does not change any on the wire byte sequences when used with the two existing de*serializers, but opens the opportunity for future implementations to use more than two bytes for identifiers. 
Performance: I made a basic benchmark of serializing and deserializing a few thousand integers (deliberately using a simple object like `int` so that the object processing would be quite small, allowing changes in header time more chance to show up). My main concern with this PR is that I didn't make things noticeably worse, not to improve performance. After this PR, a serialization->deserialization round trip is about 200ns faster (1165ns before vs 965ns after), so I am happy that this PR does not damage performance. Future: This is part of ongoing work to introduce user pluggable de*serializers. --- parsl/serialize/base.py | 16 ---------------- parsl/serialize/concretes.py | 22 ++++++++-------------- parsl/serialize/facade.py | 12 ++++++------ 3 files changed, 14 insertions(+), 36 deletions(-) diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index faa8c5e6f9..944ef2880c 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -21,9 +21,6 @@ def __init_subclass__(cls, **kwargs: Any) -> None: """ super().__init_subclass__(**kwargs) - assert len(cls._identifier) == 3 - assert cls._identifier[2] == 10 # \n in decimal - if cls._for_code: METHODS_MAP_CODE[cls._identifier] = cls if cls._for_data: @@ -43,19 +40,6 @@ def identifier(self) -> bytes: """ return self._identifier - def chomp(self, payload: bytes) -> bytes: - """ If the payload starts with the identifier, return the remaining block - - Parameters - ---------- - payload : str - Payload blob - """ - s_id, payload = payload.split(b'\n', 1) - if (s_id + b'\n') != self.identifier: - raise TypeError("Buffer does not start with parsl.serialize identifier:{!r}".format(self.identifier)) - return payload - def enable_caching(self, maxsize: int = 128) -> None: """ Add functools.lru_cache onto the serialize, deserialize methods """ diff --git a/parsl/serialize/concretes.py b/parsl/serialize/concretes.py index e66ab17a73..7c9b9eb36f 100644 --- a/parsl/serialize/concretes.py +++ 
b/parsl/serialize/concretes.py @@ -17,18 +17,15 @@ class PickleSerializer(SerializerBase): * [sometimes] issues with wrapped/decorated functions """ - _identifier = b'01\n' + _identifier = b'01' _for_code = True _for_data = True def serialize(self, data: Any) -> bytes: - x = pickle.dumps(data) - return self.identifier + x + return pickle.dumps(data) - def deserialize(self, payload: bytes) -> Any: - chomped = self.chomp(payload) - data = pickle.loads(chomped) - return data + def deserialize(self, body: bytes) -> Any: + return pickle.loads(body) class DillSerializer(SerializerBase): @@ -43,15 +40,12 @@ class DillSerializer(SerializerBase): * closures """ - _identifier = b'02\n' + _identifier = b'02' _for_code = True _for_data = True def serialize(self, data: Any) -> bytes: - x = dill.dumps(data) - return self.identifier + x + return dill.dumps(data) - def deserialize(self, payload: bytes) -> Any: - chomped = self.chomp(payload) - data = dill.loads(chomped) - return data + def deserialize(self, body: bytes) -> Any: + return dill.loads(body) diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index ff55d94f19..d3b2691e0a 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -10,7 +10,6 @@ """ Instantiate the appropriate classes """ headers = list(METHODS_MAP_CODE.keys()) + list(METHODS_MAP_DATA.keys()) -header_size = len(headers[0]) methods_for_code = {} methods_for_data = {} @@ -70,7 +69,7 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes: if callable(obj): for method in methods_for_code.values(): try: - result = method.serialize(obj) + result = method._identifier + b'\n' + method.serialize(obj) except Exception as e: result = e continue @@ -79,7 +78,7 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes: else: for method in methods_for_data.values(): try: - result = method.serialize(obj) + result = method._identifier + b'\n' + method.serialize(obj) except Exception as e: result = e continue @@ 
-102,11 +101,12 @@ def deserialize(payload: bytes) -> Any: Payload object to be deserialized """ - header = payload[0:header_size] + header, body = payload.split(b'\n', 1) + if header in methods_for_code: - result = methods_for_code[header].deserialize(payload) + result = methods_for_code[header].deserialize(body) elif header in methods_for_data: - result = methods_for_data[header].deserialize(payload) + result = methods_for_data[header].deserialize(body) else: raise TypeError("Invalid header: {!r} in data payload. Buffer is either corrupt or not created by ParslSerializer".format(header)) From 8254d95833c6e62ae9f6f9cd61eea24c0612b689 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sun, 2 Jul 2023 07:21:16 +0000 Subject: [PATCH 12/39] Fix up proxystore serializer for recent changes, add a test case --- parsl/serialize/concretes.py | 10 ++++------ parsl/tests/test_serialization/test_proxystore.py | 10 ++++++++++ 2 files changed, 14 insertions(+), 6 deletions(-) create mode 100644 parsl/tests/test_serialization/test_proxystore.py diff --git a/parsl/serialize/concretes.py b/parsl/serialize/concretes.py index 9eb9f46025..1ca94a7587 100644 --- a/parsl/serialize/concretes.py +++ b/parsl/serialize/concretes.py @@ -16,7 +16,7 @@ class ProxyStoreSerializer(SerializerBase): - _identifier = b'99\n' + _identifier = b'99' _for_code = False _for_data = True @@ -29,12 +29,10 @@ def serialize(self, data: Any) -> bytes: d = pickle.dumps(p) - return self.identifier + d + return d - def deserialize(self, payload: bytes) -> Any: - chomped = self.chomp(payload) - proxy = pickle.loads(chomped) - return proxy + def deserialize(self, body: bytes) -> Any: + return pickle.loads(body) class PickleSerializer(SerializerBase): diff --git a/parsl/tests/test_serialization/test_proxystore.py b/parsl/tests/test_serialization/test_proxystore.py new file mode 100644 index 0000000000..74a62aa2ea --- /dev/null +++ b/parsl/tests/test_serialization/test_proxystore.py @@ -0,0 +1,10 @@ +import pytest 
+from parsl.serialize.concretes import ProxyStoreSerializer + + +@pytest.mark.local +def test_proxystore_wrapper(): + s = ProxyStoreSerializer() + d = s.serialize(1) + assert isinstance(d, bytes) + assert s.deserialize(d) == 1 From 26da3681e9b2c51973a2f5d5fdcfbff858d0e443 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sun, 2 Jul 2023 07:30:33 +0000 Subject: [PATCH 13/39] Disable mypy for proxystore connectors --- mypy.ini | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mypy.ini b/mypy.ini index f8c3c05b8c..8aa3dbf71d 100644 --- a/mypy.ini +++ b/mypy.ini @@ -181,6 +181,9 @@ ignore_missing_imports = True #[mypy-multiprocessing.synchronization.*] #ignore_missing_imports = True +[mypy-proxystore.connectors.file] +ignore_missing_imports = True + [mypy-pandas.*] ignore_missing_imports = True From 73702ba1178c45265275c4f12e4c991a9cdc6e30 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sun, 2 Jul 2023 07:36:18 +0000 Subject: [PATCH 14/39] WIP remove enable_caching --- parsl/serialize/base.py | 10 ---------- parsl/serialize/facade.py | 6 ++++-- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index 944ef2880c..18418ffffc 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -40,16 +40,6 @@ def identifier(self) -> bytes: """ return self._identifier - def enable_caching(self, maxsize: int = 128) -> None: - """ Add functools.lru_cache onto the serialize, deserialize methods - """ - - # ignore types here because mypy at the moment is not fond of monkeypatching - self.serialize = functools.lru_cache(maxsize=maxsize)(self.serialize) # type: ignore[method-assign] - self.deserialize = functools.lru_cache(maxsize=maxsize)(self.deserialize) # type: ignore[method-assign] - - return - @abstractmethod def serialize(self, data: Any) -> bytes: pass diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 2581ae478f..3746d732d4 100644 --- a/parsl/serialize/facade.py +++ 
b/parsl/serialize/facade.py @@ -9,14 +9,16 @@ """ Instantiate the appropriate classes """ -headers = list(METHODS_MAP_CODE.keys()) + list(METHODS_MAP_DATA.keys()) +# unused now... +# headers = list(METHODS_MAP_CODE.keys()) + list(METHODS_MAP_DATA.keys()) + +# TODO: don't need to duplicate these lists as mathods_for_code and METHODS_MAP_CODE... methods_for_code = {} methods_for_data = {} for key in METHODS_MAP_CODE: methods_for_code[key] = METHODS_MAP_CODE[key]() - methods_for_code[key].enable_caching(maxsize=128) for key in METHODS_MAP_DATA: methods_for_data[key] = METHODS_MAP_DATA[key]() From 49006952d93b2dd3e9b5c4056003ac9e54dd8f13 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sun, 2 Jul 2023 07:40:23 +0000 Subject: [PATCH 15/39] remove unused headers - port from PR #2784 --- parsl/serialize/facade.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 3746d732d4..fe4d96ba7b 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -10,9 +10,6 @@ """ Instantiate the appropriate classes """ -# unused now... -# headers = list(METHODS_MAP_CODE.keys()) + list(METHODS_MAP_DATA.keys()) - # TODO: don't need to duplicate these lists as mathods_for_code and METHODS_MAP_CODE... 
methods_for_code = {} methods_for_data = {} From b37efc7ac9ef7caa842deab0a66159066c67a8f3 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Sun, 2 Jul 2023 07:51:28 +0000 Subject: [PATCH 16/39] fix linting --- parsl/serialize/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index 18418ffffc..8367e6ad70 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -1,6 +1,5 @@ from abc import abstractmethod import logging -import functools from typing import Any From 4ea2e18517466b2a6aabe725bbc3c04b5a39e9f4 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 4 Jul 2023 08:45:27 +0000 Subject: [PATCH 17/39] serializer plugin API TODO: (broken) make a test that runs using a custom serializer to an htex worker... to check that remote processes can load a custom serializer. prior to this PR, serializer registration happened in order of class definition, which in the case of a single concretes file, is the order of the serializers inside that file. this does not work well when trying to define other serializers in other files, because the order of import of files can change subtly and because of distant effects. so doing serialiser registration in a subclass hook is not a very nice thing to do here... this PR should move to a model which defines the default de*serializers explicitly, and allows the user to modify that list explicitly too, to add in new de*serializers. limitations: this is part of work needed to allow serializer plugins, but not all of it, as it does not define how a remote worker will discover plugged-in serializers - for that, perhaps, the header should become something importable? instead of a registry-based ID bytes string. contrasting with that: proxystore doesn't need its own deserializer! it can use the existing pickle/picklecode deserialisers, because what it generates is a pickled object... and that's a legitimate use case... 
which suggests that serializers don't need to be indexed by "serializer" ID at all on the serializing side, but only on the deserializing side... this API also will not easily allow serializer methods to be defined for values returned from workers to the submit side: the expected place of registration, near the start of the user workflow, does not have a corresponding place on the worker side. this PR separates out serialization behaviour (object -> bytestream) from deserialization behaviour: (identifier, bytestream) -> object, because serializer might not (and in this prototype, *cannot*) implement custom deserialization... due to non-registration on the remote side, and instead can only generate dill/pickle targeted bytestreams the motivating use case for this basic serializer plugin API is supporting proxystore, prototyped in https://github.com/Parsl/parsl/pull/2718 https://labs.globus.org/projects/proxystore.html this PR also contains an optional proxystore plugin that would need to be activated by a user, and an attempt at an in-pickle (recursive style) proxystore plugin that is policy-aware and can proxy stuff deeper inside lists... to deal with proxying only interesting objects deep inside graphs - which is actually what happens with regular argument lists when they're non-trivial. loading remote serialisers via importlib is a lot of stuff... perhaps i should... just pickle the deserializer and send it (and let pickle deal with that?) - i guess there's two paths here: one is loading remote deserialisers, and the other is focusing on the proxystore use case, which, differently, wants access to the pickle deserializer remotely - and so that drives clearer separation of serializers vs deserialisers... that 2nd case is probably what i should concentrate on... right now this does module loading in order to work with the class based serializer design inherited from before... but maybe that's not how it should end up... 
perhaps just serializer callables and deserializer callables? or based around modules? it seems nice to be able to configure the serializer (aka make a class or a partially applied callable) for configuring proxystore... but unclear how that would work for remote end configuration. --- docs/userguide/plugins.rst | 16 +++ mypy.ini | 9 ++ parsl/executors/high_throughput/executor.py | 3 + parsl/serialize/base.py | 26 +---- parsl/serialize/facade.py | 104 ++++++++++++++---- parsl/serialize/plugin_proxystore.py | 49 +++++++++ .../plugin_proxystore_deep_pickle.py | 90 +++++++++++++++ parsl/serialize/plugin_serpent.py | 23 ++++ .../test_serialization/config_proxystore.py | 67 +++++++++++ .../test_serialization/config_serpent.py | 70 ++++++++++++ parsl/tests/test_serialization/test_plugin.py | 57 ++++++++++ .../test_serialization/test_plugin_htex.py | 61 ++++++++++ .../test_serialization/test_proxystore.py | 10 ++ .../test_proxystore_deep_pickle_htex.py | 62 +++++++++++ .../test_proxystore_htex.py | 39 +++++++ .../test_serialization/test_serpent_htex.py | 32 ++++++ requirements.txt | 2 + 17 files changed, 681 insertions(+), 39 deletions(-) create mode 100644 parsl/serialize/plugin_proxystore.py create mode 100644 parsl/serialize/plugin_proxystore_deep_pickle.py create mode 100644 parsl/serialize/plugin_serpent.py create mode 100644 parsl/tests/test_serialization/config_proxystore.py create mode 100644 parsl/tests/test_serialization/config_serpent.py create mode 100644 parsl/tests/test_serialization/test_plugin.py create mode 100644 parsl/tests/test_serialization/test_plugin_htex.py create mode 100644 parsl/tests/test_serialization/test_proxystore.py create mode 100644 parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py create mode 100644 parsl/tests/test_serialization/test_proxystore_htex.py create mode 100644 parsl/tests/test_serialization/test_serpent_htex.py diff --git a/docs/userguide/plugins.rst b/docs/userguide/plugins.rst index e3d4b18186..7e0340cb06 
100644 --- a/docs/userguide/plugins.rst +++ b/docs/userguide/plugins.rst @@ -77,3 +77,19 @@ from invoking a Parsl app. This includes as the return value of a An specific example of this is integrating Globus Compute tasks into a Parsl task graph. See :ref:`label-join-globus-compute` + + +Serialization +------------- + +By default Parsl will serialize objects with either `pickle` or `dill`, in +some cases applying an LRU cache. + +Parsl has an unstable API to register new serialization methods + +Additional serialization methods can be registered by XXXXX TODO XXXXX + +Limitations: +these mechanisms are not registered on the remote side, so custom +deserialization cannot be implemented -- instead, custom serializers can +only generate byte streams to be deserialized by pickle or dill diff --git a/mypy.ini b/mypy.ini index f8c3c05b8c..f5e3de5b0c 100644 --- a/mypy.ini +++ b/mypy.ini @@ -198,3 +198,12 @@ ignore_missing_imports = True [mypy-setproctitle.*] ignore_missing_imports = True + + +# for serializer plugin demos: + +[mypy-serpent.*] +ignore_missing_imports = True + +[mypy-proxystore.connectors.file] +ignore_missing_imports = True diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index c112e1119e..7dffd5d629 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -432,6 +432,9 @@ def _queue_management_worker(self): if 'result' in msg: result = deserialize(msg['result']) + # ^ if this raises an exception, queue management worker fails and + # parsl hangs. this is more relevant now allowing user-pluggable + # serialization and so user code exceptions can be raised here. 
task_fut.set_result(result) elif 'exception' in msg: diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index cda2d39f1e..0dea791cfc 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -1,34 +1,20 @@ -from abc import abstractmethod -import logging +from abc import abstractmethod, ABCMeta from typing import Any -logger = logging.getLogger(__name__) -# GLOBALS -METHODS_MAP_CODE = {} -METHODS_MAP_DATA = {} - - -class SerializerBase: +class SerializerBase(metaclass=ABCMeta): """ Adds shared functionality for all serializer implementations """ - def __init_subclass__(cls, **kwargs: Any) -> None: - """ This forces all child classes to register themselves as - methods for serializing code or data - """ - super().__init_subclass__(**kwargs) - - if cls._for_code: - METHODS_MAP_CODE[cls._identifier] = cls - if cls._for_data: - METHODS_MAP_DATA[cls._identifier] = cls - + # For deserializer _identifier: bytes + + # For serializer _for_code: bool _for_data: bool + # For deserializer @property def identifier(self) -> bytes: """Get that identifier that will be used to indicate in byte streams diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 0759349ac2..b1089b0597 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -1,22 +1,69 @@ -from parsl.serialize.concretes import * # noqa: F403,F401 -from parsl.serialize.base import METHODS_MAP_DATA, METHODS_MAP_CODE import logging +import parsl.serialize.concretes as c +from parsl.serialize.base import SerializerBase from typing import Any, List, Union logger = logging.getLogger(__name__) +# these are used for two directions: -""" Instantiate the appropriate classes -""" -methods_for_code = {} -methods_for_data = {} +# 1. to iterate over to find a valid serializer +# for that, the ordering is important -for key in METHODS_MAP_CODE: - methods_for_code[key] = METHODS_MAP_CODE[key]() +# 2. 
to perform an ID -> deserializer lookup -for key in METHODS_MAP_DATA: - methods_for_data[key] = METHODS_MAP_DATA[key]() +# These must be registered in reverse order of +# importance: later registered serializers +# will take priority over earlier ones. This is +# to facilitate user registered serializers + +methods_for_code: List[SerializerBase] +methods_for_code = [] + +methods_for_data: List[SerializerBase] +methods_for_data = [] + +deserializers = {} + +def clear_serializers(): + # does not clear deserializers because remote sending back will have a different serializer list (and wants to send back results with one of the default 4) so clearing all the deserializers means we cannot receive results... + global methods_for_data, methods_for_code + methods_for_code = [] + methods_for_data = [] + +def register_serializer(serializer: SerializerBase) -> None: + deserializers[serializer._identifier] = serializer + + if serializer._for_code: + methods_for_code.insert(0, serializer) + if serializer._for_data: + methods_for_data.insert(0, serializer) + + +def unregister_serializer(serializer: SerializerBase) -> None: + logger.info(f"BENC: deserializers {deserializers}, serializer {serializer}") + logger.info(f"BENC: unregistering serializer {serializer}") + if serializer._identifier in deserializers: + del deserializers[serializer._identifier] + else: + logger.warning("BENC: not found in deserializers list") + if serializer in methods_for_code: + logger.info("BENC: removing serializer from methods_for_code") + methods_for_code.remove(serializer) + else: + logger.warning("BENC: not found in methods for code") + if serializer in methods_for_data: + logger.info("BENC: removing serializer from methods_for_data") + methods_for_data.remove(serializer) + else: + logger.warning("BENC: not found in methods for data") + + +register_serializer(c.DillSerializer()) +register_serializer(c.DillCallableSerializer()) +register_serializer(c.PickleSerializer()) 
+register_serializer(c.PickleCallableSerializer()) def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes: @@ -58,24 +105,34 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes: Individual serialization methods might raise a TypeError (eg. if objects are non serializable) This method will raise the exception from the last method that was tried, if all methods fail. """ - result: Union[bytes, Exception] + logger.info(f"BENC: Trying to serialize {obj}") + result: Union[bytes, Exception, None] + result = None if callable(obj): methods = methods_for_code else: methods = methods_for_data - for method in methods.values(): + for method in methods: try: + logger.info(f"BENC: trying serializer {method}") result = method._identifier + b'\n' + method.serialize(obj) except Exception as e: + logger.warning(f"BENC: serializer {method} skipping, with exception: {e}") result = e continue else: break - if isinstance(result, BaseException): + # if no serializer found + if result is None: + logger.error("BENC: no serializer returned a result") + raise RuntimeError("BENC: No serializers") + elif isinstance(result, BaseException): + logger.error("BENC: exception from final serializer") raise result else: + logger.info("BENC: serialization complete") if len(result) > buffer_threshold: logger.warning(f"Serialized object exceeds buffer threshold of {buffer_threshold} bytes, this could cause overflows") return result @@ -91,13 +148,22 @@ def deserialize(payload: bytes) -> Any: """ header, body = payload.split(b'\n', 1) - if header in methods_for_code: - result = methods_for_code[header].deserialize(body) - elif header in methods_for_data: - result = methods_for_data[header].deserialize(body) + if header in deserializers: + result = deserializers[header].deserialize(body) else: - raise TypeError("Invalid header: {!r} in data payload. 
Buffer is either corrupt or not created by ParslSerializer".format(header)) - + logger.warning("BENC: unknown serialization header: {!r} - trying to load dynamically".format(header)) + import importlib + module_name, class_name = header.split(b' ', 1) + try: + decoded_module_name = module_name.decode('utf-8') + except UnicodeDecodeError as e: + raise RuntimeError(f"Got unicode error with string {module_name!r} exception is {e}") + module = importlib.import_module(decoded_module_name) + deserializer_class = getattr(module, class_name.decode('utf-8')) + deserializer = deserializer_class() + result = deserializer.deserialize(body) + + # raise TypeError("Invalid serialization header: {!r}".format(header)) return result diff --git a/parsl/serialize/plugin_proxystore.py b/parsl/serialize/plugin_proxystore.py new file mode 100644 index 0000000000..5c3eacced2 --- /dev/null +++ b/parsl/serialize/plugin_proxystore.py @@ -0,0 +1,49 @@ +# parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] +from proxystore.store import Store, register_store # type: ignore +from proxystore.connectors.file import FileConnector +from parsl.serialize.facade import register_serializer + +from parsl.serialize.base import SerializerBase + +from typing import Any, Optional + +import pickle + + +class ProxyStoreSerializer(SerializerBase): + # TODO: better enforcement of this being bytes, because that's common error i'm making + # and probably it should be autogenerated... + _identifier = b'parsl.serialize.plugin_proxystore ProxyStoreSerializer' # must be class name + _for_code = False + _for_data = True + + def __init__(self, store: Optional[Store] = None) -> None: + """Because of jumbled use of this class for init-time configurable + serialization, and non-configurable remote deserializer loading, the + store field can be None... 
TODO: this would go away if serializer and + deserializer were split into different objects/classes/functions.""" + self._store = store + + def serialize(self, data: Any) -> bytes: + assert self._store is not None + assert data is not None + p = self._store.proxy(data) + return pickle.dumps(p) + + def deserialize(self, body: bytes) -> Any: + return pickle.loads(body) + + +def register_proxystore_serializer() -> None: + """Initializes proxystore and registers it as a serializer with parsl""" + serializer = create_proxystore_serializer() + register_serializer(serializer) + + +def create_proxystore_serializer() -> ProxyStoreSerializer: + """Creates a serializer but does not register with global system - so this + can be used in testing.""" + + store = Store(name='parsl_store', connector=FileConnector(store_dir="/tmp")) + register_store(store) + return ProxyStoreSerializer(store) diff --git a/parsl/serialize/plugin_proxystore_deep_pickle.py b/parsl/serialize/plugin_proxystore_deep_pickle.py new file mode 100644 index 0000000000..571a252f3a --- /dev/null +++ b/parsl/serialize/plugin_proxystore_deep_pickle.py @@ -0,0 +1,90 @@ +# parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] +from proxystore.store import Store, register_store # type: ignore +from proxystore.connectors.file import FileConnector +from parsl.serialize.facade import register_serializer + +from parsl.serialize.base import SerializerBase + +from typing import Any, Optional + +import dill + +import io + +import logging + +from typing import Type + +logger = logging.getLogger(__name__) + +class ProxyStoreDeepPickler(dill.Pickler): + + def __init__(self, *args, policy, store, **kwargs): + super().__init__(*args, **kwargs) + self._store = store + self._policy = policy + + def reducer_override(self, o): + logger.info(f"BENC: reducing object {o}") + + if type(o) is self._policy: # not isinstance, because don't want subclasses (like bool 
None: + """Because of jumbled use of this class for init-time configurable + serialization, and non-configurable remote deserializer loading, the + store and policy fields can be None... TODO: this would go away if serializer and + deserializer were split into different objects/classes/functions, like Pickler and + Unpickler are""" + self._store = store + self._policy = policy + + def serialize(self, data: Any) -> bytes: + assert self._store is not None + assert self._policy is not None + + assert data is not None + + # TODO: pluggable policy should go here... what does that look like? + # TODO: this policy belongs in the pickler plugin, not top level parsl serializer plugin + # if not isinstance(data, int): + # raise RuntimeError(f"explicit policy will only proxy ints, not {type(data)}") + + f = io.BytesIO() + pickler = ProxyStoreDeepPickler(file=f, store=self._store, policy=self._policy) + pickler.dump(data) + return f.getvalue() + + def deserialize(self, body: bytes) -> Any: + # because we aren't customising deserialization, use regular + # dill for deserialization; but otherwise could create a + # custom Unpickler here... 
+ return dill.loads(body) + + +def register_proxystore_serializer() -> None: + """Initializes proxystore and registers it as a serializer with parsl""" + serializer = create_proxystore_serializer_deep_pickle() + register_serializer(serializer) + + +def create_deep_proxystore_serializer(*, policy: Type) -> ProxyStoreDeepSerializer: + """Creates a serializer but does not register with global system - so this + can be used in testing.""" + + store = Store(name='parsl_store', connector=FileConnector(store_dir="/tmp")) + register_store(store) + return ProxyStoreDeepSerializer(store=store, policy=policy) diff --git a/parsl/serialize/plugin_serpent.py b/parsl/serialize/plugin_serpent.py new file mode 100644 index 0000000000..5020267101 --- /dev/null +++ b/parsl/serialize/plugin_serpent.py @@ -0,0 +1,23 @@ +from parsl.serialize.base import SerializerBase +import serpent + +from typing import Any + + +class SerpentSerializer(SerializerBase): + _identifier = b'parsl.serialize.plugin_serpent SerpentSerializer' + _for_code = False + _for_data = True + + def serialize(self, data: Any) -> bytes: + body = serpent.dumps(data) + roundtripped_data = serpent.loads(body) + + # this round trip is because serpent will sometimes serialize objects + # as best as it can, which is not good enough... + if data != roundtripped_data: + raise ValueError(f"SerpentSerializer cannot roundtrip {data} -> {body} -> {roundtripped_data}") + return body + + def deserialize(self, body: bytes) -> Any: + return serpent.loads(body) diff --git a/parsl/tests/test_serialization/config_proxystore.py b/parsl/tests/test_serialization/config_proxystore.py new file mode 100644 index 0000000000..b6536409d0 --- /dev/null +++ b/parsl/tests/test_serialization/config_proxystore.py @@ -0,0 +1,67 @@ +"""htex local, but using the proxy store serializer... + +DANGER! 
this will modify the global serializer environment, so any +future parsl stuff done in the same process as this configuration +will not see the default serializer environment... +""" + +# imports for monitoring: +from parsl.monitoring import MonitoringHub + +import os + +from parsl.providers import LocalProvider +from parsl.channels import LocalChannel +from parsl.launchers import SingleNodeLauncher + +from parsl.config import Config +from parsl.executors import HighThroughputExecutor + + +from parsl.data_provider.http import HTTPInTaskStaging +from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.data_provider.file_noop import NoOpFileStaging + +working_dir = os.getcwd() + "/" + "test_htex_alternate" + + +def fresh_config(): + import parsl.serialize.plugin_proxystore as pspps + pspps.register_proxystore_serializer() + + return Config( + executors=[ + HighThroughputExecutor( + address="127.0.0.1", + label="htex_Local", + working_dir=working_dir, + storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()], + worker_debug=True, + cores_per_worker=1, + heartbeat_period=2, + heartbeat_threshold=5, + poll_period=1, + provider=LocalProvider( + channel=LocalChannel(), + init_blocks=0, + min_blocks=0, + max_blocks=5, + launcher=SingleNodeLauncher(), + ), + block_error_handler=False + ) + ], + strategy='simple', + app_cache=True, checkpoint_mode='task_exit', + retries=2, + monitoring=MonitoringHub( + hub_address="localhost", + hub_port=55055, + monitoring_debug=False, + resource_monitoring_interval=1, + ), + usage_tracking=True + ) + + +config = fresh_config() diff --git a/parsl/tests/test_serialization/config_serpent.py b/parsl/tests/test_serialization/config_serpent.py new file mode 100644 index 0000000000..5175fa03c8 --- /dev/null +++ b/parsl/tests/test_serialization/config_serpent.py @@ -0,0 +1,70 @@ +"""htex local, but using the proxy store serializer... + +DANGER! 
this will modify the global serializer environment, so any +future parsl stuff done in the same process as this configuration +will not see the default serializer environment... +""" + +# imports for monitoring: +from parsl.monitoring import MonitoringHub + +import os + +from parsl.providers import LocalProvider +from parsl.channels import LocalChannel +from parsl.launchers import SingleNodeLauncher + +from parsl.config import Config +from parsl.executors import HighThroughputExecutor + + +from parsl.data_provider.http import HTTPInTaskStaging +from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.data_provider.file_noop import NoOpFileStaging + +from parsl.serialize.facade import register_serializer # TODO: move this into parsl.serialize root as its user exposed + +from parsl.serialize.plugin_serpent import SerpentSerializer + +working_dir = os.getcwd() + "/" + "test_htex_alternate" + + +def fresh_config(): + register_serializer(SerpentSerializer()) + + return Config( + executors=[ + HighThroughputExecutor( + address="127.0.0.1", + label="htex_Local", + working_dir=working_dir, + storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()], + worker_debug=True, + cores_per_worker=1, + heartbeat_period=2, + heartbeat_threshold=5, + poll_period=1, + provider=LocalProvider( + channel=LocalChannel(), + init_blocks=0, + min_blocks=0, + max_blocks=5, + launcher=SingleNodeLauncher(), + ), + block_error_handler=False + ) + ], + strategy='simple', + app_cache=True, checkpoint_mode='task_exit', + retries=2, + monitoring=MonitoringHub( + hub_address="localhost", + hub_port=55055, + monitoring_debug=False, + resource_monitoring_interval=1, + ), + usage_tracking=True + ) + + +config = fresh_config() diff --git a/parsl/tests/test_serialization/test_plugin.py b/parsl/tests/test_serialization/test_plugin.py new file mode 100644 index 0000000000..bd546f5a5d --- /dev/null +++ b/parsl/tests/test_serialization/test_plugin.py @@ -0,0 +1,57 @@ +# test the 
serializer plugin API + +import pytest + +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer + +B_MAGIC = b'3626874628368432' # arbitrary const bytestring +V_MAGIC = 777 # arbitrary const object + + +class ConstSerializer(SerializerBase): + """This is a test deliberately misbehaving serializer. + It serializes all values to the same constant, and likewise for + deserialization. + """ + + _for_code = True + _for_data = True + + # TODO: should be enforcing/defaulting this to class name so that by default we can dynamically load the serializer remotely? + _identifier = b'parsl.tests.test_serializer.test_plugin ConstSerializer' + # note a space in the name here not final dot, to distinguish modules vs attribute in module + + # TODO: better enforcement of the presence of these values? tied into abstract base class? + + def serialize(self, o): + return B_MAGIC + + def deserialize(self, b): + assert b == B_MAGIC + return V_MAGIC + +# ugh... registering in globus state which will screw with other tests +# so should protect with a finally block: +# serializer registration is, generally, intended to be a not-undoable +# operation, though... 
+ + +@pytest.mark.local +def test_const_inprocess(): + s = ConstSerializer() + register_serializer(s) + + try: + + # check that we aren't using one of the real serializers + # that really works + assert deserialize(serialize(1)) != 1 + + # and that the behaviour looks like ConstSerializer + assert serialize(1) == ConstSerializer._identifier + b'\n' + B_MAGIC + assert deserialize(serialize(1)) == V_MAGIC + finally: + unregister_serializer(s) + + assert deserialize(serialize(1)) == 1 # check serialisation is basically working again diff --git a/parsl/tests/test_serialization/test_plugin_htex.py b/parsl/tests/test_serialization/test_plugin_htex.py new file mode 100644 index 0000000000..f486c4ae3c --- /dev/null +++ b/parsl/tests/test_serialization/test_plugin_htex.py @@ -0,0 +1,61 @@ +# test the serializer plugin API +import logging +import pytest +import parsl + +from parsl.tests.configs.htex_local import fresh_config as local_config + +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer + +logger = logging.getLogger(__name__) + +B_MAGIC = b'3626874628368432' # arbitrary const bytestring +V_MAGIC = 777 # arbitrary const object + + +class XXXXSerializer(SerializerBase): + """This is a test deserializer but puts some padding round to help distinguish... + """ + + _for_code = True + _for_data = True + + # TODO: should be enforcing/defaulting this to class name so that by default we can dynamically load the serializer remotely? + _identifier = b'parsl.tests.test_serialization.test_plugin_htex XXXXSerializer' + # note a space in the name here not final dot, to distinguish modules vs attribute in module + + # TODO: better enforcement of the presence of these values? tied into abstract base class? 
+ + def serialize(self, o): + import dill + logger.error(f"BENC: XXXX serializer serializing value {o} of type {type(o)}") + return dill.dumps(o) + + def deserialize(self, b): + import dill + return dill.loads(b) + +# ugh... registering in globus state which will screw with other tests +# so should protect with a finally block: +# serializer registration is, generally, intended to be a not-undoable +# operation, though... + + +@parsl.python_app +def func(x): + return x + + +@pytest.mark.local +def test_const_inprocess(): + s = XXXXSerializer() + register_serializer(s) + + try: + assert func(100).result() == 100 # but how do we know this went through XXXXSerializer? (or not) + + finally: + unregister_serializer(s) + + assert deserialize(serialize(1)) == 1 # check serialisation is basically working again diff --git a/parsl/tests/test_serialization/test_proxystore.py b/parsl/tests/test_serialization/test_proxystore.py new file mode 100644 index 0000000000..08813c60db --- /dev/null +++ b/parsl/tests/test_serialization/test_proxystore.py @@ -0,0 +1,10 @@ +import pytest +from parsl.serialize.plugin_proxystore import create_proxystore_serializer + + +@pytest.mark.local +def test_proxystore_wrapper(): + s = create_proxystore_serializer() + d = s.serialize(1) + assert isinstance(d, bytes) + assert s.deserialize(d) == 1 diff --git a/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py new file mode 100644 index 0000000000..0dbb3cab51 --- /dev/null +++ b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py @@ -0,0 +1,62 @@ +# test the serializer plugin API +import logging +import pytest +import parsl + +from parsl.tests.configs.htex_local import fresh_config as local_config + +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer, clear_serializers + +from 
parsl.serialize.plugin_proxystore_deep_pickle import create_deep_proxystore_serializer + +logger = logging.getLogger(__name__) + + +@parsl.python_app +def func(x, y): + return x + y + 1 + +@parsl.python_app +def func2(v): + return str(v) + +class MyDemo: + def __init__(self, v): + self._v = v + + def __str__(self): + return str(self._v) + + +# TODO: this kind of test, I'd like to be able to run the entire test suite with the +# proxystore serializer configured, so as to see it tested against many cases. +# A bodge is to do that in fresh_config - but that doesn't work in the case of several +# DFKs in a single process, but there is nothing to unconfigure the process-wide state. +# Is this only a test limit? or is it something that should be properly exposed to +# users? + +@pytest.mark.local +def test_proxystore_single_call(): + later_c = [v for v in parsl.serialize.facade.methods_for_code] + later_d = [v for v in parsl.serialize.facade.methods_for_data] + clear_serializers() # does not clear deserializers... to allow results to come back... + s = create_deep_proxystore_serializer(policy=MyDemo) + + + register_serializer(s) + + try: + assert func(100, 4).result() == 105 # but how do we know this went through proxystore? 
+ + m = MyDemo([1,2,3,4]) + + assert func2(m).result() == "[1, 2, 3, 4]" + + finally: + unregister_serializer(s) + + parsl.serialize.facade.methods_for_code = later_c + parsl.serialize.facade.methods_for_data = later_d + + assert deserialize(serialize(1)) == 1 # check serialisation is basically working again diff --git a/parsl/tests/test_serialization/test_proxystore_htex.py b/parsl/tests/test_serialization/test_proxystore_htex.py new file mode 100644 index 0000000000..1efa596b5f --- /dev/null +++ b/parsl/tests/test_serialization/test_proxystore_htex.py @@ -0,0 +1,39 @@ +# test the serializer plugin API +import logging +import pytest +import parsl + +from parsl.tests.configs.htex_local import fresh_config as local_config + +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer + +from parsl.serialize.plugin_proxystore import create_proxystore_serializer + +logger = logging.getLogger(__name__) + + +@parsl.python_app +def func(x, y): + return x + y + 1 + + +# TODO: this kind of test, I'd like to be able to run the entire test suite with the +# proxystore serializer configured, so as to see it tested against many cases. +# A bodge is to do that in fresh_config - but that doesn't work in the case of several +# DFKs in a single process, but there is nothing to unconfigure the process-wide state. +# Is this only a test limit? or is it something that should be properly exposed to +# users? + +@pytest.mark.local +def test_proxystore_single_call(): + s = create_proxystore_serializer() # TODO this isnt' testing the register_proxystore_serializer function... + register_serializer(s) + + try: + assert func(100, 4).result() == 105 # but how do we know this went through proxystore? 
+ + finally: + unregister_serializer(s) + + assert deserialize(serialize(1)) == 1 # check serialisation is basically working again diff --git a/parsl/tests/test_serialization/test_serpent_htex.py b/parsl/tests/test_serialization/test_serpent_htex.py new file mode 100644 index 0000000000..c69b53dbd2 --- /dev/null +++ b/parsl/tests/test_serialization/test_serpent_htex.py @@ -0,0 +1,32 @@ +# test the serializer plugin API +import logging +import pytest +import parsl + +from parsl.tests.configs.htex_local import fresh_config as local_config + +from parsl.serialize.base import SerializerBase +from parsl.serialize.facade import serialize, deserialize, register_serializer, unregister_serializer + +from parsl.serialize.plugin_serpent import SerpentSerializer + +logger = logging.getLogger(__name__) + + +@parsl.python_app +def func(x): + return x + 1 + + +@pytest.mark.local +def test_serpent_single_call(): + s = SerpentSerializer() + register_serializer(s) + + try: + assert func(100).result() == 101 # but how do we know this went through serpent? 
TODO + + finally: + unregister_serializer(s) + + assert deserialize(serialize(1)) == 1 # check serialisation is basically working again diff --git a/requirements.txt b/requirements.txt index fa1c5bf281..6a757cbd72 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,5 @@ requests paramiko psutil>=5.5.1 setproctitle +proxystore # for proxystore demo plugin +serpent # for serpent demo plugin From c9cbda9abbbb0b9859a81f9352333f0fa2b619de Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jul 2023 15:22:43 +0000 Subject: [PATCH 18/39] Fixup mypy --- mypy.ini | 6 +++--- parsl/serialize/facade.py | 9 +++++++-- .../serialize/plugin_proxystore_deep_pickle.py | 18 ++++++------------ .../test_proxystore_deep_pickle_htex.py | 7 ++++--- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/mypy.ini b/mypy.ini index 1493e3adc8..ababaf8d8b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -73,6 +73,9 @@ disallow_subclassing_any = True warn_unreachable = True disallow_untyped_defs = True +[mypy-parsl.serialize.plugin_proxystore_deep_pickle.*] +disallow_subclassing_any = False + [mypy-parsl.executors.base.*] disallow_untyped_defs = True disallow_any_expr = True @@ -207,6 +210,3 @@ ignore_missing_imports = True [mypy-serpent.*] ignore_missing_imports = True - -[mypy-proxystore.connectors.file] -ignore_missing_imports = True diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index b1089b0597..2d8c2dae03 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -26,12 +26,17 @@ deserializers = {} -def clear_serializers(): - # does not clear deserializers because remote sending back will have a different serializer list (and wants to send back results with one of the default 4) so clearing all the deserializers means we cannot receive results... 
+ +def clear_serializers() -> None: + # does not clear deserializers because remote sending back will have a + # different serializer list (and wants to send back results with one of + # the default 4) so clearing all the deserializers means we cannot receive + # results... global methods_for_data, methods_for_code methods_for_code = [] methods_for_data = [] + def register_serializer(serializer: SerializerBase) -> None: deserializers[serializer._identifier] = serializer diff --git a/parsl/serialize/plugin_proxystore_deep_pickle.py b/parsl/serialize/plugin_proxystore_deep_pickle.py index 571a252f3a..c4016dd9ef 100644 --- a/parsl/serialize/plugin_proxystore_deep_pickle.py +++ b/parsl/serialize/plugin_proxystore_deep_pickle.py @@ -1,7 +1,6 @@ # parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] from proxystore.store import Store, register_store # type: ignore from proxystore.connectors.file import FileConnector -from parsl.serialize.facade import register_serializer from parsl.serialize.base import SerializerBase @@ -17,18 +16,19 @@ logger = logging.getLogger(__name__) + class ProxyStoreDeepPickler(dill.Pickler): - def __init__(self, *args, policy, store, **kwargs): + def __init__(self, *args: Any, policy: Type, store: Store, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self._store = store self._policy = policy - def reducer_override(self, o): + def reducer_override(self, o: Any) -> Any: logger.info(f"BENC: reducing object {o}") - if type(o) is self._policy: # not isinstance, because don't want subclasses (like bool None: + def __init__(self, *, policy: Optional[Type] = None, store: Optional[Store] = None) -> None: """Because of jumbled use of this class for init-time configurable serialization, and non-configurable remote deserializer loading, the store and policy fields can be None... 
TODO: this would go away if serializer and @@ -75,12 +75,6 @@ def deserialize(self, body: bytes) -> Any: return dill.loads(body) -def register_proxystore_serializer() -> None: - """Initializes proxystore and registers it as a serializer with parsl""" - serializer = create_proxystore_serializer_deep_pickle() - register_serializer(serializer) - - def create_deep_proxystore_serializer(*, policy: Type) -> ProxyStoreDeepSerializer: """Creates a serializer but does not register with global system - so this can be used in testing.""" diff --git a/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py index 0dbb3cab51..7dce270298 100644 --- a/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py +++ b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py @@ -17,10 +17,12 @@ def func(x, y): return x + y + 1 + @parsl.python_app def func2(v): return str(v) + class MyDemo: def __init__(self, v): self._v = v @@ -40,16 +42,15 @@ def __str__(self): def test_proxystore_single_call(): later_c = [v for v in parsl.serialize.facade.methods_for_code] later_d = [v for v in parsl.serialize.facade.methods_for_data] - clear_serializers() # does not clear deserializers... to allow results to come back... + clear_serializers() s = create_deep_proxystore_serializer(policy=MyDemo) - register_serializer(s) try: assert func(100, 4).result() == 105 # but how do we know this went through proxystore? 
- m = MyDemo([1,2,3,4]) + m = MyDemo([1, 2, 3, 4]) assert func2(m).result() == "[1, 2, 3, 4]" From fdef4ed3a3a0a62428561f82c1780bdbad8e1351 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jul 2023 15:24:54 +0000 Subject: [PATCH 19/39] Use repr not str, for less confusion in log message --- parsl/serialize/plugin_proxystore_deep_pickle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/serialize/plugin_proxystore_deep_pickle.py b/parsl/serialize/plugin_proxystore_deep_pickle.py index c4016dd9ef..51dac96314 100644 --- a/parsl/serialize/plugin_proxystore_deep_pickle.py +++ b/parsl/serialize/plugin_proxystore_deep_pickle.py @@ -25,7 +25,7 @@ def __init__(self, *args: Any, policy: Type, store: Store, **kwargs: Any) -> Non self._policy = policy def reducer_override(self, o: Any) -> Any: - logger.info(f"BENC: reducing object {o}") + logger.info(f"BENC: reducing object {o!r}") if type(o) is self._policy: # not isinstance, because want exact class match logger.info("BENC: Policy class detected") From 299924fefe6a704c4355f182f02a7ccd336c6d45 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jul 2023 15:36:39 +0000 Subject: [PATCH 20/39] fix mypy with newer proxystore --- parsl/serialize/plugin_proxystore.py | 2 +- parsl/serialize/plugin_proxystore_deep_pickle.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parsl/serialize/plugin_proxystore.py b/parsl/serialize/plugin_proxystore.py index 5c3eacced2..2e1400ca29 100644 --- a/parsl/serialize/plugin_proxystore.py +++ b/parsl/serialize/plugin_proxystore.py @@ -1,5 +1,5 @@ # parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] -from proxystore.store import Store, register_store # type: ignore +from proxystore.store import Store, register_store from proxystore.connectors.file import FileConnector from parsl.serialize.facade import register_serializer diff --git 
a/parsl/serialize/plugin_proxystore_deep_pickle.py b/parsl/serialize/plugin_proxystore_deep_pickle.py index 51dac96314..1bfd30f131 100644 --- a/parsl/serialize/plugin_proxystore_deep_pickle.py +++ b/parsl/serialize/plugin_proxystore_deep_pickle.py @@ -1,5 +1,5 @@ # parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] -from proxystore.store import Store, register_store # type: ignore +from proxystore.store import Store, register_store from proxystore.connectors.file import FileConnector from parsl.serialize.base import SerializerBase From 1fd4098359e55e3a30ef63208db00e391373c147 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jul 2023 15:39:45 +0000 Subject: [PATCH 21/39] remove mis-added test --- parsl/tests/test_proxystore.py | 41 ---------------------------------- 1 file changed, 41 deletions(-) delete mode 100644 parsl/tests/test_proxystore.py diff --git a/parsl/tests/test_proxystore.py b/parsl/tests/test_proxystore.py deleted file mode 100644 index 4f71540043..0000000000 --- a/parsl/tests/test_proxystore.py +++ /dev/null @@ -1,41 +0,0 @@ -import pytest -from parsl import python_app -from parsl.dataflow.memoization import id_for_memo - - -# this class should not have a memoizer registered for it -class Unmemoizable: - pass - - -# this class should have a memoizer that always raises an -# exception -class FailingMemoizable: - pass - - -class FailingMemoizerTestError(ValueError): - pass - - -@id_for_memo.register(FailingMemoizable) -def failing_memoizer(v, output_ref=False): - raise FailingMemoizerTestError("Deliberate memoizer failure") - - -@python_app(cache=True) -def noop_app(x, inputs=[], cache=True): - return None - - -@python_app -def sleep(t): - import time - time.sleep(t) - - -def test_python_unmemoizable_after_dep(): - sleep_fut = sleep(1) - fut = noop_app(Unmemoizable(), inputs=[sleep_fut]) - with pytest.raises(ValueError): - fut.result() From 
f0d3af138b90cb413a95b7bf96919f8b68ed9c3e Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jul 2023 16:10:25 +0000 Subject: [PATCH 22/39] Add UUID to store name --- parsl/serialize/plugin_proxystore.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/parsl/serialize/plugin_proxystore.py b/parsl/serialize/plugin_proxystore.py index 2e1400ca29..604c9a017f 100644 --- a/parsl/serialize/plugin_proxystore.py +++ b/parsl/serialize/plugin_proxystore.py @@ -44,6 +44,7 @@ def create_proxystore_serializer() -> ProxyStoreSerializer: """Creates a serializer but does not register with global system - so this can be used in testing.""" - store = Store(name='parsl_store', connector=FileConnector(store_dir="/tmp")) + import uuid + store = Store(name='parsl_store_' + str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp")) register_store(store) return ProxyStoreSerializer(store) From 1ad9c87d9677f2fd3b5f728d59737d8a0fb15794 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Mon, 10 Jul 2023 17:07:21 +0000 Subject: [PATCH 23/39] fix markdown quoting and remove a not-properly-written paragraph --- docs/userguide/plugins.rst | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/userguide/plugins.rst b/docs/userguide/plugins.rst index 7e0340cb06..fdc7bca3f9 100644 --- a/docs/userguide/plugins.rst +++ b/docs/userguide/plugins.rst @@ -82,7 +82,7 @@ task graph. See :ref:`label-join-globus-compute` Serialization ------------- -By default Parsl will serialize objects with either `pickle` or `dill`, in +By default Parsl will serialize objects with either ``pickle`` or ``dill``, in some cases applying an LRU cache. 
Parsl has an unstable API to register new serialization methods @@ -90,6 +90,4 @@ Parsl has an unstable API to register new serialization methods Additional serialization methods can be registered by XXXXX TODO XXXXX Limitations: -these mechanisms are not registered on the remote side, so custom -deserialization cannot be implemented -- instead, custom serializers can -only generate byte streams to be deserialized by pickle or dill +TODO From c276fe1f2916470af74d0035d9701271fdea822d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 18 Jul 2023 16:32:57 +0000 Subject: [PATCH 24/39] Fix (functional) merge conflict with PR #2815 which removed PickleCallableSerializer --- parsl/serialize/facade.py | 1 - 1 file changed, 1 deletion(-) diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 2d8c2dae03..5b67ea95be 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -68,7 +68,6 @@ def unregister_serializer(serializer: SerializerBase) -> None: register_serializer(c.DillSerializer()) register_serializer(c.DillCallableSerializer()) register_serializer(c.PickleSerializer()) -register_serializer(c.PickleCallableSerializer()) def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes: From 282086ceb1f55c6d0cbc87d18f2df084fe826ad0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 18 Jul 2023 17:36:18 +0000 Subject: [PATCH 25/39] Add codeprotector version checking hook prototype --- parsl/serialize/facade.py | 10 ++++- parsl/serialize/plugin_codeprotector.py | 51 +++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 parsl/serialize/plugin_codeprotector.py diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 5b67ea95be..d1cb7657bf 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -1,5 +1,6 @@ import logging import parsl.serialize.concretes as c +from parsl.serialize.plugin_codeprotector import 
CodeProtectorSerializer from parsl.serialize.base import SerializerBase from typing import Any, List, Union @@ -66,7 +67,14 @@ def unregister_serializer(serializer: SerializerBase) -> None: register_serializer(c.DillSerializer()) -register_serializer(c.DillCallableSerializer()) + +# register_serializer(c.DillCallableSerializer()) +register_serializer(CodeProtectorSerializer()) +# structuring it this way is probably wrong - should perhaps be a single +# Pickle-variant (or dill-variant) that is used, with all pluggable hooked +# in - eg proxystore should hook into the same Pickle/Dill subclass as +# CodeProtector? + register_serializer(c.PickleSerializer()) diff --git a/parsl/serialize/plugin_codeprotector.py b/parsl/serialize/plugin_codeprotector.py new file mode 100644 index 0000000000..1a13dec17a --- /dev/null +++ b/parsl/serialize/plugin_codeprotector.py @@ -0,0 +1,51 @@ +from parsl.serialize.base import SerializerBase + +from typing import Any + +import dill +import io +import logging +import sys +import types + +logger = logging.getLogger(__name__) + + +def _deprotectCode(major, minor, b): + if sys.version_info.major != major: + raise RuntimeError("Major version mismatch deserializing code") + if sys.version_info.minor != minor: + raise RuntimeError("Major version mismatch deserializing code") + + return dill.loads(b) + + +class CodeProtectorPickler(dill.Pickler): + + def reducer_override(self, o: Any) -> Any: + logger.info(f"BENC: reducing object {o!r} of type {type(o)}") + if isinstance(o, types.CodeType): + logger.info(f"BENC: special casing code object {o!r} of type {type(o)}") + return (_deprotectCode, (sys.version_info.major, sys.version_info.minor, dill.dumps(o))) + + return NotImplemented + + +class CodeProtectorSerializer(SerializerBase): + + _identifier = b'parsl.serialize.plugin_codeprotector CodeProtectorSerializer' + + _for_code = True + _for_data = False + + def serialize(self, data: Any) -> bytes: + + f = io.BytesIO() + pickler = 
CodeProtectorPickler(file=f) + pickler.dump(data) + return f.getvalue() + + return dill.dumps(data) + + def deserialize(self, body: bytes) -> Any: + return dill.loads(body) From 759ff4d589a4cb0ee97db826aa11bcc7327869a0 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 18 Jul 2023 17:38:19 +0000 Subject: [PATCH 26/39] fix linting --- mypy.ini | 3 +++ parsl/serialize/plugin_codeprotector.py | 6 ++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/mypy.ini b/mypy.ini index ababaf8d8b..38823411c0 100644 --- a/mypy.ini +++ b/mypy.ini @@ -76,6 +76,9 @@ disallow_untyped_defs = True [mypy-parsl.serialize.plugin_proxystore_deep_pickle.*] disallow_subclassing_any = False +[mypy-parsl.serialize.plugin_codeprotector.*] +disallow_subclassing_any = False + [mypy-parsl.executors.base.*] disallow_untyped_defs = True disallow_any_expr = True diff --git a/parsl/serialize/plugin_codeprotector.py b/parsl/serialize/plugin_codeprotector.py index 1a13dec17a..e437bf1a1f 100644 --- a/parsl/serialize/plugin_codeprotector.py +++ b/parsl/serialize/plugin_codeprotector.py @@ -11,12 +11,12 @@ logger = logging.getLogger(__name__) -def _deprotectCode(major, minor, b): +def _deprotectCode(major: int, minor: int, b: bytes) -> Any: if sys.version_info.major != major: raise RuntimeError("Major version mismatch deserializing code") if sys.version_info.minor != minor: raise RuntimeError("Major version mismatch deserializing code") - + return dill.loads(b) @@ -45,7 +45,5 @@ def serialize(self, data: Any) -> bytes: pickler.dump(data) return f.getvalue() - return dill.dumps(data) - def deserialize(self, body: bytes) -> Any: return dill.loads(body) From ed759dc6e4b9181a2093a7d37f7f84dbfb4edc27 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 20 Jul 2023 06:49:22 +0000 Subject: [PATCH 27/39] Fix clear serializers behaviour after master merge --- parsl/serialize/facade.py | 34 ++++++++++++------- .../test_proxystore_deep_pickle_htex.py | 7 ++-- 2 files changed, 26 insertions(+), 15 
deletions(-) diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index d586cb2960..c51b40b3de 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -22,14 +22,21 @@ deserializers = {} -def clear_serializers() -> None: - # does not clear deserializers because remote sending back will have a - # different serializer list (and wants to send back results with one of - # the default 4) so clearing all the deserializers means we cannot receive - # results... - global methods_for_data, methods_for_code - methods_for_code = [] - methods_for_data = [] +# maybe it's weird to want to clear away all serializers rather than just the +# data or just code ones? eg in proxystore test, clearing serializers means +# there is no code serializer... so just data serializer should be cleared +# (or if only having one kind of serializer, no code/data distinction, then +# this is more consistent, but clearing serializer is still a bit weird in +# that case (maybe for security?) - with inserting one near the start more +# usual? +# def clear_serializers() -> None: +# # does not clear deserializers because remote sending back will have a +# # different serializer list (and wants to send back results with one of +# # the default 4) so clearing all the deserializers means we cannot receive +# # results... 
+# global methods_for_data, methods_for_code +# methods_for_code = [] +# methods_for_data = [] def unregister_serializer(serializer: SerializerBase) -> None: @@ -121,6 +128,9 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes: else: methods = methods_for_data + if methods == []: + raise RuntimeError("There are no configured serializers") + for method in methods: try: logger.info(f"BENC: trying serializer {method}") @@ -132,15 +142,13 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes: else: break - # if no serializer found if result is None: - logger.error("BENC: no serializer returned a result") - raise RuntimeError("BENC: No serializers") + raise RuntimeError("No serializer returned a result") elif isinstance(result, BaseException): - logger.error("BENC: exception from final serializer") + logger.error("Serializer returned an exception, reraise") raise result else: - logger.info("BENC: serialization complete") + logger.debug("Serialization complete") if len(result) > buffer_threshold: logger.warning(f"Serialized object exceeds buffer threshold of {buffer_threshold} bytes, this could cause overflows") return result diff --git a/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py index 324eddd249..5a69ce4430 100644 --- a/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py +++ b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py @@ -42,7 +42,10 @@ def 
__str__(self): def test_proxystore_single_call(): later_c = [v for v in parsl.serialize.facade.methods_for_code] later_d = [v for v in parsl.serialize.facade.methods_for_data] - clear_serializers() + + # clear the data serializers, leaving the code serializers in place + parsl.serialize.facade.methods_for_data = [] + s = create_deep_proxystore_serializer(policy=MyDemo) register_method_for_data(s) From 79df5830f1c4e5602acda029655b2e2b9b92a209 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 20 Jul 2023 13:47:59 +0000 Subject: [PATCH 28/39] Get rid of some uses of internal _identifier attribute, forcing use via the calculated identifier property - in preparation for user serializers to get autocomputed identifiers --- parsl/serialize/base.py | 7 +++++-- parsl/serialize/facade.py | 6 +++--- parsl/tests/test_serialization/test_plugin.py | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index bc4a881abc..56af50c69b 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -7,15 +7,18 @@ class SerializerBase(metaclass=ABCMeta): """ Adds shared functionality for all serializer implementations """ - # For deserializer _identifier: bytes - # For deserializer @property def identifier(self) -> bytes: """Get that identifier that will be used to indicate in byte streams that this class should be used for deserialization. + TODO: for user derived serialisers, this should be fixed to be the + appropriate module and class name so that it can be loaded dynamically: + a serializer shouldn't be forced to specify an _identifier unless its + trying to short-cut that path. 
+ Returns ------- identifier : bytes diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index c51b40b3de..437180b037 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -42,8 +42,8 @@ def unregister_serializer(serializer: SerializerBase) -> None: logger.info(f"BENC: deserializers {deserializers}, serializer {serializer}") logger.info(f"BENC: unregistering serializer {serializer}") - if serializer._identifier in deserializers: - del deserializers[serializer._identifier] + if serializer.identifier in deserializers: + del deserializers[serializer.identifier] else: logger.warning("BENC: not found in deserializers list") if serializer in methods_for_code: @@ -134,7 +134,7 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes: for method in methods: try: logger.info(f"BENC: trying serializer {method}") - result = method._identifier + b'\n' + method.serialize(obj) + result = method.identifier + b'\n' + method.serialize(obj) except Exception as e: logger.warning(f"BENC: serializer {method} skipping, with exception: {e}") result = e diff --git a/parsl/tests/test_serialization/test_plugin.py b/parsl/tests/test_serialization/test_plugin.py index 900aa1b494..4a874b8128 100644 --- a/parsl/tests/test_serialization/test_plugin.py +++ b/parsl/tests/test_serialization/test_plugin.py @@ -46,7 +46,7 @@ def test_const_inprocess(): assert deserialize(serialize(1)) != 1 # and that the behaviour looks like ConstSerializer - assert serialize(1) == ConstSerializer._identifier + b'\n' + B_MAGIC + assert serialize(1) == s.identifier + b'\n' + B_MAGIC assert deserialize(serialize(1)) == V_MAGIC finally: unregister_serializer(s) From e9c341465579a266234ae0b03a2866d08a688b1d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 20 Jul 2023 14:16:57 +0000 Subject: [PATCH 29/39] Get rid of fixed-format _identifier on user defined serializers --- parsl/serialize/base.py | 20 +++++++++---------- parsl/serialize/concretes.py | 6 +++--- 
parsl/serialize/plugin_codeprotector.py | 2 -- parsl/serialize/plugin_proxystore.py | 1 - .../plugin_proxystore_deep_pickle.py | 3 --- parsl/serialize/plugin_serpent.py | 1 - parsl/tests/test_serialization/test_plugin.py | 4 ---- .../test_serialization/test_plugin_htex.py | 4 ---- 8 files changed, 13 insertions(+), 28 deletions(-) diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index 56af50c69b..84bd03ec6f 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -1,5 +1,5 @@ from abc import abstractmethod, ABCMeta - +from functools import cached_property from typing import Any @@ -7,23 +7,23 @@ class SerializerBase(metaclass=ABCMeta): """ Adds shared functionality for all serializer implementations """ - _identifier: bytes - - @property + @cached_property def identifier(self) -> bytes: - """Get that identifier that will be used to indicate in byte streams + """Compute the identifier that will be used to indicate in byte streams that this class should be used for deserialization. - TODO: for user derived serialisers, this should be fixed to be the - appropriate module and class name so that it can be loaded dynamically: - a serializer shouldn't be forced to specify an _identifier unless its - trying to short-cut that path. + Classes that wish to use a self-managed identifier namespace, such as + the default concretes.py implementations, should override this property + with their own identifier. 
Returns ------- identifier : bytes """ - return self._identifier + t = type(self) + m = bytes(t.__module__, encoding="utf-8") + c = bytes(t.__name__, encoding="utf-8") + return m + b' ' + c @abstractmethod def serialize(self, data: Any) -> bytes: diff --git a/parsl/serialize/concretes.py b/parsl/serialize/concretes.py index 56ca9d7ddc..fb0363c046 100644 --- a/parsl/serialize/concretes.py +++ b/parsl/serialize/concretes.py @@ -17,7 +17,7 @@ class PickleSerializer(SerializerBase): * closures, generators and coroutines """ - _identifier = b'01' + identifier = b'01' def serialize(self, data: Any) -> bytes: return pickle.dumps(data) @@ -38,7 +38,7 @@ class DillSerializer(SerializerBase): * closures """ - _identifier = b'02' + identifier = b'02' def serialize(self, data: Any) -> bytes: return dill.dumps(data) @@ -53,7 +53,7 @@ class DillCallableSerializer(SerializerBase): assumption that callables are immutable and so can be cached. """ - _identifier = b'C2' + identifier = b'C2' @functools.lru_cache def serialize(self, data: Any) -> bytes: diff --git a/parsl/serialize/plugin_codeprotector.py b/parsl/serialize/plugin_codeprotector.py index e437bf1a1f..ffa348a70a 100644 --- a/parsl/serialize/plugin_codeprotector.py +++ b/parsl/serialize/plugin_codeprotector.py @@ -33,8 +33,6 @@ def reducer_override(self, o: Any) -> Any: class CodeProtectorSerializer(SerializerBase): - _identifier = b'parsl.serialize.plugin_codeprotector CodeProtectorSerializer' - _for_code = True _for_data = False diff --git a/parsl/serialize/plugin_proxystore.py b/parsl/serialize/plugin_proxystore.py index 8659d24136..90aaea975a 100644 --- a/parsl/serialize/plugin_proxystore.py +++ b/parsl/serialize/plugin_proxystore.py @@ -13,7 +13,6 @@ class ProxyStoreSerializer(SerializerBase): # TODO: better enforcement of this being bytes, because that's common error i'm making # and probably it should be autogenerated... 
- _identifier = b'parsl.serialize.plugin_proxystore ProxyStoreSerializer' # must be class name _for_code = False _for_data = True diff --git a/parsl/serialize/plugin_proxystore_deep_pickle.py b/parsl/serialize/plugin_proxystore_deep_pickle.py index 1bfd30f131..f25e12427b 100644 --- a/parsl/serialize/plugin_proxystore_deep_pickle.py +++ b/parsl/serialize/plugin_proxystore_deep_pickle.py @@ -37,9 +37,6 @@ def reducer_override(self, o: Any) -> Any: class ProxyStoreDeepSerializer(SerializerBase): - # TODO: better enforcement of this being bytes, because that's common error i'm making - # and probably it should be autogenerated... - _identifier = b'parsl.serialize.plugin_proxystore_deep_pickle ProxyStoreDeepSerializer' # must be class name _for_code = True _for_data = True diff --git a/parsl/serialize/plugin_serpent.py b/parsl/serialize/plugin_serpent.py index 5020267101..77ee2ff2f9 100644 --- a/parsl/serialize/plugin_serpent.py +++ b/parsl/serialize/plugin_serpent.py @@ -5,7 +5,6 @@ class SerpentSerializer(SerializerBase): - _identifier = b'parsl.serialize.plugin_serpent SerpentSerializer' _for_code = False _for_data = True diff --git a/parsl/tests/test_serialization/test_plugin.py b/parsl/tests/test_serialization/test_plugin.py index 4a874b8128..f756f53a92 100644 --- a/parsl/tests/test_serialization/test_plugin.py +++ b/parsl/tests/test_serialization/test_plugin.py @@ -15,10 +15,6 @@ class ConstSerializer(SerializerBase): deserialization. """ - # TODO: should be enforcing/defaulting this to class name so that by default we can dynamically load the serializer remotely? - _identifier = b'parsl.tests.test_serializer.test_plugin ConstSerializer' - # note a space in the name here not final dot, to distinguish modules vs attribute in module - # TODO: better enforcement of the presence of these values? tied into abstract base class? 
def serialize(self, o): diff --git a/parsl/tests/test_serialization/test_plugin_htex.py b/parsl/tests/test_serialization/test_plugin_htex.py index 5dcede2e05..9109617e90 100644 --- a/parsl/tests/test_serialization/test_plugin_htex.py +++ b/parsl/tests/test_serialization/test_plugin_htex.py @@ -21,10 +21,6 @@ class XXXXSerializer(SerializerBase): _for_code = True _for_data = True - # TODO: should be enforcing/defaulting this to class name so that by default we can dynamically load the serializer remotely? - _identifier = b'parsl.tests.test_serialization.test_plugin_htex XXXXSerializer' - # note a space in the name here not final dot, to distinguish modules vs attribute in module - # TODO: better enforcement of the presence of these values? tied into abstract base class? def serialize(self, o): From c614387f75c1ba9d7ec5abf6ce03d82a968d72b5 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 20 Jul 2023 14:21:26 +0000 Subject: [PATCH 30/39] Remove _for_data/code from demo serializers, no longer needed since PR #2831 --- parsl/serialize/plugin_codeprotector.py | 3 --- parsl/serialize/plugin_proxystore.py | 4 ---- parsl/serialize/plugin_proxystore_deep_pickle.py | 2 -- parsl/serialize/plugin_serpent.py | 2 -- parsl/tests/test_serialization/test_plugin_htex.py | 5 ----- 5 files changed, 16 deletions(-) diff --git a/parsl/serialize/plugin_codeprotector.py b/parsl/serialize/plugin_codeprotector.py index ffa348a70a..2b1b61644b 100644 --- a/parsl/serialize/plugin_codeprotector.py +++ b/parsl/serialize/plugin_codeprotector.py @@ -33,9 +33,6 @@ def reducer_override(self, o: Any) -> Any: class CodeProtectorSerializer(SerializerBase): - _for_code = True - _for_data = False - def serialize(self, data: Any) -> bytes: f = io.BytesIO() diff --git a/parsl/serialize/plugin_proxystore.py b/parsl/serialize/plugin_proxystore.py index 90aaea975a..431970594c 100644 --- a/parsl/serialize/plugin_proxystore.py +++ b/parsl/serialize/plugin_proxystore.py @@ -11,10 +11,6 @@ class 
ProxyStoreSerializer(SerializerBase): - # TODO: better enforcement of this being bytes, because that's common error i'm making - # and probably it should be autogenerated... - _for_code = False - _for_data = True def __init__(self, store: Optional[Store] = None) -> None: """Because of jumbled use of this class for init-time configurable diff --git a/parsl/serialize/plugin_proxystore_deep_pickle.py b/parsl/serialize/plugin_proxystore_deep_pickle.py index f25e12427b..ad139a9af2 100644 --- a/parsl/serialize/plugin_proxystore_deep_pickle.py +++ b/parsl/serialize/plugin_proxystore_deep_pickle.py @@ -37,8 +37,6 @@ def reducer_override(self, o: Any) -> Any: class ProxyStoreDeepSerializer(SerializerBase): - _for_code = True - _for_data = True def __init__(self, *, policy: Optional[Type] = None, store: Optional[Store] = None) -> None: """Because of jumbled use of this class for init-time configurable diff --git a/parsl/serialize/plugin_serpent.py b/parsl/serialize/plugin_serpent.py index 77ee2ff2f9..0e454e5d74 100644 --- a/parsl/serialize/plugin_serpent.py +++ b/parsl/serialize/plugin_serpent.py @@ -5,8 +5,6 @@ class SerpentSerializer(SerializerBase): - _for_code = False - _for_data = True def serialize(self, data: Any) -> bytes: body = serpent.dumps(data) diff --git a/parsl/tests/test_serialization/test_plugin_htex.py b/parsl/tests/test_serialization/test_plugin_htex.py index 9109617e90..3ddcbab748 100644 --- a/parsl/tests/test_serialization/test_plugin_htex.py +++ b/parsl/tests/test_serialization/test_plugin_htex.py @@ -18,11 +18,6 @@ class XXXXSerializer(SerializerBase): """This is a test deserializer but puts some padding round to help distinguish... """ - _for_code = True - _for_data = True - - # TODO: better enforcement of the presence of these values? tied into abstract base class? 
- def serialize(self, o): import dill logger.error(f"BENC: XXXX serializer serializing value {o} of type {type(o)}") From 2435b37ffd8f82ffb3cae43891d51d3ff77105ec Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 20 Jul 2023 14:23:57 +0000 Subject: [PATCH 31/39] Make serializer demo dependencies optional --- requirements.txt | 2 -- setup.py | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index f7cc0e85f4..5d6df186cc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,5 +9,3 @@ requests paramiko psutil>=5.5.1 setproctitle -proxystore # for proxystore demo plugin -serpent # for serpent demo plugin diff --git a/setup.py b/setup.py index b90806bfc6..671fce253d 100755 --- a/setup.py +++ b/setup.py @@ -26,6 +26,7 @@ 'azure' : ['azure<=4', 'msrestazure'], 'workqueue': ['work_queue'], 'flux': ['pyyaml', 'cffi', 'jsonschema'], + 'serializer_demos': ['proxystore', 'serpent'], # Disabling psi-j since github direct links are not allowed by pypi # 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl'] } From 10a6937b843594c376f73421460f37ff0004eb27 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 20 Jul 2023 14:37:32 +0000 Subject: [PATCH 32/39] Don't use CodeProtector by default --- parsl/serialize/facade.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 437180b037..b6356b9259 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -1,5 +1,4 @@ import logging -from parsl.serialize.plugin_codeprotector import CodeProtectorSerializer from typing import Any, Dict, List, Union import parsl.serialize.concretes as concretes @@ -68,8 +67,8 @@ def register_method_for_code(s: SerializerBase) -> None: methods_for_code.insert(0, s) -# register_method_for_code(concretes.DillCallableSerializer()) -register_method_for_code(CodeProtectorSerializer()) 
+register_method_for_code(concretes.DillCallableSerializer()) +# register_method_for_code(CodeProtectorSerializer()) def register_method_for_data(s: SerializerBase) -> None: From 1928b50eefb751f5b5c03a83d32f55254bfb4093 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 20 Jul 2023 14:48:37 +0000 Subject: [PATCH 33/39] hack CI deps --- Makefile | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Makefile b/Makefile index 02c8aa7f9f..439320101a 100644 --- a/Makefile +++ b/Makefile @@ -26,6 +26,10 @@ virtualenv: ## create an activate a virtual env $(DEPS): test-requirements.txt requirements.txt pip3 install --upgrade pip pip3 install -r test-requirements.txt -r requirements.txt + # TODO: this is a hack to get packages installed for optional plugins for mypy + # -- a better story might be that the relevant setup.py dependencies get + # installed as a dep for mypy... + pip3 install proxystore serpent touch $(DEPS) .PHONY: deps From c77ceae6d35ab012dcd81ad40dd7e7cc6dc14b6c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 20 Jul 2023 15:35:44 +0000 Subject: [PATCH 34/39] tabs vs spaces --- Makefile | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 439320101a..6fcfe48ac4 100644 --- a/Makefile +++ b/Makefile @@ -26,10 +26,10 @@ virtualenv: ## create an activate a virtual env $(DEPS): test-requirements.txt requirements.txt pip3 install --upgrade pip pip3 install -r test-requirements.txt -r requirements.txt - # TODO: this is a hack to get packages installed for optional plugins for mypy - # -- a better story might be that the relevant setup.py dependencies get - # installed as a dep for mypy... - pip3 install proxystore serpent + # TODO: this is a hack to get packages installed for optional plugins for mypy + # -- a better story might be that the relevant setup.py dependencies get + # installed as a dep for mypy... 
+ pip3 install proxystore serpent touch $(DEPS) .PHONY: deps From 0100f44b9abc32ffe1a0a233344c2b14b39638b2 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 21 Jul 2023 10:26:38 +0000 Subject: [PATCH 35/39] Redo serpent config so that it demonstrates how serpent on its own isn't enough for the whole test suite --- parsl/tests/test_serialization/config_serpent.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/parsl/tests/test_serialization/config_serpent.py b/parsl/tests/test_serialization/config_serpent.py index 5fcd0d2e4d..daf62d8c5f 100644 --- a/parsl/tests/test_serialization/config_serpent.py +++ b/parsl/tests/test_serialization/config_serpent.py @@ -22,15 +22,26 @@ from parsl.data_provider.ftp import FTPInTaskStaging from parsl.data_provider.file_noop import NoOpFileStaging -from parsl.serialize.facade import register_method_for_data # TODO: move this into parsl.serialize root as its user exposed +from parsl.serialize.facade import methods_for_data, register_method_for_data # TODO: move this into parsl.serialize root as its user exposed from parsl.serialize.plugin_serpent import SerpentSerializer working_dir = os.getcwd() + "/" + "test_htex_alternate" +import logging + +logger = logging.getLogger(__name__) + def fresh_config(): + # get rid of the default serializers so that only serpent will be used in + # data mode. 
+ global methods_for_data + logger.error(f"BENC: before reset, methods_for_data = {methods_for_data}") + methods_for_data.clear() + logger.error(f"BENC: after reset, methods_for_data = {methods_for_data}") register_method_for_data(SerpentSerializer()) + logger.error(f"BENC: after config, methods_for_data = {methods_for_data}") return Config( executors=[ From 1b06cfcf12b010bba7d9ee576ef2d05d32a90e55 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 11 Aug 2023 10:21:25 +0000 Subject: [PATCH 36/39] Fix CI to use better dependency installing method --- Makefile | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 8be282b131..2d2ba28324 100644 --- a/Makefile +++ b/Makefile @@ -26,10 +26,6 @@ virtualenv: ## create an activate a virtual env $(DEPS): test-requirements.txt requirements.txt pip3 install --upgrade pip pip3 install -r test-requirements.txt -r requirements.txt - # TODO: this is a hack to get packages installed for optional plugins for mypy - # -- a better story might be that the relevant setup.py dependencies get - # installed as a dep for mypy... 
- pip3 install proxystore serpent touch $(DEPS) .PHONY: deps @@ -78,7 +74,7 @@ wqex_local_test: $(CCTOOLS_INSTALL) ## run all tests with workqueue_ex config .PHONY: config_local_test config_local_test: - pip3 install ".[monitoring,proxystore]" + pip3 install ".[monitoring,proxystore,serializer_demos]" pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10 .PHONY: site_test From 4009ae6f11e5fb98669c124eda5f5267628c8843 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 11 Aug 2023 10:21:42 +0000 Subject: [PATCH 37/39] Fix semantic level merge conflicts from recent master build --- parsl/serialize/base.py | 3 +-- parsl/serialize/facade.py | 17 +++-------------- .../test_proxystore_configured.py | 13 +++++-------- 3 files changed, 9 insertions(+), 24 deletions(-) diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index 8847c1ec67..c18b0acafe 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -1,6 +1,5 @@ -from abc import abstractmethod +from abc import abstractmethod, ABCMeta from functools import cached_property -import logging from typing import Any diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index 58fb6a9547..b94c7b9669 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -82,12 +82,6 @@ def register_method_for_data(s: SerializerBase) -> None: register_method_for_data(concretes.DillSerializer()) -# When deserialize dynamically loads a deserializer, it will be stored here, -# rather than in the methods_for_* dictionaries, so that loading does not -# cause it to be used for future serializations. 
-additional_methods_for_deserialization: Dict[bytes, SerializerBase] = {} - - def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes: """Serialize and pack function and parameters @@ -171,12 +165,8 @@ def deserialize(payload: bytes) -> Any: """ header, body = payload.split(b'\n', 1) - if header in methods_for_code: - deserializer = methods_for_code[header] - elif header in methods_for_data: - deserializer = methods_for_data[header] - elif header in additional_methods_for_deserialization: - deserializer = additional_methods_for_deserialization[header] + if header in deserializers: + deserializer = deserializers[header] else: logger.info("Trying to dynamically load deserializer: {!r}".format(header)) # This is a user plugin point, so expect exceptions to happen. @@ -186,13 +176,12 @@ def deserialize(payload: bytes) -> Any: module = importlib.import_module(decoded_module_name) deserializer_class = getattr(module, class_name.decode('utf-8')) deserializer = deserializer_class() - additional_methods_for_deserialization[header] = deserializer + deserializers[header] = deserializer except Exception as e: raise DeserializerPluginError(header) from e result = deserializer.deserialize(body) - # raise TypeError("Invalid serialization header: {!r}".format(header)) return result diff --git a/parsl/tests/test_serialization/test_proxystore_configured.py b/parsl/tests/test_serialization/test_proxystore_configured.py index 04486a62ce..47dfa7c44c 100644 --- a/parsl/tests/test_serialization/test_proxystore_configured.py +++ b/parsl/tests/test_serialization/test_proxystore_configured.py @@ -3,7 +3,7 @@ import uuid import parsl -from parsl.serialize.facade import additional_methods_for_deserialization, methods_for_data, register_method_for_data +from parsl.serialize.facade import methods_for_data, register_method_for_data, deserializers from parsl.tests.configs.htex_local import fresh_config @@ -11,6 +11,7 @@ def local_setup(): + 
global s from parsl.serialize.proxystore import ProxyStoreSerializer from proxystore.store import Store, register_store from proxystore.connectors.file import FileConnector @@ -25,11 +26,6 @@ def local_setup(): global previous_methods previous_methods = methods_for_data.copy() - # get rid of all data serialization methods, in preparation for using only - # proxystore. put all the old methods as additional methods used only for - # deserialization, because those will be needed to deserialize the results, - # which will be serialized using the default serializer set. - additional_methods_for_deserialization.update(previous_methods) methods_for_data.clear() register_method_for_data(s) @@ -37,13 +33,14 @@ def local_setup(): def local_teardown(): + global s parsl.dfk().cleanup() parsl.clear() methods_for_data.clear() - methods_for_data.update(previous_methods) + methods_for_data.extend(previous_methods) - additional_methods_for_deserialization.clear() + del deserializers[s.identifier] @parsl.python_app From 0b7921b616cd4810b64414c41e4b9adb9d82ce3d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 11 Aug 2023 10:25:57 +0000 Subject: [PATCH 38/39] Move a comment into the right place --- parsl/serialize/facade.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index b94c7b9669..1eb424e160 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -8,11 +8,6 @@ logger = logging.getLogger(__name__) -# These must be registered in reverse order of -# importance: later registered serializers -# will take priority over earlier ones. 
This is -# to facilitate user registered serializers - methods_for_code: List[SerializerBase] methods_for_code = [] @@ -78,6 +73,11 @@ def register_method_for_data(s: SerializerBase) -> None: methods_for_data.insert(0, s) +# These must be registered in reverse order of +# importance: later registered serializers +# will take priority over earlier ones. This is +# to facilitate user registered serializers + register_method_for_data(concretes.PickleSerializer()) register_method_for_data(concretes.DillSerializer()) From 4631448accda7e0720b4176da3d36f8eb95c774c Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 2 Jul 2024 19:55:07 +0000 Subject: [PATCH 39/39] fix: flake8 mypy isort --- parsl/serialize/base.py | 3 +-- parsl/serialize/plugin_codeprotector.py | 10 ++++---- parsl/serialize/plugin_proxystore.py | 11 ++++----- .../plugin_proxystore_deep_pickle.py | 17 +++++-------- parsl/serialize/plugin_serpent.py | 5 ++-- .../test_serialization/config_proxystore.py | 17 ++++++------- .../test_serialization/config_serpent.py | 24 +++++++++---------- .../test_htex_code_cache.py | 6 ++--- parsl/tests/test_serialization/test_plugin.py | 7 +++++- .../test_serialization/test_plugin_htex.py | 13 ++++++---- .../test_serialization/test_proxystore.py | 1 + .../test_proxystore_configured.py | 3 +-- .../test_proxystore_deep_pickle_htex.py | 18 +++++++++----- .../test_proxystore_htex.py | 14 +++++++---- .../test_serialization/test_serpent_htex.py | 14 +++++++---- 15 files changed, 88 insertions(+), 75 deletions(-) diff --git a/parsl/serialize/base.py b/parsl/serialize/base.py index f65b4b6a8d..577af37d82 100644 --- a/parsl/serialize/base.py +++ b/parsl/serialize/base.py @@ -1,5 +1,4 @@ -import logging -from abc import abstractmethod, ABCMeta +from abc import ABCMeta, abstractmethod from functools import cached_property from typing import Any diff --git a/parsl/serialize/plugin_codeprotector.py b/parsl/serialize/plugin_codeprotector.py index 2b1b61644b..75f6966f22 100644 --- 
a/parsl/serialize/plugin_codeprotector.py +++ b/parsl/serialize/plugin_codeprotector.py @@ -1,12 +1,12 @@ -from parsl.serialize.base import SerializerBase - -from typing import Any - -import dill import io import logging import sys import types +from typing import Any + +import dill + +from parsl.serialize.base import SerializerBase logger = logging.getLogger(__name__) diff --git a/parsl/serialize/plugin_proxystore.py b/parsl/serialize/plugin_proxystore.py index 431970594c..ea76de8ec1 100644 --- a/parsl/serialize/plugin_proxystore.py +++ b/parsl/serialize/plugin_proxystore.py @@ -1,13 +1,12 @@ # parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] -from proxystore.store import Store, register_store +import pickle +from typing import Any, Optional + from proxystore.connectors.file import FileConnector -from parsl.serialize.facade import register_method_for_data +from proxystore.store import Store, register_store from parsl.serialize.base import SerializerBase - -from typing import Any, Optional - -import pickle +from parsl.serialize.facade import register_method_for_data class ProxyStoreSerializer(SerializerBase): diff --git a/parsl/serialize/plugin_proxystore_deep_pickle.py b/parsl/serialize/plugin_proxystore_deep_pickle.py index ad139a9af2..45a2c9d43f 100644 --- a/parsl/serialize/plugin_proxystore_deep_pickle.py +++ b/parsl/serialize/plugin_proxystore_deep_pickle.py @@ -1,18 +1,13 @@ # parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined] -from proxystore.store import Store, register_store -from proxystore.connectors.file import FileConnector - -from parsl.serialize.base import SerializerBase - -from typing import Any, Optional - -import dill - import io - import logging +from typing import Any, Optional, Type -from typing import Type +import dill +from proxystore.connectors.file import FileConnector +from 
proxystore.store import Store, register_store + +from parsl.serialize.base import SerializerBase logger = logging.getLogger(__name__) diff --git a/parsl/serialize/plugin_serpent.py b/parsl/serialize/plugin_serpent.py index 0e454e5d74..338dc91cac 100644 --- a/parsl/serialize/plugin_serpent.py +++ b/parsl/serialize/plugin_serpent.py @@ -1,7 +1,8 @@ -from parsl.serialize.base import SerializerBase +from typing import Any + import serpent -from typing import Any +from parsl.serialize.base import SerializerBase class SerpentSerializer(SerializerBase): diff --git a/parsl/tests/test_serialization/config_proxystore.py b/parsl/tests/test_serialization/config_proxystore.py index b6536409d0..4752b2800d 100644 --- a/parsl/tests/test_serialization/config_proxystore.py +++ b/parsl/tests/test_serialization/config_proxystore.py @@ -5,22 +5,19 @@ will not see the default serializer environment... """ -# imports for monitoring: -from parsl.monitoring import MonitoringHub - import os -from parsl.providers import LocalProvider from parsl.channels import LocalChannel -from parsl.launchers import SingleNodeLauncher - from parsl.config import Config +from parsl.data_provider.file_noop import NoOpFileStaging +from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.data_provider.http import HTTPInTaskStaging from parsl.executors import HighThroughputExecutor +from parsl.launchers import SingleNodeLauncher - -from parsl.data_provider.http import HTTPInTaskStaging -from parsl.data_provider.ftp import FTPInTaskStaging -from parsl.data_provider.file_noop import NoOpFileStaging +# imports for monitoring: +from parsl.monitoring import MonitoringHub +from parsl.providers import LocalProvider working_dir = os.getcwd() + "/" + "test_htex_alternate" diff --git a/parsl/tests/test_serialization/config_serpent.py b/parsl/tests/test_serialization/config_serpent.py index daf62d8c5f..36a9e40035 100644 --- a/parsl/tests/test_serialization/config_serpent.py +++ 
b/parsl/tests/test_serialization/config_serpent.py @@ -5,25 +5,23 @@ will not see the default serializer environment... """ -# imports for monitoring: -from parsl.monitoring import MonitoringHub - import os -from parsl.providers import LocalProvider from parsl.channels import LocalChannel -from parsl.launchers import SingleNodeLauncher - from parsl.config import Config -from parsl.executors import HighThroughputExecutor - - -from parsl.data_provider.http import HTTPInTaskStaging -from parsl.data_provider.ftp import FTPInTaskStaging from parsl.data_provider.file_noop import NoOpFileStaging +from parsl.data_provider.ftp import FTPInTaskStaging +from parsl.data_provider.http import HTTPInTaskStaging +from parsl.executors import HighThroughputExecutor +from parsl.launchers import SingleNodeLauncher -from parsl.serialize.facade import methods_for_data, register_method_for_data # TODO: move this into parsl.serialize root as its user exposed - +# imports for monitoring: +from parsl.monitoring import MonitoringHub +from parsl.providers import LocalProvider +from parsl.serialize.facade import ( # TODO: move this into parsl.serialize root as its user exposed + methods_for_data, + register_method_for_data, +) from parsl.serialize.plugin_serpent import SerpentSerializer working_dir = os.getcwd() + "/" + "test_htex_alternate" diff --git a/parsl/tests/test_serialization/test_htex_code_cache.py b/parsl/tests/test_serialization/test_htex_code_cache.py index 65cb72527a..85c2348368 100644 --- a/parsl/tests/test_serialization/test_htex_code_cache.py +++ b/parsl/tests/test_serialization/test_htex_code_cache.py @@ -17,9 +17,9 @@ def test_caching() -> None: # for future serializer devs: if this is failing because you added another # code serializer, you'll also probably need to re-think what is being tested # about serialization caching here. 
- assert len(methods_for_code) == 1 - - serializer = methods_for_code[b'C2'] + assert len(methods_for_code) == 1, "This test expects one default code serializer" + serializer = methods_for_code[0] + assert serializer.identifier == b'C2', "This test expects the default code serializer to have identifier C2" # force type to Any here because a serializer method coming from # methods_for_code doesn't statically have any cache management diff --git a/parsl/tests/test_serialization/test_plugin.py b/parsl/tests/test_serialization/test_plugin.py index f756f53a92..e2099299bd 100644 --- a/parsl/tests/test_serialization/test_plugin.py +++ b/parsl/tests/test_serialization/test_plugin.py @@ -3,7 +3,12 @@ import pytest from parsl.serialize.base import SerializerBase -from parsl.serialize.facade import serialize, deserialize, register_method_for_data, unregister_serializer +from parsl.serialize.facade import ( + deserialize, + register_method_for_data, + serialize, + unregister_serializer, +) B_MAGIC = b'3626874628368432' # arbitrary const bytestring V_MAGIC = 777 # arbitrary const object diff --git a/parsl/tests/test_serialization/test_plugin_htex.py b/parsl/tests/test_serialization/test_plugin_htex.py index 3ddcbab748..8f8fe46b04 100644 --- a/parsl/tests/test_serialization/test_plugin_htex.py +++ b/parsl/tests/test_serialization/test_plugin_htex.py @@ -1,12 +1,17 @@ # test the serializer plugin API import logging -import pytest -import parsl -from parsl.tests.configs.htex_local import fresh_config as local_config +import pytest +import parsl from parsl.serialize.base import SerializerBase -from parsl.serialize.facade import serialize, deserialize, register_method_for_data, unregister_serializer +from parsl.serialize.facade import ( + deserialize, + register_method_for_data, + serialize, + unregister_serializer, +) +from parsl.tests.configs.htex_local import fresh_config as local_config logger = logging.getLogger(__name__) diff --git 
a/parsl/tests/test_serialization/test_proxystore.py b/parsl/tests/test_serialization/test_proxystore.py index 08813c60db..4aa59800c6 100644 --- a/parsl/tests/test_serialization/test_proxystore.py +++ b/parsl/tests/test_serialization/test_proxystore.py @@ -1,4 +1,5 @@ import pytest + from parsl.serialize.plugin_proxystore import create_proxystore_serializer diff --git a/parsl/tests/test_serialization/test_proxystore_configured.py b/parsl/tests/test_serialization/test_proxystore_configured.py index d0b43390f3..6e51729c77 100644 --- a/parsl/tests/test_serialization/test_proxystore_configured.py +++ b/parsl/tests/test_serialization/test_proxystore_configured.py @@ -5,8 +5,7 @@ import parsl from parsl.serialize.facade import ( - additional_methods_for_deserialization, - deserializers + deserializers, methods_for_data, register_method_for_data, ) diff --git a/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py index 5a69ce4430..a454d3f44e 100644 --- a/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py +++ b/parsl/tests/test_serialization/test_proxystore_deep_pickle_htex.py @@ -1,14 +1,20 @@ # test the serializer plugin API import logging -import pytest -import parsl -from parsl.tests.configs.htex_local import fresh_config as local_config +import pytest +import parsl from parsl.serialize.base import SerializerBase -from parsl.serialize.facade import serialize, deserialize, register_method_for_data, unregister_serializer - -from parsl.serialize.plugin_proxystore_deep_pickle import create_deep_proxystore_serializer +from parsl.serialize.facade import ( + deserialize, + register_method_for_data, + serialize, + unregister_serializer, +) +from parsl.serialize.plugin_proxystore_deep_pickle import ( + create_deep_proxystore_serializer, +) +from parsl.tests.configs.htex_local import fresh_config as local_config logger = logging.getLogger(__name__) diff --git 
a/parsl/tests/test_serialization/test_proxystore_htex.py b/parsl/tests/test_serialization/test_proxystore_htex.py index 18a60e0774..9f71ea3b85 100644 --- a/parsl/tests/test_serialization/test_proxystore_htex.py +++ b/parsl/tests/test_serialization/test_proxystore_htex.py @@ -1,14 +1,18 @@ # test the serializer plugin API import logging -import pytest -import parsl -from parsl.tests.configs.htex_local import fresh_config as local_config +import pytest +import parsl from parsl.serialize.base import SerializerBase -from parsl.serialize.facade import serialize, deserialize, register_method_for_data, unregister_serializer - +from parsl.serialize.facade import ( + deserialize, + register_method_for_data, + serialize, + unregister_serializer, +) from parsl.serialize.plugin_proxystore import create_proxystore_serializer +from parsl.tests.configs.htex_local import fresh_config as local_config logger = logging.getLogger(__name__) diff --git a/parsl/tests/test_serialization/test_serpent_htex.py b/parsl/tests/test_serialization/test_serpent_htex.py index 54377aa94b..395f6dc8de 100644 --- a/parsl/tests/test_serialization/test_serpent_htex.py +++ b/parsl/tests/test_serialization/test_serpent_htex.py @@ -1,14 +1,18 @@ # test the serializer plugin API import logging -import pytest -import parsl -from parsl.tests.configs.htex_local import fresh_config as local_config +import pytest +import parsl from parsl.serialize.base import SerializerBase -from parsl.serialize.facade import serialize, deserialize, register_method_for_data, unregister_serializer - +from parsl.serialize.facade import ( + deserialize, + register_method_for_data, + serialize, + unregister_serializer, +) from parsl.serialize.plugin_serpent import SerpentSerializer +from parsl.tests.configs.htex_local import fresh_config as local_config logger = logging.getLogger(__name__)