add diaspora radio and is able to send resource info #3147

Closed
Changes from 10 commits
2 changes: 1 addition & 1 deletion Makefile
@@ -122,4 +122,4 @@ coverage: ## show the coverage report

.PHONY: clean
clean: ## clean up the environment by deleting the .venv, dist, eggs, mypy caches, coverage info, etc
rm -rf .venv $(DEPS) dist *.egg-info .mypy_cache build .pytest_cache .coverage runinfo_* $(WORKQUEUE_INSTALL)
rm -rf .venv $(DEPS) dist *.egg-info .mypy_cache build .pytest_cache .coverage runinfo $(WORKQUEUE_INSTALL)
Collaborator:
I think this is a general tidy-up. If so, it would be a good separate PR.

Contributor Author:

You mean removing runinfo_* is a general tidy-up? But when I test locally, the directory created by make test is runinfo, not runinfo_*.

Collaborator:

I mean that this change to the rm commandline is a general fix you could submit as a separate PR.

Collaborator:

runinfo_* directories are where test logs used to be stored, but that kind of directory hasn't existed for a long time.

Contributor Author:

Done in: #3236

3 changes: 3 additions & 0 deletions mypy.ini
@@ -203,3 +203,6 @@ ignore_missing_imports = True

[mypy-proxystore.*]
ignore_missing_imports = True

[mypy-diaspora_event_sdk.*]
ignore_missing_imports = True
9 changes: 7 additions & 2 deletions parsl/executors/high_throughput/executor.py
@@ -217,6 +217,9 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin):

encrypted : bool
Flag to enable/disable encryption (CurveZMQ). Default is False.

radio_mode : str
The radio mode to use. Options include "htex" and "diaspora". Default is "htex".
"""

@typeguard.typechecked
@@ -246,7 +249,8 @@ def __init__(self,
enable_mpi_mode: bool = False,
mpi_launcher: str = "mpiexec",
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
encrypted: bool = False,
radio_mode: str = "htex"):

logger.debug("Initializing HighThroughputExecutor")

@@ -308,6 +312,7 @@ def __init__(self,
self.worker_logdir_root = worker_logdir_root
self.cpu_affinity = cpu_affinity
self.encrypted = encrypted
self.radio_mode = radio_mode
self.cert_dir = None

self.enable_mpi_mode = enable_mpi_mode
@@ -323,7 +328,7 @@ def __init__(self,
launch_cmd = DEFAULT_LAUNCH_CMD
self.launch_cmd = launch_cmd

radio_mode = "htex"
# radio_mode = "htex"

def _warn_deprecated(self, old: str, new: str):
warnings.warn(
2 changes: 1 addition & 1 deletion parsl/monitoring/monitoring.py
@@ -269,7 +269,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat

# TODO: tighten the Any message format
def send(self, mtype: MessageType, message: Any) -> None:
self.logger.debug("Sending message type {}".format(mtype))
self.logger.debug("Sending message type {} content {}".format(mtype, message))
try:
self._dfk_channel.send_pyobj((mtype, message))
except zmq.Again:
53 changes: 52 additions & 1 deletion parsl/monitoring/radios.py
@@ -1,3 +1,5 @@
import datetime
import json
import os
import socket
import pickle
@@ -6,7 +8,7 @@

from abc import ABCMeta, abstractmethod

from typing import Optional
from typing import Optional, Any

from parsl.serialize import serialize

@@ -22,6 +24,42 @@ def send(self, message: object) -> None:
pass


class DateTimeEncoder(json.JSONEncoder):
def default(self, obj: Any) -> Any:
if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
return obj.isoformat()
return super(DateTimeEncoder, self).default(obj)
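A quick standalone check of what this encoder does: `json.dumps` raises `TypeError` on datetime values, so `default()` converts the whole datetime family to ISO-8601 strings. This sketch duplicates the class so it runs on its own; the `payload` values are illustrative, not from the PR.

```python
import datetime
import json


class DateTimeEncoder(json.JSONEncoder):
    # Fall back to ISO-8601 strings for datetime values, which
    # json.dumps cannot serialize natively.
    def default(self, obj):
        if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
            return obj.isoformat()
        return super().default(obj)


payload = {"run_id": "abc", "timestamp": datetime.datetime(2024, 3, 6, 12, 0)}
encoded = json.dumps(payload, cls=DateTimeEncoder).encode("utf-8")
# encoded == b'{"run_id": "abc", "timestamp": "2024-03-06T12:00:00"}'
```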


class DiasporaRadio(MonitoringRadio):
def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10):
from diaspora_event_sdk import KafkaProducer
self.source_id = source_id
self.producer = KafkaProducer(value_serializer=DiasporaRadio.serialize)
logger.info("Diaspora-based monitoring channel initializing")

def send(self, message: object) -> None:
Collaborator:
The control flow in here, with two if statements, is a bit tangled: it's unclear to me what assumptions you are making about the message parameter, compared to the other radios, which pass message on to some destination rather than attempting to inspect or interpret it.

It would be good to understand what this code is actually trying to distinguish when:
i) testing if the value is a tuple or not: what are the situations (both from the parsl monitoring side and whatever your distant-end processor expects) in both of those cases...
ii) testing if it is a tuple whose second element is indexable and contains a run_id value to use as a key: what actual structure are you expecting here, and what sort of messages are you expecting in both of these cases?

Contributor Author:

Do you want me to add some explanation here? I'm expecting the message to contain a run_id, which is set as the key sent to diaspora. This brings some convenience for consumers analyzing the records. But to keep the structure and functionality similar to the other two radios, simply removing these two statements and leaving more work to the consumer would also be fine.

Collaborator:

I think it's fine to process messages however you want in a radio - you just need to be clear, in comments, about what you are doing and why, for someone who comes along to work on this code later.

topic = "radio-test"
if isinstance(message, tuple):
# TODO: make configurable
if 'run_id' in message[1]:
key = message[1]['run_id'].encode("utf-8")
else:
logger.info("set key as init")
key = b"init"
# logger.info(f"Sending message of type {key}:{msg_type} to topic {topic}, content {message[1]}")
self.producer.send(topic=topic, key=key, value=message[1])
else:
key = b"payload"
self.producer.send(topic=topic, key=key, value=message)
logger.info("Sent message")
return

@staticmethod
def serialize(value: Any) -> bytes:
return json.dumps(value, cls=DateTimeEncoder).encode("utf-8")
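One way the key selection discussed in the thread above could be untangled, with the assumed message shapes spelled out. This is a sketch, not part of the PR; `extract_key` is a hypothetical helper, and the message shapes are a reading of the PR's logic rather than a documented contract.

```python
def extract_key(message):
    """Pick the Kafka partition key for a monitoring message.

    Assumed shapes (hypothetical reading of the PR's send() logic):
      * (MessageType, dict) tuples: key on the dict's run_id when present,
        or b"init" for messages sent before a run_id exists;
      * anything else: forwarded opaquely under the fixed key b"payload".
    """
    if isinstance(message, tuple) and len(message) == 2 and isinstance(message[1], dict):
        run_id = message[1].get("run_id")
        return run_id.encode("utf-8") if run_id is not None else b"init"
    return b"payload"
```

With a helper like this, `send()` reduces to choosing the value (`message[1]` for tuples, `message` otherwise) and a single `producer.send` call.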


class FilesystemRadio(MonitoringRadio):
"""A MonitoringRadio that sends messages over a shared filesystem.

Expand Down Expand Up @@ -173,3 +211,16 @@ def send(self, message: object) -> None:
logging.error("Could not send message within timeout limit")
return
return


def get_monitoring_radio(monitoring_url: str, source_id: int, radio_mode: str, run_dir: str) -> MonitoringRadio:
Collaborator:
this seems like a good factorisation

Collaborator:

I pulled this out into a separate PR, #3432

if radio_mode == "udp":
return UDPRadio(monitoring_url, source_id)
elif radio_mode == "htex":
return HTEXRadio(monitoring_url, source_id)
elif radio_mode == "filesystem":
return FilesystemRadio(monitoring_url=monitoring_url, source_id=source_id, run_dir=run_dir)
elif radio_mode == "diaspora":
return DiasporaRadio(monitoring_url, source_id)
else:
raise ValueError(f"Unknown radio mode {radio_mode}")
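The same dispatch can also be written table-driven. An illustrative sketch only - the PR's explicit if/elif chain is equally valid, and the lambdas below are stand-ins for the real radio constructors so the snippet is self-contained:

```python
# Table-driven variant of get_monitoring_radio's if/elif chain.
# Stand-in constructors return tuples instead of real radio objects.
RADIOS = {
    "udp":      lambda url, sid: ("UDPRadio", url, sid),
    "htex":     lambda url, sid: ("HTEXRadio", url, sid),
    "diaspora": lambda url, sid: ("DiasporaRadio", url, sid),
}


def get_radio(url, source_id, mode):
    try:
        return RADIOS[mode](url, source_id)
    except KeyError:
        # Mirror the explicit chain's behaviour for unknown modes.
        raise ValueError(f"Unknown radio mode {mode}") from None
```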
30 changes: 5 additions & 25 deletions parsl/monitoring/remote.py
@@ -2,14 +2,14 @@
import time
import logging
import datetime
import parsl.monitoring.radios as radios
from functools import wraps

from parsl.multiprocessing import ForkProcess
from multiprocessing import Event
from parsl.process_loggers import wrap_with_logs

from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MonitoringRadio, UDPRadio, HTEXRadio, FilesystemRadio
from typing import Any, Callable, Dict, List, Sequence, Tuple

logger = logging.getLogger(__name__)
@@ -121,18 +121,8 @@ def send_first_last_message(try_id: int,
import platform
import os

radio: MonitoringRadio
if radio_mode == "udp":
radio = UDPRadio(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "htex":
radio = HTEXRadio(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "filesystem":
radio = FilesystemRadio(monitoring_url=monitoring_hub_url,
source_id=task_id, run_dir=run_dir)
else:
raise RuntimeError(f"Unknown radio mode: {radio_mode}")
radio: radios.MonitoringRadio
radio = radios.get_monitoring_radio(monitoring_hub_url, task_id, radio_mode, run_dir)

msg = (MessageType.RESOURCE_INFO,
{'run_id': run_id,
@@ -177,18 +167,8 @@ def monitor(pid: int,

setproctitle("parsl: task resource monitor")

radio: MonitoringRadio
if radio_mode == "udp":
radio = UDPRadio(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "htex":
radio = HTEXRadio(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "filesystem":
radio = FilesystemRadio(monitoring_url=monitoring_hub_url,
source_id=task_id, run_dir=run_dir)
else:
raise RuntimeError(f"Unknown radio mode: {radio_mode}")
radio: radios.MonitoringRadio
radio = radios.get_monitoring_radio(monitoring_hub_url, task_id, radio_mode, run_dir)

logging.debug("start of monitor")

Empty file.
11 changes: 11 additions & 0 deletions parsl/tests/test_radio/diaspora_login.py
@@ -0,0 +1,11 @@
'''
Before using the diaspora radio, the user should first log in to the diaspora event service.
This cannot be folded into the test file, because login needs an authentication token obtained
via an interactive CLI, which pytest does not support.
'''
from diaspora_event_sdk import Client as GlobusClient
Collaborator:

What is this diaspora_login.py for? Can you add some usage notes about when it might be used?

Contributor Author (@ClaudiaCumberbatch, Mar 6, 2024):

Sure. Before using the diaspora radio, the user should first log in. This cannot be folded into the test file because login requires a CLI, which pytest does not support.

Collaborator:

What service is this logging into? / What kind of account is needed? Github Actions (which runs our automated testing) supports being logged into services (for example, that is how automated release publication works to PyPI) so possibly that is an approach to getting the test running in CI?


Hi Ben, this is Haochen, I am a GLabs student working on the diaspora project. The log goes to an AWS-managed Kafka cluster maintained by the diaspora team. The login process works similarly to that of the Compute SDK, so any accounts supported by Globus Auth can log in. I'll discuss with Ryan this week to figure out how to log in during CI and how to drop the boto3 dependency. Thank you!

Contributor Author:

Do you think I should mock the diaspora-related behaviors in order to run the testable parts, instead of skipping the whole thing?

@haochenpan (Mar 12, 2024):

Hi Ben,

Ryan and I agree that Parsl's CI should not depend on the diaspora Kafka cluster status, so mock sending to Diaspora in tests may be needed in the future. Also, after a recent paper deadline, we will refactor the indirect boto3 dependency in the Diaspora SDK to remove the botocore==1.29.125 constraint in Parsl's test.

Do you think Sicheng can keep the diaspora test optional for this PR and add mock sending tests in a future PR?

Collaborator:

I don't see that there's an urgent need to rush this PR into master without tests and with awkward package dependencies. Hopefully your potential users will be ok to install from a branch at this prototype stage until you are able to work on the PR some more after your more important targets.

It's fine to leave this PR open until you have more time to work on it. You can convert it to a draft if you would like to indicate that status - I do that often for my own PRs - there's a link "Convert to draft" in the top right corner of the Github web interface.

c = GlobusClient()
print(c.retrieve_key())
topic = "radio-test" + c.subject_openid[-12:]
print(c.register_topic(topic))
print(c.list_topics())
72 changes: 72 additions & 0 deletions parsl/tests/test_radio/test_basic.py
@@ -0,0 +1,72 @@
import logging
import os
import parsl
import pytest
import threading
import time

from diaspora_event_sdk import KafkaConsumer
from diaspora_event_sdk import Client as GlobusClient


logger = logging.getLogger(__name__)


def consumer_check(consumer):
start = time.time()
for record in consumer:
end = time.time()
if end - start > 60:
assert False, "No messages received"
if record:
break


@parsl.python_app
def this_app():
# this delay needs to be several times the resource monitoring
# period configured in the test configuration, so that some
# messages are actually sent - there is no guarantee that any
# (non-first) resource message will be sent at all for a short app.
import time
time.sleep(3)

return 5


@pytest.mark.skip(reason="requires diaspora login")
def test_diaspora_radio():
c = GlobusClient()
topic = "radio-test" + c.subject_openid[-12:]
consumer = KafkaConsumer(topic)
# open a new thread for the consumer
consumer_thread = threading.Thread(target=consumer_check, args=(consumer,))
consumer_thread.start()

# this is imported here rather than at module level because
# it isn't available in a plain parsl install, so this module
# would otherwise fail to import and break even a basic test
# run.
import sqlalchemy
from sqlalchemy import text
from parsl.tests.configs.htex_local_alternate import fresh_config

if os.path.exists("runinfo/monitoring.db"):
logger.info("Monitoring database already exists - deleting")
os.remove("runinfo/monitoring.db")

logger.info("loading parsl")
c = fresh_config()
c.executors[0].radio_mode = "diaspora"
parsl.load(c)

logger.info("invoking and waiting for result")
assert this_app().result() == 5

logger.info("cleaning up parsl")
parsl.dfk().cleanup()
parsl.clear()

consumer_thread.join()

logger.info("all done")
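The mock-based testing the comments above discuss could look roughly like this: install a fake diaspora_event_sdk module in sys.modules so no Kafka cluster (and no real SDK install) is needed. Since DiasporaRadio imports KafkaProducer inside its constructor, the patch would intercept that import too. Everything here is a sketch of one possible approach, not an agreed design:

```python
import sys
import types
from unittest import mock


class FakeProducer:
    """Stands in for diaspora_event_sdk.KafkaProducer; records send() calls."""
    def __init__(self, value_serializer=None):
        self.sent = []

    def send(self, topic, key, value):
        self.sent.append((topic, key, value))


# Install a throwaway module so `from diaspora_event_sdk import KafkaProducer`
# resolves to the fake for the duration of the patch.
fake_sdk = types.ModuleType("diaspora_event_sdk")
fake_sdk.KafkaProducer = FakeProducer

with mock.patch.dict(sys.modules, {"diaspora_event_sdk": fake_sdk}):
    from diaspora_event_sdk import KafkaProducer
    producer = KafkaProducer()
    producer.send(topic="radio-test", key=b"abc", value={"run_id": "abc"})

# The recorded calls can then be asserted on without any network traffic.
assert producer.sent == [("radio-test", b"abc", {"run_id": "abc"})]
```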
1 change: 1 addition & 0 deletions setup.py
@@ -35,6 +35,7 @@
'flux': ['pyyaml', 'cffi', 'jsonschema'],
'proxystore': ['proxystore'],
'radical-pilot': ['radical.pilot'],
'diaspora_radio': ['diaspora-event-sdk[kafka-python]'],
# Disabling psi-j since github direct links are not allowed by pypi
# 'psij': ['psi-j-parsl@git+https://github.com/ExaWorks/psi-j-parsl']
}
3 changes: 3 additions & 0 deletions test-requirements.txt
@@ -21,3 +21,6 @@ sqlalchemy2-stubs
Sphinx==4.5.0
twine
wheel

diaspora-event-sdk[kafka-python]
cloudpickle
Collaborator:

Where is cloudpickle used? If it's inside this diaspora-event-sdk[kafka-python], does that dependency bring in cloudpickle anyway?

Contributor Author:

parsl/tests/test_regression/test_1480.py E

======================================================================== ERRORS =========================================================================
______________________________________________________________ ERROR at setup of test_1480 ______________________________________________________________

self = <[AttributeError('class TaskVineExecutor uses worker_launch_method in the constructor, but does not define it as an attribute') raised in repr()] TaskVineExecutor object at 0x7f5cf7ef9cf0>
label = 'TaskVineExecutor', worker_launch_method = 'factory', function_exec_mode = 'regular'
manager_config = TaskVineManagerConfig(port=9000, address='resilient-compute', project_name=None, project_password_file=None, env_vars=...-xput', autolabel_window=None, autocategory=True, enable_peer_transfers=True, wait_for_workers=None, vine_log_dir=None)
factory_config = TaskVineFactoryConfig(factory_timeout=300, scratch_dir=None, min_workers=1, max_workers=1, workers_per_cycle=1, worker...ents=None, batch_options=None, _project_port=0, _project_address=None, _project_name=None, _project_password_file=None)
provider = None, storage_access = [FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()]

    def __init__(self,
                 label: str = "TaskVineExecutor",
                 worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory',
                 function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular',
                 manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
                 factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
                 provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
                 storage_access: Optional[List[Staging]] = None):
    
        # Set worker launch option for this executor
        if worker_launch_method == 'factory' or worker_launch_method == 'manual':
            provider = None
    
        # Initialize the parent class with the execution provider and block error handling enabled.
        # If provider is None, then no worker is launched via the provider method.
        BlockProviderExecutor.__init__(self, provider=provider,
                                       block_error_handler=True)
    
        # Raise an exception if there's a problem importing TaskVine
        try:
>           import ndcctools.taskvine

parsl/executors/taskvine/executor.py:122: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/tmp/cctools/lib/python3.8/site-packages/ndcctools/taskvine/__init__.py:42: in <module>
    from .manager import (
/tmp/cctools/lib/python3.8/site-packages/ndcctools/taskvine/manager.py:24: in <module>
    from .task import (
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    ##
    # @package ndcctools.taskvine.task
    #
    # This module provides the classes to construct tasks to submit for execution to a
    # TaskVine manager.
    #
    
    # Copyright (C) 2022- The University of Notre Dame
    # This software is distributed under the GNU General Public License.
    # See the file COPYING for details.
    from . import cvine
    from .file import File
    
    import copy
    import os
    import sys
    import textwrap
    import uuid
>   import cloudpickle
E   ModuleNotFoundError: No module named 'cloudpickle'

/tmp/cctools/lib/python3.8/site-packages/ndcctools/taskvine/task.py:19: ModuleNotFoundError

During handling of the above exception, another exception occurred:

request = <SubRequest 'load_dfk_session' for <Function test_1480>>, pytestconfig = <_pytest.config.Config object at 0x7f5d77a7dde0>
tmpd_cwd_session = PosixPath('/home/cc/parsl_resilient/.pytest/parsltest-current')

    @pytest.fixture(autouse=True, scope='session')
    def load_dfk_session(request, pytestconfig, tmpd_cwd_session):
        """Load a dfk around entire test suite, except in local mode.
    
        The special path `local` indicates that configuration will not come
        from a pytest managed configuration file; in that case, see
        load_dfk_local_module for module-level configuration management.
        """
    
        RepresentationMixin._validate_repr = True
    
        config = pytestconfig.getoption('config')[0]
    
        if config != 'local':
            spec = importlib.util.spec_from_file_location('', config)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
    
            if DataFlowKernelLoader._dfk is not None:
                raise RuntimeError("DFK didn't start as None - there was a DFK from somewhere already")
    
            if hasattr(module, 'config'):
                parsl_conf = module.config
            elif hasattr(module, 'fresh_config'):
>               parsl_conf = module.fresh_config()

parsl/tests/conftest.py:187: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
parsl/tests/configs/taskvine_ex.py:11: in fresh_config
    return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000),
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <[AttributeError('class TaskVineExecutor uses worker_launch_method in the constructor, but does not define it as an attribute') raised in repr()] TaskVineExecutor object at 0x7f5cf7ef9cf0>
label = 'TaskVineExecutor', worker_launch_method = 'factory', function_exec_mode = 'regular'
manager_config = TaskVineManagerConfig(port=9000, address='resilient-compute', project_name=None, project_password_file=None, env_vars=...-xput', autolabel_window=None, autocategory=True, enable_peer_transfers=True, wait_for_workers=None, vine_log_dir=None)
factory_config = TaskVineFactoryConfig(factory_timeout=300, scratch_dir=None, min_workers=1, max_workers=1, workers_per_cycle=1, worker...ents=None, batch_options=None, _project_port=0, _project_address=None, _project_name=None, _project_password_file=None)
provider = None, storage_access = [FTPInTaskStaging(), HTTPInTaskStaging(), NoOpFileStaging()]

    def __init__(self,
                 label: str = "TaskVineExecutor",
                 worker_launch_method: Union[Literal['provider'], Literal['factory'], Literal['manual']] = 'factory',
                 function_exec_mode: Union[Literal['regular'], Literal['serverless']] = 'regular',
                 manager_config: TaskVineManagerConfig = TaskVineManagerConfig(),
                 factory_config: TaskVineFactoryConfig = TaskVineFactoryConfig(),
                 provider: Optional[ExecutionProvider] = LocalProvider(init_blocks=1),
                 storage_access: Optional[List[Staging]] = None):
    
        # Set worker launch option for this executor
        if worker_launch_method == 'factory' or worker_launch_method == 'manual':
            provider = None
    
        # Initialize the parent class with the execution provider and block error handling enabled.
        # If provider is None, then no worker is launched via the provider method.
        BlockProviderExecutor.__init__(self, provider=provider,
                                       block_error_handler=True)
    
        # Raise an exception if there's a problem importing TaskVine
        try:
            import ndcctools.taskvine
            logger.debug(f'TaskVine default port: {ndcctools.taskvine.cvine.VINE_DEFAULT_PORT}')
        except ImportError:
>           raise OptionalModuleMissing(['taskvine'], "TaskVineExecutor requires the taskvine module.")
E           parsl.errors.OptionalModuleMissing: The functionality requested requires optional modules ['taskvine'] which could not be imported, because: TaskVineExecutor requires the taskvine module.

parsl/executors/taskvine/executor.py:125: OptionalModuleMissing
=================================================================== warnings summary ====================================================================
parsl/executors/workqueue/executor.py:46
  /home/cc/parsl_resilient/parsl/executors/workqueue/executor.py:46: DeprecationWarning: 'import work_queue' is deprecated. Please instead use: 'import ndcctools.work_queue'
    import work_queue as wq

parsl/tests/test_radical/test_mpi_funcs.py:21
  /home/cc/parsl_resilient/parsl/tests/test_radical/test_mpi_funcs.py:21: PytestUnknownMarkWarning: Unknown pytest.mark.radical - is this a typo?  You can register custom marks to avoid this warning - for details, see https://docs.pytest.org/en/stable/how-to/mark.html
    @pytest.mark.radical

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
================================================================= slowest 10 durations ==================================================================
0.05s setup    test_regression/test_1480.py::test_1480

(1 durations < 0.005s hidden.  Use -vv to show these durations.)
================================================================ short test summary info ================================================================
SKIPPED [1] parsl/tests/configs/ec2_single_node.py:30: 'public_ip' not configured in user_opts.py
SKIPPED [1] parsl/tests/configs/local_threads_globus.py:14: 'globus' not configured in user_opts.py
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
================================================ 2 skipped, 30 deselected, 2 warnings, 1 error in 0.94s =================================================
make: *** [Makefile:69: vineex_local_test] Error 1

This is my local test result. If I remove this dependency, it gives this ModuleNotFoundError.

Collaborator:

ok. Do you get that same failure when building/testing against parsl master branch?

Contributor Author:

Yes, exactly the same error information. Shall I fix this in a general PR, maybe the same one changing runinfo_* in the Makefile?

Collaborator:

I think this test is running successfully on the test server, which suggests something is different about your dev environment compared to the test server.

The vineex_local_test target should run the CCTOOLS_INSTALL dependency in Makefile, which then runs parsl/executors/taskvine/install-taskvine.sh which then installs cloudpickle.

So if this isn't working for you, perhaps have a look at why that dependency install isn't behaving properly for you. (for example, the makefile target won't re-run properly if you delete your Python environment but do not delete /tmp/cctools, because make is not a very good language for specifying accurate dependencies)

Contributor Author:

Yeah, sure, I'll remove this line.
