Skip to content

Commit

Permalink
refactor(dbm-services): 模拟执行记录导入表结构过程 TencentBlueKing#6995
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq committed Sep 19, 2024
1 parent f9efdce commit d1edc18
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 25 deletions.
2 changes: 1 addition & 1 deletion dbm-services/mysql/db-simulation/app/service/kubernets.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (k *DbPodSets) executeInPod(cmd, container string, extMap map[string]string
for sc.Scan() {
if !noLogger {
// 此方案打印的日志会在前端展示
xlogger.Info(sc.Text())
xlogger.Info("%s", sc.Text())
} else {
logger.Info(sc.Text())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestCreateClusterPod(t *testing.T) {
ps.TdbCtlImage = config.GAppConfig.Image.TdbCtlImg
ps.SpiderImage = config.GAppConfig.Image.SpiderImg
if err := ps.CreateClusterPod(); err != nil {
t.Fatalf(err.Error())
t.Fatalf("%s", err.Error())
return
}
t.Log("ending..")
Expand Down
47 changes: 39 additions & 8 deletions dbm-services/mysql/db-simulation/app/service/simulation_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,16 +217,14 @@ func (t *SimulationTask) SimulationRun(containerName string, xlogger *logger.Log
// 关闭协程
defer func() { ticker.Stop(); doneChan <- struct{}{} }()
model.UpdatePhase(t.TaskId, t.MySQLVersion, model.PhaseLoadSchema)
stdout, stderr, err := t.DbPodSets.executeInPod(t.getLoadSchemaSQLCmd(t.Path, t.SchemaSQLFile),
containerName,
t.getExtmap(t.SchemaSQLFile), true)
sstdout += stdout.String() + "\n"
sstderr += stderr.String() + "\n"
// Load schema SQL
sstdout, sstderr, err = t.loadSchemaSQL(containerName)
if err != nil {
logger.Error("load database schema sql failed %v", err)
return sstdout, sstderr, err
xlogger.Error("Failed to load schema SQL: %v", err)
return sstdout, sstderr, fmt.Errorf("failed to load schema SQL: %w", err)
}
xlogger.Info(stdout.String(), stderr.String())
xlogger.Info("Schema SQL loaded successfully")
xlogger.Info(sstdout, sstderr)
// load real databases
if err = t.getDbsExcludeSysDb(); err != nil {
logger.Error("getDbsExcludeSysDb faiked %v", err)
Expand All @@ -249,6 +247,39 @@ func (t *SimulationTask) SimulationRun(containerName string, xlogger *logger.Log
return sstdout, sstderr, nil
}

func (t *SimulationTask) loadSchemaSQL(containerName string) (sstdout, sstderr string,
err error) {
defer func() {
status := model.TaskSuccess
errMsg := ""
if err != nil {
status = model.TaskFailed
errMsg = err.Error()
}
errx := model.DB.Create(&model.TbSqlFileSimulationInfo{
TaskId: t.TaskId,
BillTaskId: t.Uid,
LineId: 99999,
FileNameHash: fmt.Sprintf("%x", sha256.Sum256([]byte(t.SchemaSQLFile))),
FileName: t.SchemaSQLFile,
MySQLVersion: t.MySQLVersion,
Status: status,
ErrMsg: errMsg,
CreateTime: time.Now(),
UpdateTime: time.Now(),
}).Error
if errx != nil {
logger.Warn("create exeute schema sqlfile simulation record failed %v", errx)
}
}()
stdout, stderr, err := t.DbPodSets.executeInPod(t.getLoadSchemaSQLCmd(t.Path, t.SchemaSQLFile),
containerName,
t.getExtmap(t.SchemaSQLFile), true)
sstdout += stdout.String() + "\n"
sstderr += stderr.String() + "\n"
return sstdout, sstderr, err
}

func (t *SimulationTask) executeOneObject(e ExcuteSQLFileObj, containerName string, xlogger *logger.Logger) (sstdout,
sstderr string, err error) {
defer func() {
Expand Down
22 changes: 9 additions & 13 deletions dbm-services/mysql/db-simulation/app/syntax/syntax.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ type RiskInfo struct {
const DdlMapFileSubffix = ".tbl.map"

// Do 运行语法检查 For SQL 文件
//
// @receiver tf
// @return result
// @return err
func (tf *TmysqlParseFile) Do(dbtype string, versions []string) (result map[string]*CheckInfo, err error) {
logger.Info("doing....")
tf.result = make(map[string]*CheckInfo)
Expand Down Expand Up @@ -313,7 +309,7 @@ func (t *TmysqlParse) getCommand(filename, version string) (cmd string) {
outputFileName := getSQLParseResultFile(filename, version)
out = path.Join(t.tmpWorkdir, outputFileName)

cmd = fmt.Sprintf(`%s --sql-file=%s --output-path=%s --print-query-mode=2 --output-format='JSON_LINE_PER_OBJECT'`,
cmd = fmt.Sprintf(`%s --sql-file=%s --output-path=%s --print-query-mode=2 --output-format='JSON_LINE_PER_OBJECT' --sql-mode='' `,
t.TmysqlParseBinPath, in, out)

if lo.IsNotEmpty(version) {
Expand Down Expand Up @@ -401,19 +397,19 @@ func (t *TmysqlParse) AnalyzeParseResult(alreadExecutedSqlfileCh chan string, my
return errors.Join(errs...)
}

func (c *CheckInfo) parseResult(rule *RuleItem, res ParseLineQueryBase, ver string) {
func (c *CheckInfo) parseResult(rule *RuleItem, res ParseLineQueryBase, ver string, lineNum int) {
matched, err := rule.CheckItem(res.Command)
if matched {
if rule.Ban {
c.BanWarnings = append(c.BanWarnings, RiskInfo{
Line: int64(res.QueryId),
Line: int64(lineNum),
Sqltext: res.QueryString,
CommandType: res.Command,
WarnInfo: fmt.Sprintf("[%s]: %s", ver, err.Error()),
})
} else {
c.RiskWarnings = append(c.RiskWarnings, RiskInfo{
Line: int64(res.QueryId),
Line: int64(lineNum),
Sqltext: res.QueryString,
CommandType: res.Command,
WarnInfo: fmt.Sprintf("[%s]: %s", ver, err.Error()),
Expand Down Expand Up @@ -555,7 +551,7 @@ func (t *TmysqlParse) AnalyzeOne(inputfileName, mysqlVersion, dbtype string) (er
if res.IsSysDb() {
t.mu.Lock()
checkResult.BanWarnings = append(checkResult.BanWarnings, RiskInfo{
Line: int64(res.QueryId),
Line: int64(idx),
Sqltext: res.QueryString,
WarnInfo: fmt.Sprintf("disable operating sys db: %s", res.DbName),
})
Expand All @@ -565,15 +561,15 @@ func (t *TmysqlParse) AnalyzeOne(inputfileName, mysqlVersion, dbtype string) (er
// tmysqlparse检查结果全部正确,开始判断语句是否符合定义的规则(即虽然语法正确,但语句可能是高危语句或禁用的命令)
switch dbtype {
case app.MySQL:
checkResult.parseResult(R.CommandRule.HighRiskCommandRule, res, mysqlVersion)
checkResult.parseResult(R.CommandRule.BanCommandRule, res, mysqlVersion)
checkResult.parseResult(R.CommandRule.HighRiskCommandRule, res, mysqlVersion, idx)
checkResult.parseResult(R.CommandRule.BanCommandRule, res, mysqlVersion, idx)
err = checkResult.runcheck(res, bs, mysqlVersion)
if err != nil {
goto END
}
case app.Spider:
checkResult.parseResult(SR.CommandRule.HighRiskCommandRule, res, mysqlVersion)
checkResult.parseResult(SR.CommandRule.BanCommandRule, res, mysqlVersion)
checkResult.parseResult(SR.CommandRule.HighRiskCommandRule, res, mysqlVersion, idx)
checkResult.parseResult(SR.CommandRule.BanCommandRule, res, mysqlVersion, idx)
err = checkResult.runSpidercheck(ddlTbls, res, bs, mysqlVersion)
if err != nil {
goto END
Expand Down
2 changes: 1 addition & 1 deletion dbm-services/mysql/db-simulation/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func QueryTask(c *gin.Context) {
switch task.Status {
case model.TaskFailed:
allSuccessful = false
SendResponse(c, fmt.Errorf(task.SysErrMsg), map[string]interface{}{
SendResponse(c, fmt.Errorf("%s", task.SysErrMsg), map[string]interface{}{
"simulation_version": task.MySQLVersion,
"stdout": task.Stdout,
"stderr": task.Stderr,
Expand Down
6 changes: 5 additions & 1 deletion dbm-services/mysql/db-simulation/handler/syntax_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ func init() {
}
workdir = strings.TrimSpace(viper.GetString("workdir"))
if workdir == "" {
workdir = "/tmp"
if cmutil.FileExists("/tmp") {
workdir = "/tmp"
return
}
workdir = "/"
}
}

Expand Down

0 comments on commit d1edc18

Please sign in to comment.