diff --git a/dbm-services/mysql/db-partition/service/check_partition.go b/dbm-services/mysql/db-partition/service/check_partition.go index eeaf399d59..5bf6c96069 100644 --- a/dbm-services/mysql/db-partition/service/check_partition.go +++ b/dbm-services/mysql/db-partition/service/check_partition.go @@ -127,18 +127,17 @@ func CheckPartitionConfigs(configs []*PartitionConfig, dbtype string, splitCnt i tokenBucket := make(chan int, 10) // 最大并行度 for _, config := range configs { + err := limiter.Wait(context.Background()) + if err != nil { + checkFailSet.Mu.Lock() + checkFailSet.IdLogs = append(checkFailSet.IdLogs, IdLog{(*config).ID, err.Error()}) + checkFailSet.Mu.Unlock() + continue + } wg.Add(1) tokenBucket <- 0 - ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) go func(config *PartitionConfig) { - err := limiter.Wait(context.Background()) - if err != nil { - checkFailSet.Mu.Lock() - checkFailSet.IdLogs = append(checkFailSet.IdLogs, IdLog{(*config).ID, err.Error()}) - checkFailSet.Mu.Unlock() - return - } CheckOnePartitionConfig(ctx, cancel, *config, &wg, &sqlSet, ¬hingToDoSet, &checkFailSet, dbtype, splitCnt, fromCron, host, &tokenBucket) }(config) diff --git a/dbm-services/mysql/db-partition/service/cron_basic_func.go b/dbm-services/mysql/db-partition/service/cron_basic_func.go index 3e473d9e77..efb9cee607 100644 --- a/dbm-services/mysql/db-partition/service/cron_basic_func.go +++ b/dbm-services/mysql/db-partition/service/cron_basic_func.go @@ -440,20 +440,20 @@ func DownLoadFilesCreateTicketByMachine(cloudMachineList map[int][]string, machi for cloud, machines := range cloudMachineList { tmp := util.SplitArray(machines, 20) for _, ips := range tmp { + errLimiter := limiter.Wait(context.Background()) + if errLimiter != nil { + msg := "dbmeta/apis/v1/flow/scene/download_dbactor/ error" + SendMonitor(msg, errLimiter) + slog.Error("msg", msg, errLimiter) + continue + } wg.Add(1) go func(cloud int, ips []string) { defer func() { wg.Done() }() - err := limiter.Wait(context.Background()) - if err != nil { - msg := "dbmeta/apis/v1/flow/scene/download_dbactor/ error" - SendMonitor(msg, err) - slog.Error("msg", msg, err) - return - } // 按照机器下载好dbactor - err = DownloadDbactor(cloud, ips) + err := DownloadDbactor(cloud, ips) // dbactor下载失败,可以继续执行分区的单据,机器上可能已经存在dbactor if err != nil { dimension := monitor.NewDeveloperEventDimension(Scheduler, monitor.PartitionCron) @@ -502,19 +502,19 @@ func DownLoadFilesCreateTicketByCluster(clusterIps map[string][]string, machineF vcluster := strings.Split(cluster, "|") domain := vcluster[0] cloud, _ := strconv.Atoi(vcluster[2]) + err := limiter.Wait(context.Background()) + if err != nil { + msg := "get token error" + SendMonitor(msg, err) + slog.Error("msg", msg, err) + continue + } wg.Add(1) var clusterFiles []Info go func(domain string, cloud int, machines []string) { defer func() { wg.Done() }() - err := limiter.Wait(context.Background()) - if err != nil { - msg := "get token error" - SendMonitor(msg, err) - slog.Error("msg", msg, err) - return - } tmp := util.SplitArray(machines, 20) for _, ips := range tmp { // 按照机器下载好dbactor diff --git a/dbm-services/mysql/db-priv/handler/account_rule.go b/dbm-services/mysql/db-priv/handler/account_rule.go index 14fdbd679b..ea0e254017 100644 --- a/dbm-services/mysql/db-priv/handler/account_rule.go +++ b/dbm-services/mysql/db-priv/handler/account_rule.go @@ -41,6 +41,7 @@ func (m *PrivService) GetAccountRuleList(c *gin.Context) { func (m *PrivService) AddAccountRule(c *gin.Context) { slog.Info("do AddAccountRule!") var input service.AccountRulePara + var rules []service.TbAccountRules ticket := strings.TrimPrefix(c.FullPath(), "/priv/") body, err := ioutil.ReadAll(c.Request.Body) @@ -55,13 +56,19 @@ func (m *PrivService) AddAccountRule(c *gin.Context) { SendResponse(c, errno.ErrBind, err) return } - if *input.ClusterType == "mongodb" { - err = input.MongoDBAddAccountRule(string(body), ticket) + rules, err = input.MongoDBAddAccountRule(string(body), ticket) } else { - err = input.AddAccountRule(string(body), ticket) + rules, err = input.AddAccountRule(string(body), ticket) } - SendResponse(c, err, nil) + if err != nil { + SendResponse(c, err, nil) + return + } + SendResponse(c, err, ListResponse{ + Count: len(rules), + Items: rules, + }) return } diff --git a/dbm-services/mysql/db-priv/main.go b/dbm-services/mysql/db-priv/main.go index cd1ac978cc..1e77b23003 100644 --- a/dbm-services/mysql/db-priv/main.go +++ b/dbm-services/mysql/db-priv/main.go @@ -1,6 +1,7 @@ package main import ( + "dbm-services/mysql/priv-service/util" "io" "log/slog" "net/http" @@ -41,6 +42,9 @@ func main() { } } + util.DbmetaClient = util.NewClientByHosts(viper.GetString("dbmeta")) + util.DrsClient = util.NewClientByHosts(viper.GetString("dbRemoteService")) + // 注册服务 gin.SetMode(gin.ReleaseMode) engine := gin.New() diff --git a/dbm-services/mysql/db-priv/service/account_rule_mongodb.go b/dbm-services/mysql/db-priv/service/account_rule_mongodb.go index d043a3b98e..48595f016b 100644 --- a/dbm-services/mysql/db-priv/service/account_rule_mongodb.go +++ b/dbm-services/mysql/db-priv/service/account_rule_mongodb.go @@ -8,7 +8,7 @@ import ( ) // MongoDBAddAccountRule 新增账号规则 -func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string) error { +func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string) ([]TbAccountRules, error) { var ( accountRule TbAccountRules dbs []string @@ -16,6 +16,7 @@ func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string) userPriv string managerPriv string err error + rules []TbAccountRules ) // mongo_user: read readWrite readAnyDatabase readWriteAnyDatabase // mongo_manager: dbAdmin backup restore userAdmin clusterAdmin @@ -24,17 +25,17 @@ func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string) err = m.ParaPreCheck() if err != nil { - return err + return nil, err } dbs, err = util.String2Slice(m.Dbname) if err != nil { - return err + return nil, err } _, err = AccountRulePreCheck(m.BkBizId, m.AccountId, *m.ClusterType, dbs, false) if err != nil { - return err + return nil, err } for _, _type := range ConstPrivType { @@ -62,15 +63,16 @@ func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string) err = tx.Debug().Model(&TbAccountRules{}).Create(&accountRule).Error if err != nil { tx.Rollback() - return err + return nil, err } + rules = append(rules, accountRule) } err = tx.Commit().Error if err != nil { - return err + return nil, err } log := PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: vtime} AddPrivLog(log) - return nil + return rules, nil } diff --git a/dbm-services/mysql/db-priv/service/accout_rule.go b/dbm-services/mysql/db-priv/service/accout_rule.go index 4845547a00..03a752690e 100644 --- a/dbm-services/mysql/db-priv/service/accout_rule.go +++ b/dbm-services/mysql/db-priv/service/accout_rule.go @@ -142,7 +142,7 @@ func (m *QueryRulePara) QueryAccountRule() ([]*AccountRuleSplitUser, int, error) } // AddAccountRule 新增账号规则 -func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) error { +func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) ([]TbAccountRules, error) { var ( accountRule TbAccountRules dbs []string @@ -150,6 +150,7 @@ func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) error { dmlDdlPriv string globalPriv string err error + rules []TbAccountRules ) // dml: select,insert,update,delete // ddl: create,alter,drop,index,execute,create view @@ -168,17 +169,17 @@ func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) error { err = m.ParaPreCheck() if err != nil { - return err + return nil, err } dbs, err = util.String2Slice(m.Dbname) if err != nil { - return err + return nil, err } _, err = AccountRulePreCheck(m.BkBizId, m.AccountId, *m.ClusterType, dbs, false) if err != nil { - return err + return nil, err } for _, _type := range ConstPrivType { @@ -205,17 +206,17 @@ func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) error { err = tx.Debug().Model(&TbAccountRules{}).Create(&accountRule).Error if err != nil { tx.Rollback() - return err + return nil, err } + rules = append(rules, accountRule) } err = tx.Commit().Error if err != nil { - return err + return nil, err } log := PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: vtime} AddPrivLog(log) - - return nil + return rules, nil } // AddAccountRuleDryRun 新增账号规则检查 diff --git a/dbm-services/mysql/db-priv/service/add_priv.go b/dbm-services/mysql/db-priv/service/add_priv.go index eafc6b6e2f..fc2e2f6d1e 100644 --- a/dbm-services/mysql/db-priv/service/add_priv.go +++ b/dbm-services/mysql/db-priv/service/add_priv.go @@ -12,9 +12,6 @@ import ( "golang.org/x/time/rate" "dbm-services/common/go-pubpkg/errno" - "dbm-services/mysql/priv-service/util" - - "github.com/spf13/viper" ) // AddPrivDryRun 使用账号规则,新增权限预检查 @@ -80,8 +77,7 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error { return errno.ClusterTypeIsEmpty } AddPrivLog(PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: time.Now()}) - client := util.NewClientByHosts(viper.GetString("dbmeta")) - limit := rate.Every(time.Millisecond * 200) // QPS:5 + limit := rate.Every(time.Millisecond * 100) // QPS:10 burst := 10 // 桶容量 10 limiter := rate.NewLimiter(limit, burst) for _, rule := range m.AccoutRules { // 添加权限,for acccountRuleList;for instanceList; do create a routine @@ -91,6 +87,12 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error { continue } for _, dns := range m.TargetInstances { + errLimiter := limiter.Wait(context.Background()) + if errLimiter != nil { + slog.Error("limiter.Wait", "error", errLimiter, "dns", dns) + AddErrorOnly(&errMsg, errors.New(errLimiter.Error())) + continue + } wg.Add(1) go func(dns string) { defer func() { @@ -111,13 +113,7 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error { successInfo = fmt.Sprintf(`%s,授权成功。`, baseInfo) failInfo = fmt.Sprintf(`%s,授权失败:`, baseInfo) - err = limiter.Wait(context.Background()) - if err != nil { - AddErrorOnly(&errMsg, errors.New(failInfo+sep+err.Error())) - return - } - - instance, err = GetCluster(client, m.ClusterType, Domain{EntryName: dns}) + instance, err = GetCluster(m.ClusterType, Domain{EntryName: dns}) if err != nil { AddErrorOnly(&errMsg, errors.New(failInfo+sep+err.Error())) return diff --git a/dbm-services/mysql/db-priv/service/add_priv_base_func.go b/dbm-services/mysql/db-priv/service/add_priv_base_func.go index cd4a612596..2c380aee6b 100644 --- a/dbm-services/mysql/db-priv/service/add_priv_base_func.go +++ b/dbm-services/mysql/db-priv/service/add_priv_base_func.go @@ -15,7 +15,6 @@ import ( "github.com/asaskevich/govalidator" "github.com/jinzhu/gorm" - "github.com/spf13/viper" ) // GetAccountRuleInfo 根据账号名获取账号信息,根据账号 id 以及授权数据库获取账号规则 @@ -489,8 +488,6 @@ func DeduplicationTargetInstance(instances []string, clusterType string) ([]stri UniqMap = make(map[string]struct{}) err error ) - - client := util.NewClientByHosts(viper.GetString("dbmeta")) for _, instance := range instances { instance = strings.Trim(strings.TrimSpace(instance), ".") if !govalidator.IsDNSName(instance) { @@ -499,7 +496,7 @@ func DeduplicationTargetInstance(instances []string, clusterType string) ([]stri continue } dns = Domain{EntryName: instance} - _, err = GetCluster(client, clusterType, dns) + _, err = GetCluster(clusterType, dns) if err != nil { errMsg = append(errMsg, err.Error()) continue diff --git a/dbm-services/mysql/db-priv/service/add_priv_for_sqlserver.go b/dbm-services/mysql/db-priv/service/add_priv_for_sqlserver.go index ecf7f882ec..b949c0a891 100644 --- a/dbm-services/mysql/db-priv/service/add_priv_for_sqlserver.go +++ b/dbm-services/mysql/db-priv/service/add_priv_for_sqlserver.go @@ -5,10 +5,6 @@ import ( "log/slog" "strings" "time" - - "dbm-services/mysql/priv-service/util" - - "github.com/spf13/viper" ) // AddPriv 使用账号规则,新增权限 @@ -37,11 +33,10 @@ func (m *PrivTaskPara) AddPrivForSqlserver(jsonPara string) error { rules = append(rules, accountRule) } - client := util.NewClientByHosts(viper.GetString("dbmeta")) for _, dns := range m.TargetInstances { // 获取集群相关信息 dns = strings.Trim(strings.TrimSpace(dns), ".") - cluster, err := GetCluster(client, m.ClusterType, Domain{EntryName: dns}) + cluster, err := GetCluster(m.ClusterType, Domain{EntryName: dns}) if err != nil { return err } diff --git a/dbm-services/mysql/db-priv/service/admin_password.go b/dbm-services/mysql/db-priv/service/admin_password.go index a8b3a760a6..e68f001a84 100644 --- a/dbm-services/mysql/db-priv/service/admin_password.go +++ b/dbm-services/mysql/db-priv/service/admin_password.go @@ -287,7 +287,7 @@ func (m *ModifyAdminUserPasswordPara) ModifyAdminPassword() (BatchResult, error) var passwordInput string var errCheck error - limit := rate.Every(time.Millisecond * 200) // QPS:5 + limit := rate.Every(time.Millisecond * 100) // QPS:10 burst := 10 // 桶容量 10 limiter := rate.NewLimiter(limit, burst) @@ -377,16 +377,16 @@ func (m *ModifyAdminUserPasswordPara) ModifyAdminPassword() (BatchResult, error) slog.Error("SM4Encrypt", "error", errOuter) return batch, errOuter } + err := limiter.Wait(context.Background()) + if err != nil { + AddError(&errMsg, "get parallel resource", err) + continue + } wg.Add(1) go func(psw, encrypt string, cluster OneCluster) { defer func() { wg.Done() }() - err := limiter.Wait(context.Background()) - if err != nil { - AddError(&errMsg, "get parallel resource", err) - return - } // 如果是sqlserver授权,走sqlserver授权通道 if m.Component == "sqlserver" { m.ModifyAdminPasswordForSqlserver( diff --git a/dbm-services/mysql/db-priv/service/clone_client_priv.go b/dbm-services/mysql/db-priv/service/clone_client_priv.go index ff2000dedf..8d244471ec 100644 --- a/dbm-services/mysql/db-priv/service/clone_client_priv.go +++ b/dbm-services/mysql/db-priv/service/clone_client_priv.go @@ -12,9 +12,6 @@ import ( "golang.org/x/time/rate" "dbm-services/common/go-pubpkg/errno" - "dbm-services/mysql/priv-service/util" - - "github.com/spf13/viper" ) // CloneClientPrivDryRun 克隆客户端权限预检查 @@ -54,9 +51,10 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([ var errMsg Err var sqls ClusterGrants wg := sync.WaitGroup{} - limit := rate.Every(time.Millisecond * 200) // QPS:5 + limit := rate.Every(time.Millisecond * 100) // QPS:10 burst := 10 // 桶容量 10 limiter := rate.NewLimiter(limit, burst) + tokenBucket := make(chan int, 10) // 最大并行度 if m.BkBizId == 0 { return nil, errno.BkBizIdIsEmpty @@ -70,9 +68,7 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([ } AddPrivLog(PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: time.Now()}) - - client := util.NewClientByHosts(viper.GetString("dbmeta")) - resp, errOuter := GetAllClustersInfo(client, BkBizIdPara{m.BkBizId}) + resp, errOuter := GetAllClustersInfo(BkBizIdPara{m.BkBizId}) if errOuter != nil { return nil, errOuter } @@ -122,17 +118,22 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([ // 一个协程失败,其报错信息添加到errMsg.errs。主协程wg.Wait(),等待所有协程执行完成才会返回。 // 每个集群一个协程 + slog.Info("msg", "clusters", clusters) for _, item := range clusters { + errOuter = limiter.Wait(context.Background()) + if errOuter != nil { + slog.Error("limiter.Wait", "error", errOuter) + AddError(&errMsg, item.ImmuteDomain, errOuter) + continue + } wg.Add(1) + tokenBucket <- 0 + slog.Info("msg", "item.ImmuteDomain", item.ImmuteDomain) go func(item Cluster) { defer func() { + <-tokenBucket wg.Done() }() - errOuter = limiter.Wait(context.Background()) - if errOuter != nil { - AddError(&errMsg, item.ImmuteDomain, errOuter) - return - } clusterGrant := ClusterGrantSql{ImmuteDomain: item.ImmuteDomain} if item.ClusterType == tendbha || item.ClusterType == tendbsingle { for _, storage := range item.Storages { @@ -143,6 +144,11 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([ AddError(&errMsg, address, err) continue } + if len(matchHosts) == 0 { + slog.Info("no match user@host", "instance", address, + "source ip", m.SourceIp) + continue + } slog.Info("msg", "matchHosts", matchHosts) userGrants, err := GetRemotePrivilege(address, matchHosts, item.BkCloudId, machineTypeBackend, m.User, true) @@ -151,6 +157,8 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([ continue } if len(userGrants) == 0 { + slog.Info("no match user@host", "instance", address, + "source ip", m.SourceIp, "user", m.User) continue } userGrants = ReplaceHostInMysqlGrants(userGrants, m.TargetIp) @@ -160,7 +168,7 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([ } clusterGrant.Sqls = append(clusterGrant.Sqls, InstanceGrantSql{address, mysqlcomm.ClearIdentifyByInSQLs(grants)}) - err = ImportMysqlPrivileges(grants, address, item.BkCloudId) + err = ImportMysqlPrivileges(userGrants, address, item.BkCloudId) if err != nil { AddError(&errMsg, address, err) } @@ -173,6 +181,11 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([ AddError(&errMsg, address, err) continue } + if len(matchHosts) == 0 { + slog.Info("no match user@host", "instance", address, + "source ip", m.SourceIp) + continue + } userGrants, err := GetRemotePrivilege(address, matchHosts, item.BkCloudId, machineTypeSpider, m.User, true) if err != nil { @@ -180,6 +193,8 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([ continue } if len(userGrants) == 0 { + slog.Info("no match user@host", "instance", address, + "source ip", m.SourceIp, "user", m.User) continue } userGrants = ReplaceHostInMysqlGrants(userGrants, m.TargetIp) @@ -189,7 +204,7 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([ } clusterGrant.Sqls = append(clusterGrant.Sqls, InstanceGrantSql{address, mysqlcomm.ClearIdentifyByInSQLs(grants)}) - err = ImportMysqlPrivileges(grants, address, item.BkCloudId) + err = ImportMysqlPrivileges(userGrants, address, item.BkCloudId) if err != nil { AddError(&errMsg, address, err) } @@ -204,12 +219,18 @@ func (m *CloneClientPrivPara) CloneClientPriv(jsonPara string, ticket string) ([ AddError(&errMsg, address, err) } slog.Info("msg", "matchHosts", matchHosts) + if len(matchHosts) == 0 { + slog.Info("no match user@host", "instance", address, + "source ip", m.SourceIp) + continue + } proxyGrants, err := GetProxyPrivilege(address, matchHosts, item.BkCloudId, m.User) if err != nil { slog.Error("msg", "GetProxyPrivilege", err) AddError(&errMsg, address, err) } if len(proxyGrants) == 0 { + slog.Info("no match user@host", "instance", address, "user", m.User) continue } proxyGrants = ReplaceHostInProxyGrants(proxyGrants, m.TargetIp) diff --git a/dbm-services/mysql/db-priv/service/clone_client_priv_base_func.go b/dbm-services/mysql/db-priv/service/clone_client_priv_base_func.go index 9ffe59ad68..bd5607ed7c 100644 --- a/dbm-services/mysql/db-priv/service/clone_client_priv_base_func.go +++ b/dbm-services/mysql/db-priv/service/clone_client_priv_base_func.go @@ -3,7 +3,6 @@ package service import ( "context" "dbm-services/common/go-pubpkg/errno" - "dbm-services/mysql/priv-service/util" "fmt" "log/slog" "regexp" @@ -11,9 +10,8 @@ import ( "sync" "time" - "golang.org/x/time/rate" - "github.com/asaskevich/govalidator" + "golang.org/x/time/rate" ) // ReplaceHostInMysqlGrants 替换mysql授权语句中的host @@ -111,27 +109,33 @@ func GetProxyPrivilege(address string, hosts []string, bkCloudId int64, specifie // ImportProxyPrivileges 导入proxy白名单 func ImportProxyPrivileges(grants []string, address string, bkCloudId int64) error { - var errs []string - limit := rate.Every(time.Millisecond * 200) // QPS:5 - burst := 5 // 桶容量 5 + var errMsg Err + wg := sync.WaitGroup{} + limit := rate.Every(time.Millisecond * 20) // QPS:50 + burst := 50 // 桶容量 50 limiter := rate.NewLimiter(limit, burst) - // nginx默认上传文件的大小限制是1M,为避免413 Request Entity Too Large报错,切分 - tmp := util.SplitArray(grants, 100) - for _, vsqls := range tmp { - err := limiter.Wait(context.Background()) - if err != nil { - slog.Error("msg", "limiter.Wait", err) - errs = append(errs, err.Error()) - continue - } - queryRequest := QueryRequest{[]string{address}, vsqls, true, 30, bkCloudId} - _, err = OneAddressExecuteProxySql(queryRequest) - if err != nil { - errs = append(errs, err.Error()) + for _, item := range grants { + errLimiter := limiter.Wait(context.Background()) + if errLimiter != nil { + slog.Error("msg", "limiter.Wait", errLimiter) + return errLimiter } + wg.Add(1) + go func(item string) { + defer func() { + wg.Done() + }() + queryRequest := QueryRequest{[]string{address}, []string{item}, true, 30, bkCloudId} + _, err := OneAddressExecuteProxySql(queryRequest) + if err != nil { + AddError(&errMsg, address, err) + return + } + }(item) } - if len(errs) > 0 { - return errno.ClonePrivilegesFail.Add(strings.Join(errs, "\n")) + wg.Wait() + if len(errMsg.errs) > 0 { + return errno.ClonePrivilegesFail.Add(strings.Join(errMsg.errs, "\n")) } return nil } diff --git a/dbm-services/mysql/db-priv/service/clone_instance_priv.go b/dbm-services/mysql/db-priv/service/clone_instance_priv.go index 2bf98a3440..a19aabbea1 100644 --- a/dbm-services/mysql/db-priv/service/clone_instance_priv.go +++ b/dbm-services/mysql/db-priv/service/clone_instance_priv.go @@ -79,11 +79,7 @@ func (m *CloneInstancePrivPara) CloneInstancePriv(jsonPara string, ticket string return err } } - var grants []string - for _, sql := range userGrants { - grants = append(grants, sql.Grants...) - } - err = ImportMysqlPrivileges(grants, m.Target.Address, *m.BkCloudId) + err = ImportMysqlPrivileges(userGrants, m.Target.Address, *m.BkCloudId) if err != nil { return err } diff --git a/dbm-services/mysql/db-priv/service/clone_instance_priv_base_func.go b/dbm-services/mysql/db-priv/service/clone_instance_priv_base_func.go index 9e66a8b595..23a557f2a7 100644 --- a/dbm-services/mysql/db-priv/service/clone_instance_priv_base_func.go +++ b/dbm-services/mysql/db-priv/service/clone_instance_priv_base_func.go @@ -51,8 +51,9 @@ func GetRemotePrivilege(address string, host string, bkCloudId int64, instanceTy wg := sync.WaitGroup{} finishChan := make(chan bool, 1) errorChan := make(chan error, 1) - limit := rate.Every(time.Millisecond * 200) // QPS:5 - burst := 5 // 桶容量 5 + //tokenBucket := make(chan int, 50) + limit := rate.Every(time.Millisecond * 20) // QPS:50 + burst := 50 // 桶容量 50 limiter := rate.NewLimiter(limit, burst) version, errOuter = GetMySQLVersion(address, bkCloudId) if errOuter != nil { @@ -88,18 +89,20 @@ func GetRemotePrivilege(address string, host string, bkCloudId int64, instanceTy } userHost := fmt.Sprintf(`'%s'@'%s'`, row["user"].(string), row["host"].(string)) wg.Add(1) + errLimiter := limiter.Wait(context.Background()) + if errLimiter != nil { + slog.Error("msg", "limiter.Wait", errLimiter) + return nil, errLimiter + } + // tokenBucket <- 0 // 在这里操作 token 可以防止过多的协程启动但处于等待 token 的阻塞状态 go func(userHost string, needShowCreateUser bool) { - slog.Info("msg", "userHost", userHost) defer func() { wg.Done() + // <-tokenBucket }() var Grants []string var err error - err = limiter.Wait(context.Background()) - if err != nil { - errorChan <- err - return - } + slog.Info("msg", "userHost", userHost) err = GetUserGantSql(needShowCreateUser, userHost, address, &Grants, bkCloudId) if err != nil { errorChan <- err @@ -113,6 +116,7 @@ func GetRemotePrivilege(address string, host string, bkCloudId int64, instanceTy go func() { wg.Wait() close(finishChan) + // close(tokenBucket) }() select { @@ -576,31 +580,49 @@ func CheckGrantInMySqlVersion(userGrants []UserGrant, address string, bkCloudId } // ImportMysqlPrivileges 执行mysql权限 -func ImportMysqlPrivileges(grants []string, address string, bkCloudId int64) error { - var errs []string - grants = append([]string{flushPriv, setBinlogOff}, grants...) - grants = append(grants, flushPriv) - limit := rate.Every(time.Millisecond * 200) // QPS:5 - burst := 5 // 桶容量 5 +func ImportMysqlPrivileges(userGrants []UserGrant, address string, bkCloudId int64) error { + // Err 错误信息列表 + type Err struct { + mu sync.RWMutex + errs []string + } + var errMsg Err + wg := sync.WaitGroup{} + limit := rate.Every(time.Millisecond * 20) // QPS:50 + burst := 50 // 桶容量 50 limiter := rate.NewLimiter(limit, burst) - // nginx默认上传文件的大小限制是1M,为避免413 Request Entity Too Large报错,切分 - tmp := util.SplitArray(grants, 100) - for _, vsqls := range tmp { - slog.Info("msg", "sqls", vsqls) - err := limiter.Wait(context.Background()) - if err != nil { - slog.Error("msg", "limiter.Wait", err) - errs = append(errs, err.Error()) - continue - } - queryRequest := QueryRequest{[]string{address}, vsqls, true, 60, bkCloudId} - _, err = OneAddressExecuteSql(queryRequest) - if err != nil { - errs = append(errs, err.Error()) + for _, row := range userGrants { + errLimiter := limiter.Wait(context.Background()) + if errLimiter != nil { + slog.Error("msg", "limiter.Wait", errLimiter) + return errLimiter } + wg.Add(1) + go func(row UserGrant) { + defer func() { + wg.Done() + }() + slog.Info("msg", "user@host", row.UserHost) + queryRequest := QueryRequest{[]string{address}, row.Grants, true, 60, bkCloudId} + _, err := OneAddressExecuteSql(queryRequest) + if err != nil { + errMsg.mu.Lock() + errMsg.errs = append(errMsg.errs, err.Error()) + errMsg.mu.Unlock() + return + } + }(row) + } + wg.Wait() + queryRequest := QueryRequest{[]string{address}, []string{flushPriv}, true, 60, bkCloudId} + _, err := OneAddressExecuteSql(queryRequest) + if err != nil { + errMsg.mu.Lock() + errMsg.errs = append(errMsg.errs, err.Error()) + errMsg.mu.Unlock() } - if len(errs) > 0 { - return fmt.Errorf(strings.Join(errs, "\n")) + if len(errMsg.errs) > 0 { + return fmt.Errorf(strings.Join(errMsg.errs, "\n")) } return nil } diff --git a/dbm-services/mysql/db-priv/service/db_meta_service.go b/dbm-services/mysql/db-priv/service/db_meta_service.go index b7d957c056..596dc4fb2e 100644 --- a/dbm-services/mysql/db-priv/service/db_meta_service.go +++ b/dbm-services/mysql/db-priv/service/db_meta_service.go @@ -78,10 +78,10 @@ GetAllClustersInfo 获取业务下所有集群信息 "immute_domain": "gamedb.2.hayley.db" }] */ -func GetAllClustersInfo(c *util.Client, id BkBizIdPara) ([]Cluster, error) { +func GetAllClustersInfo(id BkBizIdPara) ([]Cluster, error) { var resp []Cluster url := "/apis/proxypass/dbmeta/priv_manager/biz_clusters/" - result, err := c.Do(http.MethodPost, url, id) + result, err := util.DbmetaClient.Do(http.MethodPost, url, id) if err != nil { slog.Error("msg", url, err) return resp, err @@ -94,7 +94,7 @@ func GetAllClustersInfo(c *util.Client, id BkBizIdPara) ([]Cluster, error) { } // GetCluster 根据域名获取集群信息 -func GetCluster(c *util.Client, ClusterType string, dns Domain) (Instance, error) { +func GetCluster(ClusterType string, dns Domain) (Instance, error) { var resp Instance var url string if ClusterType == sqlserverHA || ClusterType == sqlserverSingle || ClusterType == sqlserver { @@ -104,7 +104,7 @@ func GetCluster(c *util.Client, ClusterType string, dns Domain) (Instance, error url = fmt.Sprintf("/apis/proxypass/dbmeta/priv_manager/mysql/%s/cluster_instances/", ClusterType) } - result, err := c.Do(http.MethodPost, url, dns) + result, err := util.DbmetaClient.Do(http.MethodPost, url, dns) if err != nil { slog.Error("msg", url, err) return resp, errno.DomainNotExists.Add(fmt.Sprintf(" %s: %s", dns.EntryName, err.Error())) diff --git a/dbm-services/mysql/db-priv/service/db_remote_service.go b/dbm-services/mysql/db-priv/service/db_remote_service.go index 084fe21236..9085e14f33 100644 --- a/dbm-services/mysql/db-priv/service/db_remote_service.go +++ b/dbm-services/mysql/db-priv/service/db_remote_service.go @@ -19,7 +19,6 @@ func OneAddressExecuteSqlBasic(vtype string, queryRequest QueryRequest) (oneAddr var result oneAddressResult var temp []oneAddressResult host := viper.GetString("dbRemoteService") - c := util.NewClientByHosts(host) var url string switch vtype { case "mysql": @@ -32,7 +31,7 @@ func OneAddressExecuteSqlBasic(vtype string, queryRequest QueryRequest) (oneAddr return result, fmt.Errorf("vtype not suppurt [%s]", vtype) } - apiResp, err := c.Do(http.MethodPost, url, queryRequest) + apiResp, err := util.DrsClient.Do(http.MethodPost, url, queryRequest) if err != nil { slog.Error("msg", "host", host, "url", url, "drs err", err) return result, fmt.Errorf("%s%s drs error: %s", host, url, err.Error()) diff --git a/dbm-services/mysql/db-priv/service/migrate_account_rule_base_func.go b/dbm-services/mysql/db-priv/service/migrate_account_rule_base_func.go index c462d91241..010107d510 100644 --- a/dbm-services/mysql/db-priv/service/migrate_account_rule_base_func.go +++ b/dbm-services/mysql/db-priv/service/migrate_account_rule_base_func.go @@ -157,7 +157,7 @@ func DoAddAccountRule(rule *PrivModule, apps map[string]int64, clusterType strin } log, _ := json.Marshal(rulePara) // 添加帐号规则 - err = rulePara.AddAccountRule(string(log), "add_account_rule") + _, err = rulePara.AddAccountRule(string(log), "add_account_rule") if err != nil { return fmt.Errorf("add rule failed: %s", err.Error()) } diff --git a/dbm-services/mysql/db-priv/service/query_priv.go b/dbm-services/mysql/db-priv/service/query_priv.go index d4b5f5f818..72b885892e 100644 --- a/dbm-services/mysql/db-priv/service/query_priv.go +++ b/dbm-services/mysql/db-priv/service/query_priv.go @@ -11,7 +11,6 @@ import ( "sync" "time" - "github.com/spf13/viper" "golang.org/x/time/rate" ) @@ -24,8 +23,6 @@ func (m *GetPrivPara) GetUserList() ([]string, int, error) { if errCheck != nil { return userList.l, count, errCheck } - client := util.NewClientByHosts(viper.GetString("dbmeta")) - wg := sync.WaitGroup{} limit := rate.Every(time.Millisecond * 50) // QPS:20 burst := 20 // 桶容量 20 @@ -42,7 +39,7 @@ func (m *GetPrivPara) GetUserList() ([]string, int, error) { AddError(&errMsg, item, err) return } - instance, err := GetCluster(client, *m.ClusterType, Domain{EntryName: item}) + instance, err := GetCluster(*m.ClusterType, Domain{EntryName: item}) if err != nil { AddError(&errMsg, item, err) return @@ -120,8 +117,6 @@ func (m *GetPrivPara) GetPriv() ([]RelatedIp, []RelatedDomain2, int, []GrantInfo return nil, nil, count, nil, nil, nil, errno.ErrUserIsEmpty } users := strings.Join(m.Users, "','") - client := util.NewClientByHosts(viper.GetString("dbmeta")) - wg := sync.WaitGroup{} limit := rate.Every(time.Millisecond * 50) // QPS:20 burst := 20 // 桶容量 20 @@ -132,7 +127,7 @@ func (m *GetPrivPara) GetPriv() ([]RelatedIp, []RelatedDomain2, int, []GrantInfo defer func() { wg.Done() }() - instance, err := GetCluster(client, *m.ClusterType, Domain{EntryName: item}) + instance, err := GetCluster(*m.ClusterType, Domain{EntryName: item}) if err != nil { AddError(&errMsg, item, err) return @@ -211,6 +206,11 @@ func (m *GetPrivPara) GetPriv() ([]RelatedIp, []RelatedDomain2, int, []GrantInfo AddError(&errMsg, address, err) return } + if len(matchHosts) == 0 { + slog.Info("no match user@host", "instance", address, + "source ip", m.Ips, "users", m.Users) + return + } // 获取user@host的权限信息 userGrants, err = GetRemotePrivilege(address, matchHosts, instance.BkCloudId, machineType, users, true) @@ -218,6 +218,11 @@ func (m *GetPrivPara) GetPriv() ([]RelatedIp, []RelatedDomain2, int, []GrantInfo AddError(&errMsg, address, err) return } + if len(userGrants) == 0 { + slog.Info("no match user@host", "instance", address, + "source ip", matchHosts, "users", users) + return + } // 对权限语句做正则匹配,模糊匹配,过滤出匹配输入db的权限信息 dbpriv = SplitGrantSql(userGrants, m.Dbs, tendbhaMasterDomain) // mysql中的账号与权限相结合 diff --git a/dbm-services/mysql/db-priv/util/client.go b/dbm-services/mysql/db-priv/util/client.go index c3bb17ed8d..0623658833 100644 --- a/dbm-services/mysql/db-priv/util/client.go +++ b/dbm-services/mysql/db-priv/util/client.go @@ -22,6 +22,8 @@ const ( statusSuccess int = 0 ) +var DbmetaClient, DrsClient *Client + // APIServerResponse TODO type APIServerResponse struct { Code int `json:"code"` @@ -69,10 +71,7 @@ func (c *Client) DoNew(method, url string, params interface{}, headers map[strin var err error for retryIdx := 0; retryIdx < 5; retryIdx++ { response, err = c.doNewInner(method, url, params, headers) - if err == nil { - break - } - if strings.Contains(err.Error(), "cse.flowcontrol.Consumer.qps.limit") { + if err != nil { slog.Error(fmt.Sprintf("DoNew failed, retryIdx:%d", retryIdx), err) wait := retryIdx*retryIdx*1000 + rand.Intn(1000) time.Sleep(time.Duration(wait) * time.Millisecond) @@ -143,6 +142,11 @@ func (c *Client) doNewInner(method, url string, params interface{}, headers map[ break } + // 关闭前一个响应体,防止内存泄漏 + if resp.Body != nil { + resp.Body.Close() + } + wait := i*i*1000 + rand.Intn(1000) time.Sleep(time.Duration(wait) * time.Millisecond) slog.Warn(fmt.Sprintf("client.Do result with %s, wait %d milliSeconds and retry, url: %s", resp.Status, wait,