support hooks in analysis pipeline #1887

Draft · wants to merge 4 commits into base: master
90 changes: 88 additions & 2 deletions analysis/type.go
@@ -73,7 +73,89 @@ type TokenFilter interface {
}

type Analyzer interface {
Analyze([]byte) TokenStream
Type() string
// the return value of this method depends on the type of the analyzer
Analyze([]byte) interface{}
}

const (
TokensAnalyzerType = "token"
HookTokensAnalyzerType = "hook_token"
VectorAnalyzerType = "vector"
Contributor Author:

Move vector-related code to a separate file guarded by a "vector" build tag.
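
A minimal sketch of the suggested split, assuming a hypothetical new file analysis/type_vector.go; the vector cases in CheckAnalyzed below would need the same treatment:

//go:build vector

package analysis

// Vector analyzer types, compiled only when building with -tags vector.
const (
	VectorAnalyzerType     = "vector"
	HookVectorAnalyzerType = "hook_vector"
)

type VectorAnalyzer []float64

type HookVectorAnalyzer struct {
	Vector []float64
	Err    error
}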

HookVectorAnalyzerType = "hook_vector"
)

// possible Analyze() return types, one per analyzer type above
type TokensAnalyzer struct {
Tokens TokenStream
}
type HookTokensAnalyzer struct {
Tokens TokenStream
Err error
}
type VectorAnalyzer []float64
type HookVectorAnalyzer struct {
Vector []float64
Err error
}

func AnalyzeForTokens(analyzer Analyzer, input []byte) (TokenStream, error) {
@moshaad7 (Contributor Author) commented on Oct 10, 2023:

Add a comment:
// AnalyzeForTokens is a utility for analyzing an input to generate a TokenStream (and an error, if any).

Previously, the Analyze() method of an analyzer returned a TokenStream. With this PR, Analyze() returns a value of type interface{}; callers validate and use the result based on analyzer.Type(). For users of the old Analyzer interface, this utility will come in handy when migrating to the new one.
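
A hedged sketch of the migration this describes; tokensFor is a hypothetical caller, and an import of this repository's analysis package is assumed:

// tokensFor shows how a caller of the old interface migrates.
func tokensFor(analyzer analysis.Analyzer, input []byte) (analysis.TokenStream, error) {
	// Old interface: tokens := analyzer.Analyze(input) returned a TokenStream.
	// New interface: Analyze returns interface{}; AnalyzeForTokens validates
	// the result against analyzer.Type() and unwraps any hook error.
	return analysis.AnalyzeForTokens(analyzer, input)
}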

analyzerType := analyzer.Type()
if analyzerType != TokensAnalyzerType &&
analyzerType != HookTokensAnalyzerType {
return nil, fmt.Errorf("cannot analyze text with analyzer of type: %s",
Contributor Author:

Alternate error message: "given analyzer is not compatible to be used as a token analyzer".

analyzerType)
}

// analyze output
analyzedOp := analyzer.Analyze(input)
err := CheckAnalyzed(analyzedOp, analyzer)
if err != nil {
return nil, fmt.Errorf("incompatible analysis result for analyzer "+
"of type: %s, err:%+v", analyzerType, err)
}

if analyzerType == TokensAnalyzerType {
op := analyzedOp.(TokensAnalyzer)
return op.Tokens, nil
}

// analyzerType == analysis.HookTokensAnalyzerType

op := analyzedOp.(HookTokensAnalyzer)
if op.Err != nil {
return nil, fmt.Errorf("analyzer hook failed, err:%+v", op.Err)
}

return op.Tokens, nil
}

func CheckAnalyzed(value interface{}, analyzer Analyzer) error {
switch analyzer.Type() {
case TokensAnalyzerType:
_, ok := value.(TokensAnalyzer)
if !ok {
return fmt.Errorf("expected TokensAnalyzer, got %T", value)
}
case HookTokensAnalyzerType:
_, ok := value.(HookTokensAnalyzer)
if !ok {
return fmt.Errorf("expected HookTokensAnalyzer, got %T", value)
}
case VectorAnalyzerType:
_, ok := value.(VectorAnalyzer)
if !ok {
return fmt.Errorf("expected VectorAnalyzer, got %T", value)
}
case HookVectorAnalyzerType:
_, ok := value.(HookVectorAnalyzer)
if !ok {
return fmt.Errorf("expected HookVectorAnalyzer, got %T", value)
}
default:
return fmt.Errorf("unknown analyzer type %s", analyzer.Type())
}
return nil
}
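
For context, a hook-based analyzer under the new interface could look like the sketch below; myHookAnalyzer and its hook field are assumptions, not part of this PR, and an import of this repository's analysis package is assumed:

type myHookAnalyzer struct {
	hook func([]byte) (analysis.TokenStream, error)
}

func (m *myHookAnalyzer) Type() string {
	return analysis.HookTokensAnalyzerType
}

// Analyze wraps the hook result so CheckAnalyzed and AnalyzeForTokens
// can validate and unwrap it.
func (m *myHookAnalyzer) Analyze(input []byte) interface{} {
	tokens, err := m.hook(input)
	return analysis.HookTokensAnalyzer{Tokens: tokens, Err: err}
}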

type DefaultAnalyzer struct {
@@ -82,7 +82,7 @@ type DefaultAnalyzer struct {
TokenFilters []TokenFilter
}

func (a *DefaultAnalyzer) Analyze(input []byte) TokenStream {
func (a *DefaultAnalyzer) Analyze(input []byte) interface{} {
if a.CharFilters != nil {
for _, cf := range a.CharFilters {
input = cf.Filter(input)
@@ -97,6 +179,10 @@ func (a *DefaultAnalyzer) Analyze(input []byte) TokenStream {
return tokens
}

func (a *DefaultAnalyzer) Type() string {
return TokensAnalyzerType
}

var ErrInvalidDateTime = fmt.Errorf("unable to parse datetime with any of the layouts")

var ErrInvalidTimestampString = fmt.Errorf("unable to parse timestamp string")
2 changes: 1 addition & 1 deletion document/field.go
@@ -29,7 +29,7 @@ type Field interface {
// "doc1", then "field" in "doc2".
ArrayPositions() []uint64
Options() index.FieldIndexingOptions
Analyze()
Analyze() error
Value() []byte

// NumPlainTextBytes should return the number of plain text bytes
4 changes: 3 additions & 1 deletion document/field_boolean.go
@@ -61,7 +61,7 @@ func (b *BooleanField) Options() index.FieldIndexingOptions {
return b.options
}

func (b *BooleanField) Analyze() {
func (b *BooleanField) Analyze() error {
tokens := make(analysis.TokenStream, 0)
tokens = append(tokens, &analysis.Token{
Start: 0,
@@ -73,6 +73,8 @@

b.length = len(tokens)
b.frequencies = analysis.TokenFrequency(tokens, b.arrayPositions, b.options)

return nil
}

func (b *BooleanField) Value() []byte {
3 changes: 2 additions & 1 deletion document/field_composite.go
@@ -91,7 +91,8 @@ func (c *CompositeField) Options() index.FieldIndexingOptions {
return c.options
}

func (c *CompositeField) Analyze() {
func (c *CompositeField) Analyze() error {
return nil
}

func (c *CompositeField) Value() []byte {
4 changes: 3 additions & 1 deletion document/field_datetime.go
@@ -92,7 +92,7 @@ func (n *DateTimeField) splitValue() (numeric.PrefixCoded, string) {
return numeric.PrefixCoded(parts[0]), string(parts[1])
}

func (n *DateTimeField) Analyze() {
func (n *DateTimeField) Analyze() error {
valueWithoutLayout, _ := n.splitValue()
tokens := make(analysis.TokenStream, 0)
tokens = append(tokens, &analysis.Token{
@@ -126,6 +126,8 @@ func (n *DateTimeField) Analyze() {

n.length = len(tokens)
n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options)

return nil
}

func (n *DateTimeField) Value() []byte {
4 changes: 3 additions & 1 deletion document/field_geopoint.go
@@ -76,7 +76,7 @@ func (n *GeoPointField) AnalyzedTokenFrequencies() index.TokenFrequencies {
return n.frequencies
}

func (n *GeoPointField) Analyze() {
func (n *GeoPointField) Analyze() error {
tokens := make(analysis.TokenStream, 0, 8)
tokens = append(tokens, &analysis.Token{
Start: 0,
@@ -127,6 +127,8 @@ func (n *GeoPointField) Analyze() {

n.length = len(tokens)
n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options)

return nil
}

func (n *GeoPointField) Value() []byte {
4 changes: 3 additions & 1 deletion document/field_geoshape.go
@@ -77,7 +77,7 @@ func (n *GeoShapeField) AnalyzedTokenFrequencies() index.TokenFrequencies {
return n.frequencies
}

func (n *GeoShapeField) Analyze() {
func (n *GeoShapeField) Analyze() error {
// compute the bytes representation for the coordinates
tokens := make(analysis.TokenStream, 0)
tokens = append(tokens, &analysis.Token{
@@ -104,6 +104,8 @@ func (n *GeoShapeField) Analyze() {

n.length = len(tokens)
n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options)

return nil
}

func (n *GeoShapeField) Value() []byte {
3 changes: 2 additions & 1 deletion document/field_ip.go
@@ -74,7 +74,7 @@ func (n *IPField) AnalyzedTokenFrequencies() index.TokenFrequencies {
return n.frequencies
}

func (b *IPField) Analyze() {
func (b *IPField) Analyze() error {

tokens := analysis.TokenStream{
&analysis.Token{
@@ -87,6 +87,7 @@ }
}
b.length = 1
b.frequencies = analysis.TokenFrequency(tokens, b.arrayPositions, b.options)
return nil
}

func (b *IPField) Value() []byte {
3 changes: 2 additions & 1 deletion document/field_numeric.go
@@ -75,7 +75,7 @@ func (n *NumericField) AnalyzedTokenFrequencies() index.TokenFrequencies {
return n.frequencies
}

func (n *NumericField) Analyze() {
func (n *NumericField) Analyze() error {
tokens := make(analysis.TokenStream, 0)
tokens = append(tokens, &analysis.Token{
Start: 0,
@@ -108,6 +108,7 @@ func (n *NumericField) Analyze() {

n.length = len(tokens)
n.frequencies = analysis.TokenFrequency(tokens, n.arrayPositions, n.options)
return nil
}

func (n *NumericField) Value() []byte {
10 changes: 8 additions & 2 deletions document/field_text.go
@@ -74,7 +74,7 @@ func (t *TextField) AnalyzedTokenFrequencies() index.TokenFrequencies {
return t.frequencies
}

func (t *TextField) Analyze() {
func (t *TextField) Analyze() error {
var tokens analysis.TokenStream
if t.analyzer != nil {
bytesToAnalyze := t.Value()
@@ -84,7 +84,11 @@ func (t *TextField) Analyze() {
copy(bytesCopied, bytesToAnalyze)
bytesToAnalyze = bytesCopied
}
tokens = t.analyzer.Analyze(bytesToAnalyze)
var err error
tokens, err = analysis.AnalyzeForTokens(t.analyzer, bytesToAnalyze)
if err != nil {
return err
}
} else {
tokens = analysis.TokenStream{
&analysis.Token{
@@ -98,6 +102,8 @@ func (t *TextField) Analyze() {
}
t.length = len(tokens) // number of tokens in this doc field
t.frequencies = analysis.TokenFrequency(tokens, t.arrayPositions, t.options)

return nil
}

func (t *TextField) Analyzer() analysis.Analyzer {
31 changes: 28 additions & 3 deletions index/scorch/scorch.go
@@ -17,6 +17,7 @@ package scorch
import (
"encoding/json"
"fmt"
"log"
"os"
"sync"
"sync/atomic"
@@ -77,6 +78,13 @@ type Scorch struct {
segPlugin SegmentPlugin

spatialPlugin index.SpatialAnalyzerPlugin

failedAnalysisMutex sync.RWMutex
// note: this can grow unboundedly.
// In future, we may want to limit the size of this map.
// (something like, only keep the last 1000 failed analysis)
// In addition to that, we can store total number of failed analysis so far.
failedAnalysis map[string]map[string]error // docID -> fieldName -> error
}
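
A sketch of the bounded variant the note above contemplates; maxFailedAnalysis and recordFailedAnalysis are hypothetical, not part of this PR:

const maxFailedAnalysis = 1000

func (s *Scorch) recordFailedAnalysis(docID string, fieldsError map[string]error) {
	s.failedAnalysisMutex.Lock()
	defer s.failedAnalysisMutex.Unlock()
	if len(s.failedAnalysis) >= maxFailedAnalysis {
		// Evict an arbitrary entry; a real implementation would track
		// insertion order, e.g. with a ring buffer of docIDs.
		for k := range s.failedAnalysis {
			delete(s.failedAnalysis, k)
			break
		}
	}
	s.failedAnalysis[docID] = fieldsError
}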

// AsyncPanicError is passed to scorch asyncErrorHandler when panic occurs in scorch background process
@@ -112,6 +120,8 @@ func NewScorch(storeName string,
ineligibleForRemoval: map[string]bool{},
forceMergeRequestCh: make(chan *mergerCtrl, 1),
segPlugin: defaultSegmentPlugin,

failedAnalysis: make(map[string]map[string]error),
}

forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
@@ -396,7 +406,16 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
if doc != nil {
// put the work on the queue
s.analysisQueue.Queue(func() {
analyze(doc, s.setSpatialAnalyzerPlugin)
fieldsError := analyze(doc, s.setSpatialAnalyzerPlugin)
if len(fieldsError) > 0 {
s.failedAnalysisMutex.Lock()
s.failedAnalysis[doc.ID()] = fieldsError
s.failedAnalysisMutex.Unlock()

log.Printf("AnalysisReport: docID: %s, fieldsError: %v",
doc.ID(), fieldsError)
}

resultChan <- doc
})
}
@@ -658,14 +677,18 @@ func (s *Scorch) setSpatialAnalyzerPlugin(f index.Field) {
}
}

func analyze(d index.Document, fn customAnalyzerPluginInitFunc) {
func analyze(d index.Document, fn customAnalyzerPluginInitFunc) map[string]error {
rv := make(map[string]error)
d.VisitFields(func(field index.Field) {
if field.Options().IsIndexed() {
if fn != nil {
fn(field)
}

field.Analyze()
err := field.Analyze()
if err != nil {
rv[field.Name()] = err
}

if d.HasComposite() && field.Name() != "_id" {
// see if any of the composite fields need this
@@ -675,6 +698,8 @@ }
}
}
})

return rv
}

func (s *Scorch) AddEligibleForRemoval(epoch uint64) {
8 changes: 7 additions & 1 deletion mapping/index.go
@@ -423,7 +423,13 @@ func (im *IndexMappingImpl) AnalyzeText(analyzerName string, text []byte) (analy
if err != nil {
return nil, err
}
return analyzer.Analyze(text), nil

tokens, err := analysis.AnalyzeForTokens(analyzer, text)
if err != nil {
return nil, err
}

return tokens, nil
}

// FieldAnalyzer returns the name of the analyzer used on a field.
20 changes: 19 additions & 1 deletion registry/analyzer.go
@@ -41,7 +41,25 @@ func NewAnalyzerCache() *AnalyzerCache {
}
}

func AnalyzerBuild(name string, config map[string]interface{}, cache *Cache) (interface{}, error) {
func AnalyzerBuild(name string, config map[string]interface{}, cache *Cache) (
interface{}, error) {
analyzer, errStatic := StaticAnalyzerBuild(name, config, cache)
if errStatic == nil {
return analyzer, nil
}

analyzer, errTokensAnalyzerHook := HookTokensAnalyzerBuild(name, config,
cache)
if errTokensAnalyzerHook == nil {
return analyzer, nil
}

return nil, fmt.Errorf("error building analyzer: %v (static: %v, "+
"tokens_analyzer_hook: %v)", errTokensAnalyzerHook, errStatic,
errTokensAnalyzerHook)
}
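
A hedged usage sketch of the fallback order above; buildAnalyzer is hypothetical, and an import of this repository's analysis package is assumed:

// buildAnalyzer tries the static registry first, then the
// tokens-analyzer hook builder, via AnalyzerBuild.
func buildAnalyzer(name string, cache *Cache) (analysis.Analyzer, error) {
	v, err := AnalyzerBuild(name, map[string]interface{}{}, cache)
	if err != nil {
		return nil, err
	}
	a, ok := v.(analysis.Analyzer)
	if !ok {
		return nil, fmt.Errorf("built value %T is not an analysis.Analyzer", v)
	}
	return a, nil
}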

func StaticAnalyzerBuild(name string, config map[string]interface{}, cache *Cache) (interface{}, error) {
cons, registered := analyzers[name]
if !registered {
return nil, fmt.Errorf("no analyzer with name or type '%s' registered", name)