Skip to content

Commit

Permalink
fix: fix unittest and make cpUpdater close after flowgraph
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 13, 2024
1 parent ae1005a commit a321eb1
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 82 deletions.
2 changes: 1 addition & 1 deletion internal/distributed/streamingnode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (s *Server) initDataCoord(ctx context.Context) {
log.Info("StreamingNode wait for DataCoord ready")
s.dataCoord.Set(dataCoord)
return nil
})
}, retry.AttemptAlways())
}()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (c *channelLifetime) Run() error {
// Get recovery info from datacoord.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

pipelineParams, err := c.f.getPipelineParams(ctx)
if err != nil {
return err
}

dc, err := resource.Resource().DataCoordClient().GetWithContext(ctx)
if err != nil {
return errors.Wrap(err, "At Get DataCoordClient")
Expand Down Expand Up @@ -118,7 +124,7 @@ func (c *channelLifetime) Run() error {
}

// Build and add pipeline.
ds, err := pipeline.NewStreamingNodeDataSyncService(ctx, c.f.pipelineParams,
ds, err := pipeline.NewStreamingNodeDataSyncService(ctx, pipelineParams,
&datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan(), func(t syncmgr.Task, err error) {
if err != nil || t == nil {
return
Expand Down
59 changes: 41 additions & 18 deletions internal/streamingnode/server/flusher/flusherimpl/flusher_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/streamingnode/server/flusher"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource/idalloc"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
Expand All @@ -46,32 +47,30 @@ import (
var _ flusher.Flusher = (*flusherImpl)(nil)

type flusherImpl struct {
fgMgr pipeline.FlowgraphManager
syncMgr syncmgr.SyncManager
wbMgr writebuffer.BufferManager
fgMgr pipeline.FlowgraphManager
wbMgr writebuffer.BufferManager
syncMgr syncmgr.SyncManager
cpUpdater *syncutil.Future[*util.ChannelCheckpointUpdater]
chunkManager storage.ChunkManager

channelLifetimes *typeutil.ConcurrentMap[string, ChannelLifetime]

notifyCh chan struct{}
notifier *syncutil.AsyncTaskNotifier[struct{}]
pipelineParams *util.PipelineParams
notifyCh chan struct{}
notifier *syncutil.AsyncTaskNotifier[struct{}]
}

func NewFlusher(chunkManager storage.ChunkManager) flusher.Flusher {
params := getPipelineParams(chunkManager)
return newFlusherWithParam(params)
}

func newFlusherWithParam(params *util.PipelineParams) flusher.Flusher {
fgMgr := pipeline.NewFlowgraphManager()
syncMgr := syncmgr.NewSyncManager(chunkManager)
wbMgr := writebuffer.NewManager(syncMgr)
return &flusherImpl{
fgMgr: fgMgr,
syncMgr: params.SyncMgr,
wbMgr: params.WriteBufferManager,
fgMgr: pipeline.NewFlowgraphManager(),
wbMgr: wbMgr,
syncMgr: syncMgr,
cpUpdater: syncutil.NewFuture[*util.ChannelCheckpointUpdater](),
chunkManager: chunkManager,
channelLifetimes: typeutil.NewConcurrentMap[string, ChannelLifetime](),
notifyCh: make(chan struct{}, 1),
notifier: syncutil.NewAsyncTaskNotifier[struct{}](),
pipelineParams: params,
}
}

Expand Down Expand Up @@ -135,8 +134,7 @@ func (f *flusherImpl) Start() {
broker := broker.NewCoordBroker(dc, paramtable.GetNodeID())
cpUpdater := util.NewChannelCheckpointUpdater(broker)
go cpUpdater.Start()
// When the flusher exits, the cpUpdater should be closed.
defer cpUpdater.Close()
f.cpUpdater.Set(cpUpdater)

backoff := typeutil.NewBackoffTimer(typeutil.BackoffTimerConfig{
Default: 5 * time.Second,
Expand Down Expand Up @@ -205,4 +203,29 @@ func (f *flusherImpl) Stop() {
})
f.fgMgr.ClearFlowgraphs()
f.wbMgr.Stop()
if f.cpUpdater.Ready() {
f.cpUpdater.Get().Close()
}
}

func (f *flusherImpl) getPipelineParams(ctx context.Context) (*util.PipelineParams, error) {
dc, err := resource.Resource().DataCoordClient().GetWithContext(ctx)
if err != nil {
return nil, err
}

cpUpdater, err := f.cpUpdater.GetWithContext(ctx)
if err != nil {
return nil, err
}
return &util.PipelineParams{
Ctx: context.Background(),
Broker: broker.NewCoordBroker(dc, paramtable.GetNodeID()),
SyncMgr: f.syncMgr,
ChunkManager: f.chunkManager,
WriteBufferManager: f.wbMgr,
CheckpointUpdater: cpUpdater,
Allocator: idalloc.NewMAllocator(resource.Resource().IDAllocator()),
MsgHandler: newMsgHandler(f.wbMgr),
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/mocks/streamingnode/server/mock_wal"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand Down Expand Up @@ -108,22 +106,8 @@ func newMockWAL(t *testing.T, vchannels []string, maybe bool) *mock_wal.MockWAL
}

func newTestFlusher(t *testing.T, maybe bool) flusher.Flusher {
wbMgr := writebuffer.NewMockBufferManager(t)
register := wbMgr.EXPECT().Register(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
removeChannel := wbMgr.EXPECT().RemoveChannel(mock.Anything).Return()
start := wbMgr.EXPECT().Start().Return()
stop := wbMgr.EXPECT().Stop().Return()
if maybe {
register.Maybe()
removeChannel.Maybe()
start.Maybe()
stop.Maybe()
}
m := mocks.NewChunkManager(t)
params := getPipelineParams(m)
params.SyncMgr = syncmgr.NewMockSyncManager(t)
params.WriteBufferManager = wbMgr
return newFlusherWithParam(params)
return NewFlusher(m)
}

func TestFlusher_RegisterPChannel(t *testing.T) {
Expand Down

This file was deleted.

0 comments on commit a321eb1

Please sign in to comment.