Skip to content

Commit

Permalink
minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
hagen-danswer committed Sep 30, 2024
1 parent 492dfdd commit 552a0f8
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 37 deletions.
1 change: 1 addition & 0 deletions backend/danswer/connectors/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def identify_connector_class(
DocumentSource.SLACK: {
InputType.LOAD_STATE: SlackLoadConnector,
InputType.POLL: SlackPollConnector,
InputType.PRUNE: SlackPollConnector,
},
DocumentSource.GITHUB: GithubConnector,
DocumentSource.GMAIL: GmailConnector,
Expand Down
6 changes: 3 additions & 3 deletions backend/danswer/connectors/slack/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
ThreadType = list[MessageType]


def _get_channels(
def _collect_paginated_channels(
client: WebClient,
exclude_archived: bool,
channel_types: list[str],
Expand Down Expand Up @@ -69,7 +69,7 @@ def get_channels(
channel_types.append("private_channel")
# try getting private channels as well at first
try:
channels = _get_channels(
channels = _collect_paginated_channels(
client=client,
exclude_archived=exclude_archived,
channel_types=channel_types,
Expand All @@ -82,7 +82,7 @@ def get_channels(
else:
logger.warning("No channels to fetch")
return []
channels = _get_channels(
channels = _collect_paginated_channels(
client=client,
exclude_archived=exclude_archived,
channel_types=channel_types,
Expand Down
4 changes: 1 addition & 3 deletions backend/danswer/db/connector_credential_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
from danswer.server.models import StatusResponse
from danswer.utils.logger import setup_logger
from ee.danswer.db.external_perm import delete_user__ext_group_for_cc_pair__no_commit
from ee.danswer.external_permissions.permission_sync_function_map import (
check_if_valid_sync_source,
)
from ee.danswer.external_permissions.sync_params import check_if_valid_sync_source

logger = setup_logger()

Expand Down
60 changes: 31 additions & 29 deletions backend/ee/danswer/external_permissions/permission_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,38 @@
from sqlalchemy.orm import Session

from danswer.access.access import get_access_for_documents
from danswer.configs.constants import DocumentSource
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.document import get_document_ids_for_connector_credential_pair
from danswer.db.models import ConnectorCredentialPair
from danswer.document_index.factory import get_current_primary_default_document_index
from danswer.document_index.interfaces import UpdateRequest
from danswer.utils.logger import setup_logger
from ee.danswer.external_permissions.permission_sync_function_map import (
DOC_PERMISSIONS_FUNC_MAP,
)
from ee.danswer.external_permissions.permission_sync_function_map import (
GROUP_PERMISSIONS_FUNC_MAP,
)
from ee.danswer.external_permissions.sync_params import DOC_PERMISSIONS_FUNC_MAP
from ee.danswer.external_permissions.sync_params import GROUP_PERMISSIONS_FUNC_MAP
from ee.danswer.external_permissions.sync_params import PERMISSION_SYNC_PERIODS

logger = setup_logger()


# None means that the connector runs every time
_RESTRICTED_FETCH_PERIOD: dict[DocumentSource, int | None] = {
# Polling is supported
DocumentSource.GOOGLE_DRIVE: None,
# Polling is not supported so we fetch all doc permissions every 5 minutes
DocumentSource.CONFLUENCE: 5 * 60,
DocumentSource.SLACK: 60,
}
def _is_time_to_run_sync(cc_pair: ConnectorCredentialPair) -> bool:
    """Decide whether a permission sync is due for this connector/credential pair.

    A source with no entry in PERMISSION_SYNC_PERIODS syncs on every
    invocation; otherwise a sync runs only once the configured number of
    seconds has elapsed since the previous sync.
    """
    source_sync_period = PERMISSION_SYNC_PERIODS.get(cc_pair.connector.source)

    # No period configured for this source (.get returned None), so we
    # always run the sync. A configured period of 0 behaves the same way
    # because of the truthiness check.
    if not source_sync_period:
        return True

    # If the last sync is None, it has never been run so we run the sync
    if cc_pair.last_time_perm_sync is None:
        return True

    # NOTE(review): the stored timestamp is treated as UTC by attaching
    # tzinfo without conversion — assumes it was written as UTC; confirm.
    last_sync = cc_pair.last_time_perm_sync.replace(tzinfo=timezone.utc)
    current_time = datetime.now(timezone.utc)

    # Run only if more than source_sync_period seconds have elapsed
    # since the last sync.
    if (current_time - last_sync).total_seconds() > source_sync_period:
        return True

    return False


def run_external_group_permission_sync(
Expand All @@ -45,6 +53,9 @@ def run_external_group_permission_sync(
# Not all sync connectors support group permissions so this is fine
return

if not _is_time_to_run_sync(cc_pair):
return

try:
# This function updates:
# - the user_email <-> external_user_group_id mapping
Expand Down Expand Up @@ -80,20 +91,8 @@ def run_external_doc_permission_sync(
f"No permission sync function found for source type: {source_type}"
)

# If RESTRICTED_FETCH_PERIOD[source] is None, we always run the sync.
# If RESTRICTED_FETCH_PERIOD is not None, we only run sync if the
# last sync was more than RESTRICTED_FETCH_PERIOD seconds ago.
full_fetch_period = _RESTRICTED_FETCH_PERIOD[cc_pair.connector.source]
if full_fetch_period is not None:
last_sync = cc_pair.last_time_perm_sync
if (
last_sync
and (
datetime.now(timezone.utc) - last_sync.replace(tzinfo=timezone.utc)
).total_seconds()
< full_fetch_period
):
return
if not _is_time_to_run_sync(cc_pair):
return

try:
# This function updates:
Expand Down Expand Up @@ -132,6 +131,9 @@ def run_external_doc_permission_sync(

# update vespa
document_index.update(update_reqs)

cc_pair.last_time_perm_sync = datetime.now(timezone.utc)

# update postgres
db_session.commit()
except Exception as e:
Expand Down
4 changes: 3 additions & 1 deletion backend/ee/danswer/external_permissions/slack/doc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ def slack_doc_sync(
it in postgres so that when it gets created later, the permissions are
already populated
"""
slack_client = WebClient(token=cc_pair.credential.credential_json["bot_token"])
slack_client = WebClient(
token=cc_pair.credential.credential_json["slack_bot_token"]
)
user_id_to_email_map = fetch_user_id_to_email_map(slack_client)
channel_doc_map = _get_slack_document_ids_and_channels(
db_session=db_session,
Expand Down
4 changes: 3 additions & 1 deletion backend/ee/danswer/external_permissions/slack/group_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ def slack_group_sync(
db_session: Session,
cc_pair: ConnectorCredentialPair,
) -> None:
slack_client = WebClient(token=cc_pair.credential.credential_json["bot_token"])
slack_client = WebClient(
token=cc_pair.credential.credential_json["slack_bot_token"]
)
user_id_to_email_map = fetch_user_id_to_email_map(slack_client)

danswer_groups: list[ExternalUserGroup] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,13 @@
}


# Seconds between full doc-permission fetches, per source.
# If nothing is specified here, we run the doc_sync every time the celery beat runs
PERMISSION_SYNC_PERIODS: dict[DocumentSource, int] = {
    # Polling is not supported so we fetch all doc permissions every 5 minutes
    DocumentSource.CONFLUENCE: 5 * 60,
    # Polling is not supported so we fetch all doc permissions every 2 minutes
    DocumentSource.SLACK: 2 * 60,
}


def check_if_valid_sync_source(source_type: DocumentSource) -> bool:
    """Return True iff a doc-permission sync function is registered for *source_type*."""
    registered_sources = DOC_PERMISSIONS_FUNC_MAP.keys()
    return source_type in registered_sources

0 comments on commit 552a0f8

Please sign in to comment.