add diaspora radio and is able to send resource info #3147
Conversation
@@ -323,7 +323,7 @@ def __init__(self,
             launch_cmd = DEFAULT_LAUNCH_CMD
         self.launch_cmd = launch_cmd

-    radio_mode = "htex"
+    radio_mode = "diaspora"
I don't think you should be changing the behaviour of Parsl monitoring to use this new radio by default
What about setting this radio_mode in the executor's __init__ by using the config file?
yes that's probably the right way to go - change radio_mode into an object-level value (self.radio_mode) instead of a class-level value and set it as an __init__ parameter
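The suggested refactor is a small one; a minimal sketch (the real `HighThroughputExecutor` has many more parameters, so treat this as shape only):

```python
class HighThroughputExecutor:
    """Sketch only: the real executor takes many more parameters."""

    def __init__(self, radio_mode: str = "htex") -> None:
        # Previously radio_mode = "htex" was a class-level attribute shared
        # by every instance. Making it an __init__ parameter lets each
        # configured executor choose its own radio, with the old default.
        self.radio_mode = radio_mode


default_ex = HighThroughputExecutor()                        # unchanged behaviour
diaspora_ex = HighThroughputExecutor(radio_mode="diaspora")  # opt-in via config
```

This keeps "htex" as the default, so existing configurations are unaffected while a config file can opt in to the new radio per executor.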
parsl/tests/test_radio/test_basic.py (outdated)

    return 5

@pytest.mark.local
def test_energy_collection():
this is a weird test name, I think from some other work?
Yeah you are right. I forgot to change this name.
There are quite a lot of flake8 formatting errors that you should be able to see in the details of the test suite checks on this PR.
@@ -0,0 +1,74 @@
"""
this file is an outdated copy of htex_local_alternate.py with no changes related to this PR, right?
Oh yeah, sorry about that
parsl/tests/test_radio/test_basic.py (outdated)

logger.info("all done")

if __name__ == "__main__":
don't use main blocks in pytest tests - we've been slowly removing them over the years when they've been previously committed - see PR #2685 for example.
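The conversion being asked for is just dropping the main block and letting pytest discover the test; a sketch with an illustrative function name:

```python
# test_basic.py - pytest collects any function named test_*; no main block needed.

def test_radio_messages():
    result = 5  # placeholder standing in for the real workflow call
    assert result == 5

# Rather than ending the file with:
#
#   if __name__ == "__main__":
#       test_radio_messages()
#
# run it through pytest discovery, e.g.:
#   pytest parsl/tests/test_radio/test_basic.py
```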
parsl/tests/test_radio/test_basic.py (outdated)

parsl.dfk().cleanup()
parsl.clear()

logger.info("all done")
what is this test testing? I don't see that it is testing anything introduced by this PR, but is more like a variation of the existing tests in parsl/tests/test_monitoring?
I tried to run this simple test file and see whether diaspora received the resource information. Maybe I should add a diaspora consumer in the test script to make sure the records are successfully sent.
right. some of the existing database-backed monitoring code is tested by running a small workflow and then checking that the database has the right things in it - the right number of tasks, etc.
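That run-then-check pattern can be sketched against a monitoring database like this (the `task` table and `run_id` column are loosely modelled on Parsl's monitoring schema, but treat the exact names as assumptions; the demo builds a stand-in database rather than running a real workflow):

```python
import os
import sqlite3
import tempfile


def count_tasks(db_path: str, run_id: str) -> int:
    """Count the task rows that the monitoring database recorded for one run."""
    with sqlite3.connect(db_path) as conn:
        (n,) = conn.execute(
            "SELECT COUNT(*) FROM task WHERE run_id = ?", (run_id,)
        ).fetchone()
    return n


# Demo: a stand-in monitoring.db with four tasks from a single run.
db = os.path.join(tempfile.mkdtemp(), "monitoring.db")
with sqlite3.connect(db) as conn:
    conn.execute("CREATE TABLE task (task_id INTEGER, run_id TEXT)")
    conn.executemany("INSERT INTO task VALUES (?, ?)",
                     [(i, "run-1") for i in range(4)])

# A test would run a 4-task workflow, then assert count_tasks(...) == 4.
n_tasks = count_tasks(db, "run-1")
```

A Kafka-backed radio could be checked the same way, with a consumer reading back the topic in place of the SQL query.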
@@ -0,0 +1,6 @@
from diaspora_event_sdk import Client as GlobusClient
What is this diaspora_login.py for? Can you add some usage notes about when it might be used?
Sure. Before using the diaspora radio, the user should first log in. This cannot be folded into the test file because login requires a CLI prompt, which pytest does not support.
What service is this logging into? / What kind of account is needed? Github Actions (which runs our automated testing) supports being logged into services (for example, that is how automated release publication works to PyPI) so possibly that is an approach to getting the test running in CI?
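For reference, GitHub Actions injects credentials into a job via repository secrets; a hypothetical workflow step might look like the following (the secret name `DIASPORA_SDK_TOKEN` and the test path are invented for illustration):

```yaml
# fragment of a hypothetical .github/workflows/ci.yaml
- name: Run diaspora radio tests
  env:
    DIASPORA_SDK_TOKEN: ${{ secrets.DIASPORA_SDK_TOKEN }}
  run: pytest parsl/tests/test_radio
```

This only works if the SDK can pick up a token non-interactively (e.g. from an environment variable) instead of the interactive CLI login mentioned above.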
Hi Ben, this is Haochen, I am a GLabs student working on the diaspora project. The log goes to an AWS-managed Kafka cluster maintained by the diaspora team. The login process works similarly to that of the Compute SDK, so any accounts supported by Globus Auth can log in. I'll discuss with Ryan this week to figure out how to log in during CI and how to drop the boto3 dependency. Thank you!
Do you think I shall mock the behaviors related to diaspora in order to run the testable parts instead of skipping the whole thing?
Hi Ben,
Ryan and I agree that Parsl's CI should not depend on the diaspora Kafka cluster status, so mock sending to Diaspora in tests may be needed in the future. Also, after a recent paper deadline, we will refactor the indirect boto3 dependency in the Diaspora SDK to remove the botocore==1.29.125 constraint in Parsl's test.
Do you think Sicheng can keep the diaspora test optional for this PR and add mock sending tests in a future PR?
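The mock-sending approach under discussion could look roughly like this: the `DiasporaRadio` shape below is a stand-in sketch (not the actual class from this PR), with the producer injected so a test can substitute a `unittest.mock.Mock` and never touch Kafka:

```python
from unittest import mock


class DiasporaRadio:
    """Stand-in sketch of a radio that wraps a Kafka producer."""

    def __init__(self, producer):
        self.producer = producer  # injected, so tests can hand in a mock

    def send(self, message):
        self.producer.send("radio-test", value=message)


# In a test, a Mock producer keeps CI independent of the Kafka cluster:
producer = mock.Mock()
radio = DiasporaRadio(producer)
radio.send({"run_id": "run-1"})
producer.send.assert_called_once_with("radio-test", value={"run_id": "run-1"})
```

The assertion verifies what the radio tried to publish without any network dependency, which is the property CI actually needs.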
I don't see that there's an urgent need to rush this PR into master without tests and with awkward package dependencies. Hopefully your potential users will be ok to install from a branch at this prototype stage until you are able to work on the PR some more after your more important targets.
It's fine to leave this PR open until you have more time to work on it. You can convert it to a draft if you would like to indicate that status - I do that often for my own PRs - there's a link "Convert to draft" in the top right corner of the GitHub web interface.
@@ -173,3 +207,16 @@ def send(self, message: object) -> None:
            logging.error("Could not send message within timeout limit")
            return
        return


def get_monitoring_radio(monitoring_url: str, source_id: int, radio_mode: str, run_dir: str) -> MonitoringRadio:
this seems like a good factorisation
I pulled this out into a separate PR, #3432
…h/parsl_resilient into diaspora_radio merge
@@ -21,3 +21,6 @@ sqlalchemy2-stubs
Sphinx==4.5.0
twine
wheel

diaspora-event-sdk[kafka-python]
cloudpickle
Where is cloudpickle used? If it's inside this diaspora-event-sdk[kafka-python], does that dependency bring in cloudpickle anyway?
parsl/tests/test_regression/test_1480.py E
======================================================================== ERRORS =========================================================================
______________________________________________________________ ERROR at setup of test_1480 ______________________________________________________________
self = <[AttributeError('class TaskVineExecutor uses worker_launch_method in the constructor, but does not define it as an attribute') raised in repr()] TaskVineExecutor object at 0x7f5cf7ef9cf0>
label = 'TaskVineExecutor', worker_launch_method = 'factory', function_exec_mode = 'regular'
manager_config = TaskVineManagerConfig(port=9000, address='resilient-compute', project_name=None, project_password_file=None, env_vars=...-xput', autolabel_window=None, autocategory=True, enable_peer_transfers=True, wait_for_workers=None, vine_log_dir=None)
factory_config = TaskVineFactoryConfig(factory_timeout=300, scratch_dir=None, min_workers=1, max_workers=1, workers_per_cycle=1, worker...ents=None, batch_options=None, _project_port=0, _project_address=None, _project_name=None, _project_password_file=None)
provider = None, storage_access = [FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()]
def __init__(self,
label: str = "TaskVineExecutor",
worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory',
function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular',
manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
storage_access: Optional[List[Staging]] = None):
# Set worker launch option for this executor
if worker_launch_method == 'factory' or worker_launch_method == 'manual':
provider = None
# Initialize the parent class with the execution provider and block error handling enabled.
# If provider is None, then no worker is launched via the provider method.
BlockProviderExecutor.__init__(self, provider=provider,
block_error_handler=True)
# Raise an exception if there's a problem importing TaskVine
try:
> import ndcctools.taskvine
parsl/executors/taskvine/executor.py:122:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/tmp/cctools/lib/python3.8/site-packages/ndcctools/taskvine/__init__.py:42: in <module>
from .manager import (
/tmp/cctools/lib/python3.8/site-packages/ndcctools/taskvine/manager.py:24: in <module>
from .task import (
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
##
# @package ndcctools.taskvine.task
#
# This module provides the classes to construct tasks to submit for execution to a
# TaskVine manager.
#
# Copyright (C) 2022- The University of Notre Dame
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.
from . import cvine
from .file import File
import copy
import os
import sys
import textwrap
import uuid
> import cloudpickle
E ModuleNotFoundError: No module named 'cloudpickle'
/tmp/cctools/lib/python3.8/site-packages/ndcctools/taskvine/task.py:19: ModuleNotFoundError
During handling of the above exception, another exception occurred:
request = <SubRequest 'load_dfk_session' for <Function test_1480>>, pytestconfig = <_pytest.config.Config object at 0x7f5d77a7dde0>
tmpd_cwd_session = PosixPath('/home/cc/parsl_resilient/.pytest/parsltest-current')
@pytest.fixture(autouse=True, scope='session')
def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
"""Load a dfk around entire test suite, except in local mode.
The special path `local` indicates that configuration will not come
from a pytest managed configuration file; in that case, see
load_dfk_local_module for module-level configuration management.
"""
RepresentationMixin._validate_repr = True
config = pytestconfig.getoption('config')[0]
if config != 'local':
spec = importlib.util.spec_from_file_location('', config)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if DataFlowKernelLoader._dfk is not None:
raise RuntimeError("DFK didn't start as None - there was a DFK from somewhere already")
if hasattr(module, 'config'):
parsl_conf = module.config
elif hasattr(module, 'fresh_config'):
> parsl_conf = module.fresh_config()
parsl/tests/conftest.py:187:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
parsl/tests/configs/taskvine_ex.py:11: in fresh_config
return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000),
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError('class TaskVineExecutor uses worker_launch_method in the constructor, but does not define it as an attribute') raised in repr()] TaskVineExecutor object at 0x7f5cf7ef9cf0>
label = 'TaskVineExecutor', worker_launch_method = 'factory', function_exec_mode = 'regular'
manager_config = TaskVineManagerConfig(port=9000, address='resilient-compute', project_name=None, project_password_file=None, env_vars=...-xput', autolabel_window=None, autocategory=True, enable_peer_transfers=True, wait_for_workers=None, vine_log_dir=None)
factory_config = TaskVineFactoryConfig(factory_timeout=300, scratch_dir=None, min_workers=1, max_workers=1, workers_per_cycle=1, worker...ents=None, batch_options=None, _project_port=0, _project_address=None, _project_name=None, _project_password_file=None)
provider = None, storage_access = [FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()]
def __init__(self,
label: str = "TaskVineExecutor",
worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory',
function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular',
manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
storage_access: Optional[List[Staging]] = None):
# Set worker launch option for this executor
if worker_launch_method == 'factory' or worker_launch_method == 'manual':
provider = None
# Initialize the parent class with the execution provider and block error handling enabled.
# If provider is None, then no worker is launched via the provider method.
BlockProviderExecutor.__init__(self, provider=provider,
block_error_handler=True)
# Raise an exception if there's a problem importing TaskVine
try:
import ndcctools.taskvine
logger.debug(f'TaskVine default port: {ndcctools.taskvine.cvine.VINE_DEFAULT_PORT}')
except ImportError:
> raise OptionalModuleMissing(['taskvine'], "TaskVineExecutor requires the taskvine module.")
E parsl.errors.OptionalModuleMissing: The functionality requested requires optional modules ['taskvine'] which could not be imported, because: TaskVineExecutor requires the taskvine module.
parsl/executors/taskvine/executor.py:125: OptionalModuleMissing
=================================================================== warnings summary ====================================================================
parsl/executors/workqueue/executor.py:46
/home/cc/parsl_resilient/parsl/executors/workqueue/executor.py:46: DeprecationWarning: 'import work_queue' is deprecated. Please instead use: 'import ndcctools.work_queue'
import work_queue as wq
parsl/tests/test_radical/test_mpi_funcs.py:21
/home/cc/parsl_resilient/parsl/tests/test_radical/test_mpi_funcs.py:21: PytestUnknownMarkWarning: Unknown pytest.mark.radical - is this a typo? You can register custom marks to avoid this warning - for details, see https://docs.pytest.org/en/stable/how-to/mark.html
@pytest.mark.radical
-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
================================================================= slowest 10 durations ==================================================================
0.05s setup test_regression/test_1480.py::test_1480
(1 durations < 0.005s hidden. Use -vv to show these durations.)
================================================================ short test summary info ================================================================
SKIPPED [1] parsl/tests/configs/ec2_single_node.py:30: 'public_ip' not configured in user_opts.py
SKIPPED [1] parsl/tests/configs/local_threads_globus.py:14: 'globus' not configured in user_opts.py
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
================================================ 2 skipped, 30 deselected, 2 warnings, 1 error in 0.94s =================================================
make: *** [Makefile:69: vineex_local_test] Error 1
This is my local test result. If I remove this dependency, it gives this ModuleNotFoundError.
ok. Do you get that same failure when building/testing against parsl master branch?
Yes, exactly the same error information. Shall I update this in a general PR, maybe also in the one changing runinfo_* in the makefile?
I think this test is running successfully on the test server, which suggests something is different about your dev environment compared to the test server.
The vineex_local_test target should run the CCTOOLS_INSTALL dependency in the Makefile, which then runs parsl/executors/taskvine/install-taskvine.sh, which then installs cloudpickle.
So if this isn't working for you, perhaps have a look at why that dependency install isn't behaving properly for you. (for example, the makefile target won't re-run properly if you delete your Python environment but do not delete /tmp/cctools, because make is not a very good language for specifying accurate dependencies)
Yeah, sure, I'll remove this line.
On Python 3.9 this goes into a super-deep tree search trying to resolve the dependencies with the new packages that you added, which usually means there is no coherent solution to which packages to install. But it works on Python 3.11 and Python 3.12, so I think it's quite likely that one of the packages you are adding as a dependency (or, transitively, one of the packages that those packages depend on) isn't supported on earlier Pythons - for example, I can see that deep search happens here:
which suggests that without your PR, we've ended up with an earlier version of
If I add this line in
@@ -122,4 +122,4 @@ coverage: ## show the coverage report

.PHONY: clean
clean: ## clean up the environment by deleting the .venv, dist, eggs, mypy caches, coverage info, etc
-	rm -rf .venv $(DEPS) dist *.egg-info .mypy_cache build .pytest_cache .coverage runinfo_* $(WORKQUEUE_INSTALL)
+	rm -rf .venv $(DEPS) dist *.egg-info .mypy_cache build .pytest_cache .coverage runinfo $(WORKQUEUE_INSTALL)
i think this is a general tidyup. if so, it would be a good separate PR
You mean runinfo_* is a general tidyup? But when I test it locally, the dir created by make test is runinfo instead of runinfo_*.
I mean that this change to the rm commandline is a general fix you could submit as a separate PR.
runinfo_* directories were where test logs used to get stored a long time ago but that kind of directory hasn't existed for a long time.
Done in: #3236
        self.producer = KafkaProducer(value_serializer=DiasporaRadio.serialize)
        logger.info("Diaspora-based monitoring channel initializing")

    def send(self, message: object) -> None:
The control flow in here, with two if statements, is a bit tangled: it's a bit unclear to me what assumptions you are making about the message parameter - compared to the other radios, which pass on message to some destination rather than attempt to inspect it/interpret it.
It would be good to understand what this code is actually trying to distinguish when:
i) testing if the value is a tuple or not: what are the situations (both from the parsl monitoring side and whatever your distant-end processor expects) in both of those cases...
ii) testing if it is a tuple with a second element that is indexable and contains a run_id value to use as a key - what actual structure are you expecting here and what sort of messages are you expecting in both of these cases?
You want me to add some explanations here? I'm expecting the message to contain a run_id, which will be set as the key when sent to diaspora. This brings some convenience for the records consumer when analyzing them. But in order to keep a similar structure and functionality to the other two radios, simply removing these two statements and leaving more work to the consumer would also be fine.
I think it's fine to process messages however you want in a radio - I think you just need to be clear in comments for someone coming along to work on this code later why/what you are doing.
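One way to be clear for later readers is to spell the assumptions out in comments right where the message is inspected. A sketch of the logic being described - the tuple layout is an assumption taken from this discussion, not the actual monitoring message format:

```python
def extract_key(message):
    """Pick a Kafka message key from a monitoring message.

    Assumed layout (from this discussion, not the actual monitoring
    format): a message may be a tuple whose second element is a
    dict-like record carrying a 'run_id'. Anything else is sent
    without a key, leaving classification to the consumer.
    """
    if isinstance(message, tuple) and len(message) >= 2:
        record = message[1]
        if isinstance(record, dict) and "run_id" in record:
            # Keying on run_id keeps one workflow run's records together,
            # which simplifies per-run analysis on the consumer side.
            return record["run_id"].encode("utf-8")
    return None  # no key: the consumer must inspect the payload itself
```

The producer would then call something like `producer.send(topic, key=extract_key(message), value=message)`, and the two cases the reviewer asked about are now named in one place.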
I compared the broken version of your branch and the version with this new constraint added. It looks like package resolution still does a bunch of search for valid versions, but adding in this constraint changes the search space enough that it resolves. So that's a bit horrible... but I suppose it's OK. It would be good to add a paragraph of explanation in a
@ClaudiaCumberbatch I made a prototype plugin API, PR #3315, which might help you plug in new monitoring radio code without needing to put diaspora specific stuff into the core Parsl codebase. If you try it out, let me know any feedback.
Hi Ben, thanks for the follow-up. I tried to use this new API today and have some questions about it. If I want to use diaspora, does that mean I have to create a
@ClaudiaCumberbatch the code you've pasted looks mostly ok, I think. You should put your If you are happy to fiddle with the parsl codebase still, put them in something like
@benclifford thanks! I tried to add
The intention of this plugin work is to allow you to plug in the diaspora code from its own packaging/repository, without having to put it into the main Parsl repository. If you reach the stage where you are willing and able to support this code for the next year or three, and it is well tested, then that would be the time to consider including it in the main Parsl codebase.
I see. Thank you!
I'm closing this PR as outdated. Some of my ongoing work for pluggable radios is being merged via PR #3315, and @ClaudiaCumberbatch merged one other piece of this PR as PR #3236.
Description
Add a diaspora radio to send resource information. Also add a test_radio directory to test this functionality.
Changed Behaviour
The resource information will not be inserted into the database; instead, it will be sent to diaspora. If the db insertion is still required, just change the radio_mode in executors/high_throughput/executors.py.

Type of change