From 56dd8fccbfde34aabc39fc385ee15029e58dae93 Mon Sep 17 00:00:00 2001
From: Ankush Khanna
Date: Fri, 4 Oct 2024 18:46:15 +0200
Subject: [PATCH] Adding rbac sdk path
---
compass_sdk/root_rbac.py | 129 +++++++++++++++++++++++++++++++++++++++
compass_sdk/types.py | 84 +++++++++++++++++++++++++
2 files changed, 213 insertions(+)
create mode 100644 compass_sdk/root_rbac.py
create mode 100644 compass_sdk/types.py
diff --git a/compass_sdk/root_rbac.py b/compass_sdk/root_rbac.py
new file mode 100644
index 0000000..0caca33
--- /dev/null
+++ b/compass_sdk/root_rbac.py
@@ -0,0 +1,129 @@
+import json
+from types import (
+ GroupCreateRequest,
+ GroupCreateResponse,
+ GroupFetchResponse,
+ GroupUserDeleteResponse,
+ PolicyRequest,
+ RoleCreateRequest,
+ RoleCreateResponse,
+ RoleDeleteResponse,
+ RoleFetchResponse,
+ RoleMappingDeleteResponse,
+ RoleMappingRequest,
+ RoleMappingResponse,
+ UserCreateRequest,
+ UserCreateResponse,
+ UserDeleteResponse,
+ UserFetchResponse,
+)
+from typing import List
+
+import requests
+
+
+class CompassRootClient:
+ def __init__(self, compass_url: str, root_user_token: str):
+ self.base_url = compass_url + "/security/admin/rbac"
+ self.headers = {"Authorization": f"Bearer {root_user_token}", "Content-Type": "application/json"}
+
+ def create_root_user(self, tenancy_token: str) -> UserCreateResponse:
+ headers = {"Authorization": f"Bearer {tenancy_token}", "Content-Type": "application/json"}
+ response = requests.post(f"{self.base_url}/v1/root", headers=headers)
+ response.raise_for_status()
+ return UserCreateResponse.model_validate(response.json())
+
+ def fetch_users(self) -> List[UserFetchResponse]:
+ response = requests.get(f"{self.base_url}/v1/users", headers=self.headers)
+ response.raise_for_status()
+ return [UserFetchResponse.model_validate(user) for user in response.json()]
+
+ def create_users(self, users: List[UserCreateRequest]) -> List[UserCreateResponse]:
+ response = requests.post(
+ f"{self.base_url}/v1/users",
+ json=[json.loads(user.model_dump_json()) for user in users],
+ headers=self.headers,
+ )
+ response.raise_for_status()
+ return [UserCreateResponse.model_validate(user) for user in response.json()]
+
+ def delete_users(self, user_names: List[str]) -> List[UserDeleteResponse]:
+ users = ",".join(user_names)
+ response = requests.delete(f"{self.base_url}/v1/users/{users}", headers=self.headers)
+ response.raise_for_status()
+ return [UserDeleteResponse.model_validate(user) for user in response.json()]
+
+ def fetch_groups(self) -> List[GroupFetchResponse]:
+ response = requests.get(f"{self.base_url}/v1/groups", headers=self.headers)
+ response.raise_for_status()
+ return [GroupFetchResponse.model_validate(group) for group in response.json()]
+
+ def create_groups(self, groups: List[GroupCreateRequest]) -> List[GroupCreateResponse]:
+ response = requests.post(
+ f"{self.base_url}/v1/groups",
+ json=[json.loads(group.model_dump_json()) for group in groups],
+ headers=self.headers,
+ )
+ response.raise_for_status()
+ return [GroupCreateResponse.model_validate(group) for group in response.json()]
+
+ def delete_groups(self, group_names: str) -> List[GroupUserDeleteResponse]:
+ groups = ",".join(group_names)
+ response = requests.delete(f"{self.base_url}/v1/groups/{groups}", headers=self.headers)
+ response.raise_for_status()
+ return [GroupUserDeleteResponse.model_validate(group) for group in response.json()]
+
+ def delete_user_group(self, group_name: str, user_name: str) -> GroupUserDeleteResponse:
+ response = requests.delete(f"{self.base_url}/v1/group/{group_name}/user/{user_name}", headers=self.headers)
+ response.raise_for_status()
+ return GroupUserDeleteResponse.model_validate(response.json())
+
+ def fetch_roles(self) -> List[RoleFetchResponse]:
+ response = requests.get(f"{self.base_url}/v1/roles", headers=self.headers)
+ response.raise_for_status()
+ return [RoleFetchResponse.model_validate(role) for role in response.json()]
+
+ def insert_roles(self, roles: List[RoleCreateRequest]) -> List[RoleCreateResponse]:
+ response = requests.post(
+ f"{self.base_url}/v1/roles",
+ json=[json.loads(role.model_dump_json()) for role in roles],
+ headers=self.headers,
+ )
+ response.raise_for_status()
+ return [RoleCreateResponse.model_validate(role) for role in response.json()]
+
+ def update_role(self, role_name: str, policies: List[PolicyRequest]) -> RoleCreateResponse:
+ response = requests.put(
+ f"{self.base_url}/v1/roles/{role_name}",
+ json=[json.loads(policy.model_dump_json()) for policy in policies],
+ headers=self.headers,
+ )
+ response.raise_for_status()
+ return RoleCreateResponse.model_validate(response.json())
+
+ def delete_roles(self, role_ids: str) -> List[RoleDeleteResponse]:
+ roles = ",".join(role_ids)
+ response = requests.delete(f"{self.base_url}/v1/roles/{roles}", headers=self.headers)
+ response.raise_for_status()
+ return [RoleDeleteResponse.model_validate(role) for role in response.json()]
+
+ def insert_role_mappings(self, role_mappings: List[RoleMappingRequest]) -> List[RoleMappingResponse]:
+ response = requests.post(
+ f"{self.base_url}/v1/role-mappings",
+ json=[json.loads(role_mapping.model_dump_json()) for role_mapping in role_mappings],
+ headers=self.headers,
+ )
+ response.raise_for_status()
+ return [RoleMappingResponse.model_validate(role_mapping) for role_mapping in response.json()]
+
+ def fetch_role_mappings(self) -> List[RoleMappingResponse]:
+ response = requests.get(f"{self.base_url}/v1/role-mappings", headers=self.headers)
+ response.raise_for_status()
+ return [RoleMappingResponse.model_validate(role_mapping) for role_mapping in response.json()]
+
+ def delete_role_mappings(self, role_name: str, group_name: str) -> List[RoleMappingDeleteResponse]:
+ response = requests.delete(
+ f"{self.base_url}/v1/role-mappings/role/{role_name}/group/{group_name}", headers=self.headers
+ )
+ response.raise_for_status()
+ return [RoleMappingDeleteResponse.model_validate(role_mapping) for role_mapping in response.json()]
diff --git a/compass_sdk/types.py b/compass_sdk/types.py
new file mode 100644
index 0000000..cc8ac60
--- /dev/null
+++ b/compass_sdk/types.py
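Review bot: Every identifier in compass_sdk/root_rbac.py is spliced into the request path unencoded — `f"{self.base_url}/v1/group/{group_name}/user/{user_name}"` in delete_user_group, and likewise in update_role, delete_role_mappings and the comma-joined delete_users/delete_groups/delete_roles — so a name containing `/`, `?`, `#` or `,` makes a call sent with the root bearer token address a different RBAC resource than the caller intended. Percent-encode each segment before building the URL, e.g. `quote(user_name, safe="")` from `urllib.parse`, and confirm how the server treats an encoded comma on the list-delete endpoints. Separately, `delete_groups(self, group_names: str)` and `delete_roles(self, role_ids: str)` are annotated `str` but pass the argument to `",".join(...)`, so a caller who follows the annotation sends `a,d,m,i,n` for `"admin"`; both annotations should be `List[str]`. None of the `requests` calls passes `timeout=`, so a stalled server blocks the caller indefinitely.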
@@ -0,0 +1,84 @@
+from typing import List
+
+from pydantic import BaseModel
+
+
+class UserFetchResponse(BaseModel):
+ name: str
+
+
+class UserCreateRequest(BaseModel):
+ name: str
+
+
+class UserCreateResponse(BaseModel):
+ name: str
+ token: str
+
+
+class UserDeleteResponse(BaseModel):
+ name: str
+
+
+class GroupFetchResponse(BaseModel):
+ name: str
+ user_name: str
+
+
+class GroupCreateRequest(BaseModel):
+ name: str
+ user_names: List[str]
+
+
+class GroupCreateResponse(BaseModel):
+ name: str
+ user_name: str
+
+
+class GroupUserDeleteResponse(BaseModel):
+ group_name: str
+ user_name: str
+
+
+class PolicyRequest(BaseModel):
+ indexes: List[str]
+ permission: str
+
+
+class PolicyResponse(BaseModel):
+ indexes: List[str]
+ permission: str
+
+
+class RoleFetchResponse(BaseModel):
+ name: str
+ policies: List[PolicyResponse]
+
+
+class RoleCreateRequest(BaseModel):
+ name: str
+ policies: List[PolicyRequest]
+
+
+class RoleCreateResponse(BaseModel):
+ name: str
+ policies: List[PolicyResponse]
+
+
+class RoleDeleteResponse(BaseModel):
+ name: str
+
+
+class RoleMappingRequest(BaseModel):
+ role_name: str
+ group_name: str
+
+
+class RoleMappingResponse(BaseModel):
+ role_name: str
+ group_name: str
+
+
+class RoleMappingDeleteResponse(BaseModel):
+ role_name: str
+ group_name: str
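Review bot: `UserCreateResponse.token` is declared as a plain `str`, so the model's default `repr()`/`str()` — and any log line or traceback that prints the object — contains the newly issued token in clear text. Declaring it as `token: SecretStr` (imported from `pydantic`) masks the value in repr and in serialised dumps and makes callers read it with `.get_secret_value()`; `token: str = Field(repr=False)` is the smaller change if the attribute type has to stay `str`. The module name also collides with the standard library: the absolute `from types import (...)` in compass_sdk/root_rbac.py resolves to the stdlib `types` module in Python 3 and raises ImportError, so either rename this file or import it as `from compass_sdk.types import ...`.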