Merge pull request #60 from jaytmiller/calcloud-223-async-inputs
Calcloud 223 async inputs
jaytmiller authored Mar 9, 2021
2 parents 2971db1 + 3b790c6 commit 951ad72
Showing 5 changed files with 42 additions and 10 deletions.
12 changes: 6 additions & 6 deletions calcloud/io.py
@@ -100,11 +100,11 @@ def list(self, prefixes="all", max_objects=s3.MAX_LIST_OBJECTS):
         for s3_path in self.list_s3(prefixes, max_objects=max_objects):
             yield s3_path[len(self.s3_path + "/") :]
 
-    def listl(self, prefixes="all"):
+    def listl(self, prefixes="all", max_objects=s3.MAX_LIST_OBJECTS):
         """Return the outputs of list() as a list, mainly for testing since list()
         returns a generator which reveals little in its repr().
         """
-        return list(sorted(self.list(prefixes)))
+        return list(sorted(self.list(prefixes, max_objects=max_objects)))
 
     def get(self, prefix, encoding="utf-8"):
         return s3.get_object(self.path(prefix), client=self.client, encoding=encoding)
@@ -409,9 +409,9 @@ def ids(self, message_types="all"):
         """
         return list(set(msg.split("-")[1] for msg in self.list(message_types)))
 
-    def list(self, prefix):
+    def list(self, prefix, max_objects=s3.MAX_LIST_OBJECTS):
         """List all objects related to `prefix`, removing any .trigger suffix."""
-        for obj in super().list(prefix):
+        for obj in super().list(prefix, max_objects=max_objects):
             if obj.endswith(".trigger"):  # XXXX Undo trigger hack
                 obj = obj[: -len(".trigger")]
             yield obj
@@ -530,16 +530,16 @@ def __init__(self, bucket=s3.DEFAULT_BUCKET, client=None):
         self.xdata = MetadataIo(self.bucket + "/control", self.client)  # serialized object job control metadata i/o
 
     def reset(self, ids="all"):
-        """Delete outputs, messages, and the control metadata files."""
+        """Delete outputs, messages, and control files."""
        self.outputs.delete(ids)
         self.messages.delete(ids)
         self.xdata.delete(ids)
 
     def clear(self, ids="all"):
         """Delete every S3 file managed by this IoBundle."""
         self.reset(ids)
-        self.inputs.delete(ids)
         self.control.delete(ids)
+        self.inputs.delete(ids)


 def get_io_bundle(bucket=s3.DEFAULT_BUCKET, client=None):
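Taken together, the io.py changes thread a max_objects bound from the public listl() helper down through list() to the paginated S3 listing, so callers can cap a listing without touching s3.list_objects() directly. A minimal usage sketch, assuming the bundle and methods shown above (the bucket name is hypothetical):

from calcloud import io

comm = io.get_io_bundle("calcloud-processing-sandbox")  # hypothetical bucket name

# listl() now forwards max_objects to list(), which forwards it to the
# underlying paginated S3 listing; the default remains s3.MAX_LIST_OBJECTS.
first_few = comm.inputs.listl("all", max_objects=10)
print(first_few)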
34 changes: 33 additions & 1 deletion calcloud/lambda_submit.py
@@ -12,6 +12,7 @@
 handlers for more information on how jobs are initiated and
 retried.
 """
+import time
 
 from . import plan
 from . import submit
@@ -28,18 +29,21 @@ def main(comm, ipppssoot, bucket_name):
     6. Nominally sends "submit-ipppssoot" message.
     7. On error anywhere, sends the "error-ipppssoot" message.
     """
-    comm.messages.delete("all-{ipppssoot}")
     try:
         _main(comm, ipppssoot, bucket_name)
         comm.messages.put(f"submit-{ipppssoot}")
     except Exception as exc:
         print("Exception in lambda_submit.main for", ipppssoot, "=", exc)
+        comm.messages.delete(f"all-{ipppssoot}")
         comm.messages.put(f"error-{ipppssoot}")
         raise


 def _main(comm, ipppssoot, bucket_name):
     """Core job submission function factored out of main() to clarify exception handling."""
 
+    wait_for_inputs(comm, ipppssoot)
+
+    comm.messages.delete(f"all-{ipppssoot}")
     comm.outputs.delete(f"{ipppssoot}")

@@ -57,3 +61,31 @@ def _main(comm, ipppssoot, bucket_name):
     print("Submitted job for", ipppssoot, "as ID", response["jobId"])
     metadata["job_id"] = response["jobId"]
     comm.xdata.put(ipppssoot, metadata)
+
+
+class CalcloudInputsFailure(RuntimeError):
+    """The inputs needed to plan and run this job were not ready in time."""
+
+
+def wait_for_inputs(comm, ipppssoot):
+    """Ensure that the inputs required to plan and run the job for `ipppssoot` are available.
+
+    Each iteration, check for the S3 message files which trigger submissions and abort if none
+    are found.  Eventually, after 15 min (default), the lambda will die if it's still waiting.
+    """
+    input_tarball, memory_modeling = [], []
+    while not input_tarball or not memory_modeling:
+        input_tarball = comm.inputs.listl(f"{ipppssoot}.tar.gz")
+        memory_modeling = comm.control.listl(f"{ipppssoot}/{ipppssoot}_MemModelFeatures.txt")
+        if not comm.messages.listl([f"placed-{ipppssoot}", f"rescue-{ipppssoot}"]):
+            raise CalcloudInputsFailure(
+                f"Both the 'placed' and 'rescue' messages for {ipppssoot} have been deleted.  Aborting input wait and submission."
+            )
+        if not input_tarball or not memory_modeling:
+            print(
+                f"Waiting for inputs for {ipppssoot}.  input_tarball={input_tarball} memory_modeling={memory_modeling}"
+            )
+            time.sleep(30)
+    print(f"Inputs for {ipppssoot} found.")
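The abort path in wait_for_inputs() leans on the same S3 message files that trigger submission: while one lambda polls for inputs, any other actor can cancel the wait by deleting the job's 'placed' and 'rescue' messages. A sketch of that interaction, assuming an io bundle as above (the bucket name and ipppssoot below are hypothetical):

from calcloud import io, lambda_submit

comm = io.get_io_bundle("calcloud-processing-sandbox")  # hypothetical bucket name
ipst = "idgg23ztq"  # hypothetical ipppssoot

# A canceller elsewhere removes the messages that keep the wait alive:
comm.messages.delete(f"placed-{ipst}")
comm.messages.delete(f"rescue-{ipst}")

# The polling lambda then raises on its next iteration instead of sleeping:
try:
    lambda_submit.wait_for_inputs(comm, ipst)
except lambda_submit.CalcloudInputsFailure as exc:
    print("Wait aborted:", exc)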
2 changes: 1 addition & 1 deletion calcloud/s3.py
@@ -267,7 +267,7 @@ def list_objects(s3_prefix, client=None, max_objects=MAX_LIST_OBJECTS):
     log.verbose("s3.list_objects", s3_prefix, max_objects)
     client, bucket_name, prefix = _s3_setup(client, s3_prefix)
     paginator = client.get_paginator("list_objects_v2")
-    config = {"MaxItems": max_objects, "PageSize": max_objects}
+    config = {"MaxItems": max_objects, "PageSize": 1000}
     for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix, PaginationConfig=config):
         for result in page.get("Contents", []):
             listed = "s3://" + bucket_name + "/" + result["Key"]
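The one-line s3.py fix matters because boto3's PaginationConfig treats the two keys differently: MaxItems caps the total number of results returned across all pages, while PageSize caps each underlying ListObjectsV2 request, which S3 limits to 1000 keys regardless. Pinning PageSize at 1000 asks for full pages no matter how large max_objects is. A short sketch of the semantics (bucket and prefix are hypothetical):

import boto3

client = boto3.client("s3")
paginator = client.get_paginator("list_objects_v2")

# MaxItems: stop after this many keys in total, across all pages.
# PageSize: request at most this many keys per call; S3 caps it at 1000.
config = {"MaxItems": 5000, "PageSize": 1000}
for page in paginator.paginate(Bucket="my-bucket", Prefix="inputs/", PaginationConfig=config):
    for result in page.get("Contents", []):
        print(result["Key"])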
2 changes: 1 addition & 1 deletion terraform/lambda_job_rescue.tf
@@ -8,7 +8,7 @@ module "calcloud_lambda_rescueJob" {
   handler                           = "rescue_handler.lambda_handler"
   runtime                           = "python3.6"
   publish                           = false
-  timeout                           = 300
+  timeout                           = 900
   cloudwatch_logs_retention_in_days = 30
 
   source_path = [
2 changes: 1 addition & 1 deletion terraform/lambda_job_submit.tf
@@ -9,7 +9,7 @@ module "calcloud_lambda_submit" {
   handler                           = "s3_trigger_handler.lambda_handler"
   runtime                           = "python3.6"
   publish                           = false
-  timeout                           = 30
+  timeout                           = 900
   cloudwatch_logs_retention_in_days = 30
 
   source_path = [
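Both terraform changes raise the lambda timeouts to 900 seconds, the AWS Lambda maximum, matching the 15-minute ceiling mentioned in the wait_for_inputs() docstring: a submit or rescue lambda can now poll for inputs for up to 15 minutes before the platform kills it.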
