diff --git a/backend/dataall/modules/s3_datasets/db/dataset_repositories.py b/backend/dataall/modules/s3_datasets/db/dataset_repositories.py
index bc9fe1efd..22272fd0b 100644
--- a/backend/dataall/modules/s3_datasets/db/dataset_repositories.py
+++ b/backend/dataall/modules/s3_datasets/db/dataset_repositories.py
@@ -202,6 +202,15 @@ def list_all_datasets(session) -> [S3Dataset]:
     def list_all_active_datasets(session) -> [S3Dataset]:
         return session.query(S3Dataset).filter(S3Dataset.deleted.is_(None)).all()
 
+    @staticmethod
+    def list_all_active_datasets_with_glue_db(session, dataset_uri) -> [S3Dataset]:
+        s3_dataset: S3Dataset = DatasetRepository.get_dataset_by_uri(session=session, dataset_uri=dataset_uri)
+        return (
+            session.query(S3Dataset)
+            .filter(and_(S3Dataset.deleted.is_(None), S3Dataset.GlueDatabaseName == s3_dataset.GlueDatabaseName))
+            .all()
+        )
+
     @staticmethod
     def get_dataset_by_bucket_name(session, bucket) -> [S3Dataset]:
         return session.query(S3Dataset).filter(S3Dataset.S3BucketName == bucket).first()
diff --git a/backend/dataall/modules/s3_datasets_shares/db/s3_share_object_repositories.py b/backend/dataall/modules/s3_datasets_shares/db/s3_share_object_repositories.py
index 0e6a30c87..0c28474e0 100644
--- a/backend/dataall/modules/s3_datasets_shares/db/s3_share_object_repositories.py
+++ b/backend/dataall/modules/s3_datasets_shares/db/s3_share_object_repositories.py
@@ -149,22 +149,25 @@ def check_other_approved_share_item_table_exists(session, environment_uri, item_
         return query.first()
 
     @staticmethod
-    def check_existing_shared_items_of_type(session, uri, item_type):
-        share: ShareObject = ShareObjectRepository.get_share_by_uri(session, uri)
+    def check_existing_shares_on_items_for_principal(session, item_type, principal, database):
+        shares: List[ShareObject] = ShareObjectRepository.get_shares_for_principal_and_database(
+            session=session, principal=principal, database=database
+        )
         share_item_shared_states = ShareStatusRepository.get_share_item_shared_states()
-        shared_items = (
-            session.query(ShareObjectItem)
-            .filter(
-                and_(
-                    ShareObjectItem.shareUri == share.shareUri,
-                    ShareObjectItem.itemType == item_type,
-                    ShareObjectItem.status.in_(share_item_shared_states),
+        for share in shares:
+            shared_items = (
+                session.query(ShareObjectItem)
+                .filter(
+                    and_(
+                        ShareObjectItem.shareUri == share.shareUri,
+                        ShareObjectItem.itemType == item_type,
+                        ShareObjectItem.status.in_(share_item_shared_states),
+                    )
                 )
+                .all()
             )
-            .all()
-        )
-        if shared_items:
-            return True
+            if shared_items:
+                return True
         return False
 
     @staticmethod
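Reviewer note: the rewritten check above is principal- and database-scoped rather than share-scoped. A minimal sketch of its semantics, using plain dicts as stand-ins for the ShareObject/ShareObjectItem rows (the state names are illustrative, not the real enum values):

```python
# Stand-ins for rows returned by the repositories; purely illustrative.
SHARED_STATES = {'Share_Succeeded', 'Revoke_Failed'}

def check_existing_shares_on_items_for_principal(shares, items, item_type, principal, database):
    # Scan every share for this principal+database, not just the share being
    # processed: one Glue database can back several datasets (and shares).
    for share in shares:
        if share['principal'] != principal or share['database'] != database:
            continue
        if any(
            item['shareUri'] == share['shareUri']
            and item['itemType'] == item_type
            and item['status'] in SHARED_STATES
            for item in items
        ):
            return True
    return False

# Two shares for the same principal on datasets backed by the same Glue database:
shares = [
    {'shareUri': 's1', 'principal': 'role-a', 'database': 'db1'},
    {'shareUri': 's2', 'principal': 'role-a', 'database': 'db1'},
]
items = [{'shareUri': 's2', 'itemType': 'Table', 'status': 'Share_Succeeded'}]
# Revoking the tables of s1 must keep the principal's permissions: s2 still shares a table on db1.
assert check_existing_shares_on_items_for_principal(shares, items, 'Table', 'role-a', 'db1')
```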
diff --git a/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py b/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py
index 97282b574..242175166 100644
--- a/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py
+++ b/backend/dataall/modules/s3_datasets_shares/services/share_processors/glue_table_share_processor.py
@@ -3,6 +3,7 @@
 from warnings import warn
 from datetime import datetime
 from dataall.core.environment.services.environment_service import EnvironmentService
+from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
 from dataall.modules.shares_base.services.shares_enums import (
     ShareItemHealthStatus,
     ShareItemStatus,
@@ -10,7 +11,7 @@
     ShareItemActions,
     ShareableType,
 )
-from dataall.modules.s3_datasets.db.dataset_models import DatasetTable
+from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset
 from dataall.modules.shares_base.db.share_object_models import ShareObjectItemDataFilter
 from dataall.modules.shares_base.services.share_exceptions import PrincipalRoleNotFound
 from dataall.modules.s3_datasets_shares.services.share_managers import LFShareManager
@@ -324,24 +325,50 @@ def process_revoked_shares(self) -> bool:
         try:
             if self.tables:
-                existing_shared_tables_in_share = S3ShareObjectRepository.check_existing_shared_items_of_type(
-                    session=self.session, uri=self.share_data.share.shareUri, item_type=ShareableType.Table.value
+                s3_dataset: S3Dataset = DatasetRepository.get_dataset_by_uri(
+                    session=self.session, dataset_uri=self.share_data.dataset.datasetUri
+                )
+
+                # Find any share items shared between the principal and the dataset's Glue database.
+                # Note: a single Glue database can back several datasets, so this finds not only share
+                # items of the share under process but also items of any other share where the same
+                # Glue database and principal are used.
+                existing_shared_tables_in_shares = S3ShareObjectRepository.check_existing_shares_on_items_for_principal(
+                    session=self.session,
+                    item_type=ShareableType.Table.value,
+                    principal=self.share_data.share.principalRoleName,
+                    database=s3_dataset.GlueDatabaseName,
                 )
-                log.info(f'Remaining tables shared in this share object = {existing_shared_tables_in_share}')
+                log.info(
+                    f'Remaining tables shared on database {s3_dataset.GlueDatabaseName} '
+                    f'for principal {self.share_data.share.principalRoleName} = {existing_shared_tables_in_shares}'
+                )
 
-                if not existing_shared_tables_in_share:
+                if not existing_shared_tables_in_shares:
                     log.info('Revoking permissions to target shared database...')
                     manager.revoke_principals_database_permissions_to_shared_database()
+
                     share_item_shared_states = ShareStatusRepository.get_share_item_shared_states()
-                    existing_shares_with_shared_tables_in_environment = (
-                        ShareObjectRepository.list_dataset_shares_with_existing_shared_items(
+
+                    # Find all the shares where the Glue database is used.
+                    # Note: a single Glue database can back several datasets, and the same database can
+                    # even be reused in a different environment, so this fetches every dataset share
+                    # where the Glue database name is used.
+                    existing_shares_with_shared_tables_for_database = (
+                        ShareObjectRepository.list_dataset_shares_on_database(
                             session=self.session,
                             dataset_uri=self.share_data.dataset.datasetUri,
                             share_item_shared_states=share_item_shared_states,
-                            environment_uri=self.share_data.target_environment.environmentUri,
                             item_type=ShareableType.Table.value,
+                            database=s3_dataset.GlueDatabaseName,
                         )
                     )
-                    if not len(existing_shares_with_shared_tables_in_environment):
+
+                    log.info(
+                        f'Existing shares with shared tables on database {s3_dataset.GlueDatabaseName} = '
+                        f'{existing_shares_with_shared_tables_for_database}'
+                    )
+
+                    if not len(existing_shares_with_shared_tables_for_database):
                         log.info('Deleting target shared database...')
                         manager.delete_shared_database_in_target()
         except Exception as e:
@@ -553,13 +580,17 @@ def cleanup_shares(self) -> bool:
             log.info('Revoking permissions to target shared database...')
             execute_and_suppress_exception(func=manager.revoke_principals_database_permissions_to_shared_database)
             share_item_shared_states = ShareStatusRepository.get_share_item_shared_states()
-            existing_shares_with_shared_tables_in_environment = (
-                ShareObjectRepository.list_dataset_shares_with_existing_shared_items(
+            s3_dataset: S3Dataset = DatasetRepository.get_dataset_by_uri(
+                session=self.session, dataset_uri=self.share_data.dataset.datasetUri
+            )
+
+            existing_shares_with_shared_tables_for_database = (
+                ShareObjectRepository.list_dataset_shares_on_database(
                     session=self.session,
                     dataset_uri=self.share_data.dataset.datasetUri,
                     share_item_shared_states=share_item_shared_states,
-                    environment_uri=self.share_data.target_environment.environmentUri,
                     item_type=ShareableType.Table.value,
+                    database=s3_dataset.GlueDatabaseName,
                 )
             )
-            if not len(existing_shares_with_shared_tables_in_environment):
+            if not len(existing_shares_with_shared_tables_for_database):
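Reviewer note: process_revoked_shares now applies two separate scopes before cleaning up. A condensed sketch of the decision tree (the helper below is hypothetical; the two action names correspond to the LFShareManager calls in the hunks above):

```python
def decide_revoke_cleanup(tables_in_share, principal_items_on_db, shares_on_db):
    """Hypothetical helper: returns the cleanup actions a table revoke should run.

    - principal permissions on the shared database are revoked only when the
      principal no longer has any shared table on the Glue database;
    - the shared database itself is deleted only when no share at all (any
      principal, any environment) still has shared tables on that database.
    """
    actions = []
    if tables_in_share and not principal_items_on_db:
        actions.append('revoke_principals_database_permissions_to_shared_database')
        if not shares_on_db:
            actions.append('delete_shared_database_in_target')
    return actions

# Another principal still shares tables on the database: revoke permissions, keep the database.
assert decide_revoke_cleanup(['table1'], [], ['other-share']) == [
    'revoke_principals_database_permissions_to_shared_database'
]
# Nobody shares anything on the database any more: also delete it.
assert decide_revoke_cleanup(['table1'], [], []) == [
    'revoke_principals_database_permissions_to_shared_database',
    'delete_shared_database_in_target',
]
```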
diff --git a/backend/dataall/modules/shares_base/db/share_object_repositories.py b/backend/dataall/modules/shares_base/db/share_object_repositories.py
index 5d1f0ff8e..2c70c46b6 100644
--- a/backend/dataall/modules/shares_base/db/share_object_repositories.py
+++ b/backend/dataall/modules/shares_base/db/share_object_repositories.py
@@ -10,6 +10,7 @@
 from dataall.modules.datasets_base.db.dataset_models import DatasetBase
 from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
 from dataall.modules.notifications.db.notification_models import Notification
+from dataall.modules.s3_datasets.db.dataset_models import S3Dataset
 from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject
 from dataall.modules.shares_base.services.shares_enums import (
     ShareItemHealthStatus,
@@ -76,6 +77,29 @@ def list_dataset_shares_with_existing_shared_items(
             query = query.filter(ShareObjectItem.itemType == item_type)
         return query.all()
 
+    @staticmethod
+    def list_dataset_shares_on_database(
+        session, dataset_uri, share_item_shared_states, environment_uri=None, item_type=None, database=None
+    ) -> [ShareObject]:
+        query = (
+            session.query(ShareObject)
+            .join(ShareObjectItem, ShareObjectItem.shareUri == ShareObject.shareUri)
+            .join(S3Dataset, S3Dataset.datasetUri == ShareObject.datasetUri)
+            .filter(
+                and_(
+                    S3Dataset.GlueDatabaseName == database,
+                    ShareObject.deleted.is_(None),
+                    ShareObjectItem.status.in_(share_item_shared_states),
+                )
+            )
+        )
+
+        if environment_uri:
+            query = query.filter(ShareObject.environmentUri == environment_uri)
+        if item_type:
+            query = query.filter(ShareObjectItem.itemType == item_type)
+        return query.all()
+
     @staticmethod
     def find_sharable_item(session, share_uri, item_uri) -> ShareObjectItem:
         return (
@@ -108,6 +132,15 @@ def get_share_item_by_uri(session, uri):
     def get_share_item_details(session, share_type_model, item_uri):
         return session.query(share_type_model).get(item_uri)
 
+    @staticmethod
+    def get_shares_for_principal_and_database(session, principal, database) -> [ShareObject]:
+        return (
+            session.query(ShareObject)
+            .join(S3Dataset, S3Dataset.datasetUri == ShareObject.datasetUri)
+            .filter(and_(S3Dataset.GlueDatabaseName == database, ShareObject.principalRoleName == principal))
+            .all()
+        )
+
     @staticmethod
     def remove_share_object_item(session, share_item):
         session.delete(share_item)
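Reviewer note: the join condition in list_dataset_shares_on_database (on ShareObject.datasetUri, not on a fixed dataset_uri) is what keeps the query database-scoped. A schematic of the returned set, with dicts standing in for the ORM rows (field names mirror the models, the data is invented):

```python
def list_dataset_shares_on_database(shares, datasets, items, shared_states, database, item_type=None):
    # S3Dataset.datasetUri == ShareObject.datasetUri ties each share to its own
    # dataset, so only shares whose dataset uses `database` can match the filter.
    glue_db_by_dataset = {d['datasetUri']: d['GlueDatabaseName'] for d in datasets}
    result = []
    for share in shares:
        if share['deleted'] is not None:
            continue
        if glue_db_by_dataset.get(share['datasetUri']) != database:
            continue
        matching = [
            i for i in items
            if i['shareUri'] == share['shareUri'] and i['status'] in shared_states
        ]
        if item_type is not None:
            matching = [i for i in matching if i['itemType'] == item_type]
        if matching:
            result.append(share)
    return result

datasets = [
    {'datasetUri': 'ds-1', 'GlueDatabaseName': 'db1'},
    {'datasetUri': 'ds-2', 'GlueDatabaseName': 'db2'},
]
shares = [
    {'shareUri': 's1', 'datasetUri': 'ds-1', 'deleted': None},
    {'shareUri': 's2', 'datasetUri': 'ds-2', 'deleted': None},
]
items = [
    {'shareUri': 's1', 'itemType': 'Table', 'status': 'Share_Succeeded'},
    {'shareUri': 's2', 'itemType': 'Table', 'status': 'Share_Succeeded'},
]
# Only the share on the dataset backed by db1 is returned.
assert list_dataset_shares_on_database(shares, datasets, items, {'Share_Succeeded'}, 'db1', 'Table') == [shares[0]]
```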
diff --git a/backend/dataall/modules/shares_base/services/sharing_service.py b/backend/dataall/modules/shares_base/services/sharing_service.py
index dd8749bb5..dba36c525 100644
--- a/backend/dataall/modules/shares_base/services/sharing_service.py
+++ b/backend/dataall/modules/shares_base/services/sharing_service.py
@@ -5,6 +5,10 @@
 from dataall.core.resource_lock.db.resource_lock_repositories import ResourceLockRepository
 from dataall.base.db import Engine
 from dataall.core.environment.db.environment_models import ConsumptionRole, Environment, EnvironmentGroup
+from dataall.modules.datasets_base.db.dataset_models import DatasetBase
+from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
+from dataall.modules.s3_datasets.db.dataset_models import S3Dataset
+from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
 from dataall.modules.shares_base.db.share_object_state_machines import (
     ShareObjectSM,
     ShareItemSM,
@@ -14,7 +18,8 @@
     ShareObjectActions,
     ShareItemActions,
     ShareItemStatus,
-    PrincipalType,
+    PrincipalType,
+    ShareableType,
 )
 from dataall.modules.shares_base.db.share_object_models import ShareObject
 from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
@@ -174,6 +179,29 @@ def revoke_share(cls, engine: Engine, share_uri: str) -> bool:
                 )
             )
 
+        # If any share item is a table, find the additional datasets that have to be locked
+        is_table_item = any(share_item.itemType == ShareableType.Table.value for share_item in share_items)
+        log.info(f'Share with uri={share_data.share.shareUri} contains table items: {is_table_item}')
+        if is_table_item:
+            # Find all the datasets where the same Glue database is used and lock them all, so that a
+            # concurrent share cannot override this revoke. See https://github.com/data-dot-all/dataall/issues/1633 for details.
+            s3_datasets_with_common_db: [S3Dataset] = DatasetRepository.list_all_active_datasets_with_glue_db(
+                session=session, dataset_uri=share_data.dataset.datasetUri
+            )
+            dataset_base_with_common_db: [DatasetBase] = [
+                DatasetBaseRepository.get_dataset_by_uri(session=session, dataset_uri=s3_dataset.datasetUri)
+                for s3_dataset in s3_datasets_with_common_db
+            ]
+
+            log.info(f'Found {len(dataset_base_with_common_db)} datasets where the same Glue database is used')
+            additional_resources_to_lock = [
+                (dataset.datasetUri, dataset.__tablename__)
+                for dataset in dataset_base_with_common_db
+                if dataset.datasetUri != share_data.dataset.datasetUri
+            ]
+            resources.extend(additional_resources_to_lock)
+            log.info(f'Resources to be locked while revoking: {resources}')
+
         revoke_successful = True
         try:
             with ResourceLockRepository.acquire_lock_with_retry(
@@ -400,7 +428,10 @@ def _get_share_data_and_items(session, share_uri, status=None, healthStatus=None):
             source_env_group=data[4],
             env_group=data[5],
         )
+        status_list = [status] if status is not None else []
+        healthStatus_list = [healthStatus] if healthStatus is not None else []
+
         share_items = ShareObjectRepository.get_all_share_items_in_share(
-            session=session, share_uri=share_uri, status=[status], healthStatus=[healthStatus]
+            session=session, share_uri=share_uri, status=status_list, healthStatus=healthStatus_list
        )
         return share_data, share_items
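Reviewer note: the extra resources handed to ResourceLockRepository are every other dataset backed by the same Glue database. A small stand-alone sketch of that computation (dicts stand in for the DatasetBase rows; '__tablename__' is carried as a plain key here):

```python
def resources_to_lock(current_dataset_uri, datasets_with_same_glue_db, resources):
    # Lock every other dataset that shares the Glue database, so a concurrent
    # share or revoke on one of them cannot override this revoke (issue #1633).
    extra = [
        (d['datasetUri'], d['__tablename__'])
        for d in datasets_with_same_glue_db
        if d['datasetUri'] != current_dataset_uri
    ]
    return resources + extra

same_db = [
    {'datasetUri': 'ds-1', '__tablename__': 'dataset'},
    {'datasetUri': 'ds-2', '__tablename__': 'dataset'},
]
assert resources_to_lock('ds-1', same_db, [('ds-1', 'dataset')]) == [
    ('ds-1', 'dataset'),
    ('ds-2', 'dataset'),
]
```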