Skip to content

Commit

Permalink
V1.5.0 Features (#409)
Browse files Browse the repository at this point in the history
### Feature or Bugfix
- V1.5.0 Features. Check each PR for a complete description of the
feature.

### Detail
- #292 
- #355 
- #337 
- #427 
- #431 

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

---------

Co-authored-by: kukushking <[email protected]>
Co-authored-by: Dariusz Osiennik <[email protected]>
Co-authored-by: Noah Paige <[email protected]>
Co-authored-by: Dennis Goldner <[email protected]>
  • Loading branch information
5 people authored Apr 25, 2023
1 parent 6460986 commit 219553f
Show file tree
Hide file tree
Showing 73 changed files with 2,834 additions and 1,939 deletions.
Binary file modified UserGuide.pdf
Binary file not shown.
19 changes: 19 additions & 0 deletions backend/dataall/api/Objects/Dataset/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ....aws.handlers.service_handlers import Worker
from ....aws.handlers.sts import SessionHelper
from ....aws.handlers.sns import Sns
from ....aws.handlers.quicksight import Quicksight
from ....db import paginate, exceptions, permissions, models
from ....db.api import Dataset, Environment, ShareObject, ResourcePolicy
from ....db.api.organization import Organization
Expand All @@ -22,8 +23,20 @@
log = logging.getLogger(__name__)


def check_dataset_account(environment):
if environment.dashboardsEnabled:
quicksight_subscription = Quicksight.check_quicksight_enterprise_subscription(AwsAccountId=environment.AwsAccountId)
if quicksight_subscription:
group = Quicksight.create_quicksight_group(AwsAccountId=environment.AwsAccountId)
return True if group else False
return True


def create_dataset(context: Context, source, input=None):
with context.engine.scoped_session() as session:
environment = Environment.get_environment_by_uri(session, input.get('environmentUri'))
check_dataset_account(environment=environment)

dataset = Dataset.create_dataset(
session=session,
username=context.username,
Expand Down Expand Up @@ -56,6 +69,9 @@ def import_dataset(context: Context, source, input=None):
raise exceptions.RequiredParameter('group')

with context.engine.scoped_session() as session:
environment = Environment.get_environment_by_uri(session, input.get('environmentUri'))
check_dataset_account(environment=environment)

dataset = Dataset.create_dataset(
session=session,
username=context.username,
Expand Down Expand Up @@ -212,6 +228,9 @@ def get_dataset_stewards_group(context, source: models.Dataset, **kwargs):

def update_dataset(context, source, datasetUri: str = None, input: dict = None):
with context.engine.scoped_session() as session:
dataset = Dataset.get_dataset_by_uri(session, datasetUri)
environment = Environment.get_environment_by_uri(session, dataset.environmentUri)
check_dataset_account(environment=environment)
updated_dataset = Dataset.update_dataset(
session=session,
username=context.username,
Expand Down
46 changes: 39 additions & 7 deletions backend/dataall/api/Objects/Environment/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ....aws.handlers.quicksight import Quicksight
from ....aws.handlers.cloudformation import CloudFormation
from ....aws.handlers.iam import IAM
from ....aws.handlers.parameter_store import ParameterStoreManager
from ....db import exceptions, permissions
from ....db.api import Environment, ResourcePolicy, Stack
from ....utils.naming_convention import (
Expand All @@ -30,16 +31,39 @@ def get_trust_account(context: Context, source, **kwargs):
return current_account


def check_environment(context: Context, source, input=None):
def get_pivot_role_as_part_of_environment(context: Context, source, **kwargs):
ssm_param = ParameterStoreManager.get_parameter_value(region=os.getenv('AWS_REGION', 'eu-west-1'), parameter_path=f"/dataall/{os.getenv('envname', 'local')}/pivotRole/enablePivotRoleAutoCreate")
return True if ssm_param == "True" else False


def check_environment(context: Context, source, account_id, region):
""" Checks necessary resources for environment deployment.
- Check CDKToolkit exists in Account assuming cdk_look_up_role
- Check Pivot Role exists in Account if pivot_role_as_part_of_environment is False
Args:
input: environment creation input
"""
pivot_role_as_part_of_environment = get_pivot_role_as_part_of_environment(context, source)
log.info(f"Creating environment. Pivot role as part of environment = {pivot_role_as_part_of_environment}")
ENVNAME = os.environ.get('envname', 'local')
if ENVNAME == 'pytest':
return 'CdkRoleName'
account = input.get('AwsAccountId')
region = input.get('region')
cdk_role_name = CloudFormation.check_existing_cdk_toolkit_stack(AwsAccountId=account, region=region)

if input.get('dashboardsEnabled'):
existing_quicksight = Quicksight.check_quicksight_enterprise_subscription(AwsAccountId=account)
cdk_look_up_role_arn = SessionHelper.get_cdk_look_up_role_arn(
accountid=account_id, region=region
)
cdk_role_name = CloudFormation.check_existing_cdk_toolkit_stack(
AwsAccountId=account_id, region=region
)
if not pivot_role_as_part_of_environment:
log.info("Check if PivotRole exist in the account")
pivot_role_arn = SessionHelper.get_delegation_role_arn(accountid=account_id)
role = IAM.get_role(account_id=account_id, role_arn=pivot_role_arn, role=cdk_look_up_role_arn)
if not role:
raise exceptions.AWSResourceNotFound(
action='CHECK_PIVOT_ROLE',
message='Pivot Role has not been created in the Environment AWS Account',
)

return cdk_role_name

Expand All @@ -52,7 +76,10 @@ def create_environment(context: Context, source, input=None):
)

with context.engine.scoped_session() as session:
cdk_role_name = check_environment(context, source, input=input)
cdk_role_name = check_environment(context, source,
account_id=input.get('AwsAccountId'),
region=input.get('region')
)
input['cdk_role_name'] = cdk_role_name
env = Environment.create_environment(
session=session,
Expand Down Expand Up @@ -86,6 +113,11 @@ def update_environment(
with context.engine.scoped_session() as session:

environment = db.api.Environment.get_environment_by_uri(session, environmentUri)
cdk_role_name = check_environment(context, source,
account_id=environment.AwsAccountId,
region=environment.region
)

previous_resource_prefix = environment.resourcePrefix

environment = db.api.Environment.update_environment(
Expand Down
20 changes: 9 additions & 11 deletions backend/dataall/aws/handlers/cloudformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,18 @@ def __init__(self):
pass

@staticmethod
def client(AwsAccountId, region):
session = SessionHelper.remote_session(AwsAccountId)
def client(AwsAccountId, region, role=None):
session = SessionHelper.remote_session(accountid=AwsAccountId, role=role)
return session.client('cloudformation', region_name=region)

@staticmethod
def check_existing_cdk_toolkit_stack(AwsAccountId, region):
cfn = CloudFormation.client(AwsAccountId=AwsAccountId, region=region)
role = SessionHelper.get_cdk_look_up_role_arn(accountid=AwsAccountId, region=region)
try:
cfn = CloudFormation.client(AwsAccountId=AwsAccountId, region=region, role=role)
response = cfn.describe_stacks(StackName='CDKToolkit')
except cfn.exceptions.ClientError as e:
print(e)
raise Exception('CDKToolkitNotFound')

stacks = response['Stacks']
if not len(stacks):
except ClientError as e:
log.exception(f'CDKToolkitNotFound: {e}')
raise Exception('CDKToolkitNotFound')

try:
Expand All @@ -39,8 +36,9 @@ def check_existing_cdk_toolkit_stack(AwsAccountId, region):
)
cdk_role_name = response['StackResourceDetail']['PhysicalResourceId']
return cdk_role_name
except cfn.exceptions.ClientError as e:
raise Exception('CDKToolkitDeploymentActionRoleNotFound')
except ClientError as e:
log.exception(f'CDKToolkitDeploymentActionRoleNotFound: {e}')
raise Exception(f'CDKToolkitDeploymentActionRoleNotFound: {e}')

@staticmethod
@Worker.handler(path='cloudformation.stack.delete')
Expand Down
26 changes: 26 additions & 0 deletions backend/dataall/aws/handlers/ec2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import logging

from .sts import SessionHelper


log = logging.getLogger(__name__)


class EC2:
@staticmethod
def client(account_id: str, region: str, role=None):
session = SessionHelper.remote_session(accountid=account_id, role=role)
return session.client('ec2', region_name=region)

@staticmethod
def check_default_vpc_exists(AwsAccountId: str, region: str, role=None):
log.info("Check that default VPC exists..")
client = EC2.client(account_id=AwsAccountId, region=region, role=role)
response = client.describe_vpcs(
Filters=[{'Name': 'isDefault', 'Values': ['true']}]
)
vpcs = response['Vpcs']
log.info(f"Default VPCs response: {vpcs}")
if vpcs:
return True
return False
34 changes: 17 additions & 17 deletions backend/dataall/aws/handlers/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def approve_share(engine, task: models.Task):
return DataSharingService.approve_share(engine, task.targetUri)
else:
return Ecs.run_share_management_ecs_task(
envname, task.targetUri, 'approve_share'
envname=envname, share_uri=task.targetUri, handler='approve_share'
)

@staticmethod
Expand All @@ -37,7 +37,7 @@ def revoke_share(engine, task: models.Task):
return DataSharingService.revoke_share(engine, task.targetUri)
else:
return Ecs.run_share_management_ecs_task(
envname, task.targetUri, 'revoke_share'
envname=envname, share_uri=task.targetUri, handler='revoke_share'
)

@staticmethod
Expand All @@ -56,12 +56,12 @@ def run_share_management_ecs_task(envname, share_uri, handler):

try:
Ecs.run_ecs_task(
cluster_name,
share_task_definition,
container_name,
security_groups,
subnets,
[
cluster_name=cluster_name,
task_definition=share_task_definition,
container_name=container_name,
security_groups=security_groups,
subnets=subnets,
environment=[
{'name': 'shareUri', 'value': share_uri},
{'name': 'envname', 'value': envname},
{'name': 'handler', 'value': handler},
Expand All @@ -88,13 +88,13 @@ def deploy_stack(engine, task: models.Task):
env=envname, path='ecs/cluster/name'
)

while Ecs.is_task_running(cluster_name, f'awsworker-{task.targetUri}'):
while Ecs.is_task_running(cluster_name=cluster_name, started_by=f'awsworker-{task.targetUri}'):
log.info(
f'ECS task for stack stack-{task.targetUri} is running waiting for 30 seconds before retrying...'
)
time.sleep(30)

stack.EcsTaskArn = Ecs.run_cdkproxy_task(task.targetUri)
stack.EcsTaskArn = Ecs.run_cdkproxy_task(stack_uri=task.targetUri)

@staticmethod
def run_cdkproxy_task(stack_uri):
Expand All @@ -112,20 +112,20 @@ def run_cdkproxy_task(stack_uri):
)
try:
task_arn = Ecs.run_ecs_task(
cluster_name,
cdkproxy_task_definition,
container_name,
security_groups,
subnets,
[
cluster_name=cluster_name,
task_definition=cdkproxy_task_definition,
container_name=container_name,
security_groups=security_groups,
subnets=subnets,
environment=[
{'name': 'stackUri', 'value': stack_uri},
{'name': 'envname', 'value': envname},
{
'name': 'AWS_REGION',
'value': os.getenv('AWS_REGION', 'eu-west-1'),
},
],
f'awsworker-{stack_uri}',
started_by=f'awsworker-{stack_uri}',
)
log.info(f'ECS Task {task_arn} running')
return task_arn
Expand Down
1 change: 0 additions & 1 deletion backend/dataall/aws/handlers/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ def get_job_runs(engine, task: models.Task):
glue_client = aws.client('glue', region_name=Data_pipeline.region)
try:
response = glue_client.get_job_runs(JobName=Data_pipeline.name)
print(response)
except ClientError as e:
log.warning(f'Could not retrieve pipeline runs , {str(e)}')
return []
Expand Down
12 changes: 5 additions & 7 deletions backend/dataall/aws/handlers/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@

class IAM:
@staticmethod
def client(account_id: str):
session = SessionHelper.remote_session(account_id)
def client(account_id: str, role=None):
session = SessionHelper.remote_session(accountid=account_id, role=role)
return session.client('iam')

@staticmethod
def get_role(
account_id: str,
role_arn: str
):
def get_role(account_id: str, role_arn: str, role=None):
log.info(f"Getting IAM role = {role_arn}")
try:
iamcli = IAM.client(account_id)
iamcli = IAM.client(account_id=account_id, role=role)
response = iamcli.get_role(
RoleName=role_arn.split("/")[-1]
)
Expand Down
17 changes: 8 additions & 9 deletions backend/dataall/aws/handlers/lakeformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,32 @@
from .sts import SessionHelper

log = logging.getLogger('aws:lakeformation')
PIVOT_ROLE_NAME_PREFIX = "datallPivotRole"


class LakeFormation:
def __init__(self):
pass

@staticmethod
def describe_resource(resource_arn, role_arn, accountid, region):
def check_existing_lf_registered_location(resource_arn, accountid, region):
"""
Describes a LF data location
Checks if there is a non-dataall-created registered location for the Dataset
Returns False is already existing location else return the resource info
"""
try:
session = SessionHelper.remote_session(accountid)
lf_client = session.client('lakeformation', region_name=region)

response = lf_client.describe_resource(ResourceArn=resource_arn)

log.info(f'LF data location already registered: {response}, checking if data.all registered it ...')
if response['ResourceInfo']['RoleArn'] == role_arn:
registered_role_name = response['ResourceInfo']['RoleArn'].lstrip(f"arn:aws:iam::{accountid}:role/")
log.info(f'LF data location already registered: {response}, registered with role {registered_role_name}')
if registered_role_name.startswith(PIVOT_ROLE_NAME_PREFIX):
log.info('The existing data location was created as part of the dataset stack. There was no pre-existing data location.')
return False
return response['ResourceInfo']

except ClientError as e:
log.info(
f'LF data location for resource {resource_arn} not found due to {e}'
)
log.info(f'LF data location for resource {resource_arn} not found due to {e}')
return False

@staticmethod
Expand Down
6 changes: 4 additions & 2 deletions backend/dataall/aws/handlers/parameter_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ def __init__(self):
pass

@staticmethod
def client(AwsAccountId=None, region=None):
def client(AwsAccountId=None, region=None, role=None):
if AwsAccountId:
session = SessionHelper.remote_session(AwsAccountId)
log.info(f"SSM Parameter remote session with role:{role if role else 'PivotRole'}")
session = SessionHelper.remote_session(accountid=AwsAccountId, role=role)
else:
log.info("SSM Parameter session in central account")
session = SessionHelper.get_session()
return session.client('ssm', region_name=region)

Expand Down
Loading

0 comments on commit 219553f

Please sign in to comment.