From 7adc3a69a4e3457b304888c2d91e51925f5182e0 Mon Sep 17 00:00:00 2001 From: Tejas Rajopadhye Date: Thu, 12 Dec 2024 09:57:39 -0600 Subject: [PATCH] bugfix for gh-1734 --- .../db/dataset_table_repositories.py | 29 ++++++++++++++++++- .../services/dataset_table_service.py | 3 +- .../db/share_object_repositories.py | 24 ++++++++++++--- 3 files changed, 49 insertions(+), 7 deletions(-) diff --git a/backend/dataall/modules/s3_datasets/db/dataset_table_repositories.py b/backend/dataall/modules/s3_datasets/db/dataset_table_repositories.py index 6fd86610b..208aaa5dc 100644 --- a/backend/dataall/modules/s3_datasets/db/dataset_table_repositories.py +++ b/backend/dataall/modules/s3_datasets/db/dataset_table_repositories.py @@ -1,9 +1,11 @@ import logging from datetime import datetime +from typing import List from sqlalchemy.sql import and_ from dataall.base.db import exceptions +from dataall.core.activity.db.activity_models import Activity from dataall.modules.s3_datasets.db.dataset_models import ( DatasetTableColumn, DatasetTable, @@ -11,6 +13,9 @@ DatasetTableDataFilter, ) from dataall.base.utils import json_utils +from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository +from dataall.modules.shares_base.db.share_object_models import ShareObjectItem +from dataall.modules.shares_base.services.shares_enums import ShareItemStatus logger = logging.getLogger(__name__) @@ -61,11 +66,33 @@ def get_dataset_table_by_uri(session, table_uri): return table @staticmethod - def update_existing_tables_status(existing_tables, glue_tables): + def update_existing_tables_status(existing_tables, glue_tables, session): for existing_table in existing_tables: if existing_table.GlueTableName not in [t['Name'] for t in glue_tables]: existing_table.LastGlueTableStatus = 'Deleted' logger.info(f'Existing Table {existing_table.GlueTableName} status set to Deleted from Glue') + # Once the table item is deleted from glue and no longer part of the dataset + # Find out where this item is used in shares and delete all the share items. + share_item_status_filter = [ShareItemStatus.Share_Succeeded.value] + share_object_items: List[ShareObjectItem] = ( + ShareObjectRepository.list_share_object_items_for_item_with_status( + session=session, item_uri=existing_table.tableUri, status=share_item_status_filter + ) + ) + logger.info( + f'Found {len(share_object_items)} share objects where the table {existing_table.tableUri} is present as a share item in state: {share_item_status_filter}. Deleting those share items' + ) + for share_object_item in share_object_items: + activity = Activity( + action='SHARE_OBJECT_ITEM:DELETE', + label='SHARE_OBJECT_ITEM:DELETE', + owner='dataall-automation', + summary=f'dataall-automation deleted share object: {share_object_item.itemName} with uri: {share_object_item.itemUri} since the glue table associated was deleted from source glue db', + targetUri=share_object_item.itemUri, + targetType='share_object_item', + ) + session.add(activity) + session.delete(share_object_item) elif ( existing_table.GlueTableName in [t['Name'] for t in glue_tables] and existing_table.LastGlueTableStatus == 'Deleted' diff --git a/backend/dataall/modules/s3_datasets/services/dataset_table_service.py b/backend/dataall/modules/s3_datasets/services/dataset_table_service.py index 386ab252e..e5cb4eff7 100644 --- a/backend/dataall/modules/s3_datasets/services/dataset_table_service.py +++ b/backend/dataall/modules/s3_datasets/services/dataset_table_service.py @@ -140,8 +140,7 @@ def sync_existing_tables(session, uri, glue_tables=None): existing_tables = DatasetTableRepository.find_dataset_tables(session, uri) existing_table_names = [e.GlueTableName for e in existing_tables] existing_dataset_tables_map = {t.GlueTableName: t for t in existing_tables} - - DatasetTableRepository.update_existing_tables_status(existing_tables, glue_tables) + DatasetTableRepository.update_existing_tables_status(existing_tables, glue_tables, session) log.info(f'existing_tables={glue_tables}') for table in glue_tables: if table['Name'] not in existing_table_names: diff --git a/backend/dataall/modules/shares_base/db/share_object_repositories.py b/backend/dataall/modules/shares_base/db/share_object_repositories.py index 5d1f0ff8e..4d70faf30 100644 --- a/backend/dataall/modules/shares_base/db/share_object_repositories.py +++ b/backend/dataall/modules/shares_base/db/share_object_repositories.py @@ -10,11 +10,9 @@ from dataall.modules.datasets_base.db.dataset_models import DatasetBase from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository from dataall.modules.notifications.db.notification_models import Notification +from dataall.modules.s3_datasets.db.dataset_models import S3Dataset from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject -from dataall.modules.shares_base.services.shares_enums import ( - ShareItemHealthStatus, - PrincipalType, -) +from dataall.modules.shares_base.services.shares_enums import ShareItemHealthStatus, PrincipalType, ShareableType logger = logging.getLogger(__name__) @@ -108,6 +106,14 @@ def get_share_item_by_uri(session, uri): def get_share_item_details(session, share_type_model, item_uri): return session.query(share_type_model).get(item_uri) + @staticmethod + def get_shares_for_principal_and_database(session, principal, database): + return ( + session.query(ShareObject) + .join(S3Dataset, S3Dataset.datasetUri == ShareObject.datasetUri) + .filter(and_(S3Dataset.GlueDatabaseName == database, ShareObject.principalIAMRoleName == principal)) + ) + @staticmethod def remove_share_object_item(session, share_item): session.delete(share_item) @@ -411,6 +417,8 @@ def list_shareable_items_of_type(session, share, type, share_type_model, share_t ) if status: query = query.filter(ShareObjectItem.status.in_(status)) + if type == ShareableType.Table: + query = query.filter(share_type_model.LastGlueTableStatus == 'InSync') return query @staticmethod @@ -455,6 +463,14 @@ def list_active_share_object_for_dataset(session, dataset_uri): ) return share_objects + @staticmethod + def list_share_object_items_for_item_with_status(session, item_uri: str, status: List[str]): + return ( + session.query(ShareObjectItem) + .filter(ShareObjectItem.status.in_(status), ShareObjectItem.itemUri == item_uri) + .all() + ) + @staticmethod def fetch_submitted_shares_with_notifications(session): """