diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index c5b0286468b95..f4685bf2d8102 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/http/healthz" rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" "github.com/milvus-io/milvus/internal/util/dependency" + kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" internalmetrics "github.com/milvus-io/milvus/internal/util/metrics" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -413,6 +414,9 @@ func (mr *MilvusRoles) Run() { log.Info("proxy stopped!") } + // close reused etcd client + kvfactory.CloseEtcdClient() + log.Info("Milvus components graceful stop done") } diff --git a/cmd/tools/migration/migration/runner.go b/cmd/tools/migration/migration/runner.go index 87d8d664dbfdf..e490a2ed12a2a 100644 --- a/cmd/tools/migration/migration/runner.go +++ b/cmd/tools/migration/migration/runner.go @@ -82,8 +82,7 @@ func (r *Runner) initEtcdCli() { func (r *Runner) init() { r.initEtcdCli() - - r.session = sessionutil.NewSession( + r.session = sessionutil.NewSessionWithEtcd( r.ctx, r.cfg.EtcdCfg.MetaRootPath.GetValue(), r.etcdCli, diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index f4143256c7f2d..0fd99e770abea 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -87,7 +87,7 @@ type dataNodeCreatorFunc func(ctx context.Context, addr string, nodeID int64) (t type indexNodeCreatorFunc func(ctx context.Context, addr string, nodeID int64) (types.IndexNodeClient, error) -type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdClient *clientv3.Client) (types.RootCoordClient, error) +type rootCoordCreatorFunc func(ctx context.Context) (types.RootCoordClient, error) // makes sure Server implements `DataCoord` var _ types.DataCoord = (*Server)(nil) @@ -236,8 +236,8 @@ func defaultIndexNodeCreatorFunc(ctx context.Context, 
addr string, nodeID int64) return indexnodeclient.NewClient(ctx, addr, nodeID, Params.DataCoordCfg.WithCredential.GetAsBool()) } -func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, client *clientv3.Client) (types.RootCoordClient, error) { - return rootcoordclient.NewClient(ctx, metaRootPath, client) +func defaultRootCoordCreatorFunc(ctx context.Context) (types.RootCoordClient, error) { + return rootcoordclient.NewClient(ctx) } // QuitSignal returns signal when server quits @@ -282,14 +282,14 @@ func (s *Server) Register() error { } func (s *Server) initSession() error { - s.icSession = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli) + s.icSession = sessionutil.NewSession(s.ctx) if s.icSession == nil { return errors.New("failed to initialize IndexCoord session") } s.icSession.Init(typeutil.IndexCoordRole, s.address, true, true) s.icSession.SetEnableActiveStandBy(s.enableActiveStandBy) - s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli) + s.session = sessionutil.NewSession(s.ctx) if s.session == nil { return errors.New("failed to initialize session") } @@ -1024,7 +1024,7 @@ func (s *Server) handleFlushingSegments(ctx context.Context) { func (s *Server) initRootCoordClient() error { var err error if s.rootCoordClient == nil { - if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli); err != nil { + if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx); err != nil { return err } } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 5cc54021031d3..2806d80d38af6 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2485,7 +2485,7 @@ func TestGetRecoveryInfo(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) 
(types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -2529,7 +2529,7 @@ func TestGetRecoveryInfo(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -2634,7 +2634,7 @@ func TestGetRecoveryInfo(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -2713,7 +2713,7 @@ func TestGetRecoveryInfo(t *testing.T) { Schema: newTestSchema(), }) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -2810,7 +2810,7 @@ func TestGetRecoveryInfo(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -2852,7 +2852,7 @@ func TestGetRecoveryInfo(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) 
(types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -2893,7 +2893,7 @@ func TestGetRecoveryInfo(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -3166,7 +3166,7 @@ func TestOptions(t *testing.T) { t.Run("WithRootCoordCreator", func(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - var crt rootCoordCreatorFunc = func(ctx context.Context, metaRoot string, etcdClient *clientv3.Client) (types.RootCoordClient, error) { + var crt rootCoordCreatorFunc = func(ctx context.Context) (types.RootCoordClient, error) { return nil, errors.New("dummy") } opt := WithRootCoordCreator(crt) @@ -4170,7 +4170,7 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server { svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { return newMockDataNodeClient(0, receiveCh) } - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -4224,7 +4224,7 @@ func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts .. 
svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { return newMockDataNodeClient(0, receiveCh) } - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } // indexCoord := mocks.NewMockIndexCoord(t) @@ -4281,7 +4281,7 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server { svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { return newMockDataNodeClient(0, receiveCh) } - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index ffd8c9b6b9e92..07c11b0d0e0c7 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -7,7 +7,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - clientv3 "go.etcd.io/etcd/client/v3" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -102,7 +101,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -144,7 +143,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - 
svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -252,7 +251,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -333,7 +332,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { Schema: newTestSchema(), }) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -427,7 +426,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -472,7 +471,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } @@ -512,11 +511,200 @@ func TestGetRecoveryInfoV2(t *testing.T) { assert.NotEqual(t, 0, resp.GetChannels()[0].GetSeekPosition().GetTimestamp()) }) + t.Run("with failed compress", func(t *testing.T) { + svr := 
newTestServer(t, nil) + defer closeTestServer(t, svr) + + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { + return newMockRootCoordClient(), nil + } + + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: newTestSchema(), + }) + + err := svr.meta.UpdateChannelCheckpoint("vchan1", &msgpb.MsgPosition{ + ChannelName: "vchan1", + Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, + }) + assert.NoError(t, err) + + svr.meta.AddCollection(&collectionInfo{ + ID: 1, + Schema: newTestSchema(), + }) + + err = svr.meta.UpdateChannelCheckpoint("vchan2", &msgpb.MsgPosition{ + ChannelName: "vchan2", + Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, + }) + assert.NoError(t, err) + + svr.meta.AddCollection(&collectionInfo{ + ID: 2, + Schema: newTestSchema(), + }) + + err = svr.meta.UpdateChannelCheckpoint("vchan3", &msgpb.MsgPosition{ + ChannelName: "vchan3", + Timestamp: 0, + MsgID: []byte{0, 0, 0, 0, 0, 0, 0, 0}, + }) + assert.NoError(t, err) + + svr.channelManager.AddNode(0) + ch := &channel{ + Name: "vchan1", + CollectionID: 0, + } + err = svr.channelManager.Watch(context.TODO(), ch) + assert.NoError(t, err) + + ch = &channel{ + Name: "vchan2", + CollectionID: 1, + } + err = svr.channelManager.Watch(context.TODO(), ch) + assert.NoError(t, err) + + ch = &channel{ + Name: "vchan3", + CollectionID: 2, + } + err = svr.channelManager.Watch(context.TODO(), ch) + assert.NoError(t, err) + + seg := createSegment(8, 0, 0, 100, 40, "vchan1", commonpb.SegmentState_Flushed) + binLogPaths := make([]*datapb.Binlog, 1) + // miss one field + path := metautil.JoinIDPath(0, 0, 8, fieldID) + path = path + "/mock" + binLogPaths[0] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: path, + } + + seg.Statslogs = append(seg.Statslogs, &datapb.FieldBinlog{ + FieldID: fieldID, + Binlogs: binLogPaths, + }) + + binLogPaths2 := make([]*datapb.Binlog, 1) + pathCorrect := metautil.JoinIDPath(0, 0, 8, fieldID, 1) + binLogPaths2[0] = &datapb.Binlog{ + 
EntriesNum: 10000, + LogPath: pathCorrect, + } + + seg.Binlogs = append(seg.Binlogs, &datapb.FieldBinlog{ + FieldID: fieldID, + Binlogs: binLogPaths2, + }) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg)) + assert.NoError(t, err) + + // make sure collection is indexed + err = svr.meta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 0, + IndexName: "_default_idx_1", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + IsAutoIndex: false, + UserIndexParams: nil, + }) + assert.NoError(t, err) + + svr.meta.segments.SetSegmentIndex(seg.ID, &model.SegmentIndex{ + SegmentID: seg.ID, + CollectionID: 0, + PartitionID: 0, + NumRows: 100, + IndexID: 0, + BuildID: 0, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }) + + req := &datapb.GetRecoveryInfoRequestV2{ + CollectionID: 0, + } + _, err = svr.GetRecoveryInfoV2(context.TODO(), req) + assert.NoError(t, err) + + // test bin log + path = metautil.JoinIDPath(0, 0, 9, fieldID) + path = path + "/mock" + binLogPaths[0] = &datapb.Binlog{ + EntriesNum: 10000, + LogPath: path, + } + + seg2 := createSegment(9, 1, 0, 100, 40, "vchan2", commonpb.SegmentState_Flushed) + seg2.Binlogs = append(seg2.Binlogs, &datapb.FieldBinlog{ + FieldID: fieldID, + Binlogs: binLogPaths, + }) + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) + assert.NoError(t, err) + + // make sure collection is indexed + err = svr.meta.CreateIndex(&model.Index{ + TenantID: "", + CollectionID: 1, + FieldID: 2, + IndexID: 1, + IndexName: "_default_idx_2", + IsDeleted: false, + CreateTime: 0, + TypeParams: nil, + IndexParams: nil, + IsAutoIndex: false, + UserIndexParams: nil, + }) + assert.NoError(t, err) + + svr.meta.segments.SetSegmentIndex(seg2.ID, &model.SegmentIndex{ + SegmentID: seg2.ID, + CollectionID: 1, + PartitionID: 0, + NumRows: 100, + IndexID: 
1, + BuildID: 0, + NodeID: 0, + IndexVersion: 1, + IndexState: commonpb.IndexState_Finished, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }) + req = &datapb.GetRecoveryInfoRequestV2{ + CollectionID: 1, + } + _, err = svr.GetRecoveryInfoV2(context.TODO(), req) + assert.NoError(t, err) + }) + t.Run("with continuous compaction", func(t *testing.T) { svr := newTestServer(t, nil) defer closeTestServer(t, svr) - svr.rootCoordClientCreator = func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoordClient, error) { + svr.rootCoordClientCreator = func(ctx context.Context) (types.RootCoordClient, error) { return newMockRootCoordClient(), nil } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index f22f964f961ed..f1c23b0f7bb74 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -201,7 +201,7 @@ func (node *DataNode) Register() error { } func (node *DataNode) initSession() error { - node.session = sessionutil.NewSession(node.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), node.etcdCli) + node.session = sessionutil.NewSession(node.ctx) if node.session == nil { return errors.New("failed to initialize session") } diff --git a/internal/distributed/connection_manager_test.go b/internal/distributed/connection_manager_test.go index ce3eff2273b09..feaa960679bf2 100644 --- a/internal/distributed/connection_manager_test.go +++ b/internal/distributed/connection_manager_test.go @@ -291,6 +291,6 @@ func initSession(ctx context.Context) *sessionutil.Session { if err != nil { panic(err) } - session := sessionutil.NewSession(ctx, metaRootPath, etcdCli) + session := sessionutil.NewSessionWithEtcd(ctx, metaRootPath, etcdCli) return session } diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index 97c3bfd908a13..d24b17e7e7f54 100644 --- a/internal/distributed/datacoord/client/client.go +++ 
b/internal/distributed/datacoord/client/client.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc" @@ -51,8 +50,8 @@ type Client struct { } // NewClient creates a new client instance -func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (*Client, error) { - sess := sessionutil.NewSession(ctx, metaRoot, etcdCli) +func NewClient(ctx context.Context) (*Client, error) { + sess := sessionutil.NewSession(ctx) if sess == nil { err := fmt.Errorf("new session error, maybe can not connect to etcd") log.Debug("DataCoordClient NewClient failed", zap.Error(err)) diff --git a/internal/distributed/datacoord/client/client_test.go b/internal/distributed/datacoord/client/client_test.go index ab4b78590a862..775013301a001 100644 --- a/internal/distributed/datacoord/client/client_test.go +++ b/internal/distributed/datacoord/client/client_test.go @@ -30,7 +30,6 @@ import ( "google.golang.org/grpc" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proxy" "github.com/milvus-io/milvus/internal/util/mock" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -57,16 +56,8 @@ func TestMain(m *testing.M) { func Test_NewClient(t *testing.T) { ctx := context.Background() - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.NoError(t, err) - client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli) + + client, err := NewClient(ctx) assert.NoError(t, err) assert.NotNil(t, client) diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 0a8b29c727d2c..ded08780ccb05 
100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -70,8 +70,8 @@ type Server struct { rootCoord types.RootCoord dataCoord types.DataCoord - newRootCoordClient func(string, *clientv3.Client) (types.RootCoordClient, error) - newDataCoordClient func(string, *clientv3.Client) (types.DataCoordClient, error) + newRootCoordClient func() (types.RootCoordClient, error) + newDataCoordClient func() (types.DataCoordClient, error) } // NewServer new DataNode grpc server @@ -82,11 +82,11 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) cancel: cancel, factory: factory, grpcErrChan: make(chan error), - newRootCoordClient: func(etcdMetaRoot string, client *clientv3.Client) (types.RootCoordClient, error) { - return rcc.NewClient(ctx1, etcdMetaRoot, client) + newRootCoordClient: func() (types.RootCoordClient, error) { + return rcc.NewClient(ctx1) }, - newDataCoordClient: func(etcdMetaRoot string, client *clientv3.Client) (types.DataCoordClient, error) { - return dcc.NewClient(ctx1, etcdMetaRoot, client) + newDataCoordClient: func() (types.DataCoordClient, error) { + return dcc.NewClient(ctx1) }, } @@ -253,7 +253,7 @@ func (s *Server) init() error { // --- RootCoord Client --- if s.newRootCoordClient != nil { log.Info("initializing RootCoord client for DataNode") - rootCoordClient, err := s.newRootCoordClient(dn.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli) + rootCoordClient, err := s.newRootCoordClient() if err != nil { log.Error("failed to create new RootCoord client", zap.Error(err)) panic(err) @@ -272,7 +272,7 @@ func (s *Server) init() error { // --- DataCoord Client --- if s.newDataCoordClient != nil { log.Debug("starting DataCoord client for DataNode") - dataCoordClient, err := s.newDataCoordClient(dn.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli) + dataCoordClient, err := s.newDataCoordClient() if err != nil { log.Error("failed to create new DataCoord client", zap.Error(err)) 
panic(err) diff --git a/internal/distributed/datanode/service_test.go b/internal/distributed/datanode/service_test.go index e6550c84c23a7..b64d658c8aca3 100644 --- a/internal/distributed/datanode/service_test.go +++ b/internal/distributed/datanode/service_test.go @@ -216,11 +216,11 @@ func Test_NewServer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, server) - server.newRootCoordClient = func(string, *clientv3.Client) (types.RootCoordClient, error) { + server.newRootCoordClient = func() (types.RootCoordClient, error) { return &mockRootCoord{}, nil } - server.newDataCoordClient = func(string, *clientv3.Client) (types.DataCoordClient, error) { + server.newDataCoordClient = func() (types.DataCoordClient, error) { return &mockDataCoord{}, nil } @@ -355,11 +355,11 @@ func Test_Run(t *testing.T) { regErr: errors.New("error"), } - server.newRootCoordClient = func(string, *clientv3.Client) (types.RootCoordClient, error) { + server.newRootCoordClient = func() (types.RootCoordClient, error) { return &mockRootCoord{}, nil } - server.newDataCoordClient = func(string, *clientv3.Client) (types.DataCoordClient, error) { + server.newDataCoordClient = func() (types.DataCoordClient, error) { return &mockDataCoord{}, nil } diff --git a/internal/distributed/proxy/service.go b/internal/distributed/proxy/service.go index 8f52f39a8401b..2b923c002fa01 100644 --- a/internal/distributed/proxy/service.go +++ b/internal/distributed/proxy/service.go @@ -551,7 +551,7 @@ func (s *Server) init() error { if s.rootCoordClient == nil { var err error log.Debug("create RootCoord client for Proxy") - s.rootCoordClient, err = rcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli) + s.rootCoordClient, err = rcc.NewClient(s.ctx) if err != nil { log.Warn("failed to create RootCoord client for Proxy", zap.Error(err)) return err @@ -573,7 +573,7 @@ func (s *Server) init() error { if s.dataCoordClient == nil { var err error log.Debug("create DataCoord client for Proxy") - 
s.dataCoordClient, err = dcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli) + s.dataCoordClient, err = dcc.NewClient(s.ctx) if err != nil { log.Warn("failed to create DataCoord client for Proxy", zap.Error(err)) return err @@ -595,7 +595,7 @@ func (s *Server) init() error { if s.queryCoordClient == nil { var err error log.Debug("create QueryCoord client for Proxy") - s.queryCoordClient, err = qcc.NewClient(s.ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli) + s.queryCoordClient, err = qcc.NewClient(s.ctx) if err != nil { log.Warn("failed to create QueryCoord client for Proxy", zap.Error(err)) return err diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go index 22eedf074a068..ac1e78e202478 100644 --- a/internal/distributed/querycoord/client/client.go +++ b/internal/distributed/querycoord/client/client.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc" @@ -46,8 +45,8 @@ type Client struct { } // NewClient creates a client for QueryCoord grpc call. 
-func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (*Client, error) { - sess := sessionutil.NewSession(ctx, metaRoot, etcdCli) +func NewClient(ctx context.Context) (*Client, error) { + sess := sessionutil.NewSession(ctx) if sess == nil { err := fmt.Errorf("new session error, maybe can not connect to etcd") log.Debug("QueryCoordClient NewClient failed", zap.Error(err)) diff --git a/internal/distributed/querycoord/client/client_test.go b/internal/distributed/querycoord/client/client_test.go index 248d19d8c561b..f2b94ca599b2f 100644 --- a/internal/distributed/querycoord/client/client_test.go +++ b/internal/distributed/querycoord/client/client_test.go @@ -30,7 +30,6 @@ import ( "google.golang.org/grpc" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/proxy" "github.com/milvus-io/milvus/internal/util/mock" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -58,16 +57,7 @@ func TestMain(m *testing.M) { func Test_NewClient(t *testing.T) { ctx := context.Background() - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.NoError(t, err) - client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli) + client, err := NewClient(ctx) assert.NoError(t, err) assert.NotNil(t, client) diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 80716c1bd0970..30271af103d6b 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -154,7 +154,7 @@ func (s *Server) init() error { // --- Master Server Client --- if s.rootCoord == nil { - s.rootCoord, err = 
rcc.NewClient(s.loopCtx, qc.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli) + s.rootCoord, err = rcc.NewClient(s.loopCtx) if err != nil { log.Error("QueryCoord try to new RootCoord client failed", zap.Error(err)) panic(err) @@ -176,7 +176,7 @@ func (s *Server) init() error { // --- Data service client --- if s.dataCoord == nil { - s.dataCoord, err = dcc.NewClient(s.loopCtx, qc.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli) + s.dataCoord, err = dcc.NewClient(s.loopCtx) if err != nil { log.Error("QueryCoord try to new DataCoord client failed", zap.Error(err)) panic(err) diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go index 1f07f904bd631..e5ab6f2a2ce7d 100644 --- a/internal/distributed/rootcoord/client/client.go +++ b/internal/distributed/rootcoord/client/client.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc" grpcCodes "google.golang.org/grpc/codes" @@ -53,8 +52,8 @@ type Client struct { // metaRoot is the path in etcd for root coordinator registration // etcdEndpoints are the address list for etcd end points // timeout is default setting for each grpc call -func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (*Client, error) { - sess := sessionutil.NewSession(ctx, metaRoot, etcdCli) +func NewClient(ctx context.Context) (*Client, error) { + sess := sessionutil.NewSession(ctx) if sess == nil { err := fmt.Errorf("new session error, maybe can not connect to etcd") log.Debug("QueryCoordClient NewClient failed", zap.Error(err)) diff --git a/internal/distributed/rootcoord/client/client_test.go b/internal/distributed/rootcoord/client/client_test.go index ca5aa24ca4613..ce5c45a44500c 100644 --- a/internal/distributed/rootcoord/client/client_test.go +++ b/internal/distributed/rootcoord/client/client_test.go @@ -30,7 +30,6 @@ import ( "google.golang.org/grpc" 
"github.com/milvus-io/milvus/internal/proto/rootcoordpb" - "github.com/milvus-io/milvus/internal/proxy" "github.com/milvus-io/milvus/internal/util/mock" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -57,16 +56,8 @@ func TestMain(m *testing.M) { func Test_NewClient(t *testing.T) { ctx := context.Background() - etcdCli, err := etcd.GetEtcdClient( - Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), - Params.EtcdCfg.EtcdUseSSL.GetAsBool(), - Params.EtcdCfg.Endpoints.GetAsStrings(), - Params.EtcdCfg.EtcdTLSCert.GetValue(), - Params.EtcdCfg.EtcdTLSKey.GetValue(), - Params.EtcdCfg.EtcdTLSCACert.GetValue(), - Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) - assert.NoError(t, err) - client, err := NewClient(ctx, proxy.Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli) + + client, err := NewClient(ctx) assert.NoError(t, err) assert.NotNil(t, client) diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index 7e9003c5f3c74..3b478fed32ca3 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -73,8 +73,8 @@ type Server struct { dataCoord types.DataCoordClient queryCoord types.QueryCoordClient - newDataCoordClient func(string, *clientv3.Client) types.DataCoordClient - newQueryCoordClient func(string, *clientv3.Client) types.QueryCoordClient + newDataCoordClient func() types.DataCoordClient + newQueryCoordClient func() types.QueryCoordClient } func (s *Server) CreateDatabase(ctx context.Context, request *milvuspb.CreateDatabaseRequest) (*commonpb.Status, error) { @@ -126,16 +126,16 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) } func (s *Server) setClient() { - s.newDataCoordClient = func(etcdMetaRoot string, etcdCli *clientv3.Client) types.DataCoordClient { - dsClient, err := dcc.NewClient(s.ctx, etcdMetaRoot, etcdCli) + s.newDataCoordClient = func() types.DataCoordClient { + dsClient, err := dcc.NewClient(s.ctx) if err 
!= nil { panic(err) } return dsClient } - s.newQueryCoordClient = func(metaRootPath string, etcdCli *clientv3.Client) types.QueryCoordClient { - qsClient, err := qcc.NewClient(s.ctx, metaRootPath, etcdCli) + s.newQueryCoordClient = func() types.QueryCoordClient { + qsClient, err := qcc.NewClient(s.ctx) if err != nil { panic(err) } @@ -201,7 +201,7 @@ func (s *Server) init() error { if s.newDataCoordClient != nil { log.Debug("RootCoord start to create DataCoord client") - dataCoord := s.newDataCoordClient(rootcoord.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli) + dataCoord := s.newDataCoordClient() s.dataCoord = dataCoord if err := s.rootCoord.SetDataCoordClient(dataCoord); err != nil { panic(err) @@ -210,7 +210,7 @@ func (s *Server) init() error { if s.newQueryCoordClient != nil { log.Debug("RootCoord start to create QueryCoord client") - queryCoord := s.newQueryCoordClient(rootcoord.Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli) + queryCoord := s.newQueryCoordClient() s.queryCoord = queryCoord if err := s.rootCoord.SetQueryCoordClient(queryCoord); err != nil { panic(err) diff --git a/internal/distributed/rootcoord/service_test.go b/internal/distributed/rootcoord/service_test.go index e8843c032de65..d7b8e53e461d0 100644 --- a/internal/distributed/rootcoord/service_test.go +++ b/internal/distributed/rootcoord/service_test.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/types" + kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" @@ -168,10 +169,10 @@ func TestRun(t *testing.T) { assert.Error(t, err) assert.EqualError(t, err, "listen tcp: address 1000000: invalid port") - svr.newDataCoordClient = func(string, *clientv3.Client) types.DataCoordClient { + svr.newDataCoordClient = 
func() types.DataCoordClient { return &mockDataCoord{} } - svr.newQueryCoordClient = func(string, *clientv3.Client) types.QueryCoordClient { + svr.newQueryCoordClient = func() types.QueryCoordClient { return &mockQueryCoord{} } @@ -182,6 +183,9 @@ func TestRun(t *testing.T) { randVal := rand.Int() rootPath := fmt.Sprintf("/%d/test", randVal) rootcoord.Params.Save("etcd.rootPath", rootPath) + // Need to reset global etcd to follow new path + // Need to reset global etcd to follow new path + kvfactory.CloseEtcdClient() etcdCli, err := etcd.GetEtcdClient( etcdConfig.UseEmbedEtcd.GetAsBool(), @@ -247,7 +251,7 @@ func TestServerRun_DataCoordClientInitErr(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, server) - server.newDataCoordClient = func(string, *clientv3.Client) types.DataCoordClient { + server.newDataCoordClient = func() types.DataCoordClient { return &mockDataCoord{} } assert.Panics(t, func() { server.Run() }) @@ -273,7 +277,7 @@ func TestServerRun_DataCoordClientStartErr(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, server) - server.newDataCoordClient = func(string, *clientv3.Client) types.DataCoordClient { + server.newDataCoordClient = func() types.DataCoordClient { return &mockDataCoord{} } assert.Panics(t, func() { server.Run() }) @@ -299,7 +303,7 @@ func TestServerRun_QueryCoordClientInitErr(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, server) - server.newQueryCoordClient = func(string, *clientv3.Client) types.QueryCoordClient { + server.newQueryCoordClient = func() types.QueryCoordClient { return &mockQueryCoord{initErr: errors.New("mock querycoord init error")} } assert.Panics(t, func() { server.Run() }) @@ -325,7 +329,7 @@ func TestServer_QueryCoordClientStartErr(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, server) - server.newQueryCoordClient = func(string, *clientv3.Client) types.QueryCoordClient { + server.newQueryCoordClient = func() types.QueryCoordClient { return &mockQueryCoord{startErr: errors.New("mock 
querycoord start error")} } assert.Panics(t, func() { server.Run() }) diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 912b41fd8ac77..665f087e2cc9f 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -188,7 +188,7 @@ func (i *IndexNode) CloseSegcore() { } func (i *IndexNode) initSession() error { - i.session = sessionutil.NewSession(i.loopCtx, Params.EtcdCfg.MetaRootPath.GetValue(), i.etcdCli, sessionutil.WithEnableDisk(Params.IndexNodeCfg.EnableDisk.GetAsBool())) + i.session = sessionutil.NewSession(i.loopCtx, sessionutil.WithEnableDisk(Params.IndexNodeCfg.EnableDisk.GetAsBool())) if i.session == nil { return errors.New("failed to initialize session") } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 7439c27ae0fe2..72b82ce19df00 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -181,7 +181,7 @@ func (node *Proxy) Register() error { // initSession initialize the session of Proxy. 
func (node *Proxy) initSession() error { - node.session = sessionutil.NewSession(node.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), node.etcdCli) + node.session = sessionutil.NewSession(node.ctx) if node.session == nil { return errors.New("new session failed, maybe etcd cannot be connected") } diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 18f7ba41c80f5..1df1c148c44a8 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -454,21 +454,21 @@ func TestProxy(t *testing.T) { go testServer.startGrpc(ctx, &wg, &p) assert.NoError(t, testServer.waitForGrpcReady()) - rootCoordClient, err := rcc.NewClient(ctx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdcli) + rootCoordClient, err := rcc.NewClient(ctx) assert.NoError(t, err) err = componentutil.WaitForComponentHealthy(ctx, rootCoordClient, typeutil.RootCoordRole, attempts, sleepDuration) assert.NoError(t, err) proxy.SetRootCoordClient(rootCoordClient) log.Info("Proxy set root coordinator client") - dataCoordClient, err := grpcdatacoordclient2.NewClient(ctx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdcli) + dataCoordClient, err := grpcdatacoordclient2.NewClient(ctx) assert.NoError(t, err) err = componentutil.WaitForComponentHealthy(ctx, dataCoordClient, typeutil.DataCoordRole, attempts, sleepDuration) assert.NoError(t, err) proxy.SetDataCoordClient(dataCoordClient) log.Info("Proxy set data coordinator client") - queryCoordClient, err := grpcquerycoordclient.NewClient(ctx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdcli) + queryCoordClient, err := grpcquerycoordclient.NewClient(ctx) assert.NoError(t, err) err = componentutil.WaitForComponentHealthy(ctx, queryCoordClient, typeutil.QueryCoordRole, attempts, sleepDuration) assert.NoError(t, err) diff --git a/internal/querycoordv2/mocks/querynode.go b/internal/querycoordv2/mocks/querynode.go index 0151db85978f4..c8fc5e835fc11 100644 --- a/internal/querycoordv2/mocks/querynode.go +++ b/internal/querycoordv2/mocks/querynode.go 
@@ -60,7 +60,7 @@ func NewMockQueryNode(t *testing.T, etcdCli *clientv3.Client, nodeID int64) *Moc MockQueryNodeServer: NewMockQueryNodeServer(t), ctx: ctx, cancel: cancel, - session: sessionutil.NewSession(ctx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli), + session: sessionutil.NewSessionWithEtcd(ctx, Params.EtcdCfg.MetaRootPath.GetValue(), etcdCli), channels: make(map[int64][]string), segments: make(map[int64]map[string][]int64), ID: nodeID, diff --git a/internal/querycoordv2/params/params.go b/internal/querycoordv2/params/params.go index 1b7fe5aa9f6ff..0e1b0d1ab5104 100644 --- a/internal/querycoordv2/params/params.go +++ b/internal/querycoordv2/params/params.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/errors" + kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -39,6 +40,10 @@ func GenerateEtcdConfig() *paramtable.EtcdConfig { suffix := "-test-querycoord" + strconv.FormatInt(rand.Int63(), 10) Params.Save("etcd.rootPath", config.MetaRootPath.GetValue()+suffix) + // Due to switching etcd path mid test cases, we need to update the cached client + // that is used by default + kvfactory.CloseEtcdClient() + return &Params.EtcdCfg } diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index be1ad632aac43..0779e343d352f 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -165,7 +165,7 @@ func (s *Server) Register() error { func (s *Server) initSession() error { // Init QueryCoord session - s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli) + s.session = sessionutil.NewSession(s.ctx) if s.session == nil { return fmt.Errorf("failed to create session") } diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index b5633c15d1ad3..43126d598a84d 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -176,7 
+176,7 @@ func (suite *ServiceSuite) SetupTest() { suite.server = &Server{ kv: suite.kv, store: suite.store, - session: sessionutil.NewSession(context.Background(), Params.EtcdCfg.MetaRootPath.GetValue(), cli), + session: sessionutil.NewSessionWithEtcd(context.Background(), Params.EtcdCfg.MetaRootPath.GetValue(), cli), metricsCacheManager: metricsinfo.NewMetricsCacheManager(), dist: suite.dist, meta: suite.meta, diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index 7b2fa15b21e40..4d6168c19d25a 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -148,11 +148,7 @@ func NewQueryNode(ctx context.Context, factory dependency.Factory) *QueryNode { func (node *QueryNode) initSession() error { minimalIndexVersion, currentIndexVersion := getIndexEngineVersion() - node.session = sessionutil.NewSession(node.ctx, - paramtable.Get().EtcdCfg.MetaRootPath.GetValue(), - node.etcdCli, - sessionutil.WithIndexEngineVersion(minimalIndexVersion, currentIndexVersion), - ) + node.session = sessionutil.NewSession(node.ctx, sessionutil.WithIndexEngineVersion(minimalIndexVersion, currentIndexVersion)) if node.session == nil { return fmt.Errorf("session is nil, the etcd client connection may have failed") } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 92a015f1a0615..3719162e1e172 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -308,7 +308,7 @@ func (c *Core) SetTiKVClient(client *txnkv.Client) { } func (c *Core) initSession() error { - c.session = sessionutil.NewSession(c.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), c.etcdCli) + c.session = sessionutil.NewSession(c.ctx) if c.session == nil { return fmt.Errorf("session is nil, the etcd client connection may have failed") } diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 4acadce05a8de..ef5ec8497c5f3 100644 --- a/internal/rootcoord/root_coord_test.go +++ 
b/internal/rootcoord/root_coord_test.go @@ -44,6 +44,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/rootcoordpb" mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" "github.com/milvus-io/milvus/internal/util/dependency" + kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/common" @@ -1702,6 +1703,8 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) { randVal := rand.Int() paramtable.Init() Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal)) + // Need to reset global etcd to follow new path + kvfactory.CloseEtcdClient() paramtable.Get().Save(Params.RootCoordCfg.EnableActiveStandby.Key, "true") paramtable.Get().Save(Params.CommonCfg.RootCoordTimeTick.Key, fmt.Sprintf("rootcoord-time-tick-%d", randVal)) paramtable.Get().Save(Params.CommonCfg.RootCoordStatistics.Key, fmt.Sprintf("rootcoord-statistics-%d", randVal)) @@ -1753,6 +1756,9 @@ func TestRootcoord_DisableActiveStandby(t *testing.T) { randVal := rand.Int() paramtable.Init() Params.Save("etcd.rootPath", fmt.Sprintf("/%d", randVal)) + // Need to reset global etcd to follow new path + kvfactory.CloseEtcdClient() + paramtable.Get().Save(Params.RootCoordCfg.EnableActiveStandby.Key, "false") paramtable.Get().Save(Params.CommonCfg.RootCoordTimeTick.Key, fmt.Sprintf("rootcoord-time-tick-%d", randVal)) paramtable.Get().Save(Params.CommonCfg.RootCoordStatistics.Key, fmt.Sprintf("rootcoord-statistics-%d", randVal)) diff --git a/internal/util/dependency/kv/kv_client_handler.go b/internal/util/dependency/kv/kv_client_handler.go new file mode 100644 index 0000000000000..77935d9509a1d --- /dev/null +++ b/internal/util/dependency/kv/kv_client_handler.go @@ -0,0 +1,73 @@ +package kvfactory + +import ( + "fmt" + "sync" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/milvus-io/milvus/pkg/util/etcd" + 
"github.com/milvus-io/milvus/pkg/util/paramtable" +) + +var clientCreator = &etcdClientCreator{} + +var getEtcdAndPathFunction = getEtcdAndPath + +type etcdClientCreator struct { + mu sync.Mutex + client *clientv3.Client + rootpath *string +} + +// Returns an Etcd client and the metaRootPath, if an error is hit, will panic. +// This client is reused by all calls in the current runtime. +func GetEtcdAndPath() (*clientv3.Client, string) { + client, path := getEtcdAndPathFunction() + return client, path +} + +// Reset the stored client, mainly used during testing when paramtable params have changed +// during runtime. +func CloseEtcdClient() { + clientCreator.mu.Lock() + defer clientCreator.mu.Unlock() + if clientCreator.client != nil { + err := clientCreator.client.Close() + if err != nil { + panic(err) + } + } + clientCreator.client = nil + clientCreator.rootpath = nil +} + +// Returns an Etcd client and the metaRootPath, if an error is hit, will panic +func getEtcdAndPath() (*clientv3.Client, string) { + clientCreator.mu.Lock() + defer clientCreator.mu.Unlock() + // If client/path doesnt exist, create a new one + if clientCreator.client == nil { + var err error + clientCreator.client, err = createEtcdClient() + if err != nil { + panic(fmt.Errorf("failed to create etcd client: %w", err)) + } + path := paramtable.Get().ServiceParam.EtcdCfg.MetaRootPath.GetValue() + clientCreator.rootpath = &path + } + return clientCreator.client, *clientCreator.rootpath +} + +// Function that calls the Etcd constructor +func createEtcdClient() (*clientv3.Client, error) { + cfg := &paramtable.Get().ServiceParam + return etcd.GetEtcdClient( + cfg.EtcdCfg.UseEmbedEtcd.GetAsBool(), + cfg.EtcdCfg.EtcdUseSSL.GetAsBool(), + cfg.EtcdCfg.Endpoints.GetAsStrings(), + cfg.EtcdCfg.EtcdTLSCert.GetValue(), + cfg.EtcdCfg.EtcdTLSKey.GetValue(), + cfg.EtcdCfg.EtcdTLSCACert.GetValue(), + cfg.EtcdCfg.EtcdTLSMinVersion.GetValue()) +} diff --git a/internal/util/sessionutil/session_util.go 
b/internal/util/sessionutil/session_util.go index a1e5daca89bc7..7eb5b4e1da5e0 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -36,6 +36,7 @@ import ( "go.uber.org/atomic" "go.uber.org/zap" + kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -208,11 +209,17 @@ func (s *Session) MarshalJSON() ([]byte, error) { return json.Marshal(s.SessionRaw) } -// NewSession is a helper to build Session object. +// Create a new Session object. Will use global etcd client +func NewSession(ctx context.Context, opts ...SessionOption) *Session { + client, path := kvfactory.GetEtcdAndPath() + return NewSessionWithEtcd(ctx, path, client, opts...) +} + +// NewSessionWithEtcd is a helper to build a Session object. // ServerID, ServerName, Address, Exclusive will be assigned after Init(). // metaRoot is a path in etcd to save session information. 
// etcdEndpoints is to init etcdCli when NewSession -func NewSession(ctx context.Context, metaRoot string, client *clientv3.Client, opts ...SessionOption) *Session { +func NewSessionWithEtcd(ctx context.Context, metaRoot string, client *clientv3.Client, opts ...SessionOption) *Session { hostName, hostNameErr := os.Hostname() if hostNameErr != nil { log.Error("get host name fail", zap.Error(hostNameErr)) diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index 25f9d7a80efb1..d58ae5552a072 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -57,7 +57,7 @@ func TestGetServerIDConcurrently(t *testing.T) { var wg sync.WaitGroup muList := sync.Mutex{} - s := NewSession(ctx, metaRoot, etcdCli) + s := NewSessionWithEtcd(ctx, metaRoot, etcdCli) res := make([]int64, 0) getIDFunc := func() { @@ -96,7 +96,35 @@ func TestInit(t *testing.T) { defer etcdKV.Close() defer etcdKV.RemoveWithPrefix("") - s := NewSession(ctx, metaRoot, etcdCli) + s := NewSessionWithEtcd(ctx, metaRoot, etcdCli) + s.Init("inittest", "testAddr", false, false) + assert.NotEqual(t, int64(0), s.LeaseID) + assert.NotEqual(t, int64(0), s.ServerID) + s.Register() + sessions, _, err := s.GetSessions("inittest") + assert.NoError(t, err) + assert.Contains(t, sessions, "inittest-"+strconv.FormatInt(s.ServerID, 10)) +} + +func TestInitNoArgs(t *testing.T) { + ctx := context.Background() + paramtable.Init() + params := paramtable.Get() + + endpoints := params.EtcdCfg.Endpoints.GetValue() + metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) + + etcdEndpoints := strings.Split(endpoints, ",") + etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) + require.NoError(t, err) + etcdKV := etcdkv.NewEtcdKV(etcdCli, metaRoot) + err = etcdKV.RemoveWithPrefix("") + assert.NoError(t, err) + + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + s := NewSession(ctx) s.Init("inittest", 
"testAddr", false, false) assert.NotEqual(t, int64(0), s.LeaseID) assert.NotEqual(t, int64(0), s.ServerID) @@ -125,7 +153,7 @@ func TestUpdateSessions(t *testing.T) { var wg sync.WaitGroup muList := sync.Mutex{} - s := NewSession(ctx, metaRoot, etcdCli, WithResueNodeID(false)) + s := NewSessionWithEtcd(ctx, metaRoot, etcdCli, WithResueNodeID(false)) sessions, rev, err := s.GetSessions("test") assert.NoError(t, err) @@ -137,7 +165,7 @@ func TestUpdateSessions(t *testing.T) { getIDFunc := func() { etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) require.NoError(t, err) - singleS := NewSession(ctx, metaRoot, etcdCli, WithResueNodeID(false)) + singleS := NewSessionWithEtcd(ctx, metaRoot, etcdCli, WithResueNodeID(false)) singleS.Init("test", "testAddr", false, false) singleS.Register() muList.Lock() @@ -194,7 +222,7 @@ func TestSessionLivenessCheck(t *testing.T) { etcdEndpoints := strings.Split(endpoints, ",") etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints) require.NoError(t, err) - s := NewSession(context.Background(), metaRoot, etcdCli) + s := NewSessionWithEtcd(context.Background(), metaRoot, etcdCli) s.Register() ch := make(chan struct{}) s.liveCh = ch @@ -218,7 +246,7 @@ func TestSessionLivenessCheck(t *testing.T) { // test context done, liveness exit, callback shouldn't trigger metaRoot = fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) - s1 := NewSession(context.Background(), metaRoot, etcdCli) + s1 := NewSessionWithEtcd(context.Background(), metaRoot, etcdCli) s1.Register() ctx, cancel := context.WithCancel(context.Background()) flag.Store(false) @@ -232,7 +260,7 @@ func TestSessionLivenessCheck(t *testing.T) { // test context done, liveness start failed, callback should trigger metaRoot = fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot) - s2 := NewSession(context.Background(), metaRoot, etcdCli) + s2 := NewSessionWithEtcd(context.Background(), metaRoot, etcdCli) s2.Register() ctx, cancel = context.WithCancel(context.Background()) signal = 
make(chan struct{}, 1) @@ -262,7 +290,7 @@ func TestWatcherHandleWatchResp(t *testing.T) { etcdKV := etcdkv.NewEtcdKV(etcdCli, "/by-dev/session-ut") defer etcdKV.Close() defer etcdKV.RemoveWithPrefix("/by-dev/session-ut") - s := NewSession(ctx, metaRoot, etcdCli) + s := NewSessionWithEtcd(ctx, metaRoot, etcdCli) defer s.Revoke(time.Second) getWatcher := func(s *Session, rewatch Rewatch) *sessionWatcher { @@ -370,7 +398,7 @@ func TestWatcherHandleWatchResp(t *testing.T) { }) t.Run("err handled but list failed", func(t *testing.T) { - s := NewSession(ctx, "/by-dev/session-ut", etcdCli) + s := NewSessionWithEtcd(ctx, "/by-dev/session-ut", etcdCli) s.etcdCli.Close() w := getWatcher(s, func(sessions map[string]*Session) error { return nil @@ -487,21 +515,21 @@ func (suite *SessionWithVersionSuite) SetupTest() { suite.metaRoot = "sessionWithVersion" suite.serverName = "sessionComp" - s1 := NewSession(ctx, suite.metaRoot, client, WithResueNodeID(false)) + s1 := NewSessionWithEtcd(ctx, suite.metaRoot, client, WithResueNodeID(false)) s1.Version.Major, s1.Version.Minor, s1.Version.Patch = 0, 0, 0 s1.Init(suite.serverName, "s1", false, false) s1.Register() suite.sessions = append(suite.sessions, s1) - s2 := NewSession(ctx, suite.metaRoot, client, WithResueNodeID(false)) + s2 := NewSessionWithEtcd(ctx, suite.metaRoot, client, WithResueNodeID(false)) s2.Version.Major, s2.Version.Minor, s2.Version.Patch = 2, 1, 0 s2.Init(suite.serverName, "s2", false, false) s2.Register() suite.sessions = append(suite.sessions, s2) - s3 := NewSession(ctx, suite.metaRoot, client, WithResueNodeID(false)) + s3 := NewSessionWithEtcd(ctx, suite.metaRoot, client, WithResueNodeID(false)) s3.Version.Major, s3.Version.Minor, s3.Version.Patch = 2, 2, 0 s3.Version.Build = []string{"dev"} s3.Init(suite.serverName, "s3", false, false) @@ -526,7 +554,7 @@ func (suite *SessionWithVersionSuite) TearDownTest() { } func (suite *SessionWithVersionSuite) TestGetSessionsWithRangeVersion() { - s := 
NewSession(context.Background(), suite.metaRoot, suite.client, WithResueNodeID(false)) + s := NewSessionWithEtcd(context.Background(), suite.metaRoot, suite.client, WithResueNodeID(false)) suite.Run(">1.0.0", func() { r, err := semver.ParseRange(">1.0.0") @@ -570,7 +598,7 @@ func (suite *SessionWithVersionSuite) TestGetSessionsWithRangeVersion() { } func (suite *SessionWithVersionSuite) TestWatchServicesWithVersionRange() { - s := NewSession(context.Background(), suite.metaRoot, suite.client, WithResueNodeID(false)) + s := NewSessionWithEtcd(context.Background(), suite.metaRoot, suite.client, WithResueNodeID(false)) suite.Run(">1.0.0 <=2.1.0", func() { r, err := semver.ParseRange(">1.0.0 <=2.1.0") @@ -624,7 +652,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) { // register session 1, will be active ctx1 := context.Background() - s1 := NewSession(ctx1, metaRoot, etcdCli, WithResueNodeID(false)) + s1 := NewSessionWithEtcd(ctx1, metaRoot, etcdCli, WithResueNodeID(false)) s1.Init("inittest", "testAddr", true, true) s1.SetEnableActiveStandBy(true) s1.Register() @@ -645,7 +673,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) { // register session 2, will be standby ctx2 := context.Background() - s2 := NewSession(ctx2, metaRoot, etcdCli, WithResueNodeID(false)) + s2 := NewSessionWithEtcd(ctx2, metaRoot, etcdCli, WithResueNodeID(false)) s2.Init("inittest", "testAddr", true, true) s2.SetEnableActiveStandBy(true) s2.Register() @@ -762,9 +790,9 @@ func TestIntegrationMode(t *testing.T) { err = etcdKV.RemoveWithPrefix("") assert.NoError(t, err) - s1 := NewSession(ctx, metaRoot, etcdCli) + s1 := NewSessionWithEtcd(ctx, metaRoot, etcdCli) assert.Equal(t, false, s1.reuseNodeID) - s2 := NewSession(ctx, metaRoot, etcdCli) + s2 := NewSessionWithEtcd(ctx, metaRoot, etcdCli) assert.Equal(t, false, s2.reuseNodeID) s1.Init("inittest1", "testAddr1", false, false) s1.Init("inittest2", "testAddr2", false, false) @@ -857,10 +885,10 @@ func (s *SessionSuite) TestDisconnected() 
{ func (s *SessionSuite) TestGoingStop() { ctx := context.Background() - sdisconnect := NewSession(ctx, s.metaRoot, s.client) + sdisconnect := NewSessionWithEtcd(ctx, s.metaRoot, s.client) sdisconnect.SetDisconnected(true) - sess := NewSession(ctx, s.metaRoot, s.client) + sess := NewSessionWithEtcd(ctx, s.metaRoot, s.client) sess.Init("test", "normal", false, false) sess.Register() @@ -889,12 +917,12 @@ func (s *SessionSuite) TestGoingStop() { func (s *SessionSuite) TestRevoke() { ctx := context.Background() - disconnected := NewSession(ctx, s.metaRoot, s.client, WithResueNodeID(false)) + disconnected := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) disconnected.Init("test", "disconnected", false, false) disconnected.Register() disconnected.SetDisconnected(true) - sess := NewSession(ctx, s.metaRoot, s.client, WithResueNodeID(false)) + sess := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) sess.Init("test", "normal", false, false) sess.Register() @@ -927,12 +955,12 @@ func (s *SessionSuite) TestRevoke() { func (s *SessionSuite) TestForceActiveWithLeaseID() { ctx := context.Background() role := "test" - sess1 := NewSession(ctx, s.metaRoot, s.client, WithResueNodeID(false)) + sess1 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) sess1.Init(role, "normal1", false, false) sess1.Register() sess1.ProcessActiveStandBy(nil) - sess2 := NewSession(ctx, s.metaRoot, s.client, WithResueNodeID(false)) + sess2 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) sess2.Init(role, "normal2", false, false) sess2.Register() sess2.ForceActiveStandby(nil) @@ -953,14 +981,14 @@ func (s *SessionSuite) TestForceActiveWithLeaseID() { func (s *SessionSuite) TestForceActiveWithDelete() { ctx := context.Background() role := "test" - sess1 := NewSession(ctx, s.metaRoot, s.client, WithResueNodeID(false)) + sess1 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) sess1.Init(role, 
"normal1", false, false) sessionJSON, err := json.Marshal(sess1) s.NoError(err) s.client.Put(ctx, path.Join(s.metaRoot, DefaultServiceRoot, fmt.Sprintf("%s-%d", role, 1)), string(sessionJSON)) s.client.Put(ctx, path.Join(s.metaRoot, DefaultServiceRoot, role), string(sessionJSON)) - sess2 := NewSession(ctx, s.metaRoot, s.client, WithResueNodeID(false)) + sess2 := NewSessionWithEtcd(ctx, s.metaRoot, s.client, WithResueNodeID(false)) sess2.Init(role, "normal2", false, false) sess2.Register() sess2.ForceActiveStandby(nil) @@ -980,7 +1008,7 @@ func (s *SessionSuite) TestForceActiveWithDelete() { func (s *SessionSuite) TestKeepAliveRetryActiveCancel() { ctx := context.Background() - session := NewSession(ctx, s.metaRoot, s.client) + session := NewSessionWithEtcd(ctx, s.metaRoot, s.client) session.Init("test", "normal", false, false) // Register @@ -1000,7 +1028,7 @@ func (s *SessionSuite) TestKeepAliveRetryActiveCancel() { func (s *SessionSuite) TestKeepAliveRetryChannelClose() { ctx := context.Background() - session := NewSession(ctx, s.metaRoot, s.client) + session := NewSessionWithEtcd(ctx, s.metaRoot, s.client) session.Init("test", "normal", false, false) // Register @@ -1027,7 +1055,7 @@ func (s *SessionSuite) TestKeepAliveRetryChannelClose() { func (s *SessionSuite) TestSafeCloseLiveCh() { ctx := context.Background() - session := NewSession(ctx, s.metaRoot, s.client) + session := NewSessionWithEtcd(ctx, s.metaRoot, s.client) session.Init("test", "normal", false, false) session.liveCh = make(chan struct{}) session.safeCloseLiveCh() diff --git a/tests/integration/crossclusterrouting/cross_cluster_routing_test.go b/tests/integration/crossclusterrouting/cross_cluster_routing_test.go index 08cc1eeb99ade..33c22f1d0db4c 100644 --- a/tests/integration/crossclusterrouting/cross_cluster_routing_test.go +++ b/tests/integration/crossclusterrouting/cross_cluster_routing_test.go @@ -112,14 +112,13 @@ func (s *CrossClusterRoutingSuite) SetupTest() { 
etcdConfig.EtcdTLSCACert.GetValue(), etcdConfig.EtcdTLSMinVersion.GetValue()) s.NoError(err) - metaRoot := paramtable.Get().EtcdCfg.MetaRootPath.GetValue() // setup clients - s.rootCoordClient, err = grpcrootcoordclient.NewClient(s.ctx, metaRoot, s.client) + s.rootCoordClient, err = grpcrootcoordclient.NewClient(s.ctx) s.NoError(err) - s.dataCoordClient, err = grpcdatacoordclient.NewClient(s.ctx, metaRoot, s.client) + s.dataCoordClient, err = grpcdatacoordclient.NewClient(s.ctx) s.NoError(err) - s.queryCoordClient, err = grpcquerycoordclient.NewClient(s.ctx, metaRoot, s.client) + s.queryCoordClient, err = grpcquerycoordclient.NewClient(s.ctx) s.NoError(err) s.proxyClient, err = grpcproxyclient.NewClient(s.ctx, paramtable.Get().ProxyGrpcClientCfg.GetInternalAddress(), 1) s.NoError(err) diff --git a/tests/integration/minicluster.go b/tests/integration/minicluster.go index dd9221c4a611b..588261e05bdd6 100644 --- a/tests/integration/minicluster.go +++ b/tests/integration/minicluster.go @@ -47,6 +47,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" @@ -152,6 +153,9 @@ func StartMiniCluster(ctx context.Context, opts ...Option) (cluster *MiniCluster RefreshInterval: 2 * time.Second, })) + // Reset the default client due to param changes for test + kvfactory.CloseEtcdClient() + if cluster.factory == nil { params.Save(params.LocalStorageCfg.Path.Key, "/tmp/milvus/") params.Save(params.CommonCfg.StorageType.Key, "local") @@ -1204,7 +1208,7 @@ func (cluster *MiniCluster) GetRootCoordClient() types.RootCoordClient { return cluster.RootCoordClient } - client, err := rootcoordclient.NewClient(cluster.ctx, GetMetaRootPath(cluster.params[EtcdRootPath]), cluster.EtcdCli) + client, 
err := rootcoordclient.NewClient(cluster.ctx) if err != nil { panic(err) } @@ -1219,7 +1223,7 @@ func (cluster *MiniCluster) GetDataCoordClient() types.DataCoordClient { return cluster.DataCoordClient } - client, err := datacoordclient.NewClient(cluster.ctx, GetMetaRootPath(cluster.params[EtcdRootPath]), cluster.EtcdCli) + client, err := datacoordclient.NewClient(cluster.ctx) if err != nil { panic(err) } @@ -1234,7 +1238,7 @@ func (cluster *MiniCluster) GetQueryCoordClient() types.QueryCoordClient { return cluster.QueryCoordClient } - client, err := querycoordclient.NewClient(cluster.ctx, GetMetaRootPath(cluster.params[EtcdRootPath]), cluster.EtcdCli) + client, err := querycoordclient.NewClient(cluster.ctx) if err != nil { panic(err) }