Skip to content

Commit

Permalink
Consolidate job handling in a new module, parsl.jobs (#2837)
Browse files Browse the repository at this point in the history
Prior to this, a lot of job handling was tangled into the parsl.dataflow module.

There's a growing desire in Globus Compute to use the job handling parts of Parsl as a user-exposed feature, distinct from the data flow parts of Parsl.

This PR supports that new use case by creating a new module, parsl.jobs, for job-related code to live in, to help with ongoing work relaxing the previous requirement that a Dataflow Kernel would always exist to manage parsl components.

This PR only moves code around - it should not cause any change in functionality.

This is a breaking change for any externally implemented providers - they would need the same (hopefully superficial) import fixups that this PR does to providers in parsl/providers/

(this is probably a breaking change for Globus Compute making out-of-scope use of Parsl internals, but ... that's out of scope: this PR is part of making that sort of stuff exposed to users properly)
  • Loading branch information
benclifford authored Jul 20, 2023
1 parent 385fa3b commit d5db72f
Show file tree
Hide file tree
Showing 26 changed files with 156 additions and 144 deletions.
4 changes: 2 additions & 2 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ Internal
parsl.app.bash.BashApp
parsl.app.python.PythonApp
parsl.dataflow.dflow.DataFlowKernel
parsl.dataflow.job_status_poller.JobStatusPoller
parsl.dataflow.memoization.id_for_memo
parsl.dataflow.memoization.Memoizer
parsl.dataflow.states.FINAL_STATES
parsl.dataflow.states.States
parsl.dataflow.strategy.Strategy
parsl.dataflow.taskrecord.TaskRecord
parsl.jobs.job_status_poller.JobStatusPoller
parsl.jobs.strategy.Strategy
parsl.utils.Timer
5 changes: 3 additions & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@
from parsl.data_provider.files import File
from parsl.dataflow.errors import BadCheckpoint, DependencyError, JoinError
from parsl.dataflow.futures import AppFuture
from parsl.dataflow.job_status_poller import JobStatusPoller
from parsl.dataflow.memoization import Memoizer
from parsl.dataflow.rundirs import make_rundir
from parsl.dataflow.states import States, FINAL_STATES, FINAL_FAILURE_STATES
from parsl.dataflow.taskrecord import TaskRecord
from parsl.errors import ConfigurationError
from parsl.jobs.job_status_poller import JobStatusPoller
from parsl.jobs.states import JobStatus, JobState
from parsl.usage_tracking.usage import UsageTracker
from parsl.executors.base import ParslExecutor
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.executors.threads import ThreadPoolExecutor
from parsl.monitoring import MonitoringHub
from parsl.process_loggers import wrap_with_logs
from parsl.providers.base import ExecutionProvider, JobStatus, JobState
from parsl.providers.base import ExecutionProvider
from parsl.utils import get_version, get_std_fname_mode, get_all_checkpoints, Timer

from parsl.monitoring.message_type import MessageType
Expand Down
4 changes: 2 additions & 2 deletions parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Any, Callable, Dict, Optional, List
from typing_extensions import Literal, Self

from parsl.providers.base import JobStatus
from parsl.jobs.states import JobStatus

import parsl # noqa F401

Expand Down Expand Up @@ -167,7 +167,7 @@ def error_management_enabled(self) -> bool:
pass

@abstractmethod
def handle_errors(self, error_handler: "parsl.dataflow.job_error_handler.JobErrorHandler",
def handle_errors(self, error_handler: "parsl.jobs.job_error_handler.JobErrorHandler",
status: Dict[str, JobStatus]) -> None:
"""This method is called by the error management infrastructure after a status poll. The
executor implementing this method is then responsible for detecting abnormal conditions
Expand Down
7 changes: 4 additions & 3 deletions parsl/executors/status_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import parsl # noqa F401
from parsl.executors.base import ParslExecutor
from parsl.executors.errors import BadStateException, ScalingFailed
from parsl.providers.base import JobStatus, ExecutionProvider, JobState
from parsl.jobs.states import JobStatus, JobState
from parsl.providers.base import ExecutionProvider
from parsl.utils import AtomicIDCounter


Expand Down Expand Up @@ -135,7 +136,7 @@ def executor_exception(self):
def error_management_enabled(self):
return self.block_error_handler

def handle_errors(self, error_handler: "parsl.dataflow.job_error_handler.JobErrorHandler",
def handle_errors(self, error_handler: "parsl.jobs.job_error_handler.JobErrorHandler",
status: Dict[str, JobStatus]) -> None:
if not self.block_error_handler:
return
Expand Down Expand Up @@ -235,7 +236,7 @@ def set_bad_state_and_fail_all(self, exception: Exception):
def status(self):
return {}

def handle_errors(self, error_handler: "parsl.dataflow.job_error_handler.JobErrorHandler",
def handle_errors(self, error_handler: "parsl.jobs.job_error_handler.JobErrorHandler",
status: Dict[str, JobStatus]) -> None:
pass

Expand Down
Empty file added parsl/jobs/__init__.py
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

from typing import List, Dict

import parsl.dataflow.job_status_poller as jsp
import parsl.jobs.job_status_poller as jsp

from parsl.executors.base import ParslExecutor
from parsl.providers.base import JobStatus, JobState
from parsl.jobs.states import JobStatus, JobState


class JobErrorHandler:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
from typing import Dict, Sequence
from typing import List # noqa F401 (used in type annotation)

from parsl.dataflow.job_error_handler import JobErrorHandler
from parsl.dataflow.strategy import Strategy
from parsl.executors.base import ParslExecutor
from parsl.jobs.job_error_handler import JobErrorHandler
from parsl.jobs.states import JobStatus, JobState
from parsl.jobs.strategy import Strategy
from parsl.monitoring.message_type import MessageType

from parsl.providers.base import JobStatus, JobState

from parsl.utils import Timer

Expand Down
116 changes: 116 additions & 0 deletions parsl/jobs/states.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import os
from enum import IntEnum
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class JobState(IntEnum):
"""Defines a set of states that a job can be in"""
UNKNOWN = 0
PENDING = 1
RUNNING = 2
CANCELLED = 3
COMPLETED = 4
FAILED = 5
TIMEOUT = 6
HELD = 7

def __str__(self) -> str:
return self.__class__.__name__ + "." + self.name


TERMINAL_STATES = [JobState.CANCELLED, JobState.COMPLETED, JobState.FAILED,
JobState.TIMEOUT]


class JobStatus:
"""Encapsulates a job state together with other details:
Args:
state: The machine-readable state of the job this status refers to
message: Optional human readable message
exit_code: Optional exit code
stdout_path: Optional path to a file containing the job's stdout
stderr_path: Optional path to a file containing the job's stderr
"""
SUMMARY_TRUNCATION_THRESHOLD = 2048

def __init__(self, state: JobState, message: Optional[str] = None, exit_code: Optional[int] = None,
stdout_path: Optional[str] = None, stderr_path: Optional[str] = None):
self.state = state
self.message = message
self.exit_code = exit_code
self.stdout_path = stdout_path
self.stderr_path = stderr_path

@property
def terminal(self) -> bool:
return self.state in TERMINAL_STATES

@property
def status_name(self) -> str:
return self.state.name

def __repr__(self) -> str:
if self.message is not None:
extra = f"state={self.state} message={self.message}".format(self.state, self.message)
else:
extra = f"state={self.state}".format(self.state)
return f"<{type(self).__module__}.{type(self).__qualname__} object at {hex(id(self))}, {extra}>"

def __str__(self) -> str:
if self.message is not None:
return "{} ({})".format(self.state, self.message)
else:
return "{}".format(self.state)

@property
def stdout(self) -> Optional[str]:
return self._read_file(self.stdout_path)

@property
def stderr(self) -> Optional[str]:
return self._read_file(self.stderr_path)

def _read_file(self, path: Optional[str]) -> Optional[str]:
if path is None:
return None
try:
with open(path, 'r') as f:
return f.read()
except Exception:
logger.exception("Converting exception to None")
return None

@property
def stdout_summary(self) -> Optional[str]:
return self._read_summary(self.stdout_path)

@property
def stderr_summary(self) -> Optional[str]:
return self._read_summary(self.stderr_path)

def _read_summary(self, path: Optional[str]) -> Optional[str]:
if not path:
# can happen for synthetic job failures
return None
try:
with open(path, 'r') as f:
f.seek(0, os.SEEK_END)
size = f.tell()
f.seek(0, os.SEEK_SET)
if size > JobStatus.SUMMARY_TRUNCATION_THRESHOLD:
half_threshold = int(JobStatus.SUMMARY_TRUNCATION_THRESHOLD / 2)
head = f.read(half_threshold)
f.seek(size - half_threshold, os.SEEK_SET)
tail = f.read(half_threshold)
return head + '\n...\n' + tail
else:
f.seek(0, os.SEEK_SET)
return f.read()
except FileNotFoundError:
# When output is redirected to a file, but the process does not produce any output
# bytes, no file is actually created. This handles that case.
return None
4 changes: 2 additions & 2 deletions parsl/dataflow/strategy.py → parsl/jobs/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import warnings
from typing import Dict, List, Optional

import parsl.dataflow.job_status_poller as jsp
import parsl.jobs.job_status_poller as jsp

from parsl.executors import HighThroughputExecutor
from parsl.executors.base import ParslExecutor
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.providers.base import JobState
from parsl.jobs.states import JobState
from parsl.process_loggers import wrap_with_logs


Expand Down
3 changes: 2 additions & 1 deletion parsl/providers/ad_hoc/ad_hoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import time

from parsl.channels import LocalChannel
from parsl.jobs.states import JobStatus, JobState
from parsl.launchers import SimpleLauncher
from parsl.providers.base import ExecutionProvider, JobStatus, JobState
from parsl.providers.base import ExecutionProvider
from parsl.providers.errors import ScriptPathError
from parsl.utils import RepresentationMixin

Expand Down
3 changes: 2 additions & 1 deletion parsl/providers/aws/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from string import Template

from parsl.errors import ConfigurationError
from parsl.jobs.states import JobState, JobStatus
from parsl.providers.aws.template import template_string
from parsl.providers.base import ExecutionProvider, JobState, JobStatus
from parsl.providers.base import ExecutionProvider
from parsl.errors import OptionalModuleMissing
from parsl.utils import RepresentationMixin
from parsl.launchers import SingleNodeLauncher
Expand Down
3 changes: 2 additions & 1 deletion parsl/providers/azure/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
from string import Template

from parsl.errors import ConfigurationError
from parsl.jobs.states import JobState, JobStatus
from parsl.providers.azure.template import template_string
from parsl.providers.base import ExecutionProvider, JobState, JobStatus
from parsl.providers.base import ExecutionProvider
from parsl.errors import OptionalModuleMissing
from parsl.utils import RepresentationMixin
from parsl.launchers import SingleNodeLauncher
Expand Down
Loading

0 comments on commit d5db72f

Please sign in to comment.