From 770588c97f01884295cc121e733c8e80b3b7637f Mon Sep 17 00:00:00 2001 From: Ankush Khanna Date: Mon, 7 Oct 2024 11:24:08 +0200 Subject: [PATCH] Creating entity methods --- compass_sdk/rbac.py | 165 +++++++++++++++++++++++++++++++++++++++ compass_sdk/root_rbac.py | 131 ------------------------------- 2 files changed, 165 insertions(+), 131 deletions(-) create mode 100644 compass_sdk/rbac.py delete mode 100644 compass_sdk/root_rbac.py diff --git a/compass_sdk/rbac.py b/compass_sdk/rbac.py new file mode 100644 index 0000000..b64514e --- /dev/null +++ b/compass_sdk/rbac.py @@ -0,0 +1,165 @@ +import json +from typing import Dict, List, Type, TypeVar + +import requests +from pydantic import BaseModel +from requests import HTTPError + +from compass_sdk.types import ( + GroupCreateRequest, + GroupCreateResponse, + GroupFetchResponse, + GroupUserDeleteResponse, + PolicyRequest, + RoleCreateRequest, + RoleCreateResponse, + RoleDeleteResponse, + RoleFetchResponse, + RoleMappingDeleteResponse, + RoleMappingRequest, + RoleMappingResponse, + UserCreateRequest, + UserCreateResponse, + UserDeleteResponse, + UserFetchResponse, +) + + +def create_root_user(compass_url: str, tenancy_token: str) -> UserCreateResponse: + headers = {"Authorization": f"Bearer {tenancy_token}", "Content-Type": "application/json"} + response = requests.post(f"{compass_url}/security/admin/rbac/v1/root", headers=headers) + response.raise_for_status() + return UserCreateResponse.model_validate(response.json()) + + +class CompassRootClient: + def __init__(self, compass_url: str, root_user_token: str): + self.base_url = compass_url + "/security/admin/rbac" + self.headers = {"Authorization": f"Bearer {root_user_token}", "Content-Type": "application/json"} + + T = TypeVar("T", bound=BaseModel) + U = TypeVar("U", bound=BaseModel) + Headers = Dict[str, str] + + @staticmethod + def fetch_entities(url: str, headers: Headers, entity_type: Type[T]) -> List[T]: + response = requests.get(url, headers=headers) + CompassRootClient.raise_for_status(response) + return [entity_type.model_validate(entity) for entity in response.json()] + + @staticmethod + def create_entities(url: str, headers: Headers, entity_request: List[T], entity_response: Type[U]) -> List[U]: + response = requests.post( + url, + json=[json.loads(entity.model_dump_json()) for entity in entity_request], + headers=headers, + ) + CompassRootClient.raise_for_status(response) + return [entity_response.model_validate(response) for response in response.json()] + + @staticmethod + def delete_entities(self, url: str, headers: Headers, names: List[str], entity_response: Type[U]) -> List[U]: + entities = ",".join(names) + response = requests.delete(f"{url}/{entities}", headers=headers) + CompassRootClient.raise_for_status(response) + return [entity_response.model_validate(entity) for entity in response.json()] + + def fetch_users(self) -> List[UserFetchResponse]: + return self.fetch_entities(f"{self.base_url}/v1/users", self.headers, UserFetchResponse) + + def fetch_groups(self) -> List[GroupFetchResponse]: + return self.fetch_entities(f"{self.base_url}/v1/groups", self.headers, GroupFetchResponse) + + def fetch_roles(self) -> List[RoleFetchResponse]: + return self.fetch_entities(f"{self.base_url}/v1/roles", self.headers, RoleFetchResponse) + + def fetch_role_mappings(self) -> List[RoleMappingResponse]: + return self.fetch_entities(f"{self.base_url}/v1/role-mappings", self.headers, RoleMappingResponse) + + def create_users(self, users: List[UserCreateRequest]) -> List[UserCreateResponse]: + return self.create_entities( + url=f"{self.base_url}/v1/users", + headers=self.headers, + entity_request=users, + entity_response=UserCreateResponse, + ) + + def create_groups(self, groups: List[GroupCreateRequest]) -> List[GroupCreateResponse]: + return self.create_entities( + url=f"{self.base_url}/v1/groups", + headers=self.headers, + entity_request=groups, + entity_response=GroupCreateResponse, + ) + + def create_roles(self, roles: List[RoleCreateRequest]) -> List[RoleCreateResponse]: + return self.create_entities( + url=f"{self.base_url}/v1/roles", + headers=self.headers, + entity_request=roles, + entity_response=RoleCreateResponse, + ) + + def create_role_mappings(self, role_mappings: List[RoleMappingRequest]) -> List[RoleMappingResponse]: + return self.create_entities( + url=f"{self.base_url}/v1/role-mappings", + headers=self.headers, + entity_request=role_mappings, + entity_response=RoleMappingResponse, + ) + + def delete_users(self, user_names: List[str]) -> List[UserDeleteResponse]: + return self.delete_entities(f"{self.base_url}/v1/users", self.headers, user_names, UserDeleteResponse) + + def delete_groups(self, group_names: List[str]) -> List[GroupUserDeleteResponse]: + return self.delete_entities(f"{self.base_url}/v1/groups", self.headers, group_names, GroupUserDeleteResponse) + + def delete_roles(self, role_ids: List[str]) -> List[RoleDeleteResponse]: + return self.delete_entities(f"{self.base_url}/v1/roles", self.headers, role_ids, RoleDeleteResponse) + + def delete_role_mappings(self, role_name: str, group_name: str) -> List[RoleMappingDeleteResponse]: + response = requests.delete( + f"{self.base_url}/v1/role-mappings/role/{role_name}/group/{group_name}", headers=self.headers + ) + self.raise_for_status(response) + return [RoleMappingDeleteResponse.model_validate(role_mapping) for role_mapping in response.json()] + + def delete_user_group(self, group_name: str, user_name: str) -> GroupUserDeleteResponse: + response = requests.delete(f"{self.base_url}/v1/group/{group_name}/user/{user_name}", headers=self.headers) + self.raise_for_status(response) + return GroupUserDeleteResponse.model_validate(response.json()) + + def update_role(self, role_name: str, policies: List[PolicyRequest]) -> RoleCreateResponse: + response = requests.put( + f"{self.base_url}/v1/roles/{role_name}", + json=[json.loads(policy.model_dump_json()) for policy in policies], + headers=self.headers, + ) + self.raise_for_status(response) + return RoleCreateResponse.model_validate(response.json()) + + @staticmethod + def raise_for_status(response: requests.Response): + """Raises :class:`HTTPError`, if one occurred.""" + + http_error_msg = "" + if isinstance(response.reason, bytes): + # We attempt to decode utf-8 first because some servers + # choose to localize their reason strings. If the string + # isn't utf-8, we fall back to iso-8859-1 for all other + # encodings. (See PR #3538) + try: + reason = response.reason.decode("utf-8") + except UnicodeDecodeError: + reason = response.reason.decode("iso-8859-1") + else: + reason = response.content + + if 400 <= response.status_code < 500: + http_error_msg = f"{response.status_code} Client Error: {reason} for url: {response.url}" + + elif 500 <= response.status_code < 600: + http_error_msg = f"{response.status_code} Server Error: {reason} for url: {response.url}" + + if http_error_msg: + raise HTTPError(http_error_msg, response=response) diff --git a/compass_sdk/root_rbac.py b/compass_sdk/root_rbac.py deleted file mode 100644 index 442d0f3..0000000 --- a/compass_sdk/root_rbac.py +++ /dev/null @@ -1,131 +0,0 @@ -import json -from typing import List - -import requests - -from compass_sdk.types import ( - GroupCreateRequest, - GroupCreateResponse, - GroupFetchResponse, - GroupUserDeleteResponse, - PolicyRequest, - RoleCreateRequest, - RoleCreateResponse, - RoleDeleteResponse, - RoleFetchResponse, - RoleMappingDeleteResponse, - RoleMappingRequest, - RoleMappingResponse, - UserCreateRequest, - UserCreateResponse, - UserDeleteResponse, - UserFetchResponse, -) - - -def create_root_user(compass_url: str, tenancy_token: str) -> UserCreateResponse: - headers = {"Authorization": f"Bearer {tenancy_token}", "Content-Type": "application/json"} - response = requests.post(f"{compass_url}/security/admin/rbac/v1/root", headers=headers) - response.raise_for_status() - return UserCreateResponse.model_validate(response.json()) - - -class CompassRootClient: - def __init__(self, compass_url: str, root_user_token: str): - self.base_url = compass_url + "/security/admin/rbac" - self.headers = {"Authorization": f"Bearer {root_user_token}", "Content-Type": "application/json"} - - def fetch_users(self) -> List[UserFetchResponse]: - response = requests.get(f"{self.base_url}/v1/users", headers=self.headers) - response.raise_for_status() - return [UserFetchResponse.model_validate(user) for user in response.json()] - - def create_users(self, users: List[UserCreateRequest]) -> List[UserCreateResponse]: - response = requests.post( - f"{self.base_url}/v1/users", - json=[json.loads(user.model_dump_json()) for user in users], - headers=self.headers, - ) - response.raise_for_status() - return [UserCreateResponse.model_validate(user) for user in response.json()] - - def delete_users(self, user_names: List[str]) -> List[UserDeleteResponse]: - users = ",".join(user_names) - response = requests.delete(f"{self.base_url}/v1/users/{users}", headers=self.headers) - response.raise_for_status() - return [UserDeleteResponse.model_validate(user) for user in response.json()] - - def fetch_groups(self) -> List[GroupFetchResponse]: - response = requests.get(f"{self.base_url}/v1/groups", headers=self.headers) - response.raise_for_status() - return [GroupFetchResponse.model_validate(group) for group in response.json()] - - def create_groups(self, groups: List[GroupCreateRequest]) -> List[GroupCreateResponse]: - response = requests.post( - f"{self.base_url}/v1/groups", - json=[json.loads(group.model_dump_json()) for group in groups], - headers=self.headers, - ) - response.raise_for_status() - return [GroupCreateResponse.model_validate(group) for group in response.json()] - - def delete_groups(self, group_names: str) -> List[GroupUserDeleteResponse]: - groups = ",".join(group_names) - response = requests.delete(f"{self.base_url}/v1/groups/{groups}", headers=self.headers) - response.raise_for_status() - return [GroupUserDeleteResponse.model_validate(group) for group in response.json()] - - def delete_user_group(self, group_name: str, user_name: str) -> GroupUserDeleteResponse: - response = requests.delete(f"{self.base_url}/v1/group/{group_name}/user/{user_name}", headers=self.headers) - response.raise_for_status() - return GroupUserDeleteResponse.model_validate(response.json()) - - def fetch_roles(self) -> List[RoleFetchResponse]: - response = requests.get(f"{self.base_url}/v1/roles", headers=self.headers) - response.raise_for_status() - return [RoleFetchResponse.model_validate(role) for role in response.json()] - - def create_roles(self, roles: List[RoleCreateRequest]) -> List[RoleCreateResponse]: - response = requests.post( - f"{self.base_url}/v1/roles", - json=[json.loads(role.model_dump_json()) for role in roles], - headers=self.headers, - ) - response.raise_for_status() - return [RoleCreateResponse.model_validate(role) for role in response.json()] - - def update_role(self, role_name: str, policies: List[PolicyRequest]) -> RoleCreateResponse: - response = requests.put( - f"{self.base_url}/v1/roles/{role_name}", - json=[json.loads(policy.model_dump_json()) for policy in policies], - headers=self.headers, - ) - response.raise_for_status() - return RoleCreateResponse.model_validate(response.json()) - - def delete_roles(self, role_ids: str) -> List[RoleDeleteResponse]: - roles = ",".join(role_ids) - response = requests.delete(f"{self.base_url}/v1/roles/{roles}", headers=self.headers) - response.raise_for_status() - return [RoleDeleteResponse.model_validate(role) for role in response.json()] - - def create_role_mappings(self, role_mappings: List[RoleMappingRequest]) -> List[RoleMappingResponse]: - response = requests.post( - f"{self.base_url}/v1/role-mappings", - json=[json.loads(role_mapping.model_dump_json()) for role_mapping in role_mappings], - headers=self.headers, - ) - response.raise_for_status() - return [RoleMappingResponse.model_validate(role_mapping) for role_mapping in response.json()] - - def fetch_role_mappings(self) -> List[RoleMappingResponse]: - response = requests.get(f"{self.base_url}/v1/role-mappings", headers=self.headers) - response.raise_for_status() - return [RoleMappingResponse.model_validate(role_mapping) for role_mapping in response.json()] - - def delete_role_mappings(self, role_name: str, group_name: str) -> List[RoleMappingDeleteResponse]: - response = requests.delete( - f"{self.base_url}/v1/role-mappings/role/{role_name}/group/{group_name}", headers=self.headers - ) - response.raise_for_status() - return [RoleMappingDeleteResponse.model_validate(role_mapping) for role_mapping in response.json()]