Skip to content

Commit

Permalink
Wait-for-inputs functionality for calcloud-223
Browse files Browse the repository at this point in the history
Bugfix s3.list_objects and io list() and listl() max_objects handling; pagination docs unclear/wrong.
  • Loading branch information
EC2 Default User committed Mar 8, 2021
1 parent ff2749c commit 2e70fbf
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 10 deletions.
12 changes: 6 additions & 6 deletions calcloud/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ def list(self, prefixes="all", max_objects=s3.MAX_LIST_OBJECTS):
for s3_path in self.list_s3(prefixes, max_objects=max_objects):
yield s3_path[len(self.s3_path + "/") :]

def listl(self, prefixes="all"):
def listl(self, prefixes="all", max_objects=s3.MAX_LIST_OBJECTS):
"""Return the outputs of list() as a list, mainly for testing since list()
returns a generator which reveals little in its repr().
"""
return list(sorted(self.list(prefixes)))
return list(sorted(self.list(prefixes, max_objects=max_objects)))

def get(self, prefix, encoding="utf-8"):
return s3.get_object(self.path(prefix), client=self.client, encoding=encoding)
Expand Down Expand Up @@ -409,9 +409,9 @@ def ids(self, message_types="all"):
"""
return list(set(msg.split("-")[1] for msg in self.list(message_types)))

def list(self, prefix):
def list(self, prefix, max_objects=s3.MAX_LIST_OBJECTS):
"""List all objects related to `prefix, removing any .trigger suffix."""
for obj in super().list(prefix):
for obj in super().list(prefix, max_objects=max_objects):
if obj.endswith(".trigger"): # XXXX Undo trigger hack
obj = obj[: -len(".trigger")]
yield obj
Expand Down Expand Up @@ -530,16 +530,16 @@ def __init__(self, bucket=s3.DEFAULT_BUCKET, client=None):
self.xdata = MetadataIo(self.bucket + "/control", self.client) # serialized object job control metadata i/o

def reset(self, ids="all"):
"""Delete outputs, messages, and the control metadata files."""
"""Delete outputs, messages, and control files."""
self.outputs.delete(ids)
self.messages.delete(ids)
self.control.delete(ids)
self.xdata.delete(ids)

def clear(self, ids="all"):
"""Delete every S3 file managed by this IoBundle."""
self.reset(ids)
self.inputs.delete(ids)
self.control.delete(ids)


def get_io_bundle(bucket=s3.DEFAULT_BUCKET, client=None):
Expand Down
31 changes: 30 additions & 1 deletion calcloud/lambda_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
handlers for more information on how jobs are initiated and
retried.
"""
import time

from . import plan
from . import submit
Expand All @@ -28,18 +29,21 @@ def main(comm, ipppssoot, bucket_name):
6. Nominally sends "submit-ipppssoot" message.
7. On error anywhere, sends the "error-ipppssoot" message.
"""
comm.messages.delete("all-{ipppssoot}")
try:
_main(comm, ipppssoot, bucket_name)
comm.messages.put(f"submit-{ipppssoot}")
except Exception as exc:
print("Exception in lambda_submit.main for", ipppssoot, "=", exc)
comm.messages.delete(f"all-{ipppssoot}")
comm.messages.put(f"error-{ipppssoot}")
raise


def _main(comm, ipppssoot, bucket_name):
"""Core job submission function factored out of main() to clarify exception handling."""

wait_for_inputs(comm, ipppssoot)

comm.messages.delete(f"all-{ipppssoot}")
comm.outputs.delete(f"{ipppssoot}")

Expand All @@ -58,3 +62,28 @@ def _main(comm, ipppssoot, bucket_name):

metadata["job_id"] = response["jobId"]
comm.xdata.put(ipppssoot, metadata)


class CalcloudInputsFailure(RuntimeError):
"""The inputs needed to plan and run this job were not ready in time."""


def wait_for_inputs(comm, ipppssoot):
"""Ensure that the inputs required to plan and run the job for `ipppssoot` are available.
Each iteration, check for the S3 message files which trigger submissions and abort if none
are found.
Eventually after 15 min (default) the lambda will die if it's still waiting.
"""
input_tarball, memory_modeling = [], []
while not input_tarball or not memory_modeling:
input_tarball = comm.inputs.listl(f"{ipppssoot}.tar.gz")
memory_modeling = comm.control.listl(f"{ipppssoot}/{ipppssoot}_MemModelFeatures.txt")
if not comm.messages.listl([f"placed-{ipppssoot}", f"rescue-{ipppssoot}"]):
raise CalcloudInputsFailure(
f"Both the 'placed' and 'rescue' messages for {ipppssoot} have been deleted. Aborting input wait and submission."
)
print(f"Waiting for inputs for {ipppssoot}. input_tarball={input_tarball} memory_modeling={memory_modeling}")
time.sleep(30)
print(f"Inputs for {ipppssoot} found.")
2 changes: 1 addition & 1 deletion calcloud/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def list_objects(s3_prefix, client=None, max_objects=MAX_LIST_OBJECTS):
log.verbose("s3.list_objects", s3_prefix, max_objects)
client, bucket_name, prefix = _s3_setup(client, s3_prefix)
paginator = client.get_paginator("list_objects_v2")
config = {"MaxItems": max_objects, "PageSize": max_objects}
config = {"MaxItems": max_objects, "PageSize": 1000}
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix, PaginationConfig=config):
for result in page.get("Contents", []):
listed = "s3://" + bucket_name + "/" + result["Key"]
Expand Down
2 changes: 1 addition & 1 deletion terraform/lambda_job_rescue.tf
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module "calcloud_lambda_rescueJob" {
handler = "rescue_handler.lambda_handler"
runtime = "python3.6"
publish = false
timeout = 300
timeout = 900

source_path = [
{
Expand Down
2 changes: 1 addition & 1 deletion terraform/lambda_job_submit.tf
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module "calcloud_lambda_submit" {
handler = "s3_trigger_handler.lambda_handler"
runtime = "python3.6"
publish = false
timeout = 30
timeout = 900

source_path = [
{
Expand Down

0 comments on commit 2e70fbf

Please sign in to comment.