Skip to content

Commit

Permalink
Removed lazy import prototype
Browse files Browse the repository at this point in the history
This gave measurable performance improvement under very high load/shared fs
situations, but it is a lot of additional complexity in the code to achieve
that.

Reducing import cost is probably a good goal, but this approach does not
seem to be the right way to achieve it.
  • Loading branch information
benclifford committed Aug 14, 2024
1 parent 526ab75 commit e5603a7
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 245 deletions.
111 changes: 26 additions & 85 deletions parsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,98 +18,28 @@
import multiprocessing as _multiprocessing
import os
import platform
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from parsl.app.app import bash_app, join_app, python_app
from parsl.config import Config
from parsl.data_provider.files import File
from parsl.dataflow.dflow import DataFlowKernel
from parsl.executors import (
HighThroughputExecutor,
ThreadPoolExecutor,
WorkQueueExecutor,
)
from parsl.log_utils import set_file_logger, set_stream_logger
from parsl.monitoring import MonitoringHub

# Maps each public parsl attribute name to the module that defines it.
# Consumed by lazy_loader (the module __getattr__ hook) so the providing
# module is imported only on first attribute access.
lazys = {
    'python_app': 'parsl.app.app',
    'bash_app': 'parsl.app.app',
    'join_app': 'parsl.app.app',
    'Config': 'parsl.config',
    'ThreadPoolExecutor': 'parsl.executors',
    'HighThroughputExecutor': 'parsl.executors',
    'WorkQueueExecutor': 'parsl.executors',
    'set_stream_logger': 'parsl.log_utils',
    'set_file_logger': 'parsl.log_utils',
    'MonitoringHub': 'parsl.monitoring',
    'File': 'parsl.data_provider.files',
    'DataFlowKernel': 'parsl.dataflow.dflow',
    'DataFlowKernelLoader': 'parsl.dataflow.dflow',
}

import parsl


def lazy_loader(name):
    """Module ``__getattr__`` hook for the parsl package.

    On first access to a name registered in ``lazys``, import the module
    that provides it, cache the attribute on the parsl package (so later
    lookups bypass this hook), and return it.

    Raises:
        AttributeError: if *name* is not registered in ``lazys``.
    """
    if name in lazys:
        # Deferred on purpose: avoiding eager import cost is the whole
        # point of this hook.
        import importlib
        module = importlib.import_module(lazys[name])
        attribute = getattr(module, name)
        # Cache on the package so this hook fires at most once per name.
        setattr(parsl, name, attribute)
        return attribute
    raise AttributeError(f"No (lazy loadable) attribute in {__name__} for {name}")


# parsl/__init__.py:61: error: Cannot assign to a method
parsl.__getattr__ = lazy_loader # type: ignore[method-assign]

if platform.system() == 'Darwin':
_multiprocessing.set_start_method('fork', force=True)


AUTO_LOGNAME = -1

# NOTE: there's a reason these were aliases and not redefinitions,
# and I should fix this to keep them as aliases.


def clear(*args, **kwargs):
    """Forward to DataFlowKernelLoader.clear.

    The import is deliberately inside the function body so that
    parsl.dataflow.dflow is not imported at ``import parsl`` time.
    """
    from parsl import DataFlowKernelLoader
    return DataFlowKernelLoader.clear(*args, **kwargs)


def load(*args, **kwargs):
    """Forward to DataFlowKernelLoader.load.

    The import is deliberately inside the function body so that
    parsl.dataflow.dflow is not imported at ``import parsl`` time.
    """
    from parsl import DataFlowKernelLoader
    return DataFlowKernelLoader.load(*args, **kwargs)


def dfk(*args, **kwargs):
    """Forward to DataFlowKernelLoader.dfk.

    The import is deliberately inside the function body so that
    parsl.dataflow.dflow is not imported at ``import parsl`` time.
    """
    from parsl import DataFlowKernelLoader
    return DataFlowKernelLoader.dfk(*args, **kwargs)


def wait_for_current_tasks(*args, **kwargs):
    """Forward to DataFlowKernelLoader.wait_for_current_tasks.

    The import is deliberately inside the function body so that
    parsl.dataflow.dflow is not imported at ``import parsl`` time.
    """
    from parsl import DataFlowKernelLoader
    return DataFlowKernelLoader.wait_for_current_tasks(*args, **kwargs)


logging.getLogger('parsl').addHandler(logging.NullHandler())
from parsl.app.app import bash_app, join_app, python_app
from parsl.config import Config
from parsl.data_provider.files import File
from parsl.dataflow.dflow import DataFlowKernel, DataFlowKernelLoader
from parsl.executors import (
HighThroughputExecutor,
ThreadPoolExecutor,
WorkQueueExecutor,
)
from parsl.log_utils import set_file_logger, set_stream_logger
from parsl.monitoring import MonitoringHub
from parsl.version import VERSION

if platform.system() == 'Darwin':
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
_multiprocessing.set_start_method('fork', force=True)

__author__ = 'The Parsl Team'

from parsl.version import VERSION

__version__ = VERSION

AUTO_LOGNAME = -1

__all__ = [

# decorators
Expand All @@ -136,3 +66,14 @@ def wait_for_current_tasks(*args, **kwargs):
# monitoring
'MonitoringHub',
]

# Module-level conveniences: expose DataFlowKernelLoader's methods directly
# as parsl.clear / parsl.load / parsl.dfk / parsl.wait_for_current_tasks.
clear = DataFlowKernelLoader.clear
load = DataFlowKernelLoader.load
dfk = DataFlowKernelLoader.dfk
wait_for_current_tasks = DataFlowKernelLoader.wait_for_current_tasks


logging.getLogger('parsl').addHandler(logging.NullHandler())

if platform.system() == 'Darwin':
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
16 changes: 7 additions & 9 deletions parsl/app/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import annotations

"""Definitions for the @App decorator and the App classes.
The App class encapsulates a generic leaf task that can be executed asynchronously.
Expand All @@ -11,8 +9,8 @@

import typeguard

import parsl.dataflow.dflow as dflow
import parsl.dataflow.futures
from parsl.dataflow.dflow import DataFlowKernel
from parsl.dataflow.futures import AppFuture

logger = logging.getLogger(__name__)

Expand All @@ -27,7 +25,7 @@ class AppBase(metaclass=ABCMeta):

@typeguard.typechecked
def __init__(self, func: Callable,
data_flow_kernel: Optional[dflow.DataFlowKernel] = None,
data_flow_kernel: Optional[DataFlowKernel] = None,
executors: Union[List[str], str] = 'all',
cache: bool = False,
ignore_for_cache: Optional[Sequence[str]] = None) -> None:
Expand Down Expand Up @@ -73,13 +71,13 @@ def __init__(self, func: Callable,
self.kwargs['inputs'] = params['inputs'].default

@abstractmethod
def __call__(self, *args: Any, **kwargs: Any) -> parsl.dataflow.futures.AppFuture:
def __call__(self, *args: Any, **kwargs: Any) -> AppFuture:
pass


@typeguard.typechecked
def python_app(function: Optional[Callable] = None,
data_flow_kernel: Optional[dflow.DataFlowKernel] = None,
data_flow_kernel: Optional[DataFlowKernel] = None,
cache: bool = False,
executors: Union[List[str], str] = 'all',
ignore_for_cache: Optional[Sequence[str]] = None) -> Callable:
Expand Down Expand Up @@ -120,7 +118,7 @@ def wrapper(f: Callable) -> PythonApp:

@typeguard.typechecked
def join_app(function: Optional[Callable] = None,
data_flow_kernel: Optional[dflow.DataFlowKernel] = None,
data_flow_kernel: Optional[DataFlowKernel] = None,
cache: bool = False,
ignore_for_cache: Optional[Sequence[str]] = None) -> Callable:
"""Decorator function for making join apps
Expand Down Expand Up @@ -158,7 +156,7 @@ def wrapper(f: Callable) -> PythonApp:

@typeguard.typechecked
def bash_app(function: Optional[Callable] = None,
data_flow_kernel: Optional[dflow.DataFlowKernel] = None,
data_flow_kernel: Optional[DataFlowKernel] = None,
cache: bool = False,
executors: Union[List[str], str] = 'all',
ignore_for_cache: Optional[Sequence[str]] = None) -> Callable:
Expand Down
30 changes: 2 additions & 28 deletions parsl/channels/__init__.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,4 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from parsl.channels.base import Channel
from parsl.channels.local.local import LocalChannel

# Maps each public parsl.channels attribute name to the module that defines
# it; consumed by lazy_loader so the module is imported on first access.
lazys = {
    'Channel': 'parsl.channels.base',
    'LocalChannel': 'parsl.channels.local.local',
}

import parsl.channels as px


def lazy_loader(name):
    """Module ``__getattr__`` hook for the parsl.channels package.

    On first access to a name registered in ``lazys``, import the module
    that provides it, cache the attribute on the package (so later lookups
    bypass this hook), and return it.

    Raises:
        AttributeError: if *name* is not registered in ``lazys``.
    """
    if name in lazys:
        # Deferred on purpose: keeping package import cheap is the whole
        # point of this hook.
        import importlib
        module = importlib.import_module(lazys[name])
        attribute = getattr(module, name)
        # Cache on the package so this hook fires at most once per name.
        setattr(px, name, attribute)
        return attribute
    raise AttributeError(f"No (lazy loadable) attribute in {__name__} for {name}")


px.__getattr__ = lazy_loader # type: ignore[method-assign]
from parsl.channels.base import Channel
from parsl.channels.local.local import LocalChannel

__all__ = ['Channel', 'LocalChannel']
6 changes: 2 additions & 4 deletions parsl/config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from __future__ import annotations

import logging
from typing import Callable, Iterable, Optional, Sequence, Union

import typeguard
from typing_extensions import Literal

import parsl.dataflow.taskrecord as taskrecord
from parsl.dataflow.dependency_resolvers import DependencyResolver
from parsl.dataflow.taskrecord import TaskRecord
from parsl.errors import ConfigurationError
from parsl.executors.base import ParslExecutor
from parsl.executors.threads import ThreadPoolExecutor
Expand Down Expand Up @@ -112,7 +110,7 @@ def __init__(self,
garbage_collect: bool = True,
internal_tasks_max_threads: int = 10,
retries: int = 0,
retry_handler: Optional[Callable[[Exception, taskrecord.TaskRecord], float]] = None,
retry_handler: Optional[Callable[[Exception, TaskRecord], float]] = None,
run_dir: str = 'runinfo',
std_autopath: Optional[Callable] = None,
strategy: Optional[str] = 'simple',
Expand Down
Loading

0 comments on commit e5603a7

Please sign in to comment.