From 6d26d0b929354ca952e091b870f54b30314595e4 Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Thu, 21 Nov 2024 12:37:16 -0800 Subject: [PATCH 1/5] replace deprecated confluence group api endpoint --- backend/danswer/connectors/confluence/onyx_confluence.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/danswer/connectors/confluence/onyx_confluence.py b/backend/danswer/connectors/confluence/onyx_confluence.py index 4820429ba0f..a4054369b0f 100644 --- a/backend/danswer/connectors/confluence/onyx_confluence.py +++ b/backend/danswer/connectors/confluence/onyx_confluence.py @@ -265,5 +265,6 @@ def paginated_group_members_retrieval( This is not an SQL like query. It's a confluence specific endpoint that can be used to fetch the members of a group. """ - group_name = quote(group_name) - yield from self._paginate_url(f"rest/api/group/{group_name}/member", limit) + # we use this instead of /group/{group_name}/member as it was depricated + url = f"rest/api/group/member?name={quote(group_name)}" + yield from self._paginate_url(url, limit) From 95ab63b6bce524567127f99cbc17f42194737dad Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Thu, 21 Nov 2024 16:35:08 -0800 Subject: [PATCH 2/5] reworked it --- .../connectors/confluence/connector.py | 19 +---- .../connectors/confluence/onyx_confluence.py | 75 ++++++++++++++++-- .../danswer/connectors/confluence/utils.py | 17 ----- .../confluence/group_sync.py | 76 ++++++++++--------- 4 files changed, 109 insertions(+), 78 deletions(-) diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py index 8d614c163c7..b118a5932d1 100644 --- a/backend/danswer/connectors/confluence/connector.py +++ b/backend/danswer/connectors/confluence/connector.py @@ -3,15 +3,13 @@ from typing import Any from urllib.parse import quote -from atlassian import Confluence # type: ignore - from danswer.configs.app_configs import CONFLUENCE_CONNECTOR_LABELS_TO_SKIP from 
danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE from danswer.configs.app_configs import INDEX_BATCH_SIZE from danswer.configs.constants import DocumentSource +from danswer.connectors.confluence.onyx_confluence import build_confluence_client from danswer.connectors.confluence.onyx_confluence import OnyxConfluence from danswer.connectors.confluence.utils import attachment_to_content -from danswer.connectors.confluence.utils import build_confluence_client from danswer.connectors.confluence.utils import build_confluence_document_id from danswer.connectors.confluence.utils import datetime_from_string from danswer.connectors.confluence.utils import extract_text_from_confluence_html @@ -118,21 +116,6 @@ def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None is_cloud=self.is_cloud, wiki_base=self.wiki_base, ) - - client_without_retries = Confluence( - api_version="cloud" if self.is_cloud else "latest", - url=self.wiki_base.rstrip("/"), - username=credentials["confluence_username"] if self.is_cloud else None, - password=credentials["confluence_access_token"] if self.is_cloud else None, - token=credentials["confluence_access_token"] if not self.is_cloud else None, - ) - spaces = client_without_retries.get_all_spaces(limit=1) - if not spaces: - raise RuntimeError( - f"No spaces found at {self.wiki_base}! " - "Check your credentials and wiki_base and make sure " - "is_cloud is set correctly." 
- ) return None def _get_comment_string_for_page_id(self, page_id: str) -> str: diff --git a/backend/danswer/connectors/confluence/onyx_confluence.py b/backend/danswer/connectors/confluence/onyx_confluence.py index a4054369b0f..a5b5c031f29 100644 --- a/backend/danswer/connectors/confluence/onyx_confluence.py +++ b/backend/danswer/connectors/confluence/onyx_confluence.py @@ -232,7 +232,7 @@ def _traverse_and_update(data: dict | list) -> None: def paginated_cql_user_retrieval( self, - cql: str, + is_cloud: bool, expand: str | None = None, limit: int | None = None, ) -> Iterator[dict[str, Any]]: @@ -241,10 +241,23 @@ def paginated_cql_user_retrieval( It's a seperate endpoint from the content/search endpoint used only for users. Otherwise it's very similar to the content/search endpoint. """ + cql = quote("type=user") + url = "rest/api/search/user" if is_cloud else "rest/api/search" expand_string = f"&expand={expand}" if expand else "" - yield from self._paginate_url( - f"rest/api/search/user?cql={cql}{expand_string}", limit - ) + url += f"?cql={cql}{expand_string}" + yield from self._paginate_url(url, limit) + + def paginated_groups_by_user_retrieval( + self, + user_query: str, + limit: int | None = None, + ) -> Iterator[dict[str, Any]]: + """ + This is not an SQL like query. + It's a confluence specific endpoint that can be used to fetch groups. + """ + url = f"rest/api/user/memberof?{user_query}" + yield from self._paginate_url(url, limit) def paginated_groups_retrieval( self, @@ -264,7 +277,55 @@ def paginated_group_members_retrieval( """ This is not an SQL like query. It's a confluence specific endpoint that can be used to fetch the members of a group. + THIS DOESN'T WORK FOR SERVER because it breaks when there is a slash in the group name. + E.g. neither "test/group" nor "test%2Fgroup" works for confluence. 
""" - # we use this instead of /group/{group_name}/member as it was depricated - url = f"rest/api/group/member?name={quote(group_name)}" - yield from self._paginate_url(url, limit) + group_name = quote(group_name) + yield from self._paginate_url(f"rest/api/group/{group_name}/member", limit) + + +def _validate_connector_configuration( + credentials: dict[str, Any], + is_cloud: bool, + wiki_base: str, +) -> None: + # test connection with direct client, no retries + confluence_client_without_retries = Confluence( + api_version="cloud" if is_cloud else "latest", + url=wiki_base.rstrip("/"), + username=credentials["confluence_username"] if is_cloud else None, + password=credentials["confluence_access_token"] if is_cloud else None, + token=credentials["confluence_access_token"] if not is_cloud else None, + ) + spaces = confluence_client_without_retries.get_all_spaces(limit=1) + + if not spaces: + raise RuntimeError( + f"No spaces found at {wiki_base}! " + "Check your credentials and wiki_base and make sure " + "is_cloud is set correctly." 
+ ) + + +def build_confluence_client( + credentials: dict[str, Any], + is_cloud: bool, + wiki_base: str, +) -> OnyxConfluence: + _validate_connector_configuration( + credentials=credentials, + is_cloud=is_cloud, + wiki_base=wiki_base, + ) + return OnyxConfluence( + api_version="cloud" if is_cloud else "latest", + # Remove trailing slash from wiki_base if present + url=wiki_base.rstrip("/"), + # passing in username causes issues for Confluence data center + username=credentials["confluence_username"] if is_cloud else None, + password=credentials["confluence_access_token"] if is_cloud else None, + token=credentials["confluence_access_token"] if not is_cloud else None, + backoff_and_retry=True, + max_backoff_retries=10, + max_backoff_seconds=60, + ) diff --git a/backend/danswer/connectors/confluence/utils.py b/backend/danswer/connectors/confluence/utils.py index cb5253f4c14..e6ac0308a3a 100644 --- a/backend/danswer/connectors/confluence/utils.py +++ b/backend/danswer/connectors/confluence/utils.py @@ -269,20 +269,3 @@ def datetime_from_string(datetime_string: str) -> datetime: datetime_object = datetime_object.astimezone(timezone.utc) return datetime_object - - -def build_confluence_client( - credentials_json: dict[str, Any], is_cloud: bool, wiki_base: str -) -> OnyxConfluence: - return OnyxConfluence( - api_version="cloud" if is_cloud else "latest", - # Remove trailing slash from wiki_base if present - url=wiki_base.rstrip("/"), - # passing in username causes issues for Confluence data center - username=credentials_json["confluence_username"] if is_cloud else None, - password=credentials_json["confluence_access_token"] if is_cloud else None, - token=credentials_json["confluence_access_token"] if not is_cloud else None, - backoff_and_retry=True, - max_backoff_retries=10, - max_backoff_seconds=60, - ) diff --git a/backend/ee/danswer/external_permissions/confluence/group_sync.py b/backend/ee/danswer/external_permissions/confluence/group_sync.py index 
17140b33f71..ba7ca20864f 100644 --- a/backend/ee/danswer/external_permissions/confluence/group_sync.py +++ b/backend/ee/danswer/external_permissions/confluence/group_sync.py @@ -1,7 +1,7 @@ -from atlassian import Confluence # type: ignore +from typing import Any +from danswer.connectors.confluence.onyx_confluence import build_confluence_client from danswer.connectors.confluence.onyx_confluence import OnyxConfluence -from danswer.connectors.confluence.utils import build_confluence_client from danswer.connectors.confluence.utils import get_user_email_from_username__server from danswer.db.models import ConnectorCredentialPair from danswer.utils.logger import setup_logger @@ -11,22 +11,46 @@ logger = setup_logger() -def _get_group_members_email_paginated( +def _get_group_ids_for_user( confluence_client: OnyxConfluence, - group_name: str, + is_cloud: bool, + user: dict[str, Any], ) -> set[str]: - group_member_emails: set[str] = set() - for member in confluence_client.paginated_group_members_retrieval(group_name): - email = member.get("email") + user_field = "accountId" if is_cloud else "key" + user_value = user["accountId"] if is_cloud else user["userKey"] + # Server uses userKey (but calls it key during the API call), Cloud uses accountId + user_query = f"{user_field}={user_value}" + + group_ids_for_user: set[str] = set() + for group in confluence_client.paginated_groups_by_user_retrieval(user_query): + group_ids_for_user.add(group["name"]) + + return group_ids_for_user + + +def _build_group_member_email_map( + confluence_client: OnyxConfluence, + is_cloud: bool, +) -> dict[str, set[str]]: + group_member_emails: dict[str, set[str]] = {} + for user_result in confluence_client.paginated_cql_user_retrieval(is_cloud): + user = user_result["user"] + email = user.get("email") if not email: - user_name = member.get("username") + # This field is only present in Confluence Server + user_name = user.get("username") + # If it is present, try to get the email using a 
Server-specific method if user_name: email = get_user_email_from_username__server( confluence_client=confluence_client, user_name=user_name, ) - if email: - group_member_emails.add(email) + if not email: + # If we still don't have an email, skip this user + continue + + for group_id in _get_group_ids_for_user(confluence_client, is_cloud, user): + group_member_emails.setdefault(group_id, set()).add(email) return group_member_emails @@ -38,41 +62,21 @@ def confluence_group_sync( is_cloud = cc_pair.connector.connector_specific_config.get("is_cloud", False) wiki_base = cc_pair.connector.connector_specific_config["wiki_base"] - # test connection with direct client, no retries - confluence_client = Confluence( - api_version="cloud" if is_cloud else "latest", - url=wiki_base.rstrip("/"), - username=credentials["confluence_username"] if is_cloud else None, - password=credentials["confluence_access_token"] if is_cloud else None, - token=credentials["confluence_access_token"] if not is_cloud else None, - ) - spaces = confluence_client.get_all_spaces(limit=1) - if not spaces: - raise RuntimeError(f"No spaces found at {wiki_base}!") - confluence_client = build_confluence_client( credentials_json=credentials, is_cloud=is_cloud, wiki_base=wiki_base, ) - # Get all group names - group_names: list[str] = [] - for group in confluence_client.paginated_groups_retrieval(): - if group_name := group.get("name"): - group_names.append(group_name) - - # For each group name, get all members and create a danswer group + group_member_email_map = _build_group_member_email_map( + confluence_client=confluence_client, + is_cloud=is_cloud, + ) danswer_groups: list[ExternalUserGroup] = [] - for group_name in group_names: - group_member_emails = _get_group_members_email_paginated( - confluence_client, group_name - ) - if not group_member_emails: - continue + for group_id, group_member_emails in group_member_email_map.items(): danswer_groups.append( ExternalUserGroup( - id=group_name, + id=group_id, 
user_emails=list(group_member_emails), ) ) From 4fc196fc391360d640727fc6da5b39360bba3059 Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Thu, 21 Nov 2024 16:37:48 -0800 Subject: [PATCH 3/5] properly escaped the user query --- backend/danswer/connectors/confluence/onyx_confluence.py | 2 +- .../ee/danswer/external_permissions/confluence/group_sync.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/backend/danswer/connectors/confluence/onyx_confluence.py b/backend/danswer/connectors/confluence/onyx_confluence.py index a5b5c031f29..241b86128a1 100644 --- a/backend/danswer/connectors/confluence/onyx_confluence.py +++ b/backend/danswer/connectors/confluence/onyx_confluence.py @@ -241,7 +241,7 @@ def paginated_cql_user_retrieval( It's a seperate endpoint from the content/search endpoint used only for users. Otherwise it's very similar to the content/search endpoint. """ - cql = quote("type=user") + cql = "type=user" url = "rest/api/search/user" if is_cloud else "rest/api/search" expand_string = f"&expand={expand}" if expand else "" url += f"?cql={cql}{expand_string}" diff --git a/backend/ee/danswer/external_permissions/confluence/group_sync.py b/backend/ee/danswer/external_permissions/confluence/group_sync.py index ba7ca20864f..c8a7ec08152 100644 --- a/backend/ee/danswer/external_permissions/confluence/group_sync.py +++ b/backend/ee/danswer/external_permissions/confluence/group_sync.py @@ -1,4 +1,5 @@ from typing import Any +from urllib.parse import quote from danswer.connectors.confluence.onyx_confluence import build_confluence_client from danswer.connectors.confluence.onyx_confluence import OnyxConfluence @@ -19,7 +20,7 @@ def _get_group_ids_for_user( user_field = "accountId" if is_cloud else "key" user_value = user["accountId"] if is_cloud else user["userKey"] # Server uses userKey (but calls it key during the API call), Cloud uses accountId - user_query = f"{user_field}={user_value}" + user_query = f"{user_field}={quote(user_value)}" 
group_ids_for_user: set[str] = set() for group in confluence_client.paginated_groups_by_user_retrieval(user_query): From 31f4a68bee2c2b49124a0d25ef0516ea7f9cb32e Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Thu, 21 Nov 2024 16:58:31 -0800 Subject: [PATCH 4/5] less passing around is_cloud --- .../connectors/confluence/onyx_confluence.py | 10 +++-- .../confluence/group_sync.py | 38 ++++--------------- 2 files changed, 14 insertions(+), 34 deletions(-) diff --git a/backend/danswer/connectors/confluence/onyx_confluence.py b/backend/danswer/connectors/confluence/onyx_confluence.py index 241b86128a1..739e4aef21d 100644 --- a/backend/danswer/connectors/confluence/onyx_confluence.py +++ b/backend/danswer/connectors/confluence/onyx_confluence.py @@ -232,7 +232,6 @@ def _traverse_and_update(data: dict | list) -> None: def paginated_cql_user_retrieval( self, - is_cloud: bool, expand: str | None = None, limit: int | None = None, ) -> Iterator[dict[str, Any]]: @@ -242,20 +241,25 @@ def paginated_cql_user_retrieval( Otherwise it's very similar to the content/search endpoint. """ cql = "type=user" - url = "rest/api/search/user" if is_cloud else "rest/api/search" + url = "rest/api/search/user" if self.cloud else "rest/api/search" expand_string = f"&expand={expand}" if expand else "" url += f"?cql={cql}{expand_string}" yield from self._paginate_url(url, limit) def paginated_groups_by_user_retrieval( self, - user_query: str, + user: dict[str, Any], limit: int | None = None, ) -> Iterator[dict[str, Any]]: """ This is not an SQL like query. It's a confluence specific endpoint that can be used to fetch groups. 
""" + user_field = "accountId" if self.cloud else "key" + user_value = user["accountId"] if self.cloud else user["userKey"] + # Server uses userKey (but calls it key during the API call), Cloud uses accountId + user_query = f"{user_field}={quote(user_value)}" + url = f"rest/api/user/memberof?{user_query}" yield from self._paginate_url(url, limit) diff --git a/backend/ee/danswer/external_permissions/confluence/group_sync.py b/backend/ee/danswer/external_permissions/confluence/group_sync.py index c8a7ec08152..d9bdee76aa2 100644 --- a/backend/ee/danswer/external_permissions/confluence/group_sync.py +++ b/backend/ee/danswer/external_permissions/confluence/group_sync.py @@ -1,6 +1,3 @@ -from typing import Any -from urllib.parse import quote - from danswer.connectors.confluence.onyx_confluence import build_confluence_client from danswer.connectors.confluence.onyx_confluence import OnyxConfluence from danswer.connectors.confluence.utils import get_user_email_from_username__server @@ -12,29 +9,11 @@ logger = setup_logger() -def _get_group_ids_for_user( - confluence_client: OnyxConfluence, - is_cloud: bool, - user: dict[str, Any], -) -> set[str]: - user_field = "accountId" if is_cloud else "key" - user_value = user["accountId"] if is_cloud else user["userKey"] - # Server uses userKey (but calls it key during the API call), Cloud uses accountId - user_query = f"{user_field}={quote(user_value)}" - - group_ids_for_user: set[str] = set() - for group in confluence_client.paginated_groups_by_user_retrieval(user_query): - group_ids_for_user.add(group["name"]) - - return group_ids_for_user - - def _build_group_member_email_map( confluence_client: OnyxConfluence, - is_cloud: bool, ) -> dict[str, set[str]]: group_member_emails: dict[str, set[str]] = {} - for user_result in confluence_client.paginated_cql_user_retrieval(is_cloud): + for user_result in confluence_client.paginated_cql_user_retrieval(): user = user_result["user"] email = user.get("email") if not email: @@ -50,7 +29,9 @@ 
def _build_group_member_email_map( # If we still don't have an email, skip this user continue - for group_id in _get_group_ids_for_user(confluence_client, is_cloud, user): + for group in confluence_client.paginated_groups_by_user_retrieval(user): + # group name uniqueness is enforced by Confluence, so we can use it as a group ID + group_id = group["name"] group_member_emails.setdefault(group_id, set()).add(email) return group_member_emails @@ -59,19 +40,14 @@ def _build_group_member_email_map( def confluence_group_sync( cc_pair: ConnectorCredentialPair, ) -> list[ExternalUserGroup]: - credentials = cc_pair.credential.credential_json - is_cloud = cc_pair.connector.connector_specific_config.get("is_cloud", False) - wiki_base = cc_pair.connector.connector_specific_config["wiki_base"] - confluence_client = build_confluence_client( - credentials_json=credentials, - is_cloud=is_cloud, - wiki_base=wiki_base, + credentials_json=cc_pair.credential.credential_json, + is_cloud=cc_pair.connector.connector_specific_config.get("is_cloud", False), + wiki_base=cc_pair.connector.connector_specific_config["wiki_base"], ) group_member_email_map = _build_group_member_email_map( confluence_client=confluence_client, - is_cloud=is_cloud, ) danswer_groups: list[ExternalUserGroup] = [] for group_id, group_member_emails in group_member_email_map.items(): From e2aaa60e77bd30f0242923d1e3e10a852456c75d Mon Sep 17 00:00:00 2001 From: hagen-danswer Date: Thu, 21 Nov 2024 17:26:18 -0800 Subject: [PATCH 5/5] done --- .../celery/tasks/external_group_syncing/tasks.py | 6 +++--- backend/danswer/connectors/confluence/connector.py | 2 +- .../danswer/external_permissions/confluence/group_sync.py | 2 +- backend/ee/danswer/external_permissions/sync_params.py | 7 ++++++- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py b/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py index 4f7451faf76..61ceae4e463 
100644 --- a/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py +++ b/backend/danswer/background/celery/tasks/external_group_syncing/tasks.py @@ -29,7 +29,7 @@ from ee.danswer.db.connector_credential_pair import get_all_auto_sync_cc_pairs from ee.danswer.db.external_perm import ExternalUserGroup from ee.danswer.db.external_perm import replace_user__ext_group_for_cc_pair -from ee.danswer.external_permissions.sync_params import EXTERNAL_GROUP_SYNC_PERIOD +from ee.danswer.external_permissions.sync_params import EXTERNAL_GROUP_SYNC_PERIODS from ee.danswer.external_permissions.sync_params import GROUP_PERMISSIONS_FUNC_MAP logger = setup_logger() @@ -66,9 +66,9 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool: if last_ext_group_sync is None: return True - source_sync_period = EXTERNAL_GROUP_SYNC_PERIOD + source_sync_period = EXTERNAL_GROUP_SYNC_PERIODS.get(cc_pair.connector.source) - # If EXTERNAL_GROUP_SYNC_PERIOD is None, we always run the sync. + # If the source has no entry in EXTERNAL_GROUP_SYNC_PERIODS, we always run the sync.
if not source_sync_period: return True diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py index b118a5932d1..c9be6676fa7 100644 --- a/backend/danswer/connectors/confluence/connector.py +++ b/backend/danswer/connectors/confluence/connector.py @@ -112,7 +112,7 @@ def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None # see https://github.com/atlassian-api/atlassian-python-api/blob/master/atlassian/rest_client.py # for a list of other hidden constructor args self._confluence_client = build_confluence_client( - credentials_json=credentials, + credentials=credentials, is_cloud=self.is_cloud, wiki_base=self.wiki_base, ) diff --git a/backend/ee/danswer/external_permissions/confluence/group_sync.py b/backend/ee/danswer/external_permissions/confluence/group_sync.py index d9bdee76aa2..f2f53e589b1 100644 --- a/backend/ee/danswer/external_permissions/confluence/group_sync.py +++ b/backend/ee/danswer/external_permissions/confluence/group_sync.py @@ -41,7 +41,7 @@ def confluence_group_sync( cc_pair: ConnectorCredentialPair, ) -> list[ExternalUserGroup]: confluence_client = build_confluence_client( - credentials_json=cc_pair.credential.credential_json, + credentials=cc_pair.credential.credential_json, is_cloud=cc_pair.connector.connector_specific_config.get("is_cloud", False), wiki_base=cc_pair.connector.connector_specific_config["wiki_base"], ) diff --git a/backend/ee/danswer/external_permissions/sync_params.py b/backend/ee/danswer/external_permissions/sync_params.py index fb81ab35035..c00090d748d 100644 --- a/backend/ee/danswer/external_permissions/sync_params.py +++ b/backend/ee/danswer/external_permissions/sync_params.py @@ -55,7 +55,12 @@ DocumentSource.SLACK: 5 * 60, } -EXTERNAL_GROUP_SYNC_PERIOD: int = 30 # 30 seconds +# If nothing is specified here, we run the external group sync every time the celery beat runs +EXTERNAL_GROUP_SYNC_PERIODS: dict[DocumentSource, int] = { + # Polling is not
supported so we fetch all group permissions every 60 seconds + DocumentSource.GOOGLE_DRIVE: 60, + DocumentSource.CONFLUENCE: 60, +} def check_if_valid_sync_source(source_type: DocumentSource) -> bool: