From a4214cd23a9023f63cea19ee732d3ee98ed423e2 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Fri, 5 Mar 2021 14:45:06 +0000 Subject: [PATCH 1/9] Initial stab at job definition ladder. --- calcloud/lambda_submit.py | 5 +- calcloud/plan.py | 72 +++++++++++++++---------- terraform/batch.tf | 99 ++++++++++++++++++++++++++++++++-- terraform/lambda_job_rescue.tf | 2 +- terraform/lambda_job_submit.tf | 2 +- terraform/locals.tf | 2 + 6 files changed, 147 insertions(+), 35 deletions(-) diff --git a/calcloud/lambda_submit.py b/calcloud/lambda_submit.py index 3bc2d49d..6d0f1780 100644 --- a/calcloud/lambda_submit.py +++ b/calcloud/lambda_submit.py @@ -48,13 +48,12 @@ def _main(comm, ipppssoot, bucket_name): except comm.xdata.client.exceptions.NoSuchKey: metadata = dict(memory_retries=0, job_id=None, terminated=False) + # get_plan() raises AllBinsTriedQuit when retries exhaust higher memory job definitions p = plan.get_plan(ipppssoot, bucket_name, f"{bucket_name}/inputs", metadata["memory_retries"]) + # Only reached if get_plan() defines a viable job plan print("Job Plan:", p) - response = submit.submit_job(p) - print("Submitted job for", ipppssoot, "as ID", response["jobId"]) - metadata["job_id"] = response["jobId"] comm.xdata.put(ipppssoot, metadata) diff --git a/calcloud/plan.py b/calcloud/plan.py index 90e549db..6af90d68 100644 --- a/calcloud/plan.py +++ b/calcloud/plan.py @@ -1,22 +1,23 @@ -"""This module is used to create processing resourcess given a list of ipppssoots and parameters to -define outputs locations. +"""This module is used to define job plans using the high level function +get_plan(). -The idea behind creating resourcess is to generate enough information such that an ipppssoot or -set of ipppssoots can be assigned to well tuned processing resources. +get_plan() returns a named tuple specifying all the information needed to +submit a job. + +Based on a memory_retries counter, get_plan() iterates through a sequence +of job definitions with increasing memory requirements until the job later +succeeds with sufficient memory or exhausts all retries. """ import sys import os from collections import namedtuple from . import hst -from . import metrics from . import log from . 
import s3 # ---------------------------------------------------------------------- -OUTLIER_THRESHHOLD_MEGABYTES = 8192 - 128 # m5.large - 128M ~overhead - JobResources = namedtuple( "JobResources", [ @@ -27,7 +28,7 @@ "input_path", "crds_config", "vcpus", - "memory", + "initial_modeled_bin", "max_seconds", ], ) @@ -36,6 +37,11 @@ Plan = namedtuple("Plan", JobResources._fields + JobEnv._fields) + +class AllBinsTriedQuit(Exception): + """Exception to raise when retry is requested but no applicable bin is available.""" + + # ---------------------------------------------------------------------- # This is the top level entrypoint called from calcloud.lambda_submit.main @@ -61,7 +67,7 @@ def get_plan(ipppssoot, output_bucket, input_path, memory_retries=0): Returns Plan (named tuple) """ job_resources = get_resources(ipppssoot, output_bucket, input_path, memory_retries) - env = _get_environment(job_resources) + env = _get_environment(job_resources, memory_retries) return Plan(*(job_resources + env)) @@ -87,35 +93,47 @@ def get_resources(ipppssoot, output_bucket, input_path, retries=0): ) -def _get_environment(job_resources): +def _get_environment(job_resources, memory_retries): + """Based on a resources tuple and a memory_retries counter, determine: + + (queue, job_definition_for_memory, kill seconds) + """ + job_defs = os.environ["JOBDEFINITIONS"].split(",") job_resources = JobResources(*job_resources) - job_definition = os.environ["JOBDEFINITION"] normal_queue = os.environ["NORMALQUEUE"] - outlier_queue = os.environ["OUTLIERQUEUE"] - if job_resources.memory <= OUTLIER_THRESHHOLD_MEGABYTES: - return JobEnv(normal_queue, job_definition, "caldp-process") + final_bin = job_resources.initial_modeled_bin + memory_retries + if final_bin < len(job_defs): + job_definition = job_defs[final_bin] + print( + "Selected job definition", + job_definition, + "for", + job_resources.ipppssoot, + "based on initial bin", + job_resources.initial_modeled_bin, + "and", + memory_retries, + "retries.", + ) else: - return JobEnv(outlier_queue, job_definition, "caldp-process") + print("No higher memory job definition for", job_resources.ipppssoot, "after", memory_retries) + raise AllBinsTriedQuit("No higher memory job definition for", job_resources.ipppssoot, "after", memory_retries) + return JobEnv(queue, job_definition, "caldp-process") -def _get_job_resources(instr, ipppssoot, retries=0): + +def _get_job_resources(instr, ipppssoot): """Given the instrument `instr` and dataset id `ipppssoot`... - Return required resources (cores, memory in M, seconds til kill) + Return required resources (cores, initial_modeled_bin, seconds til kill) Note that these are "required" and still need to be matched to "available". + + # XXXXX Memory modeling nominally plugs in here to determin starting bin. 
""" - info = [0] * 3 - try: - memory_megabytes, cpus, wallclock_seconds = metrics.get_resources(ipppssoot) - info[0] = cpus - info[1] = memory_megabytes + 512 # add some overhead for AWS Batch (>= 32M) and measurement error - info[2] = max(int(wallclock_seconds * cpus * 6), 120) # kill time, BEWARE: THIS LOOKS WRONG - except KeyError: - info = (32, 64 * 1024, int(60 * 60 * 48)) # 32 cores, 64G/72G, 48 hours max (c5.9xlarge) - log.warning("Defaulting (cpu, memory, time) requirements for unknown dataset:", ipppssoot, "to", info) - return tuple(info) + # (1 core, 0th bin, 48 hour kill time) + return tuple(1, 0, 48 * 60 * 60) # ---------------------------------------------------------------------- diff --git a/terraform/batch.tf b/terraform/batch.tf index 27a6a033..6b907815 100644 --- a/terraform/batch.tf +++ b/terraform/batch.tf @@ -108,8 +108,70 @@ data "aws_ecr_image" "caldp_latest" { image_tag = var.image_tag } -resource "aws_batch_job_definition" "calcloud" { - name = "calcloud-hst-caldp-job-definition${local.environment}" +# ------------------------------------------------------------------------------------------ + +# 2G ----------------- also reserve 32M per 1G for Batch ECS overheads + +resource "aws_batch_job_definition" "calcloud_2g" { + name = "calcloud-jobdef-2g${local.environment}" + type = "container" + container_properties = < Date: Fri, 5 Mar 2021 15:00:36 +0000 Subject: [PATCH 2/9] Memory ladder draft tweaks. --- calcloud/plan.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/calcloud/plan.py b/calcloud/plan.py index 6af90d68..3c916a7c 100644 --- a/calcloud/plan.py +++ b/calcloud/plan.py @@ -66,12 +66,12 @@ def get_plan(ipppssoot, output_bucket, input_path, memory_retries=0): Returns Plan (named tuple) """ - job_resources = get_resources(ipppssoot, output_bucket, input_path, memory_retries) + job_resources = get_resources(ipppssoot, output_bucket, input_path) env = _get_environment(job_resources, memory_retries) return Plan(*(job_resources + env)) -def get_resources(ipppssoot, output_bucket, input_path, retries=0): +def get_resources(ipppssoot, output_bucket, input_path): """Given an HST IPPPSSOOT ID, return information used to schedule it as a batch job. Conceptually resource requirements can be tailored to individual IPPPSSOOTs. @@ -89,7 +89,7 @@ def get_resources(ipppssoot, output_bucket, input_path, retries=0): crds_config = "caldp-config-offsite" return JobResources( *(ipppssoot, instr, job_name, s3_output_uri, input_path, crds_config) - + _get_job_resources(instr, ipppssoot, retries) + + _get_job_resources(instr, ipppssoot) ) @@ -133,7 +133,7 @@ def _get_job_resources(instr, ipppssoot): # XXXXX Memory modeling nominally plugs in here to determin starting bin. 
""" # (1 core, 0th bin, 48 hour kill time) - return tuple(1, 0, 48 * 60 * 60) + return (1, 0, 48 * 60 * 60) # ---------------------------------------------------------------------- From f5a8d2ce6b47c17bf84b6ef40cfa3e610d4942b1 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Fri, 5 Mar 2021 19:11:56 +0000 Subject: [PATCH 3/9] Fix refactoring errors in plan.py, black --- calcloud/plan.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/calcloud/plan.py b/calcloud/plan.py index 3c916a7c..a8134311 100644 --- a/calcloud/plan.py +++ b/calcloud/plan.py @@ -88,8 +88,7 @@ def get_resources(ipppssoot, output_bucket, input_path): input_path = input_path crds_config = "caldp-config-offsite" return JobResources( - *(ipppssoot, instr, job_name, s3_output_uri, input_path, crds_config) - + _get_job_resources(instr, ipppssoot) + *(ipppssoot, instr, job_name, s3_output_uri, input_path, crds_config) + _get_job_resources(instr, ipppssoot) ) @@ -105,7 +104,7 @@ def _get_environment(job_resources, memory_retries): final_bin = job_resources.initial_modeled_bin + memory_retries if final_bin < len(job_defs): job_definition = job_defs[final_bin] - print( + log.info( "Selected job definition", job_definition, "for", @@ -117,10 +116,10 @@ def _get_environment(job_resources, memory_retries): "retries.", ) else: - print("No higher memory job definition for", job_resources.ipppssoot, "after", memory_retries) + log.info("No higher memory job definition for", job_resources.ipppssoot, "after", memory_retries) raise AllBinsTriedQuit("No higher memory job definition for", job_resources.ipppssoot, "after", memory_retries) - return JobEnv(queue, job_definition, "caldp-process") + return JobEnv(normal_queue, job_definition, "caldp-process") def _get_job_resources(instr, ipppssoot): From 1c1154da2440978dfd113defc010c08e4702a285 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Fri, 5 Mar 2021 19:16:34 +0000 Subject: [PATCH 4/9] Commented out memory and cpu overrides in submit.py --- calcloud/submit.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/calcloud/submit.py b/calcloud/submit.py index 108b4088..9c352326 100644 --- a/calcloud/submit.py +++ b/calcloud/submit.py @@ -17,10 +17,10 @@ def submit_job(plan_tuple): "jobQueue": info.job_queue, "jobDefinition": info.job_definition, "containerOverrides": { - "resourceRequirements": [ - {"value": f"{info.memory}", "type": "MEMORY"}, - {"value": f"{info.vcpus}", "type": "VCPU"}, - ], + # "resourceRequirements": [ + # {"value": f"{info.memory}", "type": "MEMORY"}, + # {"value": f"{info.vcpus}", "type": "VCPU"}, + # ], "command": [info.command, info.ipppssoot, info.input_path, info.s3_output_uri, info.crds_config], }, "timeout": { From e0bd2e0fcde7b44e25d76aaccfc3a4f3b10fcf5c Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Fri, 5 Mar 2021 21:02:01 +0000 Subject: [PATCH 5/9] Adjustments to binning and resource planning --- calcloud/plan.py | 24 +++++------------------- terraform/batch.tf | 16 ++++++++-------- 2 files changed, 13 insertions(+), 27 deletions(-) diff --git a/calcloud/plan.py b/calcloud/plan.py index a8134311..6272ce35 100644 --- a/calcloud/plan.py +++ b/calcloud/plan.py @@ -27,7 +27,6 @@ "s3_output_uri", "input_path", "crds_config", - "vcpus", "initial_modeled_bin", "max_seconds", ], @@ -66,12 +65,12 @@ def get_plan(ipppssoot, output_bucket, input_path, memory_retries=0): Returns Plan (named tuple) """ - job_resources = get_resources(ipppssoot, output_bucket, input_path) + job_resources = 
_get_resources(ipppssoot, output_bucket, input_path) env = _get_environment(job_resources, memory_retries) return Plan(*(job_resources + env)) -def get_resources(ipppssoot, output_bucket, input_path): +def _get_resources(ipppssoot, output_bucket, input_path): """Given an HST IPPPSSOOT ID, return information used to schedule it as a batch job. Conceptually resource requirements can be tailored to individual IPPPSSOOTs. @@ -87,9 +86,9 @@ def get_resources(ipppssoot, output_bucket, input_path): job_name = ipppssoot input_path = input_path crds_config = "caldp-config-offsite" - return JobResources( - *(ipppssoot, instr, job_name, s3_output_uri, input_path, crds_config) + _get_job_resources(instr, ipppssoot) - ) + initial_bin = 0 + kill_time = 48 * 60 * 60 + return JobResources(ipppssoot, instr, job_name, s3_output_uri, input_path, crds_config, initial_bin, kill_time) def _get_environment(job_resources, memory_retries): @@ -122,19 +121,6 @@ def _get_environment(job_resources, memory_retries): return JobEnv(normal_queue, job_definition, "caldp-process") -def _get_job_resources(instr, ipppssoot): - """Given the instrument `instr` and dataset id `ipppssoot`... - - Return required resources (cores, initial_modeled_bin, seconds til kill) - - Note that these are "required" and still need to be matched to "available". - - # XXXXX Memory modeling nominally plugs in here to determin starting bin. - """ - # (1 core, 0th bin, 48 hour kill time) - return (1, 0, 48 * 60 * 60) - - # ---------------------------------------------------------------------- diff --git a/terraform/batch.tf b/terraform/batch.tf index 6b907815..5f6e1b6b 100644 --- a/terraform/batch.tf +++ b/terraform/batch.tf @@ -110,7 +110,7 @@ data "aws_ecr_image" "caldp_latest" { # ------------------------------------------------------------------------------------------ -# 2G ----------------- also reserve 32M per 1G for Batch ECS overheads +# 2G ----------------- also reserve 128M per 1G for Batch ECS + STScI overheads resource "aws_batch_job_definition" "calcloud_2g" { name = "calcloud-jobdef-2g${local.environment}" @@ -121,7 +121,7 @@ resource "aws_batch_job_definition" "calcloud_2g" { "environment": [], "image": "${aws_ecr_repository.caldp_ecr.repository_url}:${data.aws_ecr_image.caldp_latest.image_tag}", "jobRoleArn": "${data.aws_ssm_parameter.batch_job_role.value}", - "memory": ${2*(1024-32)}, + "memory": ${2*(1024-128)}, "mountPoints": [], "resourceRequirements": [], "ulimits": [], @@ -150,11 +150,11 @@ resource "aws_batch_job_definition" "calcloud_8g" { "environment": [], "image": "${aws_ecr_repository.caldp_ecr.repository_url}:${data.aws_ecr_image.caldp_latest.image_tag}", "jobRoleArn": "${data.aws_ssm_parameter.batch_job_role.value}", - "memory": ${8*(1024-32)}, + "memory": ${8*(1024-128)}, "mountPoints": [], "resourceRequirements": [], "ulimits": [], - "vcpus": 1, + "vcpus": 4, "volumes": [] } CONTAINER_PROPERTIES @@ -179,11 +179,11 @@ resource "aws_batch_job_definition" "calcloud_16g" { "environment": [], "image": "${aws_ecr_repository.caldp_ecr.repository_url}:${data.aws_ecr_image.caldp_latest.image_tag}", "jobRoleArn": "${data.aws_ssm_parameter.batch_job_role.value}", - "memory": ${16*(1024-32)}, + "memory": ${16*(1024-128)}, "mountPoints": [], "resourceRequirements": [], "ulimits": [], - "vcpus": 1, + "vcpus": 8, "volumes": [] } CONTAINER_PROPERTIES @@ -208,11 +208,11 @@ resource "aws_batch_job_definition" "calcloud_64g" { "environment": [], "image": 
"${aws_ecr_repository.caldp_ecr.repository_url}:${data.aws_ecr_image.caldp_latest.image_tag}", "jobRoleArn": "${data.aws_ssm_parameter.batch_job_role.value}", - "memory": ${64*(1024-32)}, + "memory": ${64*(1024-128)}, "mountPoints": [], "resourceRequirements": [], "ulimits": [], - "vcpus": 1, + "vcpus": 32, "volumes": [] } CONTAINER_PROPERTIES From 3a7a10d990c9bb9ffdb0d1e532c848edb9213065 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Fri, 5 Mar 2021 21:20:17 +0000 Subject: [PATCH 6/9] Dropped batch outlier queue from Terraform --- terraform/batch-outlier.tf | 41 ----------------------- terraform/lambda_batch_events.tf | 2 +- terraform/lambda_blackboard.tf | 2 +- terraform/lambda_job_delete.tf | 2 +- terraform/lambda_job_rescue.tf | 1 - terraform/lambda_job_submit.tf | 1 - terraform/lambda_refresh_cache_logging.tf | 2 +- 7 files changed, 4 insertions(+), 47 deletions(-) delete mode 100644 terraform/batch-outlier.tf diff --git a/terraform/batch-outlier.tf b/terraform/batch-outlier.tf deleted file mode 100644 index 7bbf50a3..00000000 --- a/terraform/batch-outlier.tf +++ /dev/null @@ -1,41 +0,0 @@ -resource "aws_batch_job_queue" "batch_outlier_queue" { - name = "calcloud-hst-outlier-queue${local.environment}" - compute_environments = [ - aws_batch_compute_environment.calcloud_outlier.arn - ] - priority = 10 - state = "ENABLED" - -} - -resource "aws_batch_compute_environment" "calcloud_outlier" { - compute_environment_name_prefix = "calcloud-hst-outlier${local.environment}-" - type = "MANAGED" - service_role = data.aws_ssm_parameter.batch_service_role.value - - compute_resources { - allocation_strategy = "BEST_FIT_PROGRESSIVE" - instance_role = data.aws_ssm_parameter.ecs_instance_role.value - type = "EC2" - bid_percentage = 0 - tags = {} - subnets = local.batch_subnet_ids - security_group_ids = local.batch_sgs - - instance_type = [ - "optimal", # 36 cores, 72G ram - ] - max_vcpus = 72 - min_vcpus = 0 - desired_vcpus = 0 - - launch_template { - launch_template_id = aws_launch_template.hstdp.id - version = "$Latest" - } - } - lifecycle { - ignore_changes = [compute_resources.0.desired_vcpus] - create_before_destroy = true - } -} diff --git a/terraform/lambda_batch_events.tf b/terraform/lambda_batch_events.tf index 8eb5bbd5..da1c7add 100644 --- a/terraform/lambda_batch_events.tf +++ b/terraform/lambda_batch_events.tf @@ -40,7 +40,7 @@ module "calcloud_lambda_batchEvents" { lambda_role = data.aws_ssm_parameter.lambda_submit_role.value environment_variables = { - JOBQUEUES="${aws_batch_job_queue.batch_queue.name},${aws_batch_job_queue.batch_outlier_queue.name}" + JOBQUEUES=aws_batch_job_queue.batch_queue.name MAX_MEMORY_RETRIES="4" } diff --git a/terraform/lambda_blackboard.tf b/terraform/lambda_blackboard.tf index 3e12e8d6..edbd1027 100644 --- a/terraform/lambda_blackboard.tf +++ b/terraform/lambda_blackboard.tf @@ -42,7 +42,7 @@ module "calcloud_lambda_blackboard" { environment_variables = { # comma delimited list of job queues, because batch can only list jobs per queue - JOBQUEUES="${aws_batch_job_queue.batch_queue.name},${aws_batch_job_queue.batch_outlier_queue.name}" + JOBQUEUES=aws_batch_job_queue.batch_queue.name BUCKET=aws_s3_bucket.calcloud.id FILESHARE=data.aws_ssm_parameter.file_share_arn.value } diff --git a/terraform/lambda_job_delete.tf b/terraform/lambda_job_delete.tf index 058efb9b..11b5b02f 100644 --- a/terraform/lambda_job_delete.tf +++ b/terraform/lambda_job_delete.tf @@ -40,7 +40,7 @@ module "calcloud_lambda_deleteJob" { lambda_role = 
data.aws_ssm_parameter.lambda_delete_role.value environment_variables = { - JOBQUEUES="${aws_batch_job_queue.batch_queue.name},${aws_batch_job_queue.batch_outlier_queue.name}" + JOBQUEUES=aws_batch_job_queue.batch_queue.name } tags = { diff --git a/terraform/lambda_job_rescue.tf b/terraform/lambda_job_rescue.tf index c919fc59..ef71ab70 100644 --- a/terraform/lambda_job_rescue.tf +++ b/terraform/lambda_job_rescue.tf @@ -42,7 +42,6 @@ module "calcloud_lambda_rescueJob" { environment_variables = { JOBDEFINITIONS = local.job_definitions, NORMALQUEUE = aws_batch_job_queue.batch_queue.name, - OUTLIERQUEUE = aws_batch_job_queue.batch_outlier_queue.name, } tags = { diff --git a/terraform/lambda_job_submit.tf b/terraform/lambda_job_submit.tf index 62d86f41..2b6605bc 100644 --- a/terraform/lambda_job_submit.tf +++ b/terraform/lambda_job_submit.tf @@ -42,7 +42,6 @@ module "calcloud_lambda_submit" { environment_variables = { JOBDEFINITIONS = local.job_definitions, NORMALQUEUE = aws_batch_job_queue.batch_queue.name, - OUTLIERQUEUE = aws_batch_job_queue.batch_outlier_queue.name, } tags = { diff --git a/terraform/lambda_refresh_cache_logging.tf b/terraform/lambda_refresh_cache_logging.tf index 6be82a30..a0168949 100644 --- a/terraform/lambda_refresh_cache_logging.tf +++ b/terraform/lambda_refresh_cache_logging.tf @@ -40,7 +40,7 @@ module "calcloud_lambda_refreshCache" { lambda_role = data.aws_ssm_parameter.lambda_cloudwatch_role.value # environment_variables = { -# JOBQUEUES="${aws_batch_job_queue.batch_queue.name},${aws_batch_job_queue.batch_outlier_queue.name}" +# JOBQUEUES=aws_batch_job_queue.batch_queue.name # } tags = { From 704c479dff460bda25f0df5092d190f623da1be9 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Tue, 9 Mar 2021 15:34:52 +0000 Subject: [PATCH 7/9] Removed metrics.py and dynamodb.py --- calcloud/dynamodb.py | 112 ------------------------------------ calcloud/metrics.py | 134 ------------------------------------------- 2 files changed, 246 deletions(-) delete mode 100644 calcloud/dynamodb.py delete mode 100644 calcloud/metrics.py diff --git a/calcloud/dynamodb.py b/calcloud/dynamodb.py deleted file mode 100644 index 2b9bbac8..00000000 --- a/calcloud/dynamodb.py +++ /dev/null @@ -1,112 +0,0 @@ -"""This module handles initializing and using the calcloud job database -which tracks metadata like memory consumption and compute duration. -""" -import sys - -import boto3 - -# -------------------------------------------------------------------------------- - - -def convert(v): - """Type converter from str to int, float, str in that order.""" - v = v.strip() - try: - return int(v) - except ValueError: - try: - return float(v) - except ValueError: - return str(v) - return v - - -# -------------------------------------------------------------------------------- - - -class DynamoDB: - """Handle Dynamo API calls more naturally.""" - - def __init__(self, table_name, primary_key): - """Initialize DynamoDB w/o creating or opening actual database.""" - self.table_name = table_name - self.primary_key = primary_key - self.dynamodb = boto3.resource("dynamodb") - self.table = self.open_db() # Assume it's terraform'ed already and fail if not. 
- - # try: - # self.table = self.open_db() - # except exceptions.ResourceNotExists: - # print(f"Creating dynamodb '{table_name}' with primary key '{primary_key}'") - # self.table = self.create_db() - - def create_db(self): - """Create a dynamodb corresponding to this object's table_name and primary_key.""" - table = self.dynamodb.create_table( - TableName=self.table_name, - KeySchema=[{"AttributeName": self.primary_key, "KeyType": "HASH"}], - AttributeDefinitions=[ - {"AttributeName": self.primary_key, "AttributeType": "S"}, - ], - BillingMode="PAY_PER_REQUEST", - ) - table.meta.client.get_waiter("table_exists").wait(TableName=self.table_name) - return table - - def open_db(self): - """Open an existing db.""" - return self.dynamodb.Table(self.table_name) - - def to_db(self, item_dict): - """Convert the values of item_dict for storage in dynamodb.""" - return {key: str(val) for (key, val) in item_dict.items()} - - def from_db(self, item_dict): - """Convert values coming from dynamodb back to Python types.""" - return {key: convert(val) for (key, val) in item_dict.items()} - - def put_item(self, item_dict): - """Create a new item corresponding to `item_dict`.""" - self.table.put_item(Item=self.to_db(item_dict)) - - def init_db(self, db_dict): - """Initialize the simpledb using database dictionary `db_dict` of the form: - - db_dict dict(dict...) { item_name: item_dict, ...} - - where e.g. item_name == "i9zf11010" and item_dict == {'dataset':'i9zf11010', "imageSize":2048, "jobDuration": 4398.4, ...} - - Returns None - """ - with self.table.batch_writer() as batch: - for i, (_item_name, item_dict) in enumerate(db_dict.items()): - if i % 1000 == 0: - print(i) - sys.stdout.flush() - batch.put_item(self.to_db(item_dict)) - - def get_item(self, item_name): - """Fetch attribute dictionary of item with primary key value `item_name`. - - item_name: str e.g. name of dataset 'i9zf11010' - - returns dict(item attributes...) - """ - resp = self.table.get_item(Key={self.primary_key: item_name}) - try: - return self.from_db(resp["Item"]) - except KeyError: - raise KeyError(f"Item '{item_name}' not found.") - - def del_item(self, item_name): - """Delete the item with primary key value `item_name`.""" - self.table.delete_item(Key={self.primary_key: item_name}) - - def update_item(self, item_dict): - """Update the item identified in `item_dict` with the values of `item_dict`.""" - self.del_item(item_dict[self.primary_key]) - self.put_item(item_dict) - - def del_db(self): - """Delete this database destroying the persistent copy.""" - self.table.delete() diff --git a/calcloud/metrics.py b/calcloud/metrics.py deleted file mode 100644 index d39130e8..00000000 --- a/calcloud/metrics.py +++ /dev/null @@ -1,134 +0,0 @@ -import sys -import csv -import math - -from . import dynamodb -from . import s3 -from . import log - -# -------------------------------------------------------------------------------- - - -def load_blackboard(filename, delimiter="|", item_key="dataset"): - """Loader for dump of on-prem HST blackboard defining known resource usage. - - Returns { row_dict[item_key]: row_dict, ...} - - where `row_dict` is a dictionary with keys that are column names and - corresponding row values. 
- """ - with open(filename) as csvfile: - reader = csv.reader(csvfile, delimiter=delimiter) - columns = tuple(col.strip() for col in reader.__next__()) - db_dict = {} - for row in reader: - converted = tuple(dynamodb.convert(v) for v in row) - item_dict = dict(zip(columns, converted)) - item_dict.pop("", None) - db_dict[item_dict[item_key]] = { - item_key: item_dict[item_key], - "memory_megabytes": math.ceil(item_dict["imageSize"] / 1024), - "wallclock_seconds": item_dict["jobduration"], - "cpus": 1, - } - return db_dict - - -# -------------------------------------------------------------------------------- - -DB = dynamodb.DynamoDB("calcloud-hst-job-info", "dataset") - - -def load_db(filepath): - DB.init_db(load_blackboard(filepath)) - - -def get_resources(dataset): - """Given ipppssoot `dataset`, return: - (memory_megabytes, cpus, wallclock_seconds) - """ - # raise KeyError("Forced get_resources error for", dataset) - item = DB.get_item(dataset) - memory_megabytes = item["memory_megabytes"] - cpus = item.get("cpus", 1) # original blackboard doesn't have these - wallclock_seconds = item["wallclock_seconds"] - return memory_megabytes, cpus, wallclock_seconds - - -def set_resources(dataset, memory_megabytes, cpus, wallclock_seconds): - """Update the resource metrics database for ipppssoot `dataset` with measured - values for `memory_megabytes`, `cpus` effectively used/available, and - `walkclock_seconds` of runtime. - """ - item = DB.get_item(dataset) - item["memory_megabytes"] = memory_megabytes - item["cpus"] = cpus - item["wallclock_seconds"] = wallclock_seconds - DB.update_item(item) - - -# -------------------------------------------------------------------------------- - -# This is throw-away code if the /usr/bin/time metrics turn out to be unusable. -# It might however illustrate loading metrics from S3 in general if it turns out -# that container-based metrics are viable in some form, e.g. based on psutils, -# or aspects of any lambda based on CloudWatch metrics. - - -def update_resources(s3_batch_path): - """Extract the process_metrics.txt files from every ipppssoot in a batch and - use them to overwrite or add a metrics database entry. - """ - process_metrics_s3 = [ - metric for metric in s3.list_directory(s3_batch_path, max_objects=10 ** 7) if "process_metric" in metric - ] - for s3_path in process_metrics_s3: - log.info("Processing metrics for", s3_path) - metrics_text = s3.get_object(s3_path) - dataset = s3_path.split("/")[-3] - memory, cpus, seconds = parse_time_metrics(metrics_text) - set_resources(dataset, memory, cpus, seconds) - - -def parse_time_metrics(metrics_text): - """Parse the verbose output of GNU /usr/bin/time included in the original - caldp container to extract the memory resident set size, wallclock time, - and CPU's utilized. - - For outlier (jcl403010) with 49.6G RAM reported in the blackboard, - the RSS of approx 8000 reported by "time" was insufficient to run - the container. 2x or roughly 16000 succeeded. - - Add 2x fudge here so reported numbers are directly usable and - metric-specific fudge is not hardcoded in the planner or - provisioner. (This fudge is property of this measurement). 
- - Returns memory_megabytes, cpus, wallclock_seconds - """ - for line in metrics_text.splitlines(): - line = line.strip() - words = line.split() - if line.startswith("Percent of CPU"): - percent = words[-1][:-1] - cpus = math.ceil(int(percent) / 100) - elif line.startswith("Maximum resident set size"): - kilobytes = int(words[-1]) - memory_megabytes = math.ceil(kilobytes / 1024 * 2) - elif line.startswith("Elapsed (wall clock) time"): - parts = words[-1].split(":") - if len(parts) == 2: - h, m, ss = 0, parts[0], parts[1] - elif len(parts) == 3: - h, m, ss = parts - h, m, ss = map(lambda x: int(float(x)), [h, m, ss]) - wallclock_seconds = h * 3600 + m * 60 + ss - return memory_megabytes, cpus, wallclock_seconds - - -# -------------------------------------------------------------------------------- -if __name__ == "__main__": - if len(sys.argv) != 2: - print("usage: python -m calcloud.metrics ", file=sys.stderr) - sys.exit(1) - else: - load_db(sys.argv[1]) From 21ac6f3a9a134ff655122632f69d976171808aa1 Mon Sep 17 00:00:00 2001 From: EC2 Default User Date: Tue, 9 Mar 2021 17:04:06 +0000 Subject: [PATCH 8/9] Convert memory and vcpu specs in batch.tf to new resourceRequirements form. --- terraform/batch.tf | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/terraform/batch.tf b/terraform/batch.tf index 8b062946..eb9a9f5b 100644 --- a/terraform/batch.tf +++ b/terraform/batch.tf @@ -154,11 +154,12 @@ resource "aws_batch_job_definition" "calcloud_2g" { ], "image": "${aws_ecr_repository.caldp_ecr.repository_url}:${data.aws_ecr_image.caldp_latest.image_tag}", "jobRoleArn": "${data.aws_ssm_parameter.batch_job_role.value}", - "memory": ${2*(1024-128)}, "mountPoints": [], - "resourceRequirements": [], + "resourceRequirements": [ + {"value" : "${2*(1024-128)}", "type" : "MEMORY"}, + {"value" : "1", "type": "VCPU"} + ], "ulimits": [], - "vcpus": 1, "volumes": [] } CONTAINER_PROPERTIES @@ -183,11 +184,12 @@ resource "aws_batch_job_definition" "calcloud_8g" { "environment": [], "image": "${aws_ecr_repository.caldp_ecr.repository_url}:${data.aws_ecr_image.caldp_latest.image_tag}", "jobRoleArn": "${data.aws_ssm_parameter.batch_job_role.value}", - "memory": ${8*(1024-128)}, "mountPoints": [], - "resourceRequirements": [], + "resourceRequirements": [ + {"value" : "${8*(1024-128)}", "type" : "MEMORY"}, + {"value" : "4", "type": "VCPU"} + ], "ulimits": [], - "vcpus": 4, "volumes": [] } CONTAINER_PROPERTIES @@ -212,11 +214,12 @@ resource "aws_batch_job_definition" "calcloud_16g" { "environment": [], "image": "${aws_ecr_repository.caldp_ecr.repository_url}:${data.aws_ecr_image.caldp_latest.image_tag}", "jobRoleArn": "${data.aws_ssm_parameter.batch_job_role.value}", - "memory": ${16*(1024-128)}, "mountPoints": [], - "resourceRequirements": [], + "resourceRequirements": [ + {"value": "${16*(1024-128)}", "type": "MEMORY"}, + {"value": "8", "type": "VCPU"} + ], "ulimits": [], - "vcpus": 8, "volumes": [] } CONTAINER_PROPERTIES @@ -241,11 +244,12 @@ resource "aws_batch_job_definition" "calcloud_64g" { "environment": [], "image": "${aws_ecr_repository.caldp_ecr.repository_url}:${data.aws_ecr_image.caldp_latest.image_tag}", "jobRoleArn": "${data.aws_ssm_parameter.batch_job_role.value}", - "memory": ${64*(1024-128)}, "mountPoints": [], - "resourceRequirements": [], + "resourceRequirements": [ + {"value": "${64*(1024-128)}", "type": "MEMORY"}, + {"value": "32", "type": "VCPU"} + ], "ulimits": [], - "vcpus": 32, "volumes": [] } CONTAINER_PROPERTIES From 
120f22b5bea7d3a6adeb5842976184b5b9b9dda6 Mon Sep 17 00:00:00 2001
From: EC2 Default User
Date: Tue, 9 Mar 2021 17:23:04 +0000
Subject: [PATCH 9/9] Add environment variables to 8,16,64G job defs. Fix indents

---
 terraform/batch.tf | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git a/terraform/batch.tf b/terraform/batch.tf
index eb9a9f5b..ce1bbc6f 100644
--- a/terraform/batch.tf
+++ b/terraform/batch.tf
@@ -181,7 +181,11 @@ resource "aws_batch_job_definition" "calcloud_8g" {
   container_properties = <<CONTAINER_PROPERTIES
[... rest of PATCH 9/9 lost in extraction ...]
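
Taken together, the series swaps per-dataset memory modeling (metrics.py,
dynamodb.py) for a fixed ladder of Batch job definitions: plan.get_plan()
starts a job at initial_modeled_bin, each memory retry climbs one rung via
initial_modeled_bin + memory_retries, and AllBinsTriedQuit stops the climb
when no larger bin exists. The sketch below is illustrative only and is not
part of any patch above. It mirrors the selection logic of
plan._get_environment(); the helper name select_job_definition and the exact
JOBDEFINITIONS string are stand-ins, assuming locals.job_definitions lists
the four batch.tf job definitions in ascending memory order.

    import os

    # Assumed wiring: lambda_job_submit.tf populates JOBDEFINITIONS from
    # locals.job_definitions; smallest-to-largest ordering is an assumption,
    # and the ${local.environment} suffixes are omitted for brevity.
    os.environ.setdefault(
        "JOBDEFINITIONS",
        "calcloud-jobdef-2g,calcloud-jobdef-8g,calcloud-jobdef-16g,calcloud-jobdef-64g",
    )

    class AllBinsTriedQuit(Exception):
        """Raised when a retry is requested but no larger memory bin exists."""

    def select_job_definition(initial_modeled_bin, memory_retries):
        """Mirror plan._get_environment(): each retry moves one rung up the
        ladder until the job definitions run out."""
        job_defs = os.environ["JOBDEFINITIONS"].split(",")
        final_bin = initial_modeled_bin + memory_retries
        if final_bin >= len(job_defs):
            raise AllBinsTriedQuit(f"no job definition left after {memory_retries} retries")
        return job_defs[final_bin]

    # With the fixed initial bin of 0, retries walk 2g -> 8g -> 16g -> 64g and
    # the fifth attempt raises, consistent with MAX_MEMORY_RETRIES="4" in
    # lambda_batch_events.tf.
    for retries in range(5):
        try:
            print(retries, select_job_definition(0, retries))
        except AllBinsTriedQuit as exc:
            print(retries, "exhausted:", exc)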