From e968c0cd62f4455dc782a2e63581b99e6e68266b Mon Sep 17 00:00:00 2001 From: Alexander Veit <53857412+alexander-veit@users.noreply.github.com> Date: Tue, 6 Dec 2022 13:20:02 -0500 Subject: [PATCH] Support Graviton instances and switch to EC2 fleets (#375) * Initial version * Minor changes * Some refactoring, add tests * Modify docs * Cleanup + version bump * Add AMIs * Address reviewer comments * Remove unnecessary permissions --- CHANGELOG.rst | 9 + awsf3-docker/Dockerfile | 28 +- awsf3-docker/run.sh | 1 + awsf3/aws_run_workflow_generic.sh | 5 +- awsf3/cloudwatch_agent_config.json | 3 +- awsf3/utils.py | 1 + docs/ami.rst | 4 +- docs/execution_json.rst | 11 +- docs/news.rst | 6 + poetry.lock | 58 +-- pyproject.toml | 3 +- scripts/publish-docker | 5 +- test_json/unicorn/medium_nonspot.postrun.json | 1 + test_json/unicorn/small_spot.postrun.json | 1 + .../unicorn/small_spot_gp3_iops.postrun.json | 1 + ...mall_spot_gp3_iops_throughput.postrun.json | 1 + .../unicorn/small_spot_io2_iops.postrun.json | 1 + .../postrunjson/GBPtlqb2rFGH.postrun.json | 1 + tests/tibanna/unicorn/test_ec2_utils.py | 383 ++++++++--------- tests/tibanna/unicorn/test_job.py | 2 +- tibanna/__main__.py | 10 +- tibanna/ami.py | 32 +- tibanna/awsem.py | 3 +- tibanna/core.py | 18 +- tibanna/create_ami_userdata | 21 +- tibanna/ec2_utils.py | 385 +++++++++++------- tibanna/iam_utils.py | 4 +- tibanna/job.py | 3 +- tibanna/pricing_utils.py | 10 +- tibanna/vars.py | 67 +-- 30 files changed, 590 insertions(+), 488 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 63f4281fd..ce0b7f1c7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,15 @@ Change Log ========== +3.0.0 +===== + +* Added support for Graviton instances. +* Removed ``other_instance_types`` as an option for ``behavior_on_capacity_limit``; it now falls back to ``wait_and_retry``. +* Multiple instance types can be specified in the configuration. If ``spot_instance`` is enabled, Tibanna will run the workflow on the instance with the highest available capacity. If ``spot_instance`` is disabled, it will run the workflow on the cheapest instance in the list. +* Instead of using the ``run_instances`` command, Tibanna now uses EC2 fleets (in instant mode) to start up instances.
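A minimal sketch of the instant-mode fleet request described above (illustrative only, not the exact Tibanna code: the launch template name, AMI IDs, and instance types below are placeholders, and the allocation strategies are assumptions consistent with the capacity/price behavior described in these notes)::

    import boto3

    ec2 = boto3.client('ec2')

    # An 'instant' fleet is synchronous: EC2 picks one instance from the
    # overrides and returns launched instance IDs (or errors) in the response.
    response = ec2.create_fleet(
        Type='instant',
        TargetCapacitySpecification={
            'TotalTargetCapacity': 1,
            'DefaultTargetCapacityType': 'spot',  # or 'on-demand'
        },
        # spot: pool with the most spare capacity; on-demand: cheapest override
        SpotOptions={'AllocationStrategy': 'capacity-optimized'},
        OnDemandOptions={'AllocationStrategy': 'lowest-price'},
        LaunchTemplateConfigs=[{
            'LaunchTemplateSpecification': {
                'LaunchTemplateName': 'my-launch-template',  # placeholder
                'Version': '$Latest',
            },
            'Overrides': [
                {'InstanceType': 't3.small', 'ImageId': 'ami-xxxxxxxx'},    # x86 AMI (placeholder)
                {'InstanceType': 'm6g.medium', 'ImageId': 'ami-yyyyyyyy'},  # Arm AMI (placeholder)
            ],
        }],
    )

    if response.get('Instances'):
        instance_id = response['Instances'][0]['InstanceIds'][0]
    else:
        raise RuntimeError(f"Fleet request failed: {response.get('Errors')}")

Because an instant fleet reports capacity problems synchronously, the caller can inspect ``Errors`` in the response and apply ``behavior_on_capacity_limit`` without having to catch ``run_instances`` exceptions.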
+ + 2.2.6 ===== diff --git a/awsf3-docker/Dockerfile b/awsf3-docker/Dockerfile index 6e952000a..ec1bfab62 100755 --- a/awsf3-docker/Dockerfile +++ b/awsf3-docker/Dockerfile @@ -35,25 +35,31 @@ RUN apt update -y && apt upgrade -y && apt install -y \ libseccomp-dev \ pkg-config \ openjdk-8-jre-headless \ - nodejs + nodejs \ + gnupg \ + lsb-release RUN ln -s /usr/bin/python3.8 /usr/bin/python #RUN ln -s /usr/bin/pip3 /usr/bin/pip WORKDIR /usr/local/bin -# docker inside docker -RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - \ - && apt-key fingerprint 0EBFCD88 \ - && add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" -RUN apt-get update -y \ - && apt-cache policy docker-ce \ - && apt-get install -y docker-ce +# install docker inside docker +RUN mkdir -p /etc/apt/keyrings +RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg + +RUN echo \ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \ + $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null + +RUN apt-get update +RUN apt-get --assume-yes install docker-ce # singularity -RUN wget https://golang.org/dl/go1.16.6.linux-amd64.tar.gz && \ - tar -xzf go1.16.6.linux-amd64.tar.gz && \ - rm go1.16.6.linux-amd64.tar.gz +RUN ARCH="$(dpkg --print-architecture)" && \ + wget "https://golang.org/dl/go1.16.6.linux-${ARCH}.tar.gz" && \ + tar -xzf "go1.16.6.linux-${ARCH}.tar.gz" && \ + rm "go1.16.6.linux-${ARCH}.tar.gz" RUN export SINGULARITY_VERSION=3.8.1 && \ export PATH=/usr/local/bin/go/bin/:$PATH && \ wget https://github.com/sylabs/singularity/releases/download/v${SINGULARITY_VERSION}/singularity-ce-${SINGULARITY_VERSION}.tar.gz && \ diff --git a/awsf3-docker/run.sh b/awsf3-docker/run.sh index e1e6ed253..b8390cbd3 100755 --- a/awsf3-docker/run.sh +++ b/awsf3-docker/run.sh @@ -48,6 +48,7 @@ export TOPLATESTFILE=$LOCAL_OUTDIR/$JOBID.top_latest # this one includes only t export INSTANCE_ID=$(ec2metadata --instance-id|cut -d' ' -f2) export INSTANCE_REGION=$(ec2metadata --availability-zone | sed 's/[a-z]$//') export INSTANCE_AVAILABILITY_ZONE=$(ec2metadata --availability-zone) +export INSTANCE_TYPE=$(ec2metadata --instance-type) export AWS_ACCOUNT_ID=$(aws sts get-caller-identity| grep Account | sed 's/[^0-9]//g') export AWS_REGION=$INSTANCE_REGION # this is for importing awsf3 package which imports tibanna package export LOCAL_OUTDIR_CWL=$MOUNT_DIR_PREFIX$LOCAL_OUTDIR diff --git a/awsf3/aws_run_workflow_generic.sh b/awsf3/aws_run_workflow_generic.sh index 6ffa4b170..4bb2f695a 100755 --- a/awsf3/aws_run_workflow_generic.sh +++ b/awsf3/aws_run_workflow_generic.sh @@ -182,8 +182,11 @@ exl echo "## Installing and activating Cloudwatch agent to collect metrics" cwd0=$(pwd) cd ~ +ARCHITECTURE="$(dpkg --print-architecture)" +CW_AGENT_LINK="https://s3.amazonaws.com/amazoncloudwatch-agent/ubuntu/${ARCHITECTURE}/latest/amazon-cloudwatch-agent.deb" apt install -y wget -wget https://s3.amazonaws.com/amazoncloudwatch-agent/ubuntu/amd64/latest/amazon-cloudwatch-agent.deb +exl echo "Loading Cloudwatch Agent from ${CW_AGENT_LINK}" +wget "${CW_AGENT_LINK}" sudo dpkg -i -E ./amazon-cloudwatch-agent.deb # If we want to collect new metrics, the following file has to be modified exl echo "## Using CW Agent config: https://raw.githubusercontent.com/4dn-dcic/tibanna/master/awsf3/cloudwatch_agent_config.json" diff --git 
a/awsf3/cloudwatch_agent_config.json b/awsf3/cloudwatch_agent_config.json index ab88aefc2..f3ce20e8b 100644 --- a/awsf3/cloudwatch_agent_config.json +++ b/awsf3/cloudwatch_agent_config.json @@ -42,7 +42,8 @@ "measurement": [ "io_time", "read_bytes", - "iops_in_progress" + "iops_in_progress", + "read_time" ], "metrics_collection_interval": 60, "resources": [ diff --git a/awsf3/utils.py b/awsf3/utils.py index 34fc24b29..fb575058f 100644 --- a/awsf3/utils.py +++ b/awsf3/utils.py @@ -348,6 +348,7 @@ def update_postrun_json_init(json_old, json_new): prj.Job.instance_id = os.getenv('INSTANCE_ID') prj.Job.filesystem = os.getenv('EBS_DEVICE') prj.Job.instance_availablity_zone = os.getenv('INSTANCE_AVAILABILITY_ZONE') + prj.Job.instance_type = os.getenv('INSTANCE_TYPE') # write to new json file write_postrun_json(json_new, prj) diff --git a/docs/ami.rst b/docs/ami.rst index fc01bfc27..bfd458ce5 100755 --- a/docs/ami.rst +++ b/docs/ami.rst @@ -2,7 +2,7 @@ Amazon Machine Image ==================== -Tibanna now uses a single Amazon Machine Image (AMI) ``ami-06e2266f85063aabc``, which is made public for ``us-east-1``. One can find them among Community AMIs. (Tibanna automatically finds and uses them, so no need to worry about it.) +Tibanna now uses the Amazon Machine Images (AMIs) ``ami-06e2266f85063aabc`` (``x86``) and ``ami-0f3e90ad8e76c7a32`` (``Arm``), which are made public for ``us-east-1``. One can find them among Community AMIs. (Tibanna automatically finds and uses them, so no need to worry about it.) -For regions that are not ``us-east-1``, a copy of the same AMI is publicly available (different AMI ID) and is auto-detected by Tibanna. +For regions that are not ``us-east-1``, copies of these AMIs are publicly available (different AMI IDs) and are auto-detected by Tibanna. diff --git a/docs/execution_json.rst b/docs/execution_json.rst index 3ba058bfd..494df69cf 100755 --- a/docs/execution_json.rst +++ b/docs/execution_json.rst @@ -469,7 +469,12 @@ The ``config`` field describes execution configuration. :instance_type: - - This or ``mem`` and ``cpu`` are required if Benchmark is not available for a given workflow. - - If both ``instance_type`` and ``mem`` & ``cpu`` are specified, then ``instance_type`` is the first choice. + - ``instance_type`` can be a string (e.g., ``t3.micro``) or a list (e.g., ``[t3.micro, t3.small]``). If ``spot_instance`` + is enabled, Tibanna will run the workflow on the instance with the highest available capacity. If ``spot_instance`` + is disabled, it will run the workflow on the cheapest instance in the list. + - If both ``instance_type`` and ``mem`` & ``cpu`` are specified, Tibanna internally creates a list of instances that + are directly specified in ``instance_type`` and instances that satisfy the ``mem`` & ``cpu`` requirement. One instance is chosen + according to the rules above to run the workflow. :mem: - @@ -588,9 +593,7 @@ The ``config`` field describes execution configuration.
- available options : - ``fail`` (default) - - ``wait_and_retry`` (wait and retry with the same instance type again), - - ``other_instance_types`` top 10 cost-effective instance types will be tried in the order - (``mem`` and ``cpu`` must be set in order for this to work), + - ``wait_and_retry`` (wait and retry with the same instance type again.), - ``retry_without_spot`` (try with the same instance type but not a spot instance) : this option is applicable only when ``spot_instance`` is set to ```True`` diff --git a/docs/news.rst b/docs/news.rst index 2fb902c96..b1fbbec57 100755 --- a/docs/news.rst +++ b/docs/news.rst @@ -19,6 +19,12 @@ Version updates .. _releases: https://github.com/4dn-dcic/tibanna/releases + **Nov 18, 2022** The latest version is now 3.0.0_. + - Tibanna now supports AWS Graviton-based instances. + - The instance type configuration now allows single instances (e.g., ``t3.micro``) and lists (e.g., ``[t3.micro, t3.small]``). If ``spot_instance`` is enabled, Tibanna will run the workflow on the instance with the highest available capacity. If ``spot_instance`` is disabled, it will run the workflow on the cheapest instance in the list. + - The option ``other_instance_types`` for ``behavior_on_capacity_limit`` has been removed. It will fall back to ``wait_and_retry``. + + **Mar 10, 2022** The latest version is now 2.0.0_. - The default Python version for Tibanna is now 3.8 (or 3.7). Python 3.6 is no longer supported. diff --git a/poetry.lock b/poetry.lock index 21082c349..b384316be 100644 --- a/poetry.lock +++ b/poetry.lock @@ -49,14 +49,14 @@ python-versions = ">=3.5,<4.0" [[package]] name = "boto3" -version = "1.26.14" +version = "1.26.18" description = "The AWS SDK for Python" category = "main" optional = false python-versions = ">= 3.7" [package.dependencies] -botocore = ">=1.29.14,<1.30.0" +botocore = ">=1.29.18,<1.30.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.6.0,<0.7.0" @@ -65,7 +65,7 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.29.14" +version = "1.29.18" description = "Low-level, data-driven core of boto 3." category = "main" optional = false @@ -168,7 +168,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [[package]] name = "importlib-metadata" -version = "5.0.0" +version = "5.1.0" description = "Read metadata from Python packages" category = "dev" optional = false @@ -237,19 +237,6 @@ category = "dev" optional = false python-versions = "*" -[[package]] -name = "mock" -version = "4.0.0" -description = "Rolling backport of unittest.mock for all Pythons" -category = "dev" -optional = false -python-versions = ">=3.6" - -[package.extras] -build = ["blurb", "twine", "wheel"] -docs = ["sphinx"] -test = ["pytest", "pytest-cov"] - [[package]] name = "packaging" version = "21.3" @@ -451,7 +438,7 @@ crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"] [[package]] name = "setuptools" -version = "65.6.0" +version = "65.6.3" description = "Easily download, build, install, upgrade, and uninstall Python packages" category = "dev" optional = false @@ -650,11 +637,11 @@ python-versions = ">=3.7" [[package]] name = "urllib3" -version = "1.26.12" +version = "1.26.13" description = "HTTP library with thread-safe connection pooling, file post, and more." 
category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, <4" +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" [package.extras] brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotlipy (>=0.6.0)"] @@ -663,7 +650,7 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] [[package]] name = "zipp" -version = "3.10.0" +version = "3.11.0" description = "Backport of pathlib-compatible object wrapper for zip files" category = "dev" optional = false @@ -676,7 +663,7 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools" [metadata] lock-version = "1.1" python-versions = ">=3.7,<3.9" -content-hash = "75cd72d4c44cb6d66331c434d32972ebcd6e4e76cace53b400e2f49b2f068eb2" +content-hash = "68adfcf243207956cac7435917fd48221c980ea08dff5ad71db74369bde7a6bd" [metadata.files] alabaster = [ @@ -698,12 +685,12 @@ benchmark-4dn = [ {file = "benchmark_4dn-0.5.19-py3-none-any.whl", hash = "sha256:6ebb2f2349005dc23c0cd5cbba13851fd6e258dc4705c050bf98d9663f0b3899"}, ] boto3 = [ - {file = "boto3-1.26.14-py3-none-any.whl", hash = "sha256:9841e0bec9697979394632e33588e57c7bbf60e3c384740df0acadfe5014d709"}, - {file = "boto3-1.26.14.tar.gz", hash = "sha256:d61c97ed51a16f66b6ce5321e611626b0120e80cc41025c6f58e281859d7fbf8"}, + {file = "boto3-1.26.18-py3-none-any.whl", hash = "sha256:933c88b189112a5fdd82d49ef00f95b9dd649d195e557a81aecb773a3e01c517"}, + {file = "boto3-1.26.18.tar.gz", hash = "sha256:3c7315da16eb0b41823965e5ce55f99cb07e94680e0ed7830c581f505fb5bd15"}, ] botocore = [ - {file = "botocore-1.29.14-py3-none-any.whl", hash = "sha256:208ca5c3d8299d45a19b912dc791e3297d2961873ff37131d4fc3eb86365645c"}, - {file = "botocore-1.29.14.tar.gz", hash = "sha256:20fb0978beac90fd8c45b57936f2c1c182e19f1622b09a481a050723632a485c"}, + {file = "botocore-1.29.18-py3-none-any.whl", hash = "sha256:2aba44433b6eac6d3a12cf93f2985e2d7a843307c1a527042fc48dd09b273992"}, + {file = "botocore-1.29.18.tar.gz", hash = "sha256:26e86fce95049f6cc18b5611901549943c4c22522fa8a3b6b265404f673977b2"}, ] certifi = [ {file = "certifi-2022.9.24-py3-none-any.whl", hash = "sha256:90c1a32f1d68f940488354e36370f6cca89f0f106db09518524c88d6ed83f382"}, @@ -790,8 +777,8 @@ imagesize = [ {file = "imagesize-1.4.1.tar.gz", hash = "sha256:69150444affb9cb0d5cc5a92b3676f0b2fb7cd9ae39e947a5e11a36b4497cd4a"}, ] importlib-metadata = [ - {file = "importlib_metadata-5.0.0-py3-none-any.whl", hash = "sha256:ddb0e35065e8938f867ed4928d0ae5bf2a53b7773871bfe6bcc7e4fcdc7dea43"}, - {file = "importlib_metadata-5.0.0.tar.gz", hash = "sha256:da31db32b304314d044d3c12c79bd59e307889b287ad12ff387b3500835fc2ab"}, + {file = "importlib_metadata-5.1.0-py3-none-any.whl", hash = "sha256:d84d17e21670ec07990e1044a99efe8d615d860fd176fc29ef5c306068fda313"}, + {file = "importlib_metadata-5.1.0.tar.gz", hash = "sha256:d5059f9f1e8e41f80e9c56c2ee58811450c31984dfa625329ffd7c0dad88a73b"}, ] iniconfig = [ {file = "iniconfig-1.1.1-py2.py3-none-any.whl", hash = "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3"}, @@ -856,9 +843,6 @@ mccabe = [ {file = "mccabe-0.6.1-py2.py3-none-any.whl", hash = "sha256:ab8a6258860da4b6677da4bd2fe5dc2c659cff31b3ee4f7f5d64e79735b80d42"}, {file = "mccabe-0.6.1.tar.gz", hash = "sha256:dd8d182285a0fe56bace7f45b5e7d1a6ebcbf524e8f3bd87eb0f125271b8831f"}, ] -mock = [ - {file = "mock-4.0.0.tar.gz", hash = "sha256:8fff3fd7c5796ea78ae2847f32e87ad4e111e03fef6e90d03b5efb4882211d78"}, -] packaging = [ {file = "packaging-21.3-py3-none-any.whl", hash = 
"sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"}, {file = "packaging-21.3.tar.gz", hash = "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb"}, @@ -924,8 +908,8 @@ s3transfer = [ {file = "s3transfer-0.6.0.tar.gz", hash = "sha256:2ed07d3866f523cc561bf4a00fc5535827981b117dd7876f036b0c1aca42c947"}, ] setuptools = [ - {file = "setuptools-65.6.0-py3-none-any.whl", hash = "sha256:6211d2f5eddad8757bd0484923ca7c0a6302ebc4ab32ea5e94357176e0ca0840"}, - {file = "setuptools-65.6.0.tar.gz", hash = "sha256:d1eebf881c6114e51df1664bc2c9133d022f78d12d5f4f665b9191f084e2862d"}, + {file = "setuptools-65.6.3-py3-none-any.whl", hash = "sha256:57f6f22bde4e042978bcd50176fdb381d7c21a9efa4041202288d3737a0c6a54"}, + {file = "setuptools-65.6.3.tar.gz", hash = "sha256:a7620757bf984b58deaf32fc8a4577a9bbc0850cf92c20e1ce41c38c19e5fb75"}, ] six = [ {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, @@ -992,10 +976,10 @@ typing-extensions = [ {file = "typing_extensions-4.4.0.tar.gz", hash = "sha256:1511434bb92bf8dd198c12b1cc812e800d4181cfcb867674e0f8279cc93087aa"}, ] urllib3 = [ - {file = "urllib3-1.26.12-py2.py3-none-any.whl", hash = "sha256:b930dd878d5a8afb066a637fbb35144fe7901e3b209d1cd4f524bd0e9deee997"}, - {file = "urllib3-1.26.12.tar.gz", hash = "sha256:3fa96cf423e6987997fc326ae8df396db2a8b7c667747d47ddd8ecba91f4a74e"}, + {file = "urllib3-1.26.13-py2.py3-none-any.whl", hash = "sha256:47cc05d99aaa09c9e72ed5809b60e7ba354e64b59c9c173ac3018642d8bb41fc"}, + {file = "urllib3-1.26.13.tar.gz", hash = "sha256:c083dd0dce68dbfbe1129d5271cb90f9447dea7d52097c6e0126120c521ddea8"}, ] zipp = [ - {file = "zipp-3.10.0-py3-none-any.whl", hash = "sha256:4fcb6f278987a6605757302a6e40e896257570d11c51628968ccb2a47e80c6c1"}, - {file = "zipp-3.10.0.tar.gz", hash = "sha256:7a7262fd930bd3e36c50b9a64897aec3fafff3dfdeec9623ae22b40e93f99bb8"}, + {file = "zipp-3.11.0-py3-none-any.whl", hash = "sha256:83a28fcb75844b5c0cdaf5aa4003c2d728c77e05f5aeabe8e95e56727005fbaa"}, + {file = "zipp-3.11.0.tar.gz", hash = "sha256:a7a22e05929290a67401440b39690ae6563279bced5f314609d9d03798f56766"}, ] diff --git a/pyproject.toml b/pyproject.toml index d9138ab92..4ebfc3eee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tibanna" -version = "2.2.6" +version = "3.0.0" description = "Tibanna runs portable pipelines (in CWL/WDL) on the AWS Cloud." 
authors = ["4DN-DCIC Team "] license = "MIT" @@ -43,7 +43,6 @@ flake8 = "^3.9.0" pytest = "^6.0" pytest-cov = "^3.0.0" pytest-parallel = "^0.1.1" -mock = "4.0" pytest-mock = "3.7" coverage = {extras = ["toml"], version = "^6.3.2"} diff --git a/scripts/publish-docker b/scripts/publish-docker index 479478856..0f4df8e66 100755 --- a/scripts/publish-docker +++ b/scripts/publish-docker @@ -4,5 +4,6 @@ export BUILD_LOG=/tmp/build-log export VERSION=$(python -c 'from tibanna._version import __version__; print(__version__)') export AWSF_IMAGE=$(python -c 'from tibanna.vars import DEFAULT_AWSF_IMAGE; print(DEFAULT_AWSF_IMAGE)') -docker build -t $AWSF_IMAGE --build-arg version=$VERSION awsf3-docker/ > $BUILD_LOG -docker push $AWSF_IMAGE +# Your local docker driver needs to support the multiple platforms feature +docker buildx build --push --platform linux/amd64,linux/arm64 -t $AWSF_IMAGE --build-arg version=$VERSION awsf3-docker/ > $BUILD_LOG + diff --git a/test_json/unicorn/medium_nonspot.postrun.json b/test_json/unicorn/medium_nonspot.postrun.json index 9590b44b8..3fa1ac252 100644 --- a/test_json/unicorn/medium_nonspot.postrun.json +++ b/test_json/unicorn/medium_nonspot.postrun.json @@ -53,6 +53,7 @@ "status": "0", "filesystem": "/dev/nvme1n1", "instance_id": "i-01769a822e5dbb407", + "instance_type": "t3.medium", "instance_availablity_zone": "us-east-1b", "total_input_size": "12K", "total_output_size": "36K", diff --git a/test_json/unicorn/small_spot.postrun.json b/test_json/unicorn/small_spot.postrun.json index 9198e7ebf..f43b3abdf 100644 --- a/test_json/unicorn/small_spot.postrun.json +++ b/test_json/unicorn/small_spot.postrun.json @@ -53,6 +53,7 @@ "status": "0", "filesystem": "/dev/nvme1n1", "instance_id": "i-01769a822e5dbb407", + "instance_type": "t3.small", "instance_availablity_zone": "us-east-1b", "total_input_size": "12K", "total_output_size": "36K", diff --git a/test_json/unicorn/small_spot_gp3_iops.postrun.json b/test_json/unicorn/small_spot_gp3_iops.postrun.json index de7b2fa1b..c52ec8abf 100644 --- a/test_json/unicorn/small_spot_gp3_iops.postrun.json +++ b/test_json/unicorn/small_spot_gp3_iops.postrun.json @@ -53,6 +53,7 @@ "status": "0", "filesystem": "/dev/nvme1n1", "instance_id": "i-01769a822e5dbb407", + "instance_type": "t3.small", "instance_availablity_zone": "us-east-1b", "total_input_size": "12K", "total_output_size": "36K", diff --git a/test_json/unicorn/small_spot_gp3_iops_throughput.postrun.json b/test_json/unicorn/small_spot_gp3_iops_throughput.postrun.json index 02ae76bfa..949839856 100644 --- a/test_json/unicorn/small_spot_gp3_iops_throughput.postrun.json +++ b/test_json/unicorn/small_spot_gp3_iops_throughput.postrun.json @@ -53,6 +53,7 @@ "status": "0", "filesystem": "/dev/nvme1n1", "instance_id": "i-01769a822e5dbb407", + "instance_type": "t3.small", "instance_availablity_zone": "us-east-1b", "total_input_size": "12K", "total_output_size": "36K", diff --git a/test_json/unicorn/small_spot_io2_iops.postrun.json b/test_json/unicorn/small_spot_io2_iops.postrun.json index 379a646fe..045f0c080 100644 --- a/test_json/unicorn/small_spot_io2_iops.postrun.json +++ b/test_json/unicorn/small_spot_io2_iops.postrun.json @@ -53,6 +53,7 @@ "status": "0", "filesystem": "/dev/nvme1n1", "instance_id": "i-01769a822e5dbb407", + "instance_type": "t3.small", "instance_availablity_zone": "us-east-1f", "total_input_size": "12K", "total_output_size": "36K", diff --git a/tests/awsf3/postrunjson/GBPtlqb2rFGH.postrun.json b/tests/awsf3/postrunjson/GBPtlqb2rFGH.postrun.json index 26265c769..1fd7f9e01 
100644 --- a/tests/awsf3/postrunjson/GBPtlqb2rFGH.postrun.json +++ b/tests/awsf3/postrunjson/GBPtlqb2rFGH.postrun.json @@ -112,6 +112,7 @@ "filesystem": "", "instance_availablity_zone": "", "instance_id": "", + "instance_type": "", "start_time": "20210312-14:04:23-UTC" }, "config": { diff --git a/tests/tibanna/unicorn/test_ec2_utils.py b/tests/tibanna/unicorn/test_ec2_utils.py index 6272e9fa9..600a106cf 100755 --- a/tests/tibanna/unicorn/test_ec2_utils.py +++ b/tests/tibanna/unicorn/test_ec2_utils.py @@ -1,8 +1,9 @@ import boto3 import pytest +import base64 import os import json -import mock +from unittest import mock from tibanna.job import Jobs from tibanna.ec2_utils import ( UnicornInput, @@ -243,7 +244,7 @@ def test_execution_mem_cpu(): assert 'args' in unicorn_dict assert 'config' in unicorn_dict assert 'instance_type' in unicorn_dict['config'] - assert 't3.small' in execution.instance_type_list # instance_type is now randomized, so just check presence + assert 't3.small' in execution.instance_type_list def test_execution_benchmark(): @@ -263,9 +264,8 @@ def test_execution_benchmark(): print(unicorn_dict) assert 'args' in unicorn_dict assert 'config' in unicorn_dict - assert 'instance_type' in unicorn_dict['config'] - assert unicorn_dict['config']['instance_type'] == 't3.micro' assert unicorn_dict['config']['ebs_size'] == 15 + assert execution.instance_type_list[0] == 't3.micro' # cleanup afterwards s3.delete_objects(Bucket='tibanna-output', Delete={'Objects': [{'Key': randomstr}]}) @@ -539,9 +539,14 @@ def test_create_userdata(): 'jobid': 'myjobid'} execution = Execution(input_dict) userdata = execution.create_userdata() - print(userdata) + + # userdata is a base64 encoded - decode for testing + base64_bytes = userdata.encode('ascii') + base64_msg = base64.b64decode(base64_bytes) + userdata_str = base64_msg.decode('ascii') + assert userdata - assert 'JOBID=myjobid' in userdata + assert 'JOBID=myjobid' in userdata_str # cleanup afterwards s3.delete_objects(Bucket='tibanna-output', Delete={'Objects': [{'Key': randomstr}]}) @@ -563,9 +568,14 @@ def test_create_userdata_w_profile(): execution = Execution(input_dict) profile = {'access_key': 'haha', 'secret_key': 'lala'} userdata = execution.create_userdata(profile=profile) - print(userdata) + + # userdata is a base64 encoded - decode for testing + base64_bytes = userdata.encode('ascii') + base64_msg = base64.b64decode(base64_bytes) + userdata_str = base64_msg.decode('ascii') + assert userdata - assert '-a haha -s lala' in userdata + assert '-a haha -s lala' in userdata_str # cleanup afterwards s3.delete_objects(Bucket='tibanna-output', Delete={'Objects': [{'Key': randomstr}]}) @@ -614,83 +624,9 @@ def test_upload_run_json_encrypt_s3_upload(): Delete={'Objects': [{'Key': jobid + '.run.json'}]}) -def test_launch_args(): - """test creating launch arguments - also test spot_instance""" - jobid = create_jobid() - log_bucket = 'tibanna-output' - input_dict = {'args': {'output_S3_bucket': 'somebucket', - 'cwl_main_filename': 'md5.cwl', - 'cwl_directory_url': 'someurl'}, - 'config': {'log_bucket': log_bucket, 'instance_type': 't3.small', - 'spot_instance': True}, - 'jobid': jobid} - execution = Execution(input_dict) - # userdata is required before launch_args is created - execution.userdata = execution.create_userdata() - launch_args = execution.launch_args - print(launch_args) - assert launch_args - assert 't3.small' in str(launch_args) - assert 'InstanceMarketOptions' in str(launch_args) - -@pytest.mark.parametrize('subnets', [ - 
['subnet-000001', 'subnet-000002'], 'subnet-000001', None -]) -def test_launch_args_subnet(subnets): - """test creating launch arguments with subnet array argument """ - jobid = create_jobid() - log_bucket = 'tibanna-output' - input_dict = {'args': {'output_S3_bucket': 'somebucket', - 'cwl_main_filename': 'md5.cwl', - 'cwl_directory_url': 'someurl'}, - 'config': {'log_bucket': log_bucket, 'instance_type': 't3.small', - 'spot_instance': False, 'subnet': subnets}, - 'jobid': jobid} - execution = Execution(input_dict) - # userdata is required before launch_args is created - execution.userdata = execution.create_userdata() - launch_args = execution.launch_args - assert launch_args - if isinstance(subnets, str): - assert launch_args['SubnetId'] == subnets - elif isinstance(subnets, list): - assert launch_args['SubnetId'] in subnets - else: - assert 'SubnetId' not in launch_args - - -@pytest.mark.parametrize('subnets', [ - 'subnet-000001,subnet-000002', 'subnet-000001' -]) -def test_launch_args_subnet_environ(subnets): - """ Tests pulling in subnets from env variable """ - old_environ = dict(os.environ) - os.environ.update({ - 'SUBNETS': subnets - }) - jobid = create_jobid() - log_bucket = 'tibanna-output' - input_dict = {'args': {'output_S3_bucket': 'somebucket', - 'cwl_main_filename': 'md5.cwl', - 'cwl_directory_url': 'someurl'}, - 'config': {'log_bucket': log_bucket, 'instance_type': 't3.small', - 'spot_instance': False}, - 'jobid': jobid} - execution = Execution(input_dict) - # userdata is required before launch_args is created - execution.userdata = execution.create_userdata() - launch_args = execution.launch_args - assert launch_args - if isinstance(subnets, str) and ',' not in subnets: - assert launch_args['SubnetId'] == subnets - else: - assert launch_args['SubnetId'] in subnets - os.environ.clear() - os.environ.update(old_environ) # ensure old state restored - - -def test_launch_and_get_instance_id(): +@mock.patch.object(Execution, 'create_fleet') +def test_launch_and_get_instance_id(test_create_fleet): """test dryrun of ec2 launch""" jobid = create_jobid() log_bucket = 'tibanna-output' @@ -698,196 +634,205 @@ def test_launch_and_get_instance_id(): 'cwl_main_filename': 'md5.cwl', 'cwl_directory_url': 'someurl'}, 'config': {'log_bucket': log_bucket, 'mem': 1, 'cpu': 1, - 'spot_instance': True}, + 'spot_instance': True, 'behavior_on_capacity_limit': 'fail'}, 'jobid': jobid} + + test_create_fleet.return_value = { + "Errors":[{ + "ErrorCode": "Unhandled_error", + "ErrorMessage": "Unhandled_error" + }] + } execution = Execution(input_dict, dryrun=True) - # userdata is required before launch_args is created - execution.userdata = execution.create_userdata() with pytest.raises(Exception) as ex: execution.launch_and_get_instance_id() - assert 'Request would have succeeded, but DryRun flag is set' in str(ex.value) + assert 'Failed to launch instance for job' in str(ex.value) + test_create_fleet.return_value = { + "Errors":[{ + "ErrorCode": "InsufficientInstanceCapacity", + "ErrorMessage": "InsufficientInstanceCapacity" + }] + } + execution = Execution(input_dict, dryrun=True) + with pytest.raises(EC2InstanceLimitException) as ex: + execution.launch_and_get_instance_id() + assert 'Instance limit exception' in str(ex.value) -def test_ec2_exception_coordinator2(): - """ec2 limit exceptions with 'fail'""" - jobid = create_jobid() - log_bucket = 'tibanna-output' - input_dict = {'args': {'output_S3_bucket': 'somebucket', - 'cwl_main_filename': 'md5.cwl', - 'cwl_directory_url': 'someurl'}, - 'config': 
{'log_bucket': log_bucket, 'instance_type': 'c5.4xlarge', - 'spot_instance': True}, - 'jobid': jobid} + input_dict['config']['behavior_on_capacity_limit'] = 'wait_and_retry' execution = Execution(input_dict, dryrun=True) - execution.userdata = execution.create_userdata() - with pytest.raises(EC2InstanceLimitException) as exec_info: - execution.ec2_exception_coordinator(fun)() - assert exec_info + with pytest.raises(EC2InstanceLimitWaitException) as ex: + execution.launch_and_get_instance_id() + assert 'Instance limit exception - wait and retry later' in str(ex.value) + input_dict['config']['behavior_on_capacity_limit'] = 'retry_without_spot' + input_dict['config']['spot_instance'] = False + execution = Execution(input_dict, dryrun=True) + with pytest.raises(Exception) as ex: + execution.launch_and_get_instance_id() + assert "'retry_without_spot' works only with 'spot_instance'" in str(ex.value) -def test_ec2_exception_coordinator3(): - """ec2 exceptions with 'wait_and_retry'""" - jobid = create_jobid() - log_bucket = 'tibanna-output' - input_dict = {'args': {'output_S3_bucket': 'somebucket', - 'cwl_main_filename': 'md5.cwl', - 'cwl_directory_url': 'someurl'}, - 'config': {'log_bucket': log_bucket, 'instance_type': 'c5.4xlarge', - 'spot_instance': True, - 'behavior_on_capacity_limit': 'wait_and_retry'}, - 'jobid': jobid} + input_dict['config']['behavior_on_capacity_limit'] = 'retry_without_spot' + input_dict['config']['spot_instance'] = True execution = Execution(input_dict, dryrun=True) - execution.userdata = execution.create_userdata() - with pytest.raises(EC2InstanceLimitWaitException) as exec_info: - execution.ec2_exception_coordinator(fun)() - assert exec_info + with pytest.raises(EC2InstanceLimitException) as ex: + execution.launch_and_get_instance_id() + assert 'Instance limit exception' in str(ex.value) + assert execution.cfg.behavior_on_capacity_limit == 'fail' + assert execution.cfg.spot_instance == False -def test_ec2_exception_coordinator4(): - """ec2 exceptions with 'other_instance_types'""" +def test_create_fleet_spec(): + """Test fleet specs for different input configs""" jobid = create_jobid() - log_bucket = 'tibanna-output' input_dict = {'args': {'output_S3_bucket': 'somebucket', 'cwl_main_filename': 'md5.cwl', 'cwl_directory_url': 'someurl'}, - 'config': {'log_bucket': log_bucket, 'mem': 1, 'cpu': 1, - 'spot_instance': True, 'mem_as_is': True, - 'behavior_on_capacity_limit': 'other_instance_types'}, + 'config': {'log_bucket': 'tibanna-output', + 'instance_type': 't2.micro', + 'behavior_on_capacity_limit': 'fail'}, 'jobid': jobid} + execution = Execution(input_dict, dryrun=True) + fleet_spec = execution.create_fleet_spec() + potential_ec2s = fleet_spec["LaunchTemplateConfigs"][0]["Overrides"] + assert len(potential_ec2s) == 1 + assert potential_ec2s[0]["InstanceType"] == "t2.micro" - def retry_for_new_instance(previous_instance): - """ Try to get a new instance type different from the previous run """ - tries = 10 - for _ in range(tries): - res = execution.ec2_exception_coordinator(fun)() - assert res == 'continue' - try: - assert execution.cfg.instance_type != previous_instance - return execution.cfg.instance_type - except AssertionError: # just try again - continue - raise AssertionError('Did not randomly retrieve a new instance type after 10 tries') + input_dict['config']['instance_type'] = ['t2.micro'] + execution = Execution(input_dict, dryrun=True) + fleet_spec = execution.create_fleet_spec() + potential_ec2s = fleet_spec["LaunchTemplateConfigs"][0]["Overrides"] + 
assert len(potential_ec2s) == 1 + assert potential_ec2s[0]["InstanceType"] == "t2.micro" + input_dict['config']['instance_type'] = ['t2.micro','a1.medium'] + input_dict['config']['subnet'] = ['subnet_1', 'subnet_2'] execution = Execution(input_dict, dryrun=True) - assert 't3.micro' in execution.instance_type_list - execution.userdata = execution.create_userdata() - new_instance_type = retry_for_new_instance(execution.cfg.instance_type) - new_instance_type2 = retry_for_new_instance(new_instance_type) - retry_for_new_instance(new_instance_type2) + fleet_spec = execution.create_fleet_spec() + potential_ec2s = fleet_spec["LaunchTemplateConfigs"][0]["Overrides"] + assert len(potential_ec2s) == 4 + assert potential_ec2s[0]["InstanceType"] == potential_ec2s[1]["InstanceType"] + assert potential_ec2s[0]["ImageId"] != potential_ec2s[2]["ImageId"] -@pytest.mark.skip # this test is no longer relevant as 'other_instance_types' will randomly spin forever now -def test_ec2_exception_coordinator5(): - """ec2 exceptions with 'other_instance_types' but had only one option""" +def test_create_fleet(): + """Test the create_fleet command""" jobid = create_jobid() - log_bucket = 'tibanna-output' input_dict = {'args': {'output_S3_bucket': 'somebucket', 'cwl_main_filename': 'md5.cwl', 'cwl_directory_url': 'someurl'}, - 'config': {'log_bucket': log_bucket, 'instance_type': 't2.micro', - 'spot_instance': True, - 'behavior_on_capacity_limit': 'other_instance_types'}, + 'config': {'log_bucket': 'tibanna-output', + 'instance_type': 't2.micro', + 'behavior_on_capacity_limit': 'fail'}, 'jobid': jobid} execution = Execution(input_dict, dryrun=True) - assert execution.cfg.instance_type == 't2.micro' execution.userdata = execution.create_userdata() - with pytest.raises(EC2InstanceLimitException) as exec_info: - execution.ec2_exception_coordinator(fun)() - assert 'No more instance type available' in str(exec_info.value) + with pytest.raises(Exception) as ex: + execution.create_fleet() + assert 'Request would have succeeded, but DryRun flag is set.' 
in str(ex.value) -def test_ec2_exception_coordinator6(): - """ec2 exceptions with 'retry_without_spot'""" +@pytest.mark.parametrize('subnets', [ + 'subnet-000001,subnet-000002', 'subnet-000001', None +]) +def test_fleet_spec_subnet_env(subnets): + """Test fleet specs for different subnets""" + old_environ = dict(os.environ) + if subnets: + os.environ.update({ + 'SUBNETS': subnets + }) jobid = create_jobid() - log_bucket = 'tibanna-output' input_dict = {'args': {'output_S3_bucket': 'somebucket', 'cwl_main_filename': 'md5.cwl', 'cwl_directory_url': 'someurl'}, - 'config': {'log_bucket': log_bucket, 'instance_type': 't2.micro', - 'spot_instance': True, - 'behavior_on_capacity_limit': 'retry_without_spot'}, + 'config': {'log_bucket': 'tibanna-output', + 'instance_type': 't2.micro', + 'behavior_on_capacity_limit': 'fail'}, 'jobid': jobid} execution = Execution(input_dict, dryrun=True) - execution.userdata = execution.create_userdata() - res = execution.ec2_exception_coordinator(fun)() - assert res == 'continue' - assert execution.cfg.spot_instance is False # changed to non-spot - assert execution.cfg.behavior_on_capacity_limit == 'fail' # changed to non-spot - with pytest.raises(EC2InstanceLimitException) as exec_info: - res = execution.ec2_exception_coordinator(fun)() # this time, it fails - assert exec_info + fleet_spec = execution.create_fleet_spec() + potential_ec2s = fleet_spec["LaunchTemplateConfigs"][0]["Overrides"] + if isinstance(subnets, str) and ',' not in subnets: + assert len(potential_ec2s) == 1 + assert potential_ec2s[0]["SubnetId"] == "subnet-000001" + elif isinstance(subnets, str): + assert len(potential_ec2s) == 2 + assert potential_ec2s[0]["SubnetId"] == "subnet-000001" + assert potential_ec2s[1]["SubnetId"] == "subnet-000002" + else: + assert len(potential_ec2s) == 1 + assert 'SubnetId' not in potential_ec2s[0] + + os.environ.clear() + os.environ.update(old_environ) # ensure old state restored -def test_ec2_exception_coordinator7(): - """ec2 exceptions with 'retry_without_spot' without spot instance""" + +def test_instance_types(): + """Test possible instance types for different input configs""" jobid = create_jobid() - log_bucket = 'tibanna-output' input_dict = {'args': {'output_S3_bucket': 'somebucket', 'cwl_main_filename': 'md5.cwl', 'cwl_directory_url': 'someurl'}, - 'config': {'log_bucket': log_bucket, 'instance_type': 't2.micro', - 'behavior_on_capacity_limit': 'retry_without_spot'}, + 'config': {'log_bucket': 'tibanna-output', + 'instance_type': 't2.micro', + 'behavior_on_capacity_limit': 'fail'}, 'jobid': jobid} - execution = Execution(input_dict, dryrun=True) - assert execution.cfg.spot_instance is False - execution.userdata = execution.create_userdata() - with pytest.raises(Exception) as exec_info: - execution.ec2_exception_coordinator(fun)() - assert "'retry_without_spot' works only with 'spot_instance'" in str(exec_info.value) + execution = Execution(input_dict, dryrun=True) + assert len(execution.instance_type_list) == 1 + assert execution.instance_type_list[0] == 't2.micro' -def test_ec2_exception_coordinator8(): - """ec2 exceptions with 'other_instance_types' with both instance_type and mem/cpu - specified""" - jobid = create_jobid() - log_bucket = 'tibanna-output' - input_dict = {'args': {'output_S3_bucket': 'somebucket', - 'cwl_main_filename': 'md5.cwl', - 'cwl_directory_url': 'someurl'}, - 'config': {'log_bucket': log_bucket, 'instance_type': 't2.micro', - 'mem': 1, 'cpu': 1, 'mem_as_is': True, - 'behavior_on_capacity_limit': 'other_instance_types'}, - 
'jobid': jobid} + input_dict['config']['instance_type'] = ['t2.micro','a1.medium','t3.small'] execution = Execution(input_dict, dryrun=True) - # this list may change as AWS releases new instances - possible_instance_types = ['t2.micro', 't3.micro', 't3.small', 't2.small', 't3.medium', 'm1.small', 't2.medium', - 'm3.medium', 't3.large', 'c5.large'] + assert len(execution.instance_type_list) == 3 + assert execution.instance_type_infos['t2.micro']['EBS_optimized'] == False + assert execution.instance_type_infos['t3.small']['EBS_optimized'] == True + assert execution.instance_type_infos['t2.micro']['ami_id'] == execution.instance_type_infos['t3.small']['ami_id'] + assert execution.instance_type_infos['t2.micro']['ami_id'] != execution.instance_type_infos['a1.medium']['ami_id'] + + input_dict['config']['EBS_optimized'] = True + execution = Execution(input_dict, dryrun=True) + assert len(execution.instance_type_list) == 2 + for instance in execution.instance_type_list: + assert instance in ['a1.medium','t3.small'] + + del input_dict['config']['instance_type'] + input_dict['config']['mem'] = 2 + input_dict['config']['cpu'] = 1 + input_dict['config']['mem_as_is'] = True + execution = Execution(input_dict, dryrun=True) + possible_instance_types = ['t3.small', 't3.medium', 't3.large', 'c5.large', 'm5a.large'] + assert len(execution.instance_type_list) == 5 for instance in possible_instance_types: assert instance in execution.instance_type_list - assert execution.cfg.instance_type in possible_instance_types - execution.userdata = execution.create_userdata() - res = execution.ec2_exception_coordinator(fun)() - assert res == 'continue' - assert execution.cfg.instance_type in possible_instance_types - execution.userdata = execution.create_userdata() - res = execution.ec2_exception_coordinator(fun)() - assert res == 'continue' - assert execution.cfg.instance_type in possible_instance_types - -def test_ec2_exception_coordinator9(): - """ec2 exceptions with 'other_instance_types' with both instance_type and mem/cpu - specified""" - jobid = create_jobid() - log_bucket = 'tibanna-output' - input_dict = {'args': {'output_S3_bucket': 'somebucket', - 'cwl_main_filename': 'md5.cwl', - 'cwl_directory_url': 'someurl'}, - 'config': {'log_bucket': log_bucket, - 'mem': 2, 'cpu': 1, 'mem_as_is': True, - 'behavior_on_capacity_limit': 'other_instance_types'}, - 'jobid': jobid} + input_dict['config']['EBS_optimized'] = False execution = Execution(input_dict, dryrun=True) - # this list may change as AWS releases new instances possible_instance_types = ['t3.small', 't2.small', 't3.medium', 't2.medium', 'm3.medium', 't3.large', 'c5.large', 'm5a.large', 'm1.medium', 't2.large'] for instance in possible_instance_types: assert instance in execution.instance_type_list - assert execution.cfg.instance_type in possible_instance_types - execution.userdata = execution.create_userdata() - res = execution.ec2_exception_coordinator(fun)() - assert res == 'continue' - assert execution.cfg.instance_type in possible_instance_types + + input_dict['config'] = { + 'log_bucket': 'tibanna-output', + 'instance_type': ['t2.micro','a1.medium','t3.small'], + 'behavior_on_capacity_limit': 'fail', + 'ami_id': "my_ami" + } + execution = Execution(input_dict, dryrun=True) + for instance in ['t2.micro','a1.medium','t3.small']: + assert execution.instance_type_infos[instance]['ami_id'] == "my_ami" + + input_dict['config'] = { + 'log_bucket': 'tibanna-output', + 'instance_type': ['a1.medium'], + 'behavior_on_capacity_limit': 'fail', + 'mem': 2, 'cpu': 1 
+ } + execution = Execution(input_dict, dryrun=True) + assert len(execution.instance_type_list) == 11 def test_upload_workflow_to_s3(run_task_awsem_event_cwl_upload): diff --git a/tests/tibanna/unicorn/test_job.py b/tests/tibanna/unicorn/test_job.py index 4775f9aa7..f7d13c28e 100644 --- a/tests/tibanna/unicorn/test_job.py +++ b/tests/tibanna/unicorn/test_job.py @@ -2,7 +2,7 @@ from tibanna.utils import create_jobid from tibanna import dd_utils from tibanna.vars import DYNAMODB_TABLE, EXECUTION_ARN -import mock +from unittest import mock import boto3 import pytest diff --git a/tibanna/__main__.py b/tibanna/__main__.py index 9a75c115b..1ab875a1f 100755 --- a/tibanna/__main__.py +++ b/tibanna/__main__.py @@ -381,7 +381,11 @@ def args(self): 'help': "The ID of the Ubuntu 20.04 image to build from (e.g. 'ami-0885b1f6bd170450c' for us-east-1). " + "To use this option, turn on the option -B."}, {'flag': ["-r", "--replicate"], - 'help': "Enable to replicate across all regions defined by AMI_PER_REGION"} + 'help': "Enable to replicate across all regions defined by AMI_PER_REGION"}, + {'flag': ["-a", "--architecture"], + 'help': "Architecture: x86 or Arm. Default: x86. " + + "To use this option, turn on option -B. Ignored when option -U is used.", + 'default': "x86"} ] } @@ -525,10 +529,10 @@ def cleanup(usergroup, suffix='', purge_history=False, do_not_remove_iam_group=F def create_ami(make_public=False, build_from_scratch=False, source_image_to_copy_from=None, source_image_region=None, - ubuntu_base_image=None, replicate=False): + ubuntu_base_image=None, replicate=False, architecture="x86"): print(API().create_ami(make_public=make_public, build_from_scratch=build_from_scratch, source_image_to_copy_from=source_image_to_copy_from, source_image_region=source_image_region, - ubuntu_base_image=ubuntu_base_image, replicate=replicate)) + ubuntu_base_image=ubuntu_base_image, replicate=replicate, architecture=architecture)) def main(Subcommands=Subcommands): diff --git a/tibanna/ami.py b/tibanna/ami.py index 6bf5a7b4e..5f9b70414 100755 --- a/tibanna/ami.py +++ b/tibanna/ami.py @@ -11,15 +11,27 @@ class AMI(object): - BASE_AMI = 'ami-0885b1f6bd170450c' # ubuntu 20.04 for us-east-1 + BASE_AMI = None + BASE_AMI_X86 = 'ami-0885b1f6bd170450c' # ubuntu 20.04 for us-east-1 (x86) + BASE_AMI_ARM = 'ami-00266f51b6b22db58' # ubuntu 20.04 for us-east-1 (Arm) BASE_REGION = 'us-east-1' USERDATA_DIR = os.path.dirname(os.path.abspath(__file__)) USERDATA_FILE = os.path.join(USERDATA_DIR, 'create_ami_userdata') AMI_NAME = 'tibanna-ami-' + datetime.strftime(datetime.today(), '%Y%m%d') # e.g tibanna-ami-20201113 + ARCHITECTURE = 'x86' - def __init__(self, base_ami=None, base_region=None, userdata_file=None, ami_name=None): + def __init__(self, base_ami=None, base_region=None, userdata_file=None, ami_name=None, architecture=None): if base_ami: self.BASE_AMI = base_ami + elif architecture == 'x86': + self.BASE_AMI = self.BASE_AMI_X86 + self.AMI_NAME = 'tibanna-ami-x86-' + datetime.strftime(datetime.today(), '%Y%m%d') + elif architecture == 'Arm': + self.BASE_AMI = self.BASE_AMI_ARM + self.AMI_NAME = 'tibanna-ami-arm-' + datetime.strftime(datetime.today(), '%Y%m%d') + self.ARCHITECTURE = 'Arm' + + if base_region: self.BASE_REGION = base_region if userdata_file is not None: @@ -28,10 +40,14 @@ def __init__(self, base_ami=None, base_region=None, userdata_file=None, ami_name self.AMI_NAME = ami_name @staticmethod - def launch_instance_for_tibanna_ami(keyname, userdata_file, base_ami): + def launch_instance_for_tibanna_ami(keyname, 
userdata_file, base_ami, architecture): + + instanceType = 't3.micro' + if architecture == 'Arm': + instanceType = 'a1.medium' launch_args = {'ImageId': base_ami, - 'InstanceType': 't3.micro', + 'InstanceType': instanceType, 'MaxCount': 1, 'MinCount': 1, 'TagSpecifications': [{'ResourceType': 'instance', @@ -55,7 +71,8 @@ def launch_instance_for_tibanna_ami(keyname, userdata_file, base_ami): def create_ami_for_tibanna(self, keyname=None, make_public=False, replicate=False): return self.create_ami(keyname=keyname, userdata_file=self.USERDATA_FILE, base_ami=self.BASE_AMI, ami_name=self.AMI_NAME, - make_public=make_public, base_region=self.BASE_REGION, replicate=replicate) + make_public=make_public, base_region=self.BASE_REGION, + replicate=replicate, architecture=self.ARCHITECTURE) @staticmethod def replicate_ami(*, ami_name, ami_id, source_region='us-east-1', @@ -75,7 +92,7 @@ def replicate_ami(*, ami_name, ami_id, source_region='us-east-1', Returns an AMI_PER_REGION mapping """ if not target_regions: - target_regions = [r for r in AMI_PER_REGION.keys() if r != source_region] + target_regions = [r for r in AMI_PER_REGION['x86'].keys() if r != source_region] # Create sessions in each target region and copy the AMI into it # If this AMI is to be publicly available, sleep for 5 mins to allow @@ -113,6 +130,7 @@ def create_ami(cls, keyname=None, userdata_file=USERDATA_FILE, ami_name=AMI_NAME, make_public=False, replicate=False, + architecture='x86', base_region='us-east-1'): """ Helper function that creates the Tibanna AMI from a base image. """ if not userdata_file: @@ -136,7 +154,7 @@ def create_ami(cls, keyname=None, userdata_file=USERDATA_FILE, # Launch an instance with base AMI try: - instance_id = AMI.launch_instance_for_tibanna_ami(keyname, userdata_file, base_ami) + instance_id = AMI.launch_instance_for_tibanna_ami(keyname, userdata_file, base_ami, architecture) logger.debug("instance_id=" + instance_id) except: raise Exception("Failed to launch an instance") diff --git a/tibanna/awsem.py b/tibanna/awsem.py index a3e16dea1..f23a0e679 100755 --- a/tibanna/awsem.py +++ b/tibanna/awsem.py @@ -291,7 +291,7 @@ def __init__(self, App=None, Input=None, Output=None, JOBID='', start_time=None, end_time=None, status=None, Log=None, total_input_size=None, total_output_size=None, total_tmp_size=None, # older postrunjsons don't have these fields - filesystem='', instance_id='', instance_availablity_zone='', + filesystem='', instance_id='', instance_availablity_zone='', instance_type='', Metrics=None, strict=True): if strict: if App is None or Input is None or Output is None or not JOBID or start_time is None: @@ -303,6 +303,7 @@ def __init__(self, App=None, Input=None, Output=None, JOBID='', self.filesystem = filesystem self.instance_id = instance_id self.instance_availablity_zone = instance_availablity_zone + self.instance_type = instance_type self.total_input_size = total_input_size self.total_output_size = total_output_size self.total_tmp_size = total_tmp_size diff --git a/tibanna/core.py b/tibanna/core.py index 7f94b79d2..62c63fc3d 100755 --- a/tibanna/core.py +++ b/tibanna/core.py @@ -21,7 +21,6 @@ TIBANNA_DEFAULT_STEP_FUNCTION_NAME, STEP_FUNCTION_ARN, EXECUTION_ARN, - AMI_ID, TIBANNA_REPO_NAME, TIBANNA_REPO_BRANCH, TIBANNA_PROFILE_ACCESS_KEY, @@ -113,8 +112,6 @@ def tibanna_packages(self): check_task_lambda = CHECK_TASK_LAMBDA_NAME update_cost_lambda = UPDATE_COST_LAMBDA_NAME - ami_id = AMI_ID - @property def UNICORN_LAMBDAS(self): return [self.run_task_lambda, self.check_task_lambda] @@ 
-631,8 +628,7 @@ def rerun_many(self, sfn=None, stopdate='13Feb2018', stophour=13, def env_list(self, name): # don't set this as a global, since not all tasks require it envlist = { - self.run_task_lambda: {'AMI_ID': self.ami_id, - 'TIBANNA_REPO_NAME': TIBANNA_REPO_NAME, + self.run_task_lambda: {'TIBANNA_REPO_NAME': TIBANNA_REPO_NAME, 'TIBANNA_REPO_BRANCH': TIBANNA_REPO_BRANCH}, self.check_task_lambda: {'TIBANNA_DEFAULT_STEP_FUNCTION_NAME': self.default_stepfunction_name} } @@ -976,7 +972,7 @@ def plot_metrics(self, job_id, sfn=None, directory='.', open_browser=True, force else: job_complete = False log_bucket = postrunjson.config.log_bucket - instance_type = postrunjson.config.instance_type or 'unknown' + instance_type = job.instance_type or 'unknown' else: runjsonstr = self.log(job_id=job_id, sfn=sfn, runjson=True, quiet=True) job_complete = False @@ -985,6 +981,10 @@ def plot_metrics(self, job_id, sfn=None, directory='.', open_browser=True, force job = runjson.Job log_bucket = runjson.config.log_bucket instance_type = runjson.config.instance_type or 'unknown' + # Multiple types were specified, but the run json does not know which one was actually picked + # In this case we just show all of them in the metrics report + if isinstance(instance_type, list): + instance_type = ','.join(instance_type) else: raise Exception("Neither postrun json nor run json can be retrieved." + "Check job_id or step function?") @@ -1251,12 +1251,16 @@ def handle_error(errmsg): logger.info("Finished cleaning") def create_ami(self, build_from_scratch=True, source_image_to_copy_from=None, source_image_region=None, - ubuntu_base_image=None, make_public=False, replicate=False): + ubuntu_base_image=None, make_public=False, replicate=False, architecture='x86'): args = dict() if build_from_scratch: # build from ubuntu 20.04 image and user data if ubuntu_base_image: args.update({'base_ami': ubuntu_base_image}) + if architecture in ['x86','Arm']: + args.update({'architecture': architecture}) + else: + raise Exception("Architecture must be 'x86' or 'Arm'.") else: # copy an existing image args.update({'userdata_file': ''}) diff --git a/tibanna/create_ami_userdata b/tibanna/create_ami_userdata index 443551e86..e40993518 100644 --- a/tibanna/create_ami_userdata +++ b/tibanna/create_ami_userdata @@ -6,17 +6,20 @@ apt install -y awscli apt install -y apt-transport-https \ ca-certificates \ curl \ - software-properties-common # docker + gnupg \ + lsb-release # install docker -curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add - -apt-key fingerprint 0EBFCD88 -add-apt-repository \ - "deb [arch=amd64] https://download.docker.com/linux/ubuntu \ - $(lsb_release -cs) \ - stable" -apt update # update again with an updated repository -apt install -y docker-ce # installing docker-ce +mkdir -p /etc/apt/keyrings +curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /etc/apt/keyrings/docker.gpg + +echo \ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \ + $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null + +apt-get update +apt-get --assume-yes install docker-ce + usermod -aG docker ubuntu # making it available for non-root user ubuntu reboot diff --git a/tibanna/ec2_utils.py b/tibanna/ec2_utils.py index ce5d245bd..61d2940d5 100755 --- a/tibanna/ec2_utils.py +++ b/tibanna/ec2_utils.py @@ -2,6 +2,7 @@ import json import time import os +import base64 import logging import boto3 import botocore @@ 
-19,7 +20,7 @@ S3_ACCESS_ARN, TIBANNA_REPO_NAME, TIBANNA_REPO_BRANCH, - AMI_ID, + AMI_PER_REGION, DYNAMODB_TABLE, DEFAULT_ROOT_EBS_SIZE, TIBANNA_AWSF_DIR, @@ -35,8 +36,7 @@ EC2InstanceLimitWaitException, DependencyStillRunningException, DependencyFailedException, - UnsupportedCWLVersionException, - PricingRetrievalException + UnsupportedCWLVersionException ) from .base import SerializableObject from .nnested_array import flatten, run_on_nested_arrays1 @@ -325,12 +325,13 @@ def fill_default(self): if not hasattr(self, 'ebs_size_as_is'): # if false, add 5GB overhead self.ebs_size_as_is = False if not hasattr(self, 'ami_id'): - self.ami_id = AMI_ID + self.ami_id = "" # will be assigned instance architecture specific later + # special handling for subnet, SG if not set already pull from env # values from config take priority - omit them to get these values from lambda if not self.subnet and os.environ.get('SUBNETS', ''): possible_subnets = os.environ['SUBNETS'].split(',') - self.subnet = possible_subnets # propagate all subnets here, pick one in launch_args + self.subnet = possible_subnets # propagate all subnets here if not self.security_group and os.environ.get('SECURITY_GROUPS', ''): self.security_group = os.environ['SECURITY_GROUPS'].split(',')[0] @@ -359,11 +360,7 @@ def __init__(self, input_dict, dryrun=False): self.jobid = self.unicorn_input.jobid self.args = self.unicorn_input.args self.cfg = self.unicorn_input.cfg - # store user-specified values for instance type, EBS_optimized and ebs_size - # separately, since the values in cfg will change. - self.user_specified_instance_type = self.cfg.instance_type - self.user_specified_EBS_optimized = self.cfg.EBS_optimized - self.user_specified_ebs_size = self.cfg.ebs_size + # get benchmark if available self.input_size_in_bytes = self.get_input_size_in_bytes() if self.cfg.use_benchmark: @@ -372,8 +369,7 @@ def __init__(self, input_dict, dryrun=False): else: logger.debug('self.cfg.use_benchmark = ' + str(self.cfg.use_benchmark)) logger.debug('self.cfg.as_dict() = ' + str(self.cfg.as_dict())) - self.init_instance_type_list() - self.update_config_instance_type() + self.create_instance_type_list() self.update_config_ebs_size() @property @@ -395,71 +391,69 @@ def postlaunch(self): if self.cfg.cloudwatch_dashboard: self.create_cloudwatch_dashboard('awsem-' + self.jobid) - def init_instance_type_list(self): - instance_type = self.user_specified_instance_type + def create_instance_type_list(self): + instance_type = self.cfg.instance_type instance_type_dlist = [] # user directly specified instance type if instance_type: if isinstance(instance_type, str): - if self.user_specified_EBS_optimized: - instance_type_dlist.append({'instance_type': instance_type, - 'EBS_optimized': self.user_specified_EBS_optimized}) - else: - instance_type_dlist.append({'instance_type': instance_type, - 'EBS_optimized': False}) + # Note that the EBS_optimized flag is set on the launch template level + instance_type_dlist.append({'instance_type': instance_type, + 'EBS_optimized': self.cfg.EBS_optimized}) elif isinstance(instance_type, list): for potential_type in instance_type: - if self.user_specified_EBS_optimized: - instance_type_dlist.append({'instance_type': potential_type, - 'EBS_optimized': self.user_specified_EBS_optimized}) - else: - instance_type_dlist.append({'instance_type': potential_type, - 'EBS_optimized': False}) - - # user specified mem and cpu + instance_type_dlist.append({'instance_type': potential_type, + 'EBS_optimized': self.cfg.EBS_optimized}) + + # user 
specified mem and cpu - use the benchmark package to retrieve instance types if self.cfg.mem and self.cfg.cpu: - if self.cfg.mem_as_is: - mem = self.cfg.mem - else: - mem = self.cfg.mem + 1 + mem = self.cfg.mem if self.cfg.mem_as_is else self.cfg.mem + 1 list0 = get_instance_types(self.cfg.cpu, mem, instance_list(exclude_t=False)) - nonredundant_list = [i for i in list0 if i['instance_type'] != instance_type] + current_list = [i['instance_type'] for i in instance_type_dlist] + nonredundant_list = [i for i in list0 if i['instance_type'] not in current_list] instance_type_dlist.extend(nonredundant_list) - # user specifically wanted EBS_optimized instances - if self.user_specified_EBS_optimized: - instance_type_dlist = [i for i in instance_type_dlist if i['EBS_optimized']] + # add benchmark only if there is no user specification if len(instance_type_dlist) == 0 and self.benchmark['instance_type']: instance_type_dlist.append(self.benchmark) - self.instance_type_list = [i['instance_type'] for i in instance_type_dlist] - self.instance_type_info = {i['instance_type']: i for i in instance_type_dlist} - self.choose_next_instance_type() # randomly choose - @property - def current_instance_type(self): - if len(self.instance_type_list) > self.current_instance_type_index: - return self.instance_type_list[self.current_instance_type_index] - else: - return '' - - @property - def current_EBS_optimized(self): - if self.current_instance_type: - return self.instance_type_info[self.current_instance_type]['EBS_optimized'] - else: - return '' + # Augment the list with the corresponding AMI ID and EBS_optimized flag + ec2 = boto3.client('ec2') + current_instance_types = [i['instance_type'] for i in instance_type_dlist] + ami_ebs_info = {} + results = ec2.describe_instance_types( + InstanceTypes=current_instance_types + ) + for result in results['InstanceTypes']: + instance_type = result['InstanceType'] + is_ebs_optimized = result['EbsInfo']['EbsOptimizedSupport'] != 'unsupported' + ami_ebs_info[instance_type] = { + 'EBS_optimized': is_ebs_optimized + } + arch = result['ProcessorInfo']['SupportedArchitectures'][0] + default_ami = AMI_PER_REGION['Arm'].get(AWS_REGION, '') if arch == 'arm64' else AMI_PER_REGION['x86'].get(AWS_REGION, '') + if self.cfg.ami_id: + # if a user supplied an ami_id, it will be used for every instance. 
+ # will be problematic if the instance list contains x86 and Arm instances + ami_ebs_info[instance_type]['ami_id'] = self.cfg.ami_id + elif default_ami: + ami_ebs_info[instance_type]['ami_id'] = default_ami + else: + raise Exception(f"No AMI found for {result['InstanceType']} ({arch}) in {AWS_REGION}") + for instance in instance_type_dlist: + it = instance['instance_type'] + instance['ami_id'] = ami_ebs_info[it]['ami_id'] + instance['EBS_optimized'] = ami_ebs_info[it]['EBS_optimized'] - def choose_next_instance_type(self): - """ Previously we would start to launch jobs at index 0 and increment, but now we will randomly pick - from the list - """ - self.current_instance_type_index = choice(range(0, len(self.instance_type_list))) + # Filtering: user specifically wanted EBS_optimized instances + if self.cfg.EBS_optimized: + instance_type_dlist = [i for i in instance_type_dlist if i['EBS_optimized']] - def update_config_instance_type(self): - # deal with missing fields - self.cfg.instance_type = self.current_instance_type - if not self.user_specified_EBS_optimized: - self.cfg.EBS_optimized = self.current_EBS_optimized + if len(instance_type_dlist) == 0: + raise Exception("There are no EC2 instances that match the provided configuration.") + + self.instance_type_list = [i['instance_type'] for i in instance_type_dlist] + self.instance_type_infos = {i['instance_type']: i for i in instance_type_dlist} @property def total_input_size_in_gb(self): @@ -486,7 +480,7 @@ def update_config_ebs_size(self): self.cfg.ebs_size = round(self.cfg.ebs_size) + 1 else: self.cfg.ebs_size = round(self.cfg.ebs_size) - if not self.user_specified_ebs_size: # use benchmark only if not set by user + if not self.cfg.ebs_size: # use benchmark only if not set by user self.cfg.ebs_size = self.benchmark['ebs_size'] if not self.cfg.ebs_size_as_is: self.cfg.ebs_size += 5 # account for docker image size @@ -536,65 +530,44 @@ def get_start_time(self): return time.strftime("%Y%m%d-%H:%M:%S-%Z") def launch_and_get_instance_id(self): - try: # capturing stdout from the launch command - os.environ['AWS_DEFAULT_REGION'] = AWS_REGION - ec2 = boto3.client('ec2') - except Exception as e: - raise Exception("Failed to create a client for EC2") - while(True): - res = self.ec2_exception_coordinator(self.run_instances)(ec2) - if res == 'continue': - continue - break - try: - instance_id = res['Instances'][0]['InstanceId'] - except Exception as e: - raise Exception("failed to retrieve instance ID for job %s: %s" % (self.jobid, str(e))) - return instance_id + os.environ['AWS_DEFAULT_REGION'] = AWS_REGION - def ec2_exception_coordinator(self, func): - def inner(*args, **kwargs): - try: - return func(*args, **kwargs) - except Exception as e: - error_response = str(e) - # No instance capacity, too many instances or incompatible subnet/instance configuration - try again - if ('InsufficientInstanceCapacity' in error_response or 'InstanceLimitExceeded' in error_response or - 'is not supported in your requested Availability Zone' in error_response): + while True: + fleet_result = self.create_fleet() + if 'Errors' in fleet_result and len(fleet_result['Errors']) > 0: + error_code = fleet_result['Errors'][0]['ErrorCode'] + error_msg = fleet_result['Errors'][0]['ErrorMessage'] + if ('InsufficientInstanceCapacity' in error_code or 'InstanceLimitExceeded' in error_code or + 'is not supported in your requested Availability Zone' in error_msg + or 'UnfulfillableCapacity' in error_code): behavior = self.cfg.behavior_on_capacity_limit if behavior == 'fail': - 
errmsg = "Instance limit exception - use 'behavior_on_capacity_limit' option" + \ - "to change the behavior to wait_and_retry, other_instance_types," + \ - "or retry_without_spot. %s" % error_response - raise EC2InstanceLimitException(errmsg) - elif behavior == 'wait_and_retry': - errmsg = "Instance limit exception - wait and retry later: %s" % error_response - raise EC2InstanceLimitWaitException(errmsg) - elif behavior == 'other_instance_types': - try: - self.choose_next_instance_type() - except Exception as e2: # should never hit now as we will continue indefinitely - raise EC2InstanceLimitException(str(e2)) - self.update_config_instance_type() - return 'continue' + msg = "Instance limit exception - use 'behavior_on_capacity_limit' option " + \ + "to change the behavior to wait_and_retry, " + \ + "or retry_without_spot. %s" % error_msg + raise EC2InstanceLimitException(msg) + elif behavior == 'wait_and_retry' or behavior == 'other_instance_types': # 'other_instance_types' is there for backwards compatibility + msg = "Instance limit exception - wait and retry later: %s" % error_msg + raise EC2InstanceLimitWaitException(msg) elif behavior == 'retry_without_spot': if not self.cfg.spot_instance: - errmsg = "'behavior_on_capacity_limit': 'retry_without_spot' works only with " + \ - "'spot_instance' : true. %s" % error_response - raise Exception(errmsg) + msg = "'behavior_on_capacity_limit': 'retry_without_spot' works only with " + \ + "'spot_instance' : true. %s" % error_msg + raise Exception(msg) else: self.cfg.spot_instance = False # change behavior as well, # to avoid 'retry_without_spot works only with spot' error in the next round self.cfg.behavior_on_capacity_limit = 'fail' logger.info("trying without spot...") - return 'continue' + continue else: - raise Exception("failed to launch instance for job %s: %s" % (self.jobid, str(e))) - return inner - - def run_instances(self, ec2): - return ec2.run_instances(**self.launch_args) + raise Exception(f"Failed to launch instance for job {self.jobid}. {error_code}: {error_msg}") + elif 'Instances' in fleet_result and len(fleet_result['Instances']) > 0: + instance_id = fleet_result['Instances'][0]['InstanceIds'][0] + return instance_id + else: + raise Exception(f"Unexpected result from create_fleet command: {json.dumps(fleet_result)}") def create_run_json_dict(self): args = self.args @@ -738,65 +711,175 @@ def create_userdata(self, profile=None): str += " -g" str += "\n" logger.debug("userdata: \n" + str) - return(str) - @property - def launch_args(self): - # creating a launch command - largs = {'ImageId': self.cfg.ami_id, - 'InstanceType': self.cfg.instance_type, - 'IamInstanceProfile': {'Arn': S3_ACCESS_ARN}, - 'UserData': self.userdata, - 'MaxCount': 1, - 'MinCount': 1, - 'InstanceInitiatedShutdownBehavior': 'terminate', - 'DisableApiTermination': False, - 'TagSpecifications': [{'ResourceType': 'instance', - "Tags": [{"Key": "Name", "Value": "awsem-" + self.jobid}, - {"Key": "Type", "Value": "awsem"}]}] - } + # The launch template expects base64 encoded text + str_bytes = str.encode('ascii') + base64_bytes = base64.b64encode(str_bytes) + base64_message = base64_bytes.decode('ascii') + return base64_message + + def create_launch_template(self): + """Create a launch template everytime we run a workflow. There is no other + way to specify the necessary EC2 configurations""" + ec2 = boto3.client('ec2') + launch_template_name = 'TibannaLaunchTemplate' + + try: + # Existing launch templates can't be overwritten. 
Therefore, delete + # existing ones first + ec2.delete_launch_template( + DryRun=self.dryrun, + LaunchTemplateName=launch_template_name, + ) + except Exception as e: + logger.info("No existing launch template found.") + + # ImageId, InstanceType and SubnetId will be set during the create-fleet operation + launch_template_data = { + 'IamInstanceProfile': {'Arn': S3_ACCESS_ARN}, + 'UserData': self.userdata, + 'InstanceInitiatedShutdownBehavior': 'terminate', + 'DisableApiTermination': False, + 'BlockDeviceMappings': [ + { + 'DeviceName': '/dev/sdb', + 'Ebs': + { + 'DeleteOnTermination': True, + 'VolumeSize': self.cfg.ebs_size, + 'VolumeType': self.cfg.ebs_type + } + }, + { + 'DeviceName': '/dev/sda1', + 'Ebs': + { + 'DeleteOnTermination': True, + 'VolumeSize': self.cfg.root_ebs_size, + 'VolumeType': 'gp3' + } + } + ], + 'TagSpecifications': [ + { + 'ResourceType': 'instance', + 'Tags': [ + {"Key": "Name", "Value": "awsem-" + self.jobid}, + {"Key": "Type", "Value": "awsem"} + ] + } + ] + } + if self.cfg.key_name: - largs.update({'KeyName': self.cfg.key_name}) - # EBS options + launch_template_data.update({'KeyName': self.cfg.key_name}) if self.cfg.EBS_optimized is True: - largs.update({"EbsOptimized": True}) - largs.update({"BlockDeviceMappings": [{'DeviceName': '/dev/sdb', - 'Ebs': {'DeleteOnTermination': True, - 'VolumeSize': self.cfg.ebs_size, - 'VolumeType': self.cfg.ebs_type}}, - {'DeviceName': '/dev/sda1', - 'Ebs': {'DeleteOnTermination': True, - 'VolumeSize': self.cfg.root_ebs_size, - 'VolumeType': 'gp3'}}]}) + launch_template_data.update({"EbsOptimized": True}) if self.cfg.ebs_iops: # io1 type, specify iops - largs["BlockDeviceMappings"][0]["Ebs"]['Iops'] = self.cfg.ebs_iops + launch_template_data["BlockDeviceMappings"][0]["Ebs"]['Iops'] = self.cfg.ebs_iops if self.cfg.ebs_throughput and self.cfg.ebs_type == 'gp3': if self.cfg.ebs_throughput < 125 or self.cfg.ebs_throughput > 1000: message = "Invalid EBS throughput. Specify a value between 125 and 1000." 
raise EC2LaunchException(message) - largs["BlockDeviceMappings"][0]["Ebs"]['Throughput'] = self.cfg.ebs_throughput + launch_template_data["BlockDeviceMappings"][0]["Ebs"]['Throughput'] = self.cfg.ebs_throughput if self.cfg.ebs_size >= 16000: message = "EBS size limit (16TB) exceeded: (attempted size: %s)" % self.cfg.ebs_size raise EC2LaunchException(message) + + + if self.cfg.availability_zone: + launch_template_data.update({'Placement': {'AvailabilityZone': self.cfg.availability_zone}}) + if self.cfg.security_group: + launch_template_data.update({'SecurityGroupIds': [self.cfg.security_group]}) + if self.cfg.spot_instance: spot_options = {'SpotInstanceType': 'one-time', 'InstanceInterruptionBehavior': 'terminate'} if self.cfg.spot_duration: spot_options['BlockDurationMinutes'] = self.cfg.spot_duration - largs.update({'InstanceMarketOptions': {'MarketType': 'spot', - 'SpotOptions': spot_options}}) - if self.cfg.availability_zone: - largs.update({'Placement': {'AvailabilityZone': self.cfg.availability_zone}}) - if self.cfg.security_group: - largs.update({'SecurityGroupIds': [self.cfg.security_group]}) + launch_template_data.update( + {'InstanceMarketOptions': + {'MarketType': 'spot', 'SpotOptions': spot_options} + } + ) + + try: + ec2.create_launch_template( + DryRun=self.dryrun, + LaunchTemplateName=launch_template_name, + LaunchTemplateData=launch_template_data, + ) + except Exception as e: + raise Exception(f"Could not create launch template: {str(e)}") + + + def create_fleet(self): + '''Create an 'instant' type fleet with 1 instance''' + self.create_launch_template() + + ec2 = boto3.client('ec2') + try: + fleet_spec = self.create_fleet_spec() + fleet_result = ec2.create_fleet(**fleet_spec) + return fleet_result + except Exception as e: + raise Exception(f"Unable to create fleet: {str(e)}") + + + def create_fleet_spec(self): # Factored out for easier testing + + potential_ec2s = [] # Used as overrides in the launch template + + subnets = False if self.cfg.subnet: if isinstance(self.cfg.subnet, str): - largs.update({'SubnetId': self.cfg.subnet}) + subnets = [self.cfg.subnet] elif isinstance(self.cfg.subnet, list): - largs.update({'SubnetId': choice(self.cfg.subnet)}) # if given multiple, pick one randomly - if self.dryrun: - largs.update({'DryRun': True}) - return largs + subnets = self.cfg.subnet + + # Create all possible combinations of instance type / subnet + for instance_type in self.instance_type_list: + instance_info = self.instance_type_infos[instance_type] + if subnets: + for subnet in subnets: + potential_ec2s.append({ + "InstanceType": instance_type, + "SubnetId": subnet, + "ImageId": instance_info['ami_id'] + }) + else: + potential_ec2s.append({ + "InstanceType": instance_type, + "ImageId": instance_info['ami_id'] + }) + + spec = { + "DryRun": self.dryrun, + "SpotOptions": { + "AllocationStrategy": "capacity-optimized", + "InstanceInterruptionBehavior": "terminate" # hibernate is an option here + }, + "OnDemandOptions": { + "AllocationStrategy": "lowest-price" # prioritized is the other option, but then we need to assign priorities in the launch template + }, + "LaunchTemplateConfigs": [{ + "LaunchTemplateSpecification": { + "LaunchTemplateName": "TibannaLaunchTemplate", + "Version": "$Latest" # There is only one version + }, + "Overrides": potential_ec2s, + + }], + "TargetCapacitySpecification": { + "TotalTargetCapacity": 1, + "DefaultTargetCapacityType": "spot" if self.cfg.spot_instance else "on-demand", + }, + "Type": "instant" + } + + return spec + + def 
get_instance_info(self): # get public IP and availablity zone for the instance (This may not happen immediately) @@ -804,7 +887,7 @@ def get_instance_info(self): ec2 = boto3.client('ec2') except Exception as e: raise Exception("Can't create an ec2 client %s" % str(e)) - while(True): # keep trying until you get the result. + while True: # keep trying until you get the result. time.sleep(1) # wait for one second before trying again. try: # sometimes you don't get a description immediately diff --git a/tibanna/iam_utils.py b/tibanna/iam_utils.py index 2928b4845..eb8af2cf8 100755 --- a/tibanna/iam_utils.py +++ b/tibanna/iam_utils.py @@ -115,7 +115,7 @@ def policy_definition(self, policy_type): 'cloudwatch_metric': self.policy_cloudwatch_metric, 'cw_dashboard': self.policy_cw_dashboard, 'dynamodb': self.policy_dynamodb, - 'ec2_desc': self.policy_ec2_desc_policy, + 'ec2_desc': self.policy_ec2_desc, 'pricing': self.policy_pricing, 'executions': self.policy_executions, 'vpc': self.policy_vpc_access, @@ -427,7 +427,7 @@ def policy_dynamodb(self): return policy @property - def policy_ec2_desc_policy(self): + def policy_ec2_desc(self): policy = { "Version": "2012-10-17", "Statement": [ diff --git a/tibanna/job.py b/tibanna/job.py index 966a77898..efb5987c9 100644 --- a/tibanna/job.py +++ b/tibanna/job.py @@ -286,8 +286,7 @@ def add_to_dd(job_id, execution_name, sfn, logbucket, verbose=True): # first check the table exists dydb.describe_table(TableName=DYNAMODB_TABLE) except Exception as e: - if verbose: - logger.error("Not adding to dynamo table: %s" % e) + logger.error("Not adding to dynamo table: %s" % e) return for _ in range(5): # try to add to dynamo 5 times try: diff --git a/tibanna/pricing_utils.py b/tibanna/pricing_utils.py index e62c32128..498e728eb 100755 --- a/tibanna/pricing_utils.py +++ b/tibanna/pricing_utils.py @@ -77,6 +77,10 @@ def get_cost_estimate(postrunjson, ebs_root_type = "gp3", aws_price_overwrite = job_end = datetime.strptime(job.end_time, '%Y%m%d-%H:%M:%S-UTC') job_duration = (job_end - job_start).seconds / 3600.0 # in hours + if(not job.instance_type): + logger.warning("Instance type is not available for cost estimation. Please try to deploy the latest version of Tibanna.") + return 0.0, "NA" + try: pricing_client = boto3.client('pricing', region_name=AWS_REGION) @@ -87,10 +91,10 @@ def get_cost_estimate(postrunjson, ebs_root_type = "gp3", aws_price_overwrite = if(not job.instance_availablity_zone): raise PricingRetrievalException("Instance availability zone is not available. 
You might have to deploy a newer version of Tibanna.") - + ec2_client=boto3.client('ec2',region_name=AWS_REGION) prices=ec2_client.describe_spot_price_history( - InstanceTypes=[cfg.instance_type], + InstanceTypes=[job.instance_type], ProductDescriptions=['Linux/UNIX'], AvailabilityZone=job.instance_availablity_zone, MaxResults=1) # Most recent price is on top @@ -111,7 +115,7 @@ def get_cost_estimate(postrunjson, ebs_root_type = "gp3", aws_price_overwrite = { 'Type': 'TERM_MATCH', 'Field': 'instanceType', - 'Value': cfg.instance_type + 'Value': job.instance_type }, { 'Type': 'TERM_MATCH', diff --git a/tibanna/vars.py b/tibanna/vars.py index 5f12ada9c..d1893cf56 100755 --- a/tibanna/vars.py +++ b/tibanna/vars.py @@ -38,30 +38,51 @@ # Tibanna AMI info # Override this mapping to use a custom AMI scheme AMI_PER_REGION = { - 'us-east-1': 'ami-06e2266f85063aabc', # latest as of Oct 25 2021 - 'us-east-2': 'ami-03a4e3e84b6a1813d', - 'us-west-1': 'ami-0c5e8147be760a354', - 'us-west-2': 'ami-068589fed9c8d5950', - 'ap-south-1': 'ami-05ef59bc4f359c93b', - 'ap-northeast-2': 'ami-0d8618a76aece8a8e', - 'ap-southeast-1': 'ami-0c22dc3b05714bda1', - 'ap-southeast-2': 'ami-03dc109bbf412aac5', - 'ap-northeast-1': 'ami-0f4c520515c41ff46', - 'ca-central-1': 'ami-01af127710fadfe74', - 'eu-central-1': 'ami-0887bcb1c901c1769', - 'eu-west-1': 'ami-08db59692e4371ea6', - 'eu-west-2': 'ami-036d3ce7a21e07012', - 'eu-west-3': 'ami-0cad0ec4160a6b940', - 'eu-north-1': 'ami-00a6f0f9fee951aa0', - 'sa-east-1': 'ami-0b2164f9680f97099', - 'me-south-1': 'ami-03479b7a590f97945', - 'af-south-1': 'ami-080baa4ec59c456aa', - 'ap-east-1': 'ami-0a9056eb817bc3928', - 'eu-south-1': 'ami-0a72279e56849415e' + 'x86': { + 'us-east-1': 'ami-06e2266f85063aabc', # latest as of Oct 25 2021 + 'us-east-2': 'ami-03a4e3e84b6a1813d', + 'us-west-1': 'ami-0c5e8147be760a354', + 'us-west-2': 'ami-068589fed9c8d5950', + 'ap-south-1': 'ami-05ef59bc4f359c93b', + 'ap-northeast-2': 'ami-0d8618a76aece8a8e', + 'ap-southeast-1': 'ami-0c22dc3b05714bda1', + 'ap-southeast-2': 'ami-03dc109bbf412aac5', + 'ap-northeast-1': 'ami-0f4c520515c41ff46', + 'ca-central-1': 'ami-01af127710fadfe74', + 'eu-central-1': 'ami-0887bcb1c901c1769', + 'eu-west-1': 'ami-08db59692e4371ea6', + 'eu-west-2': 'ami-036d3ce7a21e07012', + 'eu-west-3': 'ami-0cad0ec4160a6b940', + 'eu-north-1': 'ami-00a6f0f9fee951aa0', + 'sa-east-1': 'ami-0b2164f9680f97099', + 'me-south-1': 'ami-03479b7a590f97945', + 'af-south-1': 'ami-080baa4ec59c456aa', + 'ap-east-1': 'ami-0a9056eb817bc3928', + 'eu-south-1': 'ami-0a72279e56849415e' + }, + 'Arm': { + 'us-east-1': 'ami-0f3e90ad8e76c7a32', # latest as of Nov 23 2022 + 'us-east-2': 'ami-03359d89f311a015e', + 'us-west-1': 'ami-00ffd20b39dbfb6ea', + 'us-west-2': 'ami-08ab3015c1bc36d24', + 'ap-south-1': 'ami-01af9ec07fed38a38', + 'ap-northeast-2': 'ami-0ee2af459355dd917', + 'ap-southeast-1': 'ami-0d74dc5af4bf74ea8', + 'ap-southeast-2': 'ami-08ab7201c83209fe8', + 'ap-northeast-1': 'ami-07227003bfa0565e3', + 'ca-central-1': 'ami-0cbf87c80362a058e', + 'eu-central-1': 'ami-09cfa59f75e88ad54', + 'eu-west-1': 'ami-0804bdeafd8af01f8', + 'eu-west-2': 'ami-0db05a333dc02c1c8', + 'eu-west-3': 'ami-0ceab436f882fe36a', + 'eu-north-1': 'ami-04ba962c974ddd374', + 'sa-east-1': 'ami-0fc9a9dec0f3df318', + 'me-south-1': 'ami-0211bc858eb163594', + 'af-south-1': 'ami-0d6a4af087f83899d', + 'ap-east-1': 'ami-0d375f2ce688d16b9', + 'eu-south-1': 'ami-0b1db84f31597a70f' + } } -if AWS_REGION not in AMI_PER_REGION: - logger.warning("Public Tibanna AMI for region %s is not available." 
% AWS_REGION) -AMI_ID = AMI_PER_REGION.get(AWS_REGION, '') AWS_REGION_NAMES = { 'us-east-1': 'US East (N. Virginia)',