diff --git a/.gitignore b/.gitignore index 568616ae..62eb9343 100755 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# virtualenv bin directory +*/bin/ + # Byte-compiled / optimized / DLL files __pycache__/ *.py[odc] diff --git a/awsf3-docker/Dockerfile b/awsf3-docker/Dockerfile index 5ff9a431..9394b97e 100755 --- a/awsf3-docker/Dockerfile +++ b/awsf3-docker/Dockerfile @@ -38,7 +38,7 @@ RUN apt update -y && apt upgrade -y && apt install -y \ nodejs RUN ln -s /usr/bin/python3.8 /usr/bin/python -RUN ln -s /usr/bin/pip3 /usr/bin/pip +#RUN ln -s /usr/bin/pip3 /usr/bin/pip WORKDIR /usr/local/bin @@ -51,23 +51,21 @@ RUN apt-get update -y \ && apt-get install -y docker-ce # singularity -RUN wget https://golang.org/dl/go1.15.3.linux-amd64.tar.gz && \ - tar -xzf go1.15.3.linux-amd64.tar.gz && \ - rm go1.15.3.linux-amd64.tar.gz -RUN export SINGULARITY_VERSION=3.3.0 && \ +RUN wget https://golang.org/dl/go1.16.6.linux-amd64.tar.gz && \ + tar -xzf go1.16.6.linux-amd64.tar.gz && \ + rm go1.16.6.linux-amd64.tar.gz +RUN export SINGULARITY_VERSION=3.8.1 && \ export PATH=/usr/local/bin/go/bin/:$PATH && \ - wget https://github.com/sylabs/singularity/releases/download/v${SINGULARITY_VERSION}/singularity-${SINGULARITY_VERSION}.tar.gz && \ - tar -xzf singularity-${SINGULARITY_VERSION}.tar.gz && \ - rm singularity-${SINGULARITY_VERSION}.tar.gz && \ - cd singularity && \ + wget https://github.com/sylabs/singularity/releases/download/v${SINGULARITY_VERSION}/singularity-ce-${SINGULARITY_VERSION}.tar.gz && \ + tar -xzf singularity-ce-${SINGULARITY_VERSION}.tar.gz && \ + rm singularity-ce-${SINGULARITY_VERSION}.tar.gz && \ + cd singularity-ce-${SINGULARITY_VERSION} && \ ./mconfig && \ make -C ./builddir && \ make -C ./builddir install && \ cd .. && \ rm -rf go && \ - mv singularity/singularity singularity2 && \ - rm -rf singularity && \ - mv singularity2 singularity + rm -rf singularity-ce-${SINGULARITY_VERSION} # goofys # RUN curl -O -L http://bit.ly/goofys-latest && chmod +x goofys-latest # latest is not latest diff --git a/awsf3/__main__.py b/awsf3/__main__.py index 1932fc49..1216f325 100755 --- a/awsf3/__main__.py +++ b/awsf3/__main__.py @@ -44,7 +44,8 @@ def args(self): 'help': "execution metadata file (output json of cwltool / cromwell)"}, {'flag': ["-m", "--md5file"], 'help': "text file storing md5 values for output files"}, {'flag': ["-o", "--output-json"], 'help': "output postrun json file"}, - {'flag': ["-L", "--language"], 'help': "language", 'default': "cwl_v1"}], + {'flag': ["-L", "--language"], 'help': "language", 'default': "cwl_v1"}, + {'flag': ["-u", "--endpoint-url"], 'help': "s3 vpc endpoint url"}], 'upload_postrun_json': [{'flag': ["-i", "--input-json"], 'help': "input postrun json file to upload to s3"}], 'update_postrun_json_final': @@ -66,8 +67,8 @@ def update_postrun_json_init(input_json, output_json): utils.update_postrun_json_init(input_json, output_json) -def update_postrun_json_upload_output(input_json, execution_metadata_file, md5file, output_json, language): - utils.update_postrun_json_upload_output(input_json, execution_metadata_file, md5file, output_json, language) +def update_postrun_json_upload_output(input_json, execution_metadata_file, md5file, output_json, language, endpoint_url): + utils.update_postrun_json_upload_output(input_json, execution_metadata_file, md5file, output_json, language, endpoint_url=endpoint_url) def upload_postrun_json(input_json): diff --git a/awsf3/target.py b/awsf3/target.py index 47fc8743..8e046daa 100755 --- a/awsf3/target.py +++ b/awsf3/target.py @@ -111,12 +111,15 @@ def unzip_source(self): yield {'name': content_file_name, 'content': z.open(content_file_name).read()} yield None - def upload_to_s3(self, encrypt_s3_upload=False): + def upload_to_s3(self, encrypt_s3_upload=False, endpoint_url=None): """upload target to s3, source can be either a file or a directory.""" if not self.is_valid: raise Exception('Upload Error: source / dest must be specified first') if not self.s3: - self.s3 = boto3.client('s3') + if endpoint_url: + self.s3 = boto3.client('s3', endpoint_url=endpoint_url) + else: + self.s3 = boto3.client('s3') err_msg = "failed to upload output file %s to %s. %s" upload_extra_args = {} if encrypt_s3_upload: diff --git a/awsf3/utils.py b/awsf3/utils.py index 2daa4cd2..70779644 100644 --- a/awsf3/utils.py +++ b/awsf3/utils.py @@ -123,7 +123,7 @@ def determine_key_type(bucket, key, profile): # The file itself may be a prefix of another file (e.v. abc.vcf.gz vs abc.vcf.gz.tbi) # but it doesn't matter. else: - return 'File' + return 'File' else: # data_file is a folder return 'Folder' @@ -217,7 +217,7 @@ def download_workflow(): return local_wfdir = os.environ.get('LOCAL_WFDIR') subprocess.call(['mkdir', '-p', local_wfdir]) - + if language in ['wdl', 'wdl_v1', 'wdl_draft2']: main_wf = os.environ.get('MAIN_WDL', '') wf_files = os.environ.get('WDL_FILES', '') @@ -239,10 +239,10 @@ def download_workflow(): wf_files = [wf_files] wf_files.append(main_wf) wf_url = wf_url.rstrip('/') - + print("main workflow file: %s" % main_wf) print("workflow files: " + str(wf_files)) - + s3 = boto3.client('s3') for wf_file in wf_files: target = "%s/%s" % (local_wfdir, wf_file) @@ -262,7 +262,7 @@ def download_workflow(): targetdir = re.sub('[^/]+$', '', target) subprocess.call(["mkdir", "-p", targetdir]) s3.download_file(Bucket=bucket_name, Key=key, Filename=target) - + def read_md5file(md5file): with open(md5file, 'r') as md5_f: @@ -340,7 +340,7 @@ def update_postrun_json_init(json_old, json_new): def update_postrun_json_upload_output(json_old, execution_metadata_file, md5file, json_new, - language='cwl_v1', strict=True, upload=True): + language='cwl_v1', strict=True, upload=True, endpoint_url=None): """Update postrun json with output files. if strict is set false, it does not check execution metadata is required for cwl/wdl.""" # read old json file and prepare postrunjson skeleton @@ -362,18 +362,18 @@ def update_postrun_json_upload_output(json_old, execution_metadata_file, md5file # upload output to S3 (this also updates postrun json) if upload: - upload_output(prj) + upload_output(prj, endpoint_url=endpoint_url) # write to new json file write_postrun_json(json_new, prj) -def upload_output(prj): +def upload_output(prj, endpoint_url=None): # parsing output_target and uploading output files to output target - upload_to_output_target(prj.Job.Output, prj.config.encrypt_s3_upload) + upload_to_output_target(prj.Job.Output, prj.config.encrypt_s3_upload, endpoint_url=endpoint_url) -def upload_to_output_target(prj_out, encrypt_s3_upload=False): +def upload_to_output_target(prj_out, encrypt_s3_upload=False, endpoint_url=None): # parsing output_target and uploading output files to output target output_bucket = prj_out.output_bucket_directory output_argnames = prj_out.output_files.keys() @@ -388,7 +388,7 @@ def upload_to_output_target(prj_out, encrypt_s3_upload=False): target.parse_custom_target(k, output_target[k]) if target.is_valid: print("Target is valid. Uploading..") - target.upload_to_s3(encrypt_s3_upload=encrypt_s3_upload) + target.upload_to_s3(encrypt_s3_upload=encrypt_s3_upload, endpoint_url=endpoint_url) else: raise Exception("Invalid target %s -> %s: failed to upload" % k, output_target[k]) else: @@ -397,9 +397,9 @@ def upload_to_output_target(prj_out, encrypt_s3_upload=False): target.parse_cwl_target(k, output_target.get(k, ''), prj_out.output_files) if target.is_valid: print("Target is valid. Uploading..") - target.upload_to_s3(encrypt_s3_upload=encrypt_s3_upload) + target.upload_to_s3(encrypt_s3_upload=encrypt_s3_upload, endpoint_url=endpoint_url) prj_out.output_files[k].add_target(target.dest) - + # upload secondary files secondary_output_files = prj_out.output_files[k].secondaryFiles if secondary_output_files: @@ -407,7 +407,7 @@ def upload_to_output_target(prj_out, encrypt_s3_upload=False): stlist.parse_target_values(prj_out.secondary_output_target.get(k, [])) stlist.reorder_by_source([sf.path for sf in secondary_output_files]) for st in stlist.secondary_targets: - st.upload_to_s3(encrypt_s3_upload=encrypt_s3_upload) + st.upload_to_s3(encrypt_s3_upload=encrypt_s3_upload, endpoint_url=endpoint_url) for i, sf in enumerate(secondary_output_files): sf.add_target(stlist.secondary_targets[i].dest) else: @@ -424,9 +424,9 @@ def update_postrun_json_final(json_old, json_new, logfile=None): """Update postrun json with status, time stamps, parsed commands, input/tmp/output sizes""" prj = read_postrun_json(json_old) - + postrun_json_final(prj, logfile=logfile) - + # write to new json file write_postrun_json(json_new, prj) diff --git a/tibanna/__main__.py b/tibanna/__main__.py index ba06f967..5db26f40 100755 --- a/tibanna/__main__.py +++ b/tibanna/__main__.py @@ -274,6 +274,12 @@ def args(self): 'action': "store_true", 'help': "Do not delete public access block from buckets" + "(this way postrunjson and metrics reports will not be public)"}, + {'flag': ["-t", "--subnets"], + 'nargs': '+', + 'help': "subnet IDs"}, + {'flag': ["-r", "--security-groups"], + 'nargs': '+', + 'help': "security groups"}, {'flag': ["-q", "--quiet"], 'action': "store_true", 'help': "minimize standard output from deployment"}], @@ -286,6 +292,12 @@ def args(self): {'flag': ["-g", "--usergroup"], 'default': '', 'help': "Tibanna usergroup for the AWS Lambda function"}, + {'flag': ["-t", "--subnets"], + 'nargs': '+', + 'help': "subnet IDs"}, + {'flag': ["-r", "--security-groups"], + 'nargs': '+', + 'help': "security groups"}, {'flag': ["-q", "--quiet"], 'action': "store_true", 'help': "minimize standard output from deployment"}], @@ -368,11 +380,12 @@ def args(self): } -def deploy_core(name, suffix=None, usergroup='', quiet=False): +def deploy_core(name, suffix=None, usergroup='', quiet=False, subnets=None, security_groups=None): """ New method of deploying packaged lambdas (BETA) """ - API().deploy_core(name=name, suffix=suffix, usergroup=usergroup, quiet=quiet) + API().deploy_core(name=name, suffix=suffix, usergroup=usergroup, subnets=subnets, + security_groups=security_groups, quiet=quiet) def run_workflow(input_json, sfn=TIBANNA_DEFAULT_STEP_FUNCTION_NAME, jobid='', do_not_open_browser=False, sleep=3): @@ -396,11 +409,12 @@ def setup_tibanna_env(buckets='', usergroup_tag='default', no_randomize=False, def deploy_unicorn(suffix=None, no_setup=False, buckets='', no_setenv=False, usergroup='', do_not_delete_public_access_block=False, - deploy_costupdater=False, quiet=False): + deploy_costupdater=False, subnets=None, security_groups=None, quiet=False): """deploy tibanna unicorn to AWS cloud""" API().deploy_unicorn(suffix=suffix, no_setup=no_setup, buckets=buckets, no_setenv=no_setenv, usergroup=usergroup, do_not_delete_public_access_block=do_not_delete_public_access_block, - deploy_costupdater=deploy_costupdater, quiet=quiet) + deploy_costupdater=deploy_costupdater, subnets=subnets, security_groups=security_groups, + quiet=quiet) def add_user(user, usergroup): diff --git a/tibanna/_version.py b/tibanna/_version.py index c8a656f9..4a14e029 100755 --- a/tibanna/_version.py +++ b/tibanna/_version.py @@ -1,4 +1,4 @@ """Version information.""" # The following line *must* be the last in the module, exactly as formatted: -__version__ = "1.3.1" +__version__ = "1.4.1" diff --git a/tibanna/core.py b/tibanna/core.py index 90f6ce03..e5e36ea1 100755 --- a/tibanna/core.py +++ b/tibanna/core.py @@ -43,7 +43,8 @@ upload, put_object_s3, retrieve_all_keys, - delete_keys + delete_keys, + create_tibanna_suffix ) from .ec2_utils import ( UnicornInput, @@ -153,7 +154,8 @@ def randomize_run_name(self, run_name, sfn): return run_name def run_workflow(self, input_json, sfn=None, - env=None, jobid=None, sleep=3, verbose=True, open_browser=True): + env=None, jobid=None, sleep=3, verbose=True, + open_browser=True, dryrun=False): ''' input_json is either a dict or a file accession is unique name that we be part of run id @@ -209,36 +211,38 @@ def run_workflow(self, input_json, sfn=None, if verbose: logger.info("about to start run %s" % run_name) # trigger the step function to run - try: - response = client.start_execution( - stateMachineArn=STEP_FUNCTION_ARN(sfn), - name=run_name, - input=aws_input, - ) - time.sleep(sleep) - except Exception as e: - raise(e) + response = None + if not dryrun: + try: + response = client.start_execution( + stateMachineArn=STEP_FUNCTION_ARN(sfn), + name=run_name, + input=aws_input, + ) + time.sleep(sleep) + except Exception as e: + raise(e) - # trigger the cost updater step function to run - try: - costupdater_input = { - "sfn_arn": data[_tibanna]['exec_arn'], - "log_bucket": data['config']['log_bucket'], - "job_id": data['jobid'], - "aws_region": AWS_REGION - } - costupdater_input = json.dumps(costupdater_input) - costupdater_response = client.start_execution( - stateMachineArn=STEP_FUNCTION_ARN(sfn + "_costupdater"), - name=run_name, - input=costupdater_input, - ) - time.sleep(sleep) - except Exception as e: - if 'StateMachineDoesNotExist' in str(e): - pass # Tibanna was probably deployed without the cost updater - else: - raise e + # trigger the cost updater step function to run + try: + costupdater_input = { + "sfn_arn": data[_tibanna]['exec_arn'], + "log_bucket": data['config']['log_bucket'], + "job_id": data['jobid'], + "aws_region": AWS_REGION + } + costupdater_input = json.dumps(costupdater_input) + costupdater_response = client.start_execution( + stateMachineArn=STEP_FUNCTION_ARN(sfn + "_costupdater"), + name=run_name, + input=costupdater_input, + ) + time.sleep(sleep) + except Exception as e: + if 'StateMachineDoesNotExist' in str(e): + pass # Tibanna was probably deployed without the cost updater + else: + raise e # adding execution info to dynamoDB for fast search by awsem job id Job.add_to_dd(jobid, run_name, sfn, data['config']['log_bucket'], verbose=verbose) @@ -254,17 +258,17 @@ def run_workflow(self, input_json, sfn=None, cw_db_url = 'https://console.aws.amazon.com/cloudwatch/' + \ 'home?region=%s#dashboards:name=awsem-%s' % (AWS_REGION, jobid) logger.info("Cloudwatch Dashboard = %s" % cw_db_url) - if open_browser and shutil.which('open') is not None: + if open_browser and shutil.which('open') is not None and not dryrun: subprocess.call(["open", data[_tibanna]['url']]) return data def run_batch_workflows(self, input_json_list, sfn=None, - env=None, sleep=3, verbose=True, open_browser=True): + env=None, sleep=3, verbose=True, open_browser=True, dryrun=False): """given a list of input json, run multiple workflows""" run_infos = [] for input_json in input_json_list: run_info = self.run_workflow(input_json, env=env, sfn=sfn, sleep=sleep, verbose=verbose, - open_browser=False) + open_browser=False, dryrun=dryrun) run_infos.append(run_info) return run_infos @@ -625,7 +629,7 @@ def env_list(self, name): self.run_task_lambda: {'AMI_ID': AMI_ID, 'TIBANNA_REPO_NAME': TIBANNA_REPO_NAME, 'TIBANNA_REPO_BRANCH': TIBANNA_REPO_BRANCH}, - self.check_task_lambda: {} + self.check_task_lambda: {'TIBANNA_DEFAULT_STEP_FUNCTION_NAME': self.default_stepfunction_name} } if TIBANNA_PROFILE_ACCESS_KEY and TIBANNA_PROFILE_SECRET_KEY: envlist[self.run_task_lambda].update({ @@ -634,9 +638,11 @@ def env_list(self, name): ) return envlist.get(name, '') - def deploy_lambda(self, name, suffix, usergroup='', quiet=False): + def deploy_lambda(self, name, suffix, usergroup='', quiet=False, subnets=None, security_groups=None): """ - deploy a single lambda using the aws_lambda.deploy_function (BETA) + deploy a single lambda using the aws_lambda.deploy_function (BETA). + subnets and security groups are lists of subnet IDs and security group IDs, in case + you want the lambda to be deployed into specific subnets / security groups. """ import aws_lambda if name not in dir(self.lambdas_module): @@ -648,39 +654,31 @@ def deploy_lambda(self, name, suffix, usergroup='', quiet=False): envs = self.env_list(name) if envs: extra_config['Environment'] = {'Variables': envs} + if subnets or security_groups: + extra_config['VpcConfig'] = {} + if not 'Environment' in extra_config: + extra_config['Environment'] = {'Variables': {}} + if subnets: + extra_config['VpcConfig'].update({'SubnetIds': subnets}) + extra_config['Environment']['Variables'].update({'SUBNETS': ','.join(subnets)}) + if security_groups: + extra_config['VpcConfig'].update({'SecurityGroupIds': security_groups}) + extra_config['Environment']['Variables'].update({'SECURITY_GROUPS': ','.join(security_groups)}) tibanna_iam = self.IAM(usergroup) if name == self.run_task_lambda: - if usergroup: - extra_config['Environment']['Variables']['AWS_S3_ROLE_NAME'] \ - = tibanna_iam.role_name('ec2') - else: - extra_config['Environment']['Variables']['AWS_S3_ROLE_NAME'] = 'S3_access' # 4dn-dcic default(temp) + extra_config['Environment']['Variables']['AWS_S3_ROLE_NAME'] \ + = tibanna_iam.role_name('ec2') # add role logger.info('name=%s' % name) - if name in [self.run_task_lambda, self.check_task_lambda, self.update_cost_lambda]: - role_arn_prefix = 'arn:aws:iam::' + AWS_ACCOUNT_NUMBER + ':role/' - if usergroup: - role_arn = role_arn_prefix + tibanna_iam.role_name(name) - else: - role_arn = role_arn_prefix + 'lambda_full_s3' # 4dn-dcic default(temp) - logger.info("role_arn=" + role_arn) - extra_config['Role'] = role_arn - - if usergroup and suffix: - function_name_suffix = usergroup + '_' + suffix - elif suffix: - function_name_suffix = suffix - elif usergroup: - function_name_suffix = usergroup - else: - function_name_suffix = '' + role_arn_prefix = 'arn:aws:iam::' + AWS_ACCOUNT_NUMBER + ':role/' + role_arn = role_arn_prefix + tibanna_iam.role_name(name) + logger.info("role_arn=" + role_arn) + extra_config['Role'] = role_arn # first delete the existing function to avoid the weird AWS KMS lambda error function_name_prefix = getattr(self.lambdas_module, name).config.get('function_name') - if function_name_suffix: - full_function_name = function_name_prefix + '_' + function_name_suffix - else: - full_function_name = function_name_prefix + function_name_suffix = create_tibanna_suffix(suffix, usergroup) + full_function_name = function_name_prefix + function_name_suffix if name not in self.do_not_delete: try: if quiet: @@ -708,7 +706,8 @@ def deploy_lambda(self, name, suffix, usergroup='', quiet=False): requirements_fpath=requirements_fpath, extra_config=extra_config) - def deploy_core(self, name, suffix=None, usergroup='', quiet=False): + def deploy_core(self, name, suffix=None, usergroup='', subnets=None, security_groups=None, + quiet=False): """deploy/update lambdas only""" logger.info("preparing for deploy...") if name == 'all': @@ -718,7 +717,8 @@ def deploy_core(self, name, suffix=None, usergroup='', quiet=False): else: names = [name, ] for name in names: - self.deploy_lambda(name, suffix, usergroup, quiet=quiet) + self.deploy_lambda(name, suffix, usergroup, subnets=subnets, security_groups=security_groups, + quiet=quiet) def setup_tibanna_env(self, buckets='', usergroup_tag='default', no_randomize=False, do_not_delete_public_access_block=False, verbose=False): @@ -752,17 +752,19 @@ def setup_tibanna_env(self, buckets='', usergroup_tag='default', no_randomize=Fa logger.info("Tibanna usergroup %s has been created on AWS." % tibanna_iam.user_group_name) return tibanna_iam.user_group_name - def deploy_tibanna(self, suffix=None, usergroup='', setup=False, + def deploy_tibanna(self, suffix=None, usergroup='', setup=False, no_randomize=False, + default_usergroup_tag='default', buckets='', setenv=False, do_not_delete_public_access_block=False, - deploy_costupdater=False, quiet=False): + deploy_costupdater=False, subnets=None, security_groups=None, quiet=False): """deploy tibanna unicorn or pony to AWS cloud (pony is for 4DN-DCIC only)""" if setup: if usergroup: usergroup = self.setup_tibanna_env(buckets, usergroup, True, do_not_delete_public_access_block=do_not_delete_public_access_block) else: # override usergroup - usergroup = self.setup_tibanna_env(buckets, - do_not_delete_public_access_block=do_not_delete_public_access_block) + usergroup = self.setup_tibanna_env(buckets, usergroup_tag=default_usergroup_tag, + do_not_delete_public_access_block=do_not_delete_public_access_block, + no_randomize=no_randomize) # this function will remove existing step function on a conflict step_function_name = self.create_stepfunction(suffix, usergroup=usergroup) logger.info("creating a new step function... %s" % step_function_name) @@ -777,22 +779,26 @@ def deploy_tibanna(self, suffix=None, usergroup='', setup=False, outfile.write("\nexport TIBANNA_DEFAULT_STEP_FUNCTION_NAME=%s\n" % step_function_name) logger.info("deploying lambdas...") - self.deploy_core('all', suffix=suffix, usergroup=usergroup, quiet=quiet) + self.deploy_core('all', suffix=suffix, usergroup=usergroup, subnets=subnets, + security_groups=security_groups, quiet=quiet) if(deploy_costupdater): - self.deploy_lambda(self.update_cost_lambda, suffix=suffix, usergroup=usergroup, quiet=quiet) + self.deploy_lambda(self.update_cost_lambda, suffix=suffix, usergroup=usergroup, + subnets=subnets, security_groups=security_groups, quiet=quiet) dd_utils.create_dynamo_table(DYNAMODB_TABLE, DYNAMODB_KEYNAME) return step_function_name def deploy_unicorn(self, suffix=None, no_setup=False, buckets='', no_setenv=False, usergroup='', do_not_delete_public_access_block=False, - deploy_costupdater=False, quiet=False): + deploy_costupdater=False, subnets=None, security_groups=None, + quiet=False): """deploy tibanna unicorn to AWS cloud""" self.deploy_tibanna(suffix=suffix, usergroup=usergroup, setup=not no_setup, buckets=buckets, setenv=not no_setenv, do_not_delete_public_access_block=do_not_delete_public_access_block, - deploy_costupdater=deploy_costupdater, quiet=quiet) + deploy_costupdater=deploy_costupdater, subnets=subnets, + security_groups=security_groups, quiet=quiet) def add_user(self, user, usergroup): """add a user to a tibanna group""" diff --git a/tibanna/ec2_utils.py b/tibanna/ec2_utils.py index adbc0cad..48f009a2 100755 --- a/tibanna/ec2_utils.py +++ b/tibanna/ec2_utils.py @@ -487,7 +487,6 @@ def get_benchmarking(self, input_size_in_bytes): 'parameters': benchmark_parameters}) except Exception as e: try: - res raise Exception("Benchmarking not working. : {}".format(str(res))) except: raise Exception("Benchmarking not working. : None. %s" % str(e)) diff --git a/tibanna/iam_utils.py b/tibanna/iam_utils.py index dfc9a89e..c0591616 100755 --- a/tibanna/iam_utils.py +++ b/tibanna/iam_utils.py @@ -36,8 +36,6 @@ def __init__(self, user_group_tag, bucket_names='', no_randomize=True): tibanna_policy_prefix : tibanna_default_3465 / tibanna_pony_default_3465 prefix : tibanna_ / tibanna_pony_ """ - # lambda names - self.lambda_names = [self.run_task_lambda_name, self.check_task_lambda_name, self.update_cost_lambda_name] if self.lambda_type: self.prefix = 'tibanna_' + self.lambda_type + '_' else: @@ -61,6 +59,10 @@ def __init__(self, user_group_tag, bucket_names='', no_randomize=True): self.client = boto3.client('iam') self.iam = boto3.resource('iam') + @property + def lambda_names(self): + return [self.run_task_lambda_name, self.check_task_lambda_name, self.update_cost_lambda_name] + @property def iam_group_name(self): return self.tibanna_policy_prefix @@ -69,7 +71,7 @@ def iam_group_name(self): def policy_types(self): return ['bucket', 'termination', 'list', 'cloudwatch', 'passrole', 'lambdainvoke', 'cloudwatch_metric', 'cw_dashboard', 'dynamodb', 'ec2_desc', - 'executions', 'pricing'] + 'executions', 'pricing', 'vpc'] def policy_arn(self, policy_type): return 'arn:aws:iam::' + self.account_id + ':policy/' + self.policy_name(policy_type) @@ -86,7 +88,8 @@ def policy_suffix(self, policy_type): 'dynamodb': 'dynamodb', 'ec2_desc': 'ec2_desc', 'pricing': 'pricing', - 'executions': 'executions'} + 'executions': 'executions', + 'vpc': 'vpc_access'} if policy_type not in suffices: raise Exception("policy %s must be one of %s." % (policy_type, str(self.policy_types))) return suffices[policy_type] @@ -106,7 +109,8 @@ def policy_definition(self, policy_type): 'dynamodb': self.policy_dynamodb, 'ec2_desc': self.policy_ec2_desc_policy, 'pricing': self.policy_pricing, - 'executions': self.policy_executions} + 'executions': self.policy_executions, + 'vpc': self.policy_vpc_access} if policy_type not in definitions: raise Exception("policy %s must be one of %s." % (policy_type, str(self.policy_types))) return definitions[policy_type] @@ -134,12 +138,15 @@ def role_service(self, role_type): raise Exception("role_type %s must be one of %s." % (role_type, str(self.role_types))) return services[role_type] - def policy_arn_list_for_role(self, role_type): + @property + def policy_arn_list_for_role(self): + # returns a dictionary with role_type as keys + # adding vpc access to only check_task since run_task has full ec2 access run_task_custom_policy_types = ['list', 'cloudwatch', 'passrole', 'bucket', 'dynamodb', 'executions', 'cw_dashboard'] check_task_custom_policy_types = ['cloudwatch_metric', 'cloudwatch', 'bucket', 'ec2_desc', - 'termination', 'dynamodb', 'pricing'] - update_cost_custom_policy_types = ['bucket', 'executions', 'dynamodb', 'pricing'] + 'termination', 'dynamodb', 'pricing', 'vpc'] + update_cost_custom_policy_types = ['bucket', 'executions', 'dynamodb', 'pricing', 'vpc'] arnlist = {'ec2': [self.policy_arn(_) for _ in ['bucket', 'cloudwatch_metric']] + ['arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly'], # 'stepfunction': [self.policy_arn(_) for _ in ['lambdainvoke']], @@ -149,9 +156,7 @@ def policy_arn_list_for_role(self, role_type): self.check_task_lambda_name: [self.policy_arn(_) for _ in check_task_custom_policy_types], self.update_cost_lambda_name: [self.policy_arn(_) for _ in update_cost_custom_policy_types]} - if role_type not in arnlist: - raise Exception("role_type %s must be one of %s." % (role_type, str(self.role_types))) - return arnlist[role_type] + return arnlist @property def instance_profile_name(self): @@ -206,6 +211,25 @@ def policy_terminate_instances(self): } return policy + @property + def policy_vpc_access(self): + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "ec2:DescribeNetworkInterfaces", + "ec2:CreateNetworkInterface", + "ec2:AttachNetworkInterface", + "ec2:DeleteNetworkInterface" + ], + "Resource": "*" + } + ] + } + return policy + @property def policy_list_instanceprofiles(self): policy_list_instanceprofiles = { @@ -451,7 +475,7 @@ def create_role_for_role_type(self, role_type, verbose=False): role_policy_doc = self.role_policy_document(self.role_service(role_type)) self.create_role_robust(self.role_name(role_type), json.dumps(role_policy_doc), verbose) role = self.iam.Role(self.role_name(role_type)) - for p_arn in self.policy_arn_list_for_role(role_type): + for p_arn in self.policy_arn_list_for_role[role_type]: response = role.attach_policy(PolicyArn=p_arn) if verbose: logger.debug("response from IAM attach_policy :" + str(response)) diff --git a/tibanna/stepfunction.py b/tibanna/stepfunction.py index caf862d7..2c66e615 100755 --- a/tibanna/stepfunction.py +++ b/tibanna/stepfunction.py @@ -1,4 +1,5 @@ from .vars import AWS_REGION, AWS_ACCOUNT_NUMBER +from .utils import create_tibanna_suffix class StepFunctionUnicorn(object): @@ -56,17 +57,7 @@ def __init__(self, @property def lambda_suffix(self): - if self.usergroup: - if self.dev_suffix: - lambda_suffix = '_' + self.usergroup + '_' + self.dev_suffix - else: - lambda_suffix = '_' + self.usergroup - else: - if self.dev_suffix: - lambda_suffix = '_' + self.dev_suffix - else: - lambda_suffix = '' - return lambda_suffix + return create_tibanna_suffix(self.dev_suffix, self.usergroup) @property def lambda_arn_prefix(self): @@ -83,12 +74,8 @@ def iam(self): @property def sfn_role_arn(self): - if not self.usergroup: # 4dn - sfn_role_arn = "arn:aws:iam::" + self.aws_acc + \ - ":role/service-role/StatesExecutionRole-" + self.region_name - else: - sfn_role_arn = "arn:aws:iam::" + self.aws_acc + ":role/" + \ - self.iam.role_name('stepfunction') + sfn_role_arn = "arn:aws:iam::" + self.aws_acc + ":role/" + \ + self.iam.role_name('stepfunction') return sfn_role_arn @property diff --git a/tibanna/stepfunction_cost_updater.py b/tibanna/stepfunction_cost_updater.py index 4037a666..6817698c 100755 --- a/tibanna/stepfunction_cost_updater.py +++ b/tibanna/stepfunction_cost_updater.py @@ -4,6 +4,7 @@ SFN_TYPE, UPDATE_COST_LAMBDA_NAME ) +from .utils import create_tibanna_suffix class StepFunctionCostUpdater(object): @@ -21,17 +22,7 @@ def __init__(self, @property def lambda_suffix(self): - if self.usergroup: - if self.dev_suffix: - lambda_suffix = '_' + self.usergroup + '_' + self.dev_suffix - else: - lambda_suffix = '_' + self.usergroup - else: - if self.dev_suffix: - lambda_suffix = '_' + self.dev_suffix - else: - lambda_suffix = '' - return lambda_suffix + return create_tibanna_suffix(self.dev_suffix, self.usergroup) @property def lambda_arn_prefix(self): @@ -48,12 +39,8 @@ def iam(self): @property def sfn_role_arn(self): - if not self.usergroup: # 4dn - sfn_role_arn = "arn:aws:iam::" + self.aws_acc + \ - ":role/service-role/StatesExecutionRole-" + self.region_name - else: - sfn_role_arn = "arn:aws:iam::" + self.aws_acc + ":role/" + \ - self.iam.role_name('stepfunction') + sfn_role_arn = "arn:aws:iam::" + self.aws_acc + ":role/" + \ + self.iam.role_name('stepfunction') return sfn_role_arn @property diff --git a/tibanna/utils.py b/tibanna/utils.py index 151d2417..df7c55f1 100755 --- a/tibanna/utils.py +++ b/tibanna/utils.py @@ -76,6 +76,22 @@ def create_jobid(): return randomword(12) # date+random_string +def create_tibanna_suffix(suffix, usergroup): + if usergroup and suffix: + function_name_suffix = usergroup + '_' + suffix + elif suffix: + function_name_suffix = suffix + elif usergroup: + function_name_suffix = usergroup + else: + function_name_suffix = '' + + if function_name_suffix: + return '_' + function_name_suffix + else: + return '' + + def read_s3(bucket, object_name): response = boto3.client('s3').get_object(Bucket=bucket, Key=object_name) logger.debug("response_from_read_s3:" + str(response))