Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use the thesaurus datatype to search for equivalent terms #2090

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
15 changes: 15 additions & 0 deletions document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@
}
}

// NewSynonymDocument creates a Document identified by id whose Fields slice
// is empty but non-nil. All other Document members keep their zero values.
func NewSynonymDocument(id string) *Document {
	doc := new(Document)
	doc.id = id
	doc.Fields = []Field{}
	return doc
}

func (d *Document) Size() int {
sizeInBytes := reflectStaticSizeDocument + size.SizeOfPtr +
len(d.id)
Expand Down Expand Up @@ -133,3 +140,11 @@
// HasComposite reports whether the document holds at least one entry in
// CompositeFields.
func (d *Document) HasComposite() bool {
	return len(d.CompositeFields) != 0
}

func (d *Document) VisitSynonymFields(visitor index.SynonymFieldVisitor) {

Check failure on line 144 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.SynonymFieldVisitor

Check failure on line 144 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.SynonymFieldVisitor

Check failure on line 144 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.SynonymFieldVisitor

Check failure on line 144 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.SynonymFieldVisitor

Check failure on line 144 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.SynonymFieldVisitor

Check failure on line 144 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.SynonymFieldVisitor
for _, f := range d.Fields {
if sf, ok := f.(index.SynonymField); ok {

Check failure on line 146 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, ubuntu-latest)

undefined: index.SynonymField

Check failure on line 146 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.20.x, macos-latest)

undefined: index.SynonymField

Check failure on line 146 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

undefined: index.SynonymField

Check failure on line 146 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, macos-latest)

undefined: index.SynonymField

Check failure on line 146 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, ubuntu-latest)

undefined: index.SynonymField

Check failure on line 146 in document/document.go

View workflow job for this annotation

GitHub Actions / test (1.22.x, macos-latest)

undefined: index.SynonymField
visitor(sf)
}
}
}
143 changes: 143 additions & 0 deletions document/field_synonym.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (c) 2024 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package document

import (
"reflect"

"github.com/blevesearch/bleve/v2/analysis"
"github.com/blevesearch/bleve/v2/size"
index "github.com/blevesearch/bleve_index_api"
)

// reflectStaticSizeSynonymField holds the static size, in bytes, of a
// SynonymField value as reported by reflection. It is computed once in init
// and read by SynonymField.Size.
var reflectStaticSizeSynonymField int

func init() {
	fieldType := reflect.TypeOf(SynonymField{})
	reflectStaticSizeSynonymField = int(fieldType.Size())
}

// DefaultSynonymIndexingOptions is the set of indexing options that
// NewSynonymField assigns to every synonym field it creates.
const DefaultSynonymIndexingOptions = index.IndexField

// SynonymField is a document field that carries a synonym definition: an
// optional list of input terms plus the list of synonyms they relate to.
// Analyze turns those lists into the term -> synonyms table exposed through
// IterateSynonyms.
type SynonymField struct {
// name is the field name reported by Name.
name string
// analyzer is applied to every input term and synonym during Analyze.
analyzer analysis.Analyzer
// options is reported by Options; NewSynonymField sets it to
// DefaultSynonymIndexingOptions.
options index.FieldIndexingOptions
// input holds the optional source terms; when non-empty, each input term
// maps to the full synonyms list.
input []string
// synonyms holds the equivalent terms; when input is empty, each synonym
// maps to the other synonyms.
synonyms []string
// numPlainTextBytes is reported by NumPlainTextBytes; NewSynonymField
// leaves it at zero.
numPlainTextBytes uint64

// synonymMap is populated during analysis (see Analyze) and maps each
// analyzed term to its analyzed synonyms.
synonymMap map[string][]string
}

// Size returns an estimate, in bytes, of the memory held by the field.
//
// The estimate covers the static struct size, one pointer, the bytes of the
// field name, and the string headers and bytes of every input term and
// synonym. The static struct size only accounts for the slice headers of
// input and synonyms, so their contents have to be added explicitly;
// otherwise batch size accounting under-reports synonym documents. The
// analyzed synonymMap is not included.
func (s *SynonymField) Size() int {
	sizeInBytes := reflectStaticSizeSynonymField + size.SizeOfPtr +
		len(s.name)
	for _, term := range s.input {
		sizeInBytes += size.SizeOfString + len(term)
	}
	for _, syn := range s.synonyms {
		sizeInBytes += size.SizeOfString + len(syn)
	}
	return sizeInBytes
}

// Name returns the name of the field.
func (s *SynonymField) Name() string {
return s.name
}

// ArrayPositions always returns nil for a synonym field.
func (s *SynonymField) ArrayPositions() []uint64 {
return nil
}

// Options returns the indexing options stored on the field.
func (s *SynonymField) Options() index.FieldIndexingOptions {
return s.options
}

// NumPlainTextBytes returns the plain-text byte count stored on the field.
func (s *SynonymField) NumPlainTextBytes() uint64 {
return s.numPlainTextBytes
}

// AnalyzedLength always returns 0 for a synonym field.
func (s *SynonymField) AnalyzedLength() int {
return 0
}

// EncodedFieldType returns 'y', the type byte this field reports for itself.
func (s *SynonymField) EncodedFieldType() byte {
return 'y'
}

// AnalyzedTokenFrequencies always returns nil; the result of Analyze is kept
// in the field's synonym map and exposed through IterateSynonyms instead.
func (s *SynonymField) AnalyzedTokenFrequencies() index.TokenFrequencies {
return nil
}

// Analyze runs the field's analyzer over every input term and every synonym
// (via analyzeSynonymTerm) and stores the table built by processSynonymData
// from the analyzed terms in the field's synonym map.
func (s *SynonymField) Analyze() {
	// Leave the input list nil when no input terms were given so that
	// processSynonymData takes its "synonyms only" path.
	var inputTerms []string
	if n := len(s.input); n > 0 {
		inputTerms = make([]string, n)
		for i := range s.input {
			inputTerms[i] = analyzeSynonymTerm(s.input[i], s.analyzer)
		}
	}

	synonymTerms := make([]string, len(s.synonyms))
	for i := range s.synonyms {
		synonymTerms[i] = analyzeSynonymTerm(s.synonyms[i], s.analyzer)
	}

	s.synonymMap = processSynonymData(inputTerms, synonymTerms)
}

// Value always returns nil for a synonym field.
func (s *SynonymField) Value() []byte {
return nil
}

// IterateSynonyms calls visitor once for every entry of the field's synonym
// map, passing the term and its synonym list. The map is filled in by
// Analyze, so nothing is visited before analysis has run. Entries are visited
// in Go's unspecified map iteration order, and the slice handed to visitor is
// the one stored in the map, not a copy.
func (s *SynonymField) IterateSynonyms(visitor func(term string, synonyms []string)) {
for term, synonyms := range s.synonymMap {
visitor(term, synonyms)
}
}

// NewSynonymField creates a synonym field called name that will analyze its
// terms with analyzer. The input and synonyms slices are stored as given (not
// copied), and the field's options are set to DefaultSynonymIndexingOptions.
func NewSynonymField(name string, analyzer analysis.Analyzer, input []string, synonyms []string) *SynonymField {
	field := new(SynonymField)
	field.name = name
	field.analyzer = analyzer
	field.options = DefaultSynonymIndexingOptions
	field.input = input
	field.synonyms = synonyms
	return field
}

// processSynonymData builds the term -> synonyms lookup table for a synonym
// definition.
//
// When input is non-empty the relationship is unidirectional: every input
// term maps to its own copy of the synonym list. When input is empty the
// relationship is bidirectional: every synonym maps to all the other
// synonyms.
//
// Repeated synonyms are collapsed, keeping the first occurrence. Repeats are
// expected in practice because analysis can reduce distinct raw terms to the
// same token; without collapsing them a term would be listed as its own
// synonym and synonym lists would contain duplicates.
func processSynonymData(input []string, synonyms []string) map[string][]string {
	unique := uniqueSynonymTerms(synonyms)

	if len(input) > 0 {
		// Map each term to the same list of synonyms.
		synonymMap := make(map[string][]string, len(input))
		for _, term := range input {
			synonymMap[term] = append([]string(nil), unique...) // Avoid sharing slices.
		}
		return synonymMap
	}

	// Precompute a map where each synonym points to all other synonyms.
	synonymMap := make(map[string][]string, len(unique))
	for i, term := range unique {
		others := make([]string, 0, len(unique)-1)
		others = append(others, unique[:i]...)
		others = append(others, unique[i+1:]...)
		synonymMap[term] = others
	}
	return synonymMap
}

// uniqueSynonymTerms returns terms with repeated values removed, preserving
// the order of first occurrence. The result never aliases the argument.
func uniqueSynonymTerms(terms []string) []string {
	seen := make(map[string]struct{}, len(terms))
	unique := make([]string, 0, len(terms))
	for _, term := range terms {
		if _, dup := seen[term]; dup {
			continue
		}
		seen[term] = struct{}{}
		unique = append(unique, term)
	}
	return unique
}

// analyzeSynonymTerm runs analyzer over term and returns the text of the
// first token it produces. Tokens after the first are ignored. If analysis
// yields no tokens at all, the original term is returned unchanged.
func analyzeSynonymTerm(term string, analyzer analysis.Analyzer) string {
	tokens := analyzer.Analyze([]byte(term))
	if len(tokens) > 0 {
		return string(tokens[0].Term)
	}
	return term
}
2 changes: 2 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
ErrorEmptyID
ErrorIndexReadInconsistency
ErrorTwoPhaseSearchInconsistency
ErrorSynonymSearchNotSupported
)

// Error represents a more strongly typed bleve error for detecting
Expand All @@ -49,4 +50,5 @@ var errorMessages = map[Error]string{
ErrorEmptyID: "document ID cannot be empty",
ErrorIndexReadInconsistency: "index read inconsistency detected",
ErrorTwoPhaseSearchInconsistency: "2-phase search failed, likely due to an overlapping topology change",
ErrorSynonymSearchNotSupported: "synonym search not supported",
}
55 changes: 55 additions & 0 deletions index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package bleve

import (
"context"
"fmt"

"github.com/blevesearch/bleve/v2/index/upsidedown"

Expand Down Expand Up @@ -63,6 +64,36 @@ func (b *Batch) Index(id string, data interface{}) error {
return nil
}

// IndexSynonym adds a synonym definition to the batch under the given id.
//
// It returns ErrorEmptyID when id is empty, ErrorSynonymSearchNotSupported
// when the index mapping does not implement mapping.SynonymMapping, and any
// error reported by definition.Validate or by the mapping while building the
// synonym document. On success the document is queued on the internal batch
// and the batch's size counters are updated.
func (b *Batch) IndexSynonym(id string, collection string, definition *SynonymDefinition) error {
	if id == "" {
		return ErrorEmptyID
	}
	// The index event is fired before the mapping and the definition are
	// checked, so it is emitted even when one of the checks below fails.
	if notifier, isEventIndex := b.index.(index.EventIndex); isEventIndex {
		notifier.FireIndexEvent()
	}

	synonymMapping, supported := b.index.Mapping().(mapping.SynonymMapping)
	if !supported {
		return ErrorSynonymSearchNotSupported
	}
	if err := definition.Validate(); err != nil {
		return err
	}

	synDoc := document.NewSynonymDocument(id)
	if err := synonymMapping.MapSynonymDocument(synDoc, collection, definition.Input, definition.Synonyms); err != nil {
		return err
	}
	b.internal.Update(synDoc)

	docSize := uint64(synDoc.Size() +
		len(id) + size.SizeOfString) // overhead from internal
	b.lastDocSize = docSize
	b.totalSize += docSize

	return nil
}

// LastDocSize returns the value most recently stored in lastDocSize; for
// example, IndexSynonym sets it to the estimated size of the document it
// just added to the batch.
func (b *Batch) LastDocSize() uint64 {
return b.lastDocSize
}
Expand Down Expand Up @@ -323,3 +354,27 @@ type IndexCopyable interface {
// FileSystemDirectory is the default implementation for the
// index.Directory interface.
type FileSystemDirectory string

// SynonymDefinition represents a synonym mapping in Bleve.
// Each instance associates one or more input terms with a list of synonyms,
// defining how terms are treated as equivalent in searches.
type SynonymDefinition struct {
	// Input is an optional list of terms for unidirectional synonym mapping.
	// When terms are specified in Input, they will map to the terms in Synonyms,
	// making the relationship unidirectional (each Input maps to all Synonyms).
	// If Input is omitted, the relationship is bidirectional among all Synonyms.
	Input []string `json:"input"`

	// Synonyms is a list of terms that are considered equivalent.
	// If Input is specified, each term in Input will map to each term in Synonyms.
	// If Input is not specified, the Synonyms list will be treated bidirectionally,
	// meaning each term in Synonyms is treated as synonymous with all others.
	Synonyms []string `json:"synonyms"`
}

// Validate checks that the definition can be indexed. It returns an error
// when the receiver is nil or when Synonyms is empty, and nil otherwise.
//
// The nil check matters because callers such as Batch.IndexSynonym accept a
// *SynonymDefinition and call Validate before touching its fields; without it
// a nil definition would cause a nil pointer dereference instead of an error.
func (sd *SynonymDefinition) Validate() error {
	if sd == nil {
		return fmt.Errorf("synonym definition must not be nil")
	}
	if len(sd.Synonyms) == 0 {
		return fmt.Errorf("synonym definition must have at least one synonym")
	}
	return nil
}
91 changes: 89 additions & 2 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var reflectStaticSizeIndexSnapshot int
// exported variable, or at the index level by setting the FieldTFRCacheThreshold
// in the kvConfig.
var DefaultFieldTFRCacheThreshold uint64 = 10
// DefaultSynonymTermReaderCacheThreshold is the default upper bound on the
// number of recycled synonym term readers an IndexSnapshot keeps per
// thesaurus name. It is used when the index config has no
// "SynonymTermReaderCacheThreshold" entry.
var DefaultSynonymTermReaderCacheThreshold uint64 = 10

func init() {
var is interface{} = IndexSnapshot{}
Expand Down Expand Up @@ -87,8 +88,9 @@ type IndexSnapshot struct {
m sync.Mutex // Protects the fields that follow.
refs int64

m2 sync.Mutex // Protects the fields that follow.
fieldTFRs map[string][]*IndexSnapshotTermFieldReader // keyed by field, recycled TFR's
m2 sync.Mutex // Protects the fields that follow.
fieldTFRs map[string][]*IndexSnapshotTermFieldReader // keyed by field, recycled TFR's
synonymTermReaders map[string][]*IndexSnapshotSynonymTermReader // keyed by thesaurus name, recycled thesaurus readers
}

func (i *IndexSnapshot) Segments() []*SegmentSnapshot {
Expand Down Expand Up @@ -649,6 +651,15 @@ func (is *IndexSnapshot) getFieldTFRCacheThreshold() uint64 {
return DefaultFieldTFRCacheThreshold
}

// getSynonymTermReaderCacheThreshold returns the maximum number of recycled
// synonym term readers to keep per thesaurus name.
//
// The value comes from the "SynonymTermReaderCacheThreshold" entry of the
// parent index config when that entry is present and holds a uint64;
// otherwise DefaultSynonymTermReaderCacheThreshold is returned. An entry of
// any other type is ignored rather than triggering a type-assertion panic.
func (is *IndexSnapshot) getSynonymTermReaderCacheThreshold() uint64 {
	// Indexing a nil map is safe and yields the zero value, so a single
	// lookup combined with a checked assertion covers the nil-config,
	// missing-key and wrong-type cases at once.
	if threshold, ok := is.parent.config["SynonymTermReaderCacheThreshold"].(uint64); ok {
		return threshold
	}
	return DefaultSynonymTermReaderCacheThreshold
}

func (is *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReader) {
if !tfr.recycle {
// Do not recycle an optimized unadorned term field reader (used for
Expand Down Expand Up @@ -677,6 +688,25 @@ func (is *IndexSnapshot) recycleTermFieldReader(tfr *IndexSnapshotTermFieldReade
is.m2.Unlock()
}

// recycleSynonymTermReader offers str back to the snapshot's per-thesaurus
// reader cache so a later allocSynonymTermReader call can reuse it.
//
// The reader is dropped when this snapshot is no longer the parent's current
// root, or when the cache for str.name already holds as many readers as
// getSynonymTermReaderCacheThreshold allows.
func (is *IndexSnapshot) recycleSynonymTermReader(str *IndexSnapshotSynonymTermReader) {
	is.parent.rootLock.RLock()
	isCurrentRoot := is.parent.root == is
	is.parent.rootLock.RUnlock()
	if !isCurrentRoot {
		// if we're not the current root (mutations happened), don't bother recycling
		return
	}

	is.m2.Lock()
	if is.synonymTermReaders == nil {
		is.synonymTermReaders = make(map[string][]*IndexSnapshotSynonymTermReader)
	}
	cached := is.synonymTermReaders[str.name]
	if uint64(len(cached)) < is.getSynonymTermReaderCacheThreshold() {
		is.synonymTermReaders[str.name] = append(cached, str)
	}
	is.m2.Unlock()
}

func docNumberToBytes(buf []byte, in uint64) []byte {
if len(buf) != 8 {
if cap(buf) >= 8 {
Expand Down Expand Up @@ -956,3 +986,60 @@ func (is *IndexSnapshot) CloseCopyReader() error {
// close the index snapshot normally
return is.Close()
}

// allocSynonymTermReader returns a synonym term reader for the thesaurus
// called name. It pops the most recently recycled reader for that name from
// the snapshot's cache when one exists, and otherwise returns a freshly
// allocated, zero-valued reader. A reused reader still carries whatever state
// it had when it was recycled.
func (is *IndexSnapshot) allocSynonymTermReader(name string) (str *IndexSnapshotSynonymTermReader) {
	is.m2.Lock()
	// Reading from a nil map yields a nil slice, so no separate nil check is
	// needed before the lookup.
	pool := is.synonymTermReaders[name]
	n := len(pool)
	if n == 0 {
		is.m2.Unlock()
		return &IndexSnapshotSynonymTermReader{}
	}

	str = pool[n-1]
	pool[n-1] = nil // drop the cache's reference to the reader being handed out
	is.synonymTermReaders[name] = pool[:n-1]
	is.m2.Unlock()
	return str
}

// SynonymTermReader returns a reader over the synonyms of term in the
// thesaurus called thesaurusName, spanning the segments of this snapshot.
//
// The reader comes from allocSynonymTermReader and may be a recycled one; its
// per-segment slices (postings, iterators, thesauri) are allocated only when
// they are nil, so a recycled reader reuses them. Segments that do not
// implement segment.SynonymSegment are skipped. Any error from looking up a
// segment's thesaurus or its synonyms list is returned with a nil reader.
// ctx is currently not used.
func (is *IndexSnapshot) SynonymTermReader(ctx context.Context, thesaurusName string, term []byte) (index.SynonymTermReader, error) {
	reader := is.allocSynonymTermReader(thesaurusName)
	reader.name = thesaurusName
	reader.snapshot = is
	reader.segmentOffset = 0

	numSegments := len(is.segment)
	if reader.postings == nil {
		reader.postings = make([]segment.SynonymsList, numSegments)
	}
	if reader.iterators == nil {
		reader.iterators = make([]segment.SynonymsIterator, numSegments)
	}

	// The per-segment thesauri are resolved only when the reader does not
	// already hold them.
	if reader.thesauri == nil {
		reader.thesauri = make([]segment.Thesaurus, numSegments)
		for idx := range is.segment {
			synSeg, ok := is.segment[idx].segment.(segment.SynonymSegment)
			if !ok {
				continue
			}
			thes, err := synSeg.Thesaurus(thesaurusName)
			if err != nil {
				return nil, err
			}
			reader.thesauri[idx] = thes
		}
	}

	for idx, snap := range is.segment {
		if _, ok := snap.segment.(segment.SynonymSegment); !ok {
			continue
		}
		// The previous list and iterator for this slot are passed back in so
		// the segment implementation has the chance to reuse them.
		list, err := reader.thesauri[idx].SynonymsList(term, snap.deleted, reader.postings[idx])
		if err != nil {
			return nil, err
		}
		reader.postings[idx] = list
		reader.iterators[idx] = list.Iterator(reader.iterators[idx])
	}
	return reader, nil
}
Loading
Loading