Skip to content

Commit

Permalink
enhance: add metrics for counting number of non-zeros/tokens of spars…
Browse files Browse the repository at this point in the history
…e/FTS search (#38329)

Sparse vectors may have an arbitrary number of non-zeros, and it is hard to
optimize without knowing the actual distribution of nnz. This PR adds a
metric for analyzing that distribution.

issue: #35853

Compared with #38328, this PR
also includes a metric for FTS in the query node delegator.

also fixed a bug of sparse when searching by pk

Signed-off-by: Buqian Zheng <[email protected]>
  • Loading branch information
zhengbuqian authored Dec 12, 2024
1 parent b14a0c4 commit 75e64b9
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 6 deletions.
1 change: 1 addition & 0 deletions internal/proxy/task_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error {
if err != nil {
return err
}
metrics.ProxySearchSparseNumNonZeros.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), t.collectionName).Observe(float64(typeutil.EstimateSparseVectorNNZFromPlaceholderGroup(t.request.PlaceholderGroup, int(t.request.GetNq()))))
t.SearchRequest.PlaceholderGroup = t.request.PlaceholderGroup
t.SearchRequest.Topk = queryInfo.GetTopk()
t.SearchRequest.MetricType = queryInfo.GetMetricType()
Expand Down
4 changes: 4 additions & 0 deletions internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,10 @@ func (sd *shardDelegator) buildBM25IDF(req *internalpb.SearchRequest) (float64,
return 0, err
}

for _, idf := range idfSparseVector {
metrics.QueryNodeSearchFTSNumTokens.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), fmt.Sprint(sd.collectionID)).Observe(float64(typeutil.SparseFloatRowElementCount(idf)))
}

err = SetBM25Params(req, avgdl)
if err != nil {
return 0, err
Expand Down
12 changes: 12 additions & 0 deletions pkg/metrics/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,16 @@ var (
Name: "recall_search_cnt",
Help: "counter of recall search",
}, []string{nodeIDLabelName, queryTypeLabelName, collectionName})

// ProxySearchSparseNumNonZeros is a histogram of the number of non-zero
// elements in each sparse search task, labeled by node ID and collection name.
// The observed value is an estimate derived from the size of the serialized
// placeholder group, not an exact count.
ProxySearchSparseNumNonZeros = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "search_sparse_num_non_zeros",
Help: "the number of non-zeros in each sparse search task",
Buckets: buckets,
}, []string{nodeIDLabelName, collectionName})
)

// RegisterProxy registers Proxy metrics
Expand Down Expand Up @@ -479,6 +489,8 @@ func RegisterProxy(registry *prometheus.Registry) {
registry.MustRegister(ProxyRetrySearchResultInsufficientCount)
registry.MustRegister(ProxyRecallSearchCount)

registry.MustRegister(ProxySearchSparseNumNonZeros)

RegisterStreamingServiceClient(registry)
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/metrics/querynode_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,18 @@ var (
nodeIDLabelName,
})

// QueryNodeSearchFTSNumTokens is a histogram of the number of tokens in each
// Full Text Search task, labeled by node ID and collection ID.
QueryNodeSearchFTSNumTokens = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "search_fts_num_tokens",
Help: "number of tokens in each Full Text Search search task",
Buckets: buckets,
}, []string{
nodeIDLabelName,
collectionIDLabelName,
})

QueryNodeSearchGroupSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Expand Down Expand Up @@ -832,6 +844,7 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeEvictedReadReqCount)
registry.MustRegister(QueryNodeSearchGroupTopK)
registry.MustRegister(QueryNodeSearchTopK)
registry.MustRegister(QueryNodeSearchFTSNumTokens)
registry.MustRegister(QueryNodeNumFlowGraphs)
registry.MustRegister(QueryNodeNumEntities)
registry.MustRegister(QueryNodeEntitiesSize)
Expand Down
7 changes: 1 addition & 6 deletions pkg/util/funcutil/placeholdergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package funcutil

import (
"encoding/binary"
"fmt"
"math"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -97,14 +96,10 @@ func fieldDataToPlaceholderValue(fieldData *schemapb.FieldData) (*commonpb.Place
return nil, errors.New("vector data is not schemapb.VectorField_SparseFloatVector")
}
vec := vectors.SparseFloatVector
bytes, err := proto.Marshal(vec)
if err != nil {
return nil, fmt.Errorf("failed to marshal schemapb.SparseFloatArray to bytes: %w", err)
}
placeholderValue := &commonpb.PlaceholderValue{
Tag: "$0",
Type: commonpb.PlaceholderType_SparseFloatVector,
Values: [][]byte{bytes},
Values: vec.Contents,
}
return placeholderValue, nil
case schemapb.DataType_VarChar:
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/typeutil/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -1919,3 +1919,11 @@ func SparseFloatRowDim(row []byte) int64 {
}
return int64(SparseFloatRowIndexAt(row, SparseFloatRowElementCount(row)-1)) + 1
}

// EstimateSparseVectorNNZFromPlaceholderGroup returns a rough estimate of the
// total number of non-zero elements across all sparse vectors contained in
// placeholderGroup, which is expected to be a serialized PlaceholderGroup
// holding nq query vectors.
//
// The estimate subtracts an assumed serialization overhead of 3 bytes per
// query (but never less than 10 bytes in total) from the length of the
// serialized group and divides the remainder by 8, the assumed size in bytes
// of one non-zero element (presumably a uint32 index plus a float32 value).
//
// The result is never negative: if the group is shorter than the assumed
// overhead (for example an empty group), 0 is returned. This is a rough
// estimate and should be used only for statistics.
func EstimateSparseVectorNNZFromPlaceholderGroup(placeholderGroup []byte, nq int) int {
	const (
		minOverheadBytes      = 10
		overheadBytesPerQuery = 3
		bytesPerNonZero       = 8
	)

	overheadBytes := minOverheadBytes
	if perQuery := nq * overheadBytesPerQuery; perQuery > overheadBytes {
		overheadBytes = perQuery
	}

	payloadBytes := len(placeholderGroup) - overheadBytes
	if payloadBytes <= 0 {
		// A group smaller than the assumed overhead carries no measurable
		// payload; report zero instead of a negative count.
		return 0
	}
	return payloadBytes / bytesPerNonZero
}
65 changes: 65 additions & 0 deletions pkg/util/typeutil/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/binary"
"fmt"
"math"
"math/rand"
"reflect"
"testing"

Expand Down Expand Up @@ -2714,3 +2715,67 @@ func TestParseJsonSparseFloatRowBytes(t *testing.T) {
assert.Error(t, err)
})
}

// TestSparsePlaceholderGroupSize checks EstimateSparseVectorNNZFromPlaceholderGroup
// against serialized PlaceholderGroups built with various nq and averageNNZ
// values, and verifies that the estimated number of non-zero elements is close
// to the actual number.
//
// Every case must have an absolute error below 10%, and no more than 2% of all
// cases may have an absolute error above 5%.
func TestSparsePlaceholderGroupSize(t *testing.T) {
	nqs := []int{1, 10, 100, 1000, 10000}
	averageNNZs := []int{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048}
	numCases := 0
	casesWithLargeError := 0
	for _, nq := range nqs {
		for _, averageNNZ := range averageNNZs {
			// variants are the powers of two up to averageNNZ/2; each one is
			// the width of the window that per-row nnz values are drawn from.
			// NOTE: averageNNZ == 1 yields no variants, so it is not exercised.
			variants := make([]int, 0)
			for i := 1; i <= averageNNZ/2; i *= 2 {
				variants = append(variants, i)
			}

			for _, variant := range variants {
				numCases++
				contents := make([][]byte, nq)
				contentsSize := 0
				totalNNZ := 0
				for i := range contents {
					// nnz of each row is in range
					// [averageNNZ - variant/2, averageNNZ - variant/2 + variant),
					// i.e. centered on averageNNZ, and at least 1.
					nnz := averageNNZ - variant/2 + rand.Intn(variant)
					if nnz < 1 {
						nnz = 1
					}
					indices := make([]uint32, nnz)
					values := make([]float32, nnz)
					for j := 0; j < nnz; j++ {
						indices[j] = uint32(i*averageNNZ + j)
						values[j] = float32(i*averageNNZ + j)
					}
					contents[i] = CreateSparseFloatRow(indices, values)
					contentsSize += len(contents[i])
					totalNNZ += nnz
				}

				placeholderGroup := &commonpb.PlaceholderGroup{
					Placeholders: []*commonpb.PlaceholderValue{
						{
							Tag:    "$0",
							Type:   commonpb.PlaceholderType_SparseFloatVector,
							Values: contents,
						},
					},
				}
				bytes, err := proto.Marshal(placeholderGroup)
				if err != nil {
					t.Fatalf("marshal placeholder group (nq=%d, averageNNZ=%d, variant=%d): %v", nq, averageNNZ, variant, err)
				}
				estimatedNNZ := EstimateSparseVectorNNZFromPlaceholderGroup(bytes, nq)
				// Use the absolute error so that overestimates are held to the
				// same bound as underestimates.
				errorRatio := math.Abs(float64(totalNNZ-estimatedNNZ)) / float64(totalNNZ) * 100
				assert.Less(t, errorRatio, 10.0)
				if errorRatio > 5.0 {
					casesWithLargeError++
				}
				// keep the logs for easy debugging.
				// fmt.Printf("nq: %d, total nnz: %d, overhead bytes: %d, len of bytes: %d\n", nq, totalNNZ, len(bytes)-contentsSize, len(bytes))
				// fmt.Printf("\tnq: %d, total nnz: %d, estimated nnz: %d, diff: %d, error ratio: %f%%\n", nq, totalNNZ, estimatedNNZ, totalNNZ-estimatedNNZ, errorRatio)
			}
		}
	}
	largeErrorRatio := (float64(casesWithLargeError) / float64(numCases)) * 100
	// no more than 2% cases have large error ratio.
	assert.Less(t, largeErrorRatio, 2.0)
}

0 comments on commit 75e64b9

Please sign in to comment.