diff --git a/calcloud/dynamodb.py b/calcloud/dynamodb.py
deleted file mode 100644
index 2b9bbac8..00000000
--- a/calcloud/dynamodb.py
+++ /dev/null
@@ -1,112 +0,0 @@
-"""This module handles initializing and using the calcloud job database
-which tracks metadata like memory consumption and compute duration.
-"""
-import sys
-
-import boto3
-
-# --------------------------------------------------------------------------------
-
-
-def convert(v):
-    """Type converter from str to int, float, str in that order."""
-    v = v.strip()
-    try:
-        return int(v)
-    except ValueError:
-        try:
-            return float(v)
-        except ValueError:
-            return str(v)
-    return v
-
-
-# --------------------------------------------------------------------------------
-
-
-class DynamoDB:
-    """Handle Dynamo API calls more naturally."""
-
-    def __init__(self, table_name, primary_key):
-        """Initialize DynamoDB w/o creating or opening actual database."""
-        self.table_name = table_name
-        self.primary_key = primary_key
-        self.dynamodb = boto3.resource("dynamodb")
-        self.table = self.open_db()  # Assume it's terraform'ed already and fail if not.
-
-        # try:
-        #     self.table = self.open_db()
-        # except exceptions.ResourceNotExists:
-        #     print(f"Creating dynamodb '{table_name}' with primary key '{primary_key}'")
-        #     self.table = self.create_db()
-
-    def create_db(self):
-        """Create a dynamodb corresponding to this object's table_name and primary_key."""
-        table = self.dynamodb.create_table(
-            TableName=self.table_name,
-            KeySchema=[{"AttributeName": self.primary_key, "KeyType": "HASH"}],
-            AttributeDefinitions=[
-                {"AttributeName": self.primary_key, "AttributeType": "S"},
-            ],
-            BillingMode="PAY_PER_REQUEST",
-        )
-        table.meta.client.get_waiter("table_exists").wait(TableName=self.table_name)
-        return table
-
-    def open_db(self):
-        """Open an existing db."""
-        return self.dynamodb.Table(self.table_name)
-
-    def to_db(self, item_dict):
-        """Convert the values of item_dict for storage in dynamodb."""
-        return {key: str(val) for (key, val) in item_dict.items()}
-
-    def from_db(self, item_dict):
-        """Convert values coming from dynamodb back to Python types."""
-        return {key: convert(val) for (key, val) in item_dict.items()}
-
-    def put_item(self, item_dict):
-        """Create a new item corresponding to `item_dict`."""
-        self.table.put_item(Item=self.to_db(item_dict))
-
-    def init_db(self, db_dict):
-        """Initialize the simpledb using database dictionary `db_dict` of the form:
-
-        db_dict    dict(dict...)    { item_name: item_dict, ...}
-
-        where e.g. item_name == "i9zf11010" and item_dict == {'dataset':'i9zf11010', "imageSize":2048, "jobDuration": 4398.4, ...}
-
-        Returns None
-        """
-        with self.table.batch_writer() as batch:
-            for i, (_item_name, item_dict) in enumerate(db_dict.items()):
-                if i % 1000 == 0:
-                    print(i)
-                    sys.stdout.flush()
-                batch.put_item(self.to_db(item_dict))
-
-    def get_item(self, item_name):
-        """Fetch attribute dictionary of item with primary key value `item_name`.
-
-        item_name:  str  e.g. name of dataset 'i9zf11010'
-
-        returns dict(item attributes...)
-        """
-        resp = self.table.get_item(Key={self.primary_key: item_name})
-        try:
-            return self.from_db(resp["Item"])
-        except KeyError:
-            raise KeyError(f"Item '{item_name}' not found.")
-
-    def del_item(self, item_name):
-        """Delete the item with primary key value `item_name`."""
-        self.table.delete_item(Key={self.primary_key: item_name})
-
-    def update_item(self, item_dict):
-        """Update the item identified in `item_dict` with the values of `item_dict`."""
-        self.del_item(item_dict[self.primary_key])
-        self.put_item(item_dict)
-
-    def del_db(self):
-        """Delete this database destroying the persistent copy."""
-        self.table.delete()
diff --git a/calcloud/lambda_submit.py b/calcloud/lambda_submit.py
index 3bc2d49d..6d0f1780 100644
--- a/calcloud/lambda_submit.py
+++ b/calcloud/lambda_submit.py
@@ -48,13 +48,12 @@ def _main(comm, ipppssoot, bucket_name):
     except comm.xdata.client.exceptions.NoSuchKey:
         metadata = dict(memory_retries=0, job_id=None, terminated=False)
 
+    # get_plan() raises AllBinsTriedQuit when retries exhaust higher memory job definitions
     p = plan.get_plan(ipppssoot, bucket_name, f"{bucket_name}/inputs", metadata["memory_retries"])
 
+    # Only reached if get_plan() defines a viable job plan
     print("Job Plan:", p)
-
     response = submit.submit_job(p)
-
     print("Submitted job for", ipppssoot, "as ID", response["jobId"])
-
     metadata["job_id"] = response["jobId"]
     comm.xdata.put(ipppssoot, metadata)
diff --git a/calcloud/metrics.py b/calcloud/metrics.py
deleted file mode 100644
index d39130e8..00000000
--- a/calcloud/metrics.py
+++ /dev/null
@@ -1,134 +0,0 @@
-import sys
-import csv
-import math
-
-from . import dynamodb
-from . import s3
-from . import log
-
-# --------------------------------------------------------------------------------
-
-
-def load_blackboard(filename, delimiter="|", item_key="dataset"):
-    """Loader for dump of on-prem HST blackboard defining known resource usage.
-
-    Returns  { row_dict[item_key]: row_dict, ...}
-
-    where `row_dict` is a dictionary with keys that are column names and
-    corresponding row values.
-    """
-    with open(filename) as csvfile:
-        reader = csv.reader(csvfile, delimiter=delimiter)
-        columns = tuple(col.strip() for col in reader.__next__())
-        db_dict = {}
-        for row in reader:
-            converted = tuple(dynamodb.convert(v) for v in row)
-            item_dict = dict(zip(columns, converted))
-            item_dict.pop("", None)
-            db_dict[item_dict[item_key]] = {
-                item_key: item_dict[item_key],
-                "memory_megabytes": math.ceil(item_dict["imageSize"] / 1024),
-                "wallclock_seconds": item_dict["jobduration"],
-                "cpus": 1,
-            }
-    return db_dict
-
-
-# --------------------------------------------------------------------------------
-
-DB = dynamodb.DynamoDB("calcloud-hst-job-info", "dataset")
-
-
-def load_db(filepath):
-    DB.init_db(load_blackboard(filepath))
-
-
-def get_resources(dataset):
-    """Given ipppssoot `dataset`, return: (memory_megabytes, cpus, wallclock_seconds)
-    """
-    # raise KeyError("Forced get_resources error for", dataset)
-    item = DB.get_item(dataset)
-    memory_megabytes = item["memory_megabytes"]
-    cpus = item.get("cpus", 1)  # original blackboard doesn't have these
-    wallclock_seconds = item["wallclock_seconds"]
-    return memory_megabytes, cpus, wallclock_seconds
-
-
-def set_resources(dataset, memory_megabytes, cpus, wallclock_seconds):
-    """Update the resource metrics database for ipppssoot `dataset` with measured
-    values for `memory_megabytes`, `cpus` effectively used/available, and
-    `walkclock_seconds` of runtime.
-    """
-    item = DB.get_item(dataset)
-    item["memory_megabytes"] = memory_megabytes
-    item["cpus"] = cpus
-    item["wallclock_seconds"] = wallclock_seconds
-    DB.update_item(item)
-
-
-# --------------------------------------------------------------------------------
-
-# This is throw-away code if the /usr/bin/time metrics turn out to be unusable.
-# It might however illustrate loading metrics from S3 in general if it turns out
-# that container-based metrics are viable in some form, e.g. based on psutils,
-# or aspects of any lambda based on CloudWatch metrics.
-
-
-def update_resources(s3_batch_path):
-    """Extract the process_metrics.txt files from every ipppssoot in a batch and
-    use them to overwrite or add a metrics database entry.
-    """
-    process_metrics_s3 = [
-        metric for metric in s3.list_directory(s3_batch_path, max_objects=10 ** 7) if "process_metric" in metric
-    ]
-    for s3_path in process_metrics_s3:
-        log.info("Processing metrics for", s3_path)
-        metrics_text = s3.get_object(s3_path)
-        dataset = s3_path.split("/")[-3]
-        memory, cpus, seconds = parse_time_metrics(metrics_text)
-        set_resources(dataset, memory, cpus, seconds)
-
-
-def parse_time_metrics(metrics_text):
-    """Parse the verbose output of GNU /usr/bin/time included in the original
-    caldp container to extract the memory resident set size, wallclock time,
-    and CPU's utilized.
-
-    For outlier (jcl403010) with 49.6G RAM reported in the blackboard,
-    the RSS of approx 8000 reported by "time" was insufficient to run
-    the container.  2x or roughly 16000 succeeded.
-
-    Add 2x fudge here so reported numbers are directly usable and
-    metric-specific fudge is not hardcoded in the planner or
-    provisioner.  (This fudge is property of this measurement).
-
-    Returns  memory_megabytes, cpus, wallclock_seconds
-    """
-    for line in metrics_text.splitlines():
-        line = line.strip()
-        words = line.split()
-        if line.startswith("Percent of CPU"):
-            percent = words[-1][:-1]
-            cpus = math.ceil(int(percent) / 100)
-        elif line.startswith("Maximum resident set size"):
-            kilobytes = int(words[-1])
-            memory_megabytes = math.ceil(kilobytes / 1024 * 2)
-        elif line.startswith("Elapsed (wall clock) time"):
-            parts = words[-1].split(":")
-            if len(parts) == 2:
-                h, m, ss = 0, parts[0], parts[1]
-            elif len(parts) == 3:
-                h, m, ss = parts
-            h, m, ss = map(lambda x: int(float(x)), [h, m, ss])
-            wallclock_seconds = h * 3600 + m * 60 + ss
-    return memory_megabytes, cpus, wallclock_seconds
-
-
-# --------------------------------------------------------------------------------
-if __name__ == "__main__":
-    if len(sys.argv) != 2:
-        print("usage: python -m calcloud.metrics ", file=sys.stderr)
-        sys.exit(1)
-    else:
-        load_db(sys.argv[1])
diff --git a/calcloud/plan.py b/calcloud/plan.py
index 90e549db..6272ce35 100644
--- a/calcloud/plan.py
+++ b/calcloud/plan.py
@@ -1,22 +1,23 @@
-"""This module is used to create processing resourcess given a list of ipppssoots and parameters to
-define outputs locations.
+"""This module is used to define job plans using the high level function
+get_plan().
 
-The idea behind creating resourcess is to generate enough information such that an ipppssoot or
-set of ipppssoots can be assigned to well tuned processing resources.
+get_plan() returns a named tuple specifying all the information needed to
+submit a job.
+
+Based on a memory_retries counter, get_plan() iterates through a sequence
+of job definitions with increasing memory requirements until the job later
+succeeds with sufficient memory or exhausts all retries.
 """
 import sys
 import os
 from collections import namedtuple
 
 from . import hst
-from . import metrics
 from . import log
 from . import s3
 
 # ----------------------------------------------------------------------
 
-OUTLIER_THRESHHOLD_MEGABYTES = 8192 - 128  # m5.large - 128M ~overhead
-
 JobResources = namedtuple(
     "JobResources",
     [
@@ -26,8 +27,7 @@
         "s3_output_uri",
         "input_path",
         "crds_config",
-        "vcpus",
-        "memory",
+        "initial_modeled_bin",
         "max_seconds",
     ],
 )
@@ -36,6 +36,11 @@
 
 Plan = namedtuple("Plan", JobResources._fields + JobEnv._fields)
 
+
+class AllBinsTriedQuit(Exception):
+    """Exception to raise when retry is requested but no applicable bin is available."""
+
+
 # ----------------------------------------------------------------------
 
 # This is the top level entrypoint called from calcloud.lambda_submit.main
@@ -60,12 +65,12 @@ def get_plan(ipppssoot, output_bucket, input_path, memory_retries=0):
 
     Returns Plan  (named tuple)
     """
-    job_resources = get_resources(ipppssoot, output_bucket, input_path, memory_retries)
-    env = _get_environment(job_resources)
+    job_resources = _get_resources(ipppssoot, output_bucket, input_path)
+    env = _get_environment(job_resources, memory_retries)
     return Plan(*(job_resources + env))
 
 
-def get_resources(ipppssoot, output_bucket, input_path, retries=0):
+def _get_resources(ipppssoot, output_bucket, input_path):
     """Given an HST IPPPSSOOT ID, return information used to schedule it as a batch job.
 
     Conceptually resource requirements can be tailored to individual IPPPSSOOTs.
@@ -81,41 +86,39 @@ def _get_resources(ipppssoot, output_bucket, input_path):
     job_name = ipppssoot
     input_path = input_path
     crds_config = "caldp-config-offsite"
-    return JobResources(
-        *(ipppssoot, instr, job_name, s3_output_uri, input_path, crds_config)
-        + _get_job_resources(instr, ipppssoot, retries)
-    )
+    initial_bin = 0
+    kill_time = 48 * 60 * 60
+    return JobResources(ipppssoot, instr, job_name, s3_output_uri, input_path, crds_config, initial_bin, kill_time)
 
+
+def _get_environment(job_resources, memory_retries):
+    """Based on a resources tuple and a memory_retries counter, determine:
 
-def _get_environment(job_resources):
+    (queue, job_definition_for_memory, kill seconds)
+    """
+    job_defs = os.environ["JOBDEFINITIONS"].split(",")
     job_resources = JobResources(*job_resources)
-    job_definition = os.environ["JOBDEFINITION"]
     normal_queue = os.environ["NORMALQUEUE"]
-    outlier_queue = os.environ["OUTLIERQUEUE"]
 
-    if job_resources.memory <= OUTLIER_THRESHHOLD_MEGABYTES:
-        return JobEnv(normal_queue, job_definition, "caldp-process")
+    final_bin = job_resources.initial_modeled_bin + memory_retries
+    if final_bin < len(job_defs):
+        job_definition = job_defs[final_bin]
+        log.info(
+            "Selected job definition",
+            job_definition,
+            "for",
+            job_resources.ipppssoot,
+            "based on initial bin",
+            job_resources.initial_modeled_bin,
+            "and",
+            memory_retries,
+            "retries.",
+        )
     else:
-        return JobEnv(outlier_queue, job_definition, "caldp-process")
-
+        log.info("No higher memory job definition for", job_resources.ipppssoot, "after", memory_retries)
+        raise AllBinsTriedQuit("No higher memory job definition for", job_resources.ipppssoot, "after", memory_retries)
 
-def _get_job_resources(instr, ipppssoot, retries=0):
-    """Given the instrument `instr` and dataset id `ipppssoot`...
-
-    Return required resources  (cores, memory in M, seconds til kill)
-
-    Note that these are "required" and still need to be matched to "available".
-    """
-    info = [0] * 3
-    try:
-        memory_megabytes, cpus, wallclock_seconds = metrics.get_resources(ipppssoot)
-        info[0] = cpus
-        info[1] = memory_megabytes + 512  # add some overhead for AWS Batch (>= 32M) and measurement error
-        info[2] = max(int(wallclock_seconds * cpus * 6), 120)  # kill time, BEWARE: THIS LOOKS WRONG
-    except KeyError:
-        info = (32, 64 * 1024, int(60 * 60 * 48))  # 32 cores, 64G/72G, 48 hours max (c5.9xlarge)
-        log.warning("Defaulting (cpu, memory, time) requirements for unknown dataset:", ipppssoot, "to", info)
-    return tuple(info)
+    return JobEnv(normal_queue, job_definition, "caldp-process")
 
 
 # ----------------------------------------------------------------------
diff --git a/calcloud/submit.py b/calcloud/submit.py
index 108b4088..9c352326 100644
--- a/calcloud/submit.py
+++ b/calcloud/submit.py
@@ -17,10 +17,10 @@ def submit_job(plan_tuple):
         "jobQueue": info.job_queue,
         "jobDefinition": info.job_definition,
         "containerOverrides": {
-            "resourceRequirements": [
-                {"value": f"{info.memory}", "type": "MEMORY"},
-                {"value": f"{info.vcpus}", "type": "VCPU"},
-            ],
+            # "resourceRequirements": [
+            #     {"value": f"{info.memory}", "type": "MEMORY"},
+            #     {"value": f"{info.vcpus}", "type": "VCPU"},
+            # ],
             "command": [info.command, info.ipppssoot, info.input_path, info.s3_output_uri, info.crds_config],
         },
         "timeout": {
diff --git a/terraform/batch-outlier.tf b/terraform/batch-outlier.tf
deleted file mode 100644
index 7bbf50a3..00000000
--- a/terraform/batch-outlier.tf
+++ /dev/null
@@ -1,41 +0,0 @@
-resource "aws_batch_job_queue" "batch_outlier_queue" {
-  name = "calcloud-hst-outlier-queue${local.environment}"
-  compute_environments = [
-    aws_batch_compute_environment.calcloud_outlier.arn
-  ]
-  priority = 10
-  state = "ENABLED"
-
-}
-
-resource "aws_batch_compute_environment" "calcloud_outlier" {
-  compute_environment_name_prefix = "calcloud-hst-outlier${local.environment}-"
-  type = "MANAGED"
-  service_role = data.aws_ssm_parameter.batch_service_role.value
-
-  compute_resources {
-    allocation_strategy = "BEST_FIT_PROGRESSIVE"
-    instance_role = data.aws_ssm_parameter.ecs_instance_role.value
-    type = "EC2"
-    bid_percentage = 0
-    tags = {}
-    subnets = local.batch_subnet_ids
-    security_group_ids = local.batch_sgs
-
-    instance_type = [
-      "optimal",  # 36 cores, 72G ram
-    ]
-    max_vcpus = 72
-    min_vcpus = 0
-    desired_vcpus = 0
-
-    launch_template {
-      launch_template_id = aws_launch_template.hstdp.id
-      version = "$Latest"
-    }
-  }
-  lifecycle {
-    ignore_changes = [compute_resources.0.desired_vcpus]
-    create_before_destroy = true
-  }
-}
diff --git a/terraform/batch.tf b/terraform/batch.tf
index 4ed21694..ce1bbc6f 100644
--- a/terraform/batch.tf
+++ b/terraform/batch.tf
@@ -137,8 +137,114 @@ data "aws_ecr_image" "caldp_latest" {
   image_tag = var.image_tag
 }
 
-resource "aws_batch_job_definition" "calcloud" {
-  name = "calcloud-hst-caldp-job-definition${local.environment}"
+# ------------------------------------------------------------------------------------------
+
+# 2G ----------------- also reserve 128M per 1G for Batch ECS + STScI overheads
+
+resource "aws_batch_job_definition" "calcloud_2g" {
+  name = "calcloud-jobdef-2g${local.environment}"
   type = "container"
   container_properties = <