From 552a0f8960bc983b7a294f68ba02f3f81c8bbc50 Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Mon, 30 Sep 2024 07:23:53 -0700 Subject: [PATCH] minor cleanup --- backend/danswer/connectors/factory.py | 1 + backend/danswer/connectors/slack/connector.py | 6 +- .../danswer/db/connector_credential_pair.py | 4 +- .../external_permissions/permission_sync.py | 60 ++++++++++--------- .../external_permissions/slack/doc_sync.py | 4 +- .../external_permissions/slack/group_sync.py | 4 +- ...on_sync_function_map.py => sync_params.py} | 8 +++ 7 files changed, 50 insertions(+), 37 deletions(-) rename backend/ee/danswer/external_permissions/{permission_sync_function_map.py => sync_params.py} (84%) diff --git a/backend/danswer/connectors/factory.py b/backend/danswer/connectors/factory.py index 7d6fc9c1167..75e0d9bb238 100644 --- a/backend/danswer/connectors/factory.py +++ b/backend/danswer/connectors/factory.py @@ -63,6 +63,7 @@ def identify_connector_class( DocumentSource.SLACK: { InputType.LOAD_STATE: SlackLoadConnector, InputType.POLL: SlackPollConnector, + InputType.PRUNE: SlackPollConnector, }, DocumentSource.GITHUB: GithubConnector, DocumentSource.GMAIL: GmailConnector, diff --git a/backend/danswer/connectors/slack/connector.py b/backend/danswer/connectors/slack/connector.py index 3b7ea1c210d..d7a23714a37 100644 --- a/backend/danswer/connectors/slack/connector.py +++ b/backend/danswer/connectors/slack/connector.py @@ -37,7 +37,7 @@ ThreadType = list[MessageType] -def _get_channels( +def _collect_paginated_channels( client: WebClient, exclude_archived: bool, channel_types: list[str], @@ -69,7 +69,7 @@ def get_channels( channel_types.append("private_channel") # try getting private channels as well at first try: - channels = _get_channels( + channels = _collect_paginated_channels( client=client, exclude_archived=exclude_archived, channel_types=channel_types, @@ -82,7 +82,7 @@ def get_channels( else: logger.warning("No channels to fetch") return [] - channels = _get_channels( + 
channels = _collect_paginated_channels( client=client, exclude_archived=exclude_archived, channel_types=channel_types, diff --git a/backend/danswer/db/connector_credential_pair.py b/backend/danswer/db/connector_credential_pair.py index ec9a3a08ef8..f9d79df96ae 100644 --- a/backend/danswer/db/connector_credential_pair.py +++ b/backend/danswer/db/connector_credential_pair.py @@ -26,9 +26,7 @@ from danswer.server.models import StatusResponse from danswer.utils.logger import setup_logger from ee.danswer.db.external_perm import delete_user__ext_group_for_cc_pair__no_commit -from ee.danswer.external_permissions.permission_sync_function_map import ( - check_if_valid_sync_source, -) +from ee.danswer.external_permissions.sync_params import check_if_valid_sync_source logger = setup_logger() diff --git a/backend/ee/danswer/external_permissions/permission_sync.py b/backend/ee/danswer/external_permissions/permission_sync.py index b793983ab0f..3a4357f7c10 100644 --- a/backend/ee/danswer/external_permissions/permission_sync.py +++ b/backend/ee/danswer/external_permissions/permission_sync.py @@ -4,30 +4,38 @@ from sqlalchemy.orm import Session from danswer.access.access import get_access_for_documents -from danswer.configs.constants import DocumentSource from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id from danswer.db.document import get_document_ids_for_connector_credential_pair +from danswer.db.models import ConnectorCredentialPair from danswer.document_index.factory import get_current_primary_default_document_index from danswer.document_index.interfaces import UpdateRequest from danswer.utils.logger import setup_logger -from ee.danswer.external_permissions.permission_sync_function_map import ( - DOC_PERMISSIONS_FUNC_MAP, -) -from ee.danswer.external_permissions.permission_sync_function_map import ( - GROUP_PERMISSIONS_FUNC_MAP, -) +from ee.danswer.external_permissions.sync_params import DOC_PERMISSIONS_FUNC_MAP +from 
ee.danswer.external_permissions.sync_params import GROUP_PERMISSIONS_FUNC_MAP +from ee.danswer.external_permissions.sync_params import PERMISSION_SYNC_PERIODS logger = setup_logger() -# None means that the connector runs every time -_RESTRICTED_FETCH_PERIOD: dict[DocumentSource, int | None] = { - # Polling is supported - DocumentSource.GOOGLE_DRIVE: None, - # Polling is not supported so we fetch all doc permissions every 5 minutes - DocumentSource.CONFLUENCE: 5 * 60, - DocumentSource.SLACK: 60, -} +def _is_time_to_run_sync(cc_pair: ConnectorCredentialPair) -> bool: + source_sync_period = PERMISSION_SYNC_PERIODS.get(cc_pair.connector.source) + + # If no sync period is configured for this source in PERMISSION_SYNC_PERIODS, we always run the sync. + if not source_sync_period: + return True + + # If the last sync is None, it has never been run so we run the sync + if cc_pair.last_time_perm_sync is None: + return True + + last_sync = cc_pair.last_time_perm_sync.replace(tzinfo=timezone.utc) + current_time = datetime.now(timezone.utc) + + # If more time has passed since the last sync than the source's sync period, we run the sync + if (current_time - last_sync).total_seconds() > source_sync_period: + return True + + return False def run_external_group_permission_sync( @@ -45,6 +53,9 @@ def run_external_group_permission_sync( # Not all sync connectors support group permissions so this is fine return + if not _is_time_to_run_sync(cc_pair): + return + try: # This function updates: # - the user_email <-> external_user_group_id mapping @@ -80,20 +91,8 @@ def run_external_doc_permission_sync( f"No permission sync function found for source type: {source_type}" ) - # If RESTRICTED_FETCH_PERIOD[source] is None, we always run the sync. - # If RESTRICTED_FETCH_PERIOD is not None, we only run sync if the - # last sync was more than RESTRICTED_FETCH_PERIOD seconds ago. 
- full_fetch_period = _RESTRICTED_FETCH_PERIOD[cc_pair.connector.source] - if full_fetch_period is not None: - last_sync = cc_pair.last_time_perm_sync - if ( - last_sync - and ( - datetime.now(timezone.utc) - last_sync.replace(tzinfo=timezone.utc) - ).total_seconds() - < full_fetch_period - ): - return + if not _is_time_to_run_sync(cc_pair): + return try: # This function updates: @@ -132,6 +131,9 @@ def run_external_doc_permission_sync( # update vespa document_index.update(update_reqs) + + cc_pair.last_time_perm_sync = datetime.now(timezone.utc) + # update postgres db_session.commit() except Exception as e: diff --git a/backend/ee/danswer/external_permissions/slack/doc_sync.py b/backend/ee/danswer/external_permissions/slack/doc_sync.py index 2aa29ad8ef4..fe731746a44 100644 --- a/backend/ee/danswer/external_permissions/slack/doc_sync.py +++ b/backend/ee/danswer/external_permissions/slack/doc_sync.py @@ -159,7 +159,9 @@ def slack_doc_sync( it in postgres so that when it gets created later, the permissions are already populated """ - slack_client = WebClient(token=cc_pair.credential.credential_json["bot_token"]) + slack_client = WebClient( + token=cc_pair.credential.credential_json["slack_bot_token"] + ) user_id_to_email_map = fetch_user_id_to_email_map(slack_client) channel_doc_map = _get_slack_document_ids_and_channels( db_session=db_session, diff --git a/backend/ee/danswer/external_permissions/slack/group_sync.py b/backend/ee/danswer/external_permissions/slack/group_sync.py index ea6de17c90e..80838895219 100644 --- a/backend/ee/danswer/external_permissions/slack/group_sync.py +++ b/backend/ee/danswer/external_permissions/slack/group_sync.py @@ -61,7 +61,9 @@ def slack_group_sync( db_session: Session, cc_pair: ConnectorCredentialPair, ) -> None: - slack_client = WebClient(token=cc_pair.credential.credential_json["bot_token"]) + slack_client = WebClient( + token=cc_pair.credential.credential_json["slack_bot_token"] + ) user_id_to_email_map = 
fetch_user_id_to_email_map(slack_client) danswer_groups: list[ExternalUserGroup] = [] diff --git a/backend/ee/danswer/external_permissions/permission_sync_function_map.py b/backend/ee/danswer/external_permissions/sync_params.py similarity index 84% rename from backend/ee/danswer/external_permissions/permission_sync_function_map.py rename to backend/ee/danswer/external_permissions/sync_params.py index 1ec60e9972e..f012fe9d5b9 100644 --- a/backend/ee/danswer/external_permissions/permission_sync_function_map.py +++ b/backend/ee/danswer/external_permissions/sync_params.py @@ -40,5 +40,13 @@ } +# If nothing is specified here, we run the doc_sync every time the celery beat runs +PERMISSION_SYNC_PERIODS: dict[DocumentSource, int] = { + # Polling is not supported so we fetch all doc permissions every 5 minutes + DocumentSource.CONFLUENCE: 5 * 60, + DocumentSource.SLACK: 2 * 60, +} + + def check_if_valid_sync_source(source_type: DocumentSource) -> bool: return source_type in DOC_PERMISSIONS_FUNC_MAP