Skip to content

Commit

Permalink
enhance: support null and default value in go sdk (#819)
Browse files Browse the repository at this point in the history
milvus-io/milvus#31728

Signed-off-by: lixinguo <[email protected]>
Co-authored-by: lixinguo <[email protected]>
  • Loading branch information
smellthemoon and lixinguo authored Oct 9, 2024
1 parent ef72f07 commit 8008f14
Show file tree
Hide file tree
Showing 20 changed files with 2,509 additions and 201 deletions.
10 changes: 10 additions & 0 deletions client/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ func (c *GrpcClient) processInsertColumns(colSchema *entity.Schema, columns ...e
_, has := mNameColumn[field.Name]
if !has &&
!field.AutoID && !field.IsDynamic {
// add a column which set nullable == true and fill it with all null value
if field.DefaultValue != nil || field.Nullable {
emptyColumn, err := entity.NewAllNullValueColumn(field.Name, field.DataType, rowSize)
// no err will be throw
if err != nil {
return nil, 0, fmt.Errorf("empty column fail to create, field name:%s ", field.Name)
}
mNameColumn[field.Name] = emptyColumn
continue
}
return nil, 0, fmt.Errorf("field %s not passed", field.Name)
}
}
Expand Down
246 changes: 242 additions & 4 deletions client/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,19 +260,26 @@ func (s *InsertSuite) TestInsertSuccess() {
})

s.Run("missing_field_with_default_value", func() {
s.T().Skip("skip for default value test")
defer s.resetMock()
s.setupHasCollection(testCollectionName)
s.setupHasPartition(testCollectionName, "partition_1")

s.setupDescribeCollection(testCollectionName, entity.NewSchema().
WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("default_value").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("default_value").WithDataType(entity.FieldTypeInt64).WithDefaultValueLong(1)).
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Insert(mock.Anything, mock.AnythingOfType("*milvuspb.InsertRequest")).
Run(func(ctx context.Context, req *milvuspb.InsertRequest) {
var data *schemapb.FieldData
for _, d := range req.GetFieldsData() {
if d.FieldName == "default_value" {
data = d
}
}
s.Equal([]bool{false}, data.ValidData)
s.Equal(0, len(data.GetScalars().GetLongData().Data))
s.Equal(2, len(req.GetFieldsData()))
}).Return(&milvuspb.MutationResult{
Status: &commonpb.Status{},
Expand All @@ -291,6 +298,118 @@ func (s *InsertSuite) TestInsertSuccess() {
s.NoError(err)
s.Equal(1, r.Len())
})

s.Run("missing_field_with_nullable", func() {
defer s.resetMock()
s.setupHasCollection(testCollectionName)
s.setupHasPartition(testCollectionName, "partition_1")

s.setupDescribeCollection(testCollectionName, entity.NewSchema().
WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("nullable_fid").WithDataType(entity.FieldTypeInt64).WithNullable(true)).
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Insert(mock.Anything, mock.AnythingOfType("*milvuspb.InsertRequest")).
Run(func(ctx context.Context, req *milvuspb.InsertRequest) {
s.Equal(2, len(req.GetFieldsData()))
}).Return(&milvuspb.MutationResult{
Status: &commonpb.Status{},
IDs: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{1},
},
},
},
}, nil)

r, err := c.Insert(ctx, testCollectionName, "partition_1",
entity.NewColumnFloatVector("vector", 128, generateFloatVector(1, 128)),
)
s.NoError(err)
s.Equal(1, r.Len())
})

s.Run("insert_with_nullable_column", func() {
defer s.resetMock()
s.setupHasCollection(testCollectionName)
s.setupHasPartition(testCollectionName, "partition_1")

s.setupDescribeCollection(testCollectionName, entity.NewSchema().
WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("nullable_fid").WithDataType(entity.FieldTypeInt64).WithNullable(true)).
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Insert(mock.Anything, mock.AnythingOfType("*milvuspb.InsertRequest")).
Run(func(ctx context.Context, req *milvuspb.InsertRequest) {
var data *schemapb.FieldData
for _, d := range req.GetFieldsData() {
if d.FieldName == "nullable_fid" {
data = d
}
}
s.Equal([]bool{false, true}, data.ValidData)
s.Equal([]int64{1}, data.GetScalars().GetLongData().Data)
s.Equal(2, len(req.GetFieldsData()))
}).Return(&milvuspb.MutationResult{
Status: &commonpb.Status{},
IDs: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{1, 2},
},
},
},
}, nil)
value := make([]int64, 2)
value[1] = 1
validValue := make([]bool, 2)
validValue[1] = true
r, err := c.Insert(ctx, testCollectionName, "partition_1",
entity.NewNullableColumnInt64("nullable_fid", value, validValue),
entity.NewColumnFloatVector("vector", 128, generateFloatVector(2, 128)),
)
s.NoError(err)
s.Equal(2, r.Len())
})

s.Run("insert_with_nullable_column_with_default_value", func() {
defer s.resetMock()
s.setupHasCollection(testCollectionName)
s.setupHasPartition(testCollectionName, "partition_1")

s.setupDescribeCollection(testCollectionName, entity.NewSchema().
WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("nullable_fid").WithDataType(entity.FieldTypeInt64).WithNullable(true).WithDefaultValueLong(10)).
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Insert(mock.Anything, mock.AnythingOfType("*milvuspb.InsertRequest")).
Run(func(ctx context.Context, req *milvuspb.InsertRequest) {
s.Equal(2, len(req.GetFieldsData()))
}).Return(&milvuspb.MutationResult{
Status: &commonpb.Status{},
IDs: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{1, 2},
},
},
},
}, nil)
value := make([]int64, 2)
value[1] = 1
validValue := make([]bool, 2)
validValue[1] = true
r, err := c.Insert(ctx, testCollectionName, "partition_1",
entity.NewNullableColumnInt64("nullable_fid", value, validValue),
entity.NewColumnFloatVector("vector", 128, generateFloatVector(2, 128)),
)
s.NoError(err)
s.Equal(2, r.Len())
})
}

type UpsertSuite struct {
Expand Down Expand Up @@ -529,19 +648,26 @@ func (s *UpsertSuite) TestUpsertSuccess() {
})

s.Run("missing_field_with_default_value", func() {
s.T().Skip("skip for default value test")
defer s.resetMock()
s.setupHasCollection(testCollectionName)
s.setupHasPartition(testCollectionName, "partition_1")

s.setupDescribeCollection(testCollectionName, entity.NewSchema().
WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("default_value").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("default_value").WithDataType(entity.FieldTypeInt64).WithDefaultValueLong(1)).
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Upsert(mock.Anything, mock.AnythingOfType("*milvuspb.UpsertRequest")).
Run(func(ctx context.Context, req *milvuspb.UpsertRequest) {
var data *schemapb.FieldData
for _, d := range req.GetFieldsData() {
if d.FieldName == "default_value" {
data = d
}
}
s.Equal([]bool{false}, data.ValidData)
s.Equal(0, len(data.GetScalars().GetLongData().Data))
s.Equal(2, len(req.GetFieldsData()))
}).Return(&milvuspb.MutationResult{
Status: &commonpb.Status{},
Expand All @@ -560,6 +686,118 @@ func (s *UpsertSuite) TestUpsertSuccess() {
s.NoError(err)
s.Equal(1, r.Len())
})

s.Run("missing_field_with_nullable", func() {
defer s.resetMock()
s.setupHasCollection(testCollectionName)
s.setupHasPartition(testCollectionName, "partition_1")

s.setupDescribeCollection(testCollectionName, entity.NewSchema().
WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("nullable_fid").WithDataType(entity.FieldTypeInt64).WithNullable(true)).
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Upsert(mock.Anything, mock.AnythingOfType("*milvuspb.UpsertRequest")).
Run(func(ctx context.Context, req *milvuspb.UpsertRequest) {
s.Equal(2, len(req.GetFieldsData()))
}).Return(&milvuspb.MutationResult{
Status: &commonpb.Status{},
IDs: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{1},
},
},
},
}, nil)

r, err := c.Upsert(ctx, testCollectionName, "partition_1",
entity.NewColumnFloatVector("vector", 128, generateFloatVector(1, 128)),
)
s.NoError(err)
s.Equal(1, r.Len())
})

s.Run("insert_with_nullable_column", func() {
defer s.resetMock()
s.setupHasCollection(testCollectionName)
s.setupHasPartition(testCollectionName, "partition_1")

s.setupDescribeCollection(testCollectionName, entity.NewSchema().
WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("nullable_fid").WithDataType(entity.FieldTypeInt64).WithNullable(true)).
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Upsert(mock.Anything, mock.AnythingOfType("*milvuspb.UpsertRequest")).
Run(func(ctx context.Context, req *milvuspb.UpsertRequest) {
var data *schemapb.FieldData
for _, d := range req.GetFieldsData() {
if d.FieldName == "nullable_fid" {
data = d
}
}
s.Equal([]bool{false, true}, data.ValidData)
s.Equal([]int64{1}, data.GetScalars().GetLongData().Data)
s.Equal(2, len(req.GetFieldsData()))
}).Return(&milvuspb.MutationResult{
Status: &commonpb.Status{},
IDs: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{1, 2},
},
},
},
}, nil)
value := make([]int64, 2)
value[1] = 1
validValue := make([]bool, 2)
validValue[1] = true
r, err := c.Upsert(ctx, testCollectionName, "partition_1",
entity.NewNullableColumnInt64("nullable_fid", value, validValue),
entity.NewColumnFloatVector("vector", 128, generateFloatVector(2, 128)),
)
s.NoError(err)
s.Equal(2, r.Len())
})

s.Run("insert_with_nullable_column_with_default_value", func() {
defer s.resetMock()
s.setupHasCollection(testCollectionName)
s.setupHasPartition(testCollectionName, "partition_1")

s.setupDescribeCollection(testCollectionName, entity.NewSchema().
WithField(entity.NewField().WithIsPrimaryKey(true).WithIsAutoID(true).WithName("ID").WithDataType(entity.FieldTypeInt64)).
WithField(entity.NewField().WithName("nullable_fid").WithDataType(entity.FieldTypeInt64).WithNullable(true).WithDefaultValueLong(10)).
WithField(entity.NewField().WithName("vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Upsert(mock.Anything, mock.AnythingOfType("*milvuspb.UpsertRequest")).
Run(func(ctx context.Context, req *milvuspb.UpsertRequest) {
s.Equal(2, len(req.GetFieldsData()))
}).Return(&milvuspb.MutationResult{
Status: &commonpb.Status{},
IDs: &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{1, 2},
},
},
},
}, nil)
value := make([]int64, 2)
value[1] = 1
validValue := make([]bool, 2)
validValue[1] = true
r, err := c.Upsert(ctx, testCollectionName, "partition_1",
entity.NewNullableColumnInt64("nullable_fid", value, validValue),
entity.NewColumnFloatVector("vector", 128, generateFloatVector(2, 128)),
)
s.NoError(err)
s.Equal(2, r.Len())
})
}

func TestWrite(t *testing.T) {
Expand Down
40 changes: 40 additions & 0 deletions client/row_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,46 @@ func (s *InsertByRowsSuite) TestSuccess() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.Run("null value", func() {
defer s.resetMock()
s.setupHasCollection(testCollectionName)
s.setupHasPartition(testCollectionName, partName)
s.setupDescribeCollection(testCollectionName, entity.NewSchema().WithName(testCollectionName).
WithField(entity.NewField().WithName("ID").WithDataType(entity.FieldTypeInt64).WithIsPrimaryKey(true)).
WithField(entity.NewField().WithName("NullableFid").WithDataType(entity.FieldTypeInt64).WithNullable(true)).
WithField(entity.NewField().WithName("Vector").WithDataType(entity.FieldTypeFloatVector).WithTypeParams(entity.TypeParamDim, "128")),
)

s.mock.EXPECT().Insert(mock.Anything, mock.AnythingOfType("*milvuspb.InsertRequest")).
Run(func(_ context.Context, req *milvuspb.InsertRequest) {
s.Equal(testCollectionName, req.GetCollectionName())
s.Equal(partName, req.GetPartitionName())
s.Equal(3, len(req.GetFieldsData()))
var data *schemapb.FieldData
for _, d := range req.GetFieldsData() {
if d.FieldName == "NullableFid" {
data = d
}
}
s.Equal(1, len(data.ValidData))
s.Equal(0, len(data.GetScalars().GetLongData().Data))
}).Return(&milvuspb.MutationResult{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
IDs: &schemapb.IDs{IdField: &schemapb.IDs_IntId{IntId: &schemapb.LongArray{Data: []int64{100}}}},
}, nil)
type row struct {
entity.RowBase
ID int64
NullableFid interface{}
Vector []float32
}
columns, err := c.InsertByRows(ctx, testCollectionName, partName, []entity.Row{
row{ID: 100, NullableFid: nil, Vector: make([]float32, 128)},
})
s.NoError(err)
s.Equal(1, columns.Len())
})

s.Run("non_dynamic", func() {
defer s.resetMock()
s.setupHasCollection(testCollectionName)
Expand Down
Loading

0 comments on commit 8008f14

Please sign in to comment.