Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(service): support import for outputs generated by the solver with the -z option #1774

Open
wants to merge 6 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions antarest/launcher/adapters/local_launcher/local_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,7 @@ def run_study(

job = threading.Thread(
target=LocalLauncher._compute,
args=(
self,
antares_solver_path,
study_uuid,
job_id,
launcher_parameters,
),
args=(self, antares_solver_path, study_uuid, job_id, launcher_parameters, version),
name=f"{self.__class__.__name__}-JobRunner",
)
job.start()
Expand All @@ -91,6 +85,7 @@ def _compute(
study_uuid: str,
uuid: UUID,
launcher_parameters: LauncherParametersDTO,
version: str,
) -> None:
end = False

Expand All @@ -114,6 +109,8 @@ def stop_reading_output() -> bool:
f"--force-parallel={launcher_parameters.nb_cpu}",
str(export_path),
]
if int(version) >= 840:
args.append("-z")
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
Expand Down Expand Up @@ -155,7 +152,8 @@ def stop_reading_output() -> bool:

output_id: Optional[str] = None
try:
output_id = self.callbacks.import_output(str(uuid), export_path / "output", {})
output_path = (export_path / "output").iterdir().__next__()
output_id = self.callbacks.import_output(str(uuid), output_path, {})
except Exception as e:
logger.error(
f"Failed to import output for study {study_uuid} located at {export_path}",
Expand Down
9 changes: 4 additions & 5 deletions antarest/launcher/adapters/slurm_launcher/slurm_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from antarest.launcher.model import JobStatus, LauncherParametersDTO, LogType, XpansionParametersDTO
from antarest.study.storage.rawstudy.ini_reader import IniReader
from antarest.study.storage.rawstudy.ini_writer import IniWriter
from antarest.study.storage.utils import retrieve_output_path

logger = logging.getLogger(__name__)
logging.getLogger("paramiko").setLevel("WARN")
Expand Down Expand Up @@ -285,7 +286,7 @@ def _import_study_output(
xpansion_mode: t.Optional[str] = None,
log_dir: t.Optional[str] = None,
) -> t.Optional[str]:
if xpansion_mode is not None:
if xpansion_mode:
self._import_xpansion_result(job_id, xpansion_mode)

launcher_logs: t.Dict[str, t.List[Path]] = {}
Expand All @@ -311,14 +312,12 @@ def _import_study_output(
# `antarest.launcher.service.LauncherService._import_output`
return self.callbacks.import_output(
job_id,
self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id / "output",
self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id,
launcher_logs,
)

def _import_xpansion_result(self, job_id: str, xpansion_mode: str) -> None:
output_path = self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id / "output"
if output_path.exists() and len(os.listdir(output_path)) == 1:
output_path = output_path / os.listdir(output_path)[0]
if output_path := retrieve_output_path(self.local_workspace / STUDIES_OUTPUT_DIR_NAME / job_id):
if output_path.name.endswith(".zip"):
logger.info("Unzipping zipped output for xpansion result storage")
unzipped_output_path = (
Expand Down
29 changes: 13 additions & 16 deletions antarest/launcher/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import shutil
import zipfile
from datetime import datetime, timedelta
from http import HTTPStatus
from pathlib import Path
Expand Down Expand Up @@ -42,7 +43,7 @@
from antarest.launcher.ssh_config import SSHConfigDTO
from antarest.study.repository import AccessPermissions, StudyFilter
from antarest.study.service import StudyService
from antarest.study.storage.utils import assert_permission, extract_output_name, find_single_output_path
from antarest.study.storage.utils import assert_permission, extract_output_name, retrieve_output_path

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -510,9 +511,9 @@ def _import_output(
study_id = job_result.study_id
job_launch_params = LauncherParametersDTO.from_launcher_params(job_result.launcher_params)

# this now can be a zip file instead of a directory !
output_true_path = find_single_output_path(output_path)
output_is_zipped = is_zip(output_true_path)
# this now can be a zip file instead of a directory!
output_true_path = retrieve_output_path(output_path)
output_is_zipped = output_true_path.suffix.lower() == ".zip"
output_suffix = cast(
Optional[str],
getattr(
Expand All @@ -535,6 +536,12 @@ def _import_output(
log_paths,
output_true_path / log_name,
)
if additional_logs and output_is_zipped:
with zipfile.ZipFile(output_true_path, "a") as zf:
for log_paths in additional_logs.values():
for path in log_paths:
dest_name = path.name[: path.name.rfind("-")] + ".log"
zf.write(filename=path, arcname=dest_name)

if study_id:
zip_path: Optional[Path] = None
Expand All @@ -548,18 +555,6 @@ def _import_output(
final_output_path = zip_path or output_true_path
with db():
try:
if additional_logs and output_is_zipped:
for log_name, log_paths in additional_logs.items():
log_type = LogType.from_filename(log_name)
log_suffix = log_name
if log_type:
log_suffix = log_type.to_suffix()
self.study_service.save_logs(
study_id,
job_id,
log_suffix,
concat_files_to_str(log_paths),
)
return self.study_service.import_output(
study_id,
final_output_path,
Expand All @@ -576,6 +571,8 @@ def _import_output(
finally:
if zip_path:
os.unlink(zip_path)
if output_is_zipped:
os.unlink(output_true_path)
raise JobNotFound()

def _download_fallback_output(self, job_id: str, params: RequestParameters) -> FileDownloadTaskDTO:
Expand Down
19 changes: 12 additions & 7 deletions antarest/study/storage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,18 @@ def fix_study_root(study_path: Path) -> None:
shutil.rmtree(sub_root_path)


def find_single_output_path(all_output_path: Path) -> Path:
children = os.listdir(all_output_path)
if len(children) == 1:
if children[0].endswith(".zip"):
return all_output_path / children[0]
return find_single_output_path(all_output_path / children[0])
return all_output_path
def retrieve_output_path(job_path: Path) -> Path:
output_already_zipped_path = job_path.with_suffix(".zip")
if output_already_zipped_path.exists():
return output_already_zipped_path

output_inside_study = job_path / "output"
if output_inside_study.is_dir():
output_folders = os.listdir(output_inside_study)
if len(output_folders) == 1:
return output_inside_study / output_folders[0]

raise FileNotFoundError(f"The output for job {job_path} does not exist.")


def extract_output_name(path_output: Path, new_suffix_name: t.Optional[str] = None) -> str:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Antares-Launcher~=1.3.2
Antares-Launcher~=1.4.1

alembic~=1.7.5
asgi-ratelimit[redis]==0.7.0
Expand Down
7 changes: 5 additions & 2 deletions tests/launcher/test_local_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import textwrap
import uuid
from pathlib import Path
from unittest.mock import Mock, call
from unittest.mock import Mock, call, patch

import pytest

Expand Down Expand Up @@ -39,7 +39,9 @@ def test_local_launcher__launcher_init_exception():


@pytest.mark.unit_test
def test_compute(tmp_path: Path, launcher_config: Config):
@patch("antarest.launcher.service.Path.iterdir")
# This is weird but needed for this unit test to run and avoid the FileNotFoundException.
def test_compute(mock, tmp_path: Path, launcher_config: Config):
local_launcher = LocalLauncher(launcher_config, callbacks=Mock(), event_bus=Mock(), cache=Mock())

# prepare a dummy executable to simulate Antares Solver
Expand Down Expand Up @@ -87,6 +89,7 @@ def test_compute(tmp_path: Path, launcher_config: Config):
study_uuid="study-id",
uuid=study_id,
launcher_parameters=launcher_parameters,
version="0",
)

# noinspection PyUnresolvedReferences
Expand Down
16 changes: 7 additions & 9 deletions tests/launcher/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,17 +730,15 @@ def test_manage_output(self, tmp_path: Path) -> None:
)

output_path = tmp_path / "output"
zipped_output_path = tmp_path / "zipped_output"
os.mkdir(output_path)
os.mkdir(zipped_output_path)
new_output_path = output_path / "new_output"
os.mkdir(new_output_path)
(new_output_path / "log").touch()
(new_output_path / "data").touch()
additional_log = tmp_path / "output.log"
additional_log.write_text("some log")
new_output_zipped_path = zipped_output_path / "test.zip"
with ZipFile(new_output_zipped_path, "w", ZIP_DEFLATED) as output_data:
zipped_path = tmp_path / "test.zip"
with ZipFile(zipped_path, "w", ZIP_DEFLATED) as output_data:
output_data.writestr("some output", "0\n1")
job_id = "job_id"
zipped_job_id = "zipped_job_id"
Expand All @@ -766,9 +764,9 @@ def test_manage_output(self, tmp_path: Path) -> None:
),
]
with pytest.raises(JobNotFound):
launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]})
launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]})

launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]})
launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]})
assert not launcher_service._get_job_output_fallback_path(job_id).exists()
launcher_service.study_service.import_output.assert_called()

Expand All @@ -777,7 +775,7 @@ def test_manage_output(self, tmp_path: Path) -> None:

launcher_service._import_output(
zipped_job_id,
zipped_output_path,
zipped_path,
{
"out.log": [additional_log],
"antares-out": [additional_log],
Expand All @@ -797,10 +795,10 @@ def test_manage_output(self, tmp_path: Path) -> None:
StudyNotFoundError(""),
]

assert launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]}) is None
assert launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]}) is None

(new_output_path / "info.antares-output").write_text(f"[general]\nmode=eco\nname=foo\ntimestamp={time.time()}")
output_name = launcher_service._import_output(job_id, output_path, {"out.log": [additional_log]})
output_name = launcher_service._import_output(job_id, tmp_path, {"out.log": [additional_log]})
assert output_name is not None
assert output_name.endswith("-hello")
assert launcher_service._get_job_output_fallback_path(job_id).exists()
Expand Down
19 changes: 17 additions & 2 deletions tests/launcher/test_slurm_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import shutil
import textwrap
import uuid
import zipfile
from argparse import Namespace
from pathlib import Path
from unittest.mock import ANY, Mock, patch
Expand Down Expand Up @@ -385,7 +386,7 @@ def test_import_study_output(launcher_config, tmp_path) -> None:

slurm_launcher.callbacks.import_output.assert_called_once_with(
"1",
launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1" / "output",
launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1",
{},
)
assert res == "output"
Expand All @@ -412,6 +413,20 @@ def test_import_study_output(launcher_config, tmp_path) -> None:
assert (output_dir / "results" / "something_else").exists()
assert (output_dir / "results" / "something_else").read_text() == "world"

# asserts that a xpansion output zipped can be imported
xpansion_zip_dir = launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "2"
xpansion_zip_dir.mkdir(parents=True)
(xpansion_zip_dir / "input" / "links").mkdir(parents=True)
xpansion_out_put_dir = xpansion_zip_dir / "output"
xpansion_out_put_dir.mkdir(parents=True)
xpansion_output_file = xpansion_out_put_dir / "xpansion.zip"
with zipfile.ZipFile(xpansion_output_file, "w") as zipf:
zipf.write(xpansion_dir / "something_else", "some_file.txt")
slurm_launcher._import_study_output("2", "cpp")
assert (
launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "2" / "output" / xpansion_output_file.name[:-4]
).exists()

log_dir = tmp_path / "logs"
log_dir.mkdir()
log_info = log_dir / "antares-out-xxxx.txt"
Expand All @@ -422,7 +437,7 @@ def test_import_study_output(launcher_config, tmp_path) -> None:
slurm_launcher._import_study_output("1", None, str(log_dir))
slurm_launcher.callbacks.import_output.assert_called_once_with(
"1",
launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1" / "output",
launcher_config.launcher.slurm.local_workspace / "OUTPUT" / "1",
{
"antares-out.log": [log_info],
"antares-err.log": [log_error],
Expand Down
Loading