-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add diaspora radio and is able to send resource info #3147
Changes from 3 commits
2b8eadf
22739d7
b7e0044
cc79dc6
cd77454
e16015e
f85ad06
d7ee6d1
197182f
7755ab4
8961f82
d42638b
7e610b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
import datetime | ||
import json | ||
import os | ||
import socket | ||
import pickle | ||
|
@@ -8,6 +10,7 @@ | |
|
||
from typing import Optional | ||
|
||
from parsl.monitoring.message_type import MessageType | ||
from parsl.serialize import serialize | ||
|
||
_db_manager_excepts: Optional[Exception] | ||
|
@@ -21,6 +24,37 @@ class MonitoringRadio(metaclass=ABCMeta): | |
def send(self, message: object) -> None: | ||
pass | ||
|
||
class DateTimeEncoder(json.JSONEncoder): | ||
def default(self, obj): | ||
if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): | ||
return obj.isoformat() | ||
return super(DateTimeEncoder, self).default(obj) | ||
|
||
class DiasporaRadio(MonitoringRadio): | ||
def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): | ||
from diaspora_event_sdk import KafkaProducer | ||
benclifford marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.source_id = source_id | ||
self.producer = KafkaProducer(value_serializer=DiasporaRadio.serialize) | ||
logger.info("Diaspora-based monitoring channel initializing") | ||
|
||
def send(self, message: object) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The control flow in here, with two if statements, is a bit tangled: it's a bit unclear to me what assumptions you are making about the It would be good to understand what this code is actually trying to distinguish when: ii) testing if it is a tuple with a second element that is indexable and contains a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You want me to add some explanations here? I'm expecting the message to contain a run_id which will be set as key to be sent to diaspora. This will bring some convenience for records consumer to analyze them. But in order to be in similar structure and functionality with the other two radios, simply removing these two statements and leaving more work to the consumer will also be fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's fine to process messages however you want in a radio - I think you just need to be clear in comments for someone coming along to work on this code later why/what you are doing. |
||
msg_type = message[0] | ||
# TODO: make configurable | ||
topic = "radio-test" | ||
if 'run_id' in message[1]: | ||
key = message[1]['run_id'].encode("utf-8") | ||
else: | ||
logger.info("set key as init") | ||
key = b"init" | ||
# logger.info(f"Sending message of type {key}:{msg_type} to topic {topic}, content {message[1]}") | ||
self.producer.send(topic=topic, key=key, value=message[1]) | ||
logger.info(f"Sent message") | ||
return | ||
|
||
@staticmethod | ||
def serialize(value): | ||
return json.dumps(value, cls=DateTimeEncoder).encode("utf-8") | ||
|
||
|
||
class FilesystemRadio(MonitoringRadio): | ||
"""A MonitoringRadio that sends messages over a shared filesystem. | ||
|
@@ -173,3 +207,16 @@ def send(self, message: object) -> None: | |
logging.error("Could not send message within timeout limit") | ||
return | ||
return | ||
|
||
|
||
def get_monitoring_radio(monitoring_url: str, source_id: int, radio_mode: str, run_dir: str) -> MonitoringRadio: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this seems like a good factorisation There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pulled this out into a separate PR, #3432 |
||
if radio_mode == "udp": | ||
return UDPRadio(monitoring_url, source_id) | ||
elif radio_mode == "htex": | ||
return HTEXRadio(monitoring_url, source_id) | ||
elif radio_mode == "filesystem": | ||
return FilesystemRadio(monitoring_url=monitoring_url, source_id=source_id, run_dir=run_dir) | ||
elif radio_mode == "diaspora": | ||
return DiasporaRadio(monitoring_url, source_id) | ||
else: | ||
raise ValueError(f"Unknown radio mode {radio_mode}") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this file is a an outdated copy of htex_local_alternate.py with no changes related to this PR, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yeah, sorry about that |
||
The aim of this configuration is to run a local htex | ||
in a similar manner to htex_local.py, but with lots of | ||
options different and more complicated than in that | ||
configuration, so that more code paths are executed | ||
than when testing only with htex_local. | ||
|
||
It does not matter too much *what* is different in this | ||
configuration; what matters is that the differences | ||
cause significantly different pieces of parsl code to be | ||
run - for example, by turning on monitoring, by allowing | ||
blocks to be started by a strategy, by using a different | ||
set of staging providers, by using timing parameters that | ||
will cause substantially different behaviour on whatever | ||
those timing parameters control. | ||
""" | ||
|
||
# imports for monitoring: | ||
from parsl.monitoring import MonitoringHub | ||
|
||
import os | ||
|
||
from parsl.providers import LocalProvider | ||
from parsl.channels import LocalChannel | ||
from parsl.launchers import SingleNodeLauncher | ||
|
||
from parsl.config import Config | ||
from parsl.executors import HighThroughputExecutor | ||
|
||
|
||
from parsl.data_provider.http import HTTPInTaskStaging | ||
from parsl.data_provider.ftp import FTPInTaskStaging | ||
from parsl.data_provider.file_noop import NoOpFileStaging | ||
|
||
working_dir = os.getcwd() + "/" + "test_htex_energy" | ||
|
||
|
||
def fresh_config(): | ||
return Config( | ||
executors=[ | ||
HighThroughputExecutor( | ||
address="127.0.0.1", | ||
label="htex_Local", | ||
working_dir=working_dir, | ||
storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()], | ||
worker_debug=True, | ||
cores_per_worker=1, | ||
heartbeat_period=2, | ||
heartbeat_threshold=5, | ||
poll_period=100, | ||
provider=LocalProvider( | ||
channel=LocalChannel(), | ||
init_blocks=0, | ||
min_blocks=0, | ||
max_blocks=5, | ||
launcher=SingleNodeLauncher(), | ||
), | ||
block_error_handler=False, | ||
) | ||
], | ||
strategy='simple', | ||
app_cache=True, checkpoint_mode='task_exit', | ||
retries=2, | ||
monitoring=MonitoringHub( | ||
hub_address="localhost", | ||
hub_port=55055, | ||
monitoring_debug=False, | ||
resource_monitoring_interval=1, | ||
), | ||
usage_tracking=True | ||
) | ||
|
||
|
||
config = fresh_config() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from diaspora_event_sdk import Client as GlobusClient | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this diaspora_login.py for? Can you add some usage notes about when it might be used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. Before using diaspora radio, the user should first log in. This can not be aggregated into test file because login requires cli which pytest does not support. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What service is this logging into? / What kind of account is needed? Github Actions (which runs our automated testing) supports being logged into services (for example, that is how automated release publication works to PyPI) so possibly that is an approach to getting the test running in CI? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi Ben, this is Haochen, I am a GLabs student working on the diaspora project. The log goes to an AWS-managed Kafka cluster maintained by the diaspora team. The login process works similarly to that of the Compute SDK, so any accounts supported by Globus Auth can log in. I'll discuss with Ryan this week to figure out how to log in during CI and how to drop the boto3 dependency. Thank you! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think I shall mock the behaviors related to diaspora in order to run the testable parts instead of skipping the whole thing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi Ben, Ryan and I agree that Parsl's CI should not depend on the diaspora Kafka cluster status, so mock sending to Diaspora in tests may be needed in the future. Also, after a recent paper deadline, we will refactor the indirect boto3 dependency in the Diaspora SDK to remove the Do you think Sicheng can keep the diaspora test optional for this PR and add mock sending tests in a future PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see that there's an urgent need to rush this PR into It's fine to leave this PR open until you have more time to work on it. You can convert it to a draft if you would like to indicate that status - I do that often for my own PRs - there's a link "Convert to draft" in the top right corner of the Github web interface. |
||
c = GlobusClient() | ||
print(c.retrieve_key()) | ||
topic = "radio-test" | ||
print(c.register_topic(topic)) | ||
print(c.list_topics()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
import logging | ||
import os | ||
import parsl | ||
import pytest | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@parsl.python_app | ||
def this_app(): | ||
# this delay needs to be several times the resource monitoring | ||
# period configured in the test configuration, so that some | ||
# messages are actually sent - there is no guarantee that any | ||
# (non-first) resource message will be sent at all for a short app. | ||
import time | ||
time.sleep(3) | ||
|
||
return 5 | ||
|
||
@pytest.mark.local | ||
def test_energy_collection(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a weird test name, I think from some other work? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah you are right. I forgot to change this name. |
||
# this is imported here rather than at module level because | ||
# it isn't available in a plain parsl install, so this module | ||
# would otherwise fail to import and break even a basic test | ||
# run. | ||
import sqlalchemy | ||
from sqlalchemy import text | ||
from parsl.tests.configs.htex_local_radio import fresh_config | ||
|
||
if os.path.exists("runinfo/monitoring.db"): | ||
logger.info("Monitoring database already exists - deleting") | ||
os.remove("runinfo/monitoring.db") | ||
|
||
logger.info("loading parsl") | ||
parsl.load(fresh_config()) | ||
|
||
logger.info("invoking and waiting for result") | ||
assert this_app().result() == 5 | ||
|
||
logger.info("cleaning up parsl") | ||
parsl.dfk().cleanup() | ||
parsl.clear() | ||
|
||
logger.info("all done") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is this test testing? I don't see that it is testing anything introduced by this PR, but is more like a variation of the existing tests in parsl/tests/test_monitoring? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried to run this simple test file and see whether diaspora received the resource information. Maybe I should add a diaspora consumer here in test script to make sure the records are successfully sent. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right. some of the existing database-backed monitoring code is tested by running a small workflow and then checking that the database has the right things in it - the right number of tasks, etc. |
||
|
||
if __name__ == "__main__": | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. don't use main blocks in pytest tests - we've been slowly removing them over the years when they've been previously committed - see PR #2685 for example. |
||
test_energy_collection() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you should be changing the behaviour of Parsl monitoring to use this new radio by default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about set this
radio_mode
in executor's__init__
by using config file?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes that's probably the right way to go - change radio_mode into an object level value (self.radio_mode) instead of a class level value and set it as an
__init__
parameter