Skip to content

Commit

Permalink
Creating entity methods
Browse files Browse the repository at this point in the history
  • Loading branch information
ankush-cohere committed Oct 7, 2024
1 parent a3b5932 commit 770588c
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 131 deletions.
165 changes: 165 additions & 0 deletions compass_sdk/rbac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import json
from typing import Dict, List, Type, TypeVar

import requests
from pydantic import BaseModel
from requests import HTTPError

from compass_sdk.types import (
GroupCreateRequest,
GroupCreateResponse,
GroupFetchResponse,
GroupUserDeleteResponse,
PolicyRequest,
RoleCreateRequest,
RoleCreateResponse,
RoleDeleteResponse,
RoleFetchResponse,
RoleMappingDeleteResponse,
RoleMappingRequest,
RoleMappingResponse,
UserCreateRequest,
UserCreateResponse,
UserDeleteResponse,
UserFetchResponse,
)


def create_root_user(compass_url: str, tenancy_token: str) -> UserCreateResponse:
headers = {"Authorization": f"Bearer {tenancy_token}", "Content-Type": "application/json"}
response = requests.post(f"{compass_url}/security/admin/rbac/v1/root", headers=headers)
response.raise_for_status()
return UserCreateResponse.model_validate(response.json())


class CompassRootClient:
def __init__(self, compass_url: str, root_user_token: str):
self.base_url = compass_url + "/security/admin/rbac"
self.headers = {"Authorization": f"Bearer {root_user_token}", "Content-Type": "application/json"}

T = TypeVar("T", bound=BaseModel)
U = TypeVar("U", bound=BaseModel)
Headers = Dict[str, str]

@staticmethod
def fetch_entities(url: str, headers: Headers, entity_type: Type[T]) -> List[T]:
response = requests.get(url, headers=headers)
CompassRootClient.raise_for_status(response)
return [entity_type.model_validate(entity) for entity in response.json()]

@staticmethod
def create_entities(url: str, headers: Headers, entity_request: List[T], entity_response: Type[U]) -> List[U]:
response = requests.post(
url,
json=[json.loads(entity.model_dump_json()) for entity in entity_request],
headers=headers,
)
CompassRootClient.raise_for_status(response)
return [entity_response.model_validate(response) for response in response.json()]

@staticmethod
def delete_entities(self, url: str, headers: Headers, names: List[str], entity_response: Type[U]) -> List[U]:

Check warning on line 61 in compass_sdk/rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

Static methods should not take a "self" or "cls" parameter (reportSelfClsParameterName)
entities = ",".join(names)
response = requests.delete(f"{url}/{entities}", headers=headers)
CompassRootClient.raise_for_status(response)
return [entity_response.model_validate(entity) for entity in response.json()]

def fetch_users(self) -> List[UserFetchResponse]:
return self.fetch_entities(f"{self.base_url}/v1/users", self.headers, UserFetchResponse)

def fetch_groups(self) -> List[GroupFetchResponse]:
return self.fetch_entities(f"{self.base_url}/v1/groups", self.headers, GroupFetchResponse)

def fetch_roles(self) -> List[RoleFetchResponse]:
return self.fetch_entities(f"{self.base_url}/v1/roles", self.headers, RoleFetchResponse)

def fetch_role_mappings(self) -> List[RoleMappingResponse]:
return self.fetch_entities(f"{self.base_url}/v1/role-mappings", self.headers, RoleMappingResponse)

def create_users(self, users: List[UserCreateRequest]) -> List[UserCreateResponse]:
return self.create_entities(
url=f"{self.base_url}/v1/users",
headers=self.headers,
entity_request=users,
entity_response=UserCreateResponse,
)

def create_groups(self, groups: List[GroupCreateRequest]) -> List[GroupCreateResponse]:
return self.create_entities(
url=f"{self.base_url}/v1/groups",
headers=self.headers,
entity_request=groups,
entity_response=GroupCreateResponse,
)

def create_roles(self, roles: List[RoleCreateRequest]) -> List[RoleCreateResponse]:
return self.create_entities(
url=f"{self.base_url}/v1/roles",
headers=self.headers,
entity_request=roles,
entity_response=RoleCreateResponse,
)

def create_role_mappings(self, role_mappings: List[RoleMappingRequest]) -> List[RoleMappingResponse]:
return self.create_entities(
url=f"{self.base_url}/v1/role-mappings",
headers=self.headers,
entity_request=role_mappings,
entity_response=RoleMappingResponse,
)

def delete_users(self, user_names: List[str]) -> List[UserDeleteResponse]:
return self.delete_entities(f"{self.base_url}/v1/users", self.headers, user_names, UserDeleteResponse)

Check failure on line 112 in compass_sdk/rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

Argument missing for parameter "entity_response" (reportCallIssue)

def delete_groups(self, group_names: List[str]) -> List[GroupUserDeleteResponse]:
return self.delete_entities(f"{self.base_url}/v1/groups", self.headers, group_names, GroupUserDeleteResponse)

Check failure on line 115 in compass_sdk/rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

Argument missing for parameter "entity_response" (reportCallIssue)

def delete_roles(self, role_ids: List[str]) -> List[RoleDeleteResponse]:
return self.delete_entities(f"{self.base_url}/v1/roles", self.headers, role_ids, RoleDeleteResponse)

Check failure on line 118 in compass_sdk/rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

Argument missing for parameter "entity_response" (reportCallIssue)

def delete_role_mappings(self, role_name: str, group_name: str) -> List[RoleMappingDeleteResponse]:
response = requests.delete(
f"{self.base_url}/v1/role-mappings/role/{role_name}/group/{group_name}", headers=self.headers
)
self.raise_for_status(response)
return [RoleMappingDeleteResponse.model_validate(role_mapping) for role_mapping in response.json()]

def delete_user_group(self, group_name: str, user_name: str) -> GroupUserDeleteResponse:
response = requests.delete(f"{self.base_url}/v1/group/{group_name}/user/{user_name}", headers=self.headers)
self.raise_for_status(response)
return GroupUserDeleteResponse.model_validate(response.json())

def update_role(self, role_name: str, policies: List[PolicyRequest]) -> RoleCreateResponse:
response = requests.put(
f"{self.base_url}/v1/roles/{role_name}",
json=[json.loads(policy.model_dump_json()) for policy in policies],
headers=self.headers,
)
self.raise_for_status(response)
return RoleCreateResponse.model_validate(response.json())

@staticmethod
def raise_for_status(response: requests.Response):
"""Raises :class:`HTTPError`, if one occurred."""

http_error_msg = ""
if isinstance(response.reason, bytes):
# We attempt to decode utf-8 first because some servers
# choose to localize their reason strings. If the string
# isn't utf-8, we fall back to iso-8859-1 for all other
# encodings. (See PR #3538)
try:
reason = response.reason.decode("utf-8")
except UnicodeDecodeError:
reason = response.reason.decode("iso-8859-1")
else:
reason = response.content

if 400 <= response.status_code < 500:
http_error_msg = f"{response.status_code} Client Error: {reason} for url: {response.url}"

elif 500 <= response.status_code < 600:
http_error_msg = f"{response.status_code} Server Error: {reason} for url: {response.url}"

if http_error_msg:
raise HTTPError(http_error_msg, response=response)
131 changes: 0 additions & 131 deletions compass_sdk/root_rbac.py

This file was deleted.

0 comments on commit 770588c

Please sign in to comment.