fix: fix unittest and make cpUpdater close after flowgraph
Signed-off-by: chyezh <[email protected]>
chyezh committed Dec 16, 2024
1 parent e775ede commit 0dc6628
Showing 5 changed files with 60 additions and 92 deletions.
22 changes: 11 additions & 11 deletions internal/distributed/streamingnode/service.go
@@ -309,18 +309,18 @@ func (s *Server) initMeta() error {
 	return nil
 }
 
-func (s *Server) initRootCoord(ctx context.Context) {
+func (s *Server) initRootCoord() {
+	log := log.Ctx(s.ctx)
 	go func() {
-		retry.Do(ctx, func() error {
-			log := log.Ctx(s.ctx)
+		retry.Do(s.ctx, func() error {
 			log.Info("StreamingNode connect to rootCoord...")
-			rootCoord, err := rcc.NewClient(ctx)
+			rootCoord, err := rcc.NewClient(s.ctx)
 			if err != nil {
 				return errors.Wrap(err, "StreamingNode try to new RootCoord client failed")
 			}

 			log.Info("StreamingNode try to wait for RootCoord ready")
-			err = componentutil.WaitForComponentHealthy(ctx, rootCoord, "RootCoord", 1000000, time.Millisecond*200)
+			err = componentutil.WaitForComponentHealthy(s.ctx, rootCoord, "RootCoord", 1000000, time.Millisecond*200)
 			if err != nil {
 				return errors.Wrap(err, "StreamingNode wait for RootCoord ready failed")
 			}

@@ -331,25 +331,25 @@ func (s *Server) initRootCoord(ctx context.Context) {
 	}()
 }
 
-func (s *Server) initDataCoord(ctx context.Context) {
+func (s *Server) initDataCoord() {
+	log := log.Ctx(s.ctx)
 	go func() {
-		retry.Do(ctx, func() error {
-			log := log.Ctx(s.ctx)
+		retry.Do(s.ctx, func() error {
 			log.Info("StreamingNode connect to dataCoord...")
-			dataCoord, err := dcc.NewClient(ctx)
+			dataCoord, err := dcc.NewClient(s.ctx)
 			if err != nil {
 				return errors.Wrap(err, "StreamingNode try to new DataCoord client failed")
 			}

 			log.Info("StreamingNode try to wait for DataCoord ready")
-			err = componentutil.WaitForComponentHealthy(ctx, dataCoord, "DataCoord", 1000000, time.Millisecond*200)
+			err = componentutil.WaitForComponentHealthy(s.ctx, dataCoord, "DataCoord", 1000000, time.Millisecond*200)
 			if err != nil {
 				return errors.Wrap(err, "StreamingNode wait for DataCoord ready failed")
 			}

 			log.Info("StreamingNode wait for DataCoord ready")
 			s.dataCoord.Set(dataCoord)
 			return nil
-		})
+		}, retry.AttemptAlways())
 	}()
 }

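Net effect of the service.go hunks above: initRootCoord and initDataCoord no longer take a caller-supplied context and instead drive their retry loops off the server's own s.ctx, so the background connect goroutines stop when the server shuts down. A rough standalone sketch of that shape (hypothetical server and initCoord names, plain standard library rather than the Milvus retry package):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type server struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// initCoord retries a connect function until it succeeds or the server's
// context is cancelled, mirroring the shape of initRootCoord/initDataCoord
// after this change.
func (s *server) initCoord(connect func(context.Context) error) {
	go func() {
		for {
			if err := connect(s.ctx); err == nil {
				return // connected and healthy
			}
			select {
			case <-s.ctx.Done():
				return // server shutdown cancels the retry loop
			case <-time.After(200 * time.Millisecond):
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	srv := &server{ctx: ctx, cancel: cancel}
	srv.initCoord(func(ctx context.Context) error {
		fmt.Println("connecting to coordinator...")
		return errors.New("coordinator not ready")
	})
	time.Sleep(500 * time.Millisecond)
	srv.cancel() // shutting down stops the background loop
	time.Sleep(100 * time.Millisecond)
}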
internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go
@@ -86,6 +86,12 @@ func (c *channelLifetime) Run() error {
 	// Get recovery info from datacoord.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
 	defer cancel()
+
+	pipelineParams, err := c.f.getPipelineParams(ctx)
+	if err != nil {
+		return err
+	}
+
 	dc, err := resource.Resource().DataCoordClient().GetWithContext(ctx)
 	if err != nil {
 		return errors.Wrap(err, "At Get DataCoordClient")
@@ -118,7 +124,7 @@ func (c *channelLifetime) Run() error {
 	}
 
 	// Build and add pipeline.
-	ds, err := pipeline.NewStreamingNodeDataSyncService(ctx, c.f.pipelineParams,
+	ds, err := pipeline.NewStreamingNodeDataSyncService(ctx, pipelineParams,
 		&datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan(), func(t syncmgr.Task, err error) {
 			if err != nil || t == nil {
 				return
59 changes: 41 additions & 18 deletions internal/streamingnode/server/flusher/flusherimpl/flusher_impl.go
@@ -33,6 +33,7 @@ import (
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/streamingnode/server/flusher"
 	"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
+	"github.com/milvus-io/milvus/internal/streamingnode/server/resource/idalloc"
 	"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/conc"
@@ -46,32 +47,30 @@ import (
 var _ flusher.Flusher = (*flusherImpl)(nil)
 
 type flusherImpl struct {
-	fgMgr   pipeline.FlowgraphManager
-	syncMgr syncmgr.SyncManager
-	wbMgr   writebuffer.BufferManager
+	fgMgr        pipeline.FlowgraphManager
+	wbMgr        writebuffer.BufferManager
+	syncMgr      syncmgr.SyncManager
+	cpUpdater    *syncutil.Future[*util.ChannelCheckpointUpdater]
+	chunkManager storage.ChunkManager
 
 	channelLifetimes *typeutil.ConcurrentMap[string, ChannelLifetime]
 
-	notifyCh       chan struct{}
-	notifier       *syncutil.AsyncTaskNotifier[struct{}]
-	pipelineParams *util.PipelineParams
+	notifyCh chan struct{}
+	notifier *syncutil.AsyncTaskNotifier[struct{}]
 }
 
 func NewFlusher(chunkManager storage.ChunkManager) flusher.Flusher {
-	params := getPipelineParams(chunkManager)
-	return newFlusherWithParam(params)
-}
-
-func newFlusherWithParam(params *util.PipelineParams) flusher.Flusher {
-	fgMgr := pipeline.NewFlowgraphManager()
+	syncMgr := syncmgr.NewSyncManager(chunkManager)
+	wbMgr := writebuffer.NewManager(syncMgr)
 	return &flusherImpl{
-		fgMgr:            fgMgr,
-		syncMgr:          params.SyncMgr,
-		wbMgr:            params.WriteBufferManager,
+		fgMgr:            pipeline.NewFlowgraphManager(),
+		wbMgr:            wbMgr,
+		syncMgr:          syncMgr,
+		cpUpdater:        syncutil.NewFuture[*util.ChannelCheckpointUpdater](),
+		chunkManager:     chunkManager,
 		channelLifetimes: typeutil.NewConcurrentMap[string, ChannelLifetime](),
 		notifyCh:         make(chan struct{}, 1),
 		notifier:         syncutil.NewAsyncTaskNotifier[struct{}](),
-		pipelineParams:   params,
 	}
 }

@@ -135,8 +134,7 @@ func (f *flusherImpl) Start() {
 		broker := broker.NewCoordBroker(dc, paramtable.GetNodeID())
 		cpUpdater := util.NewChannelCheckpointUpdater(broker)
 		go cpUpdater.Start()
-		// When the flusher exits, the cpUpdater should be closed.
-		defer cpUpdater.Close()
+		f.cpUpdater.Set(cpUpdater)
 
 		backoff := typeutil.NewBackoffTimer(typeutil.BackoffTimerConfig{
 			Default: 5 * time.Second,
@@ -205,4 +203,29 @@ func (f *flusherImpl) Stop() {
 	})
 	f.fgMgr.ClearFlowgraphs()
 	f.wbMgr.Stop()
+	if f.cpUpdater.Ready() {
+		f.cpUpdater.Get().Close()
+	}
 }
+
+func (f *flusherImpl) getPipelineParams(ctx context.Context) (*util.PipelineParams, error) {
+	dc, err := resource.Resource().DataCoordClient().GetWithContext(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	cpUpdater, err := f.cpUpdater.GetWithContext(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return &util.PipelineParams{
+		Ctx:                context.Background(),
+		Broker:             broker.NewCoordBroker(dc, paramtable.GetNodeID()),
+		SyncMgr:            f.syncMgr,
+		ChunkManager:       f.chunkManager,
+		WriteBufferManager: f.wbMgr,
+		CheckpointUpdater:  cpUpdater,
+		Allocator:          idalloc.NewMAllocator(resource.Resource().IDAllocator()),
+		MsgHandler:         newMsgHandler(f.wbMgr),
+	}, nil
+}
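Taken together, the flusher_impl.go changes drop the precomputed pipelineParams field in favor of a set-once future holding the checkpoint updater: Start() publishes it with f.cpUpdater.Set, getPipelineParams waits for it with GetWithContext, and Stop() closes it only after ClearFlowgraphs() and wbMgr.Stop(), which is the ordering fix named in the commit title. A minimal standalone sketch of that handoff, assuming set-once-future semantics; the Future type below is hypothetical, not Milvus's syncutil implementation:

package main

import (
	"context"
	"fmt"
	"sync"
)

// Future is a set-once value that readers can wait on.
type Future[T any] struct {
	once  sync.Once
	ready chan struct{}
	value T
}

func NewFuture[T any]() *Future[T] {
	return &Future[T]{ready: make(chan struct{})}
}

// Set publishes the value exactly once and unblocks all waiters.
func (f *Future[T]) Set(v T) {
	f.once.Do(func() {
		f.value = v
		close(f.ready)
	})
}

// Ready reports whether Set has been called.
func (f *Future[T]) Ready() bool {
	select {
	case <-f.ready:
		return true
	default:
		return false
	}
}

// Get blocks until the value is set.
func (f *Future[T]) Get() T {
	<-f.ready
	return f.value
}

// GetWithContext blocks until the value is set or ctx is done.
func (f *Future[T]) GetWithContext(ctx context.Context) (T, error) {
	select {
	case <-f.ready:
		return f.value, nil
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

type checkpointUpdater struct{}

func (c *checkpointUpdater) Close() { fmt.Println("checkpoint updater closed") }

func main() {
	fut := NewFuture[*checkpointUpdater]()

	// What Start() does once the coordinator broker is available.
	fut.Set(&checkpointUpdater{})

	// What Stop() does: tear down pipelines and the write buffer first,
	// then close the updater only if it was ever created.
	fmt.Println("clear flowgraphs, stop write buffer")
	if fut.Ready() {
		fut.Get().Close()
	}
}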
flusherimpl test file (name not shown)
Expand Up @@ -30,8 +30,6 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal"
"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -108,22 +106,8 @@ func newMockWAL(t *testing.T, vchannels []string, maybe bool) *mock_wal.MockWAL
 }
 
 func newTestFlusher(t *testing.T, maybe bool) flusher.Flusher {
-	wbMgr := writebuffer.NewMockBufferManager(t)
-	register := wbMgr.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
-	removeChannel := wbMgr.EXPECT().RemoveChannel(mock.Anything).Return()
-	start := wbMgr.EXPECT().Start().Return()
-	stop := wbMgr.EXPECT().Stop().Return()
-	if maybe {
-		register.Maybe()
-		removeChannel.Maybe()
-		start.Maybe()
-		stop.Maybe()
-	}
 	m := mocks.NewChunkManager(t)
-	params := getPipelineParams(m)
-	params.SyncMgr = syncmgr.NewMockSyncManager(t)
-	params.WriteBufferManager = wbMgr
-	return newFlusherWithParam(params)
+	return NewFlusher(m)
 }
 
 func TestFlusher_RegisterPChannel(t *testing.T) {

This file was deleted.
