Skip to content

Commit

Permalink
performance improvement
Browse files Browse the repository at this point in the history
iterating without recursion, using just dict comprehension instead of ChainMap, and avoiding filtering/mapping looks to be significantly more performant
  • Loading branch information
ggpwnkthx committed Sep 7, 2023
1 parent 9947b29 commit 7fedf4d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 57 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ tests/data/
tests/my_*.py
docs/build/
.pdm-python
*.egg-info
*.ipynb
91 changes: 34 additions & 57 deletions aiopenapi3/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,66 +397,43 @@ def _init_operationindex(self, use_operation_tags: bool):
self._operationindex = OperationIndex(self, use_operation_tags)

@staticmethod
def _iterate_schemas(schemas: Dict[int, "SchemaType"], next: Set[int], processed: Set[int]):
"""
recursively collect all schemas related to the starting set
"""
if not next:
return processed

processed.update(next)

def not_in_processed(z: Tuple[int, SchemaBase]) -> TypeGuard[Tuple[int, "SchemaType"]]:
return z[0] not in processed

def is_instance_schema_ref(
x: Union["SchemaType", "ReferenceType"]
) -> TypeGuard[Union["SchemaType", "ReferenceType"]]:
return isinstance(x, (ReferenceBase, SchemaBase))

new: Dict[int, "SchemaType"] = cast(
Dict[int, "SchemaType"],
collections.ChainMap(
*[
dict(
filter(
not_in_processed,
map(
lambda y: (id(y._target), y._target) if isinstance(y, ReferenceBase) else (id(y), y),
filter(
is_instance_schema_ref,
getattr(schemas[i], "oneOf", []) # Swagger compat
+ getattr(schemas[i], "anyOf", []) # Swagger compat
+ schemas[i].allOf
+ list(schemas[i].properties.values())
+ (
[schemas[i].items]
if schemas[i].type == "array"
and schemas[i].items
and not isinstance(schemas[i].items, list)
else []
)
+ (
schemas[i].items
if schemas[i].type == "array"
and schemas[i].items
and isinstance(schemas[i].items, list)
else []
),
), # type: ignore[operator]
),
)
) # type: ignore[arg-type]
for i in next
]
),
def _get_combined_attributes(schema):
"""Combine attributes from the schema."""
return (
getattr(schema, "oneOf", []) # Swagger compat
+ getattr(schema, "anyOf", []) # Swagger compat
+ schema.allOf
+ list(schema.properties.values())
+ ([schema.items] if schema.type == "array" and schema.items and not isinstance(schema, list) else [])
+ (schema.items if schema.type == "array" and schema.items and isinstance(schema, list) else [])
)

sets = set(new.keys())
schemas.update(new)
processed.update(sets)
@classmethod
def _process_schema_attributes(cls, schema, processed):
"""Process attributes of a schema and filter out the processed ones."""
combined_attributes = cls._get_combined_attributes(schema)
return {
id(item._target)
if isinstance(item, ReferenceBase)
else id(item): item._target
if isinstance(item, ReferenceBase)
else item
for item in combined_attributes
if isinstance(item, (ReferenceBase, SchemaBase)) and id(item) not in processed
}

return OpenAPI._iterate_schemas(schemas, sets, processed)
@classmethod
def _iterate_schemas(cls, schemas: dict, next_set: set, processed: set):
"""Iteratively collect all schemas related to the starting set."""
while next_set:
processed.update(next_set)
# Using dictionary comprehension to combine the results,
# which seems more efficient than ChainMap
new = {k: v for i in next_set for k, v in cls._process_schema_attributes(schemas[i], processed).items()}
next_set = set(new.keys()) - processed
schemas.update(new)
processed.update(next_set)
return processed

def _init_schema_types_collect(self) -> Dict[str, "SchemaType"]:
byname: Dict[str, "SchemaType"] = dict()
Expand Down

0 comments on commit 7fedf4d

Please sign in to comment.