Skip to content

Commit

Permalink
refactor(backend): 同步v1.3.0代码 #3883
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzhw8 authored Apr 9, 2024
2 parents a97f22d + 3444f07 commit fa0595f
Show file tree
Hide file tree
Showing 310 changed files with 5,160 additions and 3,272 deletions.
2 changes: 1 addition & 1 deletion .gtmproject.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ github:
repo_name: "blueking-dbm"

# 指定里程碑ID,
milestone_id: "8"
milestone_id: "9"

project:
# 主分支
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -38,11 +39,18 @@ type Allocation struct {

// Conn TODO
func (o EsInsObject) Conn() (*elasticsearch.Client, error) {
username := os.Getenv("ES_USERNAME")
password := os.Getenv("ES_PASSWORD")
if username == "" || password == "" {
err := errors.New("环境变量ES_USERNAME、ES_PASSWORD为空,或不存在")
return nil, err

}
return elasticsearch.NewClient(
elasticsearch.Config{
Addresses: []string{fmt.Sprintf("http://%s:%d", o.Host, o.HTTPPort)},
Username: o.UserName,
Password: o.Password,
Username: username,
Password: password,
})
}

Expand Down
13 changes: 13 additions & 0 deletions dbm-services/common/dbha/ha-module/client/redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,19 @@ func (r *RedisClient) InfoV2(section string) (infoRet map[string]string, err err
return
}

// Get Redis `GET key` command. It returns redis.Nil error when key does not exist.
func (r *RedisClient) Get(key string) (ret string, err error) {
if r.mode != RedisInstance {
ret, err = r.crdb.Get(context.TODO(), key).Result()
} else {
ret, err = r.rdb.Get(context.TODO(), key).Result()
}
if err != nil && err != redis.Nil {
return
}
return ret, nil
}

// SlaveOf TODO
func (r *RedisClient) SlaveOf(host, port string) (ret string, err error) {
if r.mode == RedisInstance {
Expand Down
36 changes: 33 additions & 3 deletions dbm-services/common/dbha/ha-module/dbmodule/redis/redis_switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net"
"sort"
"strconv"
Expand Down Expand Up @@ -523,7 +524,8 @@ func (ins *RedisSwitch) GetTwemproxyBackends(ip string, adminPort int) (segs map
// CheckSlaveSyncStatus // 5. 检查同步状态
func (ins *RedisSwitch) CheckSlaveSyncStatus(masterIp string, masterPort int, slaveIp string, slavePort int) error {
slaveAddr, slaveConn := fmt.Sprintf("%s:%d", slaveIp, slavePort), &client.RedisClient{}
slaveConn.Init(slaveAddr, ins.Pass, ins.Timeout, 0)
masterAddr := fmt.Sprintf("%s:%d", masterIp, masterPort)
slaveConn.Init(slaveAddr, ins.Pass, ins.Timeout, 1)
defer slaveConn.Close()

replic, err := slaveConn.InfoV2("replication")
Expand All @@ -547,8 +549,10 @@ func (ins *RedisSwitch) CheckSlaveSyncStatus(masterIp string, masterPort int, sl
return err
}

if replic["master_link_status"] != "up" {
err := fmt.Errorf("unexpected status master_link_status:%s", replic["master_link_status"])
// master_last_io_seconds_ago:-1 master_link_status:down master_link_down_since_seconds:160 master_port:30001
// master_last_io_seconds_ago:114 master_link_status:up master_port:30000 master_repl_offset:2113331 master |
if err := ins.checkReplicationSync(slaveConn, masterAddr, slaveAddr); err != nil {
err := fmt.Errorf("unexpected status master_sync_status:%s", err)
ins.ReportLogs(constvar.FailResult, fmt.Sprintf("redis switch precheck: (%s) : %s", slaveAddr, err.Error()))
return err
}
Expand All @@ -563,6 +567,32 @@ func (ins *RedisSwitch) CheckSlaveSyncStatus(masterIp string, masterPort int, sl
return nil
}

// checkReplicationSync # here we just check the master heartbeat:
func (ins *RedisSwitch) checkReplicationSync(newMasterConn *client.RedisClient,
masterAddr, slaveAddr string) (err error) {
var masterTime, slaveTime int64

rst, err := newMasterConn.Get(fmt.Sprintf("%s:time", masterAddr))
if err != nil {
return fmt.Errorf("[%s]new master node, exec cmd err:%+v", masterAddr, err)
}
if masterTime, err = strconv.ParseInt(rst, 10, 64); err != nil {
return fmt.Errorf("[%s]new master node, time2Int64 err:%+v", masterAddr, err)
}

slaveTime = time.Now().Unix() // here gcs.perl use redis-cli time

slaveMasterDiffTime := math.Abs(float64(slaveTime) - float64(masterTime))
if slaveMasterDiffTime > MaxLastIOSecondsAgo {
return fmt.Errorf("err master slave sync too long %s => %s diff: %.0f(%d)",
masterAddr, slaveAddr, slaveMasterDiffTime, MaxLastIOSecondsAgo)
}

log.Logger.Infof("[%s]new master node, master on slave time:%d, diff:%.0f slave time:%d",
slaveAddr, masterTime, slaveMasterDiffTime, slaveTime)
return nil
}

// IsSlave check instance is slave or not
func (ins *RedisSwitch) IsSlave() bool {
return strings.Contains(ins.Role, "slave")
Expand Down
3 changes: 1 addition & 2 deletions dbm-services/mysql/db-partition/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (

"dbm-services/common/go-pubpkg/apm/metric"
"dbm-services/common/go-pubpkg/apm/trace"
"dbm-services/mysql/db-partition/monitor"

"dbm-services/mysql/db-partition/assests"
"dbm-services/mysql/db-partition/cron"
"dbm-services/mysql/db-partition/model"
"dbm-services/mysql/db-partition/monitor"
"dbm-services/mysql/db-partition/router"
)

Expand Down
2 changes: 1 addition & 1 deletion dbm-services/mysql/db-partition/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestSendEvent(dataId int, token string, serviceHost string) error {
"content": "test partition monitor",
},
commonData: commonData{
Target: "127.0.0.1",
Target: "0.0.0.0",
Timestamp: time.Now().In(l).UnixMilli(),
Dimension: dimension,
Metrics: nil,
Expand Down
3 changes: 3 additions & 0 deletions dbm-services/mysql/db-partition/service/manage_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func (m *QueryParititionsInput) GetPartitionsConfig() ([]*PartitionConfigWithLog
slog.Error(vsql, "execute error", err)
return nil, 0, err
}
if m.Limit == -1 {
m.Limit = cnt.Count
}

limitCondition := fmt.Sprintf("limit %d offset %d", m.Limit, m.Offset)
if m.OrderBy == "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type QueryParititionsInput struct {
ImmuteDomains []string `json:"immute_domains"`
DbLikes []string `json:"dblikes"`
TbLikes []string `json:"tblikes"`
Limit int `json:"limit"`
Limit int64 `json:"limit"`
Offset int `json:"offset"`
OrderBy string `json:"order_by"`
AscDesc string `json:"asc_desc"`
Expand Down
21 changes: 12 additions & 9 deletions dbm-services/mysql/db-priv/handler/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,33 +92,36 @@ func (m *PrivService) ModifyAccount(c *gin.Context) {
return
}

// GetAccount 获取账号
func (m *PrivService) GetAccount(c *gin.Context) {
slog.Info("do GetAccount!")
var input service.AccountPara
// GetAccountList 获取账号列表
func (m *PrivService) GetAccountList(c *gin.Context) {
slog.Info("do GetAccountList!")
var input service.GetAccountListPara

body, err := ioutil.ReadAll(c.Request.Body)
if err != nil {
slog.Error("msg", err)
SendResponse(c, errno.ErrBind, err)
return
}

if err = json.Unmarshal(body, &input); err != nil {
slog.Error("msg", err)
SendResponse(c, errno.ErrBind, err)
return
}

accounts, count, err := input.GetAccount()
accounts, count, err := input.GetAccountList()
type ListResponse struct {
Count int64 `json:"count"`
Results interface{} `json:"results"`
}
SendResponse(c, err, ListResponse{
Count: count,
Items: accounts,
Count: count,
Results: accounts,
})
return
}

// GetAccount 获取账号
// GetAccountIncludePsw 获取账号密码
func (m *PrivService) GetAccountIncludePsw(c *gin.Context) {
slog.Info("do GetAccount!")
var input service.GetAccountIncludePswPara
Expand Down
8 changes: 4 additions & 4 deletions dbm-services/mysql/db-priv/handler/register_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ func (m *PrivService) Routes() []*gin.RouteInfo {
return []*gin.RouteInfo{
// 账号
{Method: http.MethodPost, Path: "add_account", HandlerFunc: m.AddAccount},
{Method: http.MethodPost, Path: "get_account", HandlerFunc: m.GetAccount},
{Method: http.MethodPost, Path: "modify_account", HandlerFunc: m.ModifyAccount},
{Method: http.MethodPost, Path: "delete_account", HandlerFunc: m.DeleteAccount},
// 查询帐号清单
{Method: http.MethodPost, Path: "get_account", HandlerFunc: m.GetAccountList},
// 查询帐号,并且包含密码,为mongodb等非mysql数据库类型使用
{Method: http.MethodPost, Path: "get_account_include_psw", HandlerFunc: m.GetAccountIncludePsw},

// 账号规则
{Method: http.MethodPost, Path: "get_account_rule_list", HandlerFunc: m.GetAccountRuleList},
Expand Down Expand Up @@ -73,9 +76,6 @@ func (m *PrivService) Routes() []*gin.RouteInfo {

// 检查和迁移账号规则
{Method: http.MethodPost, Path: "migrate_account_rule", HandlerFunc: m.MigrateAccountRule},

// 查询帐号,并且包含密码,为mongodb等非mysql数据库类型使用
{Method: http.MethodPost, Path: "get_account_include_psw", HandlerFunc: m.GetAccountIncludePsw},
}
}

Expand Down
65 changes: 54 additions & 11 deletions dbm-services/mysql/db-priv/service/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/hex"
"fmt"
"log/slog"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -169,22 +170,61 @@ func (m *AccountPara) DeleteAccount(jsonPara string) error {
return nil
}

// GetAccount 获取账号
func (m *AccountPara) GetAccount() ([]*TbAccounts, int64, error) {
// GetAccountList 获取账号列表
func (m *GetAccountListPara) GetAccountList() ([]*TbAccounts, int64, error) {
// Cnt 用于返回匹配到的行数
type Cnt struct {
Count int64 `gorm:"column:cnt"`
}
var (
accounts []*TbAccounts
result *gorm.DB
)
if m.BkBizId == 0 {
return nil, 0, errno.BkBizIdIsEmpty
where := " 1=1 "
if m.BkBizId > 0 {
where = fmt.Sprintf("%s and bk_biz_id=%d", where, m.BkBizId)
}
result = DB.Self.Model(&TbAccounts{}).Where(&TbAccounts{
BkBizId: m.BkBizId, ClusterType: *m.ClusterType, User: m.User}).Select(
"id,bk_biz_id,user,cluster_type,creator,create_time,update_time").Scan(&accounts)
if result.Error != nil {
return nil, 0, result.Error
if m.ClusterType != nil {
where = fmt.Sprintf("%s and cluster_type='%s'", where, *m.ClusterType)
}
return accounts, int64(len(accounts)), nil
if m.UserLike != "" {
where = fmt.Sprintf("%s and user like '%%%s%%'", where, m.UserLike)
}
if m.User != "" {
where = fmt.Sprintf("%s and user = '%s'", where, m.User)
}
if m.Id != nil {
m.Ids = append(m.Ids, *m.Id)
}
if len(m.Ids) != 0 {
var temp = make([]string, len(m.Ids))
for k, id := range m.Ids {
temp[k] = strconv.FormatInt(id, 10)
}
ids := " and id in (" + strings.Join(temp, ",") + ") "
where = where + ids
}
cnt := Cnt{}
vsql := fmt.Sprintf("select count(*) as cnt from tb_accounts where %s", where)
err := DB.Self.Raw(vsql).Scan(&cnt).Error
if err != nil {
slog.Error(vsql, "execute error", err)
return nil, 0, err
}
if cnt.Count == 0 {
return nil, 0, nil
}
if m.Limit == nil {
vsql = fmt.Sprintf("select id, user, creator, bk_biz_id from tb_accounts where %s", where)
} else {
limitCondition := fmt.Sprintf("limit %d offset %d", *m.Limit, *m.Offset)
vsql = fmt.Sprintf("select id, user, creator, bk_biz_id from tb_accounts where %s %s", where, limitCondition)
}
err = DB.Self.Raw(vsql).Scan(&accounts).Error
if err != nil {
slog.Error(vsql, "execute error", err)
return nil, 0, err
}
return accounts, cnt.Count, nil
}

// GetAccountIncludePsw 获取帐号以及密码
Expand All @@ -199,6 +239,9 @@ func (m *GetAccountIncludePswPara) GetAccountIncludePsw() ([]*TbAccounts, int64,
if len(m.Users) == 0 {
return nil, 0, errno.ErrUserIsEmpty
}
if m.ClusterType == nil {
return nil, 0, errno.ClusterTypeIsEmpty
}
// mongodb 需要查询psw
users := "'" + strings.Join(m.Users, "','") + "'"
where := fmt.Sprintf("bk_biz_id=%d and cluster_type='%s' and user in (%s)", m.BkBizId, *m.ClusterType, users)
Expand Down
13 changes: 13 additions & 0 deletions dbm-services/mysql/db-priv/service/account_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ type GetAccountIncludePswPara struct {
ClusterType *string `json:"cluster_type"`
}

type GetAccountListPara struct {
BkBizId int64 `json:"bk_biz_id"`
Ids []int64 `json:"ids"`
Id *int64 `json:"id"`
ClusterType *string `json:"cluster_type"`
// user模糊查询
UserLike string `json:"user_like"`
// user精确查询
User string `json:"user"`
Limit *int64 `json:"limit"`
Offset *int64 `json:"offset"`
}

// PrivLog 记录权限相关接口的调用日志
type PrivLog struct {
Id int64 `gorm:"column:id;primary_key;auto_increment" json:"id"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func DoAddAccounts(apps map[string]int64, users []PrivModule, clusterType string

// DoAddAccountRule 创建帐号规则
func DoAddAccountRule(rule *PrivModule, apps map[string]int64, clusterType string, priv map[string]string) error {
account := AccountPara{BkBizId: apps[rule.App], User: rule.User, ClusterType: &clusterType}
items, cnt, err := account.GetAccount()
account := GetAccountListPara{BkBizId: apps[rule.App], User: rule.User, ClusterType: &clusterType}
items, cnt, err := account.GetAccountList()
if err != nil {
return fmt.Errorf("add rule failed when get account: %s", err.Error())
}
Expand Down
4 changes: 4 additions & 0 deletions dbm-services/mysql/db-tools/dbactuator/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ Buildstamp:%s`, version, githash, strings.ToUpper(external), buildstamp,
&subcmd.GBaseOptions.PayloadFormat, "payload-format", "m",
subcmd.GBaseOptions.PayloadFormat, "command payload format, default base64, value_allowed: base64|raw",
)
cmds.PersistentFlags().StringVarP(
&subcmd.GBaseOptions.NotSensitivePayload, "non-sensitive-paylod", "c",
subcmd.GBaseOptions.NotSensitivePayload, "commandn not sensitive payload <base64>",
)
cmds.PersistentFlags().StringVarP(&subcmd.GBaseOptions.Uid, "uid", "U", subcmd.GBaseOptions.Uid, "bill id")
cmds.PersistentFlags().StringVarP(&subcmd.GBaseOptions.RootId, "root_id", "R", subcmd.GBaseOptions.NodeId,
"process id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (d *DeployMySQLAct) Init() (err error) {
logger.Error("DeserializeAndValidate failed, %v", err)
return err
}
if err = d.DeserializeNonSensitivePayload(&d.Service.MySQLConfigParams); err != nil {
logger.Error("DeserializeAndValidate failed, %v", err)
return err
}
d.Service.GeneralParam = subcmd.GeneralRuntimeParam
return d.Service.InitDefaultParam()
}
Expand Down
Loading

0 comments on commit fa0595f

Please sign in to comment.