From 9f7c111e505b8062d7f05cd87480febac4826e10 Mon Sep 17 00:00:00 2001 From: yuanruji Date: Fri, 13 Dec 2024 10:14:04 +0800 Subject: [PATCH] =?UTF-8?q?refactor(dbm-services):=20dbactor=20=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E4=BC=98=E5=8C=96=20#8586?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../components/spiderctl/backend_switch.go | 14 ++++---- .../check_tdbctl_with_spider_schema.go | 4 +-- .../init_cluster_routing_relationship.go | 4 +-- .../spiderctl/table_schema_check.go | 36 +++++++++---------- .../spiderctl/table_schema_repair.go | 3 +- 5 files changed, 29 insertions(+), 32 deletions(-) diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/backend_switch.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/backend_switch.go index cd04316b56..e14c25d7c4 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/backend_switch.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/backend_switch.go @@ -659,9 +659,9 @@ func (c *CutOverCtx) getSlaveSpiderServers() (slaveSpiderServers []native.Server } // CheckSpiderAppProcesslist TODO -func (ctx *CutOverCtx) CheckSpiderAppProcesslist() (err error) { - for addr, spider_conn := range ctx.spidersConn { - pls, err := spider_conn.ShowApplicationProcesslist(ctx.sysUsers) +func (c *CutOverCtx) CheckSpiderAppProcesslist() (err error) { + for addr, spider_conn := range c.spidersConn { + pls, err := spider_conn.ShowApplicationProcesslist(c.sysUsers) if err != nil { return err } @@ -672,8 +672,8 @@ func (ctx *CutOverCtx) CheckSpiderAppProcesslist() (err error) { return } -func (ctx *CutOverCtx) lockaAllSpidersWrite() (err error) { - for addr, lockConn := range ctx.spidersLockConn { +func (c *CutOverCtx) lockaAllSpidersWrite() (err error) { + for addr, lockConn := range c.spidersLockConn { _, err = lockConn.ExecContext(context.Background(), "set lock_wait_timeout = 10;") if err != nil { return fmt.Errorf("set lock_wait_timeout at %s failed,err:%w", addr, err) @@ -693,8 +693,8 @@ func (ctx *CutOverCtx) lockaAllSpidersWrite() (err error) { } // Unlock TODO -func (ctx *CutOverCtx) Unlock() (err error) { - for addr, lockConn := range ctx.spidersLockConn { +func (c *CutOverCtx) Unlock() (err error) { + for addr, lockConn := range c.spidersLockConn { err = cmutil.Retry(cmutil.RetryConfig{ Times: 3, DelayTime: 1 * time.Second, diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/check_tdbctl_with_spider_schema.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/check_tdbctl_with_spider_schema.go index 8e7c6f7316..ee9a4d2869 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/check_tdbctl_with_spider_schema.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/check_tdbctl_with_spider_schema.go @@ -134,14 +134,14 @@ func (c *CheckTdbctlWithSpideSchemaComp) RunCheck() (err error) { for tb := range map1 { if _, exist := map2[tb]; !exist { msg := fmt.Sprintf("%s frm文件在 tdbctl 中不存在,请确认\n", tb) - globalErrs = append(globalErrs, fmt.Errorf(msg)) + globalErrs = append(globalErrs, fmt.Errorf("%s", msg)) logger.Error(msg) } } if dbdirSpiderCount != dbdirTdbctlCount { msg := fmt.Sprintf("【%s】库上的表数量不一致,【tdbctl】节点上表总数量:%d, 【spider】节点上表的总数量 count:%d\n", db, dbdirTdbctlCount, dbdirSpiderCount) - globalErrs = append(globalErrs, fmt.Errorf(msg)) + globalErrs = append(globalErrs, fmt.Errorf("%s", msg)) logger.Error(msg) } spiderDbSchemaCountMap[db] = dbdirSpiderCount diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/init_cluster_routing_relationship.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/init_cluster_routing_relationship.go index 0c33f62f00..ca64d79b96 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/init_cluster_routing_relationship.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/init_cluster_routing_relationship.go @@ -248,7 +248,7 @@ func (i *InitClusterRoutingComp) OnlyInitTdbctl() (err error) { execSQLs = append(execSQLs, "set tc_admin=1;") execSQLs = append(execSQLs, i.getTdbctlRouterSqls()...) execSQLs = append(execSQLs, "tdbctl enable primary;") - if _, err := i.tdbCtlConn.ExecMore(execSQLs); err != nil { + if _, err = i.tdbCtlConn.ExecMore(execSQLs); err != nil { logger.Error("tdbctl create node failed:[%s]", err.Error()) return err } @@ -286,7 +286,7 @@ func (i *InitClusterRoutingComp) AddAppendDeployInitRouter() (err error) { execSQLs = append(execSQLs, i.getRemoteRouterSqls()...) execSQLs = append(execSQLs, i.getSpiderRouterSqls()...) execSQLs = append(execSQLs, "tdbctl enable primary;") - if _, err := i.tdbCtlConn.ExecMore(execSQLs); err != nil { + if _, err = i.tdbCtlConn.ExecMore(execSQLs); err != nil { logger.Error("tdbctl create node failed:[%s]", err.Error()) return err } diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/table_schema_check.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/table_schema_check.go index 4d33fe2e9f..b09782cff4 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/table_schema_check.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/table_schema_check.go @@ -16,6 +16,8 @@ import ( "fmt" "time" + "github.com/jmoiron/sqlx" + "dbm-services/common/go-pubpkg/cmutil" "dbm-services/common/go-pubpkg/logger" "dbm-services/mysql/db-tools/dbactuator/pkg/components" @@ -47,6 +49,7 @@ type CheckObject struct { } type tableSchemaCheckCtx struct { tdbCtlConn *native.TdbctlDbWork + xconn *sqlx.Conn version string } @@ -92,16 +95,21 @@ func (r *TableSchemaCheckComp) Init() (err error) { return err } r.tdbCtlConn = &native.TdbctlDbWork{DbWorker: *conn} + r.xconn, err = r.tdbCtlConn.GetSqlxDb().Connx(context.Background()) + if err != nil { + logger.Error("get xconn error: %v") + return err + } // init checksum table schema if _, err = r.tdbCtlConn.ExecMore([]string{"set tc_admin = 0;", "use infodba_schema;", TsccSchemaChecksum}); err != nil { logger.Error("init tscc_schema_checksum error: %v", err) - return + return err } r.version, err = r.tdbCtlConn.SelectVersion() if err != nil { logger.Error("get version error: %v", err) - return + return err } return err } @@ -146,18 +154,13 @@ func (r *TableSchemaCheckComp) checkAll() (err error) { } func (r *TableSchemaCheckComp) atomUpdateDbTables(dbName string) (err error) { - xconn, err := r.tdbCtlConn.GetSqlxDb().Connx(context.Background()) - if err != nil { - return err - } - defer xconn.Close() - _, err = xconn.ExecContext(context.Background(), "set tc_admin = 1;") + _, err = r.xconn.ExecContext(context.Background(), "set tc_admin = 1;") if err != nil { logger.Error("set tc_admin = 1 failed error: %v", err) return err } var result native.SchemaCheckResults - if err = xconn.SelectContext(context.Background(), &result, fmt.Sprintf("TDBCTL CHECK DATABASE `%s`;", + if err = r.xconn.SelectContext(context.Background(), &result, fmt.Sprintf("TDBCTL CHECK DATABASE `%s`;", dbName)); err != nil { logger.Error("check table schema: %s", err.Error()) return err @@ -193,13 +196,7 @@ func (r *TableSchemaCheckComp) atomUpdateTables(dbName string, tables []string) if len(tables) == 0 { return nil } - xconn, err := r.tdbCtlConn.GetSqlxDb().Connx(context.Background()) - if err != nil { - logger.Error("get sqlx conn failed: %s", err.Error()) - return err - } - defer xconn.Close() - _, err = xconn.ExecContext(context.Background(), "set tc_admin = 1;") + _, err = r.xconn.ExecContext(context.Background(), "set tc_admin = 1;") if err != nil { logger.Error("set tc_admin = 1 failed error: %v", err) return err @@ -207,7 +204,7 @@ func (r *TableSchemaCheckComp) atomUpdateTables(dbName string, tables []string) for _, table := range tables { // check table schema var result native.SchemaCheckResults - if err = xconn.SelectContext(context.Background(), &result, fmt.Sprintf("tdbctl check `%s`.`%s`;", dbName, + if err = r.xconn.SelectContext(context.Background(), &result, fmt.Sprintf("tdbctl check `%s`.`%s`;", dbName, table)); err != nil { logger.Error("check table schema error: %s", err.Error()) return err @@ -221,7 +218,7 @@ func (r *TableSchemaCheckComp) atomUpdateTables(dbName string, tables []string) func (r *TableSchemaCheckComp) atomUpdateCheckResult(db, tbl string, inconsistentItems []native.SchemaCheckResult) ( err error) { - _, err = r.tdbCtlConn.Exec("set tc_admin=0;") + _, err = r.xconn.ExecContext(context.TODO(), "set tc_admin=0;") if err != nil { logger.Error("set tc_admin=0 failed error: %v", err) return err @@ -237,7 +234,8 @@ func (r *TableSchemaCheckComp) atomUpdateCheckResult(db, tbl string, inconsisten return } } - if _, err = r.tdbCtlConn.Exec("replace into infodba_schema.tscc_schema_checksum values(?,?,?,?,?)", db, + if _, err = r.xconn.ExecContext(context.TODO(), "replace into infodba_schema.tscc_schema_checksum values(?,?,?,?,?)", + db, tbl, status, checkResult, time.Now()); err != nil { diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/table_schema_repair.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/table_schema_repair.go index 86564cb68c..a04dbd3756 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/table_schema_repair.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl/table_schema_repair.go @@ -45,7 +45,6 @@ type TableSchemaRepairParam struct { } type tableSchemaRepairCtx struct { tdbCtlConn *native.TdbctlDbWork - taskdir string svrNameServersMap map[SVRNAME]native.Server primarySpts []native.Server spiderSpts []native.Server @@ -108,7 +107,7 @@ func (r *TableSchemaRepairComp) RunAutoFix() (err error) { logger.Error("get abnormal schema checksum failed, err: %v", err) return err } - if len(abnormalChecksums) <= 0 { + if len(abnormalChecksums) == 0 { logger.Info("no abnormal table structure check record was found,bye~") return nil }