Skip to content

Commit

Permalink
feat: add property filter to search API
Browse files Browse the repository at this point in the history
  • Loading branch information
eric-nguyen-cs committed Mar 21, 2024
1 parent dbedfb9 commit 004cb15
Showing 1 changed file with 127 additions and 1 deletion.
128 changes: 127 additions & 1 deletion backend/editor/models/search_models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Annotated, Literal

from pydantic import Field, StringConstraints, TypeAdapter, computed_field
from pydantic import Field, StringConstraints, TypeAdapter, computed_field, field_validator

from .base_models import BaseModel
from .node_models import EntryNode
Expand Down Expand Up @@ -115,6 +116,130 @@ def build_cypher_query(self, param_name: str) -> CypherQuery:
)


class PropertyFilterSearchTerm(AbstractFilterSearchTerm):
filter_type: Literal["property"]
filter_value: str

@field_validator("filter_value")
@classmethod
def validate_filter_value(cls, filter_value: str):
"""
The filter value is in the format `not:inherited:property_name:property_value`
where `not:` and `inherited:` and `:property_value` are optional.
Note that a property_name is always of format `name:lc`.
"""
parsed_value = filter_value
if parsed_value.startswith("not:"):
parsed_value = parsed_value[4:]
if parsed_value.startswith("inherited:"):
parsed_value = parsed_value[10:]

assert ":" in parsed_value, "A property_name is mandatory and must contain a colon"

terms = parsed_value.split(":")
property_name = terms[0] + ":" + terms[1]

if not re.match(r"^[^:\\]+:[a-z]{2}$", property_name):
raise ValueError("Invalid property_name")

return filter_value

@computed_field
def negated(self) -> bool:
return self.filter_value.startswith("not:")

@computed_field
def inherited(self) -> bool:
filter_value = self.get_parsed_filter_value(self.negated)
return filter_value.startswith("inherited:")

@computed_field
def property_name(self) -> str:
filter_value = self.get_parsed_filter_value(self.negated, self.inherited)
terms = filter_value.split(":")
return terms[0] + "_" + terms[1]

@computed_field
def property_value(self) -> str | None:
filter_value = self.get_parsed_filter_value(self.negated, self.inherited)
terms = filter_value.split(":")
return ":".join(terms[2:]) if len(terms) > 2 else None

def get_parsed_filter_value(self, negated=False, inherited=False):
filter_value = self.filter_value
if negated:
filter_value = filter_value[4:]
if inherited:
filter_value = filter_value[10:]
return filter_value

def build_cypher_query(self, param_name: str) -> CypherQuery:
branches = {
"negated": self.negated,
"inherited": self.inherited,
"with_value": self.property_value is not None,
}
match branches:
case {"negated": False, "inherited": False, "with_value": False}:
return CypherQuery(f"n.prop_{self.property_name} IS NOT NULL")
case {"negated": True, "inherited": False, "with_value": False}:
return CypherQuery(f"n.prop_{self.property_name} IS NULL")
case {"negated": False, "inherited": False, "with_value": True}:
return CypherQuery(
f"n.prop_{self.property_name} = ${param_name}",
{param_name: self.property_value},
)
case {"negated": True, "inherited": False, "with_value": True}:
return CypherQuery(
f"n.prop_{self.property_name} <> ${param_name}",
{param_name: self.property_value},
)
case {"negated": False, "inherited": True, "with_value": False}:
return CypherQuery(
f"""(n.prop_{self.property_name} IS NOT NULL OR
any(
ancestor IN [(n)<-[:is_child_of*]-(p:ENTRY) | p]
WHERE ancestor.prop_{self.property_name} IS NOT NULL)
)""",
)
case {"negated": True, "inherited": True, "with_value": False}:
return CypherQuery(
f"""(n.prop_{self.property_name} IS NULL AND
all(
ancestor IN [(n)<-[:is_child_of*]-(p:ENTRY) | p]
WHERE ancestor.prop_{self.property_name} IS NULL)
)""",
)
case {"negated": False, "inherited": True, "with_value": True}:
return CypherQuery(
f"""
[
property IN
[n.prop_{self.property_name}] +
[(n)<-[:is_child_of*]-(p:ENTRY) | p.prop_{self.property_name}]
WHERE property IS NOT NULL
][0]
= ${param_name}""",
{param_name: self.property_value},
)
case {"negated": True, "inherited": True, "with_value": True}:
return CypherQuery(
f"""((n.prop_{self.property_name} IS NULL AND
all(
ancestor IN [(n)<-[:is_child_of*]-(p:ENTRY) | p]
WHERE ancestor.prop_{self.property_name} IS NULL)
) OR
[
property IN
[n.prop_{self.property_name}] +
[(n)<-[:is_child_of*]-(p:ENTRY) | p.prop_{self.property_name}]
WHERE property IS NOT NULL
][0]
<> ${param_name})""",
{param_name: self.property_value},
)


FilterSearchTerm = Annotated[
(
IsFilterSearchTerm
Expand All @@ -123,6 +248,7 @@ def build_cypher_query(self, param_name: str) -> CypherQuery:
| ChildFilterSearchTerm
| AncestorFilterSearchTerm
| DescendantFilterSearchTerm
| PropertyFilterSearchTerm
),
Field(discriminator="filter_type"),
]
Expand Down

0 comments on commit 004cb15

Please sign in to comment.