[Bugfix] - Changes in logic to delete share db #1706

Open · wants to merge 3 commits into base: main

@@ -202,6 +202,15 @@ def list_all_datasets(session) -> [S3Dataset]:
     def list_all_active_datasets(session) -> [S3Dataset]:
         return session.query(S3Dataset).filter(S3Dataset.deleted.is_(None)).all()
 
+    @staticmethod
+    def list_all_active_datasets_with_glue_db(session, dataset_uri) -> [S3Dataset]:
+        s3_dataset: S3Dataset = DatasetRepository.get_dataset_by_uri(session=session, dataset_uri=dataset_uri)
+        return (
+            session.query(S3Dataset)
+            .filter(and_(S3Dataset.deleted.is_(None), S3Dataset.GlueDatabaseName == s3_dataset.GlueDatabaseName))
+            .all()
+        )
+
     @staticmethod
     def get_dataset_by_bucket_name(session, bucket) -> [S3Dataset]:
         return session.query(S3Dataset).filter(S3Dataset.S3BucketName == bucket).first()
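
To make the new helper's behavior concrete: it returns every active dataset that points at the same Glue database as the given dataset, including that dataset itself. A minimal standalone sketch of the selection logic, using plain dataclasses as hypothetical stand-ins for the SQLAlchemy model:

```python
# Standalone sketch, not the repository API; FakeDataset mirrors the S3Dataset
# columns used by the query above.
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeDataset:
    datasetUri: str
    GlueDatabaseName: str
    deleted: Optional[str] = None  # None means the dataset is active


def active_datasets_with_same_glue_db(datasets: List[FakeDataset], dataset_uri: str) -> List[FakeDataset]:
    target = next(d for d in datasets if d.datasetUri == dataset_uri)
    return [d for d in datasets if d.deleted is None and d.GlueDatabaseName == target.GlueDatabaseName]


datasets = [
    FakeDataset('ds1', 'sales_db'),
    FakeDataset('ds2', 'sales_db'),                        # same Glue database, different dataset
    FakeDataset('ds3', 'marketing_db'),
    FakeDataset('ds4', 'sales_db', deleted='2024-01-01'),  # inactive, excluded
]
assert [d.datasetUri for d in active_datasets_with_same_glue_db(datasets, 'ds1')] == ['ds1', 'ds2']
```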

@@ -149,22 +149,25 @@ def check_other_approved_share_item_table_exists(session, environment_uri, item_
         return query.first()
 
     @staticmethod
-    def check_existing_shared_items_of_type(session, uri, item_type):
-        share: ShareObject = ShareObjectRepository.get_share_by_uri(session, uri)
+    def check_existing_shares_on_items_for_principal(session, item_type, principal, database):
+        shares: List[ShareObject] = ShareObjectRepository.get_shares_for_principal_and_database(
+            session=session, principal=principal, database=database
+        )
         share_item_shared_states = ShareStatusRepository.get_share_item_shared_states()
-        shared_items = (
-            session.query(ShareObjectItem)
-            .filter(
-                and_(
-                    ShareObjectItem.shareUri == share.shareUri,
-                    ShareObjectItem.itemType == item_type,
-                    ShareObjectItem.status.in_(share_item_shared_states),
-                )
-            )
-            .all()
-        )
-        if shared_items:
-            return True
+        for share in shares:
+            shared_items = (
+                session.query(ShareObjectItem)
+                .filter(
+                    and_(
+                        ShareObjectItem.shareUri == share.shareUri,
+                        ShareObjectItem.itemType == item_type,
+                        ShareObjectItem.status.in_(share_item_shared_states),
+                    )
+                )
+                .all()
+            )
+            if shared_items:
+                return True
        return False
 
     @staticmethod
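
The reworked check answers a broader question than before: does *any* share held by this principal on this Glue database still have items in a shared state, not just the share being processed. A pure-Python sketch of that existence check, with dicts as stand-in rows and a stand-in for the shared-states list:

```python
# Sketch only; dicts stand in for ShareObjectItem rows and SHARED_STATES for
# ShareStatusRepository.get_share_item_shared_states().
SHARED_STATES = {'Share_Succeeded', 'Revoke_Failed'}


def any_share_still_has_shared_items(items, share_uris, item_type):
    for uri in share_uris:
        if any(
            i['shareUri'] == uri and i['itemType'] == item_type and i['status'] in SHARED_STATES
            for i in items
        ):
            return True
    return False


items = [
    {'shareUri': 's1', 'itemType': 'Table', 'status': 'Revoke_Succeeded'},
    {'shareUri': 's2', 'itemType': 'Table', 'status': 'Share_Succeeded'},
]
assert any_share_still_has_shared_items(items, ['s1', 's2'], 'Table') is True
assert any_share_still_has_shared_items(items, ['s1'], 'Table') is False
```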

@@ -3,14 +3,18 @@
 from warnings import warn
 from datetime import datetime
 from dataall.core.environment.services.environment_service import EnvironmentService
+from dataall.core.resource_lock.db.resource_lock_repositories import ResourceLockRepository
+from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
+from dataall.modules.s3_datasets.api.dataset.types import Dataset
+from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
 from dataall.modules.shares_base.services.shares_enums import (
     ShareItemHealthStatus,
     ShareItemStatus,
     ShareObjectActions,
     ShareItemActions,
     ShareableType,
 )
-from dataall.modules.s3_datasets.db.dataset_models import DatasetTable
+from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset
 from dataall.modules.shares_base.db.share_object_models import ShareObjectItemDataFilter
 from dataall.modules.shares_base.services.share_exceptions import PrincipalRoleNotFound
 from dataall.modules.s3_datasets_shares.services.share_managers import LFShareManager

@@ -324,25 +328,36 @@ def process_revoked_shares(self) -> bool:
 
         try:
             if self.tables:
-                existing_shared_tables_in_share = S3ShareObjectRepository.check_existing_shared_items_of_type(
-                    session=self.session, uri=self.share_data.share.shareUri, item_type=ShareableType.Table.value
+                s3_dataset = DatasetRepository.get_dataset_by_uri(session=self.session, dataset_uri=self.share_data.dataset.datasetUri)
+
+                # Find any share items that exist between the principal and the dataset's Glue database.
+                # Note: a single database can be used across multiple datasets, so this finds not only share items of the share under process but also any other share where the same Glue database and principal are used.
+                existing_shared_tables_in_shares = S3ShareObjectRepository.check_existing_shares_on_items_for_principal(
+                    session=self.session, item_type=ShareableType.Table.value, principal=self.share_data.share.principalRoleName, database=s3_dataset.GlueDatabaseName
                 )
-                log.info(f'Remaining tables shared in this share object = {existing_shared_tables_in_share}')
+                log.info(f'Remaining tables shared on database {s3_dataset.GlueDatabaseName} for principal {self.share_data.share.principalRoleName} = {existing_shared_tables_in_shares}')
 
-                if not existing_shared_tables_in_share:
+                if not existing_shared_tables_in_shares:
                     log.info('Revoking permissions to target shared database...')
                     manager.revoke_principals_database_permissions_to_shared_database()
 
                     share_item_shared_states = ShareStatusRepository.get_share_item_shared_states()
-                    existing_shares_with_shared_tables_in_environment = (
-                        ShareObjectRepository.list_dataset_shares_with_existing_shared_items(
+
+                    # Find all the shares where the database name is used.
+                    # Note: a single database can be used across multiple datasets (and the same database can be used in different environments), so this fetches all dataset shares where the Glue database name is used.
+                    existing_shares_with_shared_tables_for_database = (
+                        ShareObjectRepository.list_dataset_shares_on_database(
                             session=self.session,
                             dataset_uri=self.share_data.dataset.datasetUri,
                             share_item_shared_states=share_item_shared_states,
                             environment_uri=self.share_data.target_environment.environmentUri,
                             item_type=ShareableType.Table.value,
+                            database=s3_dataset.GlueDatabaseName,
                         )
                     )
-                    if not len(existing_shares_with_shared_tables_in_environment):
+
+                    log.info(f'Existing shares using database {s3_dataset.GlueDatabaseName}: {existing_shares_with_shared_tables_for_database}')
+
+                    if not len(existing_shares_with_shared_tables_for_database):
                         log.info('Deleting target shared database...')
                         manager.delete_shared_database_in_target()
         except Exception as e:
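
The revised flow applies two nested guards: the principal's permissions on the shared database are revoked only when no table shared to that principal still uses the Glue database, and the shared database itself is deleted only when, additionally, no share in the target environment still has shared items on it. A condensed sketch of that decision logic:

```python
# Condensed sketch of the two nested guards; the booleans summarize the two
# repository checks made above.
def database_cleanup_actions(tables_still_shared_to_principal: bool, shares_still_on_database: bool) -> list:
    actions = []
    if not tables_still_shared_to_principal:
        # No table in any of this principal's shares still points at the Glue DB,
        # so the principal's permissions on the shared database can be revoked.
        actions.append('revoke_principal_database_permissions')
        if not shares_still_on_database:
            # No share in the target environment still has shared items on this
            # Glue DB, so the shared database itself can be deleted.
            actions.append('delete_shared_database')
    return actions


assert database_cleanup_actions(False, False) == ['revoke_principal_database_permissions', 'delete_shared_database']
assert database_cleanup_actions(False, True) == ['revoke_principal_database_permissions']
assert database_cleanup_actions(True, False) == []
```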

@@ -553,13 +568,16 @@ def cleanup_shares(self) -> bool:
             log.info('Revoking permissions to target shared database...')
             execute_and_suppress_exception(func=manager.revoke_principals_database_permissions_to_shared_database)
             share_item_shared_states = ShareStatusRepository.get_share_item_shared_states()
+            s3_dataset = DatasetRepository.get_dataset_by_uri(session=self.session,
+                                                              dataset_uri=self.share_data.dataset.datasetUri)
+
             existing_shares_with_shared_tables_in_environment = (
-                ShareObjectRepository.list_dataset_shares_with_existing_shared_items(
+                ShareObjectRepository.list_dataset_shares_on_database(
                     session=self.session,
                     dataset_uri=self.share_data.dataset.datasetUri,
                     share_item_shared_states=share_item_shared_states,
                     environment_uri=self.share_data.target_environment.environmentUri,
                     item_type=ShareableType.Table.value,
+                    database=s3_dataset.GlueDatabaseName,
                 )
             )
             if not len(existing_shares_with_shared_tables_in_environment):

@@ -10,6 +10,7 @@
 from dataall.modules.datasets_base.db.dataset_models import DatasetBase
 from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
 from dataall.modules.notifications.db.notification_models import Notification
+from dataall.modules.s3_datasets.db.dataset_models import S3Dataset
 from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject
 from dataall.modules.shares_base.services.shares_enums import (
     ShareItemHealthStatus,

@@ -76,6 +77,29 @@ def list_dataset_shares_with_existing_shared_items(
         query = query.filter(ShareObjectItem.itemType == item_type)
         return query.all()
 
+    @staticmethod
+    def list_dataset_shares_on_database(
+        session, dataset_uri, share_item_shared_states, environment_uri=None, item_type=None, database=None
+    ) -> [ShareObject]:
+        query = (
+            session.query(ShareObject)
+            .join(ShareObjectItem, ShareObjectItem.shareUri == ShareObject.shareUri)
+            .join(S3Dataset, S3Dataset.datasetUri == ShareObject.datasetUri)
+            .filter(
+                and_(
+                    S3Dataset.GlueDatabaseName == database,
+                    ShareObject.deleted.is_(None),
+                    ShareObjectItem.status.in_(share_item_shared_states),
+                )
+            )
+        )
+
+        if environment_uri:
+            query = query.filter(ShareObject.environmentUri == environment_uri)
+        if item_type:
+            query = query.filter(ShareObjectItem.itemType == item_type)
+        return query.all()
+
     @staticmethod
     def find_sharable_item(session, share_uri, item_uri) -> ShareObjectItem:
         return (
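
A pure-Python sketch of the rows this query is meant to return, assuming the join ties each share's dataset to its Glue database (mirroring `get_shares_for_principal_and_database` below); dicts stand in for the ORM rows:

```python
# Sketch only; shares, items, and datasets are hypothetical stand-in records.
def shares_on_database(shares, items, datasets, shared_states, database, environment_uri=None, item_type=None):
    glue_db_by_dataset = {d['datasetUri']: d['GlueDatabaseName'] for d in datasets}
    matches = []
    for share in shares:
        if share['deleted'] is not None:
            continue
        if glue_db_by_dataset.get(share['datasetUri']) != database:
            continue  # the share's dataset does not use this Glue database
        if environment_uri and share['environmentUri'] != environment_uri:
            continue
        for item in items:
            if item['shareUri'] != share['shareUri'] or item['status'] not in shared_states:
                continue
            if item_type and item['itemType'] != item_type:
                continue
            matches.append(share)
            break  # one matching item is enough to include the share
    return matches


shares = [{'shareUri': 's1', 'datasetUri': 'ds1', 'environmentUri': 'env1', 'deleted': None}]
items = [{'shareUri': 's1', 'itemType': 'Table', 'status': 'Share_Succeeded'}]
datasets = [{'datasetUri': 'ds1', 'GlueDatabaseName': 'sales_db'}]
assert shares_on_database(shares, items, datasets, {'Share_Succeeded'}, 'sales_db') == shares
```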
Expand Down Expand Up @@ -108,6 +132,14 @@ def get_share_item_by_uri(session, uri):
def get_share_item_details(session, share_type_model, item_uri):
return session.query(share_type_model).get(item_uri)

@staticmethod
def get_shares_for_principal_and_database(session, principal, database):
return (
session.query(ShareObject)
.join(S3Dataset, S3Dataset.datasetUri == ShareObject.datasetUri)
.filter(and_(S3Dataset.GlueDatabaseName == database, ShareObject.principalRoleName == principal))
)

@staticmethod
def remove_share_object_item(session, share_item):
session.delete(share_item)
Expand Down
34 changes: 32 additions & 2 deletions backend/dataall/modules/shares_base/services/sharing_service.py

@@ -5,6 +5,10 @@
 from dataall.core.resource_lock.db.resource_lock_repositories import ResourceLockRepository
 from dataall.base.db import Engine
 from dataall.core.environment.db.environment_models import ConsumptionRole, Environment, EnvironmentGroup
+from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
+from dataall.modules.s3_datasets.db.dataset_models import S3Dataset
+from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
+from dataall.modules.datasets_base.db.dataset_models import DatasetBase
 from dataall.modules.shares_base.db.share_object_state_machines import (
     ShareObjectSM,
     ShareItemSM,
@@ -14,7 +18,7 @@
     ShareObjectActions,
     ShareItemActions,
     ShareItemStatus,
-    PrincipalType,
+    PrincipalType, ShareableType,
 )
 from dataall.modules.shares_base.db.share_object_models import ShareObject
 from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository

@@ -174,6 +178,29 @@ def revoke_share(cls, engine: Engine, share_uri: str) -> bool:
                 )
             )
 
+            # If any share item is a table, get the additional datasets which have to be locked.
+            is_table_item = any(share_item.itemType == ShareableType.Table.value for share_item in share_items)
+            log.info(f'Found dataset table item in the share with uri={share_data.share.shareUri}: {is_table_item}')
+            if is_table_item:
+                # Find all the datasets where the same Glue database is used and lock all of them,
+                # so that any possible override from another share is avoided. See https://github.com/data-dot-all/dataall/issues/1633 for more details.
+                s3_datasets_with_common_db: [S3Dataset] = DatasetRepository.list_all_active_datasets_with_glue_db(
+                    session=session, dataset_uri=share_data.dataset.datasetUri
+                )
+                dataset_base_with_common_db: [DatasetBase] = [
+                    DatasetBaseRepository.get_dataset_by_uri(session=session, dataset_uri=s3_dataset.datasetUri)
+                    for s3_dataset in s3_datasets_with_common_db
+                ]
+
+                log.info(f'Found {len(dataset_base_with_common_db)} datasets where the same Glue database is used')
+                additional_resources_to_lock = [
+                    (dataset.datasetUri, dataset.__tablename__)
+                    for dataset in dataset_base_with_common_db
+                    if dataset.datasetUri != share_data.dataset.datasetUri
+                ]
+                resources.extend(additional_resources_to_lock)
+                log.info(f'Resources to be locked while revoking: {resources}')
+
             revoke_successful = True
             try:
                 with ResourceLockRepository.acquire_lock_with_retry(
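
The effect on locking: besides the share, dataset, and environment already in `resources`, every other active dataset reusing the Glue database is appended as a (uri, table name) pair before the lock is acquired. A small sketch with hypothetical values:

```python
# Sketch of how the lock list grows; the (uri, table_name) pairs are hypothetical
# stand-ins for what ResourceLockRepository.acquire_lock_with_retry receives.
def build_resources_to_lock(base_resources, dataset_uris_with_common_db, current_dataset_uri):
    extra = [
        (uri, 'dataset')  # 'dataset' stands in for DatasetBase.__tablename__
        for uri in dataset_uris_with_common_db
        if uri != current_dataset_uri  # the current dataset is already locked via base_resources
    ]
    return base_resources + extra


locked = build_resources_to_lock(
    base_resources=[('share-1', 'share_object'), ('ds1', 'dataset')],
    dataset_uris_with_common_db=['ds1', 'ds2', 'ds5'],  # all active datasets using the same Glue DB
    current_dataset_uri='ds1',
)
assert locked == [('share-1', 'share_object'), ('ds1', 'dataset'), ('ds2', 'dataset'), ('ds5', 'dataset')]
```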

@@ -400,7 +427,10 @@ def _get_share_data_and_items(session, share_uri, status=None, healthStatus=None
             source_env_group=data[4],
             env_group=data[5],
         )
+        status_list = [status] if status is not None else []
+        healthStatus_list = [healthStatus] if healthStatus is not None else []
+
         share_items = ShareObjectRepository.get_all_share_items_in_share(
-            session=session, share_uri=share_uri, status=[status], healthStatus=[healthStatus]
+            session=session, share_uri=share_uri, status=status_list, healthStatus=healthStatus_list
         )
         return share_data, share_items
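
The `status_list` change fixes a subtle argument bug: when no status filter was requested, the old code passed `[None]`, and a downstream `in_([None])` filter matches no rows (assuming the repository applies the filter whenever the list is non-empty). A sketch:

```python
# Sketch of old vs. new argument construction for the optional filters.
def build_filter_list(value):
    return [value] if value is not None else []


assert build_filter_list(None) == []                                # new: empty list, filter skipped downstream
assert build_filter_list('Revoke_Approved') == ['Revoke_Approved']  # explicit filter passed through

old_style = [None]  # old: a one-element list containing None
# A downstream `column.in_(old_style)` filter compares against NULL and matches
# no rows, so passing [None] silently returned zero share items.
```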