Skip to content

Commit

Permalink
Fix race condition when recording Dask cluster state
Browse files Browse the repository at this point in the history
Running `covalent start` causes the config file to be
updated from two processes:
* the CLI runner, after `_graceful_start()` returns, and
* the DaskCluster process, which records the cluster state after the
cluster starts up.

Unfortunately, the CLI runner previously wrote out the config it
loaded into memory before starting the dispatcher process. Its
ConfigManager instance was therefore unaware of any config file
updates that might have happened before `_graceful_start()`
returned. If the Dask cluster were to finish starting up and write out
its state before the CLI runner returned from `_graceful_start()`, the
CLI's config file update would obliterate the Dask cluster info.

This patch refreshes the CLI app's ConfigManager instance from the
on-disk config file after `_graceful_start()` and ensures that the
Dask cluster finishes starting before `_graceful_start()` returns.
  • Loading branch information
cjao committed Nov 7, 2023
1 parent a5cbb95 commit 31f0479
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 16 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,18 @@ jobs:
echo "Invalid backend specified in test matrix."
exit 1
fi
cat $HOME/.config/covalent/covalent.conf
env:
COVALENT_EXECUTOR_DIR: doc/source/how_to/execution/custom_executors

- name: Print Covalent status
if: env.BUILD_AND_RUN_ALL
id: covalent_status
run: |
covalent status
covalent cluster --info
covalent cluster --logs
- name: Run functional tests and measure coverage
id: functional-tests
if: env.BUILD_AND_RUN_ALL
Expand Down
7 changes: 6 additions & 1 deletion covalent_dispatcher/_cli/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from rich.table import Table
from rich.text import Text

from covalent._shared_files.config import ConfigManager, get_config, set_config
from covalent._shared_files.config import ConfigManager, get_config, reload_config, set_config

from .._db.datastore import DataStore
from .migrate import migrate_pickled_result_object
Expand Down Expand Up @@ -241,6 +241,11 @@ def _graceful_start(
except requests.exceptions.ConnectionError:
time.sleep(1)

# Since the dispatcher process might update the config file with the Dask cluster's state,
# we need to sync those changes with the CLI's ConfigManager instance. Otherwise the next
# call to `set_config()` from this module would obliterate the Dask cluster state.
reload_config()

Path(get_config("dispatcher.cache_dir")).mkdir(parents=True, exist_ok=True)
Path(get_config("dispatcher.results_dir")).mkdir(parents=True, exist_ok=True)
Path(get_config("dispatcher.log_dir")).mkdir(parents=True, exist_ok=True)
Expand Down
30 changes: 17 additions & 13 deletions covalent_dispatcher/_service/app_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import os
from logging import Logger
from multiprocessing import Process, current_process
from multiprocessing.connection import Connection
from threading import Thread

import dask.config
from dask.distributed import LocalCluster
from distributed.core import Server, rpc

from covalent._shared_files import logger
from covalent._shared_files.config import get_config, update_config
from covalent._shared_files.config import get_config
from covalent._shared_files.utils import get_random_available_port

app_log = logger.app_log
Expand Down Expand Up @@ -170,12 +171,15 @@ class DaskCluster(Process):
randomly selected TCP port that is available
"""

def __init__(self, name: str, logger: Logger):
def __init__(self, name: str, logger: Logger, conn: Connection):
super(DaskCluster, self).__init__()
self.name = name
self.logger = logger
self.cluster = None

# For sending cluster state back to main covalent process
self.conn = conn

Check warning on line 181 in covalent_dispatcher/_service/app_dask.py

View check run for this annotation

Codecov / codecov/patch

covalent_dispatcher/_service/app_dask.py#L181

Added line #L181 was not covered by tests

# Cluster configuration
self.num_workers = None
self.mem_per_worker = None
Expand Down Expand Up @@ -219,18 +223,18 @@ def run(self):
dashboard_link = self.cluster.dashboard_link

try:
update_config(
{
"dask": {
"scheduler_address": scheduler_address,
"dashboard_link": dashboard_link,
"process_info": current_process(),
"pid": os.getpid(),
"admin_host": self.admin_host,
"admin_port": self.admin_port,
}
dask_config = {

Check warning on line 226 in covalent_dispatcher/_service/app_dask.py

View check run for this annotation

Codecov / codecov/patch

covalent_dispatcher/_service/app_dask.py#L226

Added line #L226 was not covered by tests
"dask": {
"scheduler_address": scheduler_address,
"dashboard_link": dashboard_link,
"process_info": str(current_process()),
"pid": os.getpid(),
"admin_host": self.admin_host,
"admin_port": self.admin_port,
}
)
}

self.conn.send(dask_config)

Check warning on line 237 in covalent_dispatcher/_service/app_dask.py

View check run for this annotation

Codecov / codecov/patch

covalent_dispatcher/_service/app_dask.py#L237

Added line #L237 was not covered by tests

admin = DaskAdminWorker(self.cluster, self.admin_host, self.admin_port, self.logger)
admin.start()
Expand Down
8 changes: 6 additions & 2 deletions covalent_ui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

import argparse
import os
from multiprocessing import Pipe

import socketio
import uvicorn
from fastapi import Request
from fastapi.templating import Jinja2Templates

from covalent._shared_files import logger
from covalent._shared_files.config import get_config
from covalent._shared_files.config import get_config, update_config
from covalent_dispatcher._service.app_dask import DaskCluster
from covalent_dispatcher._triggers_app import triggers_only_app # nopycln: import
from covalent_ui.api.main import app as fastapi_app
Expand Down Expand Up @@ -110,8 +111,11 @@ def get_home(request: Request, rest_of_path: str):

# Start dask if no-cluster flag is not specified (covalent stop auto terminates all child processes of this)
if args.cluster:
dask_cluster = DaskCluster(name="LocalDaskCluster", logger=app_log)
parent_conn, child_conn = Pipe()
dask_cluster = DaskCluster(name="LocalDaskCluster", logger=app_log, conn=child_conn)

Check warning on line 115 in covalent_ui/app.py

View check run for this annotation

Codecov / codecov/patch

covalent_ui/app.py#L114-L115

Added lines #L114 - L115 were not covered by tests
dask_cluster.start()
dask_config = parent_conn.recv()
update_config(dask_config)

Check warning on line 118 in covalent_ui/app.py

View check run for this annotation

Codecov / codecov/patch

covalent_ui/app.py#L117-L118

Added lines #L117 - L118 were not covered by tests

app_name = "app:fastapi_app"
if args.triggers_only:
Expand Down

0 comments on commit 31f0479

Please sign in to comment.