Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Text Embedding Function #36366

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ require (
require (
cloud.google.com/go/storage v1.43.0
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/aws/aws-sdk-go-v2 v1.32.6
github.com/aws/aws-sdk-go-v2/config v1.28.6
github.com/aws/aws-sdk-go-v2/credentials v1.17.47
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.23.0
github.com/bits-and-blooms/bitset v1.10.0
github.com/bytedance/sonic v1.12.2
github.com/cenkalti/backoff/v4 v4.2.1
Expand Down Expand Up @@ -101,6 +105,17 @@ require (
github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7 // indirect
github.com/apache/thrift v0.18.1 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 // indirect
github.com/aws/smithy-go v1.22.1 // indirect
github.com/benesch/cgosymbolizer v0.0.0-20190515212042-bec6fe6e597b // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic/loader v0.2.0 // indirect
Expand Down
30 changes: 30 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,36 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2 v1.32.6 h1:7BokKRgRPuGmKkFMhEg/jSul+tB9VvXhcViILtfG8b4=
github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 h1:lL7IfaFzngfx0ZwUGOZdsFFnQ5uLvR0hWqqhyE7Q9M8=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc=
github.com/aws/aws-sdk-go-v2/config v1.28.6 h1:D89IKtGrs/I3QXOLNTH93NJYtDhm8SYa9Q5CsPShmyo=
github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko=
github.com/aws/aws-sdk-go-v2/credentials v1.17.47 h1:48bA+3/fCdi2yAwVt+3COvmatZ6jUDNkDTIsqDiMUdw=
github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 h1:AmoU1pziydclFT/xRV+xXE/Vb8fttJCLRPv8oAkprc0=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 h1:s/fF4+yDQDoElYhfIVvSNyeCydfbuTKzhxSXDXCPasU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25/go.mod h1:IgPfDv5jqFIzQSNbUEMoitNooSMXjRSDkhXv8jiROvU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 h1:ZntTCl5EsYnhN/IygQEUugpdwbhdkom9uHcbCftiGgA=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25/go.mod h1:DBdPrgeocww+CSl1C8cEV8PN1mHMBhuCDLpXezyvWkE=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.23.0 h1:mfV5tcLXeRLbiyI4EHoHWH1sIU7JvbfXVvymUCIgZEo=
github.com/aws/aws-sdk-go-v2/service/bedrockruntime v1.23.0/go.mod h1:YSSgYnasDKm5OjU3bOPkaz+2PFO6WjEQGIA6KQNsR3Q=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 h1:50+XsN70RS7dwJ2CkVNXzj7U2L1HKP8nqTd3XWEXBN4=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6/go.mod h1:WqgLmwY7so32kG01zD8CPTJWVWM+TzJoOVHwTg4aPug=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 h1:rLnYAfXQ3YAccocshIH5mzNNwZBkBo+bP6EhIxak6Hw=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7/go.mod h1:ZHtuQJ6t9A/+YDuxOLnbryAmITtr8UysSny3qcyvJTc=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 h1:JnhTZR3PiYDNKlXy50/pNeix9aGMo6lLpXwJ1mw8MD4=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6/go.mod h1:URronUEGfXZN1VpdktPSD1EkAL9mfrV+2F4sjH38qOY=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 h1:s4074ZO1Hk8qv65GqNXqDjmkf4HSQqJukaLuuW0TpDA=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.2/go.mod h1:mVggCnIWoM09jP71Wh+ea7+5gAp53q+49wDFs1SW5z8=
github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro=
github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benesch/cgosymbolizer v0.0.0-20190515212042-bec6fe6e597b h1:5JgaFtHFRnOPReItxvhMDXbvuBkjSWE+9glJyF466yw=
Expand Down
102 changes: 102 additions & 0 deletions internal/datanode/importv2/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/function"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/testutil"
"github.com/milvus-io/milvus/pkg/common"
Expand Down Expand Up @@ -435,6 +436,107 @@ func (s *SchedulerSuite) TestScheduler_ImportFile() {
s.NoError(err)
}

func (s *SchedulerSuite) TestScheduler_ImportFileWithFunction() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
})
ts := function.CreateOpenAIEmbeddingServer()
defer ts.Close()
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.MaxLengthKey, Value: "128"},
},
},
{
FieldID: 101,
Name: "vec",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "4",
},
},
},
{
FieldID: 102,
Name: "int64",
DataType: schemapb.DataType_Int64,
},
},
Functions: []*schemapb.FunctionSchema{
{
Name: "test",
Type: schemapb.FunctionType_TextEmbedding,
InputFieldIds: []int64{100},
InputFieldNames: []string{"text"},
OutputFieldIds: []int64{101},
OutputFieldNames: []string{"vec"},
Params: []*commonpb.KeyValuePair{
{Key: "provider", Value: "openai"},
{Key: "model_name", Value: "text-embedding-ada-002"},
{Key: "api_key", Value: "mock"},
{Key: "url", Value: ts.URL},
{Key: "dim", Value: "4"},
},
},
},
}

var once sync.Once
data, err := testutil.CreateInsertData(schema, s.numRows)
s.NoError(err)
s.reader = importutilv2.NewMockReader(s.T())
s.reader.EXPECT().Read().RunAndReturn(func() (*storage.InsertData, error) {
var res *storage.InsertData
once.Do(func() {
res = data
})
if res != nil {
return res, nil
}
return nil, io.EOF
})
importReq := &datapb.ImportRequest{
JobID: 10,
TaskID: 11,
CollectionID: 12,
PartitionIDs: []int64{13},
Vchannels: []string{"v0"},
Schema: schema,
Files: []*internalpb.ImportFile{
{
Paths: []string{"dummy.json"},
},
},
Ts: 1000,
IDRange: &datapb.IDRange{
Begin: 0,
End: int64(s.numRows),
},
RequestSegments: []*datapb.ImportRequestSegment{
{
SegmentID: 14,
PartitionID: 13,
Vchannel: "v0",
},
},
}
importTask := NewImportTask(importReq, s.manager, s.syncMgr, s.cm)
s.manager.Add(importTask)
err = importTask.(*ImportTask).importFile(s.reader)
s.NoError(err)
}

func TestScheduler(t *testing.T) {
suite.Run(t, new(SchedulerSuite))
}
27 changes: 27 additions & 0 deletions internal/datanode/importv2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,39 @@ func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData) error {
}

func RunEmbeddingFunction(task *ImportTask, data *storage.InsertData) error {
if err := RunBm25Function(task, data); err != nil {
return err
}
if err := RunDenseEmbedding(task, data); err != nil {
return err
}
return nil
}

func RunDenseEmbedding(task *ImportTask, data *storage.InsertData) error {
schema := task.GetSchema()
if function.HasFunctions(schema.Functions, []int64{}) {
exec, err := function.NewFunctionExecutor(schema)
if err != nil {
return err
}
if err := exec.ProcessBulkInsert(data); err != nil {
return err
}
}
return nil
}

func RunBm25Function(task *ImportTask, data *storage.InsertData) error {
fns := task.GetSchema().GetFunctions()
for _, fn := range fns {
runner, err := function.NewFunctionRunner(task.GetSchema(), fn)
if err != nil {
return err
}
if runner == nil {
continue
}
inputDatas := make([]any, 0, len(fn.InputFieldIds))
for _, inputFieldID := range fn.InputFieldIds {
inputDatas = append(inputDatas, data.Data[inputFieldID].GetDataRows())
Expand Down
4 changes: 2 additions & 2 deletions internal/distributed/proxy/httpserver/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,8 +1030,8 @@ func generatePlaceholderGroup(ctx context.Context, body string, collSchema *sche

if vectorField.GetIsFunctionOutput() {
for _, function := range collSchema.Functions {
if function.Type == schemapb.FunctionType_BM25 {
// TODO: currently only BM25 function is supported, thus guarantees one input field to one output field
if function.Type == schemapb.FunctionType_BM25 || function.Type == schemapb.FunctionType_TextEmbedding {
// TODO: currently only BM25 & text embedding function is supported, thus guarantees one input field to one output field
if function.OutputFieldNames[0] == vectorField.Name {
dataType = schemapb.DataType_VarChar
}
Expand Down
3 changes: 3 additions & 0 deletions internal/flushcommon/pipeline/flow_graph_embedding_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func newEmbeddingNode(channelName string, schema *schemapb.CollectionSchema) (*e
if err != nil {
return nil, err
}
if functionRunner == nil {
continue
}
node.functionRunners[tf.GetId()] = functionRunner
}
return node, nil
Expand Down
11 changes: 11 additions & 0 deletions internal/proxy/task_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/util/function"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
Expand Down Expand Up @@ -141,6 +142,16 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
}
it.schema = schema.CollectionSchema

// Calculate embedding fields
if function.HasFunctions(schema.CollectionSchema.Functions, []int64{}) {
exec, err := function.NewFunctionExecutor(schema.CollectionSchema)
if err != nil {
return err
}
if err := exec.ProcessInsert(it.insertMsg); err != nil {
return err
}
}
rowNums := uint32(it.insertMsg.NRows())
// set insertTask.rowIDs
var rowIDBegin UniqueID
Expand Down
Loading