Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[not for merge] Parsl serializer plugins development #2718

Draft
wants to merge 87 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
6932e96
Passes many tests but fails at test_memoize.py
benclifford May 19, 2023
008bff9
Merge remote-tracking branch 'origin/master' into benc-prototype-prox…
benclifford May 19, 2023
5e84f9f
fix flake8, mypy
benclifford May 19, 2023
af42d3a
install proxystore
benclifford May 19, 2023
f29054e
Can't use pluggable serializers for memoization... i) they aren't req…
benclifford May 19, 2023
0acdbcf
fix flake8
benclifford May 19, 2023
df037c5
Merge branch 'master' into benc-prototype-proxystore
benclifford May 23, 2023
b15565c
Isolate one test from parsl/tests/test_python_apps/test_memoize_bad_i…
benclifford May 23, 2023
c075a36
I think proxystore cannot store objects of type None
benclifford May 23, 2023
8602853
Merge remote-tracking branch 'origin/master' into benc-prototype-prox…
benclifford May 23, 2023
9a09716
Merge branch 'master' into benc-prototype-proxystore
benclifford Jun 19, 2023
501b065
Remove unused internal serializer _list_methods method
benclifford Jun 30, 2023
66920e7
Add some basic unit tests of serializer components
benclifford Jun 30, 2023
035bf98
Add __init__
benclifford Jun 30, 2023
a039bb3
Parse serialization header only once
benclifford Jun 29, 2023
3e7f2fa
Merge remote-tracking branch 'origin/master' into benc-prototype-prox…
benclifford Jul 2, 2023
ec9438b
Merge remote-tracking branch 'origin/benc-serialization-headers' into…
benclifford Jul 2, 2023
ac83db9
Merge remote-tracking branch 'origin/benc-serializer-remove-_list_met…
benclifford Jul 2, 2023
e40682a
Merge remote-tracking branch 'origin/benc-serializer-tests' into benc…
benclifford Jul 2, 2023
8254d95
Fix up proxystore serializer for recent changes, add a test case
benclifford Jul 2, 2023
26da368
Disable mypy for proxystore connectors
benclifford Jul 2, 2023
73702ba
WIP remove enable_caching
benclifford Jul 2, 2023
4900695
remove unused headers - port from PR #2784
benclifford Jul 2, 2023
b37efc7
fix linting
benclifford Jul 2, 2023
bdf0a86
Merge branch 'master' into benc-prototype-proxystore
benclifford Jul 4, 2023
4ea2e18
serializer plugin API
benclifford Jul 4, 2023
35f6a19
Merge remote-tracking branch 'refs/remotes/origin/benc-prototype-prox…
benclifford Jul 10, 2023
59bfd02
Merge remote-tracking branch 'origin/master' into benc-prototype-prox…
benclifford Jul 10, 2023
71c2879
Remove old proxystore serializer impl
benclifford Jul 10, 2023
c9cbda9
Fixup mypy
benclifford Jul 10, 2023
fdef4ed
Use repr not str, for less confusion in log message
benclifford Jul 10, 2023
299924f
fix mypy with newer proxystore
benclifford Jul 10, 2023
1fd4098
remove mis-added test
benclifford Jul 10, 2023
f0d3af1
Add UUID to store name
benclifford Jul 10, 2023
1ad9c87
fix markdown quoting and remove a not-properly-written paragraph
benclifford Jul 10, 2023
b0184d8
Merge branch 'master' into benc-prototype-proxystore
benclifford Jul 18, 2023
df159fc
Merge remote-tracking branch 'origin/master' into benc-prototype-prox…
benclifford Jul 18, 2023
c276fe1
Fix (functional) merge conflict with PR #2815 which removed PickleCal…
benclifford Jul 18, 2023
282086c
Add codeprotector version checking hook prototype
benclifford Jul 18, 2023
759ff4d
fix linting
benclifford Jul 18, 2023
2e8bdea
Merge remote-tracking branch 'origin/master' into benc-prototype-prox…
benclifford Jul 19, 2023
ed759dc
Fix clear serializers behaviour after master merge
benclifford Jul 20, 2023
79df583
Get rid of some uses of internal _identifier attribute, forcing use v…
benclifford Jul 20, 2023
e9c3414
Get rid of fixed-format _identifier on user defined serializers
benclifford Jul 20, 2023
c614387
Remove _for_data/code from demo serializers, no longer needed since P…
benclifford Jul 20, 2023
2435b37
Make serializer demo dependencies optional
benclifford Jul 20, 2023
10a6937
Don't use CodeProtector by default
benclifford Jul 20, 2023
1928b50
hack CI deps
benclifford Jul 20, 2023
c77ceae
tabs vs spaces
benclifford Jul 20, 2023
98f9531
Merge branch 'master' into benc-prototype-proxystore
benclifford Jul 21, 2023
0100f44
Redo serpent config so that it demonstrates how serpent on its own is…
benclifford Jul 21, 2023
6e9a730
Merge remote-tracking branch 'origin/master' into benc-prototype-prox…
benclifford Aug 11, 2023
1b06cfc
Fix CI to use better dependency installing method
benclifford Aug 11, 2023
4009ae6
Fix semantic level merge conflicts from recent master build
benclifford Aug 11, 2023
0b7921b
Move a comment into the right place
benclifford Aug 11, 2023
a1e0e83
Merge branch 'master' into benc-prototype-proxystore
benclifford Sep 19, 2023
db57df6
Merge remote-tracking branch 'origin/master' into benc-prototype-prox…
benclifford Sep 26, 2023
5970def
Merge remote-tracking branch 'origin/master' into benc-prototype-prox…
benclifford Nov 1, 2023
c8ee71d
Merge commit 'origin/master~300' into benc-prototype-proxystore
benclifford Jul 2, 2024
7d79e98
Merge commit 'origin/master~200' into benc-prototype-proxystore
benclifford Jul 2, 2024
efa4c0f
Merge commit 'origin/master~190' into benc-prototype-proxystore
benclifford Jul 2, 2024
ef61622
Merge commit 'origin/master~180' into benc-prototype-proxystore
benclifford Jul 2, 2024
58730a0
Merge commit 'origin/master~170' into benc-prototype-proxystore
benclifford Jul 2, 2024
01a4b1f
Merge commit 'origin/master~160' into benc-prototype-proxystore
benclifford Jul 2, 2024
15ea8dc
Merge commit 'origin/master~150' into benc-prototype-proxystore
benclifford Jul 2, 2024
187c97f
Merge commit 'origin/master~140' into benc-prototype-proxystore
benclifford Jul 2, 2024
af14a02
Merge commit 'origin/master~130' into benc-prototype-proxystore
benclifford Jul 2, 2024
0817b4d
Merge commit 'origin/master~120' into benc-prototype-proxystore
benclifford Jul 2, 2024
4c02802
Merge commit 'origin/master~119' into benc-prototype-proxystore
benclifford Jul 2, 2024
d412c73
Merge commit 'origin/master~118' into benc-prototype-proxystore
benclifford Jul 2, 2024
17ec3a6
Merge commit 'origin/master~117' into benc-prototype-proxystore
benclifford Jul 2, 2024
9a4d97c
Merge commit 'origin/master~116' into benc-prototype-proxystore
benclifford Jul 2, 2024
1644457
Merge commit 'origin/master~115' into benc-prototype-proxystore
benclifford Jul 2, 2024
8b68005
Merge commit 'origin/master~114' into benc-prototype-proxystore
benclifford Jul 2, 2024
35ef6ed
Merge commit 'origin/master~100' into benc-prototype-proxystore
benclifford Jul 2, 2024
35977a0
Merge commit 'origin/master~50' into benc-prototype-proxystore
benclifford Jul 2, 2024
3aa6ec8
Merge commit 'origin/master~40' into benc-prototype-proxystore
benclifford Jul 2, 2024
d7058ab
Merge commit 'origin/master~39' into benc-prototype-proxystore
benclifford Jul 2, 2024
97f1c3b
Merge commit 'origin/master~38' into benc-prototype-proxystore
benclifford Jul 2, 2024
6d3184e
Merge commit 'origin/master~37' into benc-prototype-proxystore
benclifford Jul 2, 2024
f21b38b
Merge commit 'origin/master~30' into benc-prototype-proxystore
benclifford Jul 2, 2024
e54616c
Merge commit 'origin/master~29' into benc-prototype-proxystore
benclifford Jul 2, 2024
4ee30b9
Merge commit 'origin/master~28' into benc-prototype-proxystore
benclifford Jul 2, 2024
2fbb883
Merge commit 'origin/master~27' into benc-prototype-proxystore
benclifford Jul 2, 2024
76152b8
Merge commit 'origin/master~26' into benc-prototype-proxystore
benclifford Jul 2, 2024
33dd69e
Merge remote-tracking branch 'origin/master' into benc-prototype-prox…
benclifford Jul 2, 2024
4631448
fix: flake8 mypy isort
benclifford Jul 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ radical_local_test:

.PHONY: config_local_test
config_local_test: $(CCTOOLS_INSTALL)
pip3 install ".[monitoring,visualization,proxystore]"
pip3 install ".[monitoring,visualization,proxystore,serializer_demos]"
PYTHONPATH=/tmp/cctools/lib/python3.8/site-packages pytest parsl/tests/ -k "not cleannet" --config local --random-order --durations 10

.PHONY: site_test
Expand Down
14 changes: 14 additions & 0 deletions docs/userguide/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ from invoking a Parsl app. This includes as the return value of a
An specific example of this is integrating Globus Compute tasks into a Parsl
task graph. See :ref:`label-join-globus-compute`


Serialization
-------------

By default Parsl will serialize objects with either ``pickle`` or ``dill``, in
some cases applying an LRU cache.

Parsl has an unstable API to register new serialization methods

Additional serialization methods can be registered by XXXXX TODO XXXXX

Limitations:
TODO

Dependency resolution
---------------------

Expand Down
15 changes: 15 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ check_untyped_defs = True
disallow_subclassing_any = True
disallow_untyped_defs = True

[mypy-parsl.serialize.plugin_proxystore_deep_pickle.*]
disallow_subclassing_any = False

[mypy-parsl.serialize.plugin_codeprotector.*]
disallow_subclassing_any = False

[mypy-parsl.serialize.proxystore.*]
# parsl/serialize/proxystore.py:9: error: Class cannot subclass "Pickler" (has type "Any")
disallow_subclassing_any = False
Expand Down Expand Up @@ -183,6 +189,9 @@ ignore_missing_imports = True
#[mypy-multiprocessing.synchronization.*]
#ignore_missing_imports = True

[mypy-proxystore.connectors.file]
ignore_missing_imports = True

[mypy-pandas.*]
ignore_missing_imports = True

Expand All @@ -204,5 +213,11 @@ ignore_missing_imports = True
[mypy-setproctitle.*]
ignore_missing_imports = True


# for serializer plugin demos:

[mypy-serpent.*]
ignore_missing_imports = True

[mypy-proxystore.*]
ignore_missing_imports = True
3 changes: 3 additions & 0 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ def _queue_management_worker(self):

if 'result' in msg:
result = deserialize(msg['result'])
# ^ if this raises an exception, queue management worker fails and
# parsl hangs. this is more relevant now allowing user-pluggable
# serialization and so user code exceptions can be raised here.
task_fut.set_result(result)

elif 'exception' in msg:
Expand Down
7 changes: 2 additions & 5 deletions parsl/serialize/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import logging
from abc import abstractmethod
from abc import ABCMeta, abstractmethod
from functools import cached_property
from typing import Any

logger = logging.getLogger(__name__)


class SerializerBase:
class SerializerBase(metaclass=ABCMeta):
""" Adds shared functionality for all serializer implementations
"""

Expand Down
97 changes: 75 additions & 22 deletions parsl/serialize/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,80 @@

logger = logging.getLogger(__name__)

methods_for_code: List[SerializerBase]
methods_for_code = []

methods_for_data: List[SerializerBase]
methods_for_data = []

deserializers: Dict[bytes, SerializerBase]
deserializers = {}


# maybe it's weird to want to clear away all serializers rather than just the
# data or just code ones? eg in proxystore test, clearing serializers means
# there is no code serializer... so just data sreializer should be cleared
# (or if only having one kind of serializer, no code/data distinction, then
# this is more consistent, but clearing serializer is still a bit weird in
# that case (maybe for security?) - with inserting one near the start more
# usual?
# def clear_serializers() -> None:
# # does not clear deserializers because remote sending back will have a
# # different serializer list (and wants to send back results with one of
# # the default 4) so clearing all the deserializers means we cannot receive
# # results...
# global methods_for_data, methods_for_code
# methods_for_code = []
# methods_for_data = []


def unregister_serializer(serializer: SerializerBase) -> None:
logger.info(f"BENC: deserializers {deserializers}, serializer {serializer}")
logger.info(f"BENC: unregistering serializer {serializer}")
if serializer.identifier in deserializers:
del deserializers[serializer.identifier]
else:
logger.warning("BENC: not found in deserializers list")
if serializer in methods_for_code:
logger.info("BENC: removing serializer from methods_for_code")
methods_for_code.remove(serializer)
else:
logger.warning("BENC: not found in methods for code")
if serializer in methods_for_data:
logger.info("BENC: removing serializer from methods_for_data")
methods_for_data.remove(serializer)
else:
logger.warning("BENC: not found in methods for data")

methods_for_code: Dict[bytes, SerializerBase] = {}

# structuring it this way is probably wrong - should perhaps be a single
# Pickle-variant (or dill-variant) that is used, with all pluggable hooked
# in - eg proxystore should hook into the same Pickle/Dill subclass as
# CodeProtector?

def register_method_for_code(s: SerializerBase) -> None:
methods_for_code[s.identifier] = s
deserializers[s.identifier] = s
methods_for_code.insert(0, s)


register_method_for_code(concretes.DillCallableSerializer())


methods_for_data: Dict[bytes, SerializerBase] = {}
# register_method_for_code(CodeProtectorSerializer())


def register_method_for_data(s: SerializerBase) -> None:
methods_for_data[s.identifier] = s
deserializers[s.identifier] = s
methods_for_data.insert(0, s)


# These must be registered in reverse order of
# importance: later registered serializers
# will take priority over earlier ones. This is
# to facilitate user registered serializers

register_method_for_data(concretes.PickleSerializer())
register_method_for_data(concretes.DillSerializer())


# When deserialize dynamically loads a deserializer, it will be stored here,
# rather than in the methods_for_* dictionaries, so that loading does not
# cause it to be used for future serializations.
additional_methods_for_deserialization: Dict[bytes, SerializerBase] = {}


def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes:
"""Serialize and pack function and parameters

Expand Down Expand Up @@ -106,24 +152,35 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes:
Individual serialization methods might raise a TypeError (eg. if objects are non serializable)
This method will raise the exception from the last method that was tried, if all methods fail.
"""
result: Union[bytes, Exception]
logger.info(f"BENC: Trying to serialize {obj}")
result: Union[bytes, Exception, None]
result = None
if callable(obj):
methods = methods_for_code
else:
methods = methods_for_data

for method in methods.values():
if methods == []:
raise RuntimeError("There are no configured serializers")

for method in methods:
try:
logger.info(f"BENC: trying serializer {method}")
result = method.identifier + b'\n' + method.serialize(obj)
except Exception as e:
logger.warning(f"BENC: serializer {method} skipping, with exception: {e}")
result = e
continue
else:
break

if isinstance(result, BaseException):
if result is None:
raise RuntimeError("No serializer returned a result")
elif isinstance(result, BaseException):
logger.error("Serializer returned an excepton, reraise")
raise result
else:
logger.debug("Serialization complete")
if len(result) > buffer_threshold:
logger.warning(f"Serialized object exceeds buffer threshold of {buffer_threshold} bytes, this could cause overflows")
return result
Expand All @@ -139,12 +196,8 @@ def deserialize(payload: bytes) -> Any:
"""
header, body = payload.split(b'\n', 1)

if header in methods_for_code:
deserializer = methods_for_code[header]
elif header in methods_for_data:
deserializer = methods_for_data[header]
elif header in additional_methods_for_deserialization:
deserializer = additional_methods_for_deserialization[header]
if header in deserializers:
deserializer = deserializers[header]
else:
logger.info("Trying to dynamically load deserializer: {!r}".format(header))
# This is a user plugin point, so expect exceptions to happen.
Expand All @@ -154,7 +207,7 @@ def deserialize(payload: bytes) -> Any:
module = importlib.import_module(decoded_module_name)
deserializer_class = getattr(module, class_name.decode('utf-8'))
deserializer = deserializer_class()
additional_methods_for_deserialization[header] = deserializer
deserializers[header] = deserializer
except Exception as e:
raise DeserializerPluginError(header) from e

Expand Down
44 changes: 44 additions & 0 deletions parsl/serialize/plugin_codeprotector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import io
import logging
import sys
import types
from typing import Any

import dill

from parsl.serialize.base import SerializerBase

logger = logging.getLogger(__name__)


def _deprotectCode(major: int, minor: int, b: bytes) -> Any:
if sys.version_info.major != major:
raise RuntimeError("Major version mismatch deserializing code")
if sys.version_info.minor != minor:
raise RuntimeError("Major version mismatch deserializing code")

return dill.loads(b)


class CodeProtectorPickler(dill.Pickler):

def reducer_override(self, o: Any) -> Any:
logger.info(f"BENC: reducing object {o!r} of type {type(o)}")
if isinstance(o, types.CodeType):
logger.info(f"BENC: special casing code object {o!r} of type {type(o)}")
return (_deprotectCode, (sys.version_info.major, sys.version_info.minor, dill.dumps(o)))

return NotImplemented


class CodeProtectorSerializer(SerializerBase):

def serialize(self, data: Any) -> bytes:

f = io.BytesIO()
pickler = CodeProtectorPickler(file=f)
pickler.dump(data)
return f.getvalue()

def deserialize(self, body: bytes) -> Any:
return dill.loads(body)
44 changes: 44 additions & 0 deletions parsl/serialize/plugin_proxystore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store" [attr-defined]
import pickle
from typing import Any, Optional

from proxystore.connectors.file import FileConnector
from proxystore.store import Store, register_store

from parsl.serialize.base import SerializerBase
from parsl.serialize.facade import register_method_for_data


class ProxyStoreSerializer(SerializerBase):

def __init__(self, store: Optional[Store] = None) -> None:
"""Because of jumbled use of this class for init-time configurable
serialization, and non-configurable remote deserializer loading, the
store field can be None... TODO: this would go away if serializer and
deserializer were split into different objects/classes/functions."""
self._store = store

def serialize(self, data: Any) -> bytes:
assert self._store is not None
assert data is not None
p = self._store.proxy(data)
return pickle.dumps(p)

def deserialize(self, body: bytes) -> Any:
return pickle.loads(body)


def register_proxystore_serializer() -> None:
"""Initializes proxystore and registers it as a serializer with parsl"""
serializer = create_proxystore_serializer()
register_method_for_data(serializer)


def create_proxystore_serializer() -> ProxyStoreSerializer:
"""Creates a serializer but does not register with global system - so this
can be used in testing."""

import uuid
store = Store(name='parsl_store_' + str(uuid.uuid4()), connector=FileConnector(store_dir="/tmp"))
register_store(store)
return ProxyStoreSerializer(store)
Loading
Loading