From f9e2bf5b3a57044bb38e3a1327b5aff672e9aac3 Mon Sep 17 00:00:00 2001 From: Nishchay Karle <45297081+NishchayKarle@users.noreply.github.com> Date: Thu, 6 Jun 2024 06:23:36 -0500 Subject: [PATCH 1/2] Allow users to select their preferred level of usage tracking (#3400) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR introduces a choice of 3 levels for users to select based on their preferred level of usage reporting. It introduces updates on top of #3229. Tracking Levels Level 1: python version, parsl version, operating system details. Level 2: configuration details + Level 1 Level 3: total apps run, total failed apps, execution time + Level 2 Usage tracking if currently enabled will be defaulted to level 1. Usage Data sent at launch (Levels 1 and 2) • Capture Parsl version, Python version, and environment details at startup. • Configuration Reporting: Log details about providers, launchers, executors, channels, and storage access methods used. Usage Data sent on closure (Level 3 only) • Number of apps ran • Number of failed apps • Total time elapsed --- parsl/config.py | 20 +++++-- parsl/tests/unit/test_usage_tracking.py | 45 +++++++++++++++ parsl/usage_tracking/levels.py | 6 ++ parsl/usage_tracking/usage.py | 77 +++++++++++++++++-------- 4 files changed, 121 insertions(+), 27 deletions(-) create mode 100644 parsl/tests/unit/test_usage_tracking.py create mode 100644 parsl/usage_tracking/levels.py diff --git a/parsl/config.py b/parsl/config.py index 8858ddb042..ecea149114 100644 --- a/parsl/config.py +++ b/parsl/config.py @@ -11,6 +11,8 @@ from parsl.executors.threads import ThreadPoolExecutor from parsl.monitoring import MonitoringHub from parsl.usage_tracking.api import UsageInformation +from parsl.usage_tracking.levels import DISABLED as USAGE_TRACKING_DISABLED +from parsl.usage_tracking.levels import LEVEL_3 as USAGE_TRACKING_LEVEL_3 from parsl.utils import RepresentationMixin logger = logging.getLogger(__name__) @@ -66,9 +68,12 @@ class Config(RepresentationMixin, UsageInformation): How often the scaling strategy should be executed. Default is 5 seconds. max_idletime : float, optional The maximum idle time allowed for an executor before strategy could shut down unused blocks. Default is 120.0 seconds. - usage_tracking : bool, optional - Set this field to True to opt-in to Parsl's usage tracking system. Parsl only collects minimal, non personally-identifiable, - information used for reporting to our funding agencies. Default is False. + usage_tracking : int, optional + Set this field to 1, 2, or 3 to opt-in to Parsl's usage tracking system. + The value represents the level of usage tracking detail to be collected. + Setting this field to 0 will disable usage tracking. Default (this field is not set): usage tracking is not enabled. + Parsl only collects minimal, non personally-identifiable, + information used for reporting to our funding agencies. initialize_logging : bool, optional Make DFK optionally not initialize any logging. Log messages will still be passed into the python logging system under the @@ -102,7 +107,7 @@ def __init__(self, strategy_period: Union[float, int] = 5, max_idletime: float = 120.0, monitoring: Optional[MonitoringHub] = None, - usage_tracking: bool = False, + usage_tracking: int = 0, initialize_logging: bool = True) -> None: executors = tuple(executors or []) @@ -136,6 +141,7 @@ def __init__(self, self.strategy = strategy self.strategy_period = strategy_period self.max_idletime = max_idletime + self.validate_usage_tracking(usage_tracking) self.usage_tracking = usage_tracking self.initialize_logging = initialize_logging self.monitoring = monitoring @@ -156,6 +162,12 @@ def _validate_executors(self) -> None: raise ConfigurationError('Executors must have unique labels ({})'.format( ', '.join(['label={}'.format(repr(d)) for d in duplicates]))) + def validate_usage_tracking(self, level: int) -> None: + if not USAGE_TRACKING_DISABLED <= level <= USAGE_TRACKING_LEVEL_3: + raise ConfigurationError( + f"Usage Tracking values must be 0, 1, 2, or 3 and not {level}" + ) + def get_usage_information(self): return {"executors_len": len(self.executors), "dependency_resolver": self.dependency_resolver is not None} diff --git a/parsl/tests/unit/test_usage_tracking.py b/parsl/tests/unit/test_usage_tracking.py new file mode 100644 index 0000000000..351355811c --- /dev/null +++ b/parsl/tests/unit/test_usage_tracking.py @@ -0,0 +1,45 @@ +"""Test usage_tracking values.""" + +import pytest + +import parsl +from parsl.config import Config +from parsl.errors import ConfigurationError + + +@pytest.mark.local +def test_config_load(): + """Test loading a config with usage tracking.""" + with parsl.load(Config(usage_tracking=3)): + pass + parsl.clear() + + +@pytest.mark.local +@pytest.mark.parametrize("level", (0, 1, 2, 3, False, True)) +def test_valid(level): + """Test valid usage_tracking values.""" + Config(usage_tracking=level) + assert Config(usage_tracking=level).usage_tracking == level + + +@pytest.mark.local +@pytest.mark.parametrize("level", (12, 1000, -1)) +def test_invalid_values(level): + """Test invalid usage_tracking values.""" + with pytest.raises(ConfigurationError): + Config(usage_tracking=level) + + +@pytest.mark.local +@pytest.mark.parametrize("level", ("abcd", None, bytes(1), 1.0, 1j, object())) +def test_invalid_types(level): + """Test invalid usage_tracking types.""" + with pytest.raises(Exception) as ex: + Config(usage_tracking=level) + + # with typeguard 4.x this is TypeCheckError, + # with typeguard 2.x this is TypeError + # we can't instantiate TypeCheckError if we're in typeguard 2.x environment + # because it does not exist... so check name using strings. + assert ex.type.__name__ in ["TypeCheckError", "TypeError"] diff --git a/parsl/usage_tracking/levels.py b/parsl/usage_tracking/levels.py new file mode 100644 index 0000000000..a772220ca3 --- /dev/null +++ b/parsl/usage_tracking/levels.py @@ -0,0 +1,6 @@ +"""Module for defining the usage tracking levels.""" + +DISABLED = 0 # Tracking is disabled +LEVEL_1 = 1 # Share info about Parsl version, Python version, platform +LEVEL_2 = 2 # Share info about config + level 1 +LEVEL_3 = 3 # Share info about app count, app fails, execution time + level 2 diff --git a/parsl/usage_tracking/usage.py b/parsl/usage_tracking/usage.py index 10acbd8e89..3730fcc464 100644 --- a/parsl/usage_tracking/usage.py +++ b/parsl/usage_tracking/usage.py @@ -7,8 +7,11 @@ import uuid from parsl.dataflow.states import States +from parsl.errors import ConfigurationError from parsl.multiprocessing import ForkProcess from parsl.usage_tracking.api import get_parsl_usage +from parsl.usage_tracking.levels import DISABLED as USAGE_TRACKING_DISABLED +from parsl.usage_tracking.levels import LEVEL_3 as USAGE_TRACKING_LEVEL_3 from parsl.utils import setproctitle from parsl.version import VERSION as PARSL_VERSION @@ -110,17 +113,32 @@ def __init__(self, dfk, port=50077, self.python_version = "{}.{}.{}".format(sys.version_info.major, sys.version_info.minor, sys.version_info.micro) - self.tracking_enabled = self.check_tracking_enabled() - logger.debug("Tracking status: {}".format(self.tracking_enabled)) - - def check_tracking_enabled(self): - """Check if tracking is enabled. - - Tracking will be enabled unless the following is true: - - 1. dfk.config.usage_tracking is set to False - + self.tracking_level = self.check_tracking_level() + self.start_time = None + logger.debug("Tracking level: {}".format(self.tracking_level)) + + def check_tracking_level(self) -> int: + """Check if tracking is enabled and return level. + + Checks usage_tracking in Config + - Possible values: [True, False, 0, 1, 2, 3] + + True/False values are treated as Level 1/Level 0 respectively. + + Returns: int + - 0 : Tracking is disabled + - 1 : Tracking is enabled with level 1 + Share info about Parsl version, Python version, platform + - 2 : Tracking is enabled with level 2 + Share info about config + level 1 + - 3 : Tracking is enabled with level 3 + Share info about app count, app fails, execution time + level 2 """ + if not USAGE_TRACKING_DISABLED <= self.config.usage_tracking <= USAGE_TRACKING_LEVEL_3: + raise ConfigurationError( + f"Usage Tracking values must be 0, 1, 2, or 3 and not {self.config.usage_tracking}" + ) + return self.config.usage_tracking def construct_start_message(self) -> bytes: @@ -133,18 +151,28 @@ def construct_start_message(self) -> bytes: 'parsl_v': self.parsl_version, 'python_v': self.python_version, 'platform.system': platform.system(), - 'start': int(time.time()), - 'components': get_parsl_usage(self.dfk._config)} + 'tracking_level': int(self.tracking_level)} + + if self.tracking_level >= 2: + message['components'] = get_parsl_usage(self.dfk._config) + + if self.tracking_level == 3: + self.start_time = int(time.time()) + message['start'] = self.start_time + logger.debug(f"Usage tracking start message: {message}") return self.encode_message(message) def construct_end_message(self) -> bytes: """Collect the final run information at the time of DFK cleanup. + This is only called if tracking level is 3. Returns: - Message dict dumped as json string, ready for UDP """ + end_time = int(time.time()) + app_count = self.dfk.task_count app_fails = self.dfk.task_state_counts[States.failed] + self.dfk.task_state_counts[States.dep_fail] @@ -157,7 +185,8 @@ def construct_end_message(self) -> bytes: 'app_fails': app_fails} message = {'correlator': self.correlator_uuid, - 'end': int(time.time()), + 'end': end_time, + 'execution_time': end_time - self.start_time, 'components': [dfk_component] + get_parsl_usage(self.dfk._config)} logger.debug(f"Usage tracking end message (unencoded): {message}") @@ -168,20 +197,22 @@ def encode_message(self, obj): def send_UDP_message(self, message: bytes) -> None: """Send UDP message.""" - if self.tracking_enabled: - try: - proc = udp_messenger(self.domain_name, self.UDP_PORT, self.sock_timeout, message) - self.procs.append(proc) - except Exception as e: - logger.debug("Usage tracking failed: {}".format(e)) + try: + proc = udp_messenger(self.domain_name, self.UDP_PORT, self.sock_timeout, message) + self.procs.append(proc) + except Exception as e: + logger.debug("Usage tracking failed: {}".format(e)) def send_start_message(self) -> None: - message = self.construct_start_message() - self.send_UDP_message(message) + if self.tracking_level: + self.start_time = time.time() + message = self.construct_start_message() + self.send_UDP_message(message) def send_end_message(self) -> None: - message = self.construct_end_message() - self.send_UDP_message(message) + if self.tracking_level == 3: + message = self.construct_end_message() + self.send_UDP_message(message) def close(self, timeout: float = 10.0) -> None: """First give each process one timeout period to finish what it is From 825b285e3ef636065eed6302910baa8bdeecdcac Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 6 Jun 2024 17:23:36 +0200 Subject: [PATCH 2/2] Add FAQ on surviving end of a batch job (#3475) --- docs/faq.rst | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docs/faq.rst b/docs/faq.rst index 291016bda3..f427db82f9 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -358,3 +358,22 @@ or url = {https://doi.org/10.1145/3307681.3325400} } + +How can my tasks survive ``WorkerLost`` and ``ManagerLost`` at the end of a batch job? +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +When a batch job ends, pilot workers will be terminated by the batch system, +and any tasks running there will fail. With `HighThroughputExecutor`, +this failure will be reported as a `parsl.executors.high_throughput.errors.WorkerLost` or +`parsl.executors.high_throughput.interchange.ManagerLost` in the task future. + +To mitigate against this: + +* use retries by setting ``retries=`` in `parsl.config.Config`. +* if you only want to retry on certain errors such as `WorkerLost` and `ManagerLost`, + use ``retry_handler`` in `parsl.config.Config` to implement that policy. +* avoid sending tasks to batch jobs that will expire soon. With `HighThroughputExecutor`, + set drain_period to a little longer than you expect your tasks to take. + With `WorkQueueExecutor`, you can configure individual expected task duration using + a ``parsl_resource_specification`` and specify a worker ``--wall-time`` using the + ``worker_options`` parameter to the `WorkQueueExecutor`.