diff --git a/src/cript/api/__init__.py b/src/cript/api/__init__.py index fb3229f5c..8b2d3bd2c 100644 --- a/src/cript/api/__init__.py +++ b/src/cript/api/__init__.py @@ -1,5 +1,6 @@ # trunk-ignore-all(ruff/F401) from cript.api.api import API +from cript.api.data_schema import DataSchema from cript.api.valid_search_modes import SearchModes from cript.api.vocabulary_categories import VocabCategories diff --git a/src/cript/api/api.py b/src/cript/api/api.py index f3d5872df..51dc150d0 100644 --- a/src/cript/api/api.py +++ b/src/cript/api/api.py @@ -5,14 +5,14 @@ import uuid import warnings from pathlib import Path -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, Optional, Union import boto3 -import jsonschema import requests from beartype import beartype from cript.api.api_config import _API_TIMEOUT +from cript.api.data_schema import DataSchema from cript.api.exceptions import ( APIError, CRIPTAPIRequiredError, @@ -20,12 +20,10 @@ CRIPTConnectionError, CRIPTDuplicateNameError, InvalidHostError, - InvalidVocabulary, ) from cript.api.paginator import Paginator from cript.api.utils.aws_s3_utils import get_s3_client from cript.api.utils.get_host_token import resolve_host_and_token -from cript.api.utils.helper_functions import _get_node_type_from_json from cript.api.utils.save_helper import ( _fix_node_save, _identify_suppress_attributes, @@ -33,8 +31,6 @@ ) from cript.api.utils.web_file_downloader import download_file_from_url from cript.api.valid_search_modes import SearchModes -from cript.api.vocabulary_categories import VocabCategories -from cript.nodes.exceptions import CRIPTNodeSchemaError from cript.nodes.primary_nodes.project import Project # Do not use this directly! That includes devs. 
@@ -66,8 +62,7 @@ class API: _api_token: str = "" _storage_token: str = "" _http_headers: dict = {} - _vocabulary: dict = {} - _db_schema: dict = {} + _db_schema: Optional[DataSchema] = None _api_prefix: str = "api" _api_version: str = "v1" @@ -81,12 +76,6 @@ class API: _internal_s3_client: Any = None # type: ignore # trunk-ignore-end(cspell) - # Advanced User Tip: Disabling Node Validation - # For experienced users, deactivating node validation during creation can be a time-saver. - # Note that the complete node graph will still undergo validation before being saved to the back end. - # Caution: It's advisable to keep validation active while debugging scripts, as disabling it can delay error notifications and complicate the debugging process. - skip_validation: bool = False - @beartype def __init__(self, host: Union[str, None] = None, api_token: Union[str, None] = None, storage_token: Union[str, None] = None, config_file_path: Union[str, Path] = ""): """ @@ -230,10 +219,9 @@ def __init__(self, host: Union[str, None] = None, api_token: Union[str, None] = # check that api can connect to CRIPT with host and token self._check_initial_host_connection() - self._get_db_schema() - # set a logger instance to use for the class logs self._set_logger() + self._db_schema = DataSchema(self.host) def __str__(self) -> str: """ @@ -496,238 +484,6 @@ def _check_initial_host_connection(self) -> None: except Exception as exc: raise CRIPTConnectionError(self.host, self._api_token) from exc - def _get_vocab(self) -> dict: - """ - gets the entire CRIPT controlled vocabulary and stores it in _vocabulary - - 1. loops through all controlled vocabulary categories - 1. if the category already exists in the controlled vocabulary then skip that category and continue - 1. if the category does not exist in the `_vocabulary` dict, - then request it from the API and append it to the `_vocabulary` dict - 1. 
at the end the `_vocabulary` should have all the controlled vocabulary and that will be returned - - Examples - -------- - The vocabulary looks like this - ```json - {'algorithm_key': - [ - { - 'description': "Velocity-Verlet integration algorithm. Parameters: 'integration_timestep'.", - 'name': 'velocity_verlet' - }, - } - ``` - """ - - # loop through all vocabulary categories and make a request to each vocabulary category - # and put them all inside of self._vocab with the keys being the vocab category name - for category in VocabCategories: - if category in self._vocabulary: - continue - - self._vocabulary[category.value] = self.get_vocab_by_category(category) - - return self._vocabulary - - @beartype - def get_vocab_by_category(self, category: VocabCategories) -> List[dict]: - """ - get the CRIPT controlled vocabulary by category - - Examples - -------- - >>> import os - >>> import cript - >>> with cript.API( - ... host="https://api.criptapp.org/", - ... api_token=os.getenv("CRIPT_TOKEN"), - ... storage_token=os.getenv("CRIPT_STORAGE_TOKEN") - ... ) as api: - ... 
api.get_vocab_by_category(cript.VocabCategories.MATERIAL_IDENTIFIER_KEY) # doctest: +SKIP - - Parameters - ---------- - category: str - category of - - Returns - ------- - List[dict] - list of JSON containing the controlled vocabulary - """ - - # check if the vocabulary category is already cached - if category.value in self._vocabulary: - return self._vocabulary[category.value] - - vocabulary_category_url: str = f"{self.host}/cv/{category.value}/" - - # if vocabulary category is not in cache, then get it from API and cache it - response: Dict = requests.get(url=vocabulary_category_url, timeout=_API_TIMEOUT).json() - - if response["code"] != 200: - raise APIError(api_error=str(response), http_method="GET", api_url=vocabulary_category_url) - - # add to cache - self._vocabulary[category.value] = response["data"] - - return self._vocabulary[category.value] - - @beartype - def _is_vocab_valid(self, vocab_category: VocabCategories, vocab_word: str) -> bool: - """ - checks if the vocabulary is valid within the CRIPT controlled vocabulary. - Either returns True or InvalidVocabulary Exception - - 1. if the vocabulary is custom (starts with "+") - then it is automatically valid - 2. if vocabulary is not custom, then it is checked against its category - if the word cannot be found in the category then it returns False - - Parameters - ---------- - vocab_category: VocabCategories - ControlledVocabularyCategories enums - vocab_word: str - the vocabulary word e.g. "CAS", "SMILES", "BigSmiles", "+my_custom_key" - - Returns - ------- - a boolean of if the vocabulary is valid - - Raises - ------ - InvalidVocabulary - If the vocabulary is invalid then the error gets raised - """ - - # check if vocab is custom - # This is deactivated currently, no custom vocab allowed. 
- if vocab_word.startswith("+"): - return True - - # get the entire vocabulary - controlled_vocabulary = self._get_vocab() - # get just the category needed - controlled_vocabulary = controlled_vocabulary[vocab_category.value] - - # TODO this can be faster with a dict of dicts that can do o(1) look up - # looping through an unsorted list is an O(n) look up which is slow - # loop through the list - for vocab_dict in controlled_vocabulary: - # check the name exists within the dict - if vocab_dict.get("name") == vocab_word: - return True - - raise InvalidVocabulary(vocab=vocab_word, possible_vocab=list(controlled_vocabulary)) - - def _get_db_schema(self) -> dict: - """ - Sends a GET request to CRIPT to get the database schema and returns it. - The database schema can be used for validating the JSON request - before submitting it to CRIPT. - - 1. checks if the db schema is already set - * if already exists then it skips fetching it from the API and just returns what it already has - 2. if db schema has not been set yet, then it fetches it from the API - * after getting it from the API it saves it in the `_schema` class variable, - so it can be easily and efficiently gotten next time - """ - - # check if db schema is already saved - if bool(self._db_schema): - return self._db_schema - - # fetch db_schema from API - else: - # fetch db schema from API - response: requests.Response = requests.get(url=f"{self.host}/schema/", timeout=_API_TIMEOUT) - - # raise error if not HTTP 200 - response.raise_for_status() - - # if no error, take the JSON from the API response - response_dict: Dict = response.json() - - # get the data from the API JSON response - self._db_schema = response_dict["data"] - return self._db_schema - - @beartype - def _is_node_schema_valid(self, node_json: str, is_patch: bool = False, force_validation: bool = False) -> Union[bool, None]: - """ - checks a node JSON schema against the db schema to return if it is valid or not. - - 1. get db schema - 1. 
convert node_json str to dict - 1. take out the node type from the dict - 1. "node": ["material"] - 1. use the node type from dict to tell the db schema which node schema to validate against - 1. Manipulates the string to be title case to work with db schema - - Parameters - ---------- - node_json: str - a node in JSON form string - is_patch: bool - a boolean flag checking if it needs to validate against `NodePost` or `NodePatch` - - Notes - ----- - This function does not take into consideration vocabulary validation. - For vocabulary validation please check `is_vocab_valid` - - Raises - ------ - CRIPTNodeSchemaError - in case a node is invalid - - Returns - ------- - bool - whether the node JSON is valid or not - """ - - # Fast exit without validation - if self.skip_validation and not force_validation: - return None - - db_schema = self._get_db_schema() - - node_type: str = _get_node_type_from_json(node_json=node_json) - - node_dict = json.loads(node_json) - - # logging out info to the terminal for the user feedback - # (improve UX because the program is currently slow) - log_message = f"Validating {node_type} graph..." - if force_validation: - log_message = "Forced: " + log_message + " if error occur, try setting `cript.API.skip_validation = False` for debugging." 
- else: - log_message += " (Can be disabled by setting `cript.API.skip_validation = True`.)" - - self.logger.info(log_message) - - # set the schema to test against http POST or PATCH of DB Schema - schema_http_method: str - - if is_patch: - schema_http_method = "Patch" - else: - schema_http_method = "Post" - - # set which node you are using schema validation for - db_schema["$ref"] = f"#/$defs/{node_type}{schema_http_method}" - - try: - jsonschema.validate(instance=node_dict, schema=db_schema) - except jsonschema.exceptions.ValidationError as error: - raise CRIPTNodeSchemaError(node_type=node_dict["node"], json_schema_validation_error=str(error)) from error - - # if validation goes through without any problems return True - return True - def save(self, project: Project) -> None: """ This method takes a project node, serializes the class into JSON diff --git a/src/cript/api/data_schema.py b/src/cript/api/data_schema.py new file mode 100644 index 000000000..2133b99be --- /dev/null +++ b/src/cript/api/data_schema.py @@ -0,0 +1,266 @@ +import json +import logging +from typing import Union + +import jsonschema +import requests +from beartype import beartype + +from cript.api.api_config import _API_TIMEOUT +from cript.api.exceptions import APIError, InvalidVocabulary +from cript.api.utils.helper_functions import _get_node_type_from_json +from cript.api.vocabulary_categories import VocabCategories +from cript.nodes.exceptions import CRIPTNodeSchemaError + + +class DataSchema: + """ + ## Definition + DataSchema class, handles the interactions with the JSON node validation schema. + """ + + _vocabulary: dict = {} + _db_schema: dict = {} + # Advanced User Tip: Disabling Node Validation + # For experienced users, deactivating node validation during creation can be a time-saver. + # Note that the complete node graph will still undergo validation before being saved to the back end. 
+ # Caution: It's advisable to keep validation active while debugging scripts, as disabling it can delay error notifications and complicate the debugging process. + skip_validation: bool = False + + def _get_db_schema(self, host: str) -> dict: + """ + Sends a GET request to CRIPT to get the database schema and returns it. + The database schema can be used for validating the JSON request + before submitting it to CRIPT. + + 1. checks if the db schema is already set + * if already exists then it skips fetching it from the API and just returns what it already has + 2. if db schema has not been set yet, then it fetches it from the API + * the fetched schema is returned to the caller, and `__init__` stores it in `_db_schema`, + so it can be easily and efficiently reused next time + """ + + # check if db schema is already saved + if bool(self._db_schema): + return self._db_schema + + # fetch db_schema from API + logging.info(f"Loading node validation schema from {host}/schema/") + # fetch db schema from API + response: requests.Response = requests.get(url=f"{host}/schema/", timeout=_API_TIMEOUT) + + # raise error if not HTTP 200 + response.raise_for_status() + logging.info(f"Loading node validation schema from {host}/schema/ was successful.") + + # if no error, take the JSON from the API response + response_dict: dict = response.json() + + # get the data from the API JSON response + db_schema = response_dict["data"] + + return db_schema + + def __init__(self, host: str): + """ + Initialize DataSchema class with a full hostname to fetch the node validation schema. + + Examples + -------- + ### Create a standalone DataSchema instance. + >>> import cript + >>> with cript.API(host="https://api.criptapp.org/") as api: + ... 
data_schema = cript.api.DataSchema(api.host) + """ + + self._db_schema = self._get_db_schema(host) + self._vocabulary = self._get_vocab(host) + + def _get_vocab(self, host: str) -> dict: + """ + gets the entire CRIPT controlled vocabulary and returns it as a new dict + + 1. loops through all controlled vocabulary categories + 1. requests every category from the API at `{host}/cv/{category}/` + 1. if the response code of a category is not 200 an `APIError` is raised, + otherwise the response data is stored in the dict under the category name + 1. at the end the dict should have all the controlled vocabulary and that will be returned + + Examples + -------- + The vocabulary looks like this + ```json + {'algorithm_key': + [ + { + 'description': "Velocity-Verlet integration algorithm. Parameters: 'integration_timestep'.", + 'name': 'velocity_verlet' + }, + ]} + ``` + """ + + vocabulary: dict = {} + # loop through all vocabulary categories and make a request to each vocabulary category + # and put them all inside of `vocabulary` with the keys being the vocab category name + for category in VocabCategories: + vocabulary_category_url: str = f"{host}/cv/{category.value}/" + + # get the vocabulary category from the API + response: dict = requests.get(url=vocabulary_category_url, timeout=_API_TIMEOUT).json() + + if response["code"] != 200: + raise APIError(api_error=str(response), http_method="GET", api_url=vocabulary_category_url) + + # add to the dict that is returned + vocabulary[category.value] = response["data"] + + return vocabulary + + @beartype + def get_vocab_by_category(self, category: VocabCategories) -> list: + """ + get the CRIPT controlled vocabulary by category + + Examples + -------- + >>> import os + >>> import cript + >>> with cript.API( + ... host="https://api.criptapp.org/", + ... api_token=os.getenv("CRIPT_TOKEN"), + ... storage_token=os.getenv("CRIPT_STORAGE_TOKEN") + ... ) as api: + ... 
api.schema.get_vocab_by_category(cript.VocabCategories.MATERIAL_IDENTIFIER_KEY) # doctest: +SKIP + + Parameters + ---------- + category: VocabCategories + category of the controlled vocabulary to retrieve + + Returns + ------- + List[dict] + list of JSON containing the controlled vocabulary + """ + return self._vocabulary[category.value] + + @beartype + def _is_vocab_valid(self, vocab_category: VocabCategories, vocab_word: str) -> bool: + """ + checks if the vocabulary is valid within the CRIPT controlled vocabulary. + Either returns True or raises an InvalidVocabulary exception + + 1. custom vocabulary (starts with "+") is currently deactivated, + so it is checked like any other word + 2. the vocabulary word is checked against its category + if the word cannot be found in the category then InvalidVocabulary is raised + + Parameters + ---------- + vocab_category: VocabCategories + ControlledVocabularyCategories enums + vocab_word: str + the vocabulary word e.g. "CAS", "SMILES", "BigSmiles", "+my_custom_key" + + Returns + ------- + True if the vocabulary word is valid + + Raises + ------ + InvalidVocabulary + If the vocabulary is invalid then the error gets raised + """ + + # check if vocab is custom + # This is deactivated currently, no custom vocab allowed. 
+ # if vocab_word.startswith("+"): + # return True + + # get just the category needed + controlled_vocabulary = self._vocabulary[vocab_category.value] + + # TODO this can be faster with a dict of dicts that can do o(1) look up + # looping through an unsorted list is an O(n) look up which is slow + # loop through the list + for vocab_dict in controlled_vocabulary: + # check the name exists within the dict + if vocab_dict.get("name") == vocab_word: + return True + + raise InvalidVocabulary(vocab=vocab_word, possible_vocab=list(controlled_vocabulary)) + + @beartype + def is_node_schema_valid(self, node_json: str, is_patch: bool = False, force_validation: bool = False) -> Union[bool, None]: + """ + checks a node JSON schema against the db schema to return if it is valid or not. + + 1. get db schema + 1. convert node_json str to dict + 1. take out the node type from the dict + 1. "node": ["material"] + 1. use the node type from dict to tell the db schema which node schema to validate against + 1. Manipulates the string to be title case to work with db schema + + Parameters + ---------- + node_json: str + a node in JSON form string + is_patch: bool + a boolean flag checking if it needs to validate against `NodePost` or `NodePatch` + + Notes + ----- + This function does not take into consideration vocabulary validation. + For vocabulary validation please check `is_vocab_valid` + + Raises + ------ + CRIPTNodeSchemaError + in case a node is invalid + + Returns + ------- + bool + whether the node JSON is valid or not + """ + + # Fast exit without validation + if self.skip_validation and not force_validation: + return None + + db_schema = self._db_schema + + node_type: str = _get_node_type_from_json(node_json=node_json) + + node_dict = json.loads(node_json) + + # logging out info to the terminal for the user feedback + # (improve UX because the program is currently slow) + log_message = f"Validating {node_type} graph..." 
+ if force_validation: + log_message = "Forced: " + log_message + " if error occur, try setting `cript.API.skip_validation = False` for debugging." + else: + log_message += " (Can be disabled by setting `cript.API.skip_validation = True`.)" + + logging.info(log_message) + + # set the schema to test against http POST or PATCH of DB Schema + schema_http_method: str + + if is_patch: + schema_http_method = "Patch" + else: + schema_http_method = "Post" + + # set which node you are using schema validation for + db_schema["$ref"] = f"#/$defs/{node_type}{schema_http_method}" + + try: + jsonschema.validate(instance=node_dict, schema=db_schema) + except jsonschema.exceptions.ValidationError as error: + raise CRIPTNodeSchemaError(node_type=node_dict["node"], json_schema_validation_error=str(error)) from error + + # if validation goes through without any problems return True + return True diff --git a/src/cript/api/vocabulary_categories.py b/src/cript/api/vocabulary_categories.py index 1107f3e00..30a775a32 100644 --- a/src/cript/api/vocabulary_categories.py +++ b/src/cript/api/vocabulary_categories.py @@ -65,7 +65,7 @@ class VocabCategories(Enum): Examples -------- >>> import cript - >>> algorithm_vocabulary = api.get_vocab_by_category( + >>> algorithm_vocabulary = api.schema.get_vocab_by_category( ... cript.VocabCategories.ALGORITHM_KEY ... 
) """ diff --git a/src/cript/nodes/core.py b/src/cript/nodes/core.py index 000640813..b385692ed 100644 --- a/src/cript/nodes/core.py +++ b/src/cript/nodes/core.py @@ -154,7 +154,7 @@ def validate(self, api=None, is_patch: bool = False, force_validation: bool = Fa if api is None: api = _get_global_cached_api() - api._is_node_schema_valid(self.get_json(is_patch=is_patch).json, is_patch=is_patch, force_validation=force_validation) + api.schema.is_node_schema_valid(self.get_json(is_patch=is_patch).json, is_patch=is_patch, force_validation=force_validation) @classmethod def _from_json(cls, json_dict: dict): @@ -244,7 +244,7 @@ def json(self): from cript.api.api import _get_global_cached_api api = _get_global_cached_api() - api._is_node_schema_valid(json_string, force_validation=True) + api.schema.is_node_schema_valid(json_string, force_validation=True) return json_string diff --git a/src/cript/nodes/util/material_deserialization.py b/src/cript/nodes/util/material_deserialization.py index 32f761fe0..255721c73 100644 --- a/src/cript/nodes/util/material_deserialization.py +++ b/src/cript/nodes/util/material_deserialization.py @@ -55,7 +55,7 @@ def _deserialize_flattened_material_identifiers(json_dict: Dict) -> Dict: # get material identifiers keys from API and create a simple list # eg ["smiles", "bigsmiles", etc.] 
- all_identifiers_list: List[str] = [identifier.get("name") for identifier in api.get_vocab_by_category(cript.VocabCategories.MATERIAL_IDENTIFIER_KEY)] + all_identifiers_list: List[str] = [identifier.get("name") for identifier in api.schema.get_vocab_by_category(cript.VocabCategories.MATERIAL_IDENTIFIER_KEY)] # pop "name" from identifiers list because the node has to have a name all_identifiers_list.remove("name") diff --git a/tests/api/test_api.py b/tests/api/test_api.py index 0a34d6179..e4323b7ee 100644 --- a/tests/api/test_api.py +++ b/tests/api/test_api.py @@ -11,9 +11,7 @@ import cript from conftest import HAS_INTEGRATION_TESTS_ENABLED -from cript.api.exceptions import InvalidVocabulary from cript.api.paginator import Paginator -from cript.nodes.exceptions import CRIPTNodeSchemaError def test_api_with_invalid_host() -> None: @@ -86,174 +84,6 @@ def test_config_file() -> None: assert api._api_token == config_file_texts["api_token"] -@pytest.mark.skip(reason="too early to write as there are higher priority tasks currently") -def test_api_initialization_stress() -> None: - """ - tries to put the API configuration under as much stress as it possibly can - it tries to give it mixed options and try to trip it up and create issues for it - - ## scenarios - 1. if there is a config file and other inputs, then config file wins - 1. if config file, but is missing an attribute, and it is labeled as None, then should get it from env var - 1. 
if there is half from input and half from env var, then both should work happily - """ - pass - - -def test_get_db_schema_from_api(cript_api: cript.API) -> None: - """ - tests that the Python SDK can successfully get the db schema from API - """ - db_schema = cript_api._get_db_schema() - - assert bool(db_schema) - assert isinstance(db_schema, dict) - - # db schema should have at least 30 fields - assert len(db_schema["$defs"]) > 30 - - -def test_is_node_schema_valid(cript_api: cript.API) -> None: - """ - test that a CRIPT node can be correctly validated and invalidated with the db schema - - * test a couple of nodes to be sure db schema validation is working fine - * material node - * file node - * test db schema validation with an invalid node, and it should be invalid - - Notes - ----- - * does not test if serialization/deserialization works correctly, - just tests if the node schema can work correctly if serialization was correct - - # TODO the tests here only test POST db schema and not PATCH yet, those tests must be added - """ - - # ------ invalid node schema------ - invalid_schema = {"invalid key": "invalid value", "node": ["Material"]} - - with pytest.raises(CRIPTNodeSchemaError): - cript_api._is_node_schema_valid(node_json=json.dumps(invalid_schema), is_patch=False) - - # ------ valid material schema ------ - # valid material node - valid_material_dict = {"node": ["Material"], "name": "0.053 volume fraction CM gel", "uid": "_:0.053 volume fraction CM gel"} - - # convert dict to JSON string because method expects JSON string - assert cript_api._is_node_schema_valid(node_json=json.dumps(valid_material_dict), is_patch=False) is True - # ------ valid file schema ------ - valid_file_dict = { - "node": ["File"], - "source": "https://criptapp.org", - "type": "calibration", - "extension": ".csv", - "data_dictionary": "my file's data dictionary", - } - - # convert dict to JSON string because method expects JSON string - assert 
cript_api._is_node_schema_valid(node_json=json.dumps(valid_file_dict), is_patch=False) is True - - -def test_is_node_schema_valid_skipped(cript_api: cript.API) -> None: - """ - test that a CRIPT node can be correctly validated and invalidated with the db schema, when skipping tests is active - - * test db schema validation with an invalid node, and it should be invalid, but only detected if forced - - Notes - ----- - * does not test if serialization/deserialization works correctly, - just tests if the node schema can work correctly if serialization was correct - - """ - - def extract_base_url(url): - # Split the URL by "//" first to separate the scheme (like http, https) - parts = url.split("//", 1) - scheme, rest = parts if len(parts) > 1 else ("", parts[0]) - - # Split the rest by the first "/" to separate the domain - domain = rest.split("/", 1)[0] - return f"{scheme}//{domain}" if scheme else domain - - with cript.API(host=extract_base_url(cript_api.host), api_token=cript_api._api_token, storage_token=cript_api._storage_token) as local_cript_api: - local_cript_api.skip_validation = True - # ------ invalid node schema------ - invalid_schema = {"invalid key": "invalid value", "node": ["Material"]} - - # Test should be skipped - assert local_cript_api._is_node_schema_valid(node_json=json.dumps(invalid_schema), is_patch=False) is None - - with pytest.raises(CRIPTNodeSchemaError): - local_cript_api._is_node_schema_valid(node_json=json.dumps(invalid_schema), is_patch=False, force_validation=True) - - -def test_get_vocabulary_by_category(cript_api: cript.API) -> None: - """ - tests if a vocabulary can be retrieved by category - 1. tests response is a list of dicts as expected - 1. create a new list of just material identifiers - 1. 
tests that the fundamental identifiers exist within the API vocabulary response - - Warnings - -------- - This test only gets the vocabulary category for "material_identifier_key" and does not test all the possible - CRIPT controlled vocabulary - """ - - material_identifier_vocab_list = cript_api.get_vocab_by_category(cript.VocabCategories.MATERIAL_IDENTIFIER_KEY) - - # test response is a list of dicts - assert isinstance(material_identifier_vocab_list, list) - - material_identifiers = [identifier["name"] for identifier in material_identifier_vocab_list] - - # assertions - assert "bigsmiles" in material_identifiers - assert "smiles" in material_identifiers - assert "pubchem_cid" in material_identifiers - - -def test_get_controlled_vocabulary_from_api(cript_api: cript.API) -> None: - """ - checks if it can successfully get the controlled vocabulary list from CRIPT API - """ - number_of_vocab_categories = 26 - vocab = cript_api._get_vocab() - - # assertions - # check vocabulary list is not empty - assert bool(vocab) is True - assert len(vocab) == number_of_vocab_categories - - -def test_is_vocab_valid(cript_api: cript.API) -> None: - """ - tests if the method for vocabulary is validating and invalidating correctly - - * test with custom key to check it automatically gives valid - * test with a few vocabulary_category and vocabulary_words - * valid category and valid vocabulary word - * test that invalid category throws the correct error - * invalid category and valid vocabulary word - * test that invalid vocabulary word throws the correct error - * valid category and invalid vocabulary word - tests invalid category and invalid vocabulary word - """ - # custom vocab - assert cript_api._is_vocab_valid(vocab_category=cript.VocabCategories.ALGORITHM_KEY, vocab_word="+my_custom_key") is True - - # valid vocab category and valid word - assert cript_api._is_vocab_valid(vocab_category=cript.VocabCategories.FILE_TYPE, vocab_word="calibration") is True - assert 
cript_api._is_vocab_valid(vocab_category=cript.VocabCategories.QUANTITY_KEY, vocab_word="mass") is True - assert cript_api._is_vocab_valid(vocab_category=cript.VocabCategories.UNCERTAINTY_TYPE, vocab_word="fwhm") is True - - # valid vocab category but invalid vocab word - with pytest.raises(InvalidVocabulary): - cript_api._is_vocab_valid(vocab_category=cript.VocabCategories.FILE_TYPE, vocab_word="some_invalid_word") - - def test_download_file_from_url(cript_api: cript.API, tmp_path) -> None: """ downloads the file from a URL and writes it to disk diff --git a/tests/api/test_db_schema.py b/tests/api/test_db_schema.py new file mode 100644 index 000000000..9703bb6cd --- /dev/null +++ b/tests/api/test_db_schema.py @@ -0,0 +1,161 @@ +import json + +import pytest + +import cript +from cript.api.exceptions import InvalidVocabulary +from cript.nodes.exceptions import CRIPTNodeSchemaError + + +def test_get_db_schema_from_api(cript_api: cript.API) -> None: + """ + tests that the Python SDK can successfully get the db schema from API + """ + schema = cript_api.schema + + assert bool(schema._db_schema) + assert isinstance(schema._db_schema, dict) + + # db schema should have at least 30 fields + assert len(schema._db_schema["$defs"]) > 30 + + +def test_is_node_schema_valid(cript_api: cript.API) -> None: + """ + test that a CRIPT node can be correctly validated and invalidated with the db schema + + * test a couple of nodes to be sure db schema validation is working fine + * material node + * file node + * test db schema validation with an invalid node, and it should be invalid + + Notes + ----- + * does not test if serialization/deserialization works correctly, + just tests if the node schema can work correctly if serialization was correct + + # TODO the tests here only test POST db schema and not PATCH yet, those tests must be added + """ + + # ------ invalid node schema------ + invalid_schema = {"invalid key": "invalid value", "node": ["Material"]} + + with 
pytest.raises(CRIPTNodeSchemaError): + cript_api.schema.is_node_schema_valid(node_json=json.dumps(invalid_schema), is_patch=False) + + # ------ valid material schema ------ + # valid material node + valid_material_dict = {"node": ["Material"], "name": "0.053 volume fraction CM gel", "uid": "_:0.053 volume fraction CM gel"} + + # convert dict to JSON string because method expects JSON string + assert cript_api.schema.is_node_schema_valid(node_json=json.dumps(valid_material_dict), is_patch=False) is True + # ------ valid file schema ------ + valid_file_dict = { + "node": ["File"], + "source": "https://criptapp.org", + "type": "calibration", + "extension": ".csv", + "data_dictionary": "my file's data dictionary", + } + + # convert dict to JSON string because method expects JSON string + assert cript_api.schema.is_node_schema_valid(node_json=json.dumps(valid_file_dict), is_patch=False) is True + + +def test_is_node_schema_valid_skipped(cript_api: cript.API) -> None: + """ + test that a CRIPT node can be correctly validated and invalidated with the db schema, when skipping tests is active + + * test db schema validation with an invalid node, and it should be invalid, but only detected if forced + + Notes + ----- + * does not test if serialization/deserialization works correctly, + just tests if the node schema can work correctly if serialization was correct + + """ + + def extract_base_url(url): + # Split the URL by "//" first to separate the scheme (like http, https) + parts = url.split("//", 1) + scheme, rest = parts if len(parts) > 1 else ("", parts[0]) + + # Split the rest by the first "/" to separate the domain + domain = rest.split("/", 1)[0] + return f"{scheme}//{domain}" if scheme else domain + + with cript.API(host=extract_base_url(cript_api.host), api_token=cript_api._api_token, storage_token=cript_api._storage_token) as local_cript_api: + local_cript_api.schema.skip_validation = True + # ------ invalid node schema------ + invalid_schema = {"invalid key": 
"invalid value", "node": ["Material"]} + + # Test should be skipped + assert local_cript_api.schema.is_node_schema_valid(node_json=json.dumps(invalid_schema), is_patch=False) is None + + with pytest.raises(CRIPTNodeSchemaError): + local_cript_api.schema.is_node_schema_valid(node_json=json.dumps(invalid_schema), is_patch=False, force_validation=True) + + +def test_get_vocabulary_by_category(cript_api: cript.API) -> None: + """ + tests if a vocabulary can be retrieved by category + 1. tests response is a list of dicts as expected + 1. create a new list of just material identifiers + 1. tests that the fundamental identifiers exist within the API vocabulary response + + Warnings + -------- + This test only gets the vocabulary category for "material_identifier_key" and does not test all the possible + CRIPT controlled vocabulary + """ + + material_identifier_vocab_list = cript_api.schema.get_vocab_by_category(cript.VocabCategories.MATERIAL_IDENTIFIER_KEY) + + # test response is a list of dicts + assert isinstance(material_identifier_vocab_list, list) + + material_identifiers = [identifier["name"] for identifier in material_identifier_vocab_list] + + # assertions + assert "bigsmiles" in material_identifiers + assert "smiles" in material_identifiers + assert "pubchem_cid" in material_identifiers + + +def test_get_controlled_vocabulary_from_api(cript_api: cript.API) -> None: + """ + checks if it can successfully get the controlled vocabulary list from CRIPT API + """ + number_of_vocab_categories = 26 + vocab = cript_api.schema._get_vocab(cript_api.host) + + # assertions + # check vocabulary list is not empty + assert bool(vocab) is True + assert len(vocab) == number_of_vocab_categories + + +def test_is_vocab_valid(cript_api: cript.API) -> None: + """ + tests if the method for vocabulary is validating and invalidating correctly + + * test with custom key to check it automatically gives valid + * test with a few vocabulary_category and vocabulary_words + * valid category and 
valid vocabulary word + * test that invalid category throws the correct error + * invalid category and valid vocabulary word + * test that invalid vocabulary word throws the correct error + * valid category and invalid vocabulary word + tests invalid category and invalid vocabulary word + """ + # custom vocab + # assert cript_api.schema._is_vocab_valid(vocab_category=cript.VocabCategories.ALGORITHM_KEY, vocab_word="+my_custom_key") is True + + # valid vocab category and valid word + assert cript_api.schema._is_vocab_valid(vocab_category=cript.VocabCategories.FILE_TYPE, vocab_word="calibration") is True + assert cript_api.schema._is_vocab_valid(vocab_category=cript.VocabCategories.QUANTITY_KEY, vocab_word="mass") is True + assert cript_api.schema._is_vocab_valid(vocab_category=cript.VocabCategories.UNCERTAINTY_TYPE, vocab_word="fwhm") is True + + # valid vocab category but invalid vocab word + with pytest.raises(InvalidVocabulary): + cript_api.schema._is_vocab_valid(vocab_category=cript.VocabCategories.FILE_TYPE, vocab_word="some_invalid_word")