-
Notifications
You must be signed in to change notification settings - Fork 198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add diaspora radio and is able to send resource info #3147
Changes from 10 commits
2b8eadf
22739d7
b7e0044
cc79dc6
cd77454
e16015e
f85ad06
d7ee6d1
197182f
7755ab4
8961f82
d42638b
7e610b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
import datetime | ||
import json | ||
import os | ||
import socket | ||
import pickle | ||
|
@@ -6,7 +8,7 @@ | |
|
||
from abc import ABCMeta, abstractmethod | ||
|
||
from typing import Optional | ||
from typing import Optional, Any | ||
|
||
from parsl.serialize import serialize | ||
|
||
|
@@ -22,6 +24,42 @@ def send(self, message: object) -> None: | |
pass | ||
|
||
|
||
class DateTimeEncoder(json.JSONEncoder): | ||
def default(self, obj: Any) -> Any: | ||
if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): | ||
return obj.isoformat() | ||
return super(DateTimeEncoder, self).default(obj) | ||
|
||
|
||
class DiasporaRadio(MonitoringRadio): | ||
def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): | ||
from diaspora_event_sdk import KafkaProducer | ||
benclifford marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.source_id = source_id | ||
self.producer = KafkaProducer(value_serializer=DiasporaRadio.serialize) | ||
logger.info("Diaspora-based monitoring channel initializing") | ||
|
||
def send(self, message: object) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The control flow in here, with two if statements, is a bit tangled: it's a bit unclear to me what assumptions you are making about the It would be good to understand what this code is actually trying to distinguish when: ii) testing if it is a tuple with a second element that is indexable and contains a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You want me to add some explanations here? I'm expecting the message to contain a run_id which will be set as key to be sent to diaspora. This will bring some convenience for records consumer to analyze them. But in order to be in similar structure and functionality with the other two radios, simply removing these two statements and leaving more work to the consumer will also be fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's fine to process messages however you want in a radio - I think you just need to be clear in comments for someone coming along to work on this code later why/what you are doing. |
||
topic = "radio-test" | ||
if isinstance(message, tuple): | ||
# TODO: make configurable | ||
if 'run_id' in message[1]: | ||
key = message[1]['run_id'].encode("utf-8") | ||
else: | ||
logger.info("set key as init") | ||
key = b"init" | ||
# logger.info(f"Sending message of type {key}:{msg_type} to topic {topic}, content {message[1]}") | ||
self.producer.send(topic=topic, key=key, value=message[1]) | ||
else: | ||
key = b"payload" | ||
self.producer.send(topic=topic, key=key, value=message) | ||
logger.info("Sent message") | ||
return | ||
|
||
@staticmethod | ||
def serialize(value: Any) -> bytes: | ||
return json.dumps(value, cls=DateTimeEncoder).encode("utf-8") | ||
|
||
|
||
class FilesystemRadio(MonitoringRadio): | ||
"""A MonitoringRadio that sends messages over a shared filesystem. | ||
|
||
|
@@ -173,3 +211,16 @@ def send(self, message: object) -> None: | |
logging.error("Could not send message within timeout limit") | ||
return | ||
return | ||
|
||
|
||
def get_monitoring_radio(monitoring_url: str, source_id: int, radio_mode: str, run_dir: str) -> MonitoringRadio: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this seems like a good factorisation There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I pulled this out into a separate PR, #3432 |
||
if radio_mode == "udp": | ||
return UDPRadio(monitoring_url, source_id) | ||
elif radio_mode == "htex": | ||
return HTEXRadio(monitoring_url, source_id) | ||
elif radio_mode == "filesystem": | ||
return FilesystemRadio(monitoring_url=monitoring_url, source_id=source_id, run_dir=run_dir) | ||
elif radio_mode == "diaspora": | ||
return DiasporaRadio(monitoring_url, source_id) | ||
else: | ||
raise ValueError(f"Unknown radio mode {radio_mode}") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
''' | ||
Before using diaspora radio, the user should first login to the diaspora event service. | ||
This can not be aggregated into test file, because it needs an authentication token requiring cli | ||
which pytest does not support. | ||
''' | ||
from diaspora_event_sdk import Client as GlobusClient | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this diaspora_login.py for? Can you add some usage notes about when it might be used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. Before using diaspora radio, the user should first log in. This can not be aggregated into test file because login requires cli which pytest does not support. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What service is this logging into? / What kind of account is needed? Github Actions (which runs our automated testing) supports being logged into services (for example, that is how automated release publication works to PyPI) so possibly that is an approach to getting the test running in CI? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi Ben, this is Haochen, I am a GLabs student working on the diaspora project. The log goes to an AWS-managed Kafka cluster maintained by the diaspora team. The login process works similarly to that of the Compute SDK, so any accounts supported by Globus Auth can log in. I'll discuss with Ryan this week to figure out how to log in during CI and how to drop the boto3 dependency. Thank you! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think I shall mock the behaviors related to diaspora in order to run the testable parts instead of skipping the whole thing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi Ben, Ryan and I agree that Parsl's CI should not depend on the diaspora Kafka cluster status, so mock sending to Diaspora in tests may be needed in the future. Also, after a recent paper deadline, we will refactor the indirect boto3 dependency in the Diaspora SDK to remove the Do you think Sicheng can keep the diaspora test optional for this PR and add mock sending tests in a future PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see that there's an urgent need to rush this PR into It's fine to leave this PR open until you have more time to work on it. You can convert it to a draft if you would like to indicate that status - I do that often for my own PRs - there's a link "Convert to draft" in the top right corner of the Github web interface. |
||
c = GlobusClient() | ||
print(c.retrieve_key()) | ||
topic = "radio-test" + c.subject_openid[-12:] | ||
print(c.register_topic(topic)) | ||
print(c.list_topics()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
import logging | ||
import os | ||
import parsl | ||
import pytest | ||
import threading | ||
import time | ||
|
||
from diaspora_event_sdk import KafkaConsumer | ||
from diaspora_event_sdk import Client as GlobusClient | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
def consumer_check(consumer): | ||
start = time.time() | ||
for record in consumer: | ||
end = time.time() | ||
if end - start > 60: | ||
assert False, "No messages received" | ||
if record: | ||
break | ||
|
||
|
||
@parsl.python_app | ||
def this_app(): | ||
# this delay needs to be several times the resource monitoring | ||
# period configured in the test configuration, so that some | ||
# messages are actually sent - there is no guarantee that any | ||
# (non-first) resource message will be sent at all for a short app. | ||
import time | ||
time.sleep(3) | ||
|
||
return 5 | ||
|
||
|
||
@pytest.mark.skip(reason="requires diaspora login") | ||
def test_diaspora_radio(): | ||
c = GlobusClient() | ||
topic = "radio-test" + c.subject_openid[-12:] | ||
consumer = KafkaConsumer(topic) | ||
# open a new thread for the consumer | ||
consumer_thread = threading.Thread(target=consumer_check, args=(consumer,)) | ||
consumer_thread.start() | ||
|
||
# this is imported here rather than at module level because | ||
# it isn't available in a plain parsl install, so this module | ||
# would otherwise fail to import and break even a basic test | ||
# run. | ||
import sqlalchemy | ||
from sqlalchemy import text | ||
from parsl.tests.configs.htex_local_alternate import fresh_config | ||
|
||
if os.path.exists("runinfo/monitoring.db"): | ||
logger.info("Monitoring database already exists - deleting") | ||
os.remove("runinfo/monitoring.db") | ||
|
||
logger.info("loading parsl") | ||
c = fresh_config() | ||
c.executors[0].radio_mode = "diaspora" | ||
parsl.load(c) | ||
|
||
logger.info("invoking and waiting for result") | ||
assert this_app().result() == 5 | ||
|
||
logger.info("cleaning up parsl") | ||
parsl.dfk().cleanup() | ||
parsl.clear() | ||
|
||
consumer_thread.join() | ||
|
||
logger.info("all done") |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,3 +21,6 @@ sqlalchemy2-stubs | |
Sphinx==4.5.0 | ||
twine | ||
wheel | ||
|
||
diaspora-event-sdk[kafka-python] | ||
cloudpickle | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is cloudpickle used? If it's inside this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is my local test result. If I remove this dependency, it will give this module not found error. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok. Do you get that same failure when building/testing against parsl There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, exactly the same error information. Shall I update this in a general PR, maybe also in the one changing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this test is running successfully on the test server, which suggests something is different about your dev environment compared to the test server. The vineex_local_test target should run the CCTOOLS_INSTALL dependency in So if this isn't working for you, perhaps have a look at why that dependency install isn't behaving properly for you. (for example, the makefile target won't re-run properly if you delete your Python environment but do not delete /tmp/cctools, because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, sure, I'll remove this line. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think this is a general tidyup. if so, it would be a good separate PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean
runinfo_*
is a general tidyup? But when I test it locally, the dir created bymake test
isruninfo
instead ofruninfo_*
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean that this change to the
rm
commandline is a general fix you could submit as a separate PR.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
runinfo_* directories were where test logs used to get stored a long time ago but that kind of directory hasn't existed for a long time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in: #3236