From 3bd185d8006fce83b125722c03a0d6f05d04e699 Mon Sep 17 00:00:00 2001 From: yuanruji Date: Wed, 11 Dec 2024 16:21:27 +0800 Subject: [PATCH] =?UTF-8?q?refactor(dbm-services):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=9C=AC=E5=9C=B0=E5=8D=87=E7=BA=A7=E6=B5=81=E7=A8=8B=20#8543?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../internal/subcmd/mysqlcmd/mysql_upgrade.go | 9 +-- .../pkg/components/mysql/mysql_upgrade.go | 36 ++++++++++- .../dbactuator/pkg/native/upgrade_tool.go | 61 +++++++++++++------ 3 files changed, 79 insertions(+), 27 deletions(-) diff --git a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go index 8bfb2be241..deb47da3e3 100644 --- a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go +++ b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go @@ -73,16 +73,17 @@ func (d *UpgradeMySQLAct) Run() (err error) { FunName: "前置检查", Func: d.Service.PreCheck, }, - { - FunName: "升级检查", - Func: d.Service.MysqlUpgradeCheck, - }, } if d.Service.Params.Run { steps = append(steps, subcmd.StepFunc{ FunName: "升级MySQL", Func: d.Service.Upgrade, }) + } else { + steps = append(steps, subcmd.StepFunc{ + FunName: "升级检查", + Func: d.Service.MysqlUpgradeCheck, + }) } if err := steps.Run(); err != nil { diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go index c388c0bdcb..064e75d90a 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go @@ -235,9 +235,12 @@ func (m *MysqlUpgradeComp) MysqlUpgradeCheck() (err error) { } } // table check - if err = conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion); err != nil { - logger.Error("check table upgrade failed %s", err.Error()) - return err + errs := conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion) + if len(errs) > 0 { + for _, err := range errs { + logger.Error("port:[%d]: check table upgrade error: %s", port, err.Error()) + } + return fmt.Errorf("check table upgrade failed, port: %d, errors: %v", port, errs) } } return @@ -308,6 +311,26 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) { logger.Info("Upgrading to MySQL version>=8.0.16, remaining upgrade procedure is skipped.") return nil } + // 处理分区表升级 + if m.newVersion.MysqlVersion >= native.MYSQL_5P70 && m.newVersion.MysqlVersion < native.MYSQL_8P0 { + // logger.Info("Upgrading to MySQL version>=5.7.0, remaining upgrade procedure is skipped.") + pdata, errx := dbConn.GetPartitionSchema() + if errx != nil { + logger.Error("get partition schema failed %s", errx.Error()) + return errx + } + if len(pdata) > 0 { + for _, p := range pdata { + usql := fmt.Sprintf("ALTER TABLE `%s`.`%s` UPGRADE PARTITIONING", p.TableSchema, p.TableName) + logger.Info("upgrade partition sql: %s", usql) + _, err = dbConn.Exec(usql) + if err != nil { + logger.Error("upgrade partition table %s.%s failed %s", p.TableSchema, p.TableName, err.Error()) + return err + } + } + } + } logger.Info("do mysqlcheck for %d", port) if err = m.mysqlCheck(dbConn, port); err != nil { logger.Error("do %d mysqlcheck failed %s", port, err.Error()) @@ -323,6 +346,13 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) { logger.Error("do %d additionalActions failed %s", port, err.Error()) return err } + logger.Info("do mysql restart for %d", port) + pid, err = start.RestartMysqlInstance() + if err != nil { + logger.Error("restart mysql %d failed %s", port, err.Error()) + return err + } + logger.Info("restart mysql %d success,pid is %d", port, pid) } return nil } diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go b/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go index fc85ec6249..3303a96232 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go @@ -222,12 +222,13 @@ type TableInfo struct { RowFormat string `db:"ROW_FORMAT"` } -// CheckTableUpgrade TODO -func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err error) { +// TableNameIsValid TODO +func (h *DbWorker) TableNameIsValid(currentVersion, newVersion uint64) (errs []error) { type checkFunc struct { fn func(currentVersion, newVersion uint64) error desc string } + var err error // 库表名关键字检查 fns := []checkFunc{} fns = append(fns, checkFunc{ @@ -257,14 +258,20 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err logger.Info("start check %s ...", f.desc) if err = f.fn(currentVersion, newVersion); err != nil { logger.Error("when check %s,failed %s", f.desc, err.Error()) - return err + errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error())) } } - type checkFuncNoparam struct { - fn func() error - desc string - } - // 非法字符检查 + return errs +} + +type checkFuncNoparam struct { + fn func() error + desc string +} + +// IllegalCharacterCheck TODO +func (h *DbWorker) IllegalCharacterCheck() (errs []error) { + var err error fnns := []checkFuncNoparam{} fnns = append(fnns, checkFuncNoparam{ fn: h.tableNameAsciiCodeCheck, @@ -294,9 +301,19 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err logger.Info("start check %s ...", f.desc) if err = f.fn(); err != nil { logger.Error("when check %s,failed %s", f.desc, err.Error()) - return err + errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error())) } } + return errs +} + +// CheckTableUpgrade 检查表是否满足升级条件 +func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (errs []error) { + var err error + errs = append(errs, h.TableNameIsValid(currentVersion, newVersion)...) + // 非法字符检查 + errs = append(errs, h.IllegalCharacterCheck()...) + switch { // 当准备升级到8.0版本 case newVersion >= MYSQL_8P0 && currentVersion < MYSQL_8P0: @@ -330,7 +347,7 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err logger.Info("start check %s ...", f.desc) if err = f.fn(); err != nil { logger.Error("when check %s,failed %s", f.desc, err.Error()) - return err + errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error())) } } // 当准备升级到5.7版本 @@ -358,7 +375,7 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err logger.Info("start check %s ...", f.desc) if err = f.fn(); err != nil { logger.Error("when check %s,failed %s", f.desc, err.Error()) - return err + errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error())) } } // 当准备升级到5.6版本 @@ -366,11 +383,11 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err // per-4.1 password check logger.Info("准备升级到MySQL5.6 需要做这些额外的检查...") if err = h.passwordCheck(); err != nil { - return err + errs = append(errs, fmt.Errorf("%v", err.Error())) } } - return nil + return errs } func (h *DbWorker) getKeyWords(currentVersion, newVersion uint64) []string { @@ -765,6 +782,15 @@ func (h *DbWorker) passwordCheck() (err error) { // handler to use the native partitioning handler instead, run mysql_upgrade. func (h *DbWorker) partitionCheck() (err error) { var data []TableInfo + data, err = h.GetPartitionSchema() + if len(data) > 0 { + return fmt.Errorf("%v found partition name,but it is not allowed", data) + } + return nil +} + +// GetPartitionSchema 获取分区表 +func (h *DbWorker) GetPartitionSchema() (data []TableInfo, err error) { q := ` select TABLE_SCHEMA, TABLE_NAME, @@ -774,13 +800,8 @@ where PARTITION_NAME is not NULL group by 1, 2; ` - if err = h.Queryx(&data, q); err != nil { - return err - } - if len(data) > 0 { - return fmt.Errorf("%v found partition name,but it is not allowed", data) - } - return nil + err = h.Queryx(&data, q) + return data, err } func (h *DbWorker) tokudbEngineCheck() (err error) {