Skip to content

Commit

Permalink
Merge pull request #10 from nkaretnikov/papermill-error-8
Browse files Browse the repository at this point in the history
Mention if papermill is not installed in failed status
  • Loading branch information
Nikita Karetnikov authored May 29, 2024
2 parents 1d9f1d1 + cfd0c4e commit 5dea3f7
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 9 deletions.
58 changes: 52 additions & 6 deletions argo_jupyter_scheduler/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
gen_default_output_path,
gen_log_path,
gen_papermill_command_input,
gen_papermill_status_path,
gen_workflow_name,
sanitize_label,
setup_logger,
Expand Down Expand Up @@ -187,6 +188,7 @@ def create_workflow(
):
input_path = staging_paths["input"]
log_path = gen_log_path(input_path)
papermill_status_path = gen_papermill_status_path(input_path)

# Configure logging to file first
add_file_logger(logger, log_path)
Expand Down Expand Up @@ -220,7 +222,12 @@ def create_workflow(
ttl_strategy=ttl_strategy,
) as w:
main = main_container(
job, use_conda_store_env, input_path, log_path, parameters
job=job,
use_conda_store_env=use_conda_store_env,
input_path=input_path,
log_path=log_path,
papermill_status_path=papermill_status_path,
parameters=parameters,
)

with Steps(name="steps"):
Expand Down Expand Up @@ -262,7 +269,12 @@ def create_workflow(

update_job_status_failure(
name="failure",
arguments={"db_url": db_url, "job_id": job.job_id},
arguments={
"db_url": db_url,
"log_path": log_path,
"papermill_status_path": papermill_status_path,
"job_id": job.job_id,
},
when=failure,
)

Expand Down Expand Up @@ -336,6 +348,7 @@ def _create_cwf_object(
):
input_path = staging_paths["input"]
log_path = gen_log_path(input_path)
papermill_status_path = gen_papermill_status_path(input_path)

# Configure logging to file first
add_file_logger(logger, log_path)
Expand Down Expand Up @@ -384,7 +397,12 @@ def _create_cwf_object(
ttl_strategy=ttl_strategy,
) as cwf:
main = main_container(
job, use_conda_store_env, input_path, log_path, parameters
job=job,
use_conda_store_env=use_conda_store_env,
input_path=input_path,
log_path=log_path,
papermill_status_path=papermill_status_path,
parameters=parameters,
)

with Steps(name="steps"):
Expand Down Expand Up @@ -437,6 +455,8 @@ def _create_cwf_object(
name="failure",
arguments={
"db_url": db_url,
"log_path": log_path,
"papermill_status_path": papermill_status_path,
"job_definition_id": job_definition_id,
},
when=failure,
Expand Down Expand Up @@ -554,7 +574,9 @@ def update_cron_workflow(
logger.info("cron workflow updated")


def main_container(job, use_conda_store_env, input_path, log_path, parameters):
def main_container(
job, use_conda_store_env, input_path, log_path, papermill_status_path, parameters
):
envs = []
if parameters is not None:
for key, value in parameters.items():
Expand All @@ -569,6 +591,7 @@ def main_container(job, use_conda_store_env, input_path, log_path, parameters):
output_path=output_path,
html_path=html_path,
log_path=log_path,
papermill_status_path=papermill_status_path,
use_conda_store_env=use_conda_store_env,
)

Expand All @@ -581,11 +604,34 @@ def main_container(job, use_conda_store_env, input_path, log_path, parameters):


@script()
def update_job_status_failure(db_url, job_id=None, job_definition_id=None):
def update_job_status_failure(
db_url, log_path, papermill_status_path, job_id=None, job_definition_id=None
):
from jupyter_scheduler.models import Status
from jupyter_scheduler.orm import Job, create_session
from sqlalchemy import desc

from argo_jupyter_scheduler.utils import add_file_logger, setup_logger

# Sets up logging
logger = setup_logger("update_job_status_failure")
add_file_logger(logger, log_path)

# Gets papermill status
try:
with open(papermill_status_path) as f:
papermill_status = int(f.read())
except Exception:
logger.exception("Failed to get papermill status")
papermill_status = None

status_not_found = 127
if papermill_status == status_not_found:
status_message = "Workflow failed (papermill not found)."
else:
status_message = "Workflow failed."

# Sets job status
db_session = create_session(db_url)
with db_session() as session:
if job_definition_id:
Expand All @@ -599,7 +645,7 @@ def update_job_status_failure(db_url, job_id=None, job_definition_id=None):
job_id = job.job_id

session.query(Job).filter(Job.job_id == job_id).update(
{"status": Status.FAILED, "status_message": "Workflow failed."}
{"status": Status.FAILED, "status_message": status_message}
)
session.commit()

Expand Down
22 changes: 19 additions & 3 deletions argo_jupyter_scheduler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ def gen_log_path(input_path: str):
return str(p.parent / "logs.txt")


def gen_papermill_status_path(input_path: str):
p = Path(input_path)
return str(p.parent / "papermill_status.txt")


def send_request(api_v1_endpoint):
token = os.environ[CONDA_STORE_TOKEN]
conda_store_svc_name = os.environ[CONDA_STORE_SERVICE]
Expand Down Expand Up @@ -192,6 +197,7 @@ def gen_papermill_command_input(
output_path: str,
html_path: str,
log_path: str,
papermill_status_path: str,
use_conda_store_env: bool = True,
):
# TODO: allow overrides
Expand All @@ -203,11 +209,21 @@ def gen_papermill_command_input(
logger.info(f"output_path: {output_path}")
logger.info(f"log_path: {log_path}")
logger.info(f"html_path: {html_path}")
logger.info(f"papermill_status_path: {papermill_status_path}")

papermill = f"papermill -k {kernel_name} {input_path} {output_path}"
jupyter = f"jupyter nbconvert --to html {output_path} --output {html_path}"
# Within a single-quoted string, wraps the string in single quotes
def sq(s):
return rf"'\''{s}'\''"

# These commands are executed within a single-quoted string below
papermill = (
f"( papermill -k {sq(kernel_name)} {sq(input_path)} {sq(output_path)} ; "
f"ec=$? ; echo $ec > {sq(papermill_status_path)} ; exit $ec )"
)
jupyter = f"jupyter nbconvert --to html {sq(output_path)} --output {sq(html_path)}"

return f'conda run -p {conda_env_path} /bin/sh -c "{{ {papermill} && {jupyter} ; }} >> {log_path} 2>&1"'
# It's important that inner quotes are single quotes to prevent shell expansion
return f"conda run -p '{conda_env_path}' /bin/sh -c '{{ {papermill} && {jupyter} ; }} >> {sq(log_path)} 2>&1'"


def sanitize_label(s: str):
Expand Down

0 comments on commit 5dea3f7

Please sign in to comment.