Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
aaxelb committed Aug 26, 2024
1 parent 1a1e9b7 commit 28bb0ec
Show file tree
Hide file tree
Showing 17 changed files with 632 additions and 357 deletions.
80 changes: 78 additions & 2 deletions _TODO.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,83 @@ after reluctantly accepting `nested` for certain value-searches... how about mul

select suitable index-strategy based on query

most queries go to a more constrained index-strategy with a smaller, faster, completely non-nested index
most queries go to a more constrained index-strategy with a smaller, faster,
completely non-nested index (calling it "trovesearch_indexcard")

queries that need the extra complexity go to a more complex index-strategy with larger, slower index
queries that need the extra complexity go to a more complex index-strategy
with larger, slower index (calling it "trovesearch_excessive")

however... even simple value-searches need to get metadata about each iri value
(at least `rdf:type` and something name-like (`dcterms:title`, `foaf:name`, `rdfs:label`...))
-- without the `nested` mapping, there's not a good way (that i see) to do that in a single query

so how about a third index strategy just for looking up iri-value metadata?
(calling it "trovesearch_irivalues")


trovesearch_indexcard (one per indexcard):
simple:
indexcard_iri
indexcard_pk
suid.source_config_label
suid.source_record_identifier
focus_iri.exact
focus_iri.suffuniq
propertypaths_present
flattened:
iri_by_propertypath.*
iri_by_depth.*
dynamic:
dynamics.text_by_propertypath.*
dynamics.text_by_depth.*
dynamics.date_by_propertypath.*


trovesearch_irivalues (one per (indexcard, iri) pair)
simple:
iri.exact (includes sameAs synonyms)
iri.suffuniq (includes sameAs synonyms)
indexcard_iri
indexcard_pk
propertypath_from_focus
depth_from_focus
flattened:
iri_by_relative_propertypath.*
iri_by_relative_depth.*
dynamic:
dynamics.text_by_relative_propertypath.*
dynamics.text_by_relative_depth.*
dynamics.date_by_relative_propertypath.*


trovesearch_excessive:
(all fields from trovesearch_indexcard, plus a nested field with
fields from (or similar to) trovesearch_irivalues)


...ok maybe, but revisiting "trovesearch_irivalues (one per (indexcard, iri) pair)",
that's a looot of documents, and awfully wasteful for the common case of commonly used iris,
and trickier to remove docs for iri values no longer used

returning to an old idea discarded from the first "index-card-search" implementation...
how about an index with (only) one doc per referenced iri? would need to:
- use IDENTIFIER_USAGE/BACKFILL_IDENTIFIER_USAGE messages
emit after non-backfill indexcard indexing, perhaps deduped within each message chunk
- index strategy should, for each identifier message:
query for indexcards that include that identifier,
aggregate metadata included in those indexcards about that identifier,
store document describing that identifier and its usage

important to account for erroneous sameAs assertions (make it easy to undo)

revised trovesearch_irivalues (one per iri)
simple:
iri
used_at_propertypath
flattened:
iri_by_relative_propertypath.*
iri_by_relative_depth.*
dynamic:
dynamics.text_by_relative_propertypath.*
dynamics.text_by_relative_depth.*
dynamics.date_by_relative_propertypath.*
2 changes: 1 addition & 1 deletion share/models/feature_flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class FeatureFlag(models.Model):
ELASTIC_EIGHT_DEFAULT = 'elastic_eight_default'
IGNORE_SHAREV2_INGEST = 'ignore_sharev2_ingest'
SUGGEST_CREATOR_FACET = 'suggest_creator_facet'
USE_FLATTERY_STRATEGY = 'use_flattery_strategy'
TROVESEARCH_POLYSTRAT = 'trovesearch_polystrat'

# name _should_ be one of the constants above, but that is not enforced by `choices`
name = models.TextField(unique=True)
Expand Down
3 changes: 1 addition & 2 deletions share/search/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ def _get_daemon_messages(self):
return daemon_messages_by_target_id

def _handle_some_messages(self):
# each message corresponds to one action on this daemon's index
start_time = time.time()
doc_count, error_count = 0, 0
daemon_messages_by_target_id = self._get_daemon_messages()
Expand All @@ -270,7 +269,7 @@ def _handle_some_messages(self):
logger.error('%sEncountered error: %s', self.log_prefix, message_response.error_text)
sentry_sdk.capture_message('error handling message', extras={'message_response': message_response})
target_id = message_response.index_message.target_id
for daemon_message in daemon_messages_by_target_id.pop(target_id):
for daemon_message in daemon_messages_by_target_id.pop(target_id, ()):
daemon_message.ack() # finally set it free
if daemon_messages_by_target_id: # should be empty by now
logger.error('%sUnhandled messages?? %s', self.log_prefix, len(daemon_messages_by_target_id))
Expand Down
18 changes: 10 additions & 8 deletions share/search/index_strategy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
from .sharev2_elastic5 import Sharev2Elastic5IndexStrategy
from .sharev2_elastic8 import Sharev2Elastic8IndexStrategy
from .trove_indexcard_flats import TroveIndexcardFlatsIndexStrategy
from .trovesearch_flattery import TrovesearchFlatteryIndexStrategy
from .trovesearch_nesterly import TrovesearchNesterlyIndexStrategy
from .trovesearch_indexcard import TrovesearchIndexcardIndexStrategy
from .trovesearch_irivalues import TrovesearchIrivaluesIndexStrategy
from .trovesearch_excessive import TrovesearchExcessiveIndexStrategy
from ._base import IndexStrategy


Expand Down Expand Up @@ -39,8 +40,9 @@ def _iter_all_index_strategies():
if settings.ELASTICSEARCH8_URL:
yield Sharev2Elastic8IndexStrategy(name='sharev2_elastic8')
yield TroveIndexcardFlatsIndexStrategy(name='trove_indexcard_flats')
yield TrovesearchFlatteryIndexStrategy(name='trovesearch_flattery')
yield TrovesearchNesterlyIndexStrategy(name='trovesearch_nesterly')
yield TrovesearchIndexcardIndexStrategy(name='trovesearch_indexcard')
yield TrovesearchIrivaluesIndexStrategy(name='trovesearch_irivalues')
yield TrovesearchExcessiveIndexStrategy(name='trovesearch_excessive')


def get_index_strategy(strategyname: str) -> IndexStrategy:
Expand Down Expand Up @@ -85,12 +87,12 @@ def get_index_for_sharev2_search(requested_name=None) -> IndexStrategy.SpecificI
def get_index_for_trovesearch(params: search_params.CardsearchParams) -> IndexStrategy.SpecificIndex:
    '''choose a specific index to serve the given trovesearch query

    an explicitly requested strategy always wins; otherwise the
    TROVESEARCH_POLYSTRAT feature flag chooses between the legacy
    single-strategy index and the newer multi-strategy setup
    '''
    if params.index_strategy_name:  # specific strategy requested
        _name = params.index_strategy_name
    elif not FeatureFlag.objects.flag_is_up(FeatureFlag.TROVESEARCH_POLYSTRAT):
        _name = 'trove_indexcard_flats'
    else:
        # prefer the smaller, faster index when it can satisfy the query
        _name = (
            'trovesearch_indexcard'
            if TrovesearchIndexcardIndexStrategy.works_with_params(params)
            else 'trovesearch_excessive'
        )
    return get_specific_index(_name, for_search=True)
231 changes: 231 additions & 0 deletions share/search/index_strategy/_trovesearch_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
from __future__ import annotations
import base64
from collections import defaultdict
import contextlib
import dataclasses
import datetime
import functools
import json
import logging
import typing

from django.db.models import Exists, OuterRef
from primitive_metadata import primitive_rdf as rdf

from trove import models as trove_db
from trove.trovesearch.search_params import (
is_globpath,
)
from trove.util.iris import get_sufficiently_unique_iri, is_worthwhile_iri
from trove.vocab.namespaces import (
DCTERMS,
FOAF,
OSFMAP,
OWL,
RDFS,
SKOS,
TROVE,
)
from trove.vocab.osfmap import is_date_property


_logger = logging.getLogger(__name__)


###
# type aliases

# a sequence of predicate iris leading from a focus resource to a value
Propertypath = tuple[str, ...]


###
# constants

# properties omitted from graph walks entirely
SKIPPABLE_PROPERTIES = (
    OSFMAP.contains,  # too much, not helpful
    OWL.sameAs,  # handled special
)

# property groups used to find a name-like label for an iri value
TITLE_PROPERTIES = (DCTERMS.title,)
NAME_PROPERTIES = (FOAF.name, OSFMAP.fileName)
LABEL_PROPERTIES = (RDFS.label, SKOS.prefLabel, SKOS.altLabel)
NAMELIKE_PROPERTIES = (*TITLE_PROPERTIES, *NAME_PROPERTIES, *LABEL_PROPERTIES)

# result-size caps for valuesearch and cardsearch queries
VALUESEARCH_MAX = 234
CARDSEARCH_MAX = 9997

KEYWORD_LENGTH_MAX = 8191  # skip keyword terms that might exceed lucene's internal limit
# (see https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-above.html)

# reusable elasticsearch field-mapping fragments
KEYWORD_MAPPING = {'type': 'keyword', 'ignore_above': KEYWORD_LENGTH_MAX}
FLATTENED_MAPPING = {'type': 'flattened', 'ignore_above': KEYWORD_LENGTH_MAX}
TEXT_MAPPING = {
    'type': 'text',
    'index_options': 'offsets',  # for highlighting
}
IRI_KEYWORD_MAPPING = {
    'type': 'object',
    'properties': {  # for indexing iri values two ways:
        'exact': KEYWORD_MAPPING,  # the exact iri value (e.g. "https://foo.example/bar/")
        'suffuniq': KEYWORD_MAPPING,  # "sufficiently unique" (e.g. "://foo.example/bar")
    },
}


###
# utilities

def latest_rdf_for_indexcard_pks(indexcard_pks):
    '''queryset of LatestIndexcardRdf rows for the given indexcard pks

    limited to non-deleted indexcards that have an osfmap_json derived card;
    pre-loads related rows commonly needed while indexing
    '''
    _has_osfmap_json_deriver = Exists(
        trove_db.DerivedIndexcard.objects
        .filter(upriver_indexcard_id=OuterRef('indexcard_id'))
        .filter(deriver_identifier__in=(
            trove_db.ResourceIdentifier.objects
            .queryset_for_iri(TROVE['derive/osfmap_json'])
        ))
    )
    _queryset = (
        trove_db.LatestIndexcardRdf.objects
        .filter(indexcard_id__in=indexcard_pks)
        .filter(_has_osfmap_json_deriver)
        .exclude(indexcard__deleted__isnull=False)
    )
    return (
        _queryset
        .select_related('indexcard__source_record_suid__source_config')
        .prefetch_related('indexcard__focus_identifier_set')
    )


def propertypath_as_keyword(path: Propertypath) -> str:
    '''serialize a propertypath as a json array for use in a keyword field

    globpaths are serialized verbatim; otherwise each iri is shortened
    to its "sufficiently unique" form
    '''
    if is_globpath(path):
        _jsonable_path = list(path)
    else:
        _jsonable_path = [get_sufficiently_unique_iri(_iri) for _iri in path]
    return json.dumps(_jsonable_path)


def propertypath_as_field_name(path: Propertypath) -> str:
    '''encode a propertypath as a string safe for use as a document field name'''
    _keyword_bytes = propertypath_as_keyword(path).encode()
    return base64.urlsafe_b64encode(_keyword_bytes).decode()


def suffuniq_iris(iris: typing.Iterable[str]) -> list[str]:
    '''map each iri to its "sufficiently unique" form

    deduplicates (via set), so the result may be reordered
    '''
    _uniq = set()
    for _iri in iris:
        _uniq.add(get_sufficiently_unique_iri(_iri))
    return list(_uniq)


@dataclasses.dataclass
class GraphWalk:
    '''a walk of an rdf graph from a focus iri, collecting values by propertypath

    on init, walks every path reachable from `focus_iri` and buckets each
    reached value into `iri_values`, `date_values`, or `text_values`,
    keyed by the propertypath walked to reach it
    '''
    rdfdoc: rdf.RdfGraph
    focus_iri: str
    recursive: bool = True  # when False, walk only the focus subject's own triples
    iri_values: dict[Propertypath, set[str]] = dataclasses.field(
        default_factory=lambda: defaultdict(set),
    )
    # note: holds each literal's unicode string (was annotated set[rdf.Literal],
    # but `__post_init__` adds `.unicode_value` strings)
    text_values: dict[Propertypath, set[str]] = dataclasses.field(
        default_factory=lambda: defaultdict(set),
    )
    date_values: dict[Propertypath, set[datetime.date]] = dataclasses.field(
        default_factory=lambda: defaultdict(set),
    )
    paths_walked: set[Propertypath] = dataclasses.field(default_factory=set)
    _visiting: set[str] = dataclasses.field(default_factory=set)

    def __post_init__(self):
        for _walk_path, _walk_obj in self._walk_from_subject(self.focus_iri):
            self.paths_walked.add(_walk_path)
            if isinstance(_walk_obj, str):  # an iri reference
                self.iri_values[_walk_path].add(_walk_obj)
            elif isinstance(_walk_obj, datetime.date):
                self.date_values[_walk_path].add(_walk_obj)
            elif is_date_property(_walk_path[-1]):
                # a date property with a literal value -- parse it (or skip it)
                try:
                    _parsed_date = datetime.date.fromisoformat(_walk_obj.unicode_value)
                except ValueError:
                    _logger.debug('skipping malformatted date "%s"', _walk_obj.unicode_value)
                else:
                    self.date_values[_walk_path].add(_parsed_date)
            elif isinstance(_walk_obj, rdf.Literal):
                self.text_values[_walk_path].add(_walk_obj.unicode_value)

    def shortwalk(self, from_iri: str) -> GraphWalk:
        '''a non-recursive walk of the same graph, starting from `from_iri`'''
        return GraphWalk(
            self.rdfdoc,
            from_iri,  # bugfix: was `self.focus_iri`, which ignored the argument
            recursive=False,
        )

    def _walk_from_subject(
        self,
        iri_or_blanknode: str | rdf.Blanknode,
        path_so_far: tuple[str, ...] = (),
    ) -> typing.Iterator[tuple[Propertypath, rdf.RdfObject]]:
        '''walk the graph from the given subject, yielding (pathkey, obj) for every reachable object
        '''
        with self._visit(iri_or_blanknode):
            _twoples = (
                iri_or_blanknode
                if isinstance(iri_or_blanknode, frozenset)  # a blanknode is its own twople-set
                else self.rdfdoc.tripledict.get(iri_or_blanknode, {})
            )
            for _next_steps, _obj in walk_twoples(_twoples):
                _path = (*path_so_far, *_next_steps)
                yield (_path, _obj)
                if self.recursive and isinstance(_obj, str) and (_obj not in self._visiting):
                    # step further for iri or blanknode (skipping cycles)
                    yield from self._walk_from_subject(_obj, path_so_far=_path)

    @functools.cached_property
    def paths_by_iri(self) -> defaultdict[str, set[Propertypath]]:
        '''inverse of `iri_values`: for each reached iri, the paths it was reached by'''
        _paths_by_iri: defaultdict[str, set[Propertypath]] = defaultdict(set)
        for _path, _iris in self.iri_values.items():
            for _iri in _iris:
                _paths_by_iri[_iri].add(_path)
        return _paths_by_iri

    def iri_synonyms(self, iri: str) -> set[str]:
        '''the given iri plus any worthwhile `owl:sameAs` synonyms asserted in the graph'''
        # note: extremely limited inference -- assumes objects of owl:sameAs are not used as subjects
        _synonyms = (
            _synonym
            for _synonym in self.rdfdoc.q(iri, OWL.sameAs)
            if is_worthwhile_iri(_synonym)
        )
        return {iri, *_synonyms}

    def iris_synonyms(self, iris: typing.Iterable[str]) -> set[str]:
        '''union of `iri_synonyms` for each of the given iris'''
        return {
            _synonym
            for _iri in iris
            for _synonym in self.iri_synonyms(_iri)
        }

    @contextlib.contextmanager
    def _visit(self, focus_obj):
        '''track the subject currently being walked (guards against infinite cycles)'''
        assert focus_obj not in self._visiting
        self._visiting.add(focus_obj)
        try:
            yield
        finally:
            # bugfix: ensure cleanup even if the walk raises (or is closed) mid-iteration
            self._visiting.discard(focus_obj)


def walk_twoples(
    twoples: rdf.RdfTwopleDictionary | rdf.Blanknode,
) -> typing.Iterator[tuple[Propertypath, rdf.RdfObject]]:
    '''yield (propertypath, obj) for each path/object within the given twoples

    blanknode objects are recursed into (extending the propertypath);
    properties in SKIPPABLE_PROPERTIES are omitted entirely
    '''
    if isinstance(twoples, frozenset):
        # a blanknode: already a set of (predicate, object) pairs
        _pred_obj_pairs = iter(twoples)
    else:
        _pred_obj_pairs = (
            (_pred, _obj)
            for _pred, _obj_set in twoples.items()
            for _obj in _obj_set
        )
    for _pred, _obj in _pred_obj_pairs:
        if _pred in SKIPPABLE_PROPERTIES:
            continue
        if isinstance(_obj, frozenset):
            # a nested blanknode -- recurse, prefixing this predicate
            for _innerpath, _innerobj in walk_twoples(_obj):
                yield ((_pred, *_innerpath), _innerobj)
        else:
            yield ((_pred,), _obj)
Loading

0 comments on commit 28bb0ec

Please sign in to comment.