From fb25cef82f0f19c08005c5646c39bc48922ada6c Mon Sep 17 00:00:00 2001 From: wcy00000000000000 <2269766985@qq.com> Date: Wed, 18 Dec 2024 20:07:34 +0800 Subject: [PATCH] feat: add event watch task logics --story=121104198 --- src/storage/dal/mongo/local/mongo.go | 6 + src/storage/stream/event/list.go | 2 +- src/storage/stream/event/utils.go | 2 + src/storage/stream/loop/loop_watch.go | 156 ++++++------ src/storage/stream/task/task.go | 332 ++++++++++++++++++++++++++ src/storage/stream/task/token.go | 110 +++++++++ src/storage/stream/task/util.go | 59 +++++ src/storage/stream/task/watch_task.go | 197 +++++++++++++++ src/storage/stream/types/task.go | 169 +++++++++++++ src/storage/stream/types/types.go | 36 ++- 10 files changed, 981 insertions(+), 88 deletions(-) create mode 100644 src/storage/stream/task/task.go create mode 100644 src/storage/stream/task/token.go create mode 100644 src/storage/stream/task/util.go create mode 100644 src/storage/stream/task/watch_task.go create mode 100644 src/storage/stream/types/task.go diff --git a/src/storage/dal/mongo/local/mongo.go b/src/storage/dal/mongo/local/mongo.go index 7d95c8f3f4..7572fb8a91 100644 --- a/src/storage/dal/mongo/local/mongo.go +++ b/src/storage/dal/mongo/local/mongo.go @@ -417,6 +417,12 @@ func (c *Mongo) GetDBClient() *mongo.Client { func (c *Mongo) GetDBName() string { return c.cli.DBName() } + +// GetMongoClient get mongo client +func (c *Mongo) GetMongoClient() *MongoClient { + return c.cli +} + func (c *Mongo) redirectTable(tableName string) string { if common.IsObjectInstShardingTable(tableName) { tableName = common.BKTableNameBaseInst diff --git a/src/storage/stream/event/list.go b/src/storage/stream/event/list.go index f6f94133f9..97a8a6481a 100644 --- a/src/storage/stream/event/list.go +++ b/src/storage/stream/event/list.go @@ -58,7 +58,7 @@ func (e *Event) List(ctx context.Context, opts *types.ListOptions) (ch chan *typ eventChan := make(chan *types.Event, types.DefaultEventChanSize) go func() { - e.lister(ctx, false, listOpts, eventChan) + e.lister(ctx, opts.WithRetry, listOpts, eventChan) }() return eventChan, nil diff --git a/src/storage/stream/event/utils.go b/src/storage/stream/event/utils.go index cca6e8fdbd..28b84f84db 100644 --- a/src/storage/stream/event/utils.go +++ b/src/storage/stream/event/utils.go @@ -72,6 +72,8 @@ func generateOptions(opts *types.Options) (mongo.Pipeline, *options.ChangeStream if *opts.MajorityCommitted { major := options.UpdateLookup streamOptions.FullDocument = &major + preImage := options.WhenAvailable + streamOptions.FullDocumentBeforeChange = &preImage } else { def := options.Default streamOptions.FullDocument = &def diff --git a/src/storage/stream/loop/loop_watch.go b/src/storage/stream/loop/loop_watch.go index 1340a5eec4..066ea02f93 100644 --- a/src/storage/stream/loop/loop_watch.go +++ b/src/storage/stream/loop/loop_watch.go @@ -46,23 +46,17 @@ func (lw *LoopsWatch) WithOne(opts *types.LoopOneOptions) error { return err } - startToken, err := opts.TokenHandler.GetStartWatchToken(context.Background()) + watchOpt, err := lw.updateStartTokenInfo(&opts.LoopOptions) if err != nil { - blog.Errorf("%s job, run loop watch %s, but get start token failed, err: %v", opts.Name, lw.streamWatch.DBName, - err) return err } - - // update the start token. - if len(startToken) != 0 { - opts.WatchOpt.StartAfterToken = &types.EventToken{Data: startToken} - } + watchOpt.WatchFatalErrorCallback = opts.TokenHandler.ResetWatchToken var cancel func() var cancelCtx context.Context cancelCtx, cancel = context.WithCancel(context.Background()) - watcher, err := lw.streamWatch.Watch(cancelCtx, opts.WatchOpt) + watcher, err := lw.streamWatch.Watch(cancelCtx, watchOpt) if err != nil { blog.Errorf("%s job, run loop, but watch failed, err: %v", opts.Name, err) cancel() @@ -88,6 +82,25 @@ func (lw *LoopsWatch) WithOne(opts *types.LoopOneOptions) error { return nil } +func (lw *LoopsWatch) updateStartTokenInfo(opts *types.LoopOptions) (*types.WatchOptions, error) { + startToken, err := opts.TokenHandler.GetStartWatchToken(context.Background()) + if err != nil { + blog.Errorf("%s job, loop watch db %s, but get start watch token failed, err: %v", opts.Name, + lw.streamWatch.DBName, err) + return nil, err + } + + // update the start token. + if len(startToken.Token) != 0 { + opts.WatchOpt.StartAfterToken = &types.EventToken{Data: startToken.Token} + } + if startToken.StartAtTime != nil { + opts.WatchOpt.StartAtTime = startToken.StartAtTime + } + + return opts.WatchOpt, nil +} + // WithBatch allows users to watch events with batch. func (lw *LoopsWatch) WithBatch(opts *types.LoopBatchOptions) error { if err := opts.Validate(); err != nil { @@ -95,23 +108,17 @@ func (lw *LoopsWatch) WithBatch(opts *types.LoopBatchOptions) error { return err } - startToken, err := opts.TokenHandler.GetStartWatchToken(context.Background()) + watchOpt, err := lw.updateStartTokenInfo(&opts.LoopOptions) if err != nil { - blog.Errorf("%s job, run loop watch batch %s, but get start token failed, err: %v", opts.Name, - lw.streamWatch.DBName, err) return err } - - // update the start token. - if len(startToken) != 0 { - opts.WatchOpt.StartAfterToken = &types.EventToken{Data: startToken} - } + watchOpt.WatchFatalErrorCallback = opts.TokenHandler.ResetWatchToken var cancel func() var cancelCtx context.Context cancelCtx, cancel = context.WithCancel(context.Background()) - watcher, err := lw.streamWatch.Watch(cancelCtx, opts.WatchOpt) + watcher, err := lw.streamWatch.Watch(cancelCtx, watchOpt) if err != nil { blog.Errorf("%s job, run loop, but watch failed, err: %v", opts.Name, err) cancel() @@ -167,23 +174,16 @@ func (lw *LoopsWatch) watchRetry(cancel context.CancelFunc, cancel() // use the last token to resume so that we can start again from where we stopped. - lastToken, err := opts.TokenHandler.GetStartWatchToken(ctx) + watchOpt, err := lw.updateStartTokenInfo(opts) if err != nil { - blog.Errorf("%s job, run loop watch, but get last event token failed, err: %v", opts.Name, err) // notify retry signal, exit loop close(retrySignal) continue } + opts.WatchOpt = watchOpt - blog.Errorf("%s job, the former watch loop: %s failed, start retry again from token: %s.", opts.Name, - lw.streamWatch.DBName, lastToken) - - // set start after token if needed. - if len(lastToken) != 0 { - // we have already received the new event and handle it success, - // so we need to use this token. otherwise, we should still use the w.watchOpt.StartAfterToken - opts.WatchOpt.StartAfterToken = &types.EventToken{Data: lastToken} - } + blog.Errorf("%s job, the former watch loop: %s failed, start retry again from token: %+v.", opts.Name, + lw.streamWatch.DBName, watchOpt.StartAfterToken) var cancelCtx context.Context cancelCtx, cancel = context.WithCancel(ctx) @@ -200,7 +200,8 @@ func (lw *LoopsWatch) watchRetry(cancel context.CancelFunc, // start handle loop jobs go doHandler(cancelCtx, watcher, retrySignal) - blog.Warnf("%s job, retry loop %s from token: %s success.", opts.Name, lw.streamWatch.DBName, lastToken) + blog.Warnf("%s job, retry loop %s from token: %+v success.", opts.Name, lw.streamWatch.DBName, + watchOpt.StartAfterToken) } } } @@ -220,17 +221,14 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, } for { - reWatch, loop := observer.canLoop() if reWatch { // stop the tick to release resource. ticker.Stop() - blog.Warnf("%s job, master status has changed, try to re-watch again, collection:%s", opts.Name, + blog.Warnf("%s job, master status has changed, try to re-watch again, db:%s", opts.Name, lw.streamWatch.DBName) - // trigger re-watch action now. close(retrySignal) - // exit the for loop return } @@ -248,14 +246,12 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, case <-ctxWithCancel.Done(): // stop the tick to release resource. ticker.Stop() - blog.Warnf("%s job, received cancel loop watch %s signal, exit loop.", opts.Name, lw.streamWatch.DBName) // exist the goroutine return case one := <-watcher.EventChan: batchEvents = append(batchEvents, one) - if blog.V(4) { blog.Infof("%s job, received %s event, detail: %s, op-time: %s, rid: %s", opts.Name, lw.streamWatch.DBName, one.String(), one.ClusterTime.String(), one.ID()) @@ -266,14 +262,12 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, // continue to get more events continue } - case <-ticker.C: // handle with batch event. if len(batchEvents) == 0 { // ticks, but no events received, loop next round to get events. continue } - case <-opts.StopNotifier: ticker.Stop() blog.Warnf("received stop %s loop watch job notify, stopping now.", opts.Name) @@ -284,50 +278,62 @@ func (lw *LoopsWatch) tryLoopWithBatch(ctxWithCancel context.Context, break } - // for safety guarantee - if len(batchEvents) == 0 { - continue + if lw.handleBatchEvents(ctxWithCancel, batchEvents, opts, retryObserver, retrySignal) { + return } + } +} - first := batchEvents[0] +// handleBatchEvents handle batch events, returns if the loop watch needs retry +func (lw *LoopsWatch) handleBatchEvents(ctx context.Context, batchEvents []*types.Event, opts *types.LoopBatchOptions, + retryObserver *retryHandler, retrySignal chan struct{}) bool { - blog.Infof("%s job, received %s batch %d events, first op-time: %s rid: %s.", opts.Name, lw.streamWatch.DBName, - len(batchEvents), first.ClusterTime.String(), first.ID()) + // for safety guarantee + if len(batchEvents) == 0 { + return false + } - retry := opts.EventHandler.DoBatch(batchEvents) - if retry { + first := batchEvents[0] - if retryObserver.canStillRetry() { - blog.Warnf("%s job, received %s %d events in batch, but do batch failed, retry now, rid: %s", opts.Name, - lw.streamWatch.DBName, len(batchEvents), first.ID()) - // an error occurred, we need to retry it later. - // tell the schedule to re-watch again. - close(retrySignal) - // exist this goroutine. - return - } + blog.Infof("%s job, received %s batch %d events, first op-time: %s rid: %s.", opts.Name, lw.streamWatch.DBName, + len(batchEvents), first.ClusterTime.String(), first.ID()) - blog.Warnf("%s job, collection %s batch watch retry exceed max count, skip, rid: %s.", opts.Name, - lw.streamWatch.DBName, first.ID()) - // save the event token now. + retry := opts.EventHandler.DoBatch(batchEvents) + if retry { + if retryObserver.canStillRetry() { + blog.Warnf("%s job, received %s %d events in batch, but do batch failed, retry now, rid: %s", opts.Name, + lw.streamWatch.DBName, len(batchEvents), first.ID()) + // an error occurred, we need to retry it later. + // tell the schedule to re-watch again. + close(retrySignal) + // exit this goroutine. + return true } - // reset retry counter so that the previous retry count will not affect the next event - retryObserver.resetRetryCounter() + blog.Warnf("%s job, collection %s batch watch retry exceed max count, skip, rid: %s.", opts.Name, + lw.streamWatch.DBName, first.ID()) + // save the event token now. + } - last := batchEvents[len(batchEvents)-1] - // update the last watched token for resume usage. - if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, last.Token.Data); err != nil { - blog.Errorf("%s job, loop watch %s event, but set last token failed, err: %v, rid: %s, retry later.", - opts.Name, lw.streamWatch.DBName, err, first.ID()) + // reset retry counter so that the previous retry count will not affect the next event + retryObserver.resetRetryCounter() - // retry later. - close(retrySignal) - // exist this goroutine - return - } + last := batchEvents[len(batchEvents)-1] + // update the last watched token for resume usage. + lastToken := &types.TokenInfo{ + Token: last.Token.Data, + StartAtTime: &last.ClusterTime, } - + if err := opts.TokenHandler.SetLastWatchToken(ctx, lastToken); err != nil { + blog.Errorf("%s job, loop watch %s event, but set last token failed, err: %v, rid: %s, retry later.", + opts.Name, lw.streamWatch.DBName, err, first.ID()) + + // retry later. + close(retrySignal) + // exit this goroutine + return true + } + return false } // tryLoopWithOne try handle event one by one @@ -348,11 +354,9 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context, blog.Warnf("%s job, received cancel loop watch %s signal, exit loop, exit loop", opts.Name, lw.streamWatch.DBName) return - case <-opts.StopNotifier: blog.Warnf("received stop %s loop watch job notify, stopping now.", opts.Name) return - default: } @@ -398,7 +402,11 @@ func (lw *LoopsWatch) tryLoopWithOne(ctxWithCancel context.Context, retryObserver.resetRetryCounter() // update the last watched token for resume usage. - if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, one.Token.Data); err != nil { + lastToken := &types.TokenInfo{ + Token: one.Token.Data, + StartAtTime: &one.ClusterTime, + } + if err := opts.TokenHandler.SetLastWatchToken(ctxWithCancel, lastToken); err != nil { blog.Errorf("%s job, loop watch %s event, but set last watched token failed, err: %v, rid: %s, "+ "retry later.", lw.streamWatch.DBName, err, one.ID()) diff --git a/src/storage/stream/task/task.go b/src/storage/stream/task/task.go new file mode 100644 index 0000000000..33f14e5386 --- /dev/null +++ b/src/storage/stream/task/task.go @@ -0,0 +1,332 @@ +/* + * Tencent is pleased to support the open source community by making + * 蓝鲸智云 - 配置平台 (BlueKing - Configuration System) available. + * Copyright (C) 2017 THL A29 Limited, + * a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * We undertake not to change the open source license (MIT license) applicable + * to the current version of the project delivered to anyone in the future. + */ + +// Package task defines event watch task logics +package task + +import ( + "context" + "fmt" + "time" + + "configcenter/src/apimachinery/discovery" + "configcenter/src/common" + "configcenter/src/common/blog" + "configcenter/src/common/util" + "configcenter/src/storage/dal" + "configcenter/src/storage/dal/mongo/local" + "configcenter/src/storage/stream/event" + "configcenter/src/storage/stream/loop" + "configcenter/src/storage/stream/types" +) + +// Task is the event watch task that contains all resource watch tasks +type Task struct { + // eventMap is the db uuid to event instance map + eventMap map[string]*event.Event + // loopWatch is the db uuid to loop watch instance map + loopWatch map[string]*loop.LoopsWatch + // dbClients is the db uuid to db client map + dbClients map[string]local.DB + // watchClients is the db uuid to watch client map + watchClients map[string]*local.Mongo + // watchTasks is the task name to watch task map + watchTasks map[string]*watchTask + + // these options are used to generate loop watch options + majorityCommitted *bool + maxAwaitTime *time.Duration + + // stopNotifier is used when user need to stop loop events and release related resources. + // It's a optional option. when it's not set(as is nil), then the loop will not exit forever. + // Otherwise, user can use it to stop loop events. + // When a user want to stop the loop, the only thing that a user need to do is to just + // **close** this stop notifier channel. + // Attention: + // Close this notifier channel is the only way to stop loop correctly. + // Do not send data to this channel. + stopNotifier <-chan struct{} +} + +// New create a new watch task instance +func New(db, watchDB dal.Dal, isMaster discovery.ServiceManageInterface, opts *types.NewTaskOptions) (*Task, error) { + if err := opts.Validate(); err != nil { + blog.Errorf("validate new task options(%+v) failed, err: %v", opts, err) + return nil, err + } + + t := &Task{ + eventMap: make(map[string]*event.Event), + loopWatch: make(map[string]*loop.LoopsWatch), + dbClients: make(map[string]local.DB), + watchClients: make(map[string]*local.Mongo), + watchTasks: make(map[string]*watchTask), + stopNotifier: opts.StopNotifier, + } + + watchDBRelation, err := genWatchDBRelationMap(db) + if err != nil { + return nil, err + } + + // generate watch db uuid to watch db client map + watchDBClientMap := make(map[string]*local.Mongo) + err = watchDB.ExecForAllDB(func(db local.DB) error { + dbClient, ok := db.(*local.Mongo) + if !ok { + return fmt.Errorf("watch db is not an instance of local mongo") + } + watchDBClientMap[dbClient.GetMongoClient().UUID()] = dbClient + return nil + }) + if err != nil { + blog.Errorf("get all watch db client failed, err: %v", err) + return nil, err + } + + // generate db uuid to db client & watch db client & loop watch instance map + err = db.ExecForAllDB(func(db local.DB) error { + dbClient, ok := db.(*local.Mongo) + if !ok { + return fmt.Errorf("db to be watched is not an instance of local mongo") + } + mongoClient := dbClient.GetMongoClient() + uuid := mongoClient.UUID() + + watchDBUUID, exists := watchDBRelation[uuid] + if !exists { + blog.Warnf("db %s has no watch db", uuid) + return nil + } + + watchClient, exists := watchDBClientMap[watchDBUUID] + if !exists { + return fmt.Errorf("db %s related watch db %s is invalid", uuid, watchDBUUID) + } + t.watchClients[uuid] = watchClient + t.dbClients[uuid] = dbClient + + eventInst, err := event.NewEvent(mongoClient.Client(), mongoClient.DBName(), uuid) + if err != nil { + return fmt.Errorf("new event for db %s failed, err: %v", uuid, err) + } + t.eventMap[uuid] = eventInst + + loopWatch, err := loop.NewLoopWatch(eventInst, isMaster) + if err != nil { + return fmt.Errorf("new loop watch for db %s failed, err: %v", uuid, err) + } + t.loopWatch[uuid] = loopWatch + return nil + }) + if err != nil { + blog.Errorf("generate db uuid related map failed, err: %v", err) + return nil, err + } + + return t, nil +} + +// AddLoopOneTask add a loop watch task that handles one event at one time +func (t *Task) AddLoopOneTask(opts *types.LoopOneTaskOptions) error { + if err := opts.Validate(); err != nil { + blog.Errorf("validate loop batch task options(%s) failed, err: %v", opts.Name, err) + return err + } + + batchOpts := &types.LoopBatchTaskOptions{ + WatchTaskOptions: opts.WatchTaskOptions, + BatchSize: 1, + EventHandler: &types.TaskBatchHandler{ + DoBatch: func(watchDB *local.Mongo, ccDB local.DB, es []*types.Event) (unhandledEvents []*types.Event, + retry bool) { + for _, e := range es { + switch e.OperationType { + case types.Insert: + retry = opts.EventHandler.DoAdd(watchDB, ccDB, e) + case types.Update, types.Replace: + retry = opts.EventHandler.DoUpdate(watchDB, ccDB, e) + case types.Delete: + retry = opts.EventHandler.DoDelete(watchDB, ccDB, e) + default: + blog.Warnf("received unsupported operation type for %s job, doc: %s", opts.Name, e.DocBytes) + continue + } + if retry { + return es, retry + } + } + return nil, false + }, + }, + } + + return t.addWatchTask(batchOpts, false) +} + +// AddLoopBatchTask add a loop watch task that handles batch events +func (t *Task) AddLoopBatchTask(opts *types.LoopBatchTaskOptions) error { + if err := opts.Validate(); err != nil { + blog.Errorf("validate loop batch task options(%s) failed, err: %v", opts.Name, err) + return err + } + return t.addWatchTask(opts, false) +} + +// AddListWatchTask add a list watch task +func (t *Task) AddListWatchTask(opts *types.LoopBatchTaskOptions) error { + if err := opts.Validate(); err != nil { + blog.Errorf("validate list watch task options(%s) failed, err: %v", opts.Name, err) + return err + } + return t.addWatchTask(opts, true) +} + +func (t *Task) addWatchTask(opts *types.LoopBatchTaskOptions, needList bool) error { + _, exists := t.watchTasks[opts.Name] + if exists { + return fmt.Errorf("loop watch task %s already exists", opts.Name) + } + + if opts.MajorityCommitted != nil && *opts.MajorityCommitted { + t.majorityCommitted = opts.MajorityCommitted + } + if opts.MaxAwaitTime != nil && (t.maxAwaitTime == nil || *opts.MaxAwaitTime > *t.maxAwaitTime) { + t.maxAwaitTime = opts.MaxAwaitTime + } + + t.watchTasks[opts.Name] = &watchTask{ + name: opts.Name, + collOptions: opts.CollOpts, + eventHandler: opts.EventHandler, + tokenHandler: opts.TokenHandler, + needList: needList, + retryOptions: opts.RetryOptions, + batchSize: opts.BatchSize, + } + + return nil +} + +// Start execute all watch tasks +func (t *Task) Start() error { + if len(t.watchTasks) == 0 { + return nil + } + + // generate task name to collection options map and db uuid to task name to db watch tasks map by watch task info + collOptions := make(map[string]types.WatchCollOptions) + listCollOptions := make(map[string]types.CollectionOptions) + dbWatchTasks := make(map[string]map[string]*dbWatchTask) + var batchSize int + for taskName, task := range t.watchTasks { + collOptions[taskName] = *task.collOptions + if task.needList { + listCollOptions[taskName] = task.collOptions.CollectionOptions + } + if task.batchSize > batchSize { + batchSize = task.batchSize + } + for uuid, dbClient := range t.dbClients { + dbTask, err := newDBWatchTask(task, uuid, t.watchClients[uuid], dbClient) + if err != nil { + return err + } + if _, exists := dbWatchTasks[uuid]; !exists { + dbWatchTasks[uuid] = make(map[string]*dbWatchTask) + } + dbWatchTasks[uuid][taskName] = dbTask + } + } + + // list data for all list watch tasks + if len(listCollOptions) > 0 { + for uuid, eventInst := range t.eventMap { + ctx := util.SetDBReadPreference(context.Background(), common.SecondaryPreferredMode) + opt := &types.ListOptions{ + CollOpts: listCollOptions, + PageSize: &batchSize, + WithRetry: true, + } + listCh, err := eventInst.List(ctx, opt) + if err != nil { + blog.Errorf("list db %s failed, err: %v, options: %+v", uuid, err, *opt) + return err + } + + go func(uuid string) { + for e := range listCh { + task, exists := dbWatchTasks[uuid][e.TaskID] + if !exists { + blog.Warnf("loop watch task %s not exists, event: %+v", e.TaskID, *e) + continue + } + task.listChan <- e + } + }(uuid) + } + } + + // loop watch all db events for all tasks + for uuid, loopWatch := range t.loopWatch { + opts := &types.LoopBatchOptions{ + LoopOptions: types.LoopOptions{ + Name: uuid, + WatchOpt: &types.WatchOptions{ + Options: types.Options{ + MajorityCommitted: t.majorityCommitted, + MaxAwaitTime: t.maxAwaitTime, + CollOpts: collOptions, + }, + }, + TokenHandler: newDBTokenHandler(uuid, t.watchClients[uuid], dbWatchTasks[uuid]), + RetryOptions: &types.RetryOptions{ + MaxRetryCount: types.DefaultRetryCount, + RetryDuration: types.DefaultRetryDuration, + }, + StopNotifier: t.stopNotifier, + }, + EventHandler: &types.BatchHandler{DoBatch: func(es []*types.Event) (retry bool) { + for _, e := range es { + task, exists := dbWatchTasks[uuid][e.TaskID] + if !exists { + blog.Warnf("loop watch task %s not exists, event: %+v", e.TaskID, *e) + continue + } + task.eventChan <- e + } + return false + }}, + BatchSize: batchSize, + } + + err := loopWatch.WithBatch(opts) + if err != nil { + blog.Errorf("start loop watch for db failed, err: %v", err) + return err + } + } + + // run watch tasks for all dbs + for _, dbTaskMap := range dbWatchTasks { + for _, dbTask := range dbTaskMap { + dbTask.start(t.stopNotifier) + } + } + + return nil +} diff --git a/src/storage/stream/task/token.go b/src/storage/stream/task/token.go new file mode 100644 index 0000000000..021d21ed3d --- /dev/null +++ b/src/storage/stream/task/token.go @@ -0,0 +1,110 @@ +/* + * Tencent is pleased to support the open source community by making + * 蓝鲸智云 - 配置平台 (BlueKing - Configuration System) available. + * Copyright (C) 2017 THL A29 Limited, + * a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * We undertake not to change the open source license (MIT license) applicable + * to the current version of the project delivered to anyone in the future. + */ + +package task + +import ( + "context" + + "configcenter/src/common" + "configcenter/src/common/blog" + "configcenter/src/storage/dal/mongo/local" + "configcenter/src/storage/stream/types" +) + +type dbTokenHandler struct { + uuid string + watchDB local.DB + dbWatchTasks []*dbWatchTask +} + +// newDBTokenHandler new token handler for db watch task +func newDBTokenHandler(uuid string, watchDB local.DB, taskMap map[string]*dbWatchTask) types.TokenHandler { + dbWatchTasks := make([]*dbWatchTask, 0, len(taskMap)) + for _, task := range taskMap { + dbWatchTasks = append(dbWatchTasks, task) + } + return &dbTokenHandler{ + uuid: uuid, + watchDB: watchDB, + dbWatchTasks: dbWatchTasks, + } +} + +// SetLastWatchToken set last watch token for db watch task +func (d *dbTokenHandler) SetLastWatchToken(ctx context.Context, token *types.TokenInfo) error { + // update last token for db to the earliest last token of all db watch tasks + // this token specifies the last event that all db watch tasks has handled + lastToken := d.dbWatchTasks[0].lastToken + for _, task := range d.dbWatchTasks { + if lastToken.Token > task.lastToken.Token { + lastToken = task.lastToken + continue + } + } + + filter := map[string]interface{}{ + "_id": d.uuid, + } + + data := map[string]interface{}{ + common.BKTokenField: lastToken.Token, + common.BKStartAtTimeField: lastToken.StartAtTime, + } + + if err := d.watchDB.Table(common.BKTableNameWatchToken).Update(ctx, filter, data); err != nil { + blog.Errorf("set db %s last watch token failed, err: %v, data: %+v", d.uuid, err, data) + return err + } + return nil +} + +// GetStartWatchToken get start watch token of db watch task +func (d *dbTokenHandler) GetStartWatchToken(ctx context.Context) (*types.TokenInfo, error) { + filter := map[string]interface{}{ + "_id": d.uuid, + } + + data := new(types.TokenInfo) + err := d.watchDB.Table(common.BKTableNameWatchToken).Find(filter).Fields(common.BKStartAtTimeField).One(ctx, data) + if err != nil { + if !d.watchDB.IsNotFoundError(err) { + blog.Errorf("get db %s last watch token failed, err: %v", d.uuid, err) + return nil, err + } + return new(types.TokenInfo), nil + } + return data, nil +} + +// ResetWatchToken set watch token to empty and set the start watch time to the given one for next watch +func (d *dbTokenHandler) ResetWatchToken(startAtTime types.TimeStamp) error { + filter := map[string]interface{}{ + "_id": d.uuid, + } + + data := map[string]interface{}{ + common.BKTokenField: "", + common.BKStartAtTimeField: startAtTime, + } + + if err := d.watchDB.Table(common.BKTableNameWatchToken).Update(context.Background(), filter, data); err != nil { + blog.Errorf("reset db %s watch token failed, err: %v, data: %+v", d.uuid, err, data) + return err + } + return nil +} diff --git a/src/storage/stream/task/util.go b/src/storage/stream/task/util.go new file mode 100644 index 0000000000..51e2d8e455 --- /dev/null +++ b/src/storage/stream/task/util.go @@ -0,0 +1,59 @@ +/* + * Tencent is pleased to support the open source community by making + * 蓝鲸智云 - 配置平台 (BlueKing - Configuration System) available. + * Copyright (C) 2017 THL A29 Limited, + * a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * We undertake not to change the open source license (MIT license) applicable + * to the current version of the project delivered to anyone in the future. + */ + +package task + +import ( + "context" + "fmt" + + "configcenter/src/common" + "configcenter/src/common/blog" + "configcenter/src/storage/dal" + "configcenter/src/storage/dal/mongo/sharding" + "configcenter/src/storage/stream/types" +) + +// genWatchDBRelationMap generate db uuid to watch db uuid map +func genWatchDBRelationMap(db dal.Dal) (map[string]string, error) { + ctx := context.Background() + masterDB := db.Shard(sharding.NewShardOpts().WithIgnoreTenant()) + + cond := map[string]any{common.MongoMetaID: common.ShardingDBConfID} + shardingConf := new(sharding.ShardingDBConf) + if err := masterDB.Table(common.BKTableNameSystem).Find(cond).One(ctx, &shardingConf); err != nil { + blog.Errorf("get sharding db config failed, err: %v", err) + return nil, err + } + + watchDBRelation := make(map[string]string) + relations := make([]sharding.WatchDBRelation, 0) + if err := masterDB.Table(common.BKTableNameWatchDBRelation).Find(nil).All(ctx, &relations); err != nil { + return nil, fmt.Errorf("get db and watch db relation failed, err: %v", err) + } + + for _, relation := range relations { + watchDBRelation[relation.DB] = watchDBRelation[relation.WatchDB] + } + return watchDBRelation, nil +} + +// compareToken compare event with token, returns if event is greater than the token +func compareToken(event *types.Event, token *types.TokenInfo) bool { + return event.Token.Data > token.Token || event.ClusterTime.Sec > token.StartAtTime.Sec || + event.ClusterTime.Sec == token.StartAtTime.Sec && event.ClusterTime.Nano > token.StartAtTime.Nano +} diff --git a/src/storage/stream/task/watch_task.go b/src/storage/stream/task/watch_task.go new file mode 100644 index 0000000000..6df7529b53 --- /dev/null +++ b/src/storage/stream/task/watch_task.go @@ -0,0 +1,197 @@ +/* + * Tencent is pleased to support the open source community by making + * 蓝鲸智云 - 配置平台 (BlueKing - Configuration System) available. + * Copyright (C) 2017 THL A29 Limited, + * a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * We undertake not to change the open source license (MIT license) applicable + * to the current version of the project delivered to anyone in the future. + */ + +package task + +import ( + "context" + "time" + + "configcenter/src/common/blog" + "configcenter/src/storage/dal/mongo/local" + "configcenter/src/storage/stream/types" +) + +// watchTask is the resource watch task +type watchTask struct { + // name is the watch task name that uniquely identifies the watch task + name string + // collOptions is the watch collection options + collOptions *types.WatchCollOptions + // eventHandler is the batch event handler + eventHandler *types.TaskBatchHandler + // tokenHandler is the token handler + tokenHandler types.TaskTokenHandler + // needList defines whether to list all data before watch + needList bool + + retryOptions *types.RetryOptions + batchSize int +} + +type dbWatchTask struct { + *watchTask + dbUUID string + watchDB *local.Mongo + ccDB local.DB + eventChan chan *types.Event + listChan chan *types.Event + lastToken *types.TokenInfo +} + +// maxUnhandledEventLimit if the number of unhandled events exceeds this value, block the event watch process +const maxUnhandledEventLimit = 2000 + +func newDBWatchTask(task *watchTask, dbUUID string, watchDB *local.Mongo, ccDB local.DB) (*dbWatchTask, error) { + lastToken, err := task.tokenHandler.GetStartWatchToken(context.Background(), dbUUID) + if err != nil { + blog.Errorf("get task %s db %s last watch token failed, err: %v", task.name, dbUUID, err) + return nil, err + } + + return &dbWatchTask{ + watchTask: task, + dbUUID: dbUUID, + watchDB: watchDB, + ccDB: ccDB, + eventChan: make(chan *types.Event, maxUnhandledEventLimit+task.batchSize), + lastToken: lastToken, + }, nil +} + +// start execute watch task +func (t *dbWatchTask) start(stopNotifier <-chan struct{}) { + go func() { + // list all data before watch if this task is a list watch task + if t.needList { + t.lastToken = &types.TokenInfo{ + StartAtTime: &types.TimeStamp{ + Sec: uint32(time.Now().Unix()), + }, + } + + events := make([]*types.Event, 0) + for event := range t.listChan { + events = append(events, event) + if len(events) == t.batchSize { + t.eventHandler.DoBatch(t.watchDB, t.ccDB, events) + } + if event.OperationType == types.ListDone { + break + } + } + if len(events) > 0 { + t.eventHandler.DoBatch(t.watchDB, t.ccDB, events) + } + } + + ticker := time.NewTicker(50 * time.Millisecond) + for { + // get events to be handled + events := make([]*types.Event, 0) + for { + select { + case one := <-t.eventChan: + // skip previous event with smaller token + if !compareToken(one, t.lastToken) { + blog.V(4).Infof("%s-%s job, skip previous event(%s)", t.name, t.dbUUID, one.String()) + continue + } + events = append(events, one) + if len(events) < t.batchSize { + continue + } + case <-ticker.C: + if len(events) == 0 { + continue + } + case <-stopNotifier: + ticker.Stop() + return + } + break + } + + // handle events + t.handleEvents(events) + } + }() +} + +func (t *dbWatchTask) handleEvents(events []*types.Event) { + ctx := context.Background() + first, last := events[0], events[len(events)-1] + rid := first.ID() + blog.Infof("%s-%s job, received %d events, first op-time: %s, fist token: %s, rid: %s", t.name, t.dbUUID, + len(events), first.ClusterTime.String(), first.Token.Data, rid) + + needRetry := false + retryCnt := 0 + for { + // get start watch token after retry to avoid conflict with another watch task + if needRetry { + time.Sleep(t.retryOptions.RetryDuration) + lastToken, err := t.tokenHandler.GetStartWatchToken(ctx, t.dbUUID) + if err != nil { + blog.Errorf("get task %s db %s last watch token failed, err: %v, rid: %s", t.name, t.dbUUID, err, rid) + time.Sleep(t.retryOptions.RetryDuration) + continue + } + t.lastToken = lastToken + + // if current token is greater than last token, return + if !compareToken(last, lastToken) { + return + } + + // remove events with smaller token that are already handled + index := 0 + for i, event := range events { + if compareToken(event, lastToken) { + break + } + index = i + 1 + } + events = events[index:] + } + + // handle events, if all events are handled, just update last watch token + if len(events) > 0 { + events, needRetry = t.eventHandler.DoBatch(t.watchDB, t.ccDB, events) + if needRetry { + if retryCnt < t.retryOptions.MaxRetryCount { + retryCnt++ + continue + } + } + } + + // update last watch token, retry if failed + lastToken := &types.TokenInfo{ + Token: last.Token.Data, + StartAtTime: &last.ClusterTime, + } + if err := t.tokenHandler.SetLastWatchToken(ctx, t.dbUUID, lastToken); err != nil { + blog.Errorf("set task %s db %s last watch token(%+v) failed, err: %v, rid: %s", t.name, t.dbUUID, + *lastToken, err, rid) + needRetry = true + continue + } + t.lastToken = lastToken + return + } +} diff --git a/src/storage/stream/types/task.go b/src/storage/stream/types/task.go new file mode 100644 index 0000000000..1075910c60 --- /dev/null +++ b/src/storage/stream/types/task.go @@ -0,0 +1,169 @@ +/* + * Tencent is pleased to support the open source community by making + * 蓝鲸智云 - 配置平台 (BlueKing - Configuration System) available. + * Copyright (C) 2017 THL A29 Limited, + * a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * We undertake not to change the open source license (MIT license) applicable + * to the current version of the project delivered to anyone in the future. + */ + +package types + +import ( + "context" + "errors" + "time" + + "configcenter/src/storage/dal/mongo/local" +) + +// NewTaskOptions is the new task options +type NewTaskOptions struct { + StopNotifier <-chan struct{} +} + +// Validate NewTaskOptions +func (o *NewTaskOptions) Validate() error { + if o.StopNotifier == nil { + // if not set, then set never stop loop as default + o.StopNotifier = make(<-chan struct{}) + } + return nil +} + +// TaskTokenHandler is the token handler for db watch task +type TaskTokenHandler interface { + SetLastWatchToken(ctx context.Context, uuid string, token *TokenInfo) error + GetStartWatchToken(ctx context.Context, uuid string) (*TokenInfo, error) +} + +// WatchTaskOptions is the common options for watch task +type WatchTaskOptions struct { + Name string + CollOpts *WatchCollOptions + TokenHandler TaskTokenHandler + RetryOptions *RetryOptions + MajorityCommitted *bool + MaxAwaitTime *time.Duration +} + +// Validate WatchTaskOptions +func (o *WatchTaskOptions) Validate() error { + if len(o.Name) == 0 { + return errors.New("watch task name is not set") + } + + if o.CollOpts == nil { + return errors.New("watch task coll options is not set") + } + + if err := o.CollOpts.Validate(); err != nil { + return err + } + + if o.TokenHandler == nil { + return errors.New("token handler is not set") + } + + if o.TokenHandler.SetLastWatchToken == nil || o.TokenHandler.GetStartWatchToken == nil { + return errors.New("some token handler functions is not set") + } + + if o.RetryOptions != nil { + if o.RetryOptions.MaxRetryCount <= 0 { + o.RetryOptions.MaxRetryCount = DefaultRetryCount + } + + if o.RetryOptions.RetryDuration == 0 { + o.RetryOptions.RetryDuration = DefaultRetryDuration + } + + if o.RetryOptions.RetryDuration < 500*time.Millisecond { + return errors.New("invalid retry duration, can not less than 500ms") + } + } else { + o.RetryOptions = &RetryOptions{ + MaxRetryCount: DefaultRetryCount, + RetryDuration: DefaultRetryDuration, + } + } + + return nil +} + +// LoopOneTaskOptions is the options for loop watch events one by one operation of one task +type LoopOneTaskOptions struct { + *WatchTaskOptions + EventHandler *TaskOneHandler +} + +// Validate LoopOneTaskOptions +func (o *LoopOneTaskOptions) Validate() error { + if o.WatchTaskOptions == nil { + return errors.New("common watch task options is not set") + } + + if err := o.WatchTaskOptions.Validate(); err != nil { + return err + } + + if o.EventHandler == nil { + return errors.New("event handler is not set") + } + + if o.EventHandler.DoAdd == nil || o.EventHandler.DoUpdate == nil || o.EventHandler.DoDelete == nil { + return errors.New("some event handler functions is not set") + } + return nil +} + +// TaskOneHandler is the watch task's event handler that handles events one by one +type TaskOneHandler struct { + DoAdd func(watchDB *local.Mongo, ccDB local.DB, event *Event) (retry bool) + DoUpdate func(watchDB *local.Mongo, ccDB local.DB, event *Event) (retry bool) + DoDelete func(watchDB *local.Mongo, ccDB local.DB, event *Event) (retry bool) +} + +// LoopBatchTaskOptions is the options for loop watch batch events operation of one task +type LoopBatchTaskOptions struct { + *WatchTaskOptions + BatchSize int + EventHandler *TaskBatchHandler +} + +// Validate LoopBatchTaskOptions +func (o *LoopBatchTaskOptions) Validate() error { + if o.WatchTaskOptions == nil { + return errors.New("common watch task options is not set") + } + + if err := o.WatchTaskOptions.Validate(); err != nil { + return err + } + + if o.BatchSize <= 0 { + return errors.New("batch size is invalid") + } + + if o.EventHandler == nil { + return errors.New("event handler is not set") + } + + if o.EventHandler.DoBatch == nil { + return errors.New("event handler DoBatch function is not set") + } + return nil +} + +// TaskBatchHandler is the watch task's batch events handler +type TaskBatchHandler struct { + DoBatch func(watchDB *local.Mongo, ccDB local.DB, es []*Event) (unhandledEvents []*Event, retry bool) +} diff --git a/src/storage/stream/types/types.go b/src/storage/stream/types/types.go index 4184248e3e..956d899622 100644 --- a/src/storage/stream/types/types.go +++ b/src/storage/stream/types/types.go @@ -78,6 +78,9 @@ type ListOptions struct { // Step defines the list step when the client try to list all the data defines in the // namespace. default value is `DefaultListStep`, value range [200,2000] PageSize *int + + // WithRetry defines whether the list operation needs to retry when failed + WithRetry bool } // CheckSetDefault validate list options, and set default value for not set fields @@ -444,10 +447,17 @@ func GetEventDetail(detailStr *string) *string { return &detail } -// TokenHandler TODO +// TokenHandler is the token handler interface type TokenHandler interface { - SetLastWatchToken(ctx context.Context, token string) error - GetStartWatchToken(ctx context.Context) (token string, err error) + SetLastWatchToken(ctx context.Context, token *TokenInfo) error + GetStartWatchToken(ctx context.Context) (token *TokenInfo, err error) + ResetWatchToken(startAtTime TimeStamp) error +} + +// TokenInfo is the watch token info +type TokenInfo struct { + Token string `bson:"token"` + StartAtTime *TimeStamp `bson:"start_at_time"` } // LoopOptions TODO @@ -495,11 +505,11 @@ func (lo *LoopOneOptions) Validate() error { if lo.RetryOptions != nil { if lo.RetryOptions.MaxRetryCount <= 0 { - lo.RetryOptions.MaxRetryCount = defaultRetryCount + lo.RetryOptions.MaxRetryCount = DefaultRetryCount } if lo.RetryOptions.RetryDuration == 0 { - lo.RetryOptions.RetryDuration = defaultRetryDuration + lo.RetryOptions.RetryDuration = DefaultRetryDuration } if lo.RetryOptions.RetryDuration < 500*time.Millisecond { @@ -507,8 +517,8 @@ func (lo *LoopOneOptions) Validate() error { } } else { lo.RetryOptions = &RetryOptions{ - MaxRetryCount: defaultRetryCount, - RetryDuration: defaultRetryDuration, + MaxRetryCount: DefaultRetryCount, + RetryDuration: DefaultRetryDuration, } } @@ -530,8 +540,8 @@ type LoopBatchOptions struct { const ( defaultBatchSize = 200 - defaultRetryCount = 10 - defaultRetryDuration = 1 * time.Second + DefaultRetryCount = 10 + DefaultRetryDuration = 1 * time.Second ) // Validate TODO @@ -558,11 +568,11 @@ func (lo *LoopBatchOptions) Validate() error { if lo.RetryOptions != nil { if lo.RetryOptions.MaxRetryCount <= 0 { - lo.RetryOptions.MaxRetryCount = defaultRetryCount + lo.RetryOptions.MaxRetryCount = DefaultRetryCount } if lo.RetryOptions.RetryDuration == 0 { - lo.RetryOptions.RetryDuration = defaultRetryDuration + lo.RetryOptions.RetryDuration = DefaultRetryDuration } if lo.RetryOptions.RetryDuration < 200*time.Millisecond { @@ -570,8 +580,8 @@ func (lo *LoopBatchOptions) Validate() error { } } else { lo.RetryOptions = &RetryOptions{ - MaxRetryCount: defaultRetryCount, - RetryDuration: defaultRetryDuration, + MaxRetryCount: DefaultRetryCount, + RetryDuration: DefaultRetryDuration, } }