Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
mrwunderbar666 committed Aug 24, 2022
1 parent 2e691a5 commit 4b64711
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 77 deletions.
192 changes: 178 additions & 14 deletions flaskinventory/flaskdgraph/dgraph_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,150 @@ def update_facets(self, facets: dict) -> None:
def nquad(self) -> str:
return f'{self.newid}'

class Facet:
"""
Base class for facets. Use simple coercion.
Facet keys are strings and values can be string, bool, int, float and dateTime.
"""

predicate = None
default_operator = "eq"

def __init__(self, key: str,
dtype: Union[str, bool, int, float, datetime.datetime]=str,
queryable=False,
coerce=None,
query_label=None,
comparison_operators=None,
render_kw=None,
choices=None) -> None:

self.key = key
self.type = dtype
self.queryable = queryable
self._query_label = query_label
if isinstance(comparison_operators, dict):
comparison_operators = [(k, v) for k, v in comparison_operators.items()]
comparison_operators.insert(0, ('',''))
self.operators = comparison_operators
self.render_kw = render_kw or {}

if isinstance(choices, dict):
choices = [(k, v) for k, v in choices.items()]

self.choices = choices

def __repr__(self) -> str:
if self.predicate:
return f'<{self.type.__name__} Facet "{self.key}" of "{self.predicate}">'
else:
return f'<Unbound {self.type.__name__} Facet "{self.key}">'

def __str__(self) -> str:
if self.predicate:
return f'{self.predicate}|{self.key}'
else:
return f'{self.key}'

def corece(self, val) -> Any:
if self.type == bool:
return self._coerce_bool(val)
elif self.type == datetime.datetime:
try:
return self._coerce_datetime(val)
except:
return None
else:
try:
return self.type(val)
except:
return None

def query_filter(self, vals: Union[str, list], operator=None, predicate=None, **kwargs) -> str:

if not predicate:
predicate = self.predicate

if not operator:
operator = self.default_operator

if vals is None:
return None

if not isinstance(vals, list):
vals = [vals]


if len(vals) == 0:
return None

if self.type == datetime.datetime:
vals = [self.corece(vals)]

if operator == 'between':
if self.type == datetime.datetime:
return f'ge({self.key}, "{vals[0].strftime("%Y-%m-%d")}") AND lt({self.key}, "{vals[1].strftime("%Y-%m-%d")}")'
else:
return f'ge({self.key}, "{self.coerce(vals[0])}") AND lt({self.key}, "{self.corece(vals[1])}")'

else:
if self.type == datetime.datetime:
val1 = vals[0]
val2 = val1 + datetime.timedelta(days=1)
return f'ge({self.key}, "{val1.strftime("%Y-%m-%d")}") AND lt({self.key}, "{val2.strftime("%Y-%m-%d")}")'
filters = [f'{operator}({self.key}, "{self.corece(val)}")' for val in vals]
if len(filters) > 1:
filter_string = " OR ".join(filters)
return f'({filter_string})'
else:
return filters[0]


@staticmethod
def _coerce_bool(val) -> bool:
if isinstance(val, bool):
return bool(val)
elif isinstance(val, str):
return val.lower() in ('yes', 'true', 't', '1', 'y')
elif isinstance(val, int):
return val > 0
else:
return False

@staticmethod
def _coerce_datetime(val):
if isinstance(val, (datetime.date, datetime.datetime)):
return val
elif isinstance(val, int):
try:
return datetime.date(year=val, month=1, day=1)
except:
pass
return dateparser.parse(val)

@property
def query_label(self) -> str:
if self._query_label:
return self._query_label
else:
return f'{self.predicate.replace("_", " ").title()}: {self.key.replace("_", " ").title()}'

@property
def query_field(self) -> StringField:
self.render_kw.update({'data-entities': ",".join(Schema.__predicates_types__[self.predicate])})
if self.type == bool:
return BooleanField(label=self.query_label, render_kw=self.render_kw)
elif self.type == int:
return IntegerField(label=self.query_label, render_kw=self.render_kw)
elif self.choices:
return TomSelectMutlitpleField(label=self.query_label, render_kw=self.render_kw, choices=self.choices)
else:
return StringField(label=self.query_label, render_kw=self.render_kw)






class _PrimitivePredicate:

Expand All @@ -105,13 +249,14 @@ class _PrimitivePredicate:

dgraph_predicate_type = 'string'
is_list_predicate = False
comparison_operator = "eq"
default_operator = "eq"

def __init__(self,
label: str = None,
default: str = None,
required=False,
overwrite=False,
facets=None,
new=True,
edit=True,
queryable=False,
Expand All @@ -124,7 +269,7 @@ def __init__(self,
tom_select=False,
render_kw: dict = None,
predicate_alias: list = None,
comparison_operator: str = None) -> None:
comparison_operators: str = None) -> None:
"""
Contruct a new Primitive Predicate
"""
Expand All @@ -139,8 +284,22 @@ def __init__(self,
self.query_label = query_label or label
self.query_description = query_description
self.permission = permission
if comparison_operator:
self.comparison_operator = comparison_operator
self.operators = comparison_operators

# Facets: parameter should accept lists and single Facet objects
if isinstance(facets, Facet):
facets = [facets]

if facets:
self.facets = {facet.key: facet for facet in facets}
else:
self.facets = None

if isinstance(comparison_operators, dict):
comparison_operators = [(k, v) for k, v in comparison_operators.items()]
comparison_operators.insert(0, ('',''))
self.operators = comparison_operators


# WTF Forms
self.required = required
Expand Down Expand Up @@ -244,13 +403,13 @@ def query_filter(self, vals: Union[str, list], predicate=None, **kwargs) -> str:

try:
if self.is_list_predicate:
return " AND ".join([f'{self.comparison_operator}({predicate}, "{strip_query(val)}")' for val in vals])
return " AND ".join([f'{self.default_operator}({predicate}, "{strip_query(val)}")' for val in vals])
else:
vals_string = ", ".join([f'"{strip_query(val)}"' for val in vals])
if len(vals) == 1:
return f'{self.comparison_operator}({predicate}, {vals_string})'
return f'{self.default_operator}({predicate}, {vals_string})'
else:
return f'{self.comparison_operator}({predicate}, [{vals_string}])'
return f'{self.default_operator}({predicate}, [{vals_string}])'
except:
return f'has({predicate})'

Expand Down Expand Up @@ -442,7 +601,7 @@ class ReverseRelationship(_PrimitivePredicate):
"""

dgraph_predicate_type = 'uid'
comparison_operator = 'uid_in'
default_operator = 'uid_in'

def __init__(self,
predicate_name,
Expand All @@ -465,6 +624,11 @@ def __init__(self,

self.default_predicates = default_predicates

# Facets
if self.facets:
for facet in self.facets.values():
facet.predicate = predicate_name

# WTForms
# if we want the form field to show all choices automatically.
self.autoload_choices = autoload_choices
Expand Down Expand Up @@ -600,7 +764,7 @@ class MutualRelationship(_PrimitivePredicate):

dgraph_predicate_type = 'uid'
is_list_predicate = False
comparison_operator = "uid_in"
default_operator = "uid_in"

def __init__(self,
allow_new=False,
Expand Down Expand Up @@ -743,7 +907,7 @@ class UIDPredicate(Predicate):

dgraph_predicate_type = 'uid'
is_list_predicate = False
comparison_operator = 'uid_in'
default_operator = 'uid_in'

def __init__(self, *args, **kwargs) -> None:
super().__init__(read_only=True, hidden=True, new=False,
Expand Down Expand Up @@ -906,7 +1070,7 @@ class DateTime(Predicate):

dgraph_predicate_type = 'datetime'
is_list_predicate = False
comparison_operator = 'between'
default_operator = 'between'

def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -950,7 +1114,7 @@ def query_filter(self, vals: Union[str, list, int], custom_operator: Union['lt',
try:
if isinstance(vals, list) and len(vals) > 1:
vals = [self.validation_hook(val) for val in vals[:2]]
return f'{self.comparison_operator}({self.predicate}, "{vals[0].year}-01-01", "{vals[1].year}-12-31")'
return f'{self.default_operator}({self.predicate}, "{vals[0].year}-01-01", "{vals[1].year}-12-31")'

else:
if isinstance(vals, list):
Expand All @@ -959,7 +1123,7 @@ def query_filter(self, vals: Union[str, list, int], custom_operator: Union['lt',
if custom_operator:
return f'{custom_operator}({self.predicate}, "{date.year}")'
else:
return f'{self.comparison_operator}({self.predicate}, "{date.year}-01-01", "{date.year}-12-31")'
return f'{self.default_operator}({self.predicate}, "{date.year}-01-01", "{date.year}-12-31")'
except:
return f'has({self.predicate})'

Expand Down Expand Up @@ -1078,7 +1242,7 @@ class SingleRelationship(Predicate):

dgraph_predicate_type = 'uid'
is_list_predicate = False
comparison_operator = 'uid_in'
default_operator = 'uid_in'

def __init__(self,
relationship_constraint=None,
Expand Down
Loading

0 comments on commit 4b64711

Please sign in to comment.