Skip to content

Commit

Permalink
feat: add event watch task logics
Browse files Browse the repository at this point in the history
--story=121104198
  • Loading branch information
wcy00000000000000 committed Dec 19, 2024
1 parent d39b8ad commit fb25cef
Show file tree
Hide file tree
Showing 10 changed files with 981 additions and 88 deletions.
6 changes: 6 additions & 0 deletions src/storage/dal/mongo/local/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,12 @@ func (c *Mongo) GetDBClient() *mongo.Client {
func (c *Mongo) GetDBName() string {
return c.cli.DBName()
}

// GetMongoClient get mongo client
func (c *Mongo) GetMongoClient() *MongoClient {
return c.cli
}

func (c *Mongo) redirectTable(tableName string) string {
if common.IsObjectInstShardingTable(tableName) {
tableName = common.BKTableNameBaseInst
Expand Down
2 changes: 1 addition & 1 deletion src/storage/stream/event/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (e *Event) List(ctx context.Context, opts *types.ListOptions) (ch chan *typ
eventChan := make(chan *types.Event, types.DefaultEventChanSize)

go func() {
e.lister(ctx, false, listOpts, eventChan)
e.lister(ctx, opts.WithRetry, listOpts, eventChan)
}()

return eventChan, nil
Expand Down
2 changes: 2 additions & 0 deletions src/storage/stream/event/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func generateOptions(opts *types.Options) (mongo.Pipeline, *options.ChangeStream
if *opts.MajorityCommitted {
major := options.UpdateLookup
streamOptions.FullDocument = &major
preImage := options.WhenAvailable
streamOptions.FullDocumentBeforeChange = &preImage
} else {
def := options.Default
streamOptions.FullDocument = &def
Expand Down
156 changes: 82 additions & 74 deletions src/storage/stream/loop/loop_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,17 @@ func (lw *LoopsWatch) WithOne(opts *types.LoopOneOptions) error {
return err
}

startToken, err := opts.TokenHandler.GetStartWatchToken(context.Background())
watchOpt, err := lw.updateStartTokenInfo(&opts.LoopOptions)
if err != nil {
blog.Errorf("%s job, run loop watch %s, but get start token failed, err: %v", opts.Name, lw.streamWatch.DBName,
err)
return err
}

// update the start token.
if len(startToken) != 0 {
opts.WatchOpt.StartAfterToken = &types.EventToken{Data: startToken}
}
watchOpt.WatchFatalErrorCallback = opts.TokenHandler.ResetWatchToken

var cancel func()
var cancelCtx context.Context
cancelCtx, cancel = context.WithCancel(context.Background())

watcher, err := lw.streamWatch.Watch(cancelCtx, opts.WatchOpt)
watcher, err := lw.streamWatch.Watch(cancelCtx, watchOpt)
if err != nil {
blog.Errorf("%s job, run loop, but watch failed, err: %v", opts.Name, err)
cancel()
Expand All @@ -88,30 +82,43 @@ func (lw *LoopsWatch) WithOne(opts *types.LoopOneOptions) error {
return nil
}

// updateStartTokenInfo loads the previously saved watch token from the token
// handler and applies it to opts.WatchOpt, so that a new watch resumes from
// where the last one stopped. It returns the updated watch options.
func (lw *LoopsWatch) updateStartTokenInfo(opts *types.LoopOptions) (*types.WatchOptions, error) {
	tokenInfo, err := opts.TokenHandler.GetStartWatchToken(context.Background())
	if err != nil {
		blog.Errorf("%s job, loop watch db %s, but get start watch token failed, err: %v", opts.Name,
			lw.streamWatch.DBName, err)
		return nil, err
	}

	watchOpt := opts.WatchOpt

	// a non-empty saved token takes effect as the resume-after position.
	if len(tokenInfo.Token) != 0 {
		watchOpt.StartAfterToken = &types.EventToken{Data: tokenInfo.Token}
	}

	// a saved cluster time takes effect as the start-at position.
	if tokenInfo.StartAtTime != nil {
		watchOpt.StartAtTime = tokenInfo.StartAtTime
	}

	return watchOpt, nil
}

// WithBatch allows users to watch events with batch.
func (lw *LoopsWatch) WithBatch(opts *types.LoopBatchOptions) error {
if err := opts.Validate(); err != nil {
blog.Errorf("run loop watch batch, but option is invalid, err: %v", err)
return err
}

startToken, err := opts.TokenHandler.GetStartWatchToken(context.Background())
watchOpt, err := lw.updateStartTokenInfo(&opts.LoopOptions)
if err != nil {
blog.Errorf("%s job, run loop watch batch %s, but get start token failed, err: %v", opts.Name,
lw.streamWatch.DBName, err)
return err
}

// update the start token.
if len(startToken) != 0 {
opts.WatchOpt.StartAfterToken = &types.EventToken{Data: startToken}
}
watchOpt.WatchFatalErrorCallback = opts.TokenHandler.ResetWatchToken

var cancel func()
var cancelCtx context.Context
cancelCtx, cancel = context.WithCancel(context.Background())

watcher, err := lw.streamWatch.Watch(cancelCtx, opts.WatchOpt)
watcher, err := lw.streamWatch.Watch(cancelCtx, watchOpt)
if err != nil {
blog.Errorf("%s job, run loop, but watch failed, err: %v", opts.Name, err)
cancel()
Expand Down Expand Up @@ -167,23 +174,16 @@ func (lw *LoopsWatch) watchRetry(cancel context.CancelFunc,
cancel()

// use the last token to resume so that we can start again from where we stopped.
lastToken, err := opts.TokenHandler.GetStartWatchToken(ctx)
watchOpt, err := lw.updateStartTokenInfo(opts)
if err != nil {
blog.Errorf("%s job, run loop watch, but get last event token failed, err: %v", opts.Name, err)
// notify retry signal, exit loop
close(retrySignal)
continue
}
opts.WatchOpt = watchOpt

blog.Errorf("%s job, the former watch loop: %s failed, start retry again from token: %s.", opts.Name,
lw.streamWatch.DBName, lastToken)

// set start after token if needed.
if len(lastToken) != 0 {
// we have already received the new event and handle it success,
// so we need to use this token. otherwise, we should still use the w.watchOpt.StartAfterToken
opts.WatchOpt.StartAfterToken = &types.EventToken{Data: lastToken}
}
blog.Errorf("%s job, the former watch loop: %s failed, start retry again from token: %+v.", opts.Name,
lw.streamWatch.DBName, watchOpt.StartAfterToken)

var cancelCtx context.Context
cancelCtx, cancel = context.WithCancel(ctx)
Expand All @@ -200,7 +200,8 @@ func (lw *LoopsWatch) watchRetry(cancel context.CancelFunc,
// start handle loop jobs
go doHandler(cancelCtx, watcher, retrySignal)

blog.Warnf("%s job, retry loop %s from token: %s success.", opts.Name, lw.streamWatch.DBName, lastToken)
blog.Warnf("%s job, retry loop %s from token: %+v success.", opts.Name, lw.streamWatch.DBName,
watchOpt.StartAfterToken)
}
}
}
Expand All @@ -220,17 +221,14 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context,
}

for {

reWatch, loop := observer.canLoop()
if reWatch {
// stop the tick to release resource.
ticker.Stop()
blog.Warnf("%s job, master status has changed, try to re-watch again, collection:%s", opts.Name,
blog.Warnf("%s job, master status has changed, try to re-watch again, db:%s", opts.Name,
lw.streamWatch.DBName)

// trigger re-watch action now.
close(retrySignal)

// exit the for loop
return
}
Expand All @@ -248,14 +246,12 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context,
case <-ctxWithCancel.Done():
// stop the tick to release resource.
ticker.Stop()

blog.Warnf("%s job, received cancel loop watch %s signal, exit loop.", opts.Name, lw.streamWatch.DBName)
// exist the goroutine
return

case one := <-watcher.EventChan:
batchEvents = append(batchEvents, one)

if blog.V(4) {
blog.Infof("%s job, received %s event, detail: %s, op-time: %s, rid: %s", opts.Name,
lw.streamWatch.DBName, one.String(), one.ClusterTime.String(), one.ID())
Expand All @@ -266,14 +262,12 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context,
// continue to get more events
continue
}

case <-ticker.C:
// handle with batch event.
if len(batchEvents) == 0 {
// ticks, but no events received, loop next round to get events.
continue
}

case <-opts.StopNotifier:
ticker.Stop()
blog.Warnf("received stop %s loop watch job notify, stopping now.", opts.Name)
Expand All @@ -284,50 +278,62 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context,
break
}

// for safety guarantee
if len(batchEvents) == 0 {
continue
if lw.handleBatchEvents(ctxWithCancel, batchEvents, opts, retryObserver, retrySignal) {
return
}
}
}

first := batchEvents[0]
// handleBatchEvents handle batch events, returns if the loop watch needs retry
func (lw *LoopsWatch) handleBatchEvents(ctx context.Context, batchEvents []*types.Event, opts *types.LoopBatchOptions,
retryObserver *retryHandler, retrySignal chan struct{}) bool {

blog.Infof("%s job, received %s batch %d events, first op-time: %s rid: %s.", opts.Name, lw.streamWatch.DBName,
len(batchEvents), first.ClusterTime.String(), first.ID())
// for safety guarantee
if len(batchEvents) == 0 {
return false
}

retry := opts.EventHandler.DoBatch(batchEvents)
if retry {
first := batchEvents[0]

if retryObserver.canStillRetry() {
blog.Warnf("%s job, received %s %d events in batch, but do batch failed, retry now, rid: %s", opts.Name,
lw.streamWatch.DBName, len(batchEvents), first.ID())
// an error occurred, we need to retry it later.
// tell the schedule to re-watch again.
close(retrySignal)
// exist this goroutine.
return
}
blog.Infof("%s job, received %s batch %d events, first op-time: %s rid: %s.", opts.Name, lw.streamWatch.DBName,
len(batchEvents), first.ClusterTime.String(), first.ID())

blog.Warnf("%s job, collection %s batch watch retry exceed max count, skip, rid: %s.", opts.Name,
lw.streamWatch.DBName, first.ID())
// save the event token now.
retry := opts.EventHandler.DoBatch(batchEvents)
if retry {
if retryObserver.canStillRetry() {
blog.Warnf("%s job, received %s %d events in batch, but do batch failed, retry now, rid: %s", opts.Name,
lw.streamWatch.DBName, len(batchEvents), first.ID())
// an error occurred, we need to retry it later.
// tell the schedule to re-watch again.
close(retrySignal)
// exit this goroutine.
return true
}

// reset retry counter so that the previous retry count will not affect the next event
retryObserver.resetRetryCounter()
blog.Warnf("%s job, collection %s batch watch retry exceed max count, skip, rid: %s.", opts.Name,
lw.streamWatch.DBName, first.ID())
// save the event token now.
}

last := batchEvents[len(batchEvents)-1]
// update the last watched token for resume usage.
if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, last.Token.Data); err != nil {
blog.Errorf("%s job, loop watch %s event, but set last token failed, err: %v, rid: %s, retry later.",
opts.Name, lw.streamWatch.DBName, err, first.ID())
// reset retry counter so that the previous retry count will not affect the next event
retryObserver.resetRetryCounter()

// retry later.
close(retrySignal)
// exist this goroutine
return
}
last := batchEvents[len(batchEvents)-1]
// update the last watched token for resume usage.
lastToken := &types.TokenInfo{
Token: last.Token.Data,
StartAtTime: &last.ClusterTime,
}

if err := opts.TokenHandler.SetLastWatchToken(ctx, lastToken); err != nil {
blog.Errorf("%s job, loop watch %s event, but set last token failed, err: %v, rid: %s, retry later.",
opts.Name, lw.streamWatch.DBName, err, first.ID())

// retry later.
close(retrySignal)
// exit this goroutine
return true
}
return false
}

// tryLoopWithOne try handle event one by one
Expand All @@ -348,11 +354,9 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context,
blog.Warnf("%s job, received cancel loop watch %s signal, exit loop, exit loop", opts.Name,
lw.streamWatch.DBName)
return

case <-opts.StopNotifier:
blog.Warnf("received stop %s loop watch job notify, stopping now.", opts.Name)
return

default:
}

Expand Down Expand Up @@ -398,7 +402,11 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context,
retryObserver.resetRetryCounter()

// update the last watched token for resume usage.
if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, one.Token.Data); err != nil {
lastToken := &types.TokenInfo{
Token: one.Token.Data,
StartAtTime: &one.ClusterTime,
}
if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, lastToken); err != nil {
blog.Errorf("%s job, loop watch %s event, but set last watched token failed, err: %v, rid: %s, "+
"retry later.", lw.streamWatch.DBName, err, one.ID())

Expand Down
Loading

0 comments on commit fb25cef

Please sign in to comment.