
Commit

code comments
Thejas-bhat committed Dec 3, 2024
1 parent a4b2982 commit f45d2eb
Showing 6 changed files with 19 additions and 11 deletions.
1 change: 1 addition & 0 deletions index/scorch/introducer.go
@@ -429,6 +429,7 @@ func (s *Scorch) introduceMerge(nextMerge *segmentMerge) {
}

skipped := true
// make the newly merged segments part of the newSnapshot being constructed
for i, newMergedSegment := range nextMerge.new {
// checking if this newly merged segment is worth keeping based on
// obsoleted doc count since the merge intro started
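The check referenced in the comment above can be pictured with a small, self-contained sketch; the counter names below are illustrative stand-ins, not the actual scorch fields:

// hypothetical sketch of the worth-keeping decision: a merged segment whose
// documents were all obsoleted (deleted or updated) while the merge ran adds
// nothing to the new snapshot, so its introduction can be skipped
func worthKeeping(totalDocs, obsoletedSinceMergeStart uint64) bool {
	return obsoletedSinceMergeStart < totalDocs
}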
8 changes: 7 additions & 1 deletion index/scorch/merge.go
@@ -443,6 +443,9 @@ type mergeTaskIntroStatus struct {
skipped bool
}

// mergedSegmentHistory is what allows multiple merged segments to be
// introduced in a single introducer channel push, with a check to ensure that
// the file count doesn't explode over the index's lifetime.
type mergedSegmentHistory struct {
workerID uint64
oldNewDocIDs []uint64
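As a hedged illustration of the single-push idea behind mergedSegmentHistory (the channel and result types below are assumptions, not the actual introducer wiring):

// illustrative only: collect every worker's merge result and hand the
// introducer one batch, so the snapshot is rebuilt once per push rather
// than once per merged segment
type mergeResult struct {
	workerID     uint64
	oldNewDocIDs []uint64
}

func pushMerged(introCh chan<- []mergeResult, results []mergeResult) {
	introCh <- results // a single send introduces all merged segments
}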
@@ -501,6 +504,9 @@ func (s *Scorch) mergeSegmentBasesParallel(snapshot *IndexSnapshot, flushableObj
newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1)
filename := zapFileName(newSegmentID)
path := s.path + string(os.PathSeparator) + filename

// the newly merged segment is already flushed out to disk; it just needs
// to be opened using mmap.
newDocNums, _, err :=
s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s)
if err != nil {
@@ -527,7 +533,7 @@
// close the new merged segments
_ = closeNewMergedSegments(newMergedSegments)

// tbd: need a better way to handle error
// tbd: need a better way to consolidate errors
return nil, nil, errs[0]
}

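One plausible way to resolve the tbd above is the standard library's errors.Join, which folds every worker's failure into a single error; this is a suggestion only, not what the commit does:

import "errors"

// surface every worker's failure instead of only errs[0]
func consolidate(errs []error) error {
	return errors.Join(errs...) // returns nil when all entries are nil
}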
18 changes: 9 additions & 9 deletions index/scorch/persister.go
@@ -369,11 +369,13 @@ type flushable struct {
totDocs uint64
}

var DefaultNumPersisterWorkers = 1
// number of workers which, in parallel, perform an in-memory merge of the
// segments followed by a flush operation.
var DefaultNumPersisterWorkers = 4

// maximum total size of data on which a single worker is allowed to perform
// the in-memory merge operation.
var DefaultMaxSizeInMemoryMerge = 0
var DefaultMaxSizeInMemoryMerge = 200 * 1024 * 1024
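
A hedged sketch of how these two defaults could bound the persister's parallelism; the batching and channel wiring here are illustrative, not the exact scorch implementation:

import "sync"

// flushOne stands in for the per-worker merge-then-flush step; the
// semaphore caps concurrency at DefaultNumPersisterWorkers
func flushInParallel(batches [][]byte, flushOne func([]byte) error) error {
	var wg sync.WaitGroup
	errCh := make(chan error, len(batches))
	sem := make(chan struct{}, DefaultNumPersisterWorkers)
	for _, b := range batches {
		wg.Add(1)
		sem <- struct{}{}
		go func(b []byte) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := flushOne(b); err != nil {
				errCh <- err
			}
		}(b)
	}
	wg.Wait()
	close(errCh)
	return <-errCh // nil if no worker reported an error
}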

func legacyFlushBehaviour() bool {
// DefaultMaxSizeInMemoryMerge = 0 is a special value to preserve the legacy
@@ -417,6 +419,8 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (

flushSet = append(flushSet, val)
} else {
// constructs a flushSet where each flushable object contains a set of segments
// to be merged and flushed out to disk.
for i, snapshot := range snapshot.segment {
if totSize >= DefaultMaxSizeInMemoryMerge {
if len(sbs) >= DefaultMinSegmentsForInMemoryMerge {
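
The flushSet construction above amounts to greedy, size-capped batching; a self-contained sketch under that assumption (it omits the DefaultMinSegmentsForInMemoryMerge guard visible above, and the field names are stand-ins for the real flushable):

// accumulate segments until the running size crosses maxSize, then
// start a new batch; any remainder forms the final batch
type segmentBatch struct {
	segmentIdxs []int
	totSize     uint64
}

func buildFlushSet(sizes []uint64, maxSize uint64) []segmentBatch {
	var out []segmentBatch
	var cur segmentBatch
	for i, sz := range sizes {
		cur.segmentIdxs = append(cur.segmentIdxs, i)
		cur.totSize += sz
		if cur.totSize >= maxSize {
			out = append(out, cur)
			cur = segmentBatch{}
		}
	}
	if len(cur.segmentIdxs) > 0 {
		out = append(out, cur)
	}
	return out
}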
@@ -480,12 +484,7 @@
return false, nil
}

// deploy the workers, have a wait group which waits for the flush set to complete
// each worker
// 1. merges the segments using mergeSegmentBases()
// wait for group to finish
//
// construct equiv snapshot and do a persistSnapshotDirect()
// drains out (after merging in memory) the segments in the flushSet in parallel
newSnapshot, newSegmentIDs, err := s.mergeSegmentBasesParallel(snapshot, flushSet)
if err != nil {
return false, err
@@ -694,7 +693,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
}
filenames = append(filenames, filename)
case segment.UnpersistedSegment:
// need to persist this to disk if it's not part of the exclude list (which
// restricts which in-memory segments are persisted to disk)
if _, ok := exclude[segmentSnapshot.id]; !ok {
filename := zapFileName(segmentSnapshot.id)
path := filepath.Join(path, filename)
1 change: 0 additions & 1 deletion index/scorch/scorch_test.go
@@ -438,7 +438,6 @@ func TestIndexInsertThenDelete(t *testing.T) {
t.Fatal(err)
}

fmt.Println("start delete")
err = idx.Delete("1")
if err != nil {
t.Errorf("Error deleting entry from index: %v", err)
1 change: 1 addition & 0 deletions index/scorch/snapshot_index_vr.go
@@ -118,6 +118,7 @@ func (i *IndexSnapshotVectorReader) Next(preAlloced *index.VectorDoc) (
nnum := next.Number()
rv.ID = docNumberToBytes(rv.ID, nnum+globalOffset)
rv.Score = float64(next.Score())

i.currID = rv.ID
i.currPosting = next

1 change: 1 addition & 0 deletions search/scorer/scorer_term.go
@@ -94,6 +94,7 @@ func (s *TermQueryScorer) SetQueryNorm(qnorm float64) {

// update the query weight
s.queryWeight = s.queryBoost * s.idf * s.queryNorm

if s.options.Explain {
childrenExplanations := make([]*search.Explanation, 3)
childrenExplanations[0] = &search.Explanation{
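For concreteness, a worked instance of the weight update above, with made-up numbers:

// illustrative values only: boost 2.0, idf 1.5, query norm 0.5
queryBoost, idf, queryNorm := 2.0, 1.5, 0.5
queryWeight := queryBoost * idf * queryNorm // 2.0 * 1.5 * 0.5 = 1.5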
