Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watch调整为按库监听 #8296

Open
wants to merge 4 commits into
base: feature-tenant
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/apidoc/inner/admin-server/get_sharding_db_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ GET /migrate/v3/find/system/sharding_db_config
"permission": null,
"data": {
"master_db": "masteruuid",
"for_new_tenant": "slave1uuid",
"for_new_data": "slave1uuid",
"slave_db": {
"slave1uuid": {
"name": "slave1",
Expand Down Expand Up @@ -53,7 +53,7 @@ GET /migrate/v3/find/system/sharding_db_config
| 参数名称 | 参数类型 | 描述 |
|----------------|-------------------|----------------------------|
| master_db | string | 主库唯一标识 |
| for_new_tenant | string | 指定新增租户数据写入哪个库,存储这个数据库的唯一标识 |
| for_new_data | string | 指定新增租户数据写入哪个库,存储这个数据库的唯一标识 |
| slave_db | map[string]object | 从库唯一标识->从库配置的映射 |

#### data.slave_db[key]
Expand Down
4 changes: 2 additions & 2 deletions docs/apidoc/inner/admin-server/update_sharding_db_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ PUT /migrate/v3/update/system/sharding_db_config

| 参数名称 | 参数类型 | 必选 | 描述 |
|-----------------|-------------------|----|---------------------------------------------|
| for_new_tenant | string | 否 | 指定新增租户数据写入哪个库。对于存量数据库指定它的唯一标识。对于新增的从库指定它的名称 |
| for_new_data | string | 否 | 指定新增租户数据写入哪个库。对于存量数据库指定它的唯一标识。对于新增的从库指定它的名称 |
| create_slave_db | object array | 否 | 新增的从库配置数组 |
| update_slave_db | map[string]object | 否 | 更新的从库唯一标识->从库配置的映射 |

Expand Down Expand Up @@ -41,7 +41,7 @@ PUT /migrate/v3/update/system/sharding_db_config

```json
{
"for_new_tenant": "slave1uuid",
"for_new_data": "slave1uuid",
"create_slave_db": [
{
"name": "slave2",
Expand Down
16 changes: 15 additions & 1 deletion src/common/tablenames.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ const (

// BKTableNameTenantTemplate is the tenant template(public data that needs to be initialized for all tenants) table
BKTableNameTenantTemplate = "TenantTemplate"

// BKTableNameWatchDBRelation is the db and watch db relation table
BKTableNameWatchDBRelation = "WatchDBRelation"
)

// AllTables is all table names, not include the sharding tables which is created dynamically,
Expand Down Expand Up @@ -278,9 +281,9 @@ var platformTableMap = map[string]struct{}{
BKTableNameTenantTemplate: {},
BKTableNamePlatformAuditLog: {},
BKTableNameWatchToken: {},
BKTableNameLastWatchEvent: {},
BKTableNameAPITask: {},
BKTableNameAPITaskSyncHistory: {},
BKTableNameWatchDBRelation: {},
}

// IsPlatformTable returns if the target table is a platform table
Expand Down Expand Up @@ -323,6 +326,17 @@ func SplitTenantTableName(tenantTableName string) (string, string, error) {
if !strings.Contains(tenantTableName, "_") {
return "", "", errors.New("tenant table name is invalid")
}

if strings.Contains(tenantTableName, "_"+BKObjectInstShardingTablePrefix) {
sepIdx := strings.LastIndex(tenantTableName, "_"+BKObjectInstShardingTablePrefix)
return tenantTableName[:sepIdx], tenantTableName[sepIdx+1:], nil
}

if strings.Contains(tenantTableName, "_"+BKObjectInstAsstShardingTablePrefix) {
sepIdx := strings.LastIndex(tenantTableName, "_"+BKObjectInstAsstShardingTablePrefix)
return tenantTableName[:sepIdx], tenantTableName[sepIdx+1:], nil
}

sepIdx := strings.LastIndex(tenantTableName, "_")
if sepIdx == -1 {
return "", "", errors.New("tenant table name is invalid")
Expand Down
33 changes: 1 addition & 32 deletions src/common/watch/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ import (

"configcenter/src/common"
"configcenter/src/common/blog"
kubetypes "configcenter/src/kube/types"
"configcenter/src/storage/stream/types"

"github.com/tidwall/gjson"
"go.mongodb.org/mongo-driver/bson/primitive"
)

Expand Down Expand Up @@ -73,11 +71,6 @@ var (
BizSet: 14,
BizSetRelation: 15,
Plat: 16,
KubeCluster: 17,
KubeNode: 18,
KubeNamespace: 19,
KubeWorkload: 20,
KubePod: 21,
Project: 22,
}

Expand Down Expand Up @@ -124,17 +117,6 @@ const (
Plat CursorType = "plat"
// Project project event cursor type
Project CursorType = "project"
// kube related cursor types
// KubeCluster cursor type
KubeCluster CursorType = "kube_cluster"
// KubeNode cursor type
KubeNode CursorType = "kube_node"
// KubeNamespace cursor type
KubeNamespace CursorType = "kube_namespace"
// KubeWorkload cursor type, including all workloads(e.g. deployment) with their type specified in sub-resource
KubeWorkload CursorType = "kube_workload"
// KubePod cursor type, its event detail is pod info with containers in it
KubePod CursorType = "kube_pod"
)

// ToInt TODO
Expand All @@ -161,8 +143,7 @@ func (ct *CursorType) ParseInt(typ int) {
// ListCursorTypes returns all support CursorTypes.
func ListCursorTypes() []CursorType {
return []CursorType{Host, ModuleHostRelation, Biz, Set, Module, ObjectBase, Process, ProcessInstanceRelation,
HostIdentifier, MainlineInstance, InstAsst, BizSet, BizSetRelation, Plat, KubeCluster, KubeNode, KubeNamespace,
KubeWorkload, KubePod, Project}
HostIdentifier, MainlineInstance, InstAsst, BizSet, BizSetRelation, Plat, Project}
}

// Cursor is a self-defined token which is corresponding to the mongodb's resume token.
Expand Down Expand Up @@ -339,11 +320,6 @@ var collEventCursorTypeMap = map[string]CursorType{
common.BKTableNameInstAsst: InstAsst,
common.BKTableNameBaseBizSet: BizSet,
common.BKTableNameBasePlat: Plat,
kubetypes.BKTableNameBaseCluster: KubeCluster,
kubetypes.BKTableNameBaseNode: KubeNode,
kubetypes.BKTableNameBaseNamespace: KubeNamespace,
kubetypes.BKTableNameBaseWorkload: KubeWorkload,
kubetypes.BKTableNameBasePod: KubePod,
common.BKTableNameBaseProject: Project,
}

Expand All @@ -370,13 +346,6 @@ func GetEventCursor(coll string, e *types.Event, instID int64) (string, error) {

// add unique key for common object instance.
hCursor.UniqKey = strconv.FormatInt(instID, 10)
case KubeWorkload:
if instID <= 0 {
return "", errors.New("invalid kube workload id")
}

// add unique key for kube workload, composed by workload type and id.
hCursor.UniqKey = fmt.Sprintf("%s:%d", gjson.GetBytes(e.DocBytes, kubetypes.KindField).String(), instID)
}

hCursorEncode, err := hCursor.Encode()
Expand Down
15 changes: 8 additions & 7 deletions src/common/watch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,16 @@ type ChainNode struct {
InstanceID int64 `json:"inst_id,omitempty" bson:"inst_id,omitempty"`
// SubResource the sub resource of the watched resource, eg. the object ID of the instance resource
SubResource []string `json:"bk_sub_resource,omitempty" bson:"bk_sub_resource,omitempty"`
// TenantID the supplier account of the chain node's related event resource.
TenantID string `json:"tenant_id" bson:"tenant_id"`
}

// LastChainNodeData TODO
type LastChainNodeData struct {
Coll string `json:"_id" bson:"_id"`
ID uint64 `json:"id" bson:"id"`
Token string `json:"token" bson:"token"`
Cursor string `json:"cursor" bson:"cursor"`
StartAtTime types.TimeStamp `json:"start_at_time,omitempty" bson:"start_at_time,omitempty"`
Coll string `json:"_id" bson:"_id"`
ID uint64 `json:"id" bson:"id"`
Cursor string `json:"cursor" bson:"cursor"`
}

// GenDBWatchTokenID generate db watch token identifier by db uuid and collection name
func GenDBWatchTokenID(dbID, coll string) string {
return dbID + ":" + coll
}
2 changes: 1 addition & 1 deletion src/common/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (w *WatchEventOptions) Validate(isInner bool) error {

if len(w.Filter.SubResource) > 0 || len(w.Filter.SubResources) > 0 {
switch w.Resource {
case ObjectBase, MainlineInstance, InstAsst, KubeWorkload:
case ObjectBase, MainlineInstance, InstAsst:
default:
return fmt.Errorf("%s event cannot have sub resource", w.Resource)
}
Expand Down
7 changes: 2 additions & 5 deletions src/scene_server/admin_server/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package app
import (
"context"
"fmt"
"time"

iamcli "configcenter/src/ac/iam"
"configcenter/src/common/auth"
Expand All @@ -31,7 +30,6 @@ import (
"configcenter/src/scene_server/admin_server/iam"
"configcenter/src/scene_server/admin_server/logics"
svc "configcenter/src/scene_server/admin_server/service"
"configcenter/src/storage/dal/mongo/sharding"
"configcenter/src/storage/dal/redis"
"configcenter/src/storage/driver/mongodb"
"configcenter/src/thirdparty/monitor"
Expand Down Expand Up @@ -64,11 +62,10 @@ func Run(ctx context.Context, cancel context.CancelFunc, op *options.ServerOptio
db := mongodb.Dal()
process.Service.SetDB(db)

watchDB, err := sharding.NewDisableDBShardingMongo(process.Config.WatchDB.GetMongoConf(), time.Minute)
if err != nil {
if err = mongodb.SetWatchCli("", &process.Config.WatchDB, process.Config.Crypto); err != nil {
return fmt.Errorf("connect watch mongo server failed, err: %v", err)
}
process.Service.SetWatchDB(watchDB)
process.Service.SetWatchDB(mongodb.Dal())

cache, err := redis.NewFromConfig(process.Config.Redis)
if err != nil {
Expand Down
34 changes: 17 additions & 17 deletions src/scene_server/admin_server/service/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func (s *Service) initShardingApi(api *restful.WebService) {

// ShardingDBConfig is the sharding db config for api
type ShardingDBConfig struct {
MasterDB string `json:"master_db"`
ForNewTenant string `json:"for_new_tenant"`
SlaveDB map[string]SlaveDBConfig `json:"slave_db"`
MasterDB string `json:"master_db"`
ForNewData string `json:"for_new_data"`
SlaveDB map[string]SlaveDBConfig `json:"slave_db"`
}

// SlaveDBConfig is the slave db config for api
Expand All @@ -69,9 +69,9 @@ func (s *Service) GetShardingDBConfig(req *restful.Request, resp *restful.Respon
}

result := &ShardingDBConfig{
MasterDB: conf.MasterDB,
ForNewTenant: conf.ForNewTenant,
SlaveDB: make(map[string]SlaveDBConfig),
MasterDB: conf.MasterDB,
ForNewData: conf.ForNewData,
SlaveDB: make(map[string]SlaveDBConfig),
}

for uuid, mongoConf := range conf.SlaveDB {
Expand Down Expand Up @@ -134,7 +134,7 @@ func (s *Service) getShardingDBConf(kit *rest.Kit) (*sharding.ShardingDBConf, er

// UpdateShardingDBReq is the update sharding db config request
type UpdateShardingDBReq struct {
ForNewTenant string `json:"for_new_tenant,omitempty"`
ForNewData string `json:"for_new_data,omitempty"`
CreateSlaveDB []SlaveDBConfig `json:"create_slave_db,omitempty"`
UpdateSlaveDB map[string]UpdateSlaveDBInfo `json:"update_slave_db,omitempty"`
}
Expand Down Expand Up @@ -177,8 +177,8 @@ func (s *Service) UpdateShardingDBConfig(req *restful.Request, resp *restful.Res

cond := map[string]any{common.MongoMetaID: common.ShardingDBConfID}
updateData := map[string]any{
"for_new_tenant": updateConf.ForNewTenant,
"slave_db": updateConf.SlaveDB,
"for_new_data": updateConf.ForNewData,
"slave_db": updateConf.SlaveDB,
}
err = s.db.Shard(kit.SysShardOpts()).Table(common.BKTableNameSystem).Update(s.ctx, cond, updateData)
if err != nil {
Expand Down Expand Up @@ -247,23 +247,23 @@ func (s *Service) genUpdatedShardingDBConf(kit *rest.Kit, dbConf *sharding.Shard
}

// update new tenant db config, check if the new tenant db config exists
if conf.ForNewTenant != "" {
if conf.ForNewData != "" {
// use uuid to specify the new tenant db config for db that already exists
_, uuidExists := dbConf.SlaveDB[conf.ForNewTenant]
if conf.ForNewTenant == dbConf.MasterDB || uuidExists {
dbConf.ForNewTenant = conf.ForNewTenant
_, uuidExists := dbConf.SlaveDB[conf.ForNewData]
if conf.ForNewData == dbConf.MasterDB || uuidExists {
dbConf.ForNewData = conf.ForNewData
return dbConf, nil
}

// use name to specify the new tenant db config for new db that doesn't have uuid before creation
uuid, nameExists := nameUUIDMap[conf.ForNewTenant]
uuid, nameExists := nameUUIDMap[conf.ForNewData]
if nameExists {
dbConf.ForNewTenant = uuid
dbConf.ForNewData = uuid
return dbConf, nil
}

blog.Errorf("add new tenant db %s is invalid, rid: %s", conf.ForNewTenant, kit.Rid)
return nil, kit.CCError.CCErrorf(common.CCErrCommParamsInvalid, "for_new_tenant")
blog.Errorf("add new tenant db %s is invalid, rid: %s", conf.ForNewData, kit.Rid)
return nil, kit.CCError.CCErrorf(common.CCErrCommParamsInvalid, "for_new_data")
}
return dbConf, nil
}
Expand Down
16 changes: 13 additions & 3 deletions src/source_controller/cacheservice/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,19 @@ func initResource(cacheSvr *CacheServer) error {
return err
}

dbErr := mongodb.InitClient("", &cacheSvr.Config.Mongo)
if dbErr != nil {
blog.Errorf("failed to connect the db server, error info is %s", dbErr.Error())
cryptoConf, err := cc.Crypto("crypto")
if err != nil {
blog.Errorf("get crypto config failed, err: %v", err)
return err
}

if dbErr := mongodb.SetShardingCli("", &cacheSvr.Config.Mongo, cryptoConf); dbErr != nil {
blog.Errorf("failed to connect the db server, err: %v", dbErr)
return dbErr
}

if dbErr := mongodb.SetShardingCli("watch", &cacheSvr.Config.WatchMongo, cryptoConf); dbErr != nil {
blog.Errorf("new watch db sharding client failed, err: %v", dbErr)
return dbErr
}

Expand Down
7 changes: 3 additions & 4 deletions src/source_controller/cacheservice/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ import (
"configcenter/src/storage/dal"
"configcenter/src/storage/driver/mongodb"
"configcenter/src/storage/driver/redis"
"configcenter/src/storage/reflector"
"configcenter/src/storage/stream"
)

// NewCache new cache service
func NewCache(reflector reflector.Interface, loopW stream.LoopInterface, isMaster discovery.ServiceManageInterface,
watchDB dal.DB) (*ClientSet, error) {
func NewCache(loopW stream.LoopInterface, isMaster discovery.ServiceManageInterface, watchDB dal.DB) (*ClientSet,
error) {

if err := mainline.NewMainlineCache(loopW); err != nil {
return nil, fmt.Errorf("new business cache failed, err: %v", err)
Expand All @@ -56,7 +55,7 @@ func NewCache(reflector reflector.Interface, loopW stream.LoopInterface, isMaste
return nil, fmt.Errorf("new common topo cache failed, err: %v", err)
}

watchCli := watch.NewClient(watchDB, mongodb.Client(), redis.Client())
watchCli := watch.NewClient(mongodb.Dal("watch"), mongodb.Dal(), redis.Client())

generalCache, err := general.New(isMaster, loopW, watchCli)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ import (
"configcenter/src/common/blog"
"configcenter/src/source_controller/cacheservice/event"
mixevent "configcenter/src/source_controller/cacheservice/event/mix-event"
"configcenter/src/storage/dal"
"configcenter/src/storage/dal/mongo/local"
"configcenter/src/storage/stream"
"configcenter/src/storage/stream/task"
)

const (
Expand All @@ -32,12 +30,10 @@ const (
)

// NewBizSetRelation init and run biz set relation event watch
func NewBizSetRelation(watch stream.LoopInterface, watchDB *local.Mongo, ccDB dal.DB) error {
func NewBizSetRelation(task *task.Task) error {
base := mixevent.MixEventFlowOptions{
MixKey: event.BizSetRelationKey,
Watch: watch,
WatchDB: watchDB,
CcDB: ccDB,
Task: task,
EventLockKey: bizSetRelationLockKey,
EventLockTTL: bizSetRelationLockTTL,
}
Expand Down
Loading