From 6f4359cb5c3055ede8e65b8f17c899217deb42d0 Mon Sep 17 00:00:00 2001 From: iresolis Date: Wed, 2 Oct 2024 16:01:01 +0000 Subject: [PATCH 1/3] added API checker feature --- Integrations/API Utility/README.MD | 42 +++-- .../API Utility/ir_api_util/addEndPoint.py | 131 +++++++++++++ .../API Utility/ir_api_util/apiChecker.py | 172 ++++++++++++++++++ .../API Utility/ir_api_util/config.json | 3 +- .../API Utility/ir_api_util/reception.py | 44 +++++ 5 files changed, 373 insertions(+), 19 deletions(-) create mode 100644 Integrations/API Utility/ir_api_util/addEndPoint.py create mode 100644 Integrations/API Utility/ir_api_util/apiChecker.py diff --git a/Integrations/API Utility/README.MD b/Integrations/API Utility/README.MD index bffdf78..b899321 100644 --- a/Integrations/API Utility/README.MD +++ b/Integrations/API Utility/README.MD @@ -24,7 +24,7 @@ Run the install script: "./IriusRisk-Central/Integrations/API Utility/install_irtool_reqs" #Command End -Copy the ir_api_util folder to you home directory: +Copy the ir_api_util folder to your home directory: #Command Start cp -r "./IriusRisk-Central/Integrations/API Utility/ir_api_util/" ~ #Command End @@ -39,29 +39,35 @@ Follow the on-screen instructions to utilize the utility's features. Features Feature: Get Project List (1) -Feature Details: -This feature will return via json file, a list of the projects in your instance. - +This feature will return a list of the projects in your instance via a JSON file. Feature: Export IR Project Status (2) -Feature Details: -This feature will export to CSV and Excel the status of your project with respect to countermeasure status. It will prompt you for you IR project reference ID. +This feature will export the status of your project to CSV and Excel, focusing on countermeasure status. It will prompt you for your IR project reference ID. In addition, it will include the countermeasure status of associated projects by tag. -For example, if your project contains a project component, and you tag that component with reference ID of its actual project, the countermeasure data will for that project component will be included along with the data for your target project. - +For example, if your project contains a project component tagged with the reference ID of its actual project, the countermeasure data for that project component will be included along with the data for your target project. Feature: User Access Report (8) -Feature Details: -This feature will produce a simple output which diplays the active or inactive users over a given period in days. +This feature generates a report that displays active or inactive users over a specified period in days. + +Feature: Business Unit Reports (9) +This feature offers two options. The first option generates a CSV report for a single Business Unit by either Name or UUID. The second option generates the same report for all Business Units. + +Feature: Audit Log Report (10) +This feature generates an Excel report focusing on Project Activity and User Activity, sourced from audit log events for up to 180 days. + +Feature: API Query Checker (12) +This feature allows users to validate API queries by running checks against expected outputs. It provides two options: +1. Run API Query Checker + This option executes the API Query Checker to validate queries against sample output files. It checks if the API responses match the expected results. -Feature: Business Unit Reports -Feature Details: -This feature offers two options. The first option will generate a CSV reports for a single Business Unit by either Name or UUID. -The second option will generate the same reports for all business units. +2. Add New Query to be Checked + This option allows users to add a new API query for validation. You will be prompted to provide the following details: + - Friendly Name: A descriptive name for the API query (e.g., "v1 GET Project Details"). + - HTTP Method: The type of request (GET, POST, PUT, DELETE). + - API URL Endpoint: The API call endpoint (e.g., /v1/projects/{reference-id}). + - Sample Output File: The path to a JSON file with the expected output. -Feature: Audit Log Report -Feature Details: -This feature will generate an Excel report focusing on Project Activity and User Activity, sourced from audit log events, for up to 180 days. +Once a query is added, it will be included in future checks performed by the API Query Checker. -Additional features to be added as needed in the future. +Additional features may be added as needed in the future. diff --git a/Integrations/API Utility/ir_api_util/addEndPoint.py b/Integrations/API Utility/ir_api_util/addEndPoint.py new file mode 100644 index 0000000..b2759fa --- /dev/null +++ b/Integrations/API Utility/ir_api_util/addEndPoint.py @@ -0,0 +1,131 @@ +import sys +import json +import os + + + +def parse_sample_response(sample): + if isinstance(sample, dict): + parsed = {} + for key, value in sample.items(): + if value is None: + parsed[key] = None # Allow None values + elif isinstance(value, bool): + parsed[key] = "bool" + elif isinstance(value, str): + parsed[key] = "string" + elif isinstance(value, int): + parsed[key] = "int" + elif isinstance(value, float): + parsed[key] = "float" + elif isinstance(value, list): + if not value: + parsed[key] = [] # Empty list + else: + if isinstance(value[0], dict): + parsed[key] = [parse_sample_response(value[0])] + elif isinstance(value[0], str): + parsed[key] = ["string"] # Handle lists of strings + else: + parsed[key] = ["unknown"] + elif isinstance(value, dict): + parsed[key] = parse_sample_response(value) + else: + parsed[key] = "unknown" + return parsed + elif isinstance(sample, list): + if not sample: + return [] + elif isinstance(sample[0], dict): + return [parse_sample_response(sample[0])] + else: + return ["string"] if isinstance(sample[0], str) else ["unknown"] + else: + return "unknown" + + + +def read_credentials(api_token_path='~/ir/.ir_user_token', instance_domain_path='~/ir/ir_instance_domain'): + try: + with open(os.path.expanduser(instance_domain_path), 'r') as domain_file: + instance_domain = domain_file.read().strip() + return instance_domain + except FileNotFoundError as e: + print(f"Error: {e}. Make sure the paths are correct.") + sys.exit(1) # Exit if credentials cannot be read + + + +def add_endpoint_to_queries(name, method, url, sample_structure, instance_domain, filename='apiChecker.json'): + parsed_structure = parse_sample_response(sample_structure) + + if not url.startswith("https://") and not url.startswith("http://"): + url = f"https://{instance_domain}.iriusrisk.com{url}" + + # Detect if it's v1 or v2 based on the URL pattern + if "/v2/" in url: + accept_header = "application/hal+json" + else: + accept_header = "application/json" + + new_endpoint = { + "name": name, # Use the provided name + "method": method.upper(), + "url": url, + "headers": { + "Accept": accept_header # Use the correct Accept header based on v1 or v2 + }, + "expected_status": 200, + "expected_response": parsed_structure + } + + try: + if not os.path.exists(filename): + print(f"{filename} not found, creating a new one.") + data = {"endpoints": [new_endpoint]} + with open(filename, 'w') as file: + json.dump(data, file, indent=4) + else: + with open(filename, 'r+') as file: + try: + data = json.load(file) + data['endpoints'].append(new_endpoint) + except json.JSONDecodeError: + data = {"endpoints": [new_endpoint]} + file.seek(0) + json.dump(data, file, indent=4) + print(f"Successfully added the endpoint {name} to {filename}.") + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"Error handling {filename}: {e}") + + + + +def main(): + if len(sys.argv) != 5: + print("Usage: python3 addEndPoint.py ") + sys.exit(1) + + name = sys.argv[1] # Take the name as an argument + method = sys.argv[2].upper() + url = sys.argv[3] + sample_output_file = sys.argv[4] + + valid_methods = ["GET", "POST", "PUT", "DELETE"] + if method not in valid_methods: + print(f"Error: Unsupported HTTP method '{method}'. Supported methods are {', '.join(valid_methods)}.") + sys.exit(1) + + try: + with open(sample_output_file, 'r') as f: + sample_structure = json.load(f) + except json.JSONDecodeError as e: + print(f"Error parsing sample output: {e}") + sys.exit(1) + + instance_domain = read_credentials() + + add_endpoint_to_queries(name, method, url, sample_structure, instance_domain) + +if __name__ == "__main__": + main() diff --git a/Integrations/API Utility/ir_api_util/apiChecker.py b/Integrations/API Utility/ir_api_util/apiChecker.py new file mode 100644 index 0000000..d2a54d5 --- /dev/null +++ b/Integrations/API Utility/ir_api_util/apiChecker.py @@ -0,0 +1,172 @@ +import sys +import requests +import os +import json +from deepdiff import DeepDiff +from auth import Auth + +# Function to load queries from the JSON file +def load_queries(filename): + try: + with open(filename, 'r') as file: + return json.load(file) + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"Error reading {filename}: {e}") + sys.exit(1) + +# Read config file for output path and page size +def read_config(config_path): + try: + with open(config_path, 'r') as config_file: + config = json.load(config_file) + output_path = os.path.expanduser(config.get('output_path', '~/')) + os.makedirs(output_path, exist_ok=True) + page_size = config.get('page_size', 2000) + return output_path, page_size + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"Error reading config file: {e}. Defaulting to home directory.") + output_path = os.path.expanduser('~/') + os.makedirs(output_path, exist_ok=True) + return output_path, 2000 + +# Function to compare types +def compare_types(expected, actual): + # If the expected value is None, allow any actual value + if expected is None: + return True, None # Null means any value is valid + + # If the actual value is None, allow it for any expected type + if actual is None: + return True, None # Null is acceptable for any expected type + + if isinstance(expected, dict) and isinstance(actual, dict): + for key in expected: + if key not in actual: + return False, f"Missing key: {key}" + match, error = compare_types(expected[key], actual[key]) + if not match: + return False, error + elif isinstance(expected, list) and isinstance(actual, list): + if len(expected) == 0 or len(actual) == 0: + return True, None # Allow empty lists + return compare_types(expected[0], actual[0]) + else: + # For string types + if expected == "string" and isinstance(actual, str): + return True, None + # For int types (ensuring bools are not mistaken for ints) + elif expected == "int" and isinstance(actual, int) and not isinstance(actual, bool): + return True, None + # For bool types + elif expected == "bool" and isinstance(actual, bool): + return True, None + # For float types + elif expected == "float" and isinstance(actual, float): + return True, None + # For list types + elif expected == "list" and isinstance(actual, list): + return True, None + # For dict types + elif expected == "dict" and isinstance(actual, dict): + return True, None + else: + return False, f"Type mismatch: expected {expected}, got {type(actual).__name__}" + + return True, None + + +# Class to handle API checking +class APIChecker: + def __init__(self, api_token_path='~/ir/.ir_user_token', instance_domain_path='~/ir/ir_instance_domain'): + self.auth = Auth() + self.api_token_path = os.path.expanduser(api_token_path) + self.instance_domain_path = os.path.expanduser(instance_domain_path) + self.auth.check_user_instance_file(self.instance_domain_path) + self.auth.check_user_token_file(self.api_token_path) + self.api_token, self.instance_domain = self.read_credentials() + script_dir = os.path.dirname(os.path.realpath(__file__)) + self.output_path, self.page_size = read_config(os.path.join(script_dir, 'config.json')) + + def read_credentials(self): + try: + with open(self.api_token_path, 'r') as token_file: + api_token = token_file.read().strip() + with open(self.instance_domain_path, 'r') as domain_file: + instance_domain = domain_file.read().strip() + return api_token, instance_domain + except FileNotFoundError as e: + print(f"Error: {e}. Make sure the paths are correct.") + sys.exit(1) # Exit if credentials cannot be read + + def test_endpoint(self, endpoint): + method = endpoint.get("method", "GET").upper() + relative_url = endpoint["url"] + + # Ensure the URL is properly formatted + if not relative_url.startswith("http"): + url = f"https://{self.instance_domain}.iriusrisk.com{relative_url}" + else: + url = relative_url + + headers = endpoint.get("headers", {}) + headers['api-token'] = self.api_token # Use the stored API token + expected_status = endpoint["expected_status"] + expected_response = endpoint["expected_response"] + + try: + response = requests.request(method, url, headers=headers) + except requests.exceptions.RequestException as e: + print(f"Error fetching {url}: {e}") + return False + + status_code = response.status_code + try: + response_json = response.json() + except json.JSONDecodeError: + print(f"Invalid JSON response from {url}") + return False + + print(f"Testing {endpoint['name']} - {method} {url}") + print(f"Expected Status: {expected_status}, Actual Status: {status_code}") + + if status_code != expected_status: + print(f"Status Code Mismatch! Expected {expected_status}, got {status_code}") + return False + + # Compare response JSON structure with expected structure + if isinstance(response_json, list) and isinstance(expected_response, list): + for i, item in enumerate(response_json): + match, error = compare_types(expected_response[0], item) + if not match: + print(f"Response Mismatch at item {i}! {error}") + print(f"Expected: {expected_response[0]}\nGot: {item}") + return False + else: + match, error = compare_types(expected_response, response_json) + if not match: + print(f"Response Mismatch Found! {error}") + print(f"Expected: {expected_response}\nGot: {response_json}") + return False + + print("Test Passed!") + return True + + def run_tests(self, queries): + print(f"Found {len(queries['endpoints'])} endpoints to test.") + for i, endpoint in enumerate(queries['endpoints']): + print(f"Running test {i + 1} of {len(queries['endpoints'])}...") + success = self.test_endpoint(endpoint) + if not success: + print(f"Test Failed for {endpoint['name']}!\n") + else: + print(f"Test Succeeded for {endpoint['name']}!\n") + +def main(): + script_dir = os.path.dirname(os.path.realpath(__file__)) + api_checker = APIChecker() + queries = load_queries(os.path.join(script_dir, 'apiChecker.json')) + api_checker.run_tests(queries) + +# Proper entry point check +if __name__ == "__main__": + main() diff --git a/Integrations/API Utility/ir_api_util/config.json b/Integrations/API Utility/ir_api_util/config.json index 6887c75..ecb82c2 100644 --- a/Integrations/API Utility/ir_api_util/config.json +++ b/Integrations/API Utility/ir_api_util/config.json @@ -1,3 +1,4 @@ { - "output_path": "~/ir_api_util_output" + "output_path": "~/reports_ir", + "page_size": 2000 } diff --git a/Integrations/API Utility/ir_api_util/reception.py b/Integrations/API Utility/ir_api_util/reception.py index 0d3bc7d..3c328fa 100644 --- a/Integrations/API Utility/ir_api_util/reception.py +++ b/Integrations/API Utility/ir_api_util/reception.py @@ -9,6 +9,8 @@ def __init__(self): "8. User Access Report", "9. Business Unit Reports", "10. Audit Log Report", + # "11. Create Rule from Excel Workbook", + "12. API Query Checker", "0. Exit"] self.menuSelection = 0 @@ -16,6 +18,7 @@ def main_menu(self): for item in self.menu: print(item) + def execute_script(self, script_path, args): script_absolute_path = os.path.expanduser(script_path) try: @@ -23,6 +26,7 @@ def execute_script(self, script_path, args): except subprocess.CalledProcessError as e: print(f"Error executing script: {script_absolute_path}, {e}") + def execute_script_noArgs(self, script_path): script_absolute_path = os.path.expanduser(script_path) try: @@ -54,6 +58,42 @@ def business_unit_reports_menu(self): print("Invalid Selection. Please try again.") self.business_unit_reports_menu() + def api_query_checker_menu(self): + sub_menu = ["API Query Checker:", "", + "1. Run API Query Checker", + "2. Add New Query to be Checked", + "0. Back to Main Menu"] + for item in sub_menu: + print(item) + + print("") + choice = input("Please make a selection: ") + print("") + + if choice == "1": + self.execute_script_noArgs('~/ir_api_util/apiChecker.py') + elif choice == "2": + name = input("Enter a Friendly name for the query (v1 GET Project Details): ") + print("") + method = input("Enter the HTTP method (GET, POST, PUT, DELETE): ").upper() + print("") + url = input("Enter the API URL endpoint (e.g., /v1/products/{reference-id}): ") + print("") + sample_output_file = input("Enter the path to the sample output JSON file: ") + print("") + + valid_methods = ["GET", "POST", "PUT", "DELETE"] + if method not in valid_methods: + print(f"Invalid HTTP method: {method}. Please enter one of the following: {', '.join(valid_methods)}") + else: + self.execute_script('~/ir_api_util/addEndPoint.py', [name, method, url, sample_output_file]) + + elif choice == "0": + return + else: + print("Invalid Selection. Please try again.") + self.api_query_checker_menu() + def main(self): while True: self.main_menu() @@ -74,6 +114,10 @@ def main(self): self.business_unit_reports_menu() elif choice == "10": self.execute_script_noArgs('~/ir_api_util/auditLogReport.py') +# elif choice == "11": +# self.execute_script_noArgs('~/ir_api_util/createRule_FromExcel.py') + elif choice == "12": + self.api_query_checker_menu() elif choice == "0": print("Exiting") print("") From 21929980697f1572dcb5821fadf0696619a26dd0 Mon Sep 17 00:00:00 2001 From: iresolis Date: Wed, 2 Oct 2024 16:14:33 +0000 Subject: [PATCH 2/3] added deepdiff to install reqs --- Integrations/API Utility/install_irtool_reqs | 1 + 1 file changed, 1 insertion(+) diff --git a/Integrations/API Utility/install_irtool_reqs b/Integrations/API Utility/install_irtool_reqs index 32da7dd..94c1563 100755 --- a/Integrations/API Utility/install_irtool_reqs +++ b/Integrations/API Utility/install_irtool_reqs @@ -8,3 +8,4 @@ sudo apt-get install python3-pip -y pip3 install pandas pip3 install openpyxl pip3 install pyarrow +pip3 install deepdiff From 1edfd6cdc7ac354948ab78fad62b1be9250bf995 Mon Sep 17 00:00:00 2001 From: iresolis Date: Wed, 2 Oct 2024 16:30:48 +0000 Subject: [PATCH 3/3] modified error handling for apiChecker.json not found. --- Integrations/API Utility/ir_api_util/apiChecker.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Integrations/API Utility/ir_api_util/apiChecker.py b/Integrations/API Utility/ir_api_util/apiChecker.py index d2a54d5..1b2b784 100644 --- a/Integrations/API Utility/ir_api_util/apiChecker.py +++ b/Integrations/API Utility/ir_api_util/apiChecker.py @@ -11,8 +11,11 @@ def load_queries(filename): with open(filename, 'r') as file: return json.load(file) except (FileNotFoundError, json.JSONDecodeError) as e: - print(f"Error reading {filename}: {e}") - sys.exit(1) + print("") + print("The apiChecker.json file was not found. Try using the Add feature to create a new query to be checked.") + print("") + return None + #sys.exit(1) # Read config file for output path and page size def read_config(config_path):