Skip to content

Commit

Permalink
enhance: use lazy initializing client for streaming node
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 17, 2024
1 parent 2afe2ea commit 12c59b4
Show file tree
Hide file tree
Showing 23 changed files with 252 additions and 115 deletions.
93 changes: 57 additions & 36 deletions internal/distributed/streamingnode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/netutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/tikv"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down Expand Up @@ -83,8 +85,8 @@ type Server struct {
// component client
etcdCli *clientv3.Client
tikvCli *txnkv.Client
rootCoord types.RootCoordClient
dataCoord types.DataCoordClient
rootCoord *syncutil.Future[types.RootCoordClient]
dataCoord *syncutil.Future[types.DataCoordClient]
chunkManager storage.ChunkManager
componentState *componentutil.ComponentStateService
}
Expand All @@ -95,6 +97,8 @@ func NewServer(ctx context.Context, f dependency.Factory) (*Server, error) {
return &Server{
stopOnce: sync.Once{},
factory: f,
dataCoord: syncutil.NewFuture[types.DataCoordClient](),
rootCoord: syncutil.NewFuture[types.RootCoordClient](),
grpcServerChan: make(chan struct{}),
componentState: componentutil.NewComponentStateService(typeutil.StreamingNodeRole),
ctx: ctx1,
Expand Down Expand Up @@ -166,8 +170,17 @@ func (s *Server) stop() {

// Stop rootCoord client.
log.Info("streamingnode stop rootCoord client...")
if err := s.rootCoord.Close(); err != nil {
log.Warn("streamingnode stop rootCoord client failed", zap.Error(err))
if s.rootCoord.Ready() {
if err := s.rootCoord.Get().Close(); err != nil {
log.Warn("streamingnode stop rootCoord client failed", zap.Error(err))
}
}

log.Info("streamingnode stop dataCoord client...")
if s.dataCoord.Ready() {
if err := s.dataCoord.Get().Close(); err != nil {
log.Warn("streamingnode stop dataCoord client failed", zap.Error(err))
}
}

// Stop tikv
Expand Down Expand Up @@ -216,12 +229,8 @@ func (s *Server) init() (err error) {
if err := s.initSession(); err != nil {
return err
}
if err := s.initRootCoord(); err != nil {
return err
}
if err := s.initDataCoord(); err != nil {
return err
}
s.initRootCoord()
s.initDataCoord()
s.initGRPCServer()

// Create StreamingNode service.
Expand Down Expand Up @@ -300,36 +309,48 @@ func (s *Server) initMeta() error {
return nil
}

func (s *Server) initRootCoord() (err error) {
log := log.Ctx(s.ctx)
log.Info("StreamingNode connect to rootCoord...")
s.rootCoord, err = rcc.NewClient(s.ctx)
if err != nil {
return errors.Wrap(err, "StreamingNode try to new RootCoord client failed")
}
func (s *Server) initRootCoord(ctx context.Context) {
go func() {
retry.Do(ctx, func() error {
log := log.Ctx(s.ctx)
log.Info("StreamingNode connect to rootCoord...")
rootCoord, err := rcc.NewClient(ctx)
if err != nil {
return errors.Wrap(err, "StreamingNode try to new RootCoord client failed")
}

log.Info("StreamingNode try to wait for RootCoord ready")
err = componentutil.WaitForComponentHealthy(s.ctx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200)
if err != nil {
return errors.Wrap(err, "StreamingNode wait for RootCoord ready failed")
}
return nil
log.Info("StreamingNode try to wait for RootCoord ready")
err = componentutil.WaitForComponentHealthy(ctx, rootCoord, "RootCoord", 1000000, time.Millisecond*200)
if err != nil {
return errors.Wrap(err, "StreamingNode wait for RootCoord ready failed")
}
log.Info("StreamingNode wait for RootCoord done")
s.rootCoord.Set(rootCoord)
return nil
}, retry.AttemptAlways())
}()
}

func (s *Server) initDataCoord() (err error) {
log := log.Ctx(s.ctx)
log.Info("StreamingNode connect to dataCoord...")
s.dataCoord, err = dcc.NewClient(s.ctx)
if err != nil {
return errors.Wrap(err, "StreamingNode try to new DataCoord client failed")
}
func (s *Server) initDataCoord(ctx context.Context) {
go func() {
retry.Do(ctx, func() error {
log := log.Ctx(s.ctx)
log.Info("StreamingNode connect to dataCoord...")
dataCoord, err := dcc.NewClient(ctx)
if err != nil {
return errors.Wrap(err, "StreamingNode try to new DataCoord client failed")
}

log.Info("StreamingNode try to wait for DataCoord ready")
err = componentutil.WaitForComponentHealthy(s.ctx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200)
if err != nil {
return errors.Wrap(err, "StreamingNode wait for DataCoord ready failed")
}
return nil
log.Info("StreamingNode try to wait for DataCoord ready")
err = componentutil.WaitForComponentHealthy(ctx, dataCoord, "DataCoord", 1000000, time.Millisecond*200)
if err != nil {
return errors.Wrap(err, "StreamingNode wait for DataCoord ready failed")
}
log.Info("StreamingNode wait for DataCoord ready")
s.dataCoord.Set(dataCoord)
return nil
})
}()
}

func (s *Server) initChunkManager() (err error) {
Expand Down
9 changes: 5 additions & 4 deletions internal/streamingnode/server/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

// ServerBuilder is used to build a server.
// All components that must be initialized before server initialization should be added here.
type ServerBuilder struct {
etcdClient *clientv3.Client
grpcServer *grpc.Server
rc types.RootCoordClient
dc types.DataCoordClient
rc *syncutil.Future[types.RootCoordClient]
dc *syncutil.Future[types.DataCoordClient]
session *sessionutil.Session
kv kv.MetaKv
chunkManager storage.ChunkManager
Expand Down Expand Up @@ -49,13 +50,13 @@ func (b *ServerBuilder) WithGRPCServer(svr *grpc.Server) *ServerBuilder {
}

// WithRootCoordClient sets root coord client to the server builder.
func (b *ServerBuilder) WithRootCoordClient(rc types.RootCoordClient) *ServerBuilder {
func (b *ServerBuilder) WithRootCoordClient(rc *syncutil.Future[types.RootCoordClient]) *ServerBuilder {
b.rc = rc
return b
}

// WithDataCoordClient sets data coord client to the server builder.
func (b *ServerBuilder) WithDataCoordClient(dc types.DataCoordClient) *ServerBuilder {
func (b *ServerBuilder) WithDataCoordClient(dc *syncutil.Future[types.DataCoordClient]) *ServerBuilder {
b.dc = dc
return b
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,11 @@ func (c *channelLifetime) Run() error {
// Get recovery info from datacoord.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
resp, err := resource.Resource().DataCoordClient().
GetChannelRecoveryInfo(ctx, &datapb.GetChannelRecoveryInfoRequest{Vchannel: c.vchannel})
dc, err := resource.Resource().DataCoordClient().GetWithContext(ctx)
if err != nil {
return errors.Wrap(err, "At Get DataCoordClient")
}
resp, err := dc.GetChannelRecoveryInfo(ctx, &datapb.GetChannelRecoveryInfoRequest{Vchannel: c.vchannel})
if err = merr.CheckRPCCall(resp, err); err != nil {
return err
}
Expand Down
46 changes: 26 additions & 20 deletions internal/streamingnode/server/flusher/flusherimpl/flusher_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package flusherimpl

import (
"context"
"sync"
"time"

"github.com/cockroachdb/errors"
Expand All @@ -38,25 +37,23 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var _ flusher.Flusher = (*flusherImpl)(nil)

type flusherImpl struct {
broker broker.Broker
fgMgr pipeline.FlowgraphManager
syncMgr syncmgr.SyncManager
wbMgr writebuffer.BufferManager
cpUpdater *util.ChannelCheckpointUpdater
fgMgr pipeline.FlowgraphManager
syncMgr syncmgr.SyncManager
wbMgr writebuffer.BufferManager

channelLifetimes *typeutil.ConcurrentMap[string, ChannelLifetime]

notifyCh chan struct{}
stopChan lifetime.SafeChan
stopWg sync.WaitGroup
notifier *syncutil.AsyncTaskNotifier[struct{}]
pipelineParams *util.PipelineParams
}

Expand All @@ -68,22 +65,24 @@ func NewFlusher(chunkManager storage.ChunkManager) flusher.Flusher {
func newFlusherWithParam(params *util.PipelineParams) flusher.Flusher {
fgMgr := pipeline.NewFlowgraphManager()
return &flusherImpl{
broker: params.Broker,
fgMgr: fgMgr,
syncMgr: params.SyncMgr,
wbMgr: params.WriteBufferManager,
cpUpdater: params.CheckpointUpdater,
channelLifetimes: typeutil.NewConcurrentMap[string, ChannelLifetime](),
notifyCh: make(chan struct{}, 1),
stopChan: lifetime.NewSafeChan(),
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
pipelineParams: params,
}
}

func (f *flusherImpl) RegisterPChannel(pchannel string, wal wal.WAL) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
resp, err := resource.Resource().RootCoordClient().GetPChannelInfo(ctx, &rootcoordpb.GetPChannelInfoRequest{
rc, err := resource.Resource().RootCoordClient().GetWithContext(ctx)
if err != nil {
return errors.Wrap(err, "At Get RootCoordClient")
}
resp, err := rc.GetPChannelInfo(ctx, &rootcoordpb.GetPChannelInfoRequest{
Pchannel: pchannel,
})
if err = merr.CheckRPCCall(resp, err); err != nil {
Expand Down Expand Up @@ -126,11 +125,19 @@ func (f *flusherImpl) notify() {
}

func (f *flusherImpl) Start() {
f.stopWg.Add(1)
f.wbMgr.Start()
go f.cpUpdater.Start()
go func() {
defer f.stopWg.Done()
defer f.notifier.Finish(struct{}{})
dc, err := resource.Resource().DataCoordClient().GetWithContext(f.notifier.Context())
if err != nil {
return
}
broker := broker.NewCoordBroker(dc, paramtable.GetNodeID())
cpUpdater := util.NewChannelCheckpointUpdater(broker)
go cpUpdater.Start()
// When the flusher exits, the cpUpdater should be closed.
defer cpUpdater.Close()

backoff := typeutil.NewBackoffTimer(typeutil.BackoffTimerConfig{
Default: 5 * time.Second,
Backoff: typeutil.BackoffConfig{
Expand All @@ -143,7 +150,7 @@ func (f *flusherImpl) Start() {
var nextTimer <-chan time.Time
for {
select {
case <-f.stopChan.CloseCh():
case <-f.notifier.Context().Done():
log.Info("flusher exited")
return
case <-f.notifyCh:
Expand Down Expand Up @@ -190,13 +197,12 @@ func (f *flusherImpl) handle(backoff *typeutil.BackoffTimer) <-chan time.Time {
}

func (f *flusherImpl) Stop() {
f.stopChan.Close()
f.stopWg.Wait()
f.notifier.Cancel()
f.notifier.BlockUntilFinish()
f.channelLifetimes.Range(func(vchannel string, lifetime ChannelLifetime) bool {
lifetime.Cancel()
return true
})
f.fgMgr.ClearFlowgraphs()
f.wbMgr.Stop()
f.cpUpdater.Close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/flusher"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

func init() {
Expand Down Expand Up @@ -146,10 +148,16 @@ func TestFlusher_RegisterPChannel(t *testing.T) {
rootcoord.EXPECT().GetPChannelInfo(mock.Anything, mock.Anything).
Return(&rootcoordpb.GetPChannelInfoResponse{Collections: collectionsInfo}, nil)
datacoord := newMockDatacoord(t, maybe)

fDatacoord := syncutil.NewFuture[types.DataCoordClient]()
fDatacoord.Set(datacoord)

fRootcoord := syncutil.NewFuture[types.RootCoordClient]()
fRootcoord.Set(rootcoord)
resource.InitForTest(
t,
resource.OptRootCoordClient(rootcoord),
resource.OptDataCoordClient(datacoord),
resource.OptRootCoordClient(fRootcoord),
resource.OptDataCoordClient(fDatacoord),
)

f := newTestFlusher(t, maybe)
Expand Down Expand Up @@ -182,9 +190,11 @@ func TestFlusher_RegisterVChannel(t *testing.T) {
}

datacoord := newMockDatacoord(t, maybe)
fDatacoord := syncutil.NewFuture[types.DataCoordClient]()
fDatacoord.Set(datacoord)
resource.InitForTest(
t,
resource.OptDataCoordClient(datacoord),
resource.OptDataCoordClient(fDatacoord),
)

f := newTestFlusher(t, maybe)
Expand Down Expand Up @@ -220,9 +230,11 @@ func TestFlusher_Concurrency(t *testing.T) {
}

datacoord := newMockDatacoord(t, maybe)
fDatacoord := syncutil.NewFuture[types.DataCoordClient]()
fDatacoord.Set(datacoord)
resource.InitForTest(
t,
resource.OptDataCoordClient(datacoord),
resource.OptDataCoordClient(fDatacoord),
)

f := newTestFlusher(t, maybe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,26 @@ package flusherimpl
import (
"context"

"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource/idalloc"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// getPipelineParams initializes the pipeline parameters.
func getPipelineParams(chunkManager storage.ChunkManager) *util.PipelineParams {
var (
rsc = resource.Resource()
syncMgr = syncmgr.NewSyncManager(chunkManager)
wbMgr = writebuffer.NewManager(syncMgr)
coordBroker = broker.NewCoordBroker(rsc.DataCoordClient(), paramtable.GetNodeID())
cpUpdater = util.NewChannelCheckpointUpdater(coordBroker)
rsc = resource.Resource()
syncMgr = syncmgr.NewSyncManager(chunkManager)
wbMgr = writebuffer.NewManager(syncMgr)
)
return &util.PipelineParams{
Ctx: context.Background(),
Broker: coordBroker,
SyncMgr: syncMgr,
ChunkManager: chunkManager,
WriteBufferManager: wbMgr,
CheckpointUpdater: cpUpdater,
Allocator: idalloc.NewMAllocator(rsc.IDAllocator()),
MsgHandler: newMsgHandler(wbMgr),
}
Expand Down
Loading

0 comments on commit 12c59b4

Please sign in to comment.