
add diaspora radio and is able to send resource info #3147

Closed

Conversation

ClaudiaCumberbatch
Contributor

Description

Add a diaspora radio to send resource information. Also add a test_radio directory to test this functionality.

Changed Behaviour

The resource information will not be inserted into the database; instead, it will be sent to diaspora. If DB insertion is still required, just change the radio_mode in executors/high_throughput/executors.py .

Type of change

  • New feature

@@ -323,7 +323,7 @@ def __init__(self,
launch_cmd = DEFAULT_LAUNCH_CMD
self.launch_cmd = launch_cmd

radio_mode = "htex"
radio_mode = "diaspora"
Collaborator

I don't think you should be changing the behaviour of Parsl monitoring to use this new radio by default

Contributor Author

What about setting this radio_mode in the executor's __init__ via the config file?

Collaborator

yes, that's probably the right way to go - change radio_mode into an object-level value (self.radio_mode) instead of a class-level value, and set it as an __init__ parameter
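A minimal sketch of that suggestion (class name abbreviated, only the relevant parameter shown):

```python
# Sketch: radio_mode becomes an instance attribute set from an __init__
# parameter, defaulting to the existing "htex" behaviour, rather than a
# hard-coded class-level value changed for everyone.
class HighThroughputExecutorSketch:
    def __init__(self, radio_mode: str = "htex"):
        # per-instance, so one config can opt into "diaspora" without
        # changing the default behaviour of Parsl monitoring
        self.radio_mode = radio_mode

default_ex = HighThroughputExecutorSketch()
diaspora_ex = HighThroughputExecutorSketch(radio_mode="diaspora")
assert default_ex.radio_mode == "htex"
assert diaspora_ex.radio_mode == "diaspora"
```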

return 5

@pytest.mark.local
def test_energy_collection():
Collaborator

this is a weird test name, I think from some other work?

Contributor Author

Yeah you are right. I forgot to change this name.

@benclifford
Collaborator

There are quite a lot of flake8 formatting errors that you should be able to see in the details of the test suite checks on this PR.

@@ -0,0 +1,74 @@
"""
Collaborator

this file is an outdated copy of htex_local_alternate.py with no changes related to this PR, right?

Contributor Author

Oh yeah, sorry about that


logger.info("all done")

if __name__ == "__main__":
Collaborator

don't use main blocks in pytest tests - we've been slowly removing them over the years when they've been previously committed - see PR #2685 for example.
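The pattern being asked for, in a minimal sketch (function names are illustrative, not from the PR): the body that used to sit under `if __name__ == "__main__":` moves into a pytest-discoverable test function instead.

```python
# Sketch (hypothetical names): pytest collects test_* functions directly,
# so no __main__ block is needed to run the workflow body.
import pytest

def run_workflow() -> str:
    # stand-in for the workflow body that previously lived under
    # `if __name__ == "__main__":`
    return "all done"

@pytest.mark.local  # parsl's marker for tests that manage their own config
def test_workflow_completes():
    assert run_workflow() == "all done"
```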

parsl.dfk().cleanup()
parsl.clear()

logger.info("all done")
Collaborator

what is this test testing? I don't see that it is testing anything introduced by this PR, but is more like a variation of the existing tests in parsl/tests/test_monitoring?

Contributor Author

I tried to run this simple test file and see whether diaspora received the resource information. Maybe I should add a diaspora consumer in the test script to make sure the records are successfully sent.

Collaborator

right. some of the existing database-backed monitoring code is tested by running a small workflow and then checking that the database has the right things in it - the right number of tasks, etc.
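The database-checking style described here can be sketched like this (the table and column names below are illustrative stand-ins, not parsl's actual monitoring schema; a diaspora test could make the analogous assertion against a consumer):

```python
# Sketch: run a small workflow, then assert the monitoring store has the
# right number of task records. Schema here is a toy stand-in.
import sqlite3

def count_tasks(conn: sqlite3.Connection) -> int:
    # Illustrative query; parsl's real monitoring schema may differ.
    (count,) = conn.execute("SELECT COUNT(*) FROM task").fetchone()
    return count

# Toy monitoring database standing in for monitoring.db after a 3-task run.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task (task_id INTEGER, run_id TEXT)")
conn.executemany("INSERT INTO task VALUES (?, ?)",
                 [(i, "run-1") for i in range(3)])

assert count_tasks(conn) == 3  # the check a diaspora consumer could mirror
```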

@@ -0,0 +1,6 @@
from diaspora_event_sdk import Client as GlobusClient
Collaborator

What is this diaspora_login.py for? Can you add some usage notes about when it might be used?

Contributor Author
@ClaudiaCumberbatch Mar 6, 2024

Sure. Before using the diaspora radio, the user should first log in. This cannot be folded into the test file because login requires a CLI, which pytest does not support.

Collaborator

What service is this logging into? / What kind of account is needed? Github Actions (which runs our automated testing) supports being logged into services (for example, that is how automated release publication works to PyPI) so possibly that is an approach to getting the test running in CI?


Hi Ben, this is Haochen, I am a GLabs student working on the diaspora project. The log goes to an AWS-managed Kafka cluster maintained by the diaspora team. The login process works similarly to that of the Compute SDK, so any accounts supported by Globus Auth can log in. I'll discuss with Ryan this week to figure out how to log in during CI and how to drop the boto3 dependency. Thank you!

Contributor Author

Do you think I should mock the diaspora-related behaviour so the testable parts can run, instead of skipping the whole thing?

@haochenpan Mar 12, 2024

Hi Ben,

Ryan and I agree that Parsl's CI should not depend on the diaspora Kafka cluster status, so mock sending to Diaspora in tests may be needed in the future. Also, after a recent paper deadline, we will refactor the indirect boto3 dependency in the Diaspora SDK to remove the botocore==1.29.125 constraint in Parsl's test.

Do you think Sicheng can keep the diaspora test optional for this PR and add mock sending tests in a future PR?

Collaborator

I don't see that there's an urgent need to rush this PR into master without tests and with awkward package dependencies. Hopefully your potential users will be ok to install from a branch at this prototype stage until you are able to work on the PR some more after your more important targets.

It's fine to leave this PR open until you have more time to work on it. You can convert it to a draft if you would like to indicate that status - I do that often for my own PRs - there's a link "Convert to draft" in the top right corner of the Github web interface.

@@ -173,3 +207,16 @@ def send(self, message: object) -> None:
logging.error("Could not send message within timeout limit")
return
return


def get_monitoring_radio(monitoring_url: str, source_id: int, radio_mode: str, run_dir: str) -> MonitoringRadio:
Collaborator

this seems like a good factorisation
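The factorisation under discussion can be sketched as a small dispatch helper (class bodies and the simplified signature here are illustrative; the PR's actual function also takes monitoring_url, source_id and run_dir):

```python
# Sketch: one helper maps radio_mode to a concrete radio class, keeping
# per-radio conditionals out of executor code. Classes are stubs.
class MonitoringRadio:
    def send(self, message: object) -> None:
        raise NotImplementedError

class HTEXRadio(MonitoringRadio):
    def send(self, message: object) -> None:
        pass  # stub: the real radio forwards via the htex results channel

class UDPRadio(MonitoringRadio):
    def send(self, message: object) -> None:
        pass  # stub: the real radio sends over a UDP socket

def get_monitoring_radio(radio_mode: str) -> MonitoringRadio:
    radios = {"htex": HTEXRadio, "udp": UDPRadio}
    try:
        return radios[radio_mode]()
    except KeyError:
        raise ValueError(f"Unknown radio_mode: {radio_mode}") from None

radio = get_monitoring_radio("htex")
assert isinstance(radio, HTEXRadio)
```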

Collaborator

I pulled this out into a separate PR, #3432

@@ -21,3 +21,6 @@ sqlalchemy2-stubs
Sphinx==4.5.0
twine
wheel

diaspora-event-sdk[kafka-python]
cloudpickle
Collaborator

Where is cloudpickle used? If it's inside this diaspora-event-sdk[kafka-python], does that dependency bring in cloudpickle anyway?

Contributor Author

parsl/tests/test_regression/test_1480.py E

======================================================================== ERRORS =========================================================================
______________________________________________________________ ERROR at setup of test_1480 ______________________________________________________________

self = <[AttributeError('class TaskVineExecutor uses worker_launch_method in the constructor, but does not define it as an attribute') raised in repr()] TaskVineExecutor object at 0x7f5cf7ef9cf0>
label = 'TaskVineExecutor', worker_launch_method = 'factory', function_exec_mode = 'regular'
manager_config = TaskVineManagerConfig(port=9000, address='resilient-compute', project_name=None, project_password_file=None, env_vars=...-xput', autolabel_window=None, autocategory=True, enable_peer_transfers=True, wait_for_workers=None, vine_log_dir=None)
factory_config = TaskVineFactoryConfig(factory_timeout=300, scratch_dir=None, min_workers=1, max_workers=1, workers_per_cycle=1, worker...ents=None, batch_options=None, _project_port=0, _project_address=None, _project_name=None, _project_password_file=None)
provider = None, storage_access = [FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()]

    def __init__(self,
                 label: str = "TaskVineExecutor",
                 worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory',
                 function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular',
                 manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
                 factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
                 provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
                 storage_access: Optional[List[Staging]] = None):
    
        # Set worker launch option for this executor
        if worker_launch_method == 'factory' or worker_launch_method == 'manual':
            provider = None
    
        # Initialize the parent class with the execution provider and block error handling enabled.
        # If provider is None, then no worker is launched via the provider method.
        BlockProviderExecutor.__init__(self, provider=provider,
                                       block_error_handler=True)
    
        # Raise an exception if there's a problem importing TaskVine
        try:
>           import ndcctools.taskvine

parsl/executors/taskvine/executor.py:122: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/tmp/cctools/lib/python3.8/site-packages/ndcctools/taskvine/__init__.py:42: in <module>
    from .manager import (
/tmp/cctools/lib/python3.8/site-packages/ndcctools/taskvine/manager.py:24: in <module>
    from .task import (
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    ##
    # @package ndcctools.taskvine.task
    #
    # This module provides the classes to construct tasks to submit for execution to a
    # TaskVine manager.
    #
    
    # Copyright (C) 2022- The University of Notre Dame
    # This software is distributed under the GNU General Public License.
    # See the file COPYING for details.
    from . import cvine
    from .file import File
    
    import copy
    import os
    import sys
    import textwrap
    import uuid
>   import cloudpickle
E   ModuleNotFoundError: No module named 'cloudpickle'

/tmp/cctools/lib/python3.8/site-packages/ndcctools/taskvine/task.py:19: ModuleNotFoundError

During handling of the above exception, another exception occurred:

request = <SubRequest 'load_dfk_session' for <Function test_1480>>, pytestconfig = <_pytest.config.Config object at 0x7f5d77a7dde0>
tmpd_cwd_session = PosixPath('/home/cc/parsl_resilient/.pytest/parsltest-current')

    @pytest.fixture(autouse=True, scope='session')
    def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
        """Load a dfk around entire test suite, except in local mode.
    
        The special path `local` indicates that configuration will not come
        from a pytest managed configuration file; in that case, see
        load_dfk_local_module for module-level configuration management.
        """
    
        RepresentationMixin._validate_repr = True
    
        config = pytestconfig.getoption('config')[0]
    
        if config != 'local':
            spec = importlib.util.spec_from_file_location('', config)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
    
            if DataFlowKernelLoader._dfk is not None:
                raise RuntimeError("DFK didn't start as None - there was a DFK from somewhere already")
    
            if hasattr(module, 'config'):
                parsl_conf = module.config
            elif hasattr(module, 'fresh_config'):
>               parsl_conf = module.fresh_config()

parsl/tests/conftest.py:187: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
parsl/tests/configs/taskvine_ex.py:11: in fresh_config
    return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000),
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <[AttributeError('class TaskVineExecutor uses worker_launch_method in the constructor, but does not define it as an attribute') raised in repr()] TaskVineExecutor object at 0x7f5cf7ef9cf0>
label = 'TaskVineExecutor', worker_launch_method = 'factory', function_exec_mode = 'regular'
manager_config = TaskVineManagerConfig(port=9000, address='resilient-compute', project_name=None, project_password_file=None, env_vars=...-xput', autolabel_window=None, autocategory=True, enable_peer_transfers=True, wait_for_workers=None, vine_log_dir=None)
factory_config = TaskVineFactoryConfig(factory_timeout=300, scratch_dir=None, min_workers=1, max_workers=1, workers_per_cycle=1, worker...ents=None, batch_options=None, _project_port=0, _project_address=None, _project_name=None, _project_password_file=None)
provider = None, storage_access = [FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()]

    def __init__(self,
                 label: str = "TaskVineExecutor",
                 worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory',
                 function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular',
                 manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
                 factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
                 provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
                 storage_access: Optional[List[Staging]] = None):
    
        # Set worker launch option for this executor
        if worker_launch_method == 'factory' or worker_launch_method == 'manual':
            provider = None
    
        # Initialize the parent class with the execution provider and block error handling enabled.
        # If provider is None, then no worker is launched via the provider method.
        BlockProviderExecutor.__init__(self, provider=provider,
                                       block_error_handler=True)
    
        # Raise an exception if there's a problem importing TaskVine
        try:
            import ndcctools.taskvine
            logger.debug(f'TaskVine default port: {ndcctools.taskvine.cvine.VINE_DEFAULT_PORT}')
        except ImportError:
>           raise OptionalModuleMissing(['taskvine'], "TaskVineExecutor requires the taskvine module.")
E           parsl.errors.OptionalModuleMissing: The functionality requested requires optional modules ['taskvine'] which could not be imported, because: TaskVineExecutor requires the taskvine module.

parsl/executors/taskvine/executor.py:125: OptionalModuleMissing
=================================================================== warnings summary ====================================================================
parsl/executors/workqueue/executor.py:46
  /home/cc/parsl_resilient/parsl/executors/workqueue/executor.py:46: DeprecationWarning: 'import work_queue' is deprecated. Please instead use: 'import ndcctools.work_queue'
    import work_queue as wq

parsl/tests/test_radical/test_mpi_funcs.py:21
  /home/cc/parsl_resilient/parsl/tests/test_radical/test_mpi_funcs.py:21: PytestUnknownMarkWarning: Unknown pytest.mark.radical - is this a typo?  You can register custom marks to avoid this warning - for details, see https://docs.pytest.org/en/stable/how-to/mark.html
    @pytest.mark.radical

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
================================================================= slowest 10 durations ==================================================================
0.05s setup    test_regression/test_1480.py::test_1480

(1 durations < 0.005s hidden.  Use -vv to show these durations.)
================================================================ short test summary info ================================================================
SKIPPED [1] parsl/tests/configs/ec2_single_node.py:30: 'public_ip' not configured in user_opts.py
SKIPPED [1] parsl/tests/configs/local_threads_globus.py:14: 'globus' not configured in user_opts.py
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
================================================ 2 skipped, 30 deselected, 2 warnings, 1 error in 0.94s =================================================
make: *** [Makefile:69: vineex_local_test] Error 1

This is my local test result. If I remove this dependency, I get this ModuleNotFoundError.

Collaborator

ok. Do you get that same failure when building/testing against parsl master branch?

Contributor Author

Yes, exactly the same error. Shall I fix this in a separate PR, maybe the one changing runinfo_* in the Makefile?

Collaborator

I think this test is running successfully on the test server, which suggests something is different about your dev environment compared to the test server.

The vineex_local_test target should run the CCTOOLS_INSTALL dependency in Makefile, which then runs parsl/executors/taskvine/install-taskvine.sh which then installs cloudpickle.

So if this isn't working for you, perhaps have a look at why that dependency install isn't behaving properly for you. (for example, the makefile target won't re-run properly if you delete your Python environment but do not delete /tmp/cctools, because make is not a very good language for specifying accurate dependencies)

Contributor Author

Yeah, sure, I'll remove this line.

@benclifford
Collaborator

On Python 3.9 this goes into a super-deep tree search trying to resolve the dependencies with the new packages that you added, which usually means there is no coherent solution to which packages to install but that pip will try very hard to find one...

It works on Python 3.11 and Python 3.12, so I think it's quite likely that one of the packages you are adding as a dependency (or transitively, one of the packages that those packages depend on) isn't supported on earlier Pythons - for example, I can see that deep search happens here:

INFO: pip is looking at multiple versions of botocore to determine which version is compatible with other requirements. This could take a while.
Collecting boto3>=1.26.125 (from aws-msk-iam-sasl-signer-python->diaspora-event-sdk[kafka-python]->-r test-requirements.txt (line 25))

which suggests that without your PR, we ended up with an earlier version of boto3? (perhaps you can check which version of boto3 gets installed on Python 3.9 without your changes by looking in the build logs of a recently merged PR)

@ClaudiaCumberbatch
Contributor Author

botocore==1.29.125

If I add this line in test-requirements.txt, the make deps step finishes properly. But is it ok to do so?
I also tried to find the boto3 version in others' build logs but got no result.

@@ -122,4 +122,4 @@ coverage: ## show the coverage report

.PHONY: clean
clean: ## clean up the environment by deleting the .venv, dist, eggs, mypy caches, coverage info, etc
rm -rf .venv $(DEPS) dist *.egg-info .mypy_cache build .pytest_cache .coverage runinfo_* $(WORKQUEUE_INSTALL)
rm -rf .venv $(DEPS) dist *.egg-info .mypy_cache build .pytest_cache .coverage runinfo $(WORKQUEUE_INSTALL)
Collaborator

i think this is a general tidyup. if so, it would be a good separate PR

Contributor Author

You mean runinfo_* is a general tidyup? But when I test locally, the directory created by make test is runinfo, not runinfo_*.

Collaborator

I mean that this change to the rm commandline is a general fix you could submit as a separate PR.

Collaborator

runinfo_* directories were where test logs used to get stored a long time ago but that kind of directory hasn't existed for a long time.

Contributor Author

Done in: #3236

self.producer = KafkaProducer(value_serializer=DiasporaRadio.serialize)
logger.info("Diaspora-based monitoring channel initializing")

def send(self, message: object) -> None:
Collaborator

The control flow in here, with two if statements, is a bit tangled: it's a bit unclear to me what assumptions you are making about the message parameter - compared to the other radios, which pass on message to some destination rather than attempt to inspect it/interpret it.

It would be good to understand what this code is actually trying to distinguish when:
i) testing if the value is a tuple or not: what are the situations (both from the parsl monitoring side and whatever your distant-end processor expects) in both of those cases...

ii) testing if it is a tuple with a second element that is indexable and contains a run_id value to use as a key - what actual structure are you expecting here and what sort of messages are you expecting in both of these cases?

Contributor Author

Do you want me to add some explanation here? I'm expecting the message to contain a run_id, which will be set as the key when sending to diaspora. That makes it more convenient for a records consumer to analyse them. But to keep the structure and functionality similar to the other two radios, simply removing these two statements and leaving more work to the consumer would also be fine.

Collaborator

I think it's fine to process messages however you want in a radio - I think you just need to be clear in comments for someone coming along to work on this code later why/what you are doing.
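The kind of commented, assumption-stating send() that this review asks for might look like the following sketch (the producer is stubbed here; the real code uses diaspora_event_sdk's KafkaProducer, and the tuple payload is assumed to be a dict):

```python
# Sketch: a send() whose comments state what structure it assumes of
# `message`, as requested in the review above.
class StubProducer:
    """Stand-in for diaspora_event_sdk's KafkaProducer."""
    def __init__(self):
        self.sent = []

    def send(self, topic, key, value):
        self.sent.append((topic, key, value))

class DiasporaRadioSketch:
    TOPIC = "radio-test"

    def __init__(self):
        self.producer = StubProducer()

    def send(self, message: object) -> None:
        # Monitoring messages normally arrive as (msg_type, payload) tuples.
        # When the payload dict carries a run_id, use it as the Kafka key so
        # a downstream consumer can group records by workflow run.
        if isinstance(message, tuple):
            payload = message[1]
            key = payload.get("run_id", "init").encode("utf-8")
            self.producer.send(topic=self.TOPIC, key=key, value=payload)
        else:
            # Anything else is forwarded opaquely, like the other radios do.
            self.producer.send(topic=self.TOPIC, key=b"payload", value=message)

radio = DiasporaRadioSketch()
radio.send(("RESOURCE_INFO", {"run_id": "run-1", "cpu": 0.5}))
assert radio.producer.sent[0][1] == b"run-1"
```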

@benclifford
Collaborator

botocore==1.29.125

If I add this line in test-requirements.txt, the make deps step can finish properly. But is it ok to do so? I also tried to search the boto3 version in other's build log but got no result.

I compared the broken version of your branch and the version with this new constraint added. It looks like package resolution still does a bunch of search for valid versions, but by adding in this constraint, it changes the search space enough that pip can find a valid solution rather than crash.

So that's a bit horrible... but I suppose it's OK.

It would be good to add a paragraph of explanation in a # comment next to that dependency because it's not particularly obvious why this works/why removing it would break things.
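Such a comment might look like this (wording illustrative, not from the PR):

```
# Constrain botocore so pip's resolver can find a consistent package set
# alongside diaspora-event-sdk[kafka-python] on Python 3.9; without this
# pin, dependency resolution goes into a very deep search and fails.
botocore==1.29.125
diaspora-event-sdk[kafka-python]
```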

@benclifford
Collaborator

@ClaudiaCumberbatch I made a prototype plugin API, PR #3315 which might help you plug in new monitoring radio code without needing to put diaspora specific stuff into the core Parsl codebase. If you try it out, let me know any feedback.

@ClaudiaCumberbatch
Contributor Author

Hi Ben, thanks for following up. I tried the new API today and have some questions. If I want to use diaspora, does that mean I have to create a DiasporaRadioSender class and pass a DiasporaRadio to the executor in the config? I wrote test code like this but got an error which I guess is related to imports. Here are my code and the error.

import os
import json
import datetime
import parsl
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.app.app import python_app
from parsl.launchers import SimpleLauncher
from parsl.launchers import SingleNodeLauncher
from typing import Optional, Any

# for local test on ChameleonCloud
from parsl.data_provider.http import HTTPInTaskStaging
from parsl.data_provider.ftp import FTPInTaskStaging
from parsl.data_provider.file_noop import NoOpFileStaging

from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.launchers import SingleNodeLauncher

from parsl.monitoring import MonitoringHub
from parsl.monitoring.radios import RadioConfig, MonitoringRadioSender


class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj: Any) -> Any:
        if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
            return obj.isoformat()
        return super(DateTimeEncoder, self).default(obj)


class DiasporaRadioSender(MonitoringRadioSender):
    print("DiasporaRadioSender class invoke")
    def __init__(self):
        from diaspora_event_sdk import KafkaProducer
        self.producer = KafkaProducer(value_serializer=DiasporaRadioSender.serialize)

    def send(self, message: object) -> None:
        # TODO: make configurable
        topic = "radio-test"
        # intend to set run_id as key so diaspora consumer can process messages more conveniently
        # message is likely to be a tuple of (msg_type, payload), but if not we'll just send the payload
        if isinstance(message, tuple):
            if 'run_id' in message[1]:
                key = message[1]['run_id'].encode("utf-8")
            else:
                key = b"init"
            # logger.info(f"Sending message of type {key}:{msg_type} to topic {topic}, content {message[1]}")
            self.producer.send(topic=topic, key=key, value=message[1])
        else:
            key = b"payload"
            self.producer.send(topic=topic, key=key, value=message)
        return
    
    def flush(self):
        self.producer.flush()
        return
    
    class DateTimeEncoder(json.JSONEncoder):
        def default(self, obj: Any) -> Any:
            if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
                return obj.isoformat()
            return super(DiasporaRadioSender.DateTimeEncoder, self).default(obj)
    
    @staticmethod
    def serialize(value: Any) -> bytes:
        return json.dumps(value, cls=DiasporaRadioSender.DateTimeEncoder).encode("utf-8")


class DiasporaRadio(RadioConfig):
    def create_sender(self, *, source_id: int, run_dir: str) -> MonitoringRadioSender:
        print("create sender")
        sender = DiasporaRadioSender()
        return sender

def get_config():
    config = Config(
        executors=[
            HighThroughputExecutor(
                max_workers=1,
                address="127.0.0.1",
                label="htex_Local",
                working_dir=os.getcwd() + "/" + "latency",
                storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()],
                worker_debug=True,
                cores_per_worker=1,
                heartbeat_period=2,
                heartbeat_threshold=5,
                poll_period=100,
                provider=LocalProvider(
                    channel=LocalChannel(),
                    init_blocks=1,
                    min_blocks=1,
                    max_blocks=1,
                    launcher=SingleNodeLauncher(),
                ),
                block_error_handler=False,
                monitoring_radio=DiasporaRadio()
            )
        ],
        strategy='simple',
        app_cache=True, checkpoint_mode='task_exit',
        retries=1,
        monitoring=MonitoringHub(
                        hub_address="localhost",
                        monitoring_debug=False,
                        resource_monitoring_interval=10,
        ),
        usage_tracking=True
    )
 
    return config

@python_app
def noop():
    pass

@python_app
def sleep10ms():
    import time
    time.sleep(0.01)


if __name__ == "__main__":
    config = get_config()
    dfk = parsl.load(config)
    tasks = [sleep10ms() for _ in range(0, 3)]
    [t.result() for t in tasks]
(parsl310) cc@resilience:~/resilient_compute/radio_test$ python test.py 
DiasporaRadioSender class invoke
Traceback (most recent call last):
  File "/home/cc/resilient_compute/radio_test/test.py", line 128, in <module>
    [t.result() for t in tasks]
  File "/home/cc/resilient_compute/radio_test/test.py", line 128, in <listcomp>
    [t.result() for t in tasks]
  File "/home/cc/miniconda3/envs/parsl310/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/home/cc/miniconda3/envs/parsl310/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/cc/real_parsl/parsl/parsl/dataflow/dflow.py", line 310, in handle_exec_update
    res = self._unwrap_remote_exception_wrapper(future)
  File "/home/cc/real_parsl/parsl/parsl/dataflow/dflow.py", line 578, in _unwrap_remote_exception_wrapper
    result = future.result()
  File "/home/cc/miniconda3/envs/parsl310/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/home/cc/miniconda3/envs/parsl310/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/cc/real_parsl/parsl/parsl/executors/high_throughput/executor.py", line 516, in _queue_management_worker
    s.reraise()
  File "/home/cc/real_parsl/parsl/parsl/app/errors.py", line 118, in reraise
    raise v
  File "/home/cc/real_parsl/parsl/parsl/executors/high_throughput/process_worker_pool.py", line 763, in worker
    result = execute_task(req['buffer'], mpi_launcher=mpi_launcher)
  File "/home/cc/real_parsl/parsl/parsl/executors/high_throughput/process_worker_pool.py", line 623, in execute_task
    exec(code, user_ns, user_ns)
  File "<string>", line 1, in <module>
  File "/home/cc/real_parsl/parsl/parsl/monitoring/remote.py", line 39, in wrapped
    send_first_message(try_id,
  File "/home/cc/real_parsl/parsl/parsl/process_loggers.py", line 27, in wrapped
    r = func(*args, **kwargs)
  File "/home/cc/real_parsl/parsl/parsl/monitoring/remote.py", line 110, in send_first_message
    send_first_last_message(try_id, task_id, radio_config, run_id,
  File "/home/cc/real_parsl/parsl/parsl/monitoring/remote.py", line 131, in send_first_last_message
    radio = get_radio(radio_config, task_id, run_dir)
  File "/home/cc/real_parsl/parsl/parsl/monitoring/remote.py", line 100, in get_radio
    radio = radio_config.create_sender(source_id=task_id, run_dir=run_dir)
  File "/home/cc/resilient_compute/radio_test/test.py", line 73, in create_sender
    sender = DiasporaRadioSender()
NameError: name 'DiasporaRadioSender' is not defined

@benclifford
Copy link
Collaborator

@ClaudiaCumberbatch the code you've pasted looks mostly ok, I think. You should put your `DiasporaRadio` and supporting classes into their own module though: you cannot send them across the network if they exist in the main `__main__` module of a parsl workflow (because things defined in the `__main__` module are pickled differently...). That is likely the cause of: `NameError: name 'DiasporaRadioSender' is not defined`.

If you are happy to fiddle with the parsl codebase still, put them in something like `parsl.monitoring.diaspora` (`parsl/monitoring/diaspora.py`). Later, for distributing this code to other users, you could package it separately, not under the parsl hierarchy - but that is probably something to consider later, not now.
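As an illustration, such a module might look like the sketch below. This is not the real Parsl pluggable-radio interface (that is being merged via PR #3315); the class and method names are assumptions inferred from the `radio_config.create_sender(source_id=task_id, run_dir=run_dir)` call visible in the traceback above, and the Kafka/Diaspora publishing step is stubbed out.

```python
# Hypothetical parsl/monitoring/diaspora.py sketch.
# Names here are assumptions based on the create_sender(...) call in the
# traceback, not the actual Parsl radio API.

class DiasporaRadioSender:
    """Worker-side sender for monitoring messages (publishing stubbed)."""

    def __init__(self, source_id, run_dir):
        self.source_id = source_id
        self.run_dir = run_dir
        self.sent = []  # stand-in for a real Diaspora/Kafka producer

    def send(self, message):
        # A real implementation would serialize the message and publish it
        # to a Diaspora topic; here we only record it locally.
        self.sent.append(message)


class DiasporaRadio:
    """Config object passed as monitoring_radio=DiasporaRadio()."""

    def create_sender(self, source_id, run_dir):
        # Called on the worker side. Because this class lives in an
        # importable module rather than __main__, it unpickles cleanly
        # on remote workers, avoiding the NameError seen above.
        return DiasporaRadioSender(source_id=source_id, run_dir=run_dir)
```

The key point is only the module placement: once the classes are importable by name on the worker side, the pickled config object can reconstruct its sender there.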

@ClaudiaCumberbatch
Copy link
Contributor Author

@benclifford thanks! I tried to add parsl.monitoring.diaspora and it works! Shall I push this change into your draft PR? It seems that currently I don't have the privilege to do so.

@benclifford
Copy link
Collaborator

> @benclifford thanks! I tried to add parsl.monitoring.diaspora and it works! Shall I push this change into your draft PR? It seems that currently I don't have the privilege to do so.

The intention of this plugin work is to allow you to plug in the diaspora code from its own packaging/repository, without having to put it into the main Parsl repository. If you reach the stage where you are willing and able to support this code for the next year or three, and it is well tested, then that would be the time to consider including it in the main Parsl codebase.

@ClaudiaCumberbatch
Copy link
Contributor Author

> @benclifford thanks! I tried to add parsl.monitoring.diaspora and it works! Shall I push this change into your draft PR? It seems that currently I don't have the privilege to do so.
>
> The intention of this plugin work is to allow you to plug in the diaspora code from its own packaging/repository, without having to put it into the main Parsl repository. If you reach the stage where you are willing and able to support this code for the next year or three, and it is well tested, then that would be the time to consider including it in the main Parsl codebase.

I see. Thank you!

@benclifford
Copy link
Collaborator

I'm closing this PR as out-dated. Some of my ongoing work for pluggable radios is being merged via PR #3315, and @ClaudiaCumberbatch merged one other piece of this PR as PR #3236
