Skip to content

Commit

Permalink
Adding rbac sdk path
Browse files Browse the repository at this point in the history
  • Loading branch information
ankush-cohere committed Oct 4, 2024
1 parent 6f26265 commit 56dd8fc
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 0 deletions.
129 changes: 129 additions & 0 deletions compass_sdk/root_rbac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import json
from types import (
GroupCreateRequest,

Check failure on line 3 in compass_sdk/root_rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"GroupCreateRequest" is unknown import symbol (reportAttributeAccessIssue)
GroupCreateResponse,

Check failure on line 4 in compass_sdk/root_rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"GroupCreateResponse" is unknown import symbol (reportAttributeAccessIssue)
GroupFetchResponse,

Check failure on line 5 in compass_sdk/root_rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"GroupFetchResponse" is unknown import symbol (reportAttributeAccessIssue)
GroupUserDeleteResponse,

Check failure on line 6 in compass_sdk/root_rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"GroupUserDeleteResponse" is unknown import symbol (reportAttributeAccessIssue)
PolicyRequest,

Check failure on line 7 in compass_sdk/root_rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"PolicyRequest" is unknown import symbol (reportAttributeAccessIssue)
RoleCreateRequest,

Check failure on line 8 in compass_sdk/root_rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"RoleCreateRequest" is unknown import symbol (reportAttributeAccessIssue)
RoleCreateResponse,

Check failure on line 9 in compass_sdk/root_rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"RoleCreateResponse" is unknown import symbol (reportAttributeAccessIssue)
RoleDeleteResponse,

Check failure on line 10 in compass_sdk/root_rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"RoleDeleteResponse" is unknown import symbol (reportAttributeAccessIssue)
RoleFetchResponse,

Check failure on line 11 in compass_sdk/root_rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"RoleFetchResponse" is unknown import symbol (reportAttributeAccessIssue)
RoleMappingDeleteResponse,

Check failure on line 12 in compass_sdk/root_rbac.py

View workflow job for this annotation

GitHub Actions / typecheck (3.11, .)

"RoleMappingDeleteResponse" is unknown import symbol (reportAttributeAccessIssue)
RoleMappingRequest,
RoleMappingResponse,
UserCreateRequest,
UserCreateResponse,
UserDeleteResponse,
UserFetchResponse,
)
from typing import List

import requests


class CompassRootClient:
def __init__(self, compass_url: str, root_user_token: str):
self.base_url = compass_url + "/security/admin/rbac"
self.headers = {"Authorization": f"Bearer {root_user_token}", "Content-Type": "application/json"}

def create_root_user(self, tenancy_token: str) -> UserCreateResponse:
headers = {"Authorization": f"Bearer {tenancy_token}", "Content-Type": "application/json"}
response = requests.post(f"{self.base_url}/v1/root", headers=headers)
response.raise_for_status()
return UserCreateResponse.model_validate(response.json())

def fetch_users(self) -> List[UserFetchResponse]:
response = requests.get(f"{self.base_url}/v1/users", headers=self.headers)
response.raise_for_status()
return [UserFetchResponse.model_validate(user) for user in response.json()]

def create_users(self, users: List[UserCreateRequest]) -> List[UserCreateResponse]:
response = requests.post(
f"{self.base_url}/v1/users",
json=[json.loads(user.model_dump_json()) for user in users],
headers=self.headers,
)
response.raise_for_status()
return [UserCreateResponse.model_validate(user) for user in response.json()]

def delete_users(self, user_names: List[str]) -> List[UserDeleteResponse]:
users = ",".join(user_names)
response = requests.delete(f"{self.base_url}/v1/users/{users}", headers=self.headers)
response.raise_for_status()
return [UserDeleteResponse.model_validate(user) for user in response.json()]

def fetch_groups(self) -> List[GroupFetchResponse]:
response = requests.get(f"{self.base_url}/v1/groups", headers=self.headers)
response.raise_for_status()
return [GroupFetchResponse.model_validate(group) for group in response.json()]

def create_groups(self, groups: List[GroupCreateRequest]) -> List[GroupCreateResponse]:
response = requests.post(
f"{self.base_url}/v1/groups",
json=[json.loads(group.model_dump_json()) for group in groups],
headers=self.headers,
)
response.raise_for_status()
return [GroupCreateResponse.model_validate(group) for group in response.json()]

def delete_groups(self, group_names: str) -> List[GroupUserDeleteResponse]:
groups = ",".join(group_names)
response = requests.delete(f"{self.base_url}/v1/groups/{groups}", headers=self.headers)
response.raise_for_status()
return [GroupUserDeleteResponse.model_validate(group) for group in response.json()]

def delete_user_group(self, group_name: str, user_name: str) -> GroupUserDeleteResponse:
response = requests.delete(f"{self.base_url}/v1/group/{group_name}/user/{user_name}", headers=self.headers)
response.raise_for_status()
return GroupUserDeleteResponse.model_validate(response.json())

def fetch_roles(self) -> List[RoleFetchResponse]:
response = requests.get(f"{self.base_url}/v1/roles", headers=self.headers)
response.raise_for_status()
return [RoleFetchResponse.model_validate(role) for role in response.json()]

def insert_roles(self, roles: List[RoleCreateRequest]) -> List[RoleCreateResponse]:
response = requests.post(
f"{self.base_url}/v1/roles",
json=[json.loads(role.model_dump_json()) for role in roles],
headers=self.headers,
)
response.raise_for_status()
return [RoleCreateResponse.model_validate(role) for role in response.json()]

def update_role(self, role_name: str, policies: List[PolicyRequest]) -> RoleCreateResponse:
response = requests.put(
f"{self.base_url}/v1/roles/{role_name}",
json=[json.loads(policy.model_dump_json()) for policy in policies],
headers=self.headers,
)
response.raise_for_status()
return RoleCreateResponse.model_validate(response.json())

def delete_roles(self, role_ids: str) -> List[RoleDeleteResponse]:
roles = ",".join(role_ids)
response = requests.delete(f"{self.base_url}/v1/roles/{roles}", headers=self.headers)
response.raise_for_status()
return [RoleDeleteResponse.model_validate(role) for role in response.json()]

def insert_role_mappings(self, role_mappings: List[RoleMappingRequest]) -> List[RoleMappingResponse]:
response = requests.post(
f"{self.base_url}/v1/role-mappings",
json=[json.loads(role_mapping.model_dump_json()) for role_mapping in role_mappings],
headers=self.headers,
)
response.raise_for_status()
return [RoleMappingResponse.model_validate(role_mapping) for role_mapping in response.json()]

def fetch_role_mappings(self) -> List[RoleMappingResponse]:
response = requests.get(f"{self.base_url}/v1/role-mappings", headers=self.headers)
response.raise_for_status()
return [RoleMappingResponse.model_validate(role_mapping) for role_mapping in response.json()]

def delete_role_mappings(self, role_name: str, group_name: str) -> List[RoleMappingDeleteResponse]:
response = requests.delete(
f"{self.base_url}/v1/role-mappings/role/{role_name}/group/{group_name}", headers=self.headers
)
response.raise_for_status()
return [RoleMappingDeleteResponse.model_validate(role_mapping) for role_mapping in response.json()]
84 changes: 84 additions & 0 deletions compass_sdk/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import List

from pydantic import BaseModel


class UserFetchResponse(BaseModel):
name: str


class UserCreateRequest(BaseModel):
name: str


class UserCreateResponse(BaseModel):
name: str
token: str


class UserDeleteResponse(BaseModel):
name: str


class GroupFetchResponse(BaseModel):
name: str
user_name: str


class GroupCreateRequest(BaseModel):
name: str
user_names: List[str]


class GroupCreateResponse(BaseModel):
name: str
user_name: str


class GroupUserDeleteResponse(BaseModel):
group_name: str
user_name: str


class PolicyRequest(BaseModel):
indexes: List[str]
permission: str


class PolicyResponse(BaseModel):
indexes: List[str]
permission: str


class RoleFetchResponse(BaseModel):
name: str
policies: List[PolicyResponse]


class RoleCreateRequest(BaseModel):
name: str
policies: List[PolicyRequest]


class RoleCreateResponse(BaseModel):
name: str
policies: List[PolicyResponse]


class RoleDeleteResponse(BaseModel):
name: str


class RoleMappingRequest(BaseModel):
role_name: str
group_name: str


class RoleMappingResponse(BaseModel):
role_name: str
group_name: str


class RoleMappingDeleteResponse(BaseModel):
role_name: str
group_name: str

0 comments on commit 56dd8fc

Please sign in to comment.