add diaspora radio and is able to send resource info #3147

Closed

Changes from 3 commits
2 changes: 1 addition & 1 deletion parsl/executors/high_throughput/executor.py
@@ -323,7 +323,7 @@ def __init__(self,
launch_cmd = DEFAULT_LAUNCH_CMD
self.launch_cmd = launch_cmd

radio_mode = "htex"
radio_mode = "diaspora"
Collaborator:

I don't think you should be changing the behaviour of Parsl monitoring to use this new radio by default

Contributor Author:

What about setting this radio_mode in the executor's __init__, via the config file?

Collaborator:

Yes, that's probably the right way to go - change radio_mode into an object-level value (self.radio_mode) instead of a class-level value, and set it as an __init__ parameter.
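A minimal sketch of that suggestion (names here are illustrative, not the PR's actual code):

```python
# Sketch only: move radio_mode from a class-level attribute to a per-instance
# value supplied through __init__, so a config file can choose the radio.
class ExampleExecutor:
    def __init__(self, radio_mode: str = "htex") -> None:
        self.radio_mode = radio_mode  # object-level value, set per configured executor


# e.g. in a test config: ExampleExecutor(radio_mode="diaspora")
executor = ExampleExecutor(radio_mode="diaspora")
assert executor.radio_mode == "diaspora"
```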


def _warn_deprecated(self, old: str, new: str):
warnings.warn(
47 changes: 47 additions & 0 deletions parsl/monitoring/radios.py
@@ -1,3 +1,5 @@
import datetime
import json
import os
import socket
import pickle
@@ -8,6 +10,7 @@

from typing import Optional

from parsl.monitoring.message_type import MessageType
from parsl.serialize import serialize

_db_manager_excepts: Optional[Exception]
@@ -21,6 +24,37 @@ class MonitoringRadio(metaclass=ABCMeta):
def send(self, message: object) -> None:
pass

class DateTimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
return obj.isoformat()
return super(DateTimeEncoder, self).default(obj)

class DiasporaRadio(MonitoringRadio):
def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10):
from diaspora_event_sdk import KafkaProducer
self.source_id = source_id
self.producer = KafkaProducer(value_serializer=DiasporaRadio.serialize)
logger.info("Diaspora-based monitoring channel initializing")

def send(self, message: object) -> None:
Collaborator:

The control flow in here, with two if statements, is a bit tangled: it's a bit unclear to me what assumptions you are making about the message parameter - compared to the other radios, which pass the message on to some destination rather than attempting to inspect or interpret it.

It would be good to understand what this code is actually trying to distinguish when:
i) testing if the value is a tuple or not: what are the situations (both from the parsl monitoring side and whatever your distant-end processor expects) in both of those cases...

ii) testing if it is a tuple with a second element that is indexable and contains a run_id value to use as a key - what actual structure are you expecting here and what sort of messages are you expecting in both of these cases?

Contributor Author:

Do you want me to add some explanation here? I'm expecting the message to contain a run_id, which will be set as the key when the record is sent to Diaspora. This makes it more convenient for record consumers to analyze them. But in order to keep a similar structure and functionality to the other two radios, simply removing these two statements and leaving more work to the consumer would also be fine.

Collaborator:

I think it's fine to process messages however you want in a radio - I think you just need to be clear in comments for someone coming along to work on this code later why/what you are doing.
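For reference, a sketch of the message shape being assumed here, based on the author's description and the (MessageType, dict) tuples built in parsl/monitoring/remote.py below; the field values are illustrative:

```python
from parsl.monitoring.message_type import MessageType

# Illustrative resource message: a (MessageType, dict) tuple whose dict
# usually carries a run_id alongside the resource fields.
message = (MessageType.RESOURCE_INFO, {"run_id": "some-run-id", "task_id": 0})

# DiasporaRadio.send uses run_id (when present) as the Kafka record key,
# falling back to b"init" otherwise.
key = message[1].get("run_id", "init").encode("utf-8")
```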

# messages arrive from parsl monitoring as (MessageType, dict) tuples
msg_type = message[0]
# TODO: make configurable
topic = "radio-test"
# use the workflow run_id (when present) as the Kafka record key, so that
# downstream consumers can group records by run; otherwise fall back to "init"
if 'run_id' in message[1]:
key = message[1]['run_id'].encode("utf-8")
else:
logger.info("set key as init")
key = b"init"
# logger.info(f"Sending message of type {key}:{msg_type} to topic {topic}, content {message[1]}")
self.producer.send(topic=topic, key=key, value=message[1])
logger.info("Sent message")
return

@staticmethod
def serialize(value):
return json.dumps(value, cls=DateTimeEncoder).encode("utf-8")


class FilesystemRadio(MonitoringRadio):
"""A MonitoringRadio that sends messages over a shared filesystem.
@@ -173,3 +207,16 @@ def send(self, message: object) -> None:
logging.error("Could not send message within timeout limit")
return
return


def get_monitoring_radio(monitoring_url: str, source_id: int, radio_mode: str, run_dir: str) -> MonitoringRadio:
Collaborator:

this seems like a good factorisation

Collaborator:

I pulled this out into a separate PR, #3432

if radio_mode == "udp":
return UDPRadio(monitoring_url, source_id)
elif radio_mode == "htex":
return HTEXRadio(monitoring_url, source_id)
elif radio_mode == "filesystem":
return FilesystemRadio(monitoring_url=monitoring_url, source_id=source_id, run_dir=run_dir)
elif radio_mode == "diaspora":
return DiasporaRadio(monitoring_url, source_id)
else:
raise ValueError(f"Unknown radio mode {radio_mode}")
30 changes: 5 additions & 25 deletions parsl/monitoring/remote.py
@@ -2,14 +2,14 @@
import time
import logging
import datetime
+import parsl.monitoring.radios as radios
from functools import wraps

from parsl.multiprocessing import ForkProcess
from multiprocessing import Event
from parsl.process_loggers import wrap_with_logs

from parsl.monitoring.message_type import MessageType
-from parsl.monitoring.radios import MonitoringRadio, UDPRadio, HTEXRadio, FilesystemRadio
from typing import Any, Callable, Dict, List, Sequence, Tuple

logger = logging.getLogger(__name__)
@@ -121,18 +121,8 @@ def send_first_last_message(try_id: int,
import platform
import os

-radio: MonitoringRadio
-if radio_mode == "udp":
-radio = UDPRadio(monitoring_hub_url,
-source_id=task_id)
-elif radio_mode == "htex":
-radio = HTEXRadio(monitoring_hub_url,
-source_id=task_id)
-elif radio_mode == "filesystem":
-radio = FilesystemRadio(monitoring_url=monitoring_hub_url,
-source_id=task_id, run_dir=run_dir)
-else:
-raise RuntimeError(f"Unknown radio mode: {radio_mode}")
+radio: radios.MonitoringRadio
+radio = radios.get_monitoring_radio(monitoring_hub_url, task_id, radio_mode, run_dir)

msg = (MessageType.RESOURCE_INFO,
{'run_id': run_id,
@@ -177,18 +167,8 @@ def monitor(pid: int,

setproctitle("parsl: task resource monitor")

-radio: MonitoringRadio
-if radio_mode == "udp":
-radio = UDPRadio(monitoring_hub_url,
-source_id=task_id)
-elif radio_mode == "htex":
-radio = HTEXRadio(monitoring_hub_url,
-source_id=task_id)
-elif radio_mode == "filesystem":
-radio = FilesystemRadio(monitoring_url=monitoring_hub_url,
-source_id=task_id, run_dir=run_dir)
-else:
-raise RuntimeError(f"Unknown radio mode: {radio_mode}")
+radio: radios.MonitoringRadio
+radio = radios.get_monitoring_radio(monitoring_hub_url, task_id, radio_mode, run_dir)

logging.debug("start of monitor")

74 changes: 74 additions & 0 deletions parsl/tests/configs/htex_local_radio.py
@@ -0,0 +1,74 @@
"""
Collaborator:

this file is an outdated copy of htex_local_alternate.py with no changes related to this PR, right?

Contributor Author:

Oh yeah, sorry about that

The aim of this configuration is to run a local htex
in a similar manner to htex_local.py, but with lots of
options different and more complicated than in that
configuration, so that more code paths are executed
than when testing only with htex_local.

It does not matter too much *what* is different in this
configuration; what matters is that the differences
cause significantly different pieces of parsl code to be
run - for example, by turning on monitoring, by allowing
blocks to be started by a strategy, by using a different
set of staging providers, by using timing parameters that
will cause substantially different behaviour on whatever
those timing parameters control.
"""

# imports for monitoring:
from parsl.monitoring import MonitoringHub

import os

from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.launchers import SingleNodeLauncher

from parsl.config import Config
from parsl.executors import HighThroughputExecutor


from parsl.data_provider.http import HTTPInTaskStaging
from parsl.data_provider.ftp import FTPInTaskStaging
from parsl.data_provider.file_noop import NoOpFileStaging

working_dir = os.getcwd() + "/" + "test_htex_energy"


def fresh_config():
return Config(
executors=[
HighThroughputExecutor(
address="127.0.0.1",
label="htex_Local",
working_dir=working_dir,
storage_access=[FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()],
worker_debug=True,
cores_per_worker=1,
heartbeat_period=2,
heartbeat_threshold=5,
poll_period=100,
provider=LocalProvider(
channel=LocalChannel(),
init_blocks=0,
min_blocks=0,
max_blocks=5,
launcher=SingleNodeLauncher(),
),
block_error_handler=False,
)
],
strategy='simple',
app_cache=True, checkpoint_mode='task_exit',
retries=2,
monitoring=MonitoringHub(
hub_address="localhost",
hub_port=55055,
monitoring_debug=False,
resource_monitoring_interval=1,
),
usage_tracking=True
)


config = fresh_config()
Empty file.
6 changes: 6 additions & 0 deletions parsl/tests/test_radio/diaspora_login.py
@@ -0,0 +1,6 @@
from diaspora_event_sdk import Client as GlobusClient
Collaborator:

What is this diaspora_login.py for? Can you add some usage notes about when it might be used?

Contributor Author (ClaudiaCumberbatch, Mar 6, 2024):

Sure. Before using the diaspora radio, the user should first log in. This cannot be folded into the test file because login requires a CLI interaction, which pytest does not support.

Collaborator:

What service is this logging into? / What kind of account is needed? Github Actions (which runs our automated testing) supports being logged into services (for example, that is how automated release publication works to PyPI) so possibly that is an approach to getting the test running in CI?

haochenpan:

Hi Ben, this is Haochen; I am a GLabs student working on the diaspora project. The log data goes to an AWS-managed Kafka cluster maintained by the diaspora team. The login process works similarly to that of the Compute SDK, so any account supported by Globus Auth can log in. I'll discuss with Ryan this week to figure out how to log in during CI and how to drop the boto3 dependency. Thank you!

Contributor Author:

Do you think I should mock the behaviors related to diaspora in order to run the testable parts, instead of skipping the whole thing?

haochenpan (Mar 12, 2024):

Hi Ben,

Ryan and I agree that Parsl's CI should not depend on the diaspora Kafka cluster status, so mocking sends to Diaspora in tests may be needed in the future. Also, after a recent paper deadline, we will refactor the indirect boto3 dependency in the Diaspora SDK to remove the botocore==1.29.125 constraint in Parsl's test.

Do you think Sicheng can keep the diaspora test optional for this PR and add mock sending tests in a future PR?
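For what it's worth, a minimal sketch of what such a mocked sending test could look like (not part of this PR; it assumes diaspora_event_sdk is importable but never contacts the Kafka cluster):

```python
from unittest import mock


def test_diaspora_radio_send_mocked():
    # Patch the producer class so DiasporaRadio.__init__ picks up the mock
    # instead of creating a real Kafka connection.
    with mock.patch("diaspora_event_sdk.KafkaProducer") as producer_cls:
        from parsl.monitoring.radios import DiasporaRadio

        radio = DiasporaRadio("unused://url", source_id=0)
        radio.send((None, {"run_id": "abc123"}))

        # exactly one record should have been handed to the (mocked) producer
        producer_cls.return_value.send.assert_called_once()
```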

Collaborator:

I don't see that there's an urgent need to rush this PR into master without tests and with awkward package dependencies. Hopefully your potential users will be ok to install from a branch at this prototype stage until you are able to work on the PR some more after your more important targets.

It's fine to leave this PR open until you have more time to work on it. You can convert it to a draft if you would like to indicate that status - I do that often for my own PRs - there's a link "Convert to draft" in the top right corner of the Github web interface.

c = GlobusClient()
print(c.retrieve_key())
topic = "radio-test"
print(c.register_topic(topic))
print(c.list_topics())
47 changes: 47 additions & 0 deletions parsl/tests/test_radio/test_basic.py
@@ -0,0 +1,47 @@
import logging
import os
import parsl
import pytest

logger = logging.getLogger(__name__)


@parsl.python_app
def this_app():
# this delay needs to be several times the resource monitoring
# period configured in the test configuration, so that some
# messages are actually sent - there is no guarantee that any
# (non-first) resource message will be sent at all for a short app.
import time
time.sleep(3)

return 5

@pytest.mark.local
def test_energy_collection():
Collaborator:

this is a weird test name, I think from some other work?

Contributor Author:

Yeah you are right. I forgot to change this name.

# this is imported here rather than at module level because
# it isn't available in a plain parsl install, so this module
# would otherwise fail to import and break even a basic test
# run.
import sqlalchemy
from sqlalchemy import text
from parsl.tests.configs.htex_local_radio import fresh_config

if os.path.exists("runinfo/monitoring.db"):
logger.info("Monitoring database already exists - deleting")
os.remove("runinfo/monitoring.db")

logger.info("loading parsl")
parsl.load(fresh_config())

logger.info("invoking and waiting for result")
assert this_app().result() == 5

logger.info("cleaning up parsl")
parsl.dfk().cleanup()
parsl.clear()

logger.info("all done")
Collaborator:

what is this test testing? I don't see that it is testing anything introduced by this PR, but is more like a variation of the existing tests in parsl/tests/test_monitoring?

Contributor Author:

I tried to run this simple test file and see whether diaspora received the resource information. Maybe I should add a diaspora consumer to the test script to make sure the records are successfully sent.

Collaborator:

right. some of the existing database-backed monitoring code is tested by running a small workflow and then checking that the database has the right things in it - the right number of tasks, etc.
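A minimal sketch of that style of check against the monitoring database this test writes (table name follows the existing parsl monitoring schema; the expected count assumes the single app invocation above):

```python
import sqlalchemy
from sqlalchemy import text

# Sketch: after parsl.dfk().cleanup(), inspect the monitoring database that
# the MonitoringHub in htex_local_radio.py writes to runinfo/monitoring.db.
engine = sqlalchemy.create_engine("sqlite:///runinfo/monitoring.db")
with engine.begin() as connection:
    (task_count,) = connection.execute(text("SELECT COUNT(*) FROM task")).first()
    assert task_count == 1  # exactly one task: the single this_app() call above
```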


if __name__ == "__main__":
Collaborator:

don't use main blocks in pytest tests - we've been slowly removing them over the years when they've been previously committed - see PR #2685 for example.

test_energy_collection()