serializer plugin API
TODO (broken): add a test that runs a custom serializer against an htex worker... to check that remote processes can load a custom serializer.
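a rough sketch of what that test could look like (untested, which is the point of the TODO - fresh_config is a hypothetical stand-in for an htex test configuration, e.g. like the one in parsl/tests/test_serialization/config_proxystore.py below):

    import parsl
    from parsl.serialize.facade import register_serializer, unregister_serializer
    from parsl.serialize.plugin_serpent import SerpentSerializer


    @parsl.python_app
    def identity(x):
        return x


    def test_custom_serializer_reaches_htex_worker():
        serializer = SerpentSerializer()
        register_serializer(serializer)
        try:
            parsl.load(fresh_config())  # hypothetical htex config helper
            # the worker must dynamically import SerpentSerializer to decode the args
            assert identity([1, 2, 3]).result() == [1, 2, 3]
        finally:
            parsl.dfk().cleanup()
            parsl.clear()
            unregister_serializer(serializer)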

prior to this PR, serializer registration happened in order of class definition, which in the case of a single concretes file is the order of the serializers inside that file.

this does not work well when trying to define other serializers in other files, because the order in which files are imported can change subtly, due to distant effects elsewhere in the code.

so doing serializer registration in a subclass hook is not a very nice thing to do here...

this PR moves to a model which defines the default (de)serializers explicitly, and allows the user to modify that list explicitly too, to add in new (de)serializers.
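concretely, with the facade changes below, registration looks something like this (MySerializer standing in for any user-defined SerializerBase subclass):

    from parsl.serialize.facade import register_serializer, unregister_serializer

    serializer = MySerializer()  # hypothetical SerializerBase subclass
    register_serializer(serializer)  # takes priority over the default dill/pickle entries
    try:
        ...  # run the workflow
    finally:
        unregister_serializer(serializer)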

limitations:

this is part of the work needed to allow serializer plugins, but not all of it, as it does not define how a remote worker will discover plugged-in serializers - for that, perhaps, the header should become something importable, instead of a registry-based ID byte string?
contrasting with that: proxystore doesn't need its own deserializer! it can use the existing pickle/picklecode deserializers, because what it generates is a pickled object... and that's a legitimate use case... which suggests that serializers don't need to be indexed by "serializer" ID at all on the serializing side, but only on the deserializing side...
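as a sketch of that idea (speculative, not what this PR implements): a serializer could tag its output with the stock pickle identifier, so the deserializing side needs no new registration at all:

    import pickle
    from typing import Any

    import parsl.serialize.concretes as c
    from parsl.serialize.base import SerializerBase


    class ProxyingPickleSerializer(SerializerBase):
        # hypothetical: reuse the pickle identifier so the remote side decodes
        # this with its already-registered pickle deserializer
        _identifier = c.PickleSerializer._identifier
        _for_code = False
        _for_data = True

        def serialize(self, data: Any) -> bytes:
            proxied = data  # imagine swapping data for a proxy object here
            return pickle.dumps(proxied)

        def deserialize(self, body: bytes) -> Any:
            return pickle.loads(body)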

this API also will not easily allow serializer methods to be defined for values returned from workers to the submit side: the expected place of registration, near the start of the user workflow, does not have a corresponding place on the worker side.

this PR separates out serialization behaviour (object -> bytestream) from deserialization behaviour ((identifier, bytestream) -> object), because a serializer might not (and in this prototype, *cannot*) implement custom deserialization... due to non-registration on the remote side, it can instead only generate dill/pickle-targeted bytestreams.
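as plain types, that split looks like this (a sketch - the prototype below still uses a single class for both directions):

    from typing import Any, Callable, Dict

    Serializer = Callable[[Any], bytes]    # object -> bytestream
    Deserializer = Callable[[bytes], Any]  # bytestream -> object

    # deserialization is (identifier, bytestream) -> object via a lookup:
    deserializers: Dict[bytes, Deserializer] = {}

    def deserialize(identifier: bytes, body: bytes) -> Any:
        return deserializers[identifier](body)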


the motivating use case for this basic serializer plugin API is supporting proxystore (https://labs.globus.org/projects/proxystore.html), prototyped in #2718.

this PR could also contain an optional proxystore plugin that would need to be activated by a user

loading remote serializers via importlib is a lot of machinery... perhaps i should... just pickle the deserializer and send it (and let pickle deal with that?)
- i guess there are two paths here: one is loading remote deserializers, and the other is focusing on the proxystore use case, which, differently, wants access to the pickle deserializer remotely - and so that drives clearer separation of serializers vs deserializers... that 2nd case is probably what i should concentrate on...
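the pickle-the-deserializer idea in miniature (a sketch; note that pickle stores a module-level function by reference, so its module still has to be importable on the worker - dill would be needed for truly self-contained code):

    import pickle


    def my_deserializer(body: bytes) -> object:
        return pickle.loads(body)


    # submit side: ship the deserializer itself, length-prefixed because
    # pickled bytes can themselves contain newlines
    header = pickle.dumps(my_deserializer)
    payload = len(header).to_bytes(4, 'big') + header + pickle.dumps([1, 2, 3])

    # worker side: no registry lookup - unpickle the deserializer, apply it
    n = int.from_bytes(payload[:4], 'big')
    deserializer = pickle.loads(payload[4:4 + n])
    assert deserializer(payload[4 + n:]) == [1, 2, 3]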

right now this does module loading in order to work with the class-based serializer design inherited from before... but maybe that's not how it should end up...

perhaps just serializer callables and deserializer callables? or based around modules?

it seems nice to be able to configure the serializer (i.e. make a class instance or a partially applied callable) for configuring proxystore... but it is unclear how that would work for remote-end configuration.
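for example, a callable-flavoured sketch of that (purely speculative - register_data_serializer does not exist in this PR):

    import pickle
    from functools import partial
    from typing import Any


    def proxystore_serialize(store: Any, obj: Any) -> bytes:
        # configuration captured by partial application instead of a class
        return pickle.dumps(store.proxy(obj))

    # submit-side registration would then be something like:
    # register_data_serializer(partial(proxystore_serialize, store))

but it is not obvious what the remote end would import to get the matching deserializer.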

TODO: as an example plug-in, also show 'serpent' as a serializer/deserializer pair
benclifford committed Jul 5, 2023
1 parent a306bb9 commit 031ce8e
Showing 14 changed files with 511 additions and 37 deletions.
16 changes: 16 additions & 0 deletions docs/userguide/plugins.rst
@@ -77,3 +77,19 @@ from invoking a Parsl app. This includes as the return value of a

A specific example of this is integrating Globus Compute tasks into a Parsl
task graph. See :ref:`label-join-globus-compute`.


Serialization
-------------

By default Parsl will serialize objects with either `pickle` or `dill`, in
some cases applying an LRU cache.

Parsl has an unstable API to register new serialization methods.

Additional serialization methods can be registered by XXXXX TODO XXXXX

Limitations:
these mechanisms are not registered on the remote side, so custom
deserialization cannot be implemented -- instead, custom serializers can
only generate byte streams to be deserialized by pickle or dill.
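For illustration, with the facade API added in this commit, registering an additional serialization method looks roughly like:

    from parsl.serialize.facade import register_serializer
    from parsl.serialize.plugin_serpent import SerpentSerializer

    register_serializer(SerpentSerializer())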
9 changes: 9 additions & 0 deletions mypy.ini
@@ -198,3 +198,12 @@ ignore_missing_imports = True

[mypy-setproctitle.*]
ignore_missing_imports = True


# for serializer plugin demos:

[mypy-serpent.*]
ignore_missing_imports = True

[mypy-proxystore.connectors.file]
ignore_missing_imports = True
26 changes: 6 additions & 20 deletions parsl/serialize/base.py
@@ -1,34 +1,20 @@
-from abc import abstractmethod
 import logging
+from abc import abstractmethod, ABCMeta
 
 from typing import Any
 
 logger = logging.getLogger(__name__)
 
-# GLOBALS
-METHODS_MAP_CODE = {}
-METHODS_MAP_DATA = {}
-
 
-class SerializerBase:
+class SerializerBase(metaclass=ABCMeta):
     """ Adds shared functionality for all serializer implementations
     """
 
-    def __init_subclass__(cls, **kwargs: Any) -> None:
-        """ This forces all child classes to register themselves as
-        methods for serializing code or data
-        """
-        super().__init_subclass__(**kwargs)
-
-        if cls._for_code:
-            METHODS_MAP_CODE[cls._identifier] = cls
-        if cls._for_data:
-            METHODS_MAP_DATA[cls._identifier] = cls
-
+    # For deserializer
     _identifier: bytes
 
+    # For serializer
     _for_code: bool
     _for_data: bool
 
+    # For deserializer
     @property
     def identifier(self) -> bytes:
         """Get that identifier that will be used to indicate in byte streams
87 changes: 70 additions & 17 deletions parsl/serialize/facade.py
@@ -1,22 +1,64 @@
-from parsl.serialize.concretes import *  # noqa: F403,F401
-from parsl.serialize.base import METHODS_MAP_DATA, METHODS_MAP_CODE
+import logging
+import parsl.serialize.concretes as c
+
+from parsl.serialize.base import SerializerBase
+from typing import Any, List, Union
+
+logger = logging.getLogger(__name__)
+
+# these are used for two directions:
 
-""" Instantiate the appropriate classes
-"""
-methods_for_code = {}
-methods_for_data = {}
+# 1. to iterate over to find a valid serializer
+#    for that, the ordering is important
 
-for key in METHODS_MAP_CODE:
-    methods_for_code[key] = METHODS_MAP_CODE[key]()
+# 2. to perform an ID -> deserializer lookup
 
-for key in METHODS_MAP_DATA:
-    methods_for_data[key] = METHODS_MAP_DATA[key]()
+# These must be registered in reverse order of
+# importance: later registered serializers
+# will take priority over earlier ones. This is
+# to facilitate user registered serializers
+
+methods_for_code: List[SerializerBase]
+methods_for_code = []
+
+methods_for_data: List[SerializerBase]
+methods_for_data = []
+
+deserializers = {}
+
+
+def register_serializer(serializer: SerializerBase) -> None:
+    deserializers[serializer._identifier] = serializer
+
+    if serializer._for_code:
+        methods_for_code.insert(0, serializer)
+    if serializer._for_data:
+        methods_for_data.insert(0, serializer)
+
+
+def unregister_serializer(serializer: SerializerBase) -> None:
+    logger.info(f"BENC: deserializers {deserializers}, serializer {serializer}")
+    logger.info(f"BENC: unregistering serializer {serializer}")
+    if serializer._identifier in deserializers:
+        del deserializers[serializer._identifier]
+    else:
+        logger.warning("BENC: not found in deserializers list")
+    if serializer in methods_for_code:
+        logger.info("BENC: removing serializer from methods_for_code")
+        methods_for_code.remove(serializer)
+    else:
+        logger.warning("BENC: not found in methods for code")
+    if serializer in methods_for_data:
+        logger.info("BENC: removing serializer from methods_for_data")
+        methods_for_data.remove(serializer)
+    else:
+        logger.warning("BENC: not found in methods for data")
+
+
+register_serializer(c.DillSerializer())
+register_serializer(c.DillCallableSerializer())
+register_serializer(c.PickleSerializer())
+register_serializer(c.PickleCallableSerializer())
 
 
 def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes:
@@ -64,10 +106,12 @@ def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes:
     else:
         methods = methods_for_data
 
-    for method in methods.values():
+    for method in methods:
         try:
+            logger.info(f"BENC: trying serializer {method}")
             result = method._identifier + b'\n' + method.serialize(obj)
         except Exception as e:
+            logger.warning(f"BENC: serializer {method} skipping, with exception: {e}")
             result = e
             continue
         else:
@@ -91,13 +135,22 @@ def deserialize(payload: bytes) -> Any:
     """
     header, body = payload.split(b'\n', 1)
 
-    if header in methods_for_code:
-        result = methods_for_code[header].deserialize(body)
-    elif header in methods_for_data:
-        result = methods_for_data[header].deserialize(body)
+    if header in deserializers:
+        result = deserializers[header].deserialize(body)
     else:
-        raise TypeError("Invalid header: {!r} in data payload. Buffer is either corrupt or not created by ParslSerializer".format(header))
+        logger.warning("BENC: unknown serialization header: {!r} - trying to load dynamically".format(header))
+        import importlib
+        module_name, class_name = header.split(b' ', 1)
+        try:
+            decoded_module_name = module_name.decode('utf-8')
+        except UnicodeDecodeError as e:
+            raise RuntimeError(f"Got unicode error with string {module_name!r} exception is {e}")
+        module = importlib.import_module(decoded_module_name)
+        deserializer_class = getattr(module, class_name.decode('utf-8'))
+        deserializer = deserializer_class()
+        result = deserializer.deserialize(body)
+
+        # raise TypeError("Invalid serialization header: {!r}".format(header))
 
     return result
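a minimal demonstration of the dynamic fallback path above, assuming the serpent plugin below is importable on the deserializing side:

    import serpent

    from parsl.serialize.facade import deserialize

    # header format: b'<module path> <class name>', as used by the plugins below
    payload = b'parsl.serialize.plugin_serpent SerpentSerializer' + b'\n' + serpent.dumps({'a': 1})
    assert deserialize(payload) == {'a': 1}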
49 changes: 49 additions & 0 deletions parsl/serialize/plugin_proxystore.py
@@ -0,0 +1,49 @@
# parsl/serialize/concretes.py:10: error: Module "proxystore.store" does not explicitly export attribute "Store"  [attr-defined]
from proxystore.store import Store, register_store  # type: ignore
from proxystore.connectors.file import FileConnector
from parsl.serialize.facade import register_serializer

from parsl.serialize.base import SerializerBase

from typing import Any, Optional

import pickle


class ProxyStoreSerializer(SerializerBase):
    # TODO: better enforcement of this being bytes, because that's a common error i'm making
    # and probably it should be autogenerated...
    _identifier = b'parsl.serialize.plugin_proxystore ProxyStoreSerializer'  # must be the module path and class name
    _for_code = False
    _for_data = True

    def __init__(self, store: Optional[Store] = None) -> None:
        """Because of jumbled use of this class for init-time configurable
        serialization, and non-configurable remote deserializer loading, the
        store field can be None... TODO: this would go away if serializer and
        deserializer were split into different objects/classes/functions."""
        self._store = store

    def serialize(self, data: Any) -> bytes:
        assert self._store is not None
        assert data is not None
        p = self._store.proxy(data)
        return pickle.dumps(p)

    def deserialize(self, body: bytes) -> Any:
        return pickle.loads(body)


def register_proxystore_serializer() -> None:
    """Initializes proxystore and registers it as a serializer with parsl"""
    serializer = create_proxystore_serializer()
    register_serializer(serializer)


def create_proxystore_serializer() -> ProxyStoreSerializer:
    """Creates a serializer but does not register with the global system - so this
    can be used in testing."""

    store = Store(name='parsl_store', connector=FileConnector(store_dir="/tmp"))
    register_store(store)
    return ProxyStoreSerializer(store)
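a hedged usage sketch for this plugin: after registration, facade-level serialize routes data through the file-backed store, and deserialize returns a transparent proxy that should compare equal to the original:

    from parsl.serialize.facade import serialize, deserialize
    from parsl.serialize.plugin_proxystore import register_proxystore_serializer

    register_proxystore_serializer()
    payload = serialize([1, 2, 3])  # a pickled proxy; the data itself lives under /tmp
    assert deserialize(payload) == [1, 2, 3]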
23 changes: 23 additions & 0 deletions parsl/serialize/plugin_serpent.py
@@ -0,0 +1,23 @@
from parsl.serialize.base import SerializerBase
import serpent

from typing import Any


class SerpentSerializer(SerializerBase):
    _identifier = b'parsl.serialize.plugin_serpent SerpentSerializer'
    _for_code = False
    _for_data = True

    def serialize(self, data: Any) -> bytes:
        body = serpent.dumps(data)
        roundtripped_data = serpent.loads(body)

        # this round trip is because serpent will sometimes serialize objects
        # as best as it can, which is not good enough...
        if data != roundtripped_data:
            raise ValueError(f"SerpentSerializer cannot roundtrip {data} -> {body} -> {roundtripped_data}")
        return body

    def deserialize(self, body: bytes) -> Any:
        return serpent.loads(body)
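a quick sketch of that roundtrip guard in action: plain data passes, but an object serpent can only approximate is rejected:

    s = SerpentSerializer()
    assert s.deserialize(s.serialize({'a': [1, 2]})) == {'a': [1, 2]}


    class Opaque:
        pass


    try:
        s.serialize(Opaque())  # serpent degrades this to a dict, failing the roundtrip check
    except ValueError:
        print("rejected: Opaque does not roundtrip through serpent")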
67 changes: 67 additions & 0 deletions parsl/tests/test_serialization/config_proxystore.py
@@ -0,0 +1,67 @@
"""htex local, but using the proxy store serializer...
DANGER! this will modify the global serializer environment, so any
future parsl stuff done in the same process as this configuration
will not see the default serializer environment...
"""

# imports for monitoring:
from parsl.monitoring import MonitoringHub

import os

from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.launchers import SingleNodeLauncher

from parsl.config import Config
from parsl.executors import HighThroughputExecutor


from parsl.data_provider.http import HTTPInTaskStaging
from parsl.data_provider.ftp import FTPInTaskStaging
from parsl.data_provider.file_noop import NoOpFileStaging

working_dir = os.getcwd() + "/" + "test_htex_alternate"


def fresh_config():
    import parsl.serialize.plugin_proxystore as pspps
    pspps.register_proxystore_serializer()

    return Config(
        executors=[
            HighThroughputExecutor(
                address="127.0.0.1",
                label="htex_Local",
                working_dir=working_dir,
                storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()],
                worker_debug=True,
                cores_per_worker=1,
                heartbeat_period=2,
                heartbeat_threshold=5,
                poll_period=1,
                provider=LocalProvider(
                    channel=LocalChannel(),
                    init_blocks=0,
                    min_blocks=0,
                    max_blocks=5,
                    launcher=SingleNodeLauncher(),
                ),
                block_error_handler=False
            )
        ],
        strategy='simple',
        app_cache=True, checkpoint_mode='task_exit',
        retries=2,
        monitoring=MonitoringHub(
            hub_address="localhost",
            hub_port=55055,
            monitoring_debug=False,
            resource_monitoring_interval=1,
        ),
        usage_tracking=True
    )


config = fresh_config()
