diff --git a/internal/core/src/common/jsmn.h b/internal/core/src/common/jsmn.h
index f20b56ba48a68..3843d6efe2d86 100644
--- a/internal/core/src/common/jsmn.h
+++ b/internal/core/src/common/jsmn.h
@@ -1,3 +1,14 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License
+
 /*
  * MIT License
  *
diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp
index f02d50d32f796..cbb76f4e0fd06 100644
--- a/internal/core/src/exec/expression/UnaryExpr.cpp
+++ b/internal/core/src/exec/expression/UnaryExpr.cpp
@@ -483,7 +483,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) {
         };
     } else {
         auto size_per_chunk = segment_->size_per_chunk();
-        retrieve = [size_per_chunk, this](int64_t offset) -> auto {
+        retrieve = [ size_per_chunk, this ](int64_t offset) -> auto {
             auto chunk_idx = offset / size_per_chunk;
             auto chunk_offset = offset % size_per_chunk;
             const auto& chunk =
diff --git a/internal/core/src/exec/expression/UnaryExpr.h b/internal/core/src/exec/expression/UnaryExpr.h
index 8b3710e250987..177b1842e6cb6 100644
--- a/internal/core/src/exec/expression/UnaryExpr.h
+++ b/internal/core/src/exec/expression/UnaryExpr.h
@@ -359,7 +359,7 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
     template <typename ExprValueType>
     VectorPtr
     ExecRangeVisitorImplJsonForIndex();
-
+
     template <typename ValueType>
     VectorPtr
     ExecRangeVisitorImplArray(OffsetVector* input = nullptr);
diff --git a/internal/core/src/index/JsonKeyInvertedIndex.h b/internal/core/src/index/JsonKeyInvertedIndex.h
index b686f63897f2e..220374ccb504f 100644
--- a/internal/core/src/index/JsonKeyInvertedIndex.h
+++ b/internal/core/src/index/JsonKeyInvertedIndex.h
@@ -25,7 +25,7 @@ class JsonKeyInvertedIndex : public InvertedIndexTantivy<bool> {
     explicit JsonKeyInvertedIndex(const storage::FileManagerContext& ctx,
                                   bool is_load);
 
-    ~JsonKeyInvertedIndex() override {};
+    ~JsonKeyInvertedIndex() override{};
 
  public:
     BinarySet
diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp
index 8d3b74cda2943..0daaab7cf76a9 100644
--- a/internal/core/src/indexbuilder/index_c.cpp
+++ b/internal/core/src/indexbuilder/index_c.cpp
@@ -282,8 +282,8 @@ BuildJsonKeyIndex(CBinarySet* c_binary_set,
 
         auto field_schema =
             FieldMeta::ParseFrom(build_index_info->field_schema());
-        auto index =
-            std::make_unique<index::JsonKeyInvertedIndex>(fileManagerContext, false);
+        auto index = std::make_unique<index::JsonKeyInvertedIndex>(
+            fileManagerContext, false);
         index->Build(config);
         auto binary =
             std::make_unique<knowhere::BinarySet>(index->Upload(config));
diff --git a/internal/core/src/mmap/ChunkedColumn.h b/internal/core/src/mmap/ChunkedColumn.h
index f285912ba9b97..744adef10af39 100644
--- a/internal/core/src/mmap/ChunkedColumn.h
+++ b/internal/core/src/mmap/ChunkedColumn.h
@@ -59,7 +59,7 @@ class ChunkedColumnBase : public ColumnBase {
         }
     }
 
-    virtual ~ChunkedColumnBase() {};
+    virtual ~ChunkedColumnBase(){};
 
     virtual void
     AppendBatch(const FieldDataPtr data) override {
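For readers landing on the test changes below: the removed benchmarks exercise the vendored jsmn parser whose header is relicensed above. jsmn has a deliberately tiny API — initialize a parser, then hand jsmn_parse the input plus a caller-owned token array; the library allocates nothing itself. A minimal sketch of that flow, assuming the header is reachable as "common/jsmn.h" (the include path, input literal, and buffer size here are illustrative, not taken from the patch):

#include <cstdio>
#include <cstring>

#include "common/jsmn.h"

int
main() {
    const char* js = R"({"key": "value", "n": 42})";
    jsmn_parser p;
    jsmntok_t tokens[16];

    jsmn_init(&p);
    // Returns the token count on success, or a negative jsmnerr value:
    // JSMN_ERROR_NOMEM (token array too small), JSMN_ERROR_INVAL (bad input),
    // or JSMN_ERROR_PART (input truncated).
    int r = jsmn_parse(&p, js, strlen(js), tokens, 16);
    if (r < 0) {
        fprintf(stderr, "parse failed: %d\n", r);
        return 1;
    }
    for (int i = 0; i < r; i++) {
        // start/end are byte offsets into the input; size is the child count.
        printf("type=%d size=%d text=%.*s\n",
               tokens[i].type,
               tokens[i].size,
               tokens[i].end - tokens[i].start,
               js + tokens[i].start);
    }
    return 0;
}

The JSMN_ERROR_NOMEM case is what the grow-and-retry helpers in the deleted test code below react to.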
diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp
index 5ad22c8e1f7c9..6fb24a7c4036a 100644
--- a/internal/core/unittest/test_c_api.cpp
+++ b/internal/core/unittest/test_c_api.cpp
@@ -5280,387 +5280,4 @@ TEST(CApiTest, RANGE_SEARCH_WITH_RADIUS_AND_RANGE_FILTER_WHEN_IP_BFLOAT16) {
 
 TEST(CApiTest, IsLoadWithDisk) {
     ASSERT_TRUE(IsLoadWithDisk(INVERTED_INDEX_TYPE, 0));
-}
-
-// 1000 keys
-std::string
-GenerateJson(int N) {
-    std::vector<std::string> data(N);
-    std::default_random_engine er(67);
-    std::normal_distribution<> distr(0, 1);
-    std::vector<std::string> keys;
-    for (int i = 0; i < N; i++) {
-        keys.push_back("keys" + std::to_string(i));
-    }
-    std::string json_string;
-    std::vector<std::string> values(N);
-    for (int i = 0; i < N; i++) {
-        if (i % 7 == 0 || i % 7 == 4) {
-            values[i] = std::to_string(er());
-        } else if (i % 7 == 1 || i % 7 == 5) {
-            values[i] = std::to_string(static_cast<double>(er()));
-        } else if (i % 7 == 2 || i % 7 == 6) {
-            values[i] = er() / 2 == 0 ? "true" : "false";
-        } else if (i % 7 == 3) {
-            values[i] = "\"xxxx" + std::to_string(i) + "\"";
-            // } else if (i % 7 == 4) {
-            //     std::vector<std::string> intvec(10);
-            //     for (int j = 0; j < 10; j++) {
-            //         intvec[j] = std::to_string(i + j);
-            //     }
-            //     values[i] = "[" + join(intvec, ",") + "]";
-            // } else if (i % 7 == 5) {
-            //     std::vector<std::string> doublevec(10);
-            //     for (int j = 0; j < 10; j++) {
-            //         doublevec[j] =
-            //             std::to_string(static_cast<double>(i + j + er()));
-            //     }
-            //     values[i] = "[" + join(doublevec, ",") + "]";
-            // } else if (i % 7 == 6) {
-            //     std::vector<std::string> stringvec(10);
-            //     for (int j = 0; j < 10; j++) {
-            //         stringvec[j] = "\"xxx" + std::to_string(j) + "\"";
-            //     }
-            //     values[i] = "[" + join(stringvec, ",") + "]";
-        }
-    }
-    json_string += "{";
-    for (int i = 0; i < N - 1; i++) {
-        json_string += R"(")" + keys[i] + R"(":)" + values[i] + R"(,)";
-    }
-    json_string += R"(")" + keys[N - 1] + R"(":)" + values[N - 1];
-    json_string += "}";
-    return json_string;
-}
-
-void
-ParseJson(const std::string& json) {
-    jsmn_parser p;
-    jsmntok_t t[2002];
-
-    jsmn_init(&p);
-    int r = jsmn_parse(
-        &p, json.c_str(), strlen(json.c_str()), t, sizeof(t) / sizeof(t[0]));
-    if (r < 0) {
-        printf("Failed to parse JSON: %d\n", r);
-        return;
-    }
-    if (r < 1 || t[0].type != JSMN_OBJECT) {
-        printf("Object expected\n");
-        return;
-    }
-    //std::cout << r << std::endl;
-}
-
-TEST(CApiTest, test_parse_perform) {
-    for (int i = 0; i < 10000; i++) {
-        {
-            int64_t all_cost = 0;
-            for (int j = 0; j < 10000; j++) {
-                auto json_string = GenerateJson(1000);
-                if (j == 0) {
-                    std::cout << json_string.size() << std::endl;
-                }
-                //std::cout << json_string << std::endl;
-                auto start = std::chrono::steady_clock::now();
-                ParseJson(json_string);
-                all_cost +=
-                    std::chrono::duration_cast<std::chrono::microseconds>(
-                        std::chrono::steady_clock::now() - start)
-                        .count();
-            }
-            std::cout << "cost: " << all_cost << "us" << std::endl;
-        }
-        {
-            int64_t all_cost = 0;
-            for (int j = 0; j < 10000; j++) {
-                auto json_string = GenerateJson(100);
-                if (j == 0) {
-                    std::cout << json_string.size() << std::endl;
-                }
-                //std::cout << json_string << std::endl;
-                auto start = std::chrono::steady_clock::now();
-                ParseJson(json_string);
-                all_cost +=
-                    std::chrono::duration_cast<std::chrono::microseconds>(
-                        std::chrono::steady_clock::now() - start)
-                        .count();
-            }
-            std::cout << "cost: " << all_cost << "us" << std::endl;
-        }
-        {
-            int64_t all_cost = 0;
-            for (int j = 0; j < 10000; j++) {
-                auto json_string = GenerateJson(50);
-                if (j == 0) {
-                    std::cout << json_string.size() << std::endl;
-                }
-                auto start = std::chrono::steady_clock::now();
-                ParseJson(json_string);
-                all_cost +=
-                    std::chrono::duration_cast<std::chrono::microseconds>(
-                        std::chrono::steady_clock::now() - start)
-                        .count();
-            }
-            std::cout << "cost: " << all_cost << "us" << std::endl;
-        }
-    }
-}
-
-void
-extract_key_value_pairs(const char* json, size_t len) {
-    jsmn_parser parser;
-    jsmntok_t* tokens =
-        (jsmntok_t*)malloc(16 * sizeof(jsmntok_t));  // Initial allocation
-    if (!tokens) {
-        fprintf(stderr, "Memory allocation failed\n");
-        return;
-    }
-    int num_tokens = 0;
-    int token_capacity = 16;
-
-    // Initialize the parser
-    jsmn_init(&parser);
-
-    size_t pos = 0;
-    while (pos < len) {
-        size_t chunk_size =
-            len - pos > 256 ? 256 : len - pos;  // Read in chunks of 256 bytes
-        int r =
-            jsmn_parse(&parser, json + pos, chunk_size, tokens, token_capacity);
-        if (r < 0) {
-            if (r == JSMN_ERROR_NOMEM) {
-                // Reallocate tokens array if not enough space
-                token_capacity *= 2;  // Double the capacity
-                tokens = (jsmntok_t*)realloc(
-                    tokens, token_capacity * sizeof(jsmntok_t));
-                if (!tokens) {
-                    fprintf(stderr, "Memory reallocation failed\n");
-                    return;
-                }
-                continue;  // Try parsing again
-            } else {
-                fprintf(stderr, "Failed to parse JSON: %d\n", r);
-                free(tokens);
-                return;
-            }
-        }
-
-        // Update the position
-        pos += chunk_size;
-    }
-
-    // Iterate through the tokens
-    for (int i = 0; i < parser.toknext; i++) {
-        if (tokens[i].type == JSMN_OBJECT) {
-            for (int j = 0; j < tokens[i].size; j++) {
-                // The next token is the key (string)
-                j++;
-                printf("Key: %.*s\n",
-                       tokens[j].end - tokens[j].start,
-                       json + tokens[j].start);
-
-                // The next token is the value
-                j++;
-                printf("Value: %.*s\n",
-                       tokens[j].end - tokens[j].start,
-                       json + tokens[j].start);
-            }
-        }
-    }
-
-    // Clean up
-    free(tokens);
-}
-
-void
-TravelJson(const char* json,
-           jsmntok* tokens,
-           int& index,
-           std::vector<std::string>& path) {
-    jsmntok current = tokens[0];
-    if (current.type == JSMN_OBJECT) {
-        int j = 1;
-        for (int i = 0; i < current.size; i++) {
-            assert(tokens[j].type == JSMN_STRING && tokens[j].size != 0);
-            std::string key(json + tokens[j].start,
-                            tokens[j].end - tokens[j].start);
-            path.push_back(key);
-            j++;
-            int consumed = 0;
-            TravelJson(json, tokens + j, consumed, path);
-            path.pop_back();
-            j += consumed;
-        }
-        index = j;
-    } else if (current.type == JSMN_PRIMITIVE) {
-        std::cout << "key:" << Join(path, ".") << "values:"
-                  << std::string(json + current.start,
-                                 current.end - current.start)
-                  << std::endl;
-        index++;
-    } else if (current.type == JSMN_ARRAY) {
-        std::cout << "key:" << Join(path, ".") << "values:"
-                  << std::string(json + current.start,
-                                 current.end - current.start)
-                  << std::endl;
-        // skip next array parse
-        int count = current.size;
-        int j = 1;
-        while (count > 0) {
-            if (tokens[j].size == 0) {
-                count--;
-            } else {
-                count += tokens[j].size;
-            }
-            j++;
-        }
-        index = j;
-
-    } else if (current.type == JSMN_STRING) {
-        if (current.size == 0) {
-            std::cout << "key:" << Join(path, ".") << " values:"
-                      << std::string(json + current.start,
-                                     current.end - current.start)
-                      << std::endl;
-            index++;
-        } else {
-            throw std::runtime_error("not should happen");
-        }
-    } else {
-        throw std::runtime_error("not should happen");
-    }
-}
-
-void
-extract_key_value_pairs(const char* json) {
-    jsmn_parser parser;
-    jsmntok_t* tokens =
-        (jsmntok_t*)malloc(16 * sizeof(jsmntok_t));  // Initial allocation
-    if (!tokens) {
-        fprintf(stderr, "Memory allocation failed\n");
-        return;
-    }
-    int num_tokens = 0;
-    int token_capacity = 16;
-
-    // Initialize the parser
-    jsmn_init(&parser);
-
-    // Parse the JSON string
-    while (1) {
-        int r = jsmn_parse(
-            &parser, json, strlen(json), tokens, token_capacity);
-        if (r < 0) {
-            if (r == JSMN_ERROR_NOMEM) {
-                // Reallocate tokens array if not enough space
-                token_capacity *= 2;  // Double the capacity
-                tokens = (jsmntok_t*)realloc(
-                    tokens, token_capacity * sizeof(jsmntok_t));
-                if (!tokens) {
-                    fprintf(stderr, "Memory reallocation failed\n");
-                    return;
-                }
-                continue;  // Try parsing again
-            } else {
-                fprintf(stderr, "Failed to parse JSON: %d\n", r);
-                free(tokens);
-                return;
-            }
-        }
-        num_tokens = r;
-        break;  // Exit the loop if parsing was successful
-    }
-
-    std::cout << "num_tokens:" << num_tokens << std::endl;
-    // Iterate through the tokens
-    for (int i = 0; i < num_tokens; i++) {
-        std::cout << "i:" << i << "type: " << tokens[i].type
-                  << "token size:" << tokens[i].size << std::endl;
-        printf("value: %.*s\n",
-               tokens[i].end - tokens[i].start,
-               json + tokens[i].start);
-    }
-
-    std::cout << "-----------------" << std::endl;
-    int index = 0;
-    std::vector<std::string> path;
-    TravelJson(json, tokens, index, path);
-
-    // Clean up
-    free(tokens);
-}
-
-void
-extract_json(const char* json) {
-    jsmn_parser parser;
-    jsmntok_t* tokens =
-        (jsmntok_t*)malloc(16 * sizeof(jsmntok_t));  // Initial allocation
-    if (!tokens) {
-        fprintf(stderr, "Memory allocation failed\n");
-        return;
-    }
-    int num_tokens = 0;
-    int token_capacity = 16;
-
-    // Initialize the parser
-    jsmn_init(&parser);
-
-    // Parse the JSON string
-    while (1) {
-        int r = jsmn_parse(
-            &parser, json, strlen(json), tokens, token_capacity);
-        if (r < 0) {
-            if (r == JSMN_ERROR_NOMEM) {
-                // Reallocate tokens array if not enough space
-                token_capacity *= 2;  // Double the capacity
-                tokens = (jsmntok_t*)realloc(
-                    tokens, token_capacity * sizeof(jsmntok_t));
-                if (!tokens) {
-                    fprintf(stderr, "Memory reallocation failed\n");
-                    return;
-                }
-                continue;  // Try parsing again
-            } else {
-                fprintf(stderr, "Failed to parse JSON: %d\n", r);
-                free(tokens);
-                return;
-            }
-        }
-        num_tokens = r;
-        break;  // Exit the loop if parsing was successful
-    }
-
-    // assert(tokens[0].type == JSMN_OBJECT);
-
-    // Iterate through the tokens
-    for (int i = 0; i < num_tokens; i++) {
-        std::cout << "i:" << i << "type: " << tokens[i].type
-                  << "token size:" << tokens[i].size << std::endl;
-        printf("value: %.*s\n",
-               tokens[i].end - tokens[i].start,
-               json + tokens[i].start);
-    }
-
-    // Clean up
-    free(tokens);
-}
-
-TEST(CApiTest, test_jsmn_function) {
-    int64_t all_cost = 0;
-    // auto json_string = GenerateJson(50);
-    // std::cout << json_string << std::endl;
-    // extract_key_value_pairs(json_string.c_str());
-
-    std::string json_string =
-        R"({"keys0": ["value0", 234, "values1"], "keys1": ["value3", 1235]})";
-    std::cout << json_string << std::endl;
-    extract_key_value_pairs(json_string.c_str());
-
-    json_string =
-        R"({"keys0": [{"keys1": 1234, "keys2": "xxx"}, {"keys3": 567, "keys4": "xxxxx"}]})";
-    std::cout << json_string << std::endl;
-    extract_key_value_pairs(json_string.c_str());
-
-    json_string = R"({"keys0": {"keys1": { "keys2": "xxx", "keys3" :1234}}})";
-    std::cout << json_string << std::endl;
-    extract_key_value_pairs(json_string.c_str());
-}
+}
\ No newline at end of file
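One caveat worth recording next to the deletion above: each removed extract_* helper overwrote tokens with realloc's return value, so a failed reallocation leaked the original block. A leak-free shape for the same grow-and-retry loop is sketched below; parse_all, its starting capacity, and the input in main are illustrative names and values, not code from the repository:

#include <cstdio>
#include <cstdlib>
#include <cstring>

#include "common/jsmn.h"

// Parses `json` completely, doubling the token array on JSMN_ERROR_NOMEM.
// Returns the token count (caller frees *out_tokens) or a negative jsmnerr.
int
parse_all(const char* json, jsmntok_t** out_tokens) {
    unsigned int capacity = 16;
    auto* tokens = (jsmntok_t*)malloc(capacity * sizeof(jsmntok_t));
    if (!tokens) {
        return JSMN_ERROR_NOMEM;
    }
    jsmn_parser parser;
    for (;;) {
        jsmn_init(&parser);  // reset parser state before each full re-parse
        int r = jsmn_parse(&parser, json, strlen(json), tokens, capacity);
        if (r != JSMN_ERROR_NOMEM) {
            if (r < 0) {
                free(tokens);
                return r;
            }
            *out_tokens = tokens;
            return r;
        }
        capacity *= 2;
        // Keep the old pointer until the new block is known to exist,
        // so an allocation failure cannot leak it.
        auto* grown = (jsmntok_t*)realloc(tokens, capacity * sizeof(jsmntok_t));
        if (!grown) {
            free(tokens);
            return JSMN_ERROR_NOMEM;
        }
        tokens = grown;
    }
}

int
main() {
    jsmntok_t* tokens = nullptr;
    int n = parse_all(R"({"k": [1, 2, 3]})", &tokens);
    if (n < 0) {
        return 1;
    }
    printf("parsed %d tokens\n", n);
    free(tokens);
    return 0;
}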
diff --git a/internal/datacoord/job_manager.go b/internal/datacoord/job_manager.go
index f3609d387405e..3878c1fe95e09 100644
--- a/internal/datacoord/job_manager.go
+++ b/internal/datacoord/job_manager.go
@@ -145,7 +145,15 @@ func needDoJsonKeyIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool {
 	if !isFlush(segment) {
 		return false
 	}
-	return true
+	for _, fieldID := range fieldIDs {
+		if segment.GetJsonKeyStats() == nil {
+			return true
+		}
+		if segment.GetJsonKeyStats()[fieldID] == nil {
+			return true
+		}
+	}
+	return false
 }
 
 func needDoBM25(segment *SegmentInfo, fieldIDs []UniqueID) bool {
@@ -199,7 +207,6 @@ func (jm *statsJobManager) triggerJsonKeyIndexStatsTask() {
 			}
 		}
 	}
-
 }
 
 func (jm *statsJobManager) triggerBM25StatsTask() {
diff --git a/internal/datacoord/job_manager_test.go b/internal/datacoord/job_manager_test.go
index a0d95e4cd5b3d..bcba4dd77086c 100644
--- a/internal/datacoord/job_manager_test.go
+++ b/internal/datacoord/job_manager_test.go
@@ -62,6 +62,11 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() {
 					},
 				},
 			},
+			{
+				FieldID:  102,
+				Name:     "json",
+				DataType: schemapb.DataType_JSON,
+			},
 		},
 	},
 },
@@ -117,5 +122,5 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() {
 
 	jm.loopWg.Wait()
 
-	s.Equal(2, len(jm.scheduler.tasks))
+	s.Equal(4, len(jm.scheduler.tasks))
 }
diff --git a/internal/querycoordv2/checkers/stats_checker_test.go b/internal/querycoordv2/checkers/stats_checker_test.go
new file mode 100644
index 0000000000000..98a56b81358df
--- /dev/null
+++ b/internal/querycoordv2/checkers/stats_checker_test.go
@@ -0,0 +1,280 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package checkers
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
+	"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
+	"github.com/milvus-io/milvus/internal/querycoordv2/params"
+	"github.com/milvus-io/milvus/internal/querycoordv2/session"
+	"github.com/milvus-io/milvus/internal/querycoordv2/task"
+	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
+	"github.com/milvus-io/milvus/pkg/kv"
+	"github.com/milvus-io/milvus/pkg/util/etcd"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+type StatsCheckerSuite struct {
+	suite.Suite
+	kv        kv.MetaKv
+	checker   *StatsChecker
+	meta      *meta.Meta
+	broker    *meta.MockBroker
+	nodeMgr   *session.NodeManager
+	targetMgr *meta.MockTargetManager
+}
+
+func (suite *StatsCheckerSuite) SetupSuite() {
+	paramtable.Init()
+}
+
+func (suite *StatsCheckerSuite) SetupTest() {
+	var err error
+	config := params.GenerateEtcdConfig()
+	cli, err := etcd.GetEtcdClient(
+		config.UseEmbedEtcd.GetAsBool(),
+		config.EtcdUseSSL.GetAsBool(),
+		config.Endpoints.GetAsStrings(),
+		config.EtcdTLSCert.GetValue(),
+		config.EtcdTLSKey.GetValue(),
+		config.EtcdTLSCACert.GetValue(),
+		config.EtcdTLSMinVersion.GetValue())
+	suite.Require().NoError(err)
+	suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
+
+	// meta
+	store := querycoord.NewCatalog(suite.kv)
+	idAllocator := params.RandomIncrementIDAllocator()
+	suite.nodeMgr = session.NewNodeManager()
+	suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
+	distManager := meta.NewDistributionManager()
+	suite.broker = meta.NewMockBroker(suite.T())
+
+	suite.targetMgr = meta.NewMockTargetManager(suite.T())
+	suite.checker = NewStatsChecker(suite.meta, distManager, suite.broker, suite.nodeMgr, suite.targetMgr)
+
+	suite.targetMgr.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cid, sid int64, i3 int32) *datapb.SegmentInfo {
+		return &datapb.SegmentInfo{
+			ID:    sid,
+			Level: datapb.SegmentLevel_L1,
+		}
+	}).Maybe()
+}
+
+func (suite *StatsCheckerSuite) TearDownTest() {
+	suite.kv.Close()
+}
+
+func (suite *StatsCheckerSuite) TestLoadJsonIndex() {
+	checker := suite.checker
+	ctx := context.Background()
+
+	// meta
+	coll := utils.CreateTestCollection(1, 1)
+	coll.FieldIndexID = map[int64]int64{101: 1000}
+	checker.meta.CollectionManager.PutCollection(ctx, coll)
+	checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(200, 1, []int64{1, 2}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   2,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	checker.meta.ResourceManager.HandleNodeUp(ctx, 1)
+	checker.meta.ResourceManager.HandleNodeUp(ctx, 2)
+
+	// dist
+	checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel"))
+
+	// broker
+	suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).
+		Return(&milvuspb.DescribeCollectionResponse{
+			Status: merr.Success(),
+			Schema: &schemapb.CollectionSchema{
+				Name: "test_loadJsonIndex",
+				Fields: []*schemapb.FieldSchema{
+					{FieldID: 101, DataType: schemapb.DataType_JSON, Name: "JSON"},
+				},
+			},
+			CollectionID:   1,
+			CollectionName: "test_loadJsonIndex",
+		}, nil)
+
+	tasks := checker.Check(context.Background())
+	suite.Require().Len(tasks, 1)
+
+	t := tasks[0]
+	suite.Require().Len(t.Actions(), 1)
+
+	action, ok := t.Actions()[0].(*task.SegmentAction)
+	suite.Require().True(ok)
+	suite.EqualValues(200, t.ReplicaID())
+	suite.Equal(task.ActionTypeStatsUpdate, action.Type())
+	suite.EqualValues(2, action.GetSegmentID())
+
+	// test skip load json index for read only node
+	suite.nodeMgr.Stopping(1)
+	suite.nodeMgr.Stopping(2)
+	suite.meta.ResourceManager.HandleNodeStopping(ctx, 1)
+	suite.meta.ResourceManager.HandleNodeStopping(ctx, 2)
+	utils.RecoverAllCollection(suite.meta)
+	tasks = checker.Check(context.Background())
+	suite.Require().Len(tasks, 0)
+}
+
+func (suite *StatsCheckerSuite) TestJsonIndexNotMatch() {
+	checker := suite.checker
+	ctx := context.Background()
+
+	// meta
+	coll := utils.CreateTestCollection(1, 1)
+	coll.FieldIndexID = map[int64]int64{101: 1000}
+	checker.meta.CollectionManager.PutCollection(ctx, coll)
+	checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(200, 1, []int64{1, 2}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   2,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	checker.meta.ResourceManager.HandleNodeUp(ctx, 1)
+	checker.meta.ResourceManager.HandleNodeUp(ctx, 2)
+
+	// dist
+	checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel"))
+
+	// broker
+	suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).
+		Return(&milvuspb.DescribeCollectionResponse{
+			Status: merr.Success(),
+			Schema: &schemapb.CollectionSchema{
+				Name: "test_loadJsonIndex",
+				Fields: []*schemapb.FieldSchema{
+					{FieldID: 101, DataType: schemapb.DataType_Int16, Name: "int"},
+				},
+			},
+			CollectionID:   1,
+			CollectionName: "test_loadJsonIndex",
+		}, nil)
+
+	tasks := checker.Check(context.Background())
+	suite.Require().Len(tasks, 0)
+}
+
+func (suite *StatsCheckerSuite) TestDescribeCollectionFailed() {
+	checker := suite.checker
+	ctx := context.Background()
+
+	// meta
+	coll := utils.CreateTestCollection(1, 1)
+	coll.FieldIndexID = map[int64]int64{101: 1000}
+	checker.meta.CollectionManager.PutCollection(ctx, coll)
+	checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(200, 1, []int64{1, 2}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   2,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	checker.meta.ResourceManager.HandleNodeUp(ctx, 1)
+	checker.meta.ResourceManager.HandleNodeUp(ctx, 2)
+
+	// dist
+	checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel"))
+
+	// broker
+	suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).
+		Return(nil, errors.New("mocked error"))
+
+	tasks := checker.Check(context.Background())
+	suite.Require().Len(tasks, 0)
+}
+
+func (suite *StatsCheckerSuite) TestCreateNewJsonIndex() {
+	checker := suite.checker
+	ctx := context.Background()
+
+	// meta
+	coll := utils.CreateTestCollection(1, 1)
+	coll.FieldIndexID = map[int64]int64{101: 1000}
+	checker.meta.CollectionManager.PutCollection(ctx, coll)
+	checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(200, 1, []int64{1, 2}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   1,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
+		NodeID:   2,
+		Address:  "localhost",
+		Hostname: "localhost",
+	}))
+	checker.meta.ResourceManager.HandleNodeUp(ctx, 1)
+	checker.meta.ResourceManager.HandleNodeUp(ctx, 2)
+
+	// dist
+	segment := utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel")
+	segment.JsonIndexField = []int64{102}
+	checker.dist.SegmentDistManager.Update(1, segment)
+
+	// broker
+	suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).
+		Return(&milvuspb.DescribeCollectionResponse{
+			Status: merr.Success(),
+			Schema: &schemapb.CollectionSchema{
+				Name: "test_loadJsonIndex",
+				Fields: []*schemapb.FieldSchema{
+					{FieldID: 101, DataType: schemapb.DataType_JSON, Name: "JSON"},
+				},
+			},
+			CollectionID:   1,
+			CollectionName: "test_loadJsonIndex",
+		}, nil)
+
+	tasks := checker.Check(context.Background())
+	suite.Len(tasks, 1)
+	suite.Len(tasks[0].Actions(), 1)
+	suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).Type(), task.ActionTypeStatsUpdate)
+}
+
+func TestStatsChecker(t *testing.T) {
+	suite.Run(t, new(StatsCheckerSuite))
+}
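The removed TravelJson experiment earlier in this patch is the prototype for the key-path extraction that JsonKeyInvertedIndex and the new stats tasks build on: walk the jsmn token stream and emit one flattened "a.b.c -> value" pair per leaf. A compact standalone sketch of that walk, with WalkToken as an illustrative name and the include path assumed, not code from the repository:

#include <cstring>
#include <iostream>
#include <string>
#include <vector>

#include "common/jsmn.h"

// Walks the token at `pos`, printing "dotted.path -> value" for each leaf.
// Returning the number of tokens consumed lets the caller step over whole
// subtrees without TravelJson's manual size bookkeeping for arrays.
static int
WalkToken(const char* json,
          const jsmntok_t* tokens,
          int pos,
          std::vector<std::string>& path) {
    const jsmntok_t& tok = tokens[pos];
    if (tok.type == JSMN_OBJECT) {
        int consumed = 1;
        for (int i = 0; i < tok.size; i++) {
            const jsmntok_t& key = tokens[pos + consumed];
            path.emplace_back(json + key.start, key.end - key.start);
            consumed++;  // step over the key token
            consumed += WalkToken(json, tokens, pos + consumed, path);
            path.pop_back();
        }
        return consumed;
    }
    if (tok.type == JSMN_ARRAY) {
        int consumed = 1;
        for (int i = 0; i < tok.size; i++) {
            consumed += WalkToken(json, tokens, pos + consumed, path);
        }
        return consumed;
    }
    // JSMN_STRING or JSMN_PRIMITIVE leaf.
    std::string dotted;
    for (size_t i = 0; i < path.size(); i++) {
        dotted += (i ? "." : "") + path[i];
    }
    std::cout << dotted << " -> "
              << std::string(json + tok.start, tok.end - tok.start)
              << std::endl;
    return 1;
}

int
main() {
    const char* js = R"({"a": {"b": 1}, "c": [2, 3]})";
    jsmn_parser p;
    jsmntok_t tokens[32];
    jsmn_init(&p);
    int n = jsmn_parse(&p, js, strlen(js), tokens, 32);
    if (n < 1) {
        return 1;
    }
    std::vector<std::string> path;
    WalkToken(js, tokens, 0, path);
    return 0;
}

Elements nested inside arrays keep their parent's path here, mirroring what TravelJson printed for arrays; a production walker would append the element index instead.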