Skip to content

Commit

Permalink
small trove_indexcard speedup
Browse files Browse the repository at this point in the history
- remove unused fields (and code)
- consolidate nested docs to reduce total number
  • Loading branch information
aaxelb committed Aug 23, 2023
1 parent dd6f9ed commit 80b9d8f
Showing 1 changed file with 87 additions and 189 deletions.
276 changes: 87 additions & 189 deletions share/search/index_strategy/trove_indexcard.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import re
import time
import uuid
from typing import Iterable, Optional
from typing import Iterable

import elasticsearch8
from gather import primitive_rdf
Expand Down Expand Up @@ -61,7 +61,7 @@ class TroveIndexcardIndexStrategy(Elastic8IndexStrategy):
CURRENT_STRATEGY_CHECKSUM = ChecksumIri(
checksumalgorithm_name='sha-256',
salt='TroveIndexcardIndexStrategy',
hexdigest='806159c98b9eedafc4d169cecd2315b6ce4cb40ad0322699c6b13f2ad9be2675',
hexdigest='43c4a0ba005630a54973a8b1d52b5d302bd6f34c82eb0f8516849118c7d2527b',
)

# abstract method from IndexStrategy
Expand Down Expand Up @@ -90,10 +90,6 @@ def index_mappings(self):
'path_from_focus': _capped_keyword,
'suffuniq_path_from_focus': _capped_keyword,
'property_iri': _capped_keyword,
'nearest_subject_iri': _capped_keyword,
'nearest_subject_suffuniq_iri': _capped_keyword,
'path_from_nearest_subject': _capped_keyword,
'suffuniq_path_from_nearest_subject': _capped_keyword,
'distance_from_focus': {'type': 'keyword'}, # numeric value as keyword (used for 'term' filter)
}
return {
Expand Down Expand Up @@ -163,20 +159,20 @@ def _build_sourcedoc(self, indexcard_rdf):
_nested_iris = defaultdict(set)
_nested_dates = defaultdict(set)
_nested_texts = defaultdict(set)
for _walk_pathkey, _walk_obj in _PredicatePathWalker(_rdfdoc.tripledict).walk_from_subject(indexcard_rdf.focus_iri):
for _walk_path, _walk_obj in _PredicatePathWalker(_rdfdoc.tripledict).walk_from_subject(indexcard_rdf.focus_iri):
if isinstance(_walk_obj, str):
_nested_iris[_walk_pathkey].add(_walk_obj)
_nested_iris[_NestedIriKey.for_iri_at_path(_walk_path, _walk_obj, _rdfdoc)].add(_walk_obj)
elif isinstance(_walk_obj, datetime.date):
_nested_dates[_walk_pathkey].add(datetime.date.isoformat(_walk_obj))
elif is_date_property(_walk_pathkey.last_predicate_iri):
_nested_dates[_walk_path].add(datetime.date.isoformat(_walk_obj))
elif is_date_property(_walk_path[-1]):
try:
datetime.date.fromisoformat(_walk_obj.unicode_text)
except ValueError:
pass
logger.debug('skipping malformatted date "%s" in %s', _walk_obj.unicode_text, indexcard_rdf)
else:
_nested_dates[_walk_pathkey].add(_walk_obj.unicode_text)
_nested_dates[_walk_path].add(_walk_obj.unicode_text)
elif isinstance(_walk_obj, primitive_rdf.Text):
_nested_texts[(_walk_pathkey, _walk_obj.language_iri)].add(_walk_obj.unicode_text)
_nested_texts[(_walk_path, _walk_obj.language_iri)].add(_walk_obj.unicode_text)
_focus_iris = {indexcard_rdf.focus_iri}
_suffuniq_focus_iris = {get_sufficiently_unique_iri(indexcard_rdf.focus_iri)}
for _identifier in indexcard_rdf.indexcard.focus_identifier_set.all():
Expand All @@ -189,57 +185,41 @@ def _build_sourcedoc(self, indexcard_rdf):
'suffuniq_focus_iri': list(_suffuniq_focus_iris),
'source_record_identifier': indexcard_rdf.indexcard.source_record_suid.identifier,
'source_config_label': indexcard_rdf.indexcard.source_record_suid.source_config.label,
'nested_iri': [
self._iri_nested_sourcedoc(_pathkey, _iri, _rdfdoc)
for _pathkey, _value_set in _nested_iris.items()
for _iri in _value_set
if is_worthwhile_iri(_iri)
],
'nested_iri': list(filter(bool, (
self._iri_nested_sourcedoc(_nested_iri_key, _iris, _rdfdoc)
for _nested_iri_key, _iris in _nested_iris.items()
))),
'nested_date': [
{
**_pathkey.as_nested_keywords(),
**_iri_path_as_indexable_fields(_path),
'date_value': list(_value_set),
}
for _pathkey, _value_set in _nested_dates.items()
for _path, _value_set in _nested_dates.items()
],
'nested_text': [
{
**_pathkey.as_nested_keywords(),
**_iri_path_as_indexable_fields(_path),
'language_iri': _language_iri,
'text_value': list(_value_set),
}
for (_pathkey, _language_iri), _value_set in _nested_texts.items()
for (_path, _language_iri), _value_set in _nested_texts.items()
],
}

def _iri_nested_sourcedoc(self, pathkey, iri, rdfdoc):
_iris = [
iri,
*rdfdoc.q(iri, OWL.sameAs),
]
def _iri_nested_sourcedoc(self, iri_key: '_NestedIriKey', iris, rdfdoc):
_iris_with_synonyms = set(filter(is_worthwhile_iri, iris))
for _iri in iris:
_iris_with_synonyms.update(
filter(is_worthwhile_iri, rdfdoc.q(_iri, OWL.sameAs)),
)
if not _iris_with_synonyms:
return None
_sourcedoc = {
**pathkey.as_nested_keywords(),
'iri_value': _iris,
**iri_key.as_indexable_fields(),
'iri_value': list(_iris_with_synonyms),
'suffuniq_iri_value': [
get_sufficiently_unique_iri(_iri)
for _iri in _iris
],
'value_type_iri': list(rdfdoc.q(iri, RDF.type)),
# TODO: don't discard language for name/title/label
'value_name_text': [
_text.unicode_text
for _text in rdfdoc.q(iri, NAME_PROPERTIES)
if isinstance(_text, primitive_rdf.Text)
],
'value_title_text': [
_text.unicode_text
for _text in rdfdoc.q(iri, TITLE_PROPERTIES)
if isinstance(_text, primitive_rdf.Text)
],
'value_label_text': [
_text.unicode_text
for _text in rdfdoc.q(iri, LABEL_PROPERTIES)
if isinstance(_text, primitive_rdf.Text)
for _iri in _iris_with_synonyms
],
}
return _sourcedoc
Expand Down Expand Up @@ -341,96 +321,6 @@ def pls_handle_valuesearch(self, valuesearch_params: ValuesearchParams) -> Value
raise exceptions.IndexStrategyError() from error # TODO: error messaging
return self._valuesearch_response(valuesearch_params, _es8_response, _cursor)

def get_identifier_usage_as_value(self, identifier: trove_db.ResourceIdentifier) -> Optional[dict]:
_filter_cards_with_identifier = {'nested': {
'path': 'nested_iri',
'query': {'term': {
'nested_iri.suffuniq_iri_value': identifier.sufficiently_unique_iri,
}},
}}
_identifier_usage_agg = {
'in_nested_iri': {
'nested': {'path': 'nested_iri'},
'aggs': {
'with_iri': {
'filter': {'term': {
'nested_iri.suffuniq_iri_value': identifier.sufficiently_unique_iri,
}},
'aggs': {
'exact_iri_value': {'terms': {
'field': 'nested_iri.iri_value',
'size': 100,
}},
'for_property': {'terms': {
'field': 'nested_iri.property_iri',
'size': 100,
}},
'for_path_from_focus': {'terms': {
'field': 'nested_iri.path_from_focus',
'size': 100,
}},
'for_path_from_any_subject': {'terms': {
'field': 'nested_iri.path_from_nearest_subject',
'size': 100,
}},
},
},
},
},
'in_nested_text': {
'nested': {'path': 'nested_text'},
'aggs': {
'about_iri': {
'filter': {'term': {
'nested_text.nearest_subject_suffuniq_iri': identifier.sufficiently_unique_iri,
}},
'aggs': {
'related_text': {'terms': {
'field': 'nested_text.text_value.raw',
'size': 100,
}},
'namelike_text_properties': {
'filter': {'terms': {
'nested_text.suffuniq_path_from_nearest_subject': [
iri_path_as_keyword([_iri], suffuniq=True)
for _iri in NAMELIKE_PROPERTIES
],
}},
'aggs': {
'namelike_text': {'terms': {
'field': 'nested_text.text_value.raw',
'size': 100,
}},
},
},
},
},
},
},
}
try:
_es8_response = self.index_strategy.es8_client.search(
index=self.indexname,
query=_filter_cards_with_identifier,
size=0, # ignore cardsearch hits; just want the aggs
aggs=_identifier_usage_agg,
)
except elasticsearch8.TransportError as error:
raise exceptions.IndexStrategyError() from error # TODO: error messaging
if not _es8_response['hits']['total']['value']:
return None
_iri_results = _es8_response['aggregations']['in_nested_iri']['with_iri']
_text_results = _es8_response['aggregations']['in_nested_text']['about_iri']
return {
# TODO: include bucket counts
'iri': _bucketlist(_iri_results['exact_iri_value']),
'for_property': _bucketlist(_iri_results['for_property']),
'for_path_from_focus': _bucketlist(_iri_results['for_path_from_focus']),
'for_path_from_any_subject': _bucketlist(_iri_results['for_path_from_any_subject']),
'namelike_text': _bucketlist(_text_results['namelike_text_properties']['namelike_text']),
'related_text': _bucketlist(_text_results['related_text']),
}

###
# query implementation

Expand Down Expand Up @@ -976,6 +866,16 @@ def _daterange_value_and_format(datevalue: str):
raise ValueError(f'bad date value "{datevalue}"')


def _iri_path_as_indexable_fields(path: tuple[str, ...]):
    '''build the path-describing fields for one nested doc

    `path` must be non-empty; its final item is reported as `property_iri`
    and its length as `distance_from_focus`, alongside the keyword forms
    (plain and "suffuniq") given by `iri_path_as_keyword`
    '''
    assert path, 'path should not be empty'
    _plain_keyword = iri_path_as_keyword(path)
    _suffuniq_keyword = iri_path_as_keyword(path, suffuniq=True)
    _last_iri = path[-1]
    return dict(
        path_from_focus=_plain_keyword,
        suffuniq_path_from_focus=_suffuniq_keyword,
        property_iri=_last_iri,
        distance_from_focus=len(path),
    )


@dataclasses.dataclass
class _SimpleCursor:
start_index: int
Expand Down Expand Up @@ -1013,61 +913,15 @@ def first_cursor(self) -> str | None:


class _PredicatePathWalker:
@dataclasses.dataclass(frozen=True)
class PathKey:
path_from_start: tuple[str]
nearest_subject_iri: str
path_from_nearest_subject: tuple[str]

@classmethod
def initial(cls, subject_iri):
return cls(
path_from_start=(),
nearest_subject_iri=subject_iri,
path_from_nearest_subject=(),
)

def step(self, subject_or_blanknode, predicate_iri):
if isinstance(subject_or_blanknode, str) and is_worthwhile_iri(subject_or_blanknode):
return self.__class__(
path_from_start=(*self.path_from_start, predicate_iri),
nearest_subject_iri=subject_or_blanknode,
path_from_nearest_subject=(predicate_iri,),
)
return self.__class__(
path_from_start=(*self.path_from_start, predicate_iri),
nearest_subject_iri=self.nearest_subject_iri,
path_from_nearest_subject=(*self.path_from_nearest_subject, predicate_iri),
)

@property
def last_predicate_iri(self):
return self.path_from_start[-1]

def as_nested_keywords(self):
return {
'path_from_focus': iri_path_as_keyword(self.path_from_start),
'suffuniq_path_from_focus': iri_path_as_keyword(self.path_from_start, suffuniq=True),
'property_iri': self.last_predicate_iri,
'nearest_subject_iri': self.nearest_subject_iri,
'nearest_subject_suffuniq_iri': get_sufficiently_unique_iri(self.nearest_subject_iri),
'path_from_nearest_subject': iri_path_as_keyword(self.path_from_nearest_subject),
'suffuniq_path_from_nearest_subject': iri_path_as_keyword(self.path_from_nearest_subject, suffuniq=True),
'distance_from_focus': len(self.path_from_start),
}

WalkYield = tuple[PathKey, primitive_rdf.RdfObject]
WalkYield = tuple[tuple[str, ...], primitive_rdf.RdfObject]

def __init__(self, tripledict: primitive_rdf.RdfTripleDictionary):
self.tripledict = tripledict
self._visiting = set()

def walk_from_subject(self, iri_or_blanknode, last_pathkey=None) -> Iterable[WalkYield]:
def walk_from_subject(self, iri_or_blanknode, last_path: tuple[str, ...] = ()) -> Iterable[WalkYield]:
'''walk the graph from the given subject, yielding (pathkey, obj) for every reachable object
'''
if last_pathkey is None:
assert isinstance(iri_or_blanknode, str)
last_pathkey = _PredicatePathWalker.PathKey.initial(iri_or_blanknode)
with self._visit(iri_or_blanknode):
_twopledict = (
primitive_rdf.twopleset_as_twopledict(iri_or_blanknode)
Expand All @@ -1076,17 +930,61 @@ def walk_from_subject(self, iri_or_blanknode, last_pathkey=None) -> Iterable[Wal
)
for _predicate_iri, _obj_set in _twopledict.items():
if _predicate_iri not in SKIPPABLE_PROPERTIES:
_pathkey = last_pathkey.step(iri_or_blanknode, _predicate_iri)
_path = (*last_path, _predicate_iri)
for _obj in _obj_set:
if not isinstance(_obj, frozenset): # omit the blanknode as a value
yield (_pathkey, _obj)
yield (_path, _obj)
if isinstance(_obj, (str, frozenset)) and (_obj not in self._visiting):
# step further for iri or blanknode
yield from self.walk_from_subject(_obj, last_pathkey=_pathkey)
yield from self.walk_from_subject(_obj, last_path=_path)

@contextlib.contextmanager
def _visit(self, focus_obj):
assert focus_obj not in self._visiting
self._visiting.add(focus_obj)
yield
self._visiting.discard(focus_obj)


@dataclasses.dataclass(frozen=True)
class _NestedIriKey:
'''if this is the same for multiple iri values, they can be combined in one `nested_iri` doc
'''
path: tuple[str, ...]
type_iris: frozenset[str]
label_text: frozenset[str]
title_text: frozenset[str]
name_text: frozenset[str]

@classmethod
def for_iri_at_path(cls, path: tuple[str, ...], iri: str, rdfdoc):
return cls(
path=path,
type_iris=frozenset(rdfdoc.q(iri, RDF.type)),
# TODO: don't discard language for name/title/label
name_text=frozenset(
_text.unicode_text
for _text in rdfdoc.q(iri, NAME_PROPERTIES)
if isinstance(_text, primitive_rdf.Text)
),
title_text=frozenset(
_text.unicode_text
for _text in rdfdoc.q(iri, TITLE_PROPERTIES)
if isinstance(_text, primitive_rdf.Text)
),
label_text=frozenset(
_text.unicode_text
for _text in rdfdoc.q(iri, LABEL_PROPERTIES)
if isinstance(_text, primitive_rdf.Text)
),
)

def as_indexable_fields(self):
# matches fields in the mapping for `nested_iri`, above
return {
**_iri_path_as_indexable_fields(self.path),
'value_type_iri': list(self.type_iris),
'value_label_text': list(self.label_text),
'value_title_text': list(self.title_text),
'value_name_text': list(self.name_text),
}

0 comments on commit 80b9d8f

Please sign in to comment.