Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mysql): 客户端克隆匹配ip问题修复 #6486

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions dbm-services/mysql/db-partition/service/check_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,17 @@ func CheckPartitionConfigs(configs []*PartitionConfig, dbtype string, splitCnt i
tokenBucket := make(chan int, 10) // 最大并行度

for _, config := range configs {
err := limiter.Wait(context.Background())
if err != nil {
checkFailSet.Mu.Lock()
checkFailSet.IdLogs = append(checkFailSet.IdLogs, IdLog{(*config).ID, err.Error()})
checkFailSet.Mu.Unlock()
continue
}
wg.Add(1)
tokenBucket <- 0

ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
go func(config *PartitionConfig) {
err := limiter.Wait(context.Background())
if err != nil {
checkFailSet.Mu.Lock()
checkFailSet.IdLogs = append(checkFailSet.IdLogs, IdLog{(*config).ID, err.Error()})
checkFailSet.Mu.Unlock()
return
}
CheckOnePartitionConfig(ctx, cancel, *config, &wg, &sqlSet, &nothingToDoSet, &checkFailSet, dbtype, splitCnt,
fromCron, host, &tokenBucket)
}(config)
Expand Down
30 changes: 15 additions & 15 deletions dbm-services/mysql/db-partition/service/cron_basic_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,20 +440,20 @@ func DownLoadFilesCreateTicketByMachine(cloudMachineList map[int][]string, machi
for cloud, machines := range cloudMachineList {
tmp := util.SplitArray(machines, 20)
for _, ips := range tmp {
errLimiter := limiter.Wait(context.Background())
if errLimiter != nil {
msg := "dbmeta/apis/v1/flow/scene/download_dbactor/ error"
SendMonitor(msg, errLimiter)
slog.Error("msg", msg, errLimiter)
continue
}
wg.Add(1)
go func(cloud int, ips []string) {
defer func() {
wg.Done()
}()
err := limiter.Wait(context.Background())
if err != nil {
msg := "dbmeta/apis/v1/flow/scene/download_dbactor/ error"
SendMonitor(msg, err)
slog.Error("msg", msg, err)
return
}
// 按照机器下载好dbactor
err = DownloadDbactor(cloud, ips)
err := DownloadDbactor(cloud, ips)
// dbactor下载失败,可以继续执行分区的单据,机器上可能已经存在dbactor
if err != nil {
dimension := monitor.NewDeveloperEventDimension(Scheduler, monitor.PartitionCron)
Expand Down Expand Up @@ -502,19 +502,19 @@ func DownLoadFilesCreateTicketByCluster(clusterIps map[string][]string, machineF
vcluster := strings.Split(cluster, "|")
domain := vcluster[0]
cloud, _ := strconv.Atoi(vcluster[2])
err := limiter.Wait(context.Background())
if err != nil {
msg := "get token error"
SendMonitor(msg, err)
slog.Error("msg", msg, err)
continue
}
wg.Add(1)
var clusterFiles []Info
go func(domain string, cloud int, machines []string) {
defer func() {
wg.Done()
}()
err := limiter.Wait(context.Background())
if err != nil {
msg := "get token error"
SendMonitor(msg, err)
slog.Error("msg", msg, err)
return
}
tmp := util.SplitArray(machines, 20)
for _, ips := range tmp {
// 按照机器下载好dbactor
Expand Down
15 changes: 11 additions & 4 deletions dbm-services/mysql/db-priv/handler/account_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (m *PrivService) GetAccountRuleList(c *gin.Context) {
func (m *PrivService) AddAccountRule(c *gin.Context) {
slog.Info("do AddAccountRule!")
var input service.AccountRulePara
var rules []service.TbAccountRules
ticket := strings.TrimPrefix(c.FullPath(), "/priv/")

body, err := ioutil.ReadAll(c.Request.Body)
Expand All @@ -55,13 +56,19 @@ func (m *PrivService) AddAccountRule(c *gin.Context) {
SendResponse(c, errno.ErrBind, err)
return
}

if *input.ClusterType == "mongodb" {
err = input.MongoDBAddAccountRule(string(body), ticket)
rules, err = input.MongoDBAddAccountRule(string(body), ticket)
} else {
err = input.AddAccountRule(string(body), ticket)
rules, err = input.AddAccountRule(string(body), ticket)
}
SendResponse(c, err, nil)
if err != nil {
SendResponse(c, err, nil)
return
}
SendResponse(c, err, ListResponse{
Count: len(rules),
Items: rules,
})
return
}

Expand Down
4 changes: 4 additions & 0 deletions dbm-services/mysql/db-priv/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"dbm-services/mysql/priv-service/util"
"io"
"log/slog"
"net/http"
Expand Down Expand Up @@ -41,6 +42,9 @@ func main() {
}
}

util.DbmetaClient = util.NewClientByHosts(viper.GetString("dbmeta"))
util.DrsClient = util.NewClientByHosts(viper.GetString("dbRemoteService"))

// 注册服务
gin.SetMode(gin.ReleaseMode)
engine := gin.New()
Expand Down
16 changes: 9 additions & 7 deletions dbm-services/mysql/db-priv/service/account_rule_mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
)

// MongoDBAddAccountRule 新增账号规则
func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string) error {
func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string) ([]TbAccountRules, error) {
var (
accountRule TbAccountRules
dbs []string
allTypePriv string
userPriv string
managerPriv string
err error
rules []TbAccountRules
)
// mongo_user: read readWrite readAnyDatabase readWriteAnyDatabase
// mongo_manager: dbAdmin backup restore userAdmin clusterAdmin
Expand All @@ -24,17 +25,17 @@ func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string)

err = m.ParaPreCheck()
if err != nil {
return err
return nil, err
}

dbs, err = util.String2Slice(m.Dbname)
if err != nil {
return err
return nil, err
}

_, err = AccountRulePreCheck(m.BkBizId, m.AccountId, *m.ClusterType, dbs, false)
if err != nil {
return err
return nil, err
}

for _, _type := range ConstPrivType {
Expand Down Expand Up @@ -62,15 +63,16 @@ func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string)
err = tx.Debug().Model(&TbAccountRules{}).Create(&accountRule).Error
if err != nil {
tx.Rollback()
return err
return nil, err
}
rules = append(rules, accountRule)
}
err = tx.Commit().Error
if err != nil {
return err
return nil, err
}
log := PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: vtime}
AddPrivLog(log)

return nil
return rules, nil
}
17 changes: 9 additions & 8 deletions dbm-services/mysql/db-priv/service/accout_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,15 @@ func (m *QueryRulePara) QueryAccountRule() ([]*AccountRuleSplitUser, int, error)
}

// AddAccountRule 新增账号规则
func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) error {
func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) ([]TbAccountRules, error) {
var (
accountRule TbAccountRules
dbs []string
allTypePriv string
dmlDdlPriv string
globalPriv string
err error
rules []TbAccountRules
)
// dml: select,insert,update,delete
// ddl: create,alter,drop,index,execute,create view
Expand All @@ -168,17 +169,17 @@ func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) error {

err = m.ParaPreCheck()
if err != nil {
return err
return nil, err
}

dbs, err = util.String2Slice(m.Dbname)
if err != nil {
return err
return nil, err
}

_, err = AccountRulePreCheck(m.BkBizId, m.AccountId, *m.ClusterType, dbs, false)
if err != nil {
return err
return nil, err
}

for _, _type := range ConstPrivType {
Expand All @@ -205,17 +206,17 @@ func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) error {
err = tx.Debug().Model(&TbAccountRules{}).Create(&accountRule).Error
if err != nil {
tx.Rollback()
return err
return nil, err
}
rules = append(rules, accountRule)
}
err = tx.Commit().Error
if err != nil {
return err
return nil, err
}
log := PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: vtime}
AddPrivLog(log)

return nil
return rules, nil
}

// AddAccountRuleDryRun 新增账号规则检查
Expand Down
20 changes: 8 additions & 12 deletions dbm-services/mysql/db-priv/service/add_priv.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ import (
"golang.org/x/time/rate"

"dbm-services/common/go-pubpkg/errno"
"dbm-services/mysql/priv-service/util"

"github.com/spf13/viper"
)

// AddPrivDryRun 使用账号规则,新增权限预检查
Expand Down Expand Up @@ -80,8 +77,7 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error {
return errno.ClusterTypeIsEmpty
}
AddPrivLog(PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: time.Now()})
client := util.NewClientByHosts(viper.GetString("dbmeta"))
limit := rate.Every(time.Millisecond * 200) // QPS:5
limit := rate.Every(time.Millisecond * 100) // QPS:10
burst := 10 // 桶容量 10
limiter := rate.NewLimiter(limit, burst)
for _, rule := range m.AccoutRules { // 添加权限,for acccountRuleList;for instanceList; do create a routine
Expand All @@ -91,6 +87,12 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error {
continue
}
for _, dns := range m.TargetInstances {
errLimiter := limiter.Wait(context.Background())
if errLimiter != nil {
slog.Error("limiter.Wait", "error", errLimiter, "dns", dns)
AddErrorOnly(&errMsg, errors.New(errLimiter.Error()))
continue
}
wg.Add(1)
go func(dns string) {
defer func() {
Expand All @@ -111,13 +113,7 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error {
successInfo = fmt.Sprintf(`%s,授权成功。`, baseInfo)
failInfo = fmt.Sprintf(`%s,授权失败:`, baseInfo)

err = limiter.Wait(context.Background())
if err != nil {
AddErrorOnly(&errMsg, errors.New(failInfo+sep+err.Error()))
return
}

instance, err = GetCluster(client, m.ClusterType, Domain{EntryName: dns})
instance, err = GetCluster(m.ClusterType, Domain{EntryName: dns})
if err != nil {
AddErrorOnly(&errMsg, errors.New(failInfo+sep+err.Error()))
return
Expand Down
5 changes: 1 addition & 4 deletions dbm-services/mysql/db-priv/service/add_priv_base_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/asaskevich/govalidator"
"github.com/jinzhu/gorm"
"github.com/spf13/viper"
)

// GetAccountRuleInfo 根据账号名获取账号信息,根据账号 id 以及授权数据库获取账号规则
Expand Down Expand Up @@ -489,8 +488,6 @@ func DeduplicationTargetInstance(instances []string, clusterType string) ([]stri
UniqMap = make(map[string]struct{})
err error
)

client := util.NewClientByHosts(viper.GetString("dbmeta"))
for _, instance := range instances {
instance = strings.Trim(strings.TrimSpace(instance), ".")
if !govalidator.IsDNSName(instance) {
Expand All @@ -499,7 +496,7 @@ func DeduplicationTargetInstance(instances []string, clusterType string) ([]stri
continue
}
dns = Domain{EntryName: instance}
_, err = GetCluster(client, clusterType, dns)
_, err = GetCluster(clusterType, dns)
if err != nil {
errMsg = append(errMsg, err.Error())
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (
"log/slog"
"strings"
"time"

"dbm-services/mysql/priv-service/util"

"github.com/spf13/viper"
)

// AddPriv 使用账号规则,新增权限
Expand Down Expand Up @@ -37,11 +33,10 @@ func (m *PrivTaskPara) AddPrivForSqlserver(jsonPara string) error {
rules = append(rules, accountRule)
}

client := util.NewClientByHosts(viper.GetString("dbmeta"))
for _, dns := range m.TargetInstances {
// 获取集群相关信息
dns = strings.Trim(strings.TrimSpace(dns), ".")
cluster, err := GetCluster(client, m.ClusterType, Domain{EntryName: dns})
cluster, err := GetCluster(m.ClusterType, Domain{EntryName: dns})
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions dbm-services/mysql/db-priv/service/admin_password.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (m *ModifyAdminUserPasswordPara) ModifyAdminPassword() (BatchResult, error)
var passwordInput string
var errCheck error

limit := rate.Every(time.Millisecond * 200) // QPS:5
limit := rate.Every(time.Millisecond * 100) // QPS:10
burst := 10 // 桶容量 10
limiter := rate.NewLimiter(limit, burst)

Expand Down Expand Up @@ -377,16 +377,16 @@ func (m *ModifyAdminUserPasswordPara) ModifyAdminPassword() (BatchResult, error)
slog.Error("SM4Encrypt", "error", errOuter)
return batch, errOuter
}
err := limiter.Wait(context.Background())
if err != nil {
AddError(&errMsg, "get parallel resource", err)
continue
}
wg.Add(1)
go func(psw, encrypt string, cluster OneCluster) {
defer func() {
wg.Done()
}()
err := limiter.Wait(context.Background())
if err != nil {
AddError(&errMsg, "get parallel resource", err)
return
}
// 如果是sqlserver授权,走sqlserver授权通道
if m.Component == "sqlserver" {
m.ModifyAdminPasswordForSqlserver(
Expand Down
Loading
Loading