diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 85c97e508..16cf7c19f 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -6,8 +6,15 @@ dcicutils
 Change Log
 ----------
 
+8.7.0
+=====
+
+* Add new schema_utils module for schema parsing
+
+
 8.6.0
 =====
+
 * Minor fix to misc_utils.to_integer to handle float strings.
 * Minor fix to structured_data to accumulate unique resolved_refs across schemas.
 * Added ability to autoadd properties structured_data.StructuredDataSet;
@@ -21,12 +28,14 @@ Change Log
 
 8.5.0
 =====
+
 * Moved structured_data.py from smaht-portal to here; new portal_utils and data_readers modules.
 * Strip sheet name in data_readers.Excel; respecte (ignore) hidden sheets.
 
 
 8.4.0
 =====
+
 * More work related to SMaHT ingestion (bundle/sheet_utils, data_readers, etc).
 
 
@@ -37,6 +46,7 @@ Change Log
 
 8.2.0
 =====
+
 * 2023-11-02
 * Added ``SchemaManager.get_identifying_properties`` in ``bundle_utils``
   which implicitly adds ``identifier`` to ``identifyingProperties``.
diff --git a/dcicutils/schema_utils.py b/dcicutils/schema_utils.py
new file mode 100644
index 000000000..df8b8bba7
--- /dev/null
+++ b/dcicutils/schema_utils.py
@@ -0,0 +1,189 @@
+"""Helpers for reading common keys out of JSON-schema-like dictionaries."""
+
+from typing import Any, Dict, List, Union
+
+
+class JsonSchemaConstants:
+    """JSON Schema keyword and type-name strings."""
+
+    ANY_OF = "anyOf"
+    ARRAY = "array"
+    BOOLEAN = "boolean"
+    DEFAULT = "default"
+    ENUM = "enum"
+    FORMAT = "format"
+    INTEGER = "integer"
+    ITEMS = "items"
+    NUMBER = "number"
+    OBJECT = "object"
+    ONE_OF = "oneOf"
+    PATTERN = "pattern"
+    PROPERTIES = "properties"
+    REF = "$ref"
+    REQUIRED = "required"
+    STRING = "string"
+    TYPE = "type"
+
+
+class EncodedSchemaConstants:
+    """Non-standard schema keys used alongside the JSON Schema keywords."""
+
+    IDENTIFYING_PROPERTIES = "identifyingProperties"
+    LINK_TO = "linkTo"
+    MERGE_REF = "$merge"
+    MIXIN_PROPERTIES = "mixinProperties"
+    UNIQUE_KEY = "uniqueKey"
+
+
+class SchemaConstants(JsonSchemaConstants, EncodedSchemaConstants):
+    """All schema key constants, combined into a single namespace."""
+
+
+def get_properties(schema: Dict[str, Any]) -> Dict[str, Any]:
+    """Return the properties of a schema, or an empty dict if absent."""
+    return schema.get(SchemaConstants.PROPERTIES, {})
+
+
+def get_property(schema: Dict[str, Any], property_name: str) -> Dict[str, Any]:
+    """Return the named property's schema, or an empty dict if not found."""
+    return get_properties(schema).get(property_name, {})
+
+
+def has_property(schema: Dict[str, Any], property_name: str) -> bool:
+    """Return True if the schema has the given property."""
+    return property_name in get_properties(schema)
+
+
+def get_required(schema: Dict[str, Any]) -> List[str]:
+    """Return the required properties of a schema, or an empty list."""
+    return schema.get(SchemaConstants.REQUIRED, [])
+
+
+def get_pattern(schema: Dict[str, Any]) -> str:
+    """Return the pattern property of a schema, or an empty string."""
+    return schema.get(SchemaConstants.PATTERN, "")
+
+
+def get_any_of(schema: Dict[str, Any]) -> List[Dict[str, Any]]:
+    """Return the anyOf subschemas of a schema, or an empty list."""
+    return schema.get(SchemaConstants.ANY_OF, [])
+
+
+def get_one_of(schema: Dict[str, Any]) -> List[Dict[str, Any]]:
+    """Return the oneOf subschemas of a schema, or an empty list."""
+    return schema.get(SchemaConstants.ONE_OF, [])
+
+
+def get_conditional_required(schema: Dict[str, Any]) -> List[str]:
+    """Get required + possibly required properties.
+
+    Merges the top-level ``required`` list with the ``required`` lists of
+    the schema's direct ``anyOf`` and ``oneOf`` members, de-duplicated and
+    sorted.
+
+    Using heuristics here; update as needed.
+    """
+    return sorted(
+        set(
+            get_required(schema)
+            + get_any_of_required(schema)
+            + get_one_of_required(schema)
+        )
+    )
+
+
+def get_any_of_required(schema: Dict[str, Any]) -> List[str]:
+    """Get required properties from anyOf (duplicates are kept)."""
+    return [
+        property_name
+        for any_of_schema in get_any_of(schema)
+        for property_name in get_required(any_of_schema)
+    ]
+
+
+def get_one_of_required(schema: Dict[str, Any]) -> List[str]:
+    """Get required properties from oneOf (duplicates are kept)."""
+    return [
+        property_name
+        for one_of_schema in get_one_of(schema)
+        for property_name in get_required(one_of_schema)
+    ]
+
+
+def get_mixin_properties(schema: Dict[str, Any]) -> List[Dict[str, Any]]:
+    """Return the mixin properties of a schema, or an empty list."""
+    return schema.get(SchemaConstants.MIXIN_PROPERTIES, [])
+
+
+def get_identifying_properties(schema: Dict[str, Any]) -> List[str]:
+    """Return the identifying properties of a schema, or an empty list."""
+    return schema.get(SchemaConstants.IDENTIFYING_PROPERTIES, [])
+
+
+def get_schema_type(schema: Dict[str, Any]) -> str:
+    """Return the type of a schema, or an empty string if not given."""
+    return schema.get(SchemaConstants.TYPE, "")
+
+
+def is_array_schema(schema: Dict[str, Any]) -> bool:
+    """Return True if the schema is an array."""
+    return get_schema_type(schema) == SchemaConstants.ARRAY
+
+
+def is_object_schema(schema: Dict[str, Any]) -> bool:
+    """Return True if the schema is an object."""
+    return get_schema_type(schema) == SchemaConstants.OBJECT
+
+
+def is_string_schema(schema: Dict[str, Any]) -> bool:
+    """Return True if the schema is a string."""
+    return get_schema_type(schema) == SchemaConstants.STRING
+
+
+def is_number_schema(schema: Dict[str, Any]) -> bool:
+    """Return True if the schema is a number."""
+    return get_schema_type(schema) == SchemaConstants.NUMBER
+
+
+def is_integer_schema(schema: Dict[str, Any]) -> bool:
+    """Return True if the schema is an integer."""
+    return get_schema_type(schema) == SchemaConstants.INTEGER
+
+
+def is_boolean_schema(schema: Dict[str, Any]) -> bool:
+    """Return True if the schema is a boolean."""
+    return get_schema_type(schema) == SchemaConstants.BOOLEAN
+
+
+def get_items(schema: Dict[str, Any]) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
+    """Return the items of a schema.
+
+    ``items`` may hold a single schema or a list of schemas; the value is
+    returned exactly as found, or an empty dict if the key is absent.
+    """
+    return schema.get(SchemaConstants.ITEMS, {})
+
+
+def get_format(schema: Dict[str, Any]) -> str:
+    """Return the format of a schema, or an empty string if not given."""
+    return schema.get(SchemaConstants.FORMAT, "")
+
+
+def get_conditional_formats(schema: Dict[str, Any]) -> List[str]:
+    """Return the format of a schema, as directly given or as listed
+    as an option via oneOf or anyOf.
+    """
+    formats = set(
+        get_any_of_formats(schema) + get_one_of_formats(schema) + [get_format(schema)]
+    )
+    return sorted(format_ for format_ in formats if format_)
+
+
+def get_any_of_formats(schema: Dict[str, Any]) -> List[str]:
+    """Return the non-empty formats of a schema's anyOf subschemas."""
+    return [format_ for format_ in map(get_format, get_any_of(schema)) if format_]
+
+
+def get_one_of_formats(schema: Dict[str, Any]) -> List[str]:
+    """Return the non-empty formats of a schema's oneOf subschemas."""
+    return [format_ for format_ in map(get_format, get_one_of(schema)) if format_]
diff --git a/docs/source/dcicutils.rst b/docs/source/dcicutils.rst
index 470e11828..743fb0066 100644
--- a/docs/source/dcicutils.rst
+++ b/docs/source/dcicutils.rst
@@ -295,6 +295,13 @@ s3_utils
    :members:
 
 
+schema_utils
+^^^^^^^^^^^^
+
+.. automodule:: dcicutils.schema_utils
+   :members:
+
+
 secrets_utils
 ^^^^^^^^^^^^^
 
diff --git a/pyproject.toml b/pyproject.toml
index e7fa23cfc..f50214532 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "dcicutils"
-version = "8.6.0"
+version = "8.7.0"
 description = "Utility package for interacting with the 4DN Data Portal and other 4DN resources"
 authors = ["4DN-DCIC Team "]
 license = "MIT"
diff --git a/test/test_schema_utils.py b/test/test_schema_utils.py
new file mode 100644
index 000000000..6b948b8ed
--- /dev/null
+++ b/test/test_schema_utils.py
@@ -0,0 +1,322 @@
+from typing import Any, Dict, List, Union
+
+import pytest
+from dcicutils import schema_utils
+
+
+REQUIRED = ["bar", "foo"]
+MIXIN_PROPERTIES = [{"$ref": "mixins.json#/link"}]
+ANY_OF_REQUIRED = ["baz", "fu"]
+ANY_OF = [
+    {"type": "string"},
+    {"required": ANY_OF_REQUIRED},
+]
+ONE_OF_REQUIRED = ["baz", "fa"]
+ONE_OF = [
+    {"foo": "bar"},
+    {"required": ONE_OF_REQUIRED},
+]
+CONDITIONAL_REQUIRED = ["bar", "baz", "fa", "foo", "fu"]
+IDENTIFYING_PROPERTIES = ["bar", "foo"]
+FOO_SCHEMA = {"type": "string"}
+PROPERTIES = {
+    "foo": FOO_SCHEMA,
+    "bar": {
+        "type": "object",
+        "properties": {
+            "baz": {
+                "type": "string",
+            }
+        },
+    },
+    "fun": {"type": "array", "items": {"type": "string"}},
+}
+SCHEMA = {
+    "required": REQUIRED,
+    "anyOf": ANY_OF,
+    "oneOf": ONE_OF,
+    "identifyingProperties": IDENTIFYING_PROPERTIES,
+    "mixinProperties": MIXIN_PROPERTIES,
+    "properties": PROPERTIES,
+}
+FORMAT = "email"
+PATTERN = "some_regex"
+STRING_SCHEMA = {"type": "string", "format": FORMAT, "pattern": PATTERN}
+ARRAY_SCHEMA = {"type": "array", "items": [STRING_SCHEMA]}
+OBJECT_SCHEMA = {"type": "object", "properties": {"foo": STRING_SCHEMA}}
+NUMBER_SCHEMA = {"type": "number"}
+BOOLEAN_SCHEMA = {"type": "boolean"}
+INTEGER_SCHEMA = {"type": "integer"}
+FORMAT_SCHEMA = {
+    "type": "string",
+    "format": "date-time",
+    "oneOf": [{"format": "date"}],
+    "anyOf": [{"format": "time"}, {"format": "date-time"}],
+}
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, {}),
+        (SCHEMA, PROPERTIES),
+    ],
+)
+def test_get_properties(schema: Dict[str, Any], expected: Dict[str, Any]) -> None:
+    assert schema_utils.get_properties(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, {}),
+        (STRING_SCHEMA, {}),
+        (SCHEMA, FOO_SCHEMA),
+    ],
+)
+def test_get_property(schema: Dict[str, Any], expected: Dict[str, Any]) -> None:
+    assert schema_utils.get_property(schema, "foo") == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, []),
+        (SCHEMA, REQUIRED),
+    ],
+)
+def test_get_required(schema: Dict[str, Any], expected: List[str]) -> None:
+    assert schema_utils.get_required(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, ""),
+        (STRING_SCHEMA, PATTERN),
+    ],
+)
+def test_get_pattern(schema: Dict[str, Any], expected: str) -> None:
+    assert schema_utils.get_pattern(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, []),
+        (SCHEMA, ANY_OF),
+    ],
+)
+def test_get_any_of(schema: Dict[str, Any], expected: List[Dict[str, Any]]) -> None:
+    assert schema_utils.get_any_of(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, []),
+        (SCHEMA, ONE_OF),
+    ],
+)
+def test_get_one_of(schema: Dict[str, Any], expected: List[Dict[str, Any]]) -> None:
+    assert schema_utils.get_one_of(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, []),
+        ({"anyOf": ANY_OF}, ANY_OF_REQUIRED),
+        ({"oneOf": ONE_OF}, ONE_OF_REQUIRED),
+        ({"required": REQUIRED}, REQUIRED),
+        (SCHEMA, CONDITIONAL_REQUIRED),
+    ],
+)
+def test_get_conditional_required(schema: Dict[str, Any], expected: List[str]) -> None:
+    assert schema_utils.get_conditional_required(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, []),
+        (SCHEMA, ANY_OF_REQUIRED),
+    ],
+)
+def test_get_any_of_required(schema: Dict[str, Any], expected: List[str]) -> None:
+    assert schema_utils.get_any_of_required(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, []),
+        (SCHEMA, ONE_OF_REQUIRED),
+    ],
+)
+def test_get_one_of_required(schema: Dict[str, Any], expected: List[str]) -> None:
+    assert schema_utils.get_one_of_required(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, []),
+        (SCHEMA, MIXIN_PROPERTIES),
+    ],
+)
+def test_get_mixin_properties(
+    schema: Dict[str, Any], expected: List[Dict[str, Any]]
+) -> None:
+    assert schema_utils.get_mixin_properties(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, []),
+        (SCHEMA, IDENTIFYING_PROPERTIES),
+    ],
+)
+def test_get_identifying_properties(
+    schema: Dict[str, Any], expected: List[str]
+) -> None:
+    assert schema_utils.get_identifying_properties(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, ""),
+        (STRING_SCHEMA, "string"),
+        (ARRAY_SCHEMA, "array"),
+        (OBJECT_SCHEMA, "object"),
+        (NUMBER_SCHEMA, "number"),
+        (BOOLEAN_SCHEMA, "boolean"),
+        (INTEGER_SCHEMA, "integer"),
+    ],
+)
+def test_get_schema_type(schema: Dict[str, Any], expected: str) -> None:
+    assert schema_utils.get_schema_type(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, False),
+        (ARRAY_SCHEMA, False),
+        (STRING_SCHEMA, True),
+    ],
+)
+def test_is_string_schema(schema: Dict[str, Any], expected: bool) -> None:
+    assert schema_utils.is_string_schema(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, False),
+        (STRING_SCHEMA, False),
+        (ARRAY_SCHEMA, True),
+    ],
+)
+def test_is_array_schema(schema: Dict[str, Any], expected: bool) -> None:
+    assert schema_utils.is_array_schema(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, False),
+        (STRING_SCHEMA, False),
+        (OBJECT_SCHEMA, True),
+    ],
+)
+def test_is_object_schema(schema: Dict[str, Any], expected: bool) -> None:
+    assert schema_utils.is_object_schema(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, False),
+        (STRING_SCHEMA, False),
+        (NUMBER_SCHEMA, True),
+    ],
+)
+def test_is_number_schema(schema: Dict[str, Any], expected: bool) -> None:
+    assert schema_utils.is_number_schema(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, False),
+        (STRING_SCHEMA, False),
+        (BOOLEAN_SCHEMA, True),
+    ],
+)
+def test_is_boolean_schema(schema: Dict[str, Any], expected: bool) -> None:
+    assert schema_utils.is_boolean_schema(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, False),
+        (STRING_SCHEMA, False),
+        (INTEGER_SCHEMA, True),
+    ],
+)
+def test_is_integer_schema(schema: Dict[str, Any], expected: bool) -> None:
+    assert schema_utils.is_integer_schema(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, {}),
+        (ARRAY_SCHEMA, [STRING_SCHEMA]),
+    ],
+)
+def test_get_items(
+    schema: Dict[str, Any], expected: Union[Dict[str, Any], List[Dict[str, Any]]]
+) -> None:
+    assert schema_utils.get_items(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,property_name,expected",
+    [
+        ({}, "foo", False),
+        (ARRAY_SCHEMA, "foo", False),
+        (SCHEMA, "foo", True),
+    ],
+)
+def test_has_property(
+    schema: Dict[str, Any], property_name: str, expected: bool
+) -> None:
+    assert schema_utils.has_property(schema, property_name) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, ""),
+        (STRING_SCHEMA, FORMAT),
+    ],
+)
+def test_get_format(schema: Dict[str, Any], expected: str) -> None:
+    assert schema_utils.get_format(schema) == expected
+
+
+@pytest.mark.parametrize(
+    "schema,expected",
+    [
+        ({}, []),
+        (STRING_SCHEMA, [FORMAT]),
+        (FORMAT_SCHEMA, ["date", "date-time", "time"]),
+    ],
+)
+def test_get_conditional_formats(schema: Dict[str, Any], expected: List[str]) -> None:
+    assert schema_utils.get_conditional_formats(schema) == expected