From 03175bba3fbc437bc231b56276e70a5c063bbbe0 Mon Sep 17 00:00:00 2001 From: Yansheng Wei Date: Sat, 3 Feb 2024 18:16:10 -0600 Subject: [PATCH] Release 1.1.0, see CHANGELOG.md for details. --- CHANGELOG.md | 6 ++ CONTRIBUTING.md | 2 +- aiobastion/accounts.py | 20 ++++--- aiobastion/config.py | 8 +++ aiobastion/cyberark.py | 98 ++++++++++++++++++++----------- aiobastion/safe.py | 10 +++- docs/accounts.rst | 5 ++ docs/login.rst | 14 ++++- docs/safe.rst | 10 ++++ tests/test_accounts.py | 9 +-- tests/test_config.py | 36 +++++++++++- tests/test_cyberark.py | 3 +- tests/test_data/custom_config.yml | 10 ++++ tests/test_safe.py | 6 ++ tests/test_session_management.py | 2 + 15 files changed, 186 insertions(+), 53 deletions(-) create mode 100644 tests/test_data/custom_config.yml diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f06d2d..8aa68e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.1.0] - 2024-02-03 +### Changes +- Add "get_safe_details" method +- Add support for "custom" configs to override the default logon and reconcile account index +- Add support to retain cookies during login, and use for subsequent API calls for load-balanced PVWAs. + ## [1.0.0] - 2024-01-26 ### Changes - Adding some debug information diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b6eb8dd..efad809 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -36,7 +36,7 @@ If you wish to contribute with code the workflow is : - Then, generate some accounts with mockaroo and the following schemas : https://www.mockaroo.com/b41fedb0. See "Troubleshoot" section of some cleanup to avoid issues. - Create the associated safes : sample-it-dept,sample-iaadmins,sample-coolteam -- Create safe "BSA-SYS-PTT-R", and grant user "admin_bot" (see below) to the "Safe Management" permissions (for safe +- Create safe "RENAME_ME", and grant user "admin_bot" (see below) to the "Safe Management" permissions (for safe rename testing) - Import the data (with bulk upload) - Create the configuration file for your testing Vault diff --git a/aiobastion/accounts.py b/aiobastion/accounts.py index 2aaea3e..1e12528 100644 --- a/aiobastion/accounts.py +++ b/aiobastion/accounts.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- import asyncio import re -from typing import List, Union, AsyncIterator, AsyncGenerator +from typing import List, Union, AsyncIterator + import aiohttp from .config import validate_ip, flatten @@ -338,7 +339,7 @@ async def link_reconciliation_account(self, account: Union[PrivilegedAccount, Li reconcile_account: PrivilegedAccount): """ | This function links the account (or the list of accounts) to the given reconcile account - | ⚠️ The "reconcile" Account is supposed to have an index of 3 + | ⚠️ The "reconcile" Account index is default to 3 :param account: a PrivilegedAccount object or a list of PrivilegedAccount objects :type account: PrivilegedAccount, list @@ -346,13 +347,13 @@ async def link_reconciliation_account(self, account: Union[PrivilegedAccount, Li :return: A boolean that indicates if the operation was successful. :raises CyberarkException: If link failed """ - return await self.link_account(account, reconcile_account, 3) + return await self.link_account(account, reconcile_account, self.epv.RECONCILE_ACCOUNT_INDEX) async def link_logon_account(self, account: Union[PrivilegedAccount, List[PrivilegedAccount]], logon_account: PrivilegedAccount): """ | This function links the account (or the list of accounts) to the given logon account - | ⚠️ The "logon" Account is supposed to have an index of 2 + | ⚠️ The "logon" Account index is default to 2, you can change it by setting custom:LOGON_ACCOUNT_INDEX in the config :param account: a PrivilegedAccount object or a list of PrivilegedAccount objects :type account: PrivilegedAccount, list @@ -361,7 +362,7 @@ async def link_logon_account(self, account: Union[PrivilegedAccount, List[Privil :raises CyberarkException: If link failed """ #TODO check the index of logon account at platform level ! - return await self.link_account(account, logon_account, 2) + return await self.link_account(account, logon_account, self.epv.LOGON_ACCOUNT_INDEX) async def link_reconcile_account_by_address(self, acc_username, rec_acc_username, address): """ This function links the account with the given username and address to the reconciliation account with @@ -396,7 +397,7 @@ async def link_reconcile_account_by_address(self, acc_username, rec_acc_username async def remove_reconcile_account(self, account: Union[PrivilegedAccount, List[PrivilegedAccount]]): """ | This function unlinks the reconciliation account of the given account (or the list of accounts) - | ⚠️ The "reconcile" Account is supposed to have an index of 3 + | ⚠️ The "reconcile" Account index is default to 3 :param account: a PrivilegedAccount object or a list of PrivilegedAccount objects @@ -404,23 +405,24 @@ async def remove_reconcile_account(self, account: Union[PrivilegedAccount, List[ :return: A boolean that indicates if the operation was successful. :raises CyberarkException: If link failed: """ - return await self.unlink_account(account, 3) + return await self.unlink_account(account, self.epv.RECONCILE_ACCOUNT_INDEX) async def remove_logon_account(self, account: Union[PrivilegedAccount, List[PrivilegedAccount]]): """ | This function unlinks the logon account of the given account (or the list of accounts) - | ⚠️ The "logon" Account is supposed to have an index of 2 + | ⚠️ The "logon" Account index is default to 2, you can change it by setting custom:LOGON_ACCOUNT_INDEX in the config :param account: a PrivilegedAccount object or a list of PrivilegedAccount objects :type account: PrivilegedAccount, list :return: A boolean that indicates if the operation was successful. :raises CyberarkException: If link failed: """ - return await self.unlink_account(account, 2) + return await self.unlink_account(account, self.epv.LOGON_ACCOUNT_INDEX) async def unlink_account(self, account: Union[PrivilegedAccount, List[PrivilegedAccount]], extra_password_index: int): """ This function unlinks the account of the given account (or the list of accounts) + | ⚠️ Double check the linked account index on your platform. :param account: a PrivilegedAccount object or a list of PrivilegedAccount objects :type account: PrivilegedAccount, list diff --git a/aiobastion/config.py b/aiobastion/config.py index 0f86e59..4a0acc3 100644 --- a/aiobastion/config.py +++ b/aiobastion/config.py @@ -6,6 +6,7 @@ class Config: + """Parse a config file into an object""" # Default value CYBERARK_DEFAULT_TIMEOUT = 30 CYBERARK_DEFAULT_MAX_CONCURRENT_TASKS = 10 @@ -35,6 +36,7 @@ def __init__(self, configfile): self.max_concurrent_tasks = Config.CYBERARK_DEFAULT_MAX_CONCURRENT_TASKS self.timeout = Config.CYBERARK_DEFAULT_TIMEOUT self.PVWA_CA = False + self.keep_cookies = False with open(configfile, 'r') as config: configuration = yaml.safe_load(config) @@ -125,6 +127,8 @@ def _read_section_pvwa(self, configuration): elif keyname == "maxtasks" or keyname == "max_concurrent_tasks": self.max_concurrent_tasks = self._to_integer("PVWA/" + k, configuration[k]) synonyme_max_concurrent_tasks += 1 + elif keyname == "keep_cookies": + self.keep_cookies = bool(configuration[k]) elif keyname == "verify" or keyname == "ca": self.PVWA_CA = configuration[k] synonyme_PVWA_CA += 1 @@ -147,6 +151,7 @@ def _read_section_aim(self, configuration): "host": None, # Default = PVWA (host) "key": None, "max_concurrent_tasks": None, # Default = PVWA (max_concurrent_tasks) + "keep_cookies": False, # Default = False "verify": None, # Default = PVWA (PVWA_CA) "timeout": None, # Default = PVWA (timeout) } @@ -161,6 +166,8 @@ def _read_section_aim(self, configuration): configuration_aim[keyname] = configuration[k] elif keyname == "timeout": configuration_aim[keyname] = self._to_integer("AIM/" + k, configuration[k]) + elif keyname == "keep_cookies": + configuration_aim[keyname] = bool(configuration[k]) elif keyname in ["maxtasks", "max_concurrent_tasks"]: configuration_aim["max_concurrent_tasks"] = self._to_integer("AIM/" + k, configuration[k]) synonyme_max_concurrent_tasks += 1 @@ -195,6 +202,7 @@ def _read_section_aim(self, configuration): if self.AIM["verify"] is None: self.AIM["verify"] = self.PVWA_CA + def _to_integer(self, section_key, val): try: v = int(val) diff --git a/aiobastion/cyberark.py b/aiobastion/cyberark.py index bf2213d..8ec4934 100644 --- a/aiobastion/cyberark.py +++ b/aiobastion/cyberark.py @@ -36,25 +36,30 @@ def __init__(self, configfile: str = None, serialized: dict = None, token: str = self.logger = logger # PVWA initialization - self.api_host = None # CyberArk host - self.authtype = "cyberark" # CyberArk authentification type + self.api_host = None # CyberArk host + self.authtype = "cyberark" # CyberArk authentification type - # Number of parrallel task for PVWA and AIM + # Number of parallel task for PVWA and AIM self.max_concurrent_tasks = Config.CYBERARK_DEFAULT_MAX_CONCURRENT_TASKS # Communication timeout in seconds self.timeout = Config.CYBERARK_DEFAULT_TIMEOUT - self.verify = False # root certificate authority (CA) + self.keep_cookies = False # Whether to keep cookies between API calls + self.verify = False # root certificate authority (CA) self.request_params = {"timeout": self.timeout, "ssl": False} # timeout & ssl setupn default value self.__token = token # CyberArk authorization token # AIM Communication initialization - self.AIM = None # EPV_AIM definition + self.AIM = None # EPV_AIM definition + + # Linked accounts index defaults, can be overriden by configs + self.LOGON_ACCOUNT_INDEX = 2 # This really should be 1, but keep as 2 for backward compatibility + self.RECONCILE_ACCOUNT_INDEX = 3 # You SHOULD NOT change this, except for backward compatibility # Other section initialization - self.configfile = configfile # Name of the configuration file - self.config = None # Definition from the configuration file - self.cpm = "" # CPM to assign to safes + self.configfile = configfile # Name of the configuration file + self.config = None # Definition from the configuration file + self.cpm = "" # CPM to assign to safes self.retention = Config.CYBERARK_DEFAULT_RETENTION # days of retention for objects in safe if configfile is None and serialized is None: @@ -70,6 +75,7 @@ def __init__(self, configfile: str = None, serialized: dict = None, token: str = # Session management self.session = None + self.cookies = None self.__sema = None # utilities @@ -84,6 +90,11 @@ def __init__(self, configfile: str = None, serialized: dict = None, token: str = self.system_health = SystemHealth(self) self.utils = Utilities(self) + def _epv_set_linked_account_index(self, custom): + if custom is not None: + if custom['LOGON_ACCOUNT_INDEX']: self.LOGON_ACCOUNT_INDEX = int(custom['LOGON_ACCOUNT_INDEX']) # noqa: + if custom['RECONCILE_ACCOUNT_INDEX']: self.RECONCILE_ACCOUNT_INDEX = int(custom['RECONCILE_ACCOUNT_INDEX']) # noqa: + def _epv_config(self, configfile): self.config = Config(configfile) @@ -92,6 +103,7 @@ def _epv_config(self, configfile): self.authtype = self.config.authtype self.max_concurrent_tasks = self.config.max_concurrent_tasks self.timeout = self.config.timeout + self.keep_cookies = self.config.keep_cookies self.verify = self.config.PVWA_CA # AIM Communication @@ -102,6 +114,8 @@ def _epv_config(self, configfile): self.cpm = self.config.CPM self.retention = self.config.retention + self._epv_set_linked_account_index(self.config.custom) + def _epv_serialize(self, serialized): if not isinstance(serialized, dict): raise AiobastionException("Type error: Parameter 'serialized' must be a dictionary.") @@ -117,8 +131,9 @@ def _epv_serialize(self, serialized): "retention", "timeout", "token", + "keep_cookies", "verify", - ]: + "custom"]: raise AiobastionException(f"Unknown serialized field: {k} = {serialized[k]!r}") # PVWA definition @@ -130,10 +145,15 @@ def _epv_serialize(self, serialized): self.max_concurrent_tasks = serialized['max_concurrent_tasks'] if "timeout" in serialized: self.timeout = serialized["timeout"] + if "keep_cookies" in serialized: + self.keep_cookies = bool(serialized["keep_cookies"]) if "verify" in serialized: self.verify = serialized["verify"] if "token" in serialized: self.__token = serialized['token'] + if "custom" in serialized: + self.custom = serialized['custom'] + self._epv_set_linked_account_index(self.custom) # AIM Communication if "AIM" in serialized: @@ -145,7 +165,7 @@ def _epv_serialize(self, serialized): getattr(self, "max_concurrent_tasks", Config.CYBERARK_DEFAULT_MAX_CONCURRENT_TASKS)) serialized_aim.setdefault("timeout", getattr(self, "timeout", Config.CYBERARK_DEFAULT_TIMEOUT)) serialized_aim.setdefault("verify", getattr(self, "verify", False)) - + serialized_aim.setdefault("keep_cookies", getattr(self, "keep_cookies", False)) self.AIM = EPV_AIM(serialized=serialized_aim) # Other definition @@ -162,16 +182,17 @@ def validate_and_setup_ssl(self): if os.path.isdir(self.verify): self.request_params = {"timeout": self.timeout, - "ssl": ssl.create_default_context(capath=self.verify)} + "ssl": ssl.create_default_context(capath=self.verify)} else: self.request_params = {"timeout": self.timeout, - "ssl": ssl.create_default_context(cafile=self.verify)} - elif self.verify: # True + "ssl": ssl.create_default_context(cafile=self.verify)} + elif self.verify: # True self.request_params = {"timeout": self.timeout, - "ssl": ssl.create_default_context()} - else: # None or False + "ssl": ssl.create_default_context()} + else: # None or False self.request_params = {"timeout": self.timeout, "ssl": False} + # Context manager async def __aenter__(self): await self.login() @@ -206,8 +227,15 @@ async def __login_cyberark(self, username: str, password: str, auth_type: str) - raise CyberarkException(error) tok = await req.text() + # Copy the cookies to insert into later sessions + if self.keep_cookies: + self.cookies = session.cookie_jar.filter_cookies(f"https://{self.api_host}") # type: ignore + for cookie in self.cookies: + self.cookies[cookie]['domain'] = self.api_host # Closing session because now we are connected and we need to update headers which can be done - # only by recreating a new session (or passing the headers on each request) + # only by recreating a new session (or passing the headers on each request). However, since the session + # token is only recognized by the PVWA instance that issued the token, load-balancers need to enable session + # stickiness which is often done with cookies. await session.close() return tok.replace('"', '') @@ -221,11 +249,13 @@ async def __login_cyberark(self, username: str, password: str, auth_type: str) - async def logoff(self): url, head = self.get_url("API/Auth/Logoff") async with aiohttp.ClientSession() as session: + self.cookies and session.cookie_jar.update_cookies(self.cookies) async with session.post(url, headers=head, **self.request_params) as req: if req.status != 200: raise CyberarkException("Error disconnecting to PVWA with code : %s" % str(req.status)) await self.close_session() self.__token = None + self.cookies = None return True @@ -312,20 +342,20 @@ async def login_with_aim(self, aim_host: str = None, appid: str = None, username timeout = (timeout or self.AIM.timeout or self.timeout) max_concurrent_tasks = (max_concurrent_tasks or self.AIM.max_concurrent_tasks or self.max_concurrent_tasks) - if root_ca is None: # May be false + if root_ca is None: # May be false if self.AIM.verify is not None: - root_ca = self.AIM.verify + root_ca = self.AIM.verify else: if self.verify is not None: - root_ca = self.verify # PVWA + root_ca = self.verify # PVWA else: - root_ca = True + root_ca = True - if (aim_host and aim_host != self.AIM.host) or \ - (appid and appid != self.AIM.appid) or \ - (cert_file and cert_file != self.AIM.cert) or \ - (cert_key and cert_key != self.AIM.key) or \ - (root_ca is not None and root_ca != self.AIM.verify): + if (aim_host and aim_host != self.AIM.host) or \ + (appid and appid != self.AIM.appid) or \ + (cert_file and cert_file != self.AIM.cert) or \ + (cert_key and cert_key != self.AIM.key) or \ + (root_ca is not None and root_ca != self.AIM.verify): raise CyberarkException("AIM is already initialized ! Please close EPV before reopen it.") else: if root_ca is None: @@ -341,12 +371,12 @@ async def login_with_aim(self, aim_host: str = None, appid: str = None, username self.AIM.validate_and_setup_aim_ssl() # Check mandatory attributs - if self.AIM.host is None or \ - self.AIM.appid is None or \ - self.AIM.cert is None or \ - self.AIM.key is None: - raise AiobastionException( - "Missing AIM mandatory parameters: host, appid, cert, key (and a optional verify).") + if self.AIM.host is None or \ + self.AIM.appid is None or \ + self.AIM.cert is None or \ + self.AIM.key is None: + raise AiobastionException("Missing AIM mandatory parameters: host, appid, cert, key (and a optional verify).") + # Complete undefined parameters with PVWA attributes if username is None and self.config and self.config.username: @@ -446,6 +476,7 @@ async def login(self, username=None, password=None, auth_type="", user_search=No except (CyberarkAIMnotFound, CyberarkAPIException, CyberarkException) as err: raise GetTokenException(str(err)) from err + try: self.__token = await self.__login_cyberark(username, password, auth_type) # update the session @@ -538,7 +569,8 @@ def versiontuple(self, v): async def handle_request(self, method: str, short_url: str, data=None, params: dict = None, filter_func=lambda x: x): """ - Function that handles requests to the API + Function that handles requests to the API. This is a low-level function, and you most likely wouldn't need to + call it. If you do, there is the opportunity to enhance other modules. :param filter_func: :param params: :param method: @@ -570,7 +602,7 @@ async def handle_request(self, method: str, short_url: str, data=None, params: d return response else: return True - # except: + #except: # raise else: if req.status == 404: diff --git a/aiobastion/safe.py b/aiobastion/safe.py index ba6276c..d061226 100644 --- a/aiobastion/safe.py +++ b/aiobastion/safe.py @@ -309,7 +309,7 @@ async def search_safe_paginate(self, page: int = 1, size_of_page: int = 100, sea params["offset"] = (page - 1) * size_of_page try: search_results = await self.epv.handle_request("get", "API/Safes", params=params, - filter_func=lambda x: x) + filter_func=lambda x: x) except CyberarkAPIException as err: if err.err_code == "CAWS00001E": raise AiobastionException("Please don't list safes with a user member of PSMMaster (Cyberark bug)") @@ -336,6 +336,14 @@ async def list(self, details=False): else: return [r["safeName"] for r in await self.search()] + async def get_safe_details(self, safename: str): + """ + Get details of a given safe. We do a direct query instead of a search for efficiency. + :return: A dict of the safe details + """ + if not safename: raise AiobastionException("A safe name must be provided") + return await self.epv.handle_request("get", f"API/Safes/{safename}", params={"extendedDetails": "True"}) + async def v1_get_safes(self): return await self.epv.handle_request("get", 'WebServices/PIMServices.svc/Safes/', filter_func=lambda r: r) diff --git a/docs/accounts.rst b/docs/accounts.rst index 5b57135..3bff629 100644 --- a/docs/accounts.rst +++ b/docs/accounts.rst @@ -41,6 +41,11 @@ it has the following methods : * cpm_status: return the CPM status of the account * last_modified : return the last modified time (days since last password change) +About linked account index : + * reconcile account index: 3 - you should NOT change it unless your system has different custom value. + * logon account index: 2 - this is different from the installation (1). The default value is kept at 2 to avoid + breaking existing users. You can override it to 1 by providing a "custom.LOGON_ACCOUNT_INDEX" value in your config. + Calling functions ------------------- | When it's possible, functions support call with a list as argument instead of single item argument. diff --git a/docs/login.rst b/docs/login.rst index e69d8f4..881209e 100644 --- a/docs/login.rst +++ b/docs/login.rst @@ -150,6 +150,7 @@ You can use the `Serialization tools`_ to extract the EPV serialization at any t "Cert": r"C:\Folder\AIM_Cert.pem", # (required) AIM Filename public certificate "Key": r"C:\Folder\AIM_private_key", # (required) AIM Filename Private Key certificate "Verify": r"C:\Folder\AIM_Root_CA.pem" # (optional) Directory or filename of the ROOT certificate authority (CA) + "keep_cookies": False, # (optional) whether to keep cookies between calls, set to true if API host is behind a load balancer "max_concurrent_tasks": 10, # (optional) AIM Maximum number of parallel task (default 10) "timeout": 30 # (optional) AIM Maximum wait time in seconds before generating a timeout (default 30 seconds) } @@ -163,6 +164,7 @@ You can use the `Serialization tools`_ to extract the EPV serialization at any t "retention": 10, # (optional) Days of retention for objects in safe, default = 10 "timeout": 30, # (optional) Maximum wait time in seconds before generating a timeout (default 30 seconds) "verify": r"C:\Folder\PVWA_Root_CA.pem", # (optional) set if you want to add additional ROOT ca certs + "keep_cookies": False, # (optional) whether to keep cookies between calls, set to true if API host is behind a load balancer "AIM": aim_config # (optional) if AIM API is not needed } @@ -483,6 +485,9 @@ PVWA section / field definitions +----------------------+-------------------------+ + | ca | Optional, deprecated + + +----------------------+-------------------------+--------------------------------------------------------------------------------------------+ +| keep_cookies | Optional | Keep cookies from login and send in subsequent API calls (default Fasle). You may need to + +| | | You may need to set to True when a load-balancer is present. | ++----------------------+-------------------------+--------------------------------------------------------------------------------------------+ AIM section / field definitions ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -515,7 +520,10 @@ AIM section / field definitions +----------------------+-------------------------+--------------------------------------------------------------------------------------------+ | timeout | Optional | AIM Maximum wait time in seconds before generating a timeout (default 30 seconds). + | | | + -+ | | If not define use the *timeout* from the PVWA section. + +| | | If not define use the *timeout* from the PVWA section. + ++----------------------+-------------------------+--------------------------------------------------------------------------------------------+ +| keep_cookies | Optional | Keep cookies from login and send in subsequent API calls (default Fasle). You may need to + +| | | You may need to set to True when a load-balancer is present. | +----------------------+-------------------------+--------------------------------------------------------------------------------------------+ @@ -593,3 +601,7 @@ login function login_with_aim function ~~~~~~~~~~~~~~~~~~~~~~~ .. autofunction:: login_with_aim + +handle_request function +~~~~~~~~~~~~~~~~~~~~~~~ +.. autofunction:: handle_request diff --git a/docs/safe.rst b/docs/safe.rst index c16d632..710bcfa 100644 --- a/docs/safe.rst +++ b/docs/safe.rst @@ -121,6 +121,16 @@ list :return: A list of safes names +get_safe_details +~~~~~~~~~~ +.. py:function:: get_safe_details(safename: str) + :async: + + Get details of a given safe. We do a direct query instead of a search for efficiency. + + :param safename: Name of the safe + :return: A dict of the safe details + get_permissions ~~~~~~~~~~~~~~~~~~~ .. py:function:: get_permissions(safename: str, username: str) diff --git a/tests/test_accounts.py b/tests/test_accounts.py index fe17161..fe33384 100644 --- a/tests/test_accounts.py +++ b/tests/test_accounts.py @@ -1,5 +1,6 @@ import asyncio import logging +import os.path import random import secrets import unittest @@ -452,8 +453,8 @@ async def _get_password_version(reason): async def test_get_password_aim(self): - if tests.AIM_CONFIG is None or tests.AIM_CONFIG == '': - self.skipTest("AIM_CONFIG is not set in init file") + if tests.AIM_CONFIG is None or tests.AIM_CONFIG == '' or not os.path.exists(tests.AIM_CONFIG): + self.skipTest("AIM_CONFIG is not set in init file, or not exist") account = await self.get_random_account() logging.debug(f"Account : {account}") @@ -472,8 +473,8 @@ async def test_get_password_aim(self): await self.vault.account.get_password_aim(address="not_exist") async def test_get_secret_aim(self): - if tests.AIM_CONFIG is None or tests.AIM_CONFIG == '': - self.skipTest("AIM_CONFIG is not set in init file") + if tests.AIM_CONFIG is None or tests.AIM_CONFIG == '' or not os.path.exists(tests.AIM_CONFIG): + self.skipTest("AIM_CONFIG is not set in init file, or not exist") account = await self.get_random_account(50) retrieved_password = await self.vault.account.get_secret(account[15]) diff --git a/tests/test_config.py b/tests/test_config.py index 4a5400d..d780da0 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,4 +1,5 @@ import asyncio +import os import unittest from unittest import IsolatedAsyncioTestCase import aiobastion @@ -7,6 +8,15 @@ class TestEPV(IsolatedAsyncioTestCase): + @classmethod + def setUpClass(cls): + cls.custom_linked_acounts = { + "custom": { + "RECONCILE_ACCOUNT_INDEX": 1, + "LOGON_ACCOUNT_INDEX": 3 + } + } + async def asyncSetUp(self): self.vault = aiobastion.EPV(tests.CONFIG) await self.vault.login() @@ -18,6 +28,26 @@ async def asyncTearDown(self): # test_logoff pass + def test_default_linked_accounts_from_yml(self): + vault = aiobastion.EPV(tests.CONFIG) + self.assertEqual(2, vault.LOGON_ACCOUNT_INDEX) + self.assertEqual(3, vault.RECONCILE_ACCOUNT_INDEX) + + def test_default_linked_accounts_from_obj(self): + vault = aiobastion.EPV(serialized={}) + self.assertEqual(2, vault.LOGON_ACCOUNT_INDEX) + self.assertEqual(3, vault.RECONCILE_ACCOUNT_INDEX) + + def test_custom_linked_accounts_from_yml(self): + vault = aiobastion.EPV("test_data/custom_config.yml") + self.assertEqual(3, vault.LOGON_ACCOUNT_INDEX) + self.assertEqual(1, vault.RECONCILE_ACCOUNT_INDEX) + + def test_custom_linked_accounts_from_obj(self): + vault = aiobastion.EPV(serialized=self.custom_linked_acounts) + self.assertEqual(3, vault.LOGON_ACCOUNT_INDEX) + self.assertEqual(1, vault.RECONCILE_ACCOUNT_INDEX) + async def test_logoff(self): await self.vault.logoff() self.assertFalse(await self.vault.check_token()) @@ -27,7 +57,7 @@ async def test_login(self): self.assertTrue(await self.vault.check_token()) async def test_login_aim(self): - if tests.AIM_CONFIG is None or tests.AIM_CONFIG == '': + if tests.AIM_CONFIG is None or tests.AIM_CONFIG == '' or not os.path.exists(tests.AIM_CONFIG): self.skipTest("AIM_CONFIG is not set in init file") await self.vault.logoff() self.assertFalse(await self.vault.check_token()) @@ -52,9 +82,9 @@ async def test_login_pvwa_only(self): PVWA_CONFIG = '../../confs/config_test_pvwa_only.yml' self.vault = aiobastion.EPV(PVWA_CONFIG) with self.assertRaises(aiobastion.exceptions.GetTokenException): - await self.vault.login(username="admin", password="Cyberark1") + await self.vault.login(username="admin", password="wrong_password") # For a relevant test we need a correct login password that we cant display in code - # It could be stored in a test safe + # It could be stored in a test safe. For now, we use a wrong password ane assert token exception. # self.assertTrue(await self.vault.check_token()) diff --git a/tests/test_cyberark.py b/tests/test_cyberark.py index fe46f0a..200e8d0 100644 --- a/tests/test_cyberark.py +++ b/tests/test_cyberark.py @@ -1,4 +1,5 @@ import asyncio +import os import unittest from unittest import IsolatedAsyncioTestCase import aiobastion @@ -26,7 +27,7 @@ async def test_login(self): self.assertTrue(await self.vault.check_token()) async def test_login_aim(self): - if tests.AIM_CONFIG is None or tests.AIM_CONFIG == '': + if tests.AIM_CONFIG is None or tests.AIM_CONFIG == '' or not os.path.exists(tests.AIM_CONFIG): self.skipTest("AIM_CONFIG is not set in init file") await self.vault.logoff() self.assertFalse(await self.vault.check_token()) diff --git a/tests/test_data/custom_config.yml b/tests/test_data/custom_config.yml new file mode 100644 index 0000000..5a98aa4 --- /dev/null +++ b/tests/test_data/custom_config.yml @@ -0,0 +1,10 @@ +connection: + authtype: cyberark + password: abc + username: def +custom: + LOGON_ACCOUNT_INDEX: 3 + RECONCILE_ACCOUNT_INDEX: 1 +label: Production Demo +pvwa: + host: host1 \ No newline at end of file diff --git a/tests/test_safe.py b/tests/test_safe.py index 9353198..668a096 100644 --- a/tests/test_safe.py +++ b/tests/test_safe.py @@ -115,6 +115,12 @@ async def test_add_defaults_admin(self): async def test_delete(self): self.skipTest("Test covered by test_create_safe") + async def test_get(self): + safe = await self.vault.safe.get_safe_details(self.test_safe) + self.assertEqual(self.test_safe, safe['safeName']) + [ self.assertIn(k, safe) for k in ['safeName', 'description', 'accounts']] + self.assertNotIn("no_such_attribute", safe) + async def test_list_members(self): members = await self.vault.safe.list_members(self.test_safe) self.assertIn(self.api_user, members) diff --git a/tests/test_session_management.py b/tests/test_session_management.py index e2b8f51..dca1272 100644 --- a/tests/test_session_management.py +++ b/tests/test_session_management.py @@ -1,4 +1,5 @@ import asyncio +import os import random import secrets import unittest @@ -10,6 +11,7 @@ import time +@unittest.skipIf(not os.path.exists(tests.AIM_CONFIG), "AIM Config File does Not Exist") class TestSessionManagement(IsolatedAsyncioTestCase): async def asyncSetUp(self): self.vault = aiobastion.EPV(tests.AIM_CONFIG)