diff --git a/setup.py b/setup.py index 6121054..83a3ae6 100644 --- a/setup.py +++ b/setup.py @@ -1,27 +1,27 @@ -from setuptools import setup, find_packages import os +from setuptools import find_packages, setup + this_directory = os.path.abspath(os.path.dirname(__file__)) -with open(os.path.join(this_directory, 'README.md'), encoding='utf-8') as f: +with open(os.path.join(this_directory, "README.md"), encoding="utf-8") as f: long_description = f.read() - setup( - name='trustauthx', - version='0.6.4', - description='Official connector SDK for TrustAuthx', + name="trustauthx", + version="0.6.4", + description="Official connector SDK for TrustAuthx", long_description=long_description, - long_description_content_type='text/markdown', # This is important! - author='moonlightnexus', - author_email='nexus@trustauthx.com', + long_description_content_type="text/markdown", # This is important! + author="moonlightnexus", + author_email="nexus@trustauthx.com", url="https://github.com/One-Click-Auth/TrustAuthx-Py-SDK.git", license="MIT", classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3.9', - ], + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.9", + ], packages=find_packages(), install_requires=[ "certifi>=2023.5.7", @@ -37,11 +37,11 @@ "urllib3<=3.0.0", "charset-normalizer>=3.2.0", "python-jose>=3.3.0", - "python-dotenv==1.0.0" - ], + "python-dotenv==1.0.0", + ], entry_points={ - 'console_scripts': [ - 'trustauthx = trustauthx.cli:main', + "console_scripts": [ + "trustauthx = trustauthx.cli:main", ], }, ) diff --git a/trustauthx/authlite.py b/trustauthx/authlite.py index 61d8891..8f41fc0 100644 --- a/trustauthx/authlite.py +++ b/trustauthx/authlite.py @@ -1,12 +1,15 @@ +import json +import sqlite3 +import threading +from functools import wraps + import requests -from requests.exceptions import HTTPError from jose import JWTError, jwt from jose.constants import ALGORITHMS -import json -import sqlite3 +from requests.exceptions import HTTPError + from .scheme import * -from functools import wraps -import threading + class _EdgeDBRoleQuery: """ @@ -31,6 +34,7 @@ class _EdgeDBRoleQuery: count_roles(self): Returns the number of roles stored. """ + total_roles = 0 roles = None @@ -44,19 +48,28 @@ def __init__(self, roles, in_memory=True): """ self.in_memory = in_memory if self.in_memory: - self.__class__.roles = {role_id: permissions for role in roles for role_id, permissions in role.items()} + self.__class__.roles = { + role_id: permissions + for role in roles + for role_id, permissions in role.items() + } else: - self.conn = sqlite3.connect(':memory:') # replace ':memory:' with your database path + # replace ':memory:' with your database path + self.conn = sqlite3.connect(":memory:") self.cursor = self.conn.cursor() - self.cursor.execute(""" + self.cursor.execute( + """ CREATE TABLE IF NOT EXISTS roles ( role_id TEXT PRIMARY KEY, permissions TEXT ) - """) + """ + ) for role in roles: for role_id, permissions in role.items(): - self.cursor.execute("INSERT INTO roles VALUES (?, ?)", (role_id, permissions)) + self.cursor.execute( + "INSERT INTO roles VALUES (?, ?)", (role_id, permissions) + ) self.conn.commit() self.count_roles() @@ -77,21 +90,33 @@ def query(self, role_id=None, permission_key=None): elif role_id: return self.__class__.roles.get(role_id, None) elif permission_key: - return {role_id: permissions[permission_key] for role_id, permissions in self.__class__.roles.items() if permission_key in permissions} + return { + role_id: permissions[permission_key] + for role_id, permissions in self.__class__.roles.items() + if permission_key in permissions + } else: return self.__class__.roles else: if role_id and permission_key: - self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + self.cursor.execute( + "SELECT permissions FROM roles WHERE role_id = ?", (role_id,) + ) permissions = self.cursor.fetchone() if permissions: return permissions[0].get(permission_key, None) elif role_id: - self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + self.cursor.execute( + "SELECT permissions FROM roles WHERE role_id = ?", (role_id,) + ) return self.cursor.fetchone() elif permission_key: self.cursor.execute("SELECT * FROM roles") - return {role_id: permissions[permission_key] for role_id, permissions in self.cursor.fetchall() if permission_key in permissions} + return { + role_id: permissions[permission_key] + for role_id, permissions in self.cursor.fetchall() + if permission_key in permissions + } else: self.cursor.execute("SELECT * FROM roles") return self.cursor.fetchall() @@ -109,9 +134,14 @@ def validate(self, role_id, permission_key, permission_val): bool: True if the permission value matches the expected value, False otherwise. """ if self.in_memory: - return self.__class__.roles.get(role_id, {}).get(permission_key, None) == permission_val + return ( + self.__class__.roles.get(role_id, {}).get(permission_key, None) + == permission_val + ) else: - self.cursor.execute("SELECT permissions FROM roles WHERE role_id = ?", (role_id,)) + self.cursor.execute( + "SELECT permissions FROM roles WHERE role_id = ?", (role_id,) + ) permissions = self.cursor.fetchone() if permissions: return permissions[0].get(permission_key, None) == permission_val @@ -126,29 +156,32 @@ def count_roles(self): if self.in_memory: r = len(self.__class__.roles) _EdgeDBRoleQuery.total_roles = r - return + return else: self.cursor.execute("SELECT COUNT(*) FROM roles") r = self.cursor.fetchone()[0] _EdgeDBRoleQuery.total_roles = r - return + return @classmethod def reinitialize_all(foreground=True): if foreground: for instance in AuthLiteClient.instances: - instance:AuthLiteClient = instance + instance: AuthLiteClient = instance instance._re_init_roles() else: + def target(): for instance in AuthLiteClient.instances: - instance:AuthLiteClient = instance + instance: AuthLiteClient = instance instance._re_init_roles() + thread = threading.Thread(target=target) thread.start() @staticmethod def _EDGE_Wrapper(func): + @wraps(func) def wrapper(*args, **kwargs): # Call the function @@ -157,14 +190,17 @@ def wrapper(*args, **kwargs): x_edge = response.headers.get("X-EDGE") if x_edge != None or bool: if int(x_edge) != _EdgeDBRoleQuery.total_roles: - _EdgeDBRoleQuery.reinitialize_all() # Add data + _EdgeDBRoleQuery.reinitialize_all() # Add data return response + return wrapper + requests.get = _EdgeDBRoleQuery._EDGE_Wrapper(requests.get) requests.post = _EdgeDBRoleQuery._EDGE_Wrapper(requests.post) requests.delete = _EdgeDBRoleQuery._EDGE_Wrapper(requests.delete) + class _Roles(_EdgeDBRoleQuery): """ A class for managing roles and permissions in the EdgeDB system. @@ -193,9 +229,19 @@ class _Roles(_EdgeDBRoleQuery): delete_permission(self, rol_id, **Permission_): Deletes a permission from a role with the specified role ID. """ + instances = [] - def __init__(self, roles, org_id, api_key, signed_key, secret_key, API_BASE_URL, InMemory=True): + def __init__( + self, + roles, + org_id, + api_key, + signed_key, + secret_key, + API_BASE_URL, + InMemory=True, + ): """ Initializes the _Roles instance. @@ -224,7 +270,7 @@ def get_all_roles(self) -> GetAllRolesResponse: Returns: roles_list = List[Role]: A list of Role objects representing the roles and their permissions.roles roles_json_list = List[dict]: A list of dict representing the roles and their permissions - + demo response ==> [ { "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", @@ -259,18 +305,20 @@ def get_all_roles(self) -> GetAllRolesResponse: ] } ]""" - url = f'{self.API_BASE_URL}/rbac/role' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._secret_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._secret_key}", } response = requests.get(url, headers=headers, params=params) roles = [Role(**role_data) for role_data in response.json()] - return GetAllRolesResponse(roles_list=roles, roles_json_list=[role.to_dict() for role in roles]) + return GetAllRolesResponse( + roles_list=roles, roles_json_list=[role.to_dict() for role in roles] + ) - def add_role(self, name, **Permission_)->AddRoleResponse: + def add_role(self, name, **Permission_) -> AddRoleResponse: """ Adds a new role with the specified name and permissions. @@ -280,7 +328,7 @@ def add_role(self, name, **Permission_)->AddRoleResponse: Returns: AddRoleResponse: An AddRoleResponse object representing the newly created role. - + demo response ==> { "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", "rol_id": "rol_rce_474ae9e59b3d49ce", @@ -297,34 +345,28 @@ def add_role(self, name, **Permission_)->AddRoleResponse: } ] }""" - url = f'{self.API_BASE_URL}/rbac/role' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._secret_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._secret_key}", } permissions = [{k: v} for k, v in Permission_.items()] - data = { - "org_id": f'{self.org_id}', - "name": name, - "permissions": permissions - } - response = requests.post(url, headers=headers, params=params, data=json.dumps(data)) + data = {"org_id": f"{self.org_id}", "name": name, "permissions": permissions} + response = requests.post( + url, headers=headers, params=params, data=json.dumps(data) + ) role_data = response.json() permissions = [Permission(**p) for p in role_data.get("permissions", [])] return AddRoleResponse( org_id=role_data.get("org_id"), rol_id=role_data.get("rol_id"), name=role_data.get("name"), - permissions=permissions + permissions=permissions, ) def delete_role(self, rol_id) -> DeleteRoleResponse: - """ Deletes a role with the specified role ID. @@ -333,7 +375,7 @@ def delete_role(self, rol_id) -> DeleteRoleResponse: Returns: DeleteRoleResponse: A DeleteRoleResponse object representing the deleted role. - + demo response ==> { "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", "rol_id": "rol_YHV_78ae9006bcaa4c77", @@ -350,31 +392,29 @@ def delete_role(self, rol_id) -> DeleteRoleResponse: } ] }""" - url = f'{self.API_BASE_URL}/rbac/role' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._secret_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._secret_key}", } - data = { - "org_id": f'{self.org_id}', - "rol_id": rol_id - } - response = requests.delete(url, headers=headers, params=params, data=json.dumps(data)) + data = {"org_id": f"{self.org_id}", "rol_id": rol_id} + response = requests.delete( + url, headers=headers, params=params, data=json.dumps(data) + ) role_data = response.json() permissions = [Permission(**p) for p in role_data.get("permissions", [])] return DeleteRoleResponse( org_id=role_data.get("org_id"), rol_id=role_data.get("rol_id"), name=role_data.get("name"), - permissions=permissions + permissions=permissions, ) - def add_permission(self, rol_id, foreground=False, **Permission_) -> AddPermissionResponse: + def add_permission( + self, rol_id, foreground=False, **Permission_ + ) -> AddPermissionResponse: """ Adds a new permission to a role with the specified role ID. @@ -384,7 +424,7 @@ def add_permission(self, rol_id, foreground=False, **Permission_) -> AddPermissi Returns: AddPermissionResponse: An AddPermissionResponse object representing the added permission. - + demo response ==> { "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", "rol_id": "rol_rce_474ae9e59b3d49ce", @@ -394,33 +434,34 @@ def add_permission(self, rol_id, foreground=False, **Permission_) -> AddPermissi } ] }""" - url = f'{self.API_BASE_URL}/rbac/permission' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + url = f"{self.API_BASE_URL}/rbac/permission" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._secret_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._secret_key}", } permissions = [{k: v} for k, v in Permission_.items()] data = { - "org_id": f'{self.org_id}', + "org_id": f"{self.org_id}", "rol_id": rol_id, - "permissions": permissions + "permissions": permissions, } - response = requests.post(url, headers=headers, params=params, data=json.dumps(data)) + response = requests.post( + url, headers=headers, params=params, data=json.dumps(data) + ) response_data = response.json() permissions = [{k: v} for k, v in permissions.items()] self.reinitialize_all(foreground) return AddPermissionResponse( org_id=response_data.get("org_id"), rol_id=response_data.get("rol_id"), - permissions=permissions + permissions=permissions, ) - def delete_permission(self, rol_id, foreground=False, **Permission_) -> DeletePermissionResponse: + def delete_permission( + self, rol_id, foreground=False, **Permission_ + ) -> DeletePermissionResponse: """ Deletes a permission from a role with the specified role ID. @@ -430,7 +471,7 @@ def delete_permission(self, rol_id, foreground=False, **Permission_) -> DeletePe Returns: DeletePermissionResponse: A DeletePermissionResponse object representing the role with the deleted permission. - + demo response ==> { "org_id": "4195502c85984d27ae1aceb677d99551543808625aeb11ee88069dc8f7663e88", "rol_id": "rol_rce_474ae9e59b3d49ce", @@ -445,30 +486,29 @@ def delete_permission(self, rol_id, foreground=False, **Permission_) -> DeletePe "maintainer": "administration" } ] - }""" #return full - url = f'{self.API_BASE_URL}/rbac/permission' - headers = { - 'accept': 'application/json', - 'Content-Type': 'application/json' - } + }""" # return full + url = f"{self.API_BASE_URL}/rbac/permission" + headers = {"accept": "application/json", "Content-Type": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._secret_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._secret_key}", } permissions = [{k: v} for k, v in Permission_.items()] data = { - "org_id": f'{self.org_id}', + "org_id": f"{self.org_id}", "rol_id": rol_id, - "permissions": permissions + "permissions": permissions, } - response = requests.delete(url, headers=headers, params=params, data=json.dumps(data)) + response = requests.delete( + url, headers=headers, params=params, data=json.dumps(data) + ) self.reinitialize_all(foreground) return response.json() - -class AuthLiteClient(): - instances = [] + +class AuthLiteClient: + instances = [] """ AuthLiteClient is a Python client for the TrustAuthX authentication service. @@ -524,49 +564,70 @@ class TokenCheck: refresh (str): The refresh token. state (bool): The state of the tokens (True if valid, False otherwise). """ - access :str - refresh:str - state:bool - def __init__(self, api_key, secret_key, org_id=None, API_BASE_URL="https://api.trustauthx.com", in_memory=True): + access: str + refresh: str + state: bool + + def __init__( + self, + api_key, + secret_key, + org_id=None, + API_BASE_URL="https://api.trustauthx.com", + in_memory=True, + ): """ - Initializes the AuthLiteClient instance. + Initializes the AuthLiteClient instance. - Args: - api_key (str): The API key used for authentication. - secret_key (str): The secret key used for JWT encoding. - org_id (str, optional): The organization ID for generating authentication URLs. - API_BASE_URL (str, optional): The base URL for the API. Defaults to "https://api.trustauthx.com". - in_memory (bool, optional): Flag indicating whether to store the roles in-memory or in a SQLite database. Defaults to True (ie. in-memory). - - """ - self.jwt_encode = lambda key, data: jwt.encode(data, key=key, algorithm= ALGORITHMS.HS256) - self.jwt_decode = lambda key, data: jwt.decode(str(data), key=key, algorithms=ALGORITHMS.HS256) + Args: + api_key (str): The API key used for authentication. + secret_key (str): The secret key used for JWT encoding. + org_id (str, optional): The organization ID for generating authentication URLs. + API_BASE_URL (str, optional): The base URL for the API. Defaults to "https://api.trustauthx.com". + in_memory (bool, optional): Flag indicating whether to store the roles in-memory or in a SQLite database. Defaults to True (ie. in-memory). + + """ + self.jwt_encode = lambda key, data: jwt.encode( + data, key=key, algorithm=ALGORITHMS.HS256 + ) + self.jwt_decode = lambda key, data: jwt.decode( + str(data), key=key, algorithms=ALGORITHMS.HS256 + ) self._secret_key = secret_key self._api_key = api_key self.org_id = org_id - self._signed_key = self.jwt_encode(key=self._secret_key, data={"api_key":self._api_key}) + self._signed_key = self.jwt_encode( + key=self._secret_key, data={"api_key": self._api_key} + ) self.API_BASE_URL = API_BASE_URL self.in_memory = in_memory - self.Roles: _Roles = _Roles(roles=self._set_edge_roles(), org_id=self.org_id, - api_key=self._api_key, signed_key=self._signed_key, - secret_key=self._secret_key, API_BASE_URL=self.API_BASE_URL, - InMemory=in_memory) + self.Roles: _Roles = _Roles( + roles=self._set_edge_roles(), + org_id=self.org_id, + api_key=self._api_key, + signed_key=self._signed_key, + secret_key=self._secret_key, + API_BASE_URL=self.API_BASE_URL, + InMemory=in_memory, + ) self.__class__.instances.append(self) - + def generate_url(self) -> str: """ Generates an authentication URL for the given organization. Returns: str: The generated authentication URL. - + Raises: ValueError: If org_id is not provided. """ # Generate an authentication url for the given org - if self.org_id:return f"https://app.trustauthx.com/widget/login/?org_id={self.org_id}" - else:raise ValueError("must provide org_id") + if self.org_id: + return f"https://app.trustauthx.com/widget/login/?org_id={self.org_id}" + else: + raise ValueError("must provide org_id") def generate_edit_user_url(self, access_token, url) -> str: """ @@ -580,15 +641,15 @@ def generate_edit_user_url(self, access_token, url) -> str: str: The generated authentication URL. """ # Generate an authentication url for the given org - headers = {'accept': 'application/json'} + headers = {"accept": "application/json"} params = { - 'AccessToken': access_token, - 'api_key': self._api_key, - 'signed_key': self._signed_key, - 'url':url - } + "AccessToken": access_token, + "api_key": self._api_key, + "signed_key": self._signed_key, + "url": url, + } url = f"{self.API_BASE_URL}/api/user/me/settings/" - req = requests.Request('GET', url, params=params, headers=headers).prepare() + req = requests.Request("GET", url, params=params, headers=headers).prepare() return req.url def re_auth(self, code): @@ -607,23 +668,24 @@ def re_auth(self, code): url = f"{self.API_BASE_URL}/api/user/me/widget/re-auth/token" params = { "code": code, - 'api_key': self._api_key, - 'signed_key': self._signed_key + "api_key": self._api_key, + "signed_key": self._signed_key, } headers = {"accept": "application/json"} response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self._secret_key,response.json()) + rtn = self.jwt_decode(self._secret_key, response.json()) sub = json.loads(rtn["sub"]) rtn.pop("sub") rtn["email"] = sub["email"] rtn["uid"] = sub["uid"] return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def get_user(self, token) -> dict: """ @@ -639,26 +701,27 @@ def get_user(self, token) -> dict: HTTPError: If the request fails with an HTTP error status code. """ # Validate the given authentication token - url = f'{self.API_BASE_URL}/api/user/me/auth/data' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/auth/data" + headers = {"accept": "application/json"} params = { - 'UserToken': token, - 'api_key': self._api_key, - 'signed_key': self._signed_key - } + "UserToken": token, + "api_key": self._api_key, + "signed_key": self._signed_key, + } response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self._secret_key,response.json()) + rtn = self.jwt_decode(self._secret_key, response.json()) sub = json.loads(rtn["sub"]) rtn.pop("sub") rtn["email"] = sub["email"] rtn["uid"] = sub["uid"] return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def get_user_data(self, AccessToken) -> dict: """ @@ -675,22 +738,23 @@ def get_user_data(self, AccessToken) -> dict: """ # Validate the given authentication token """returns a dict containing 'access_token', 'refresh_token', 'img', 'sub'""" - url = f'{self.API_BASE_URL}/api/user/me/data' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/data" + headers = {"accept": "application/json"} params = { - 'AccessToken': AccessToken, - 'api_key': self._api_key, - 'signed_key': self._signed_key - } + "AccessToken": AccessToken, + "api_key": self._api_key, + "signed_key": self._signed_key, + } response = requests.get(url, headers=headers, params=params) if response.status_code == 200: - rtn = self.jwt_decode(self._secret_key,response.json()) + rtn = self.jwt_decode(self._secret_key, response.json()) return rtn - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def get_access_token_from_refresh_token(self, refresh_token): """ @@ -706,20 +770,22 @@ def get_access_token_from_refresh_token(self, refresh_token): HTTPError: If the request fails with an HTTP error status code. """ # Store the given authentication token - url = f'{self.API_BASE_URL}/api/user/me/access/token/' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/access/token/" + headers = {"accept": "application/json"} params = { - 'RefreshToken': refresh_token, - 'api_key': self._api_key, - 'signed_key': self._signed_key - } + "RefreshToken": refresh_token, + "api_key": self._api_key, + "signed_key": self._signed_key, + } response = requests.get(url, headers=headers, params=params) - if response.status_code == 200:return response.json() - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format( - response.status_code, - response.text) - ) + if response.status_code == 200: + return response.json() + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def validate_access_token(self, access_token) -> bool: """ @@ -732,17 +798,22 @@ def validate_access_token(self, access_token) -> bool: bool: True if the access token is valid, False otherwise. """ # Store the given authentication token - url = f'{self.API_BASE_URL}/api/user/me/auth/validate/token' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/api/user/me/auth/validate/token" + headers = {"accept": "application/json"} params = { - 'AccessToken': access_token, - 'api_key': self._api_key, - 'signed_key': self._signed_key - } + "AccessToken": access_token, + "api_key": self._api_key, + "signed_key": self._signed_key, + } response = requests.get(url, headers=headers, params=params) return response.status_code == 200 - def revoke_token(self,AccessToken:str=None, RefreshToken:str = None, revoke_all_tokens:bool = False) -> bool: + def revoke_token( + self, + AccessToken: str = None, + RefreshToken: str = None, + revoke_all_tokens: bool = False, + ) -> bool: """ Revokes an access token or refresh token. @@ -758,22 +829,28 @@ def revoke_token(self,AccessToken:str=None, RefreshToken:str = None, revoke_all_ HTTPError: If the request fails with an HTTP error status code. AttributeError: If neither AccessToken nor RefreshToken is provided. """ - url = f'{self.API_BASE_URL}/api/user/me/token/' - headers = {'accept': 'application/json'} - if not AccessToken and not RefreshToken:raise AttributeError("must provide either AccessToken or RefreshToken") - tt=True if AccessToken else False + url = f"{self.API_BASE_URL}/api/user/me/token/" + headers = {"accept": "application/json"} + if not AccessToken and not RefreshToken: + raise AttributeError("must provide either AccessToken or RefreshToken") + tt = True if AccessToken else False t = AccessToken if AccessToken else RefreshToken params = { - 'Token': t, - 'api_key': self._api_key, - 'signed_key': self._signed_key, - 'AccessToken': tt, - 'SpecificTokenOnly':not revoke_all_tokens, - } + "Token": t, + "api_key": self._api_key, + "signed_key": self._signed_key, + "AccessToken": tt, + "SpecificTokenOnly": not revoke_all_tokens, + } response = requests.delete(url, headers=headers, params=params) - if response.status_code == 200:return response.json() - else:raise HTTPError( - 'Request failed with status code : {} \n this code contains a msg : {}'.format(response.status_code, response.text)) + if response.status_code == 200: + return response.json() + else: + raise HTTPError( + "Request failed with status code : {} \n this code contains a msg : {}".format( + response.status_code, response.text + ) + ) def validate_token_set(self, access_token, refresh_token) -> TokenCheck: """ @@ -796,8 +873,8 @@ def validate_token_set(self, access_token, refresh_token) -> TokenCheck: if refresh_token: new_tokens = self.get_access_token_from_refresh_token(refresh_token) d.state = False - d.access = new_tokens['access_token'] - d.refresh = new_tokens['refresh_token'] + d.access = new_tokens["access_token"] + d.refresh = new_tokens["refresh_token"] return d else: d.state = True @@ -805,25 +882,32 @@ def validate_token_set(self, access_token, refresh_token) -> TokenCheck: d.refresh = refresh_token return d except: - raise HTTPError('both tokens are invalid login again') - + raise HTTPError("both tokens are invalid login again") + def _set_edge_roles(self) -> list: self.Roles - url = f'{self.API_BASE_URL}/rbac/role' - headers = {'accept': 'application/json'} + url = f"{self.API_BASE_URL}/rbac/role" + headers = {"accept": "application/json"} params = { - 'org_id': f'{self.org_id}', - 'api_key': f'{self._api_key}', - 'signed_key': f'{self._secret_key}' + "org_id": f"{self.org_id}", + "api_key": f"{self._api_key}", + "signed_key": f"{self._secret_key}", } response = requests.get(url, headers=headers, params=params) roles = [Role(**role_data) for role_data in response.json()] - roles = GetAllRolesResponse(roles_list=roles, roles_json_list=[role.to_dict() for role in roles]) + roles = GetAllRolesResponse( + roles_list=roles, roles_json_list=[role.to_dict() for role in roles] + ) return roles.roles_json_list def _re_init_roles(self) -> _Roles: - self.Roles: _Roles = _Roles(roles=self._set_edge_roles(), org_id=self.org_id, - api_key=self._api_key, signed_key=self._signed_key, - secret_key=self._secret_key, API_BASE_URL=self.API_BASE_URL, - InMemory=self.in_memory) - return self.Roles \ No newline at end of file + self.Roles: _Roles = _Roles( + roles=self._set_edge_roles(), + org_id=self.org_id, + api_key=self._api_key, + signed_key=self._signed_key, + secret_key=self._secret_key, + API_BASE_URL=self.API_BASE_URL, + InMemory=self.in_memory, + ) + return self.Roles