Skip to content

Commit

Permalink
enhance: use lazy initializing client for streaming node (milvus-io#3…
Browse files Browse the repository at this point in the history
…8400)

issue: milvus-io#38399

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored and NicoYuan1986 committed Dec 26, 2024
1 parent bb9b281 commit 3f3801b
Show file tree
Hide file tree
Showing 23 changed files with 290 additions and 185 deletions.
89 changes: 55 additions & 34 deletions internal/distributed/streamingnode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/netutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/tikv"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down Expand Up @@ -83,8 +85,8 @@ type Server struct {
// component client
etcdCli *clientv3.Client
tikvCli *txnkv.Client
rootCoord types.RootCoordClient
dataCoord types.DataCoordClient
rootCoord *syncutil.Future[types.RootCoordClient]
dataCoord *syncutil.Future[types.DataCoordClient]
chunkManager storage.ChunkManager
componentState *componentutil.ComponentStateService
}
Expand All @@ -95,6 +97,8 @@ func NewServer(ctx context.Context, f dependency.Factory) (*Server, error) {
return &Server{
stopOnce: sync.Once{},
factory: f,
dataCoord: syncutil.NewFuture[types.DataCoordClient](),
rootCoord: syncutil.NewFuture[types.RootCoordClient](),
grpcServerChan: make(chan struct{}),
componentState: componentutil.NewComponentStateService(typeutil.StreamingNodeRole),
ctx: ctx1,
Expand Down Expand Up @@ -166,8 +170,17 @@ func (s *Server) stop() {

// Stop rootCoord client.
log.Info("streamingnode stop rootCoord client...")
if err := s.rootCoord.Close(); err != nil {
log.Warn("streamingnode stop rootCoord client failed", zap.Error(err))
if s.rootCoord.Ready() {
if err := s.rootCoord.Get().Close(); err != nil {
log.Warn("streamingnode stop rootCoord client failed", zap.Error(err))
}
}

log.Info("streamingnode stop dataCoord client...")
if s.dataCoord.Ready() {
if err := s.dataCoord.Get().Close(); err != nil {
log.Warn("streamingnode stop dataCoord client failed", zap.Error(err))
}
}

// Stop tikv
Expand Down Expand Up @@ -216,12 +229,8 @@ func (s *Server) init() (err error) {
if err := s.initSession(); err != nil {
return err
}
if err := s.initRootCoord(); err != nil {
return err
}
if err := s.initDataCoord(); err != nil {
return err
}
s.initRootCoord()
s.initDataCoord()
s.initGRPCServer()

// Create StreamingNode service.
Expand Down Expand Up @@ -300,36 +309,48 @@ func (s *Server) initMeta() error {
return nil
}

func (s *Server) initRootCoord() (err error) {
func (s *Server) initRootCoord() {
log := log.Ctx(s.ctx)
log.Info("StreamingNode connect to rootCoord...")
s.rootCoord, err = rcc.NewClient(s.ctx)
if err != nil {
return errors.Wrap(err, "StreamingNode try to new RootCoord client failed")
}
go func() {
retry.Do(s.ctx, func() error {
log.Info("StreamingNode connect to rootCoord...")
rootCoord, err := rcc.NewClient(s.ctx)
if err != nil {
return errors.Wrap(err, "StreamingNode try to new RootCoord client failed")
}

log.Info("StreamingNode try to wait for RootCoord ready")
err = componentutil.WaitForComponentHealthy(s.ctx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200)
if err != nil {
return errors.Wrap(err, "StreamingNode wait for RootCoord ready failed")
}
return nil
log.Info("StreamingNode try to wait for RootCoord ready")
err = componentutil.WaitForComponentHealthy(s.ctx, rootCoord, "RootCoord", 1000000, time.Millisecond*200)
if err != nil {
return errors.Wrap(err, "StreamingNode wait for RootCoord ready failed")
}
log.Info("StreamingNode wait for RootCoord done")
s.rootCoord.Set(rootCoord)
return nil
}, retry.AttemptAlways())
}()
}

func (s *Server) initDataCoord() (err error) {
func (s *Server) initDataCoord() {
log := log.Ctx(s.ctx)
log.Info("StreamingNode connect to dataCoord...")
s.dataCoord, err = dcc.NewClient(s.ctx)
if err != nil {
return errors.Wrap(err, "StreamingNode try to new DataCoord client failed")
}
go func() {
retry.Do(s.ctx, func() error {
log.Info("StreamingNode connect to dataCoord...")
dataCoord, err := dcc.NewClient(s.ctx)
if err != nil {
return errors.Wrap(err, "StreamingNode try to new DataCoord client failed")
}

log.Info("StreamingNode try to wait for DataCoord ready")
err = componentutil.WaitForComponentHealthy(s.ctx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200)
if err != nil {
return errors.Wrap(err, "StreamingNode wait for DataCoord ready failed")
}
return nil
log.Info("StreamingNode try to wait for DataCoord ready")
err = componentutil.WaitForComponentHealthy(s.ctx, dataCoord, "DataCoord", 1000000, time.Millisecond*200)
if err != nil {
return errors.Wrap(err, "StreamingNode wait for DataCoord ready failed")
}
log.Info("StreamingNode wait for DataCoord ready")
s.dataCoord.Set(dataCoord)
return nil
}, retry.AttemptAlways())
}()
}

func (s *Server) initChunkManager() (err error) {
Expand Down
9 changes: 5 additions & 4 deletions internal/streamingnode/server/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

// ServerBuilder is used to build a server.
// All component should be initialized before server initialization should be added here.
type ServerBuilder struct {
etcdClient *clientv3.Client
grpcServer *grpc.Server
rc types.RootCoordClient
dc types.DataCoordClient
rc *syncutil.Future[types.RootCoordClient]
dc *syncutil.Future[types.DataCoordClient]
session *sessionutil.Session
kv kv.MetaKv
chunkManager storage.ChunkManager
Expand Down Expand Up @@ -49,13 +50,13 @@ func (b *ServerBuilder) WithGRPCServer(svr *grpc.Server) *ServerBuilder {
}

// WithRootCoordClient sets root coord client to the server builder.
func (b *ServerBuilder) WithRootCoordClient(rc types.RootCoordClient) *ServerBuilder {
func (b *ServerBuilder) WithRootCoordClient(rc *syncutil.Future[types.RootCoordClient]) *ServerBuilder {
b.rc = rc
return b
}

// WithDataCoordClient sets data coord client to the server builder.
func (b *ServerBuilder) WithDataCoordClient(dc types.DataCoordClient) *ServerBuilder {
func (b *ServerBuilder) WithDataCoordClient(dc *syncutil.Future[types.DataCoordClient]) *ServerBuilder {
b.dc = dc
return b
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,17 @@ func (c *channelLifetime) Run() error {
// Get recovery info from datacoord.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
resp, err := resource.Resource().DataCoordClient().
GetChannelRecoveryInfo(ctx, &datapb.GetChannelRecoveryInfoRequest{Vchannel: c.vchannel})

pipelineParams, err := c.f.getPipelineParams(ctx)
if err != nil {
return err
}

dc, err := resource.Resource().DataCoordClient().GetWithContext(ctx)
if err != nil {
return errors.Wrap(err, "At Get DataCoordClient")
}
resp, err := dc.GetChannelRecoveryInfo(ctx, &datapb.GetChannelRecoveryInfoRequest{Vchannel: c.vchannel})
if err = merr.CheckRPCCall(resp, err); err != nil {
return err
}
Expand Down Expand Up @@ -115,7 +124,7 @@ func (c *channelLifetime) Run() error {
}

// Build and add pipeline.
ds, err := pipeline.NewStreamingNodeDataSyncService(ctx, c.f.pipelineParams,
ds, err := pipeline.NewStreamingNodeDataSyncService(ctx, pipelineParams,
// TODO fubang add the db properties
&datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan(), func(t syncmgr.Task, err error) {
if err != nil || t == nil {
Expand Down
93 changes: 61 additions & 32 deletions internal/streamingnode/server/flusher/flusherimpl/flusher_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package flusherimpl

import (
"context"
"sync"
"time"

"github.com/cockroachdb/errors"
Expand All @@ -34,56 +33,55 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/streamingnode/server/flusher"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource/idalloc"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var _ flusher.Flusher = (*flusherImpl)(nil)

type flusherImpl struct {
broker broker.Broker
fgMgr pipeline.FlowgraphManager
syncMgr syncmgr.SyncManager
wbMgr writebuffer.BufferManager
cpUpdater *util.ChannelCheckpointUpdater
fgMgr pipeline.FlowgraphManager
wbMgr writebuffer.BufferManager
syncMgr syncmgr.SyncManager
cpUpdater *syncutil.Future[*util.ChannelCheckpointUpdater]
chunkManager storage.ChunkManager

channelLifetimes *typeutil.ConcurrentMap[string, ChannelLifetime]

notifyCh chan struct{}
stopChan lifetime.SafeChan
stopWg sync.WaitGroup
pipelineParams *util.PipelineParams
notifyCh chan struct{}
notifier *syncutil.AsyncTaskNotifier[struct{}]
}

func NewFlusher(chunkManager storage.ChunkManager) flusher.Flusher {
params := getPipelineParams(chunkManager)
return newFlusherWithParam(params)
}

func newFlusherWithParam(params *util.PipelineParams) flusher.Flusher {
fgMgr := pipeline.NewFlowgraphManager()
syncMgr := syncmgr.NewSyncManager(chunkManager)
wbMgr := writebuffer.NewManager(syncMgr)
return &flusherImpl{
broker: params.Broker,
fgMgr: fgMgr,
syncMgr: params.SyncMgr,
wbMgr: params.WriteBufferManager,
cpUpdater: params.CheckpointUpdater,
fgMgr: pipeline.NewFlowgraphManager(),
wbMgr: wbMgr,
syncMgr: syncMgr,
cpUpdater: syncutil.NewFuture[*util.ChannelCheckpointUpdater](),
chunkManager: chunkManager,
channelLifetimes: typeutil.NewConcurrentMap[string, ChannelLifetime](),
notifyCh: make(chan struct{}, 1),
stopChan: lifetime.NewSafeChan(),
pipelineParams: params,
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
}
}

func (f *flusherImpl) RegisterPChannel(pchannel string, wal wal.WAL) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := resource.Resource().RootCoordClient().GetPChannelInfo(ctx, &rootcoordpb.GetPChannelInfoRequest{
rc, err := resource.Resource().RootCoordClient().GetWithContext(ctx)
if err != nil {
return errors.Wrap(err, "At Get RootCoordClient")
}
resp, err := rc.GetPChannelInfo(ctx, &rootcoordpb.GetPChannelInfoRequest{
Pchannel: pchannel,
})
if err = merr.CheckRPCCall(resp, err); err != nil {
Expand Down Expand Up @@ -126,11 +124,18 @@ func (f *flusherImpl) notify() {
}

func (f *flusherImpl) Start() {
f.stopWg.Add(1)
f.wbMgr.Start()
go f.cpUpdater.Start()
go func() {
defer f.stopWg.Done()
defer f.notifier.Finish(struct{}{})
dc, err := resource.Resource().DataCoordClient().GetWithContext(f.notifier.Context())
if err != nil {
return
}
broker := broker.NewCoordBroker(dc, paramtable.GetNodeID())
cpUpdater := util.NewChannelCheckpointUpdater(broker)
go cpUpdater.Start()
f.cpUpdater.Set(cpUpdater)

backoff := typeutil.NewBackoffTimer(typeutil.BackoffTimerConfig{
Default: 5 * time.Second,
Backoff: typeutil.BackoffConfig{
Expand All @@ -143,7 +148,7 @@ func (f *flusherImpl) Start() {
var nextTimer <-chan time.Time
for {
select {
case <-f.stopChan.CloseCh():
case <-f.notifier.Context().Done():
log.Info("flusher exited")
return
case <-f.notifyCh:
Expand Down Expand Up @@ -190,13 +195,37 @@ func (f *flusherImpl) handle(backoff *typeutil.BackoffTimer) <-chan time.Time {
}

func (f *flusherImpl) Stop() {
f.stopChan.Close()
f.stopWg.Wait()
f.notifier.Cancel()
f.notifier.BlockUntilFinish()
f.channelLifetimes.Range(func(vchannel string, lifetime ChannelLifetime) bool {
lifetime.Cancel()
return true
})
f.fgMgr.ClearFlowgraphs()
f.wbMgr.Stop()
f.cpUpdater.Close()
if f.cpUpdater.Ready() {
f.cpUpdater.Get().Close()
}
}

func (f *flusherImpl) getPipelineParams(ctx context.Context) (*util.PipelineParams, error) {
dc, err := resource.Resource().DataCoordClient().GetWithContext(ctx)
if err != nil {
return nil, err
}

cpUpdater, err := f.cpUpdater.GetWithContext(ctx)
if err != nil {
return nil, err
}
return &util.PipelineParams{
Ctx: context.Background(),
Broker: broker.NewCoordBroker(dc, paramtable.GetNodeID()),
SyncMgr: f.syncMgr,
ChunkManager: f.chunkManager,
WriteBufferManager: f.wbMgr,
CheckpointUpdater: cpUpdater,
Allocator: idalloc.NewMAllocator(resource.Resource().IDAllocator()),
MsgHandler: newMsgHandler(f.wbMgr),
}, nil
}
Loading

0 comments on commit 3f3801b

Please sign in to comment.