Skip to content

Commit

Permalink
fix: Added check for validating varchar,array max length (milvus-io#3…
Browse files Browse the repository at this point in the history
…5499)

issue : milvus-io#34150

This applies to the numpy, parquet, and json readers.

---------

Signed-off-by: Nischay Yadav <[email protected]>
  • Loading branch information
nish112022 committed Aug 20, 2024
1 parent fc344d1 commit 44ddb5a
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 14 deletions.
20 changes: 20 additions & 0 deletions internal/util/importutilv2/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package common

import (
"fmt"

"github.com/samber/lo"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
Expand Down Expand Up @@ -58,3 +60,21 @@ func getInsertDataRowNum(data *storage.InsertData, schema *schemapb.CollectionSc
}
return 0
}

// CheckVarcharLength verifies that data holds a string whose byte length does
// not exceed maxLength. It returns an error when data is not a string, or when
// the length limit is violated; nil otherwise.
func CheckVarcharLength(data any, maxLength int64) error {
	value, ok := data.(string)
	if !ok {
		return fmt.Errorf("expected string, got %T", data)
	}
	strLen := len(value)
	if int64(strLen) > maxLength {
		return fmt.Errorf("value length %d exceeds max_length %d", strLen, maxLength)
	}
	return nil
}

// CheckArrayCapacity verifies that an array of arrLength elements fits within
// the configured maxCapacity, returning a descriptive error when it does not.
func CheckArrayCapacity(arrLength int, maxCapacity int64) error {
	if int64(arrLength) <= maxCapacity {
		return nil
	}
	return fmt.Errorf("array capacity %d exceeds max_capacity %d", arrLength, maxCapacity)
}
4 changes: 4 additions & 0 deletions internal/util/importutilv2/json/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
Key: common.MaxLengthKey,
Value: "128",
},
{
Key: common.MaxCapacityKey,
Value: "128",
},
},
},
},
Expand Down
17 changes: 17 additions & 0 deletions internal/util/importutilv2/json/row_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"github.com/samber/lo"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/util/importutilv2/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/parameterutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -365,6 +367,13 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
maxLength, err := parameterutil.GetMaxLength(r.id2Field[fieldID])
if err != nil {
return nil, err
}
if err = common.CheckVarcharLength(value, maxLength); err != nil {
return nil, err
}
return value, nil
case schemapb.DataType_JSON:
// for JSON data, we accept two kinds input: string and map[string]interface
Expand All @@ -387,6 +396,14 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) {
}
case schemapb.DataType_Array:
arr, ok := obj.([]interface{})

maxCapacity, err := parameterutil.GetMaxCapacity(r.id2Field[fieldID])
if err != nil {
return nil, err
}
if err = common.CheckArrayCapacity(len(arr), maxCapacity); err != nil {
return nil, err
}
if !ok {
return nil, r.wrapTypeError(obj, fieldID)
}
Expand Down
71 changes: 60 additions & 11 deletions internal/util/importutilv2/json/row_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ func TestRowParser_Parse_Valid(t *testing.T) {
IsDynamic: true,
DataType: schemapb.DataType_JSON,
},
{
FieldID: 4,
Name: "name",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "max_length",
Value: "256",
},
},
},
{
FieldID: 5,
Name: "arrayField",
DataType: schemapb.DataType_Array,
ElementType: schemapb.DataType_Int32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "max_capacity",
Value: "256",
},
},
},
},
}
r, err := NewRowParser(schema)
Expand All @@ -60,13 +83,14 @@ func TestRowParser_Parse_Valid(t *testing.T) {
}

cases := []testCase{
{name: `{"id": 1, "vector": [], "x": 8, "$meta": "{\"y\": 8}"}`, dyFields: []string{"x", "y"}},
{name: `{"id": 1, "vector": [], "x": 8, "$meta": {}}`, dyFields: []string{"x"}},
{name: `{"id": 1, "vector": [], "$meta": "{\"x\": 8}"}`, dyFields: []string{"x"}},
{name: `{"id": 1, "vector": [], "$meta": {"x": 8}}`, dyFields: []string{"x"}},
{name: `{"id": 1, "vector": [], "$meta": {}}`, dyFields: nil},
{name: `{"id": 1, "vector": [], "x": 8}`, dyFields: []string{"x"}},
{name: `{"id": 1, "vector": []}`, dyFields: nil},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3], "x": 8, "$meta": "{\"y\": 8}", "name": "testName"}`, dyFields: []string{"x", "y"}},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3], "x": 8, "$meta": "{\"y\": 8}", "name": "testName"}`, dyFields: []string{"x", "y"}},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3], "x": 8, "$meta": {}, "name": "testName"}`, dyFields: []string{"x"}},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3], "$meta": "{\"x\": 8}", "name": "testName"}`, dyFields: []string{"x"}},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3], "$meta": {"x": 8} , "name": "testName"}`, dyFields: []string{"x"}},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3], "$meta": {}, "name": "testName"}`, dyFields: nil},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3], "x": 8 , "name": "testName"}`, dyFields: []string{"x"}},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3], "name": "testName"}`, dyFields: nil},
}

for _, c := range cases {
Expand Down Expand Up @@ -120,6 +144,29 @@ func TestRowParser_Parse_Invalid(t *testing.T) {
IsDynamic: true,
DataType: schemapb.DataType_JSON,
},
{
FieldID: 4,
Name: "name",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "max_length",
Value: "4",
},
},
},
{
FieldID: 5,
Name: "arrayField",
DataType: schemapb.DataType_Array,
ElementType: schemapb.DataType_Int32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "max_capacity",
Value: "4",
},
},
},
},
}
r, err := NewRowParser(schema)
Expand All @@ -131,10 +178,12 @@ func TestRowParser_Parse_Invalid(t *testing.T) {
}

cases := []testCase{
{name: `{"id": 1, "vector": [], "x": 6, "$meta": {"x": 8}}`, expectErr: "duplicated key is not allowed"},
{name: `{"id": 1, "vector": [], "x": 6, "$meta": "{\"x\": 8}"}`, expectErr: "duplicated key is not allowed"},
{name: `{"id": 1, "vector": [], "x": 6, "$meta": "{*&%%&$*(&"}`, expectErr: "not a JSON format string"},
{name: `{"id": 1, "vector": [], "x": 6, "$meta": []}`, expectErr: "not a JSON object"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4], "x": 6, "$meta": {"x": 8}, "name": "test"}`, expectErr: "duplicated key is not allowed"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4], "x": 6, "$meta": "{\"x\": 8}", "name": "test"}`, expectErr: "duplicated key is not allowed"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4], "x": 6, "$meta": "{*&%%&$*(&", "name": "test"}`, expectErr: "not a JSON format string"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4], "x": 6, "$meta": [], "name": "test"}`, expectErr: "not a JSON object"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4], "x": 8, "$meta": "{\"y\": 8}", "name": "testName"}`, expectErr: "value length 8 exceeds max_length 4"},
{name: `{"id": 1, "vector": [], "arrayField": [1, 2, 3, 4, 5], "x": 8, "$meta": "{\"z\": 9}", "name": "test"}`, expectErr: "array capacity 5 exceeds max_capacity 4"},
}

for _, c := range cases {
Expand Down
12 changes: 11 additions & 1 deletion internal/util/importutilv2/numpy/field_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
"github.com/sbinet/npyio/npy"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/util/importutilv2/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/parameterutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -268,7 +270,10 @@ func (c *FieldReader) ReadString(count int64) ([]string, error) {
return nil, merr.WrapErrImportFailed(
fmt.Sprintf("failed to get max length %d of varchar from numpy file header, error: %v", maxLen, err))
}

maxLength, err := parameterutil.GetMaxLength(c.field)
if c.field.DataType == schemapb.DataType_VarChar && err != nil {
return nil, err
}
// read data
data := make([]string, 0, count)
for len(data) < int(count) {
Expand All @@ -285,6 +290,11 @@ func (c *FieldReader) ReadString(count int64) ([]string, error) {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read utf32 bytes from numpy file, error: %v", err))
}
str, err := decodeUtf32(raw, c.order)
if c.field.DataType == schemapb.DataType_VarChar {
if err = common.CheckVarcharLength(str, maxLength); err != nil {
return nil, err
}
}
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to decode utf32 bytes, error: %v", err))
}
Expand Down
61 changes: 60 additions & 1 deletion internal/util/importutilv2/parquet/field_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/importutilv2/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/parameterutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -95,7 +97,7 @@ func (c *FieldReader) Next(count int64) (any, error) {
}
return data, typeutil.VerifyFloats64(data.([]float64))
case schemapb.DataType_VarChar, schemapb.DataType_String:
return ReadStringData(c, count)
return ReadVarcharData(c, count)
case schemapb.DataType_JSON:
return ReadJSONData(c, count)
case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
Expand Down Expand Up @@ -215,6 +217,35 @@ func ReadStringData(pcr *FieldReader, count int64) (any, error) {
return data, nil
}

// ReadVarcharData reads up to count varchar values from the parquet column
// reader, validating every value against the field's max_length type param.
// It returns nil data (with a nil error) when the column is exhausted.
func ReadVarcharData(pcr *FieldReader, count int64) (any, error) {
	// Resolve max_length first so a misconfigured schema fails fast,
	// before any batch is pulled from the file.
	maxLength, err := parameterutil.GetMaxLength(pcr.field)
	if err != nil {
		return nil, err
	}
	chunked, err := pcr.columnReader.NextBatch(count)
	if err != nil {
		return nil, err
	}
	data := make([]string, 0, count)
	for _, chunk := range chunked.Chunks() {
		stringReader, ok := chunk.(*array.String)
		if !ok {
			return nil, WrapTypeErr("string", chunk.DataType().Name(), pcr.field)
		}
		for i := 0; i < stringReader.Len(); i++ {
			// Read each value once; validate before appending so the
			// error reports the offending value's length.
			value := stringReader.Value(i)
			if err = common.CheckVarcharLength(value, maxLength); err != nil {
				return nil, err
			}
			data = append(data, value)
		}
	}
	// Nil result signals end-of-data to the caller.
	if len(data) == 0 {
		return nil, nil
	}
	return data, nil
}

func ReadJSONData(pcr *FieldReader, count int64) (any, error) {
// JSON field read data from string array Parquet
data, err := ReadStringData(pcr, count)
Expand Down Expand Up @@ -471,6 +502,10 @@ func ReadStringArrayData(pcr *FieldReader, count int64) (any, error) {

func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
data := make([]*schemapb.ScalarField, 0, count)
maxCapacity, err := parameterutil.GetMaxCapacity(pcr.field)
if err != nil {
return nil, err
}
elementType := pcr.field.GetElementType()
switch elementType {
case schemapb.DataType_Bool:
Expand All @@ -482,6 +517,9 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range boolArray.([][]bool) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{
Expand All @@ -499,6 +537,9 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range int8Array.([][]int32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Expand All @@ -516,6 +557,9 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range int16Array.([][]int32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Expand All @@ -533,6 +577,9 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range int32Array.([][]int32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Expand All @@ -550,6 +597,9 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range int64Array.([][]int64) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Expand All @@ -567,6 +617,9 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range float32Array.([][]float32) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{
Expand All @@ -584,6 +637,9 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range float64Array.([][]float64) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_DoubleData{
DoubleData: &schemapb.DoubleArray{
Expand All @@ -601,6 +657,9 @@ func ReadArrayData(pcr *FieldReader, count int64) (any, error) {
return nil, nil
}
for _, elementArray := range stringArray.([][]string) {
if err = common.CheckArrayCapacity(len(elementArray), maxCapacity); err != nil {
return nil, err
}
data = append(data, &schemapb.ScalarField{
Data: &schemapb.ScalarField_StringData{
StringData: &schemapb.StringArray{
Expand Down
Loading

0 comments on commit 44ddb5a

Please sign in to comment.