diff --git a/.gitignore b/.gitignore
index 9dceef81..c81a4e17 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,5 @@ tests/data/
 tests/my_*.py
 docs/build/
 .pdm-python
+*.egg-info
+*.ipynb
\ No newline at end of file
diff --git a/aiopenapi3/openapi.py b/aiopenapi3/openapi.py
index 9c2689d2..7038897f 100644
--- a/aiopenapi3/openapi.py
+++ b/aiopenapi3/openapi.py
@@ -397,66 +397,43 @@ def _init_operationindex(self, use_operation_tags: bool):
         self._operationindex = OperationIndex(self, use_operation_tags)
 
     @staticmethod
-    def _iterate_schemas(schemas: Dict[int, "SchemaType"], next: Set[int], processed: Set[int]):
-        """
-        recursively collect all schemas related to the starting set
-        """
-        if not next:
-            return processed
-
-        processed.update(next)
-
-        def not_in_processed(z: Tuple[int, SchemaBase]) -> TypeGuard[Tuple[int, "SchemaType"]]:
-            return z[0] not in processed
-
-        def is_instance_schema_ref(
-            x: Union["SchemaType", "ReferenceType"]
-        ) -> TypeGuard[Union["SchemaType", "ReferenceType"]]:
-            return isinstance(x, (ReferenceBase, SchemaBase))
-
-        new: Dict[int, "SchemaType"] = cast(
-            Dict[int, "SchemaType"],
-            collections.ChainMap(
-                *[
-                    dict(
-                        filter(
-                            not_in_processed,
-                            map(
-                                lambda y: (id(y._target), y._target) if isinstance(y, ReferenceBase) else (id(y), y),
-                                filter(
-                                    is_instance_schema_ref,
-                                    getattr(schemas[i], "oneOf", [])  # Swagger compat
-                                    + getattr(schemas[i], "anyOf", [])  # Swagger compat
-                                    + schemas[i].allOf
-                                    + list(schemas[i].properties.values())
-                                    + (
-                                        [schemas[i].items]
-                                        if schemas[i].type == "array"
-                                        and schemas[i].items
-                                        and not isinstance(schemas[i].items, list)
-                                        else []
-                                    )
-                                    + (
-                                        schemas[i].items
-                                        if schemas[i].type == "array"
-                                        and schemas[i].items
-                                        and isinstance(schemas[i].items, list)
-                                        else []
-                                    ),
-                                ),  # type: ignore[operator]
-                            ),
-                        )
-                    )  # type: ignore[arg-type]
-                    for i in next
-                ]
-            ),
+    def _get_combined_attributes(schema):
+        """Return the sub-schemas referenced by a schema: oneOf/anyOf/allOf, properties and array items."""
+        return (
+            getattr(schema, "oneOf", [])  # Swagger compat
+            + getattr(schema, "anyOf", [])  # Swagger compat
+            + schema.allOf
+            + list(schema.properties.values())
+            + ([schema.items] if schema.type == "array" and schema.items and not isinstance(schema.items, list) else [])
+            + (schema.items if schema.type == "array" and schema.items and isinstance(schema.items, list) else [])
         )
 
-        sets = set(new.keys())
-        schemas.update(new)
-        processed.update(sets)
+    @classmethod
+    def _process_schema_attributes(cls, schema, processed):
+        """Map each unprocessed sub-schema to its id, resolving references to their targets."""
+        combined_attributes = cls._get_combined_attributes(schema)
+        return {
+            id(item._target)
+            if isinstance(item, ReferenceBase)
+            else id(item): item._target
+            if isinstance(item, ReferenceBase)
+            else item
+            for item in combined_attributes
+            if isinstance(item, (ReferenceBase, SchemaBase)) and id(item) not in processed
+        }
 
-        return OpenAPI._iterate_schemas(schemas, sets, processed)
+    @classmethod
+    def _iterate_schemas(cls, schemas: Dict[int, "SchemaType"], next_set: Set[int], processed: Set[int]):
+        """Iteratively collect all schemas related to the starting set."""
+        while next_set:
+            processed.update(next_set)
+            # Using a dictionary comprehension to combine the per-schema results,
+            # which avoids the ChainMap of the recursive implementation
+            new = {k: v for i in next_set for k, v in cls._process_schema_attributes(schemas[i], processed).items()}
+            next_set = set(new.keys()) - processed
+            schemas.update(new)
+            processed.update(next_set)
+        return processed
 
     def _init_schema_types_collect(self) -> Dict[str, "SchemaType"]:
         byname: Dict[str, "SchemaType"] = dict()
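
Review note: the new _iterate_schemas replaces the recursive descent with a worklist loop: each pass maps the ids of not-yet-seen sub-schemas to their (reference-resolved) targets, merges them into the known set, and feeds the new ids back in until nothing new turns up. Below is a minimal standalone sketch of that pattern; the Node class and collect() helper are illustrative stand-ins, not aiopenapi3 types.

# Toy sketch of the worklist pattern; Node stands in for SchemaBase/ReferenceBase.
class Node:
    def __init__(self, name, children=()):
        self.name = name
        self.children = list(children)

def collect(known, start_ids, processed):
    """Iteratively gather every node reachable from the starting ids."""
    next_set = start_ids
    while next_set:
        processed.update(next_set)
        # id -> node for children that have not been processed yet
        new = {
            id(child): child
            for i in next_set
            for child in known[i].children
            if id(child) not in processed
        }
        next_set = set(new.keys()) - processed
        known.update(new)
        processed.update(next_set)
    return processed

leaf = Node("leaf")
root = Node("root", [leaf])
print(len(collect({id(root): root}, {id(root)}, set())))  # 2: root and leaf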