Commit

Merge branch 'Parsl:master' into diaspora_radio
ClaudiaCumberbatch authored Mar 11, 2024
2 parents d42638b + e03a97b commit 7e610b1
Showing 22 changed files with 78 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .flake8
@@ -8,7 +8,7 @@
# W504: line break after binary operator
# (Raised by flake8 even when it is followed)
ignore = E126, E402, E129, W504
max-line-length = 151
max-line-length = 147
exclude = test_import_fail.py,
parsl/executors/workqueue/parsl_coprocess.py
# E741 disallows ambiguous single letter names which look like numbers
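Note on the `.flake8` change: lowering `max-line-length` from 151 to 147 means a line of 148 to 151 characters now fails flake8's E501 check, which appears to be why several files in this commit rewrap long lines. The following is a minimal sketch, not part of the commit, of how such lines could be located. `find_long_lines` and its parameters are hypothetical names chosen for illustration.

```python
from typing import List, Tuple


def find_long_lines(text: str, limit: int = 147) -> List[Tuple[int, int]]:
    """Return (line_number, length) for every line longer than `limit`.

    Hypothetical helper: it mirrors the E501 rule, which fires only when
    a line is strictly longer than max-line-length.
    """
    return [
        (number, len(line))
        for number, line in enumerate(text.splitlines(), start=1)
        if len(line) > limit
    ]


# A 147-character line passes under the new limit; a 148-character line does not.
sample = "a" * 147 + "\n" + "b" * 148
offenders = find_long_lines(sample)
```

Under the old limit of 151, the same sample would report no offending lines.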
4 changes: 4 additions & 0 deletions README.rst
@@ -59,6 +59,10 @@ then explore the `parallel computing patterns <https://parsl.readthedocs.io/en/s
.. |NSF-1550528| image:: https://img.shields.io/badge/NSF-1550528-blue.svg
:target: https://nsf.gov/awardsearch/showAward?AWD_ID=1550528
:alt: NSF award info
.. |NSF-1550475| image:: https://img.shields.io/badge/NSF-1550475-blue.svg
:target: https://nsf.gov/awardsearch/showAward?AWD_ID=1550475
:alt: NSF award info


Quickstart
==========
2 changes: 0 additions & 2 deletions docs/devguide/index.rst
@@ -5,8 +5,6 @@ Developer documentation
:maxdepth: 3

contributing
changelog
design
roadmap
packaging
../README
File renamed without changes.
File renamed without changes.
9 changes: 9 additions & 0 deletions docs/historical/index.rst
@@ -0,0 +1,9 @@
Historical Documents
====================

.. toctree::
:maxdepth: 2

changelog
design
performance
File renamed without changes.
1 change: 1 addition & 0 deletions docs/index.rst
@@ -108,6 +108,7 @@ Table of Contents
faq
reference
devguide/index
historical/index


Indices and tables
26 changes: 13 additions & 13 deletions docs/userguide/configuring.rst
@@ -484,12 +484,12 @@ This system uses Grid Engine which Parsl interfaces with using the `parsl.provid
.. literalinclude:: ../../parsl/configs/cc_in2p3.py


CCL (Notre Dame, with Work Queue)
---------------------------------
CCL (Notre Dame, TaskVine)
--------------------------

.. image:: http://ccl.cse.nd.edu/software/workqueue/WorkQueueLogoSmall.png
.. image:: https://ccl.cse.nd.edu/software/taskvine/taskvine-logo.png

To utilize Work Queue with Parsl, please install the full CCTools software package within an appropriate Anaconda or Miniconda environment
To utilize TaskVine with Parsl, please install the full CCTools software package within an appropriate Anaconda or Miniconda environment
(instructions for installing Miniconda can be found `in the Conda install guide <https://docs.conda.io/projects/conda/en/latest/user-guide/install/>`_):

.. code-block:: bash
@@ -498,17 +498,17 @@ To utilize Work Queue with Parsl, please install the full CCTools software packa
$ conda activate <environment>
$ conda install -y -c conda-forge ndcctools parsl
This creates a Conda environment on your machine with all the necessary tools and setup needed to utilize Work Queue with the Parsl library.
This creates a Conda environment on your machine with all the necessary tools and setup needed to utilize TaskVine with the Parsl library.

The following snippet shows an example configuration for using the Parsl/TaskVine executor to run applications on the local machine.
This examples uses the `parsl.executors.taskvine.TaskVineExecutor` to schedule tasks, and a local worker will be started automatically.
For more information on using TaskVine, including configurations for remote execution, visit the
`TaskVine/Parsl documentation online <https://cctools.readthedocs.io/en/latest/taskvine/#parsl>`_.

The following snippet shows an example configuration for using the Work Queue distributed framework to run applications on remote machines at large.
This examples uses the `parsl.executors.WorkQueueExecutor` to schedule tasks locally,
and assumes that Work Queue workers have been externally connected to the master using the
`work_queue_factory <https://cctools.readthedocs.io/en/latest/man_pages/work_queue_factory/>`_ or
`condor_submit_workers <https://cctools.readthedocs.io/en/latest/man_pages/condor_submit_workers/>`_ command line utilities from CCTools.
For more information on using Work Queue or to get help with running applications using CCTools,
visit the `CCTools documentation online <https://cctools.readthedocs.io/en/latest/help/>`_.
.. literalinclude:: ../../parsl/configs/vineex_local.py

.. literalinclude:: ../../parsl/configs/wqex_local.py
TaskVine's predecessor, WorkQueue, may continue to be used with Parsl.
For more information on using WorkQueue visit the `CCTools documentation online <https://cctools.readthedocs.io/en/latest/help/>`_.

Expanse (SDSC)
--------------
1 change: 0 additions & 1 deletion docs/userguide/index.rst
@@ -21,4 +21,3 @@ User guide
usage_tracking
plugins
parsl_perf
performance
2 changes: 1 addition & 1 deletion parsl/addresses.py
@@ -81,7 +81,7 @@ def address_by_hostname() -> str:
def address_by_interface(ifname: str) -> str:
"""Returns the IP address of the given interface name, e.g. 'eth0'
This is taken from a Stack Overflow answer: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-of-eth0-in-python#24196955
This is from a Stack Overflow answer: https://stackoverflow.com/questions/24196932/how-can-i-get-the-ip-address-of-eth0-in-python#24196955
Parameters
----------
9 changes: 6 additions & 3 deletions parsl/dataflow/dflow.py
@@ -95,7 +95,7 @@ def __init__(self, config: Config) -> None:
self.checkpoint_lock = threading.Lock()

self.usage_tracker = UsageTracker(self)
self.usage_tracker.send_message()
self.usage_tracker.send_start_message()

self.task_state_counts_lock = threading.Lock()
self.task_state_counts = {state: 0 for state in States}
@@ -722,7 +722,10 @@ def launch_task(self, task_record: TaskRecord) -> Future:
self._send_task_log_info(task_record)

if hasattr(exec_fu, "parsl_executor_task_id"):
logger.info(f"Parsl task {task_id} try {try_id} launched on executor {executor.label} with executor id {exec_fu.parsl_executor_task_id}")
logger.info(
f"Parsl task {task_id} try {try_id} launched on executor {executor.label} "
f"with executor id {exec_fu.parsl_executor_task_id}")

else:
logger.info(f"Parsl task {task_id} try {try_id} launched on executor {executor.label}")
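Note on the rewrapped `logger.info` call above: the split relies on Python joining adjacent string literals, including f-strings, into one string, so the logged text should be unchanged as long as the separating space is kept at the end of the first piece. A minimal sketch with made-up values (the variable names and values below are illustrative assumptions, not taken from Parsl):

```python
# Illustrative values only; in Parsl these come from the task record and executor.
task_id, try_id = 7, 0
label = "htex_local"
executor_task_id = 42

# Original single-line form.
single = f"Parsl task {task_id} try {try_id} launched on executor {label} with executor id {executor_task_id}"

# Wrapped form: adjacent f-string literals are concatenated into one string.
# The trailing space after {label} keeps the two halves from running together.
wrapped = (
    f"Parsl task {task_id} try {try_id} launched on executor {label} "
    f"with executor id {executor_task_id}")

same_message = single == wrapped
```

The same reasoning applies to the rewrapped `logger.debug` calls elsewhere in this commit.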

@@ -1202,7 +1205,7 @@ def cleanup(self) -> None:
self._checkpoint_timer.close()

# Send final stats
self.usage_tracker.send_message()
self.usage_tracker.send_end_message()
self.usage_tracker.close()

logger.info("Closing job status poller")
4 changes: 3 additions & 1 deletion parsl/dataflow/taskrecord.py
@@ -70,7 +70,9 @@ class TaskRecord(TypedDict, total=False):
# these three could be more strongly typed perhaps but I'm not thinking about that now
func: Callable
fn_hash: str
args: Sequence[Any] # in some places we uses a Tuple[Any, ...] and in some places a List[Any]. This is an attempt to correctly type both of those.
args: Sequence[Any]
# in some places we uses a Tuple[Any, ...] and in some places a List[Any].
# This is an attempt to correctly type both of those.
kwargs: Dict[str, Any]

time_invoked: Optional[datetime.datetime]
4 changes: 2 additions & 2 deletions parsl/executors/high_throughput/executor.py
@@ -634,8 +634,8 @@ def submit(self, func, resource_specification, *args, **kwargs):
"""Submits work to the outgoing_q.
The outgoing_q is an external process listens on this
queue for new work. This method behaves like a
submit call as described here `Python docs: <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor>`_
queue for new work. This method behaves like a submit call as described here `Python docs: <https://docs.python.org/3/
library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor>`_
Args:
- func (callable) : Callable function
7 changes: 6 additions & 1 deletion parsl/executors/high_throughput/interchange.py
@@ -392,7 +392,12 @@ def start(self) -> None:
logger.info("Processed {} tasks in {} seconds".format(self.count, delta))
logger.warning("Exiting")

def process_task_outgoing_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket], kill_event: threading.Event) -> None:
def process_task_outgoing_incoming(
self,
interesting_managers: Set[bytes],
hub_channel: Optional[zmq.Socket],
kill_event: threading.Event
) -> None:
"""Process one message from manager on the task_outgoing channel.
Note that this message flow is in contradiction to the name of the
channel - it is not an outgoing message and it is not a task.
4 changes: 3 additions & 1 deletion parsl/executors/taskvine/executor.py
@@ -228,7 +228,9 @@ def __create_data_and_logging_dirs(self):
# factory logs go with manager logs regardless
self.factory_config.scratch_dir = self.manager_config.vine_log_dir
logger.debug(f"Function data directory: {self._function_data_dir}, log directory: {log_dir}")
logger.debug(f"TaskVine manager log directory: {self.manager_config.vine_log_dir}, factory log directory: {self.factory_config.scratch_dir}")
logger.debug(
f"TaskVine manager log directory: {self.manager_config.vine_log_dir}, "
f"factory log directory: {self.factory_config.scratch_dir}")

def start(self):
"""Create submit process and collector thread to create, send, and
3 changes: 2 additions & 1 deletion parsl/jobs/strategy.py
@@ -245,7 +245,8 @@ def _general_strategy(self, status_list, *, strategy_type):
exec_status.scale_in(active_blocks - min_blocks)

else:
logger.debug(f"Idle time {idle_duration}s is less than max_idletime {self.max_idletime}s for executor {label}; not scaling in")
logger.debug(
f"Idle time {idle_duration}s is less than max_idletime {self.max_idletime}s for executor {label}; not scaling in")

# Case 2
# More tasks than the available slots.
14 changes: 11 additions & 3 deletions parsl/monitoring/db_manager.py
@@ -103,7 +103,13 @@ def insert(self, *, table: str, messages: List[MonitoringMessage]) -> None:
def rollback(self) -> None:
self.session.rollback()

def _generate_mappings(self, table: Table, columns: Optional[List[str]] = None, messages: List[MonitoringMessage] = []) -> List[Dict[str, Any]]:
def _generate_mappings(
self,
table: Table,
columns: Optional[List[str]] = None,
messages: List[MonitoringMessage] = [],
) -> List[Dict[str, Any]]:

mappings = []
for msg in messages:
m = {}
@@ -583,8 +589,10 @@ def _migrate_logs_to_internal(self, logs_queue: queue.Queue, queue_tag: str, kil
self._dispatch_to_internal(x)
elif queue_tag == 'resource':
assert isinstance(x, tuple), "_migrate_logs_to_internal was expecting a tuple, got {}".format(x)
assert x[0] == MessageType.RESOURCE_INFO, \
"_migrate_logs_to_internal can only migrate RESOURCE_INFO message from resource queue, got tag {}, message {}".format(x[0], x)
assert x[0] == MessageType.RESOURCE_INFO, (
"_migrate_logs_to_internal can only migrate RESOURCE_INFO message from resource queue, "
"got tag {}, message {}".format(x[0], x)
)
self._dispatch_to_internal(x)
elif queue_tag == 'node':
assert len(x) == 2, "expected message tuple to have exactly two elements"
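Note on the rewrapped `assert` message in this hunk: adjacent string literals are joined before the `.format` call is applied, so `.format` sees the whole message even though it is written after the second literal only. A minimal sketch with illustrative values (`tag` and `message` are stand-ins, not Parsl objects):

```python
# Stand-in values; in db_manager.py these would be x[0] and x.
tag = "NODE_INFO"
message = ("NODE_INFO", {"host": "n1"})

# The two literals are concatenated first, then .format fills both placeholders.
text = (
    "can only migrate RESOURCE_INFO message from resource queue, "
    "got tag {}, message {}".format(tag, message)
)

# Placeholders in the first literal are filled too, which shows that
# .format is applied to the joined string rather than the last piece.
combined = "first {} " "second {}".format("a", "b")
```

The parentheses in the commit wrap only the message. Writing `assert (condition, message)` instead would build a non-empty tuple, which is always truthy, so the assertion could never fail.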
7 changes: 6 additions & 1 deletion parsl/providers/slurm/slurm.py
@@ -290,7 +290,12 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
else:
logger.error("Submit command failed")
logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
raise SubmitException(job_name, "Could not read job ID from submit command standard output", stdout=stdout, stderr=stderr, retcode=retcode)
raise SubmitException(
job_name, "Could not read job ID from submit command standard output",
stdout=stdout,
stderr=stderr,
retcode=retcode
)

def cancel(self, job_ids):
''' Cancels the jobs specified by a list of job ids
4 changes: 3 additions & 1 deletion parsl/tests/configs/user_opts.py
@@ -52,7 +52,9 @@
# 'username': MIDWAY_USERNAME,
# 'script_dir': '/scratch/midway2/{}/parsl_scripts'.format(MIDWAY_USERNAME),
# 'scheduler_options': "",
# 'worker_init': 'cd /scratch/midway2/{}/parsl_scripts; module load Anaconda3/5.1.0; source activate parsl_testing;'.format(MIDWAY_USERNAME),
# 'worker_init': 'cd /scratch/midway2/{}/parsl_scripts; '
# 'module load Anaconda3/5.1.0; source activate parsl_testing;'
# .format(MIDWAY_USERNAME),
# },
# 'osg': {
# 'username': OSG_USERNAME,
3 changes: 2 additions & 1 deletion parsl/tests/test_scaling/test_scale_down_htex_auto_scale.py
@@ -98,7 +98,8 @@ def test_scale_out(tmpd_cwd, try_assert):

assert dfk.executors['htex_local'].outstanding == 0

# now we can launch one "long" task - and what should happen is that the connected_managers count "eventually" (?) converges to 1 and stays there.
# now we can launch one "long" task -
# and what should happen is that the connected_managers count "eventually" (?) converges to 1 and stays there.

finish_path = tmpd_cwd / "stage2_workers_may_continue"

14 changes: 5 additions & 9 deletions parsl/usage_tracking/usage.py
@@ -109,7 +109,6 @@ def __init__(self, dfk, port=50077,
sys.version_info.micro)
self.tracking_enabled = self.check_tracking_enabled()
logger.debug("Tracking status: {}".format(self.tracking_enabled))
self.initialized = False # Once first message is sent this will be True

def check_tracking_enabled(self):
"""Check if tracking is enabled.
@@ -176,15 +175,12 @@ def send_UDP_message(self, message: str) -> None:
except Exception as e:
logger.debug("Usage tracking failed: {}".format(e))

def send_message(self) -> None:
"""Send message over UDP.
"""
if not self.initialized:
message = self.construct_start_message()
self.initialized = True
else:
message = self.construct_end_message()
def send_start_message(self) -> None:
message = self.construct_start_message()
self.send_UDP_message(message)

def send_end_message(self) -> None:
message = self.construct_end_message()
self.send_UDP_message(message)

def close(self, timeout: float = 10.0) -> None:
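Note on the `usage.py` change: the single `send_message` method, which chose between the start and end message based on an `initialized` flag, is replaced by two explicit methods, and `dflow.py` now calls `send_start_message()` in `__init__` and `send_end_message()` in `cleanup`. The following is a minimal sketch of that shape, not Parsl's implementation. `UsageTrackerSketch` is a hypothetical stand-in that records messages in a list instead of sending them over UDP, and the message strings are placeholders.

```python
from typing import List


class UsageTrackerSketch:
    """Hypothetical stand-in for the tracker, used only to show the call pattern."""

    def __init__(self) -> None:
        # Records what would have been sent; the real class sends over UDP.
        self.sent: List[str] = []

    def construct_start_message(self) -> str:
        return "start"  # placeholder payload

    def construct_end_message(self) -> str:
        return "end"  # placeholder payload

    def send_UDP_message(self, message: str) -> None:
        self.sent.append(message)

    def send_start_message(self) -> None:
        self.send_UDP_message(self.construct_start_message())

    def send_end_message(self) -> None:
        self.send_UDP_message(self.construct_end_message())


tracker = UsageTrackerSketch()
tracker.send_start_message()  # called at startup
tracker.send_end_message()    # called during cleanup
```

With explicit methods, which message is sent no longer depends on how many times the method was called before, so the caller's intent is visible at each call site.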