diff --git a/dbm-services/common/go-pubpkg/cmutil/mysql.go b/dbm-services/common/go-pubpkg/cmutil/mysql.go index d923819deb..9f7989b6cb 100644 --- a/dbm-services/common/go-pubpkg/cmutil/mysql.go +++ b/dbm-services/common/go-pubpkg/cmutil/mysql.go @@ -39,6 +39,19 @@ func GetGcsSystemDatabasesIgnoreTest(version string) []string { return DBs } +// TmysqlVersionParse TODO +/* +input: select version() 获取到的string +output: 获取tmysql带的版本号 +example: +5.7.20-tmysql-3.1.5-log ==> 3*1000000 + 1*1000 + 5 ==> 3001005 +返回0,表示非tmysql +*/ +func TmysqlVersionParse(version string) uint64 { + re := regexp.MustCompile(`tmysql-([\d]+).?([\d]+)?.?([\d]+)?`) + return mysqlVersionParse(re, version) +} + // MySQLVersionParse (): // input: select version() 获取到的string // output: 获取tmysql中的mysql前缀版本 diff --git a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go new file mode 100644 index 0000000000..8bfb2be241 --- /dev/null +++ b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go @@ -0,0 +1,98 @@ +// Package mysqlcmd TODO +/* + * TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. + * Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at https://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package mysqlcmd + +import ( + "fmt" + + "github.com/spf13/cobra" + + "dbm-services/bigdata/db-tools/dbactuator/pkg/util" + "dbm-services/common/go-pubpkg/logger" + "dbm-services/mysql/db-tools/dbactuator/internal/subcmd" + "dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql" +) + +// UpgradeMySQLAct TODO +type UpgradeMySQLAct struct { + *subcmd.BaseOptions + Service mysql.MysqlUpgradeComp +} + +// NewUpgradeMySQLCommand create new subcommand +func NewUpgradeMySQLCommand() *cobra.Command { + act := UpgradeMySQLAct{ + BaseOptions: subcmd.GBaseOptions, + } + cmd := &cobra.Command{ + Use: "upgrade", + Short: "MySQL版本本地升级", + Example: fmt.Sprintf( + `dbactuator mysql upgrade %s %s`, subcmd.CmdBaseExampleStr, + subcmd.ToPrettyJson(act.Service.Example()), + ), + Run: func(cmd *cobra.Command, args []string) { + util.CheckErr(act.Validate()) + if act.RollBack { + util.CheckErr(act.Rollback()) + return + } + util.CheckErr(act.Init()) + util.CheckErr(act.Run()) + }, + } + return cmd +} + +// Init prepare run env +func (d *UpgradeMySQLAct) Init() (err error) { + if err = d.Deserialize(&d.Service.Params); err != nil { + logger.Error("DeserializeAndValidate err %s", err.Error()) + return err + } + d.Service.GeneralParam = subcmd.GeneralRuntimeParam + return +} + +// Run Command Run +func (d *UpgradeMySQLAct) Run() (err error) { + steps := subcmd.Steps{ + { + FunName: "Init", + Func: d.Service.Init, + }, + { + FunName: "前置检查", + Func: d.Service.PreCheck, + }, + { + FunName: "升级检查", + Func: d.Service.MysqlUpgradeCheck, + }, + } + if d.Service.Params.Run { + steps = append(steps, subcmd.StepFunc{ + FunName: "升级MySQL", + Func: d.Service.Upgrade, + }) + } + + if err := steps.Run(); err != nil { + return err + } + logger.Info("upgrade mysql or mysql upgrade check successfully") + return nil +} + +// Rollback TODO +func (d *UpgradeMySQLAct) Rollback() (err error) { + return +} diff --git a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysqlcmd.go b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysqlcmd.go index b490d46791..d9a894d657 100644 --- a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysqlcmd.go +++ b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysqlcmd.go @@ -35,6 +35,7 @@ func NewMysqlCommand() *cobra.Command { NewExecSQLFileCommand(), CloneClientGrantCommand(), NewBackupTruncateDatabaseCommand(), + NewUpgradeMySQLCommand(), // NewBackupDatabaseTableCommand(), MycnfChangeCommand(), FindLocalBackupCommand(), diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/computil/mysql_operate.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/computil/mysql_operate.go index 86d02731f9..6c2dc1d8ac 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/computil/mysql_operate.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/computil/mysql_operate.go @@ -84,7 +84,7 @@ func (p *StartMySQLParam) StartMysqlInstance() (pid int, err error) { myCnfName = p.MyCnfName startCmd = fmt.Sprintf( `ulimit -n 204800; - cd %s && %s ./bin/mysqld_safe --defaults-file=%s --user=mysql `, mediaDir, numaStr, myCnfName, +cd %s && %s ./bin/mysqld_safe --defaults-file=%s --user=mysql `, mediaDir, numaStr, myCnfName, ) ) if p.SkipSlaveFlag { diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go new file mode 100644 index 0000000000..76ed871f14 --- /dev/null +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go @@ -0,0 +1,786 @@ +/* + * TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. + * Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at https://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package mysql + +import ( + "bufio" + "errors" + "fmt" + "os" + "os/exec" + "regexp" + "strconv" + "strings" + "time" + + "github.com/golang/glog" + + "dbm-services/common/go-pubpkg/cmutil" + "dbm-services/common/go-pubpkg/logger" + "dbm-services/mysql/db-tools/dbactuator/pkg/components" + "dbm-services/mysql/db-tools/dbactuator/pkg/components/computil" + "dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/common" + "dbm-services/mysql/db-tools/dbactuator/pkg/core/cst" + "dbm-services/mysql/db-tools/dbactuator/pkg/native" + "dbm-services/mysql/db-tools/dbactuator/pkg/util" + "dbm-services/mysql/db-tools/dbactuator/pkg/util/mysqlutil" + "dbm-services/mysql/db-tools/dbactuator/pkg/util/osutil" +) + +// MysqlUpgradeComp TODO +type MysqlUpgradeComp struct { + GeneralParam *components.GeneralParam `json:"general"` + Params MysqlUpgradeParam `json:"extend"` + upgradeRtx `json:"-"` +} + +// MysqlUpgradeParam TODO +type MysqlUpgradeParam struct { + Host string `json:"host" validate:"required,ip"` + Ports []int `json:"ports"` + // mysql-5.7.20-linux-x86_64-tdbctl-2.4.3 + NewVersion string `json:"newVersion"` + components.Medium + // 是否强制升级 + IsForce bool `json:"isForce"` + // 只做升级检查 + Run bool `json:"run"` +} + +// VersionInfo TODO +type VersionInfo struct { + Version string + MysqlVersion uint64 + TmysqlVersion uint64 + IsToku bool +} + +// 运行时上下文 +type upgradeRtx struct { + dbConns map[Port]*native.DbWorker + verMap map[Port]VersionInfo + sysUsers []string + newVersion VersionInfo + socketMaps map[Port]string + adminUser string + adminPwd string +} + +// Example subcommand example input +func (c *MysqlUpgradeComp) Example() interface{} { + comp := MysqlUpgradeComp{ + Params: MysqlUpgradeParam{ + Host: "127.0.0.1", + Ports: []int{3306, 3307}, + NewVersion: "mysql-5.7.20-linux-x86_64-tdbctl-2.4.3", + Run: false, + }, + GeneralParam: &components.GeneralParam{ + RuntimeAccountParam: components.RuntimeAccountParam{ + MySQLAccountParam: common.MySQLAdminReplExample, + }, + }, + } + return comp +} + +// Init prepare run env +func (m *MysqlUpgradeComp) Init() (err error) { + m.dbConns = make(map[Port]*native.DbWorker) + m.verMap = make(map[Port]VersionInfo) + m.socketMaps = make(map[Port]string) + m.sysUsers = m.GeneralParam.GetAllSysAccount() + m.adminUser = m.GeneralParam.RuntimeAccountParam.AdminUser + m.adminPwd = m.GeneralParam.RuntimeAccountParam.AdminPwd + m.newVersion = VersionInfo{ + Version: m.Params.NewVersion, + MysqlVersion: cmutil.MySQLVersionParse(m.Params.NewVersion), + TmysqlVersion: cmutil.TmysqlVersionParse(m.Params.NewVersion), + } + if m.newVersion.MysqlVersion <= 0 { + return fmt.Errorf("mysql version %s is invalid", m.Params.NewVersion) + } + for _, port := range m.Params.Ports { + dbConn, err := native.InsObject{ + Host: m.Params.Host, + Port: port, + User: m.adminUser, + Pwd: m.adminPwd, + }.Conn() + if err != nil { + logger.Error("Connect %d failed:%s", port, err.Error()) + return err + } + m.dbConns[port] = dbConn + ver, err := dbConn.SelectVersion() + if err != nil { + logger.Error("Get version failed:%s", err.Error()) + return err + } + isTokudb := false + isTokudb, err = dbConn.HasTokudb() + if err != nil { + logger.Error("query %d engine failed:%s", port, err.Error()) + return err + } + currentVer := VersionInfo{ + Version: ver, + MysqlVersion: cmutil.MySQLVersionParse(ver), + TmysqlVersion: cmutil.TmysqlVersionParse(ver), + IsToku: isTokudb, + } + m.verMap[port] = currentVer + if err = currentVer.canUpgrade(m.newVersion); err != nil { + logger.Error("upgrade version check failed %s", err.Error()) + return err + } + } + + logger.Info("mysql upgrade init ok,new version:%d", m.newVersion.MysqlVersion) + return nil +} + +// PreCheck pre run pre check +func (m *MysqlUpgradeComp) PreCheck() (err error) { + logger.Info("check delete user only at mysql.user table,but not delete user in mysql.user_priv ... tables") + for port, conn := range m.dbConns { + dirtyAccounts, err := conn.GetDeleteWithoutDropUser() + if err != nil { + logger.Warn("get dirty accounts on %d failed:%s", port, err.Error()) + } + if len(dirtyAccounts) > 0 { + logger.Warn("users have DELETE but no DROP:%v", dirtyAccounts) + } + pls, err := conn.ShowApplicationProcesslist(m.sysUsers) + if err != nil { + logger.Error("show application processlist failed:%s", err.Error()) + return err + } + if len(pls) > 0 { + return fmt.Errorf("Exist dirty processlist:%v in %d", pls, port) + } + } + if m.Params.Run { + if !cmutil.FileExists(m.Params.GetAbsolutePath()) { + return fmt.Errorf("%s file not exist", m.Params.Pkg) + } + } + return +} + +// canUpgrade TODO +func (current *VersionInfo) canUpgrade(newVersion VersionInfo) (err error) { + logger.Info("newvesion is %v", newVersion) + logger.Info("currentvesion MysqlVersion is %v", current.MysqlVersion) + logger.Info("currentvesion TmysqlVersion is %v", current.TmysqlVersion) + logger.Info("currentvesion IsToku is %v", current.IsToku) + switch { + case current.MysqlVersion < native.MYSQL_5P5P24: + return fmt.Errorf("don't support current version: %d lower than mysql-5.5.24 to upgrade", current.MysqlVersion) + case current.MysqlVersion > newVersion.MysqlVersion: + return fmt.Errorf("don't allow to decrease mysql versoin: current version: %s, new version: %s", current.Version, + newVersion.Version) + case (newVersion.MysqlVersion == current.MysqlVersion && newVersion.TmysqlVersion < current.TmysqlVersion): + return fmt.Errorf("don't allow to decrease tmysql versoin: current version: %s, new version: %s", current.Version, + newVersion.Version) + case newVersion.TmysqlVersion < native.TMYSQL_1P1: + return fmt.Errorf("don't allow to upgrade to NON-TMYSQL: current version: %s, new version: %s", current.Version, + newVersion.Version) + case int32(newVersion.TmysqlVersion/1000000)-int32(current.TmysqlVersion/100000) > 1: + return fmt.Errorf("don't allow to upgrade across big versin: current version: %s, new version: %s", + current.Version, newVersion.Version) + case newVersion.TmysqlVersion >= native.TMYSQL_1 && current.MysqlVersion < native.MYSQL_5P1P24: + return fmt.Errorf("don't allow to upgrade, current version: %s, new version: %s", current.Version, + newVersion.Version) + case newVersion.TmysqlVersion >= native.TMYSQL_2 && current.TmysqlVersion < native.TMYSQL_1: + return fmt.Errorf("don't allow to upgrade tmysql 2.x: current version: %s, new version: %s", current.Version, + newVersion.Version) + case newVersion.MysqlVersion >= native.MYSQL_8P0 && current.MysqlVersion < native.MYSQL_5P70: + return fmt.Errorf("upgrading to MySQL 8 from MySQL version <5.7 is not allowed: current version: %d, new version: %d", + current.MysqlVersion, newVersion.MysqlVersion) + } + if current.IsToku && (newVersion.TmysqlVersion >= native.TMYSQL_3 || newVersion.TmysqlVersion <= native.TMYSQL_2P1P1) { + return fmt.Errorf("current version: %s have enable tokudb, but newversion: %s don't support", current.Version, + newVersion.Version) + } + if newVersion.MysqlVersion > native.MYSQL_5P5P1 && current.MysqlVersion > native.MYSQL_5P0P48 { + return nil + } + return fmt.Errorf("don't allow to upgrade, current version: %s, new version: %s", current.Version, + newVersion.Version) +} + +// MysqlUpgradeCheck TODO +func (m *MysqlUpgradeComp) MysqlUpgradeCheck() (err error) { + for port, conn := range m.dbConns { + currentVer := m.verMap[port] + if currentVer.TmysqlVersion > native.TMYSQL_3 && currentVer.TmysqlVersion < native.TMySQL_3P15 { + if err = conn.CheckInstantAddColumn(); err != nil { + // 当前版本是tmysql 3, 且低于3.1.15。检查是否有非法在线加字段 + if !errors.Is(err, native.ErrorUsedInstantAddColumnButValid) { + return err + } + } + } + if m.newVersion.MysqlVersion >= native.MYSQL_8P0 { + if err = conn.CheckInstantAddColumn(); err != nil { + return fmt.Errorf( + "CheckInstantAddColumn failed, upgrade to %s cannot go on due to incompatibility of data dictionary: %s", + m.newVersion.Version, err.Error()) + } + } + // table check + if err = conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion); err != nil { + logger.Error("check table upgrade failed %s", err.Error()) + return err + } + } + return +} + +// Upgrade TODO +func (m *MysqlUpgradeComp) Upgrade() (err error) { + for port, conn := range m.dbConns { + logger.Info("do upgrade and replace my.cnf for %d", port) + if err = m.upgradeMycnf(port); err != nil { + return err + } + logger.Info("do upgrade %d mysql old password", port) + if err = m.upgradeOldPassword(conn, port); err != nil { + return err + } + socket, ok := m.socketMaps[port] + if !ok { + return fmt.Errorf("get socket from socket map failed") + } + // shutfown mysql + logger.Info("do shutdown mysql for %d", port) + if err = computil.ShutdownMySQLBySocket2(m.adminUser, m.adminPwd, socket); err != nil { + logger.Error("shutdown mysql %d failed %s", port, err.Error()) + return err + } + } + logger.Info("upgrade mysql install bin...") + // relink mysql + if err = m.relinkMysql(); err != nil { + logger.Info("failed to replace mysql media %s", err.Error()) + return err + } + for _, port := range m.Params.Ports { + start := computil.StartMySQLParam{ + Host: m.Params.Host, + Port: port, + Socket: m.socketMaps[port], + MySQLUser: m.adminUser, + MySQLPwd: m.adminPwd, + + MyCnfName: util.GetMyCnfFileName(port), + MediaDir: cst.MysqldInstallPath, + } + logger.Info("start mysql for %d", port) + pid, err := start.StartMysqlInstance() + if err != nil { + logger.Error("start mysql %d failed %s", err.Error()) + return err + } + logger.Info("start mysql success,pid is %d", pid) + logger.Error("reconnect mysql ") + dbConn, err := native.InsObject{ + Host: m.Params.Host, + Port: port, + User: m.adminUser, + Pwd: m.adminPwd, + }.Conn() + if err != nil { + logger.Error("Connect %d failed:%s", port, err.Error()) + return err + } + m.dbConns[port] = dbConn + // do mysql check + // MySQL 8.0.16后,mysql_upgrade被弃用,无须额外调用,升级操作被集成到mysqld中 + // 因此当升级版本在8.0.16以上时,mysqld成功拉起,即代表升级完成 + if m.newVersion.MysqlVersion >= native.MYSQL_8P0P16 { + logger.Info("Upgrading to MySQL version>=8.0.16, remaining upgrade procedure is skipped.") + return nil + } + logger.Info("do mysqlcheck for %d", port) + if err = m.mysqlCheck(dbConn, port); err != nil { + logger.Error("do %d mysqlcheck failed %s", port, err.Error()) + return err + } + logger.Info("do mysql upgrade for %d", port) + if err = m.mysqlUpgrade(dbConn, port); err != nil { + logger.Error("do %d mysqlUpgrade failed %s", port, err.Error()) + return err + } + logger.Info("exec upgrade addtion actions for %d", port) + if err = m.additionalActions(dbConn, port); err != nil { + logger.Error("do %d additionalActions failed %s", port, err.Error()) + return err + } + } + return nil +} + +func (m *MysqlUpgradeComp) relinkMysql() (err error) { + // tar mysql new version packege + if stderr, err := osutil.StandardShellCommand(false, fmt.Sprintf("tar -axf %s -C %s", m.Params.GetAbsolutePath(), + cst.UsrLocal)); err != nil { + logger.Error("tar mysql new version packege failed %s,stderr%s", err.Error(), stderr) + return err + } + fi, err := os.Lstat(cst.MysqldInstallPath) + if err != nil { + logger.Error("read /usr/local/mysql dir info failed %s", err.Error()) + return err + } + sc := "" + newlink := m.Params.GePkgBaseName() + switch mode := fi.Mode(); { + case mode.IsDir(): + bakdir := fmt.Sprintf("mysql_%s", time.Now().Format(cst.TimeLayoutDir)) + sc = fmt.Sprintf("cd %s && mv mysql %s && ln -s %s mysql ", + cst.UsrLocal, bakdir, newlink) + logger.Info("move mysql dir to %s", bakdir) + case mode&os.ModeSymlink != 0: + oldlink, err := os.Readlink(cst.MysqldInstallPath) + if err != nil { + logger.Error("get old mysql link failed %s", err.Error()) + return err + } + logger.Info("mysql old link is %s", oldlink) + sc = fmt.Sprintf("cd %s && unlink mysql && ln -s %s mysql ", + cst.UsrLocal, newlink) + default: + return fmt.Errorf("file %s is not a dir or symlink", cst.MysqldInstallPath) + } + if stderr, err := osutil.StandardShellCommand(false, sc); err != nil { + logger.Error("tar mysql new version packege failed %s,stderr%s", err.Error(), stderr) + return err + } + return err +} + +func (m *MysqlUpgradeComp) upgradeMycnf(port int) (err error) { + cf := util.GetMyCnfFileName(port) + cff, err := util.LoadMyCnfForFile(cf) + if err != nil { + logger.Error("local %s file failed %s", cf, err.Error()) + return err + } + newfile := fmt.Sprintf("./my.cnf.%d.new", port) + if cmutil.FileExists(newfile) { + if err = os.Remove(newfile); err != nil { + logger.Error("remove exist tmp my.cnf failed %s", err.Error()) + return err + } + } + sck, err := cff.GetMySQLSocket() + if err != nil { + logger.Error("get mysql socket failed %s", err.Error()) + return err + } + m.socketMaps[port] = sck + cff.FileName = newfile + section := util.MysqldSec + // tmysql 版本的相关的配置替换 + if m.newVersion.TmysqlVersion > native.TMYSQL_1 { + switch { + case m.newVersion.TmysqlVersion < native.TMYSQL_3: + cff.ReplaceValue(section, "innodb_create_use_gcs_real_format", true, "") + fallthrough + case m.newVersion.TmysqlVersion >= native.TMYSQL_1P4: + cff.ReplaceValue(section, "userstat", false, "ON") + cff.ReplaceValue(section, "query_response_time_stats", false, "ON") + fallthrough + case m.newVersion.TmysqlVersion >= native.TMYSQL_2P1: + cff.ReplaceKeyName(section, "table_cache", "table_open_cache") + cff.ReplaceValue(section, "performance_schema", false, "OFF") + cff.Cfg.Section(section).DeleteKey("alter_query_log") + cff.ReplaceValue(section, "secure_auth", false, "OFF") + } + } + switch { + case m.newVersion.MysqlVersion > native.MYSQL_5P1P46: + cff.ReplaceValue(section, "skip-name-resolve", true, "") + fallthrough + case m.newVersion.MysqlVersion > native.MYSQL_5P5P11: + cff.ReplaceValue(section, "slow_query_log", false, "1") + fallthrough + case m.newVersion.MysqlVersion > native.MYSQL_5P5P5: + cff.ReplaceValue(section, "innodb_file_format", false, "Barracuda") + fallthrough + case m.newVersion.MysqlVersion > native.MYSQL_5P5P1: + cff.ReplaceKeyName(section, "default-character-set", "character-set-server") + cff.ReplaceKeyName(section, "log_bin_trust_routine_creators", "log_bin_trust_function_creators") + cff.Cfg.Section(section).DeleteKey("skip-locking") + cff.Cfg.Section(section).DeleteKey("log-long-format") + cff.Cfg.Section(section).DeleteKey("log-update") + cff.Cfg.Section(section).DeleteKey("safe-show-database") + fallthrough + case m.newVersion.MysqlVersion > native.MYSQL_5P1P29: + cff.ReplaceKeyName(section, "default-collation", "collation_server") + cff.ReplaceKeyName(section, "default-table-type", "default_storage_engine") + cff.ReplaceKeyName(section, "warnings", "log_warnings") + cff.Cfg.Section(section).DeleteKey("delay-key-write-for-all-tables") + fallthrough + case m.newVersion.MysqlVersion > native.MYSQL_5P70: + cff.Cfg.Section(section).DeleteKey("secure_auth") + cff.Cfg.Section(section).DeleteKey("innodb_additional_mem_pool_size") + cff.Cfg.Section(section).DeleteKey("innodb_create_use_gcs_real_format") + cff.Cfg.Section(section).DeleteKey("thread_concurrency") + cff.Cfg.Section(section).DeleteKey("storage_engine") + cff.Cfg.Section(section).DeleteKey("old_passwords") + cff.Cfg.Section(section).DeleteKey("innodb_file_io_threads") + cff.ReplaceKeyName(section, "thread_cache", "thread_cache_size") + cff.ReplaceKeyName(section, "key_buffer", "key_buffer_size") + cff.ReplaceKeyName(section, "log_warnings", "log_error_verbosity") + cff.ReplaceValue(section, "log_error_verbosity", false, "1") + cff.ReplaceValue(section, "show_compatibility_56", false, "on") + cff.ReplaceValue(section, "secure_file_priv", false, "") + cff.ReplaceValue(section, "sync_binlog", false, "0") + fallthrough + case m.newVersion.MysqlVersion > native.MYSQL_8P0: + cff.Cfg.Section(section).DeleteKey("innodb_file_format") + cff.Cfg.Section(section).DeleteKey("query_cache_size") + cff.Cfg.Section(section).DeleteKey("query_cache_type") + cff.Cfg.Section(section).DeleteKey("show_compatibility_56") + cff.Cfg.Section(section).DeleteKey("userstat") + cff.Cfg.Section(section).DeleteKey("query_response_time_stats") + cff.ReplaceValue(section, "thread_handling", false, "2") + cff.ReplaceValue(section, "performance_schema", false, "ON") + cff.ReplaceValue(section, "explicit_defaults_for_timestamp", false, "OFF") + cff.ReplaceValue(section, "default_authentication_plugin", false, "mysql_native_password") + } + + if err = cff.SafeSaveFile(false); err != nil { + logger.Error("write %s failed %s", newfile, err.Error()) + return err + } + bakcnf := cf + "." + time.Now().Format(cst.TimeLayoutDir) + script := fmt.Sprintf("cp %s %s && cp %s %s", cf, bakcnf, newfile, cf) + stderr, err := osutil.StandardShellCommand(false, script) + if err != nil { + logger.Error("replace my.cnf failed,stderr:%s,err:%s", stderr, err.Error()) + return err + } + return nil +} + +func (m MysqlUpgradeComp) upgradeOldPassword(conn *native.DbWorker, port int) (err error) { + currentVersion, ok := m.verMap[port] + if !ok { + return fmt.Errorf("get %d version from runtime ctx failed", port) + } + if !(m.newVersion.TmysqlVersion > native.TMYSQL_2 && currentVersion.MysqlVersion > native.MYSQL_5P70) { + logger.Info("ignore upgradeOldPassword check") + return nil + } + upgradeUsers := []string{} + upgradeUsers = append(upgradeUsers, m.GeneralParam.RuntimeAccountParam.AdminUser) + upgradeUsers = append(upgradeUsers, m.GeneralParam.RuntimeAccountParam.YwUser) + upgradeUsers = append(upgradeUsers, m.GeneralParam.RuntimeAccountParam.DbBackupUser) + upgradeUsers = append(upgradeUsers, m.GeneralParam.RuntimeAccountParam.MonitorUser) + users, err := conn.GetIsOldPasswordUsers(upgradeUsers) + if err != nil { + logger.Error("query users have old password failed %s", err.Error()) + } + for _, user := range users { + pwd := "" + switch user.User { + case m.GeneralParam.RuntimeAccountParam.AdminUser: + pwd = m.GeneralParam.RuntimeAccountParam.AdminPwd + case m.GeneralParam.RuntimeAccountParam.YwUser: + pwd = m.GeneralParam.RuntimeAccountParam.YwPwd + case m.GeneralParam.RuntimeAccountParam.DbBackupUser: + pwd = m.GeneralParam.RuntimeAccountParam.DbBackupPwd + case m.GeneralParam.RuntimeAccountParam.MonitorUser: + pwd = m.GeneralParam.RuntimeAccountParam.MonitorPwd + } + _, err = conn.Exec( + "UPDATE mysql.user SET plugin = 'mysql_native_password',Password = PASSWORD('?') WHERE (User, Host) = ('?', '?')", + pwd, user.User, user.Host) + if err != nil { + logger.Error("update mysql.user password failed %s", err.Error()) + return err + } + } + if _, err = conn.Exec("FLUSH PRIVILEGES;"); err != nil { + logger.Error("flush privileges failed %s", err.Error()) + return err + } + return err +} + +func (m MysqlUpgradeComp) mysqlUpgrade(conn *native.DbWorker, port int) (err error) { + currentVersion, ok := m.verMap[port] + if !ok { + return fmt.Errorf("get %d version from runtime ctx failed", port) + } + // safe big version, ignore mysqlcheck + if int32(m.newVersion.TmysqlVersion/1000000)-int32(currentVersion.TmysqlVersion/100000) == 0 { + logger.Info("same big tmysql versoin, ignore mysqlupgrade") + return nil + } + // open general_log + // if err = m.openGeneralLog(conn); err != nil { + // logger.Error("set global general_log=on failed %s", err.Error()) + // return err + // } + upgradeScript := "" + if m.newVersion.TmysqlVersion > native.TMYSQL_1P2 && m.newVersion.TmysqlVersion < native.TMYSQL_2 { + upgradeScript = fmt.Sprintf( + "cd /usr/local/mysql && ./bin/mysql_upgrade -h%s --skip-write-binlog -i --grace-print -P%d -u%s -p%s", + m.Params.Host, port, m.adminUser, m.adminPwd) + } else if currentVersion.MysqlVersion < native.MYSQL_5P70 && m.newVersion.MysqlVersion > native.MYSQL_5P70 { + upgradeScript = fmt.Sprintf( + "cd /usr/local/mysql && ./bin/mysql_upgrade -h%s --skip-write-binlog --grace-print -P%d -u%s -p%s", + m.Params.Host, port, m.adminUser, m.adminPwd) + } else { + upgradeScript = fmt.Sprintf("cd /usr/local/mysql && ./bin/mysql_upgrade -h%s -P%d --skip-write-binlog -u%s -p%s", + m.Params.Host, port, m.adminUser, m.adminPwd) + } + upgradelog := fmt.Sprintf("upgrade-%d.log", port) + c := osutil.ComplexCommand{ + Command: upgradeScript, + WriteStderr: true, + WriteStdout: true, + StdoutFile: upgradelog, + StderrFile: upgradelog, + Logger: true, + } + alreadyUpgradeNum := 0 + if err = c.Run(); err != nil { + l, err := m.alreadyUpgradedLines(upgradelog) + if err != nil { + logger.Error("analysis upgradelog failed %s", err.Error()) + return err + } + alreadyUpgradeNum = len(l) + if alreadyUpgradeNum <= 0 { + return fmt.Errorf("failed to mysqlupgrade,please refer to the log for details %s,err is %s", upgradelog, err.Error()) + } + } + logger.Info("run mysql upgrade shell success") + // close general_log + if err = m.closeGeneralLog(conn); err != nil { + logger.Error("set global general_log=off failed %s", err.Error()) + return err + } + logger.Info("check upgrade log ...") + notOkScript := fmt.Sprintf( + "cat %s |grep -vwE 'OK|Warning|Looking|Running|mysql|performance_schema|information_schema|collate_upgrade|REPAIR TABLE|Repairing tables|Pre-4.1 Password hash found|Checking|Upgrading|Upgrade process|already'"+ + "|grep -v '^$' | wc -l", upgradelog) + out1, err := exec.Command("/bin/bash", "-c", notOkScript).CombinedOutput() + if err != nil { + glog.Infof("check upgrade log failed %s", err.Error()) + return err + } + if num, _ := strconv.Atoi(strings.TrimSpace(string(out1))); num != 0 && alreadyUpgradeNum == 0 { + err := fmt.Errorf("failed to mysqlupgrade, out1 is not empty, error info: %s", upgradelog) + logger.Error(err.Error()) + return err + } + logger.Info("mysqlupgrade for %s#%s ok", m.Params.Host, port) + return nil +} + +// additionalActions 升级后额外的操作 +func (m MysqlUpgradeComp) additionalActions(conn *native.DbWorker, port int) (err error) { + currentVersion, ok := m.verMap[port] + if !ok { + return fmt.Errorf("get %d version from runtime ctx failed", port) + } + actuator := mysqlutil.ExecuteSqlAtLocal{ + NeedShowWarnings: true, + Host: m.Params.Host, + Port: port, + WorkDir: "./", + User: m.adminUser, + Password: m.adminPwd, + } + // 如果版本小于5.6则需要该更row的模式 + if m.newVersion.MysqlVersion < native.MYSQL_5P70 { + changeRowFormatfile := fmt.Sprintf("convert_innodb_row_format_for_%d.sql", port) + if cmutil.FileExists(changeRowFormatfile) { + os.Remove(changeRowFormatfile) + } + fd, err := os.OpenFile(changeRowFormatfile, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + logger.Error("open convert_innodb_row_format_for_%d.sql failed %s", port, err.Error()) + return err + } + defer fd.Close() + if err = conn.ConvertInnodbRowFomart(currentVersion.Version, fd); err != nil { + logger.Error("create convert_innodb_row_format_for_%d.sql failed %s", port, err.Error()) + return err + } + if err = actuator.ExcuteSqlByMySQLClientOne(changeRowFormatfile, " "); err != nil { + logger.Error("excute sql by mysql client one %d.sql failed %s", port, err.Error()) + return err + } + } + if currentVersion.IsToku && currentVersion.TmysqlVersion <= native.TMYSQL_2P1 && + m.newVersion.TmysqlVersion >= native.TMYSQL_2P1P1 { + tokudbRenameTablesql := fmt.Sprintf("rename_tokudb_table.sql") + if cmutil.FileExists(tokudbRenameTablesql) { + os.Remove(tokudbRenameTablesql) + } + fd, err := os.OpenFile(tokudbRenameTablesql, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + logger.Error("open %d rename_tokudb_table.sql failed %s", port, err.Error()) + return err + } + defer fd.Close() + if err = conn.RenameTokudbTable(currentVersion.Version, fd); err != nil { + logger.Error("create convert_innodb_row_format_for_%d.sql failed %s", port, err.Error()) + return err + } + if err = actuator.ExcuteSqlByMySQLClientOne(tokudbRenameTablesql, " "); err != nil { + logger.Error("excute sql by mysql client one %d.sql failed %s", port, err.Error()) + return err + } + } + return nil +} + +func (m MysqlUpgradeComp) alreadyUpgradedLines(upgradelog string) (lines []string, err error) { + fd, err := os.Open(upgradelog) + if err != nil { + logger.Error("open mysqlcheck log failed %s", err.Error()) + return lines, err + } + already_upgraded := regexp.MustCompile("already upgraded") + defer fd.Close() + sc := bufio.NewScanner(fd) + for sc.Scan() { + line := sc.Text() + if already_upgraded.MatchString(line) { + lines = append(lines, line) + } + } + return lines, nil +} + +func (m MysqlUpgradeComp) openGeneralLog(conn *native.DbWorker) (err error) { + // open general_log + if _, err = conn.Exec("set global general_log=on;"); err != nil { + logger.Error("set global general_log=on failed %s", err.Error()) + return err + } + return nil +} + +func (m MysqlUpgradeComp) closeGeneralLog(conn *native.DbWorker) (err error) { + // close general_log + if _, err = conn.Exec("set global general_log=off;"); err != nil { + logger.Error("set global general_log=off failed %s", err.Error()) + return err + } + return nil +} + +func (m MysqlUpgradeComp) mysqlCheck(conn *native.DbWorker, port int) (err error) { + currentVersion, ok := m.verMap[port] + if !ok { + return fmt.Errorf("get %d version from runtime ctx failed", port) + } + // safe big version, ignore mysqlcheck + if int32(m.newVersion.TmysqlVersion/1000000)-int32(currentVersion.TmysqlVersion/100000) == 0 { + logger.Info("same big tmysql versoin, ignore mysqlcheck") + return nil + } + // open general_log + if err = m.openGeneralLog(conn); err != nil { + logger.Error("set global general_log=on failed %s", err.Error()) + return err + } + mysqlchecklog := fmt.Sprintf("mysqlcheck-%d.log", port) + mysqlcheckerrlog := fmt.Sprintf("mysqlcheck-%d.err", port) + if cmutil.FileExists(mysqlchecklog) { + if err = os.Remove(mysqlchecklog); err != nil { + logger.Error("it already exists and needs to be deleted ,remove %s failed %s", mysqlchecklog, err.Error()) + return err + } + } + checkScript := "" + if (m.newVersion.TmysqlVersion > native.TMYSQL_1P2 && m.newVersion.TmysqlVersion < native.TMYSQL_2) || + (currentVersion.MysqlVersion < native.MYSQL_5P70 && m.newVersion.MysqlVersion > native.MYSQL_5P70) { + checkScript = fmt.Sprintf( + "cd %s && ./bin/mysqlcheck -h%s -P%d --check-upgrade --grace-print --all-databases --skip-write-binlog -u%s -p%s", + cst.MysqldInstallPath, m.Params.Host, port, m.adminUser, m.adminPwd) + } else { + checkScript = fmt.Sprintf( + "cd %s && ./bin/mysqlcheck -h%s -P%d --all-databases --skip-write-binlog --check-upgrade -u%s -p%s", + cst.MysqldInstallPath, m.Params.Host, port, m.adminUser, m.adminPwd) + } + c := osutil.ComplexCommand{ + Command: checkScript, + Logger: true, + WriteStdout: true, + StdoutFile: mysqlchecklog, + WriteStderr: true, + StderrFile: mysqlcheckerrlog, + } + if err = c.Run(); err != nil { + logger.Error("run mysqlcheck failed %s", err.Error()) + return err + } + // close general_log + if err = m.closeGeneralLog(conn); err != nil { + logger.Error("set global general_log=off failed %s", err.Error()) + return err + } + var regs []*regexp.Regexp + performance_schema := regexp.MustCompile("^performance_schema") + information_schema := regexp.MustCompile("^information_schema") + regs = append(regs, regexp.MustCompile("OK$")) + regs = append(regs, performance_schema) + regs = append(regs, information_schema) + if m.newVersion.TmysqlVersion > native.TMYSQL_1P2 { + regs = append(regs, regexp.MustCompile("collate_upgrade")) + regs = append(regs, regexp.MustCompile("REPAIR TABLE")) + } + l, err := m.analysisMySQLCheckLog(mysqlchecklog, regs) + if err != nil { + return err + } + if len(l) > 0 { + return fmt.Errorf("failed to mysqlcheck for %d, error info: %v", port, l) + } + return nil +} + +// analysisMySQLCheckLog 分析mysqlcheck 的输出的结果 +func (m MysqlUpgradeComp) analysisMySQLCheckLog(mysqlchecklog string, regs []*regexp.Regexp) (lines []string, + err error) { + fd, err := os.Open(mysqlchecklog) + if err != nil { + logger.Error("open mysqlcheck log failed %s", err.Error()) + return lines, err + } + var abnormalLines []string + + defer fd.Close() + sc := bufio.NewScanner(fd) + for sc.Scan() { + line := sc.Text() + for _, reg := range regs { + if reg.MatchString(line) { + goto ctn + } + } + abnormalLines = append(abnormalLines, line) + ctn: + continue + } + return abnormalLines, nil +} diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/native/db.go b/dbm-services/mysql/db-tools/dbactuator/pkg/native/db.go index 591ea5c748..11a46df5a3 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/native/db.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/native/db.go @@ -5,8 +5,6 @@ import ( "fmt" "time" - _ "github.com/go-sql-driver/mysql" // mysql TODO - "dbm-services/common/go-pubpkg/cmutil" "dbm-services/common/go-pubpkg/logger" "dbm-services/mysql/db-tools/dbactuator/pkg/core/cst" diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go b/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go index a8cbc9eb3b..bca37f8273 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/native/dbworker.go @@ -341,11 +341,14 @@ func (h *DbWorker) SelectVersion() (version string, err error) { return } -// HasTokudb TODO +// HasTokudb 判断是否安装tokudb引擎 func (h *DbWorker) HasTokudb() (has bool, err error) { var engine string err = h.Queryxs(&engine, "select engine from information_schema.engines where engine='tokudb';") - return cmutil.IsEmpty(engine), err + if err == sql.ErrNoRows { + return false, nil + } + return cmutil.IsNotEmpty(engine), err } // SelectNow 获取实例的当前时间。不是获取机器的,因为可能存在时区不一样 @@ -664,6 +667,27 @@ func (h *DbWorker) GetUserHosts() (users []UserHosts, err error) { return } +// GetIsOldPasswordUsers TODO +func (h *DbWorker) GetIsOldPasswordUsers(upgradeUsers []string) (users []UserHosts, err error) { + q, args, err := sqlx.In(`select User, + Host, + Password +from mysql.user +where ( + ( + plugin = '' + AND LENGTH(Password) = 16 + ) + or plugin = 'mysql_old_password' + ) + and User in (?)`, upgradeUsers) + if err != nil { + return nil, err + } + err = h.Queryx(&users, q, args...) + return users, err +} + // ShowPrivForUser 获取create user && grant user 语句 // // @receiver h diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/native/types.go b/dbm-services/mysql/db-tools/dbactuator/pkg/native/types.go index a7f0cb1dab..b0039e7592 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/native/types.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/native/types.go @@ -160,6 +160,13 @@ type ShowEnginesResp struct { // UserHosts TODO type UserHosts struct { - User string `db:"user"` - Host string `db:"host"` + Host string `db:"Host"` + User string `db:"User"` +} + +// Warning show warnings respone +type Warning struct { + Level string `db:"Level"` + Code int `db:"Code"` + Message string `db:"Message"` } diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go b/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go new file mode 100644 index 0000000000..fc85ec6249 --- /dev/null +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go @@ -0,0 +1,1009 @@ +/* + * TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. + * Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at https://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package native + +import ( + "context" + "errors" + "fmt" + "math" + "os" + "strings" + + "dbm-services/common/go-pubpkg/cmutil" + "dbm-services/common/go-pubpkg/logger" + + "github.com/jmoiron/sqlx" +) + +const ( + // MYSQL_5P0P48 TODO + MYSQL_5P0P48 uint64 = 5000048 + // MYSQL_5P1P24 TODO + MYSQL_5P1P24 uint64 = 5001024 + // MYSQL_5P1P29 TODO + MYSQL_5P1P29 uint64 = 5001029 + // MYSQL_5P1P46 TODO + MYSQL_5P1P46 uint64 = 5001046 + // MYSQL_5P5P1 TODO + MYSQL_5P5P1 uint64 = 5005001 + // MYSQL_5P5P5 TODO + MYSQL_5P5P5 uint64 = 5005005 + // MYSQL_5P5P11 TODO + MYSQL_5P5P11 uint64 = 5005011 + // MYSQL_5P5P24 TODO + MYSQL_5P5P24 uint64 = 5005024 + // MYSQL_5P6P24 TODO + MYSQL_5P6P24 uint64 = 5006024 + // MYSQL_5P70 TODO + MYSQL_5P70 uint64 = 5007000 + // MYSQL_5P60 TODO + MYSQL_5P60 uint64 = 5006000 + // MYSQL_5P7P21 TODO + MYSQL_5P7P21 uint64 = 5007021 + // MYSQL_8P0 TODO + MYSQL_8P0 uint64 = 8000000 + // MYSQL_8P0P16 TODO + MYSQL_8P0P16 uint64 = 8000016 // mysql_upgrade deprecated + // MYSQL_8P0P18 TODO + MYSQL_8P0P18 uint64 = 8000018 + // TMYSQL_1 TODO + TMYSQL_1 uint64 = 1000000 + // TMYSQL_1P1 TODO + TMYSQL_1P1 uint64 = 1001000 + // TMYSQL_1P2 TODO + TMYSQL_1P2 uint64 = 1002000 + // TMYSQL_1P4 TODO + TMYSQL_1P4 uint64 = 1004000 + // TMYSQL_2 TODO + TMYSQL_2 uint64 = 2000000 + // TMYSQL_2P1 TODO + TMYSQL_2P1 uint64 = 2001000 + // TMYSQL_2P1P1 TODO + TMYSQL_2P1P1 uint64 = 2001001 + // TMYSQL_3 TODO + TMYSQL_3 uint64 = 3000000 + // TMySQL_3P15 TODO + TMySQL_3P15 uint64 = 3001005 +) + +// CheckColObject TODO +type CheckColObject struct { + TableSchema string `db:"TABLE_SCHEMA"` + TableName string `db:"TABLE_NAME"` + ColumnName string `db:"COLUMN_NAME"` +} + +// HasInvalidCode check if contain the invalid chars +func (h *DbWorker) HasInvalidCode() (invalidObjs []CheckColObject, err error) { + conn, err := h.GetSqlxDb().Connx(context.Background()) + if err != nil { + return nil, err + } + defer conn.Close() + // set names + _, err = conn.ExecContext(context.Background(), "set names utf8;") + if err != nil { + return nil, err + } + // set var + _, err = conn.ExecContext(context.Background(), "set @var:=concat('%', convert(0xC39F using utf8), '%');") + if err != nil { + return nil, err + } + checksql := `select distinct a.TABLE_SCHEMA, a.TABLE_NAME, a.COLUMN_NAME from information_schema.STATISTICS a, + information_schema.COLUMNS b + where a.TABLE_SCHEMA = b.TABLE_SCHEMA + and a.TABLE_NAME = b.TABLE_NAME + and a.COLUMN_NAME = b.COLUMN_NAME + and ( + b.COLLATION_NAME = 'utf8_general_ci' + or + b.COLLATION_NAME = 'utf8_general_mysql500_ci' + or + b.COLLATION_NAME = 'ucs2_general_ci' + or + b.COLLATION_NAME = 'ucs2_general_mysql500_ci' + ) + and a.TABLE_SCHEMA <> 'mysql';` + var checkObjects []CheckColObject + if err = conn.SelectContext(context.Background(), &checkObjects, checksql); err != nil { + return nil, err + } + for _, v := range checkObjects { + var count int + err = conn.GetContext(context.Background(), &count, "select count("+v.ColumnName+") from "+v.TableSchema+"."+ + v.TableName+ + " where "+v.ColumnName+" like @var collate utf8_bin ;") + if err != nil { + return nil, err + } + if count > 0 { + invalidObjs = append(invalidObjs, v) + } + } + return invalidObjs, nil +} + +// CheckInstantAddColumnObj TODO +type CheckInstantAddColumnObj struct { + Name string `db:"NAME"` + TableID int `db:"TABLE_ID"` + NCols int `db:"N_COLS"` + NCoreCols int `db:"N_CORE_COLS"` +} + +var ( + // ErrorUsedInstantAddColumnButValid TODO + ErrorUsedInstantAddColumnButValid = errors.New("found usage of instant add column, but it is valid") + // ErrorInvalidUsageOfInstantAddColumn TODO + ErrorInvalidUsageOfInstantAddColumn = errors.New("found invalid usage of instant add column") +) + +// CheckInstantAddColumn TODO +func (h *DbWorker) CheckInstantAddColumn() (err error) { + var data []CheckInstantAddColumnObj + err = h.Queryx(&data, + "select NAME, TABLE_ID, N_COLS,N_CORE_COLS from INFORMATION_SCHEMA.INNODB_SYS_TABLES where N_COLS <> N_CORE_COLS") + if err != nil { + return err + } + usedInstantAddColumn := 0 + containInvalidAddColumn := 0 + for _, v := range data { + usedInstantAddColumn = 1 + var flags []int + err = h.Queryx(&flags, + `select case (PRTYPE & 256) when 256 THEN 1 ELSE 0 END as DATA_NOT_NULL + from INFORMATION_SCHEMA.INNODB_SYS_COLUMNS where TABLE_ID=?`, v.TableID) + if err != nil { + return err + } + i := 0 + beforeNull := 0 + afterNull := 0 + for _, vv := range flags { + i++ + if vv == 0 { + afterNull++ + if i <= v.NCoreCols { + beforeNull++ + } + } + } + a := int(math.Ceil(float64(beforeNull) / 8)) + b := int(math.Ceil(float64(afterNull) / 8)) + if a != b { + containInvalidAddColumn = 1 + logger.Info("%s, column_num:%d, column_num_before_add:%d, null_num_before_add:%d, num_num_after_add:%d\n", v.Name, + v.NCols, v.NCoreCols, beforeNull, afterNull) + } + } + if containInvalidAddColumn > 0 { + return ErrorInvalidUsageOfInstantAddColumn + } + if usedInstantAddColumn > 0 { + return ErrorUsedInstantAddColumnButValid + } + return nil +} + +// GetDeleteWithoutDropUser TODO +func (h *DbWorker) GetDeleteWithoutDropUser() (accounts []string, err error) { + q := ` + select concat(user, '\@', host) as account +from mysql.db +where Delete_priv = 'Y' + and Drop_priv = 'N' +union +select concat(user, '\@', host) as account +from mysql.tables_priv +where Table_priv like '%Delete%' + and Table_priv not like '%Drop%' + ` + err = h.Queryx(&accounts, q) + return +} + +// TableInfo information_schema.table info +type TableInfo struct { + TableSchema string `db:"TABLE_SCHEMA"` + TableName string `db:"TABLE_NAME"` + TableType string `db:"TABLE_TYPE"` + Engine string `db:"ENGINE"` + RowFormat string `db:"ROW_FORMAT"` +} + +// CheckTableUpgrade TODO +func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err error) { + type checkFunc struct { + fn func(currentVersion, newVersion uint64) error + desc string + } + // 库表名关键字检查 + fns := []checkFunc{} + fns = append(fns, checkFunc{ + fn: h.tableNameKeyWordCheck, + desc: "检查表名是否包含关键字", + }) + fns = append(fns, checkFunc{ + fn: h.colNameNameKeyWordCheck, + desc: "检查字段是否包含关键字", + }) + fns = append(fns, checkFunc{ + fn: h.routineNameKeyWordCheck, + desc: "存储过程名关键字检查", + }) + fns = append(fns, checkFunc{ + fn: h.eventNameKeyWordCheck, + desc: "eventName关键字检查", + }) + fns = append(fns, checkFunc{ + fn: h.triggerNameKeyWordCheck, + desc: "triggerName关键字检查"}) + fns = append(fns, checkFunc{ + fn: h.viewNameKeyWordCheck, + desc: "视图名关键字检查", + }) + for _, f := range fns { + logger.Info("start check %s ...", f.desc) + if err = f.fn(currentVersion, newVersion); err != nil { + logger.Error("when check %s,failed %s", f.desc, err.Error()) + return err + } + } + type checkFuncNoparam struct { + fn func() error + desc string + } + // 非法字符检查 + fnns := []checkFuncNoparam{} + fnns = append(fnns, checkFuncNoparam{ + fn: h.tableNameAsciiCodeCheck, + desc: "检查库、表名中包含非法字符", + }) + fnns = append(fnns, checkFuncNoparam{ + fn: h.columnNameAsciiCodeCheck, + desc: "检查列名中包含非法字符", + }) + fnns = append(fnns, checkFuncNoparam{ + fn: h.routineNameAsciiCodeCheck, + desc: "存储过程名中包含非法字符", + }) + fnns = append(fnns, checkFuncNoparam{ + fn: h.triggerNameAsciiCodeCheck, + desc: "触发器名中包含非法字符", + }) + fnns = append(fnns, checkFuncNoparam{ + fn: h.viewNameAsciiCodeCheck, + desc: "视图名中包含非法字符", + }) + fnns = append(fnns, checkFuncNoparam{ + fn: h.udfCheck, + desc: "检查是否包含自定义函数", + }) + for _, f := range fnns { + logger.Info("start check %s ...", f.desc) + if err = f.fn(); err != nil { + logger.Error("when check %s,failed %s", f.desc, err.Error()) + return err + } + } + switch { + // 当准备升级到8.0版本 + case newVersion >= MYSQL_8P0 && currentVersion < MYSQL_8P0: + logger.Info("准备升级到8.0需要做这些额外的检查...") + fns80 := []checkFuncNoparam{} + fns80 = append(fns80, checkFuncNoparam{ + fn: h.checkNonNativeSupportParttion, + desc: "升级到MySQL8.0,必须将分区引起改成innodb或者ndb,不能有使用不支持本地分区的引擎创建分区表", + }) + fns80 = append(fns80, checkFuncNoparam{ + fn: h.checkParttionsInInnoSharedTableSpace, + desc: "没有分区表在共享表空间,Before upgrading to 8.0 they need to be moved to file-per-table tablespace", + }) + fns80 = append(fns80, checkFuncNoparam{ + fn: h.foreignKeyNameLengthCheck, + desc: "MySQL 8.0 限制外键名称不能超过 64 个字符,检查当前版本外键长度", + }) + fns80 = append(fns80, checkFuncNoparam{ + fn: h.enumSetTotalLengthTooLong, + desc: "表或者存储过程的 ENUM/SET 的所有元素总长度超过 255 字符,会导致升级失败", + }) + fns80 = append(fns80, checkFuncNoparam{ + fn: h.viewNameLengthTooLong, + desc: "视图列名不能超过 64 字符", + }) + fns80 = append(fns80, checkFuncNoparam{ + fn: h.datadicTablenameConflictsCheck, + desc: "检查MySQL 系统数据库中与 MySQL 8.0 数据字典中同名的表", + }) + for _, f := range fns80 { + logger.Info("start check %s ...", f.desc) + if err = f.fn(); err != nil { + logger.Error("when check %s,failed %s", f.desc, err.Error()) + return err + } + } + // 当准备升级到5.7版本 + case newVersion >= MYSQL_5P70 && currentVersion < MYSQL_5P70: + // per-4.1 password check + logger.Info("准备升级到MySQL5.7 需要做这些额外的检查...") + fns57 := []checkFuncNoparam{} + fns57 = append(fns57, checkFuncNoparam{ + fn: h.passwordCheck, + desc: "密码检查", + }) + fns57 = append(fns57, checkFuncNoparam{ + fn: h.partitionCheck, + desc: "5.7分表有比较大的改动,暂时不支持本地升级", + }) + fns57 = append(fns57, checkFuncNoparam{ + fn: h.columnTypeCheck, + desc: "MySQL5.7不支持year(2)字段类型,需要提前升级", + }) + fns57 = append(fns57, checkFuncNoparam{ + fn: h.tokudbEngineCheck, + desc: "tokudb引擎检查", + }) + for _, f := range fns57 { + logger.Info("start check %s ...", f.desc) + if err = f.fn(); err != nil { + logger.Error("when check %s,failed %s", f.desc, err.Error()) + return err + } + } + // 当准备升级到5.6版本 + case newVersion >= MYSQL_5P60 && currentVersion < MYSQL_5P60: + // per-4.1 password check + logger.Info("准备升级到MySQL5.6 需要做这些额外的检查...") + if err = h.passwordCheck(); err != nil { + return err + } + + } + return nil +} + +func (h *DbWorker) getKeyWords(currentVersion, newVersion uint64) []string { + ReservedWords := []string{"ACCESSIBLE", "LINEAR", "MASTER_SSL_VERIFY_SERVER_CERT", "RANGE", "READ_ONLY", + "IGNORE_SERVER_IDS", "MASTER_HEARTBEAT_PERIOD", "MAXVALUE", "RESIGNAL", "SIGNAL", "SLOW"} + // new added reserver words for MySQL5.6 + ReservedWords56 := []string{"GET", "IO_AFTER_GTIDS", "IO_BEFORE_GTIDS", "MASTER_BIND", "PARTITION"} + // new added reserver words for MySQL5.7 + ReservedWords57 := []string{"GENERATED", "OPTIMIZER_COSTS", "STORED", "VIRTUAL", "PARTITION"} + // new added reserver words for MySQL8.0 + ReservedWords80 := []string{"CUME_DIST", "DENSE_RANK", "EMPTY", "EXCEPT", "FIRST_VALUE", "GROUPING", "GROUPS", + "JSON_TABLE", "LAG", "LAST_VALUE", "LATERAL", "LEAD", "NTH_VALUE", "NTILE", "OF", "OVER", "PERCENT_RANK", "RANK", + "RECURSIVE", "ROW_NUMBER", "SYSTEM", "WINDOW"} + switch { + case newVersion >= MYSQL_8P0 && currentVersion < MYSQL_8P0: + return append(ReservedWords, ReservedWords80...) + case newVersion >= MYSQL_5P70 && currentVersion < MYSQL_5P70: + return append(ReservedWords, ReservedWords57...) + case newVersion >= MYSQL_5P60 && currentVersion < MYSQL_5P60: + return append(ReservedWords, ReservedWords56...) + default: + return ReservedWords + } +} + +// tableNameKeyWordCheck TODO +func (h *DbWorker) tableNameKeyWordCheck(currentVersion, newVersion uint64) (err error) { + var data []TableInfo + q, args, err := sqlx.In( + "select TABLE_SCHEMA,TABLE_NAME,TABLE_TYPE from information_schema.tables where table_name in (?)", + h.getKeyWords(currentVersion, newVersion)) + if err != nil { + return err + } + if err = h.Queryx(&data, q, args...); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid table name: %v", data) + } + return nil +} + +// ColumnInfo TODO +type ColumnInfo struct { + TableSchema string `db:"TABLE_SCHEMA"` + TableName string `db:"TABLE_NAME"` + ColumnName string `db:"COLUMN_NAME"` +} + +func (h *DbWorker) colNameNameKeyWordCheck(currentVersion, newVersion uint64) (err error) { + var data []ColumnInfo + q, args, err := sqlx.In( + "select TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME from information_schema.columns where column_name in (?)", + h.getKeyWords(currentVersion, newVersion)) + if err != nil { + return err + } + if err = h.Queryx(&data, q, args...); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid column name: %v", data) + } + return nil +} + +// RoutineInfo TODO +type RoutineInfo struct { + RoutineSchema string `db:"ROUTINE_SCHEMA"` + RoutineName string `db:"ROUTINE_NAME"` + RoutineType string `db:"ROUTINE_TYPE"` +} + +func (h *DbWorker) routineNameKeyWordCheck(currentVersion, newVersion uint64) (err error) { + var data []RoutineInfo + blacklist := []string{"ExtractValue", "FROM_BASE64", "GTID_SUBSET", "GTID_SUBTRACT", "INET6_ATON", "INET6_NTOA", + "IS_IPV4_COMPAT", "IS_IPV4_MAPPED", "IS_IPV4", "IS_IPV6", "SQL_THREAD_WAIT_AFTER_GTIDS", "TO_BASE64", "TO_SECONDS", + "UpdateXML", "UUID_SHORT", "VALIDATE_PASSWORD_STRENGTH", "WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS", "WEIGHT_STRING"} + keywords := h.getKeyWords(currentVersion, newVersion) + keywords = append(keywords, blacklist...) + q, args, err := sqlx.In( + "select ROUTINE_SCHEMA,ROUTINE_NAME,ROUTINE_TYPE from information_schema.routines where routine_name in (?)", + keywords) + if err != nil { + return err + } + if err = h.Queryx(&data, q, args...); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid column name: %v", data) + } + return nil +} + +// TriggerInfo TODO +type TriggerInfo struct { + TriggerSchema string `db:"TRIGGER_SCHEMA"` + TriggerName string `db:"TRIGGER_NAME"` +} + +func (h *DbWorker) triggerNameKeyWordCheck(currentVersion, newVersion uint64) (err error) { + var data []TriggerInfo + q, args, err := sqlx.In( + "select TRIGGER_SCHEMA,TRIGGER_NAME from information_schema.triggers where TRIGGER_NAME in (?)", + h.getKeyWords(currentVersion, newVersion)) + if err != nil { + return err + } + if err = h.Queryx(&data, q, args...); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid column name: %v", data) + } + return nil +} + +// EventInfo TODO +type EventInfo struct { + EventSchema string `db:"EVENT_SCHEMA"` + EventName string `db:"EVENT_NAME"` +} + +func (h *DbWorker) eventNameKeyWordCheck(currentVersion, newVersion uint64) (err error) { + var data []EventInfo + q, args, err := sqlx.In( + "select EVENT_SCHEMA,EVENT_NAME from information_schema.events where EVENT_NAME in (?)", + h.getKeyWords(currentVersion, newVersion)) + if err != nil { + return err + } + if err = h.Queryx(&data, q, args...); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid column name: %v", data) + } + return nil +} + +// ViewInfo TODO +type ViewInfo struct { + TableSchema string `db:"TABLE_SCHEMA"` + TableName string `db:"TABLE_NAME"` + Definer string `db:"DEFINER"` +} + +func (h *DbWorker) viewNameKeyWordCheck(currentVersion, newVersion uint64) (err error) { + var data []ViewInfo + q, args, err := sqlx.In( + "select TABLE_SCHEMA,TABLE_NAME,DEFINER from information_schema.VIEWS where TABLE_NAME in (?)", + h.getKeyWords(currentVersion, newVersion)) + if err != nil { + return err + } + if err = h.Queryx(&data, q, args...); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid column name: %v", data) + } + return nil +} + +func (h *DbWorker) tableNameAsciiCodeCheck() (err error) { + var data []TableInfo + q := ` + select TABLE_SCHEMA, + TABLE_NAME, + TABLE_TYPE +from information_schema.tables +where TABLE_NAME <> convert(table_name using ASCII) + or TABLE_SCHEMA <> convert(TABLE_SCHEMA using ASCII); + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid column name: %v", data) + } + return nil +} + +func (h *DbWorker) columnNameAsciiCodeCheck() (err error) { + var data []ColumnInfo + q := ` + select TABLE_SCHEMA, + TABLE_NAME, + COLUMN_NAME +from information_schema.columns +where column_name <> convert(column_name using ASCII) + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid column name: %v", data) + } + return nil +} + +func (h *DbWorker) columnTypeCheck() (err error) { + var data []ColumnInfo + err = h.Queryx(&data, ` + select TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME + from information_schema.COLUMNS where COLUMN_TYPE='year(2)' group by 1,2,3; + `) + if err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("mysql5.7 no longer supports year(2) column type please change it to year(4) in upgrade,list: %v", + data) + } + return nil +} + +func (h *DbWorker) routineNameAsciiCodeCheck() (err error) { + var data []RoutineInfo + q := ` + select ROUTINE_SCHEMA, + ROUTINE_NAME, + ROUTINE_TYPE +from information_schema.routines +where routine_name <> convert(routine_name using ASCII); + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid column name: %v", data) + } + return nil +} + +func (h *DbWorker) triggerNameAsciiCodeCheck() (err error) { + var data []TriggerInfo + q := ` + select TRIGGER_SCHEMA, + TRIGGER_NAME +from information_schema.triggers +where TRIGGER_NAME <> convert(TRIGGER_NAME using ASCII) + or ACTION_STATEMENT <> convert(ACTION_STATEMENT using ASCII); + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid column name: %v", data) + } + return nil +} + +func (h *DbWorker) viewNameAsciiCodeCheck() (err error) { + var data []ViewInfo + q := ` + select TABLE_SCHEMA, + TABLE_NAME, + DEFINER +from information_schema.VIEWS +where TABLE_NAME <> convert(TABLE_NAME using ASCII); + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("found invalid column name: %v", data) + } + return nil +} + +// ConvertInnodbRowFomart 匹配innodb行格式 并生成可以适配快速加字段的行格式 +// +// Tmysql 中小于5.7的版本需要支持快速加字段需要转变行格式成GCS +func (h *DbWorker) ConvertInnodbRowFomart(ver string, file *os.File) (err error) { + dbs, err := h.ShowDatabases() + if err != nil { + return err + } + _, err = file.WriteString("set sql_log_bin=off;\n") + if err != nil { + return fmt.Errorf("wirte sql_log_bin=off failed %w", err) + } + for _, db := range cmutil.FilterOutStringSlice(dbs, cmutil.GetGcsSystemDatabases(ver)) { + var tables []TableInfo + err = h.Queryx(&tables, + "select TABLE_SCHEMA, TABLE_NAME, ENGINE, ROW_FORMAT from information_schema.TABLES where TABLE_SCHEMA = ?", db) + if err != nil { + return fmt.Errorf("when get tables in %s,failed %w", db, err) + } + for _, tb := range tables { + var werr error = nil + if strings.ToLower(tb.Engine) == "innodb" { + switch strings.ToLower(tb.RowFormat) { + case "compact": + _, werr = file.WriteString(fmt.Sprintf("/*!99000 alter table `%s`.`%s` row_format=GCS */;\n", tb.TableSchema, + tb.TableName)) + case "dynamic": + _, werr = file.WriteString(fmt.Sprintf("/*!99000 alter table `%s`.`%s` row_format=GCS_Dynamic */;\n", + tb.TableSchema, + tb.TableName)) + } + if werr != nil { + return werr + } + } + } + } + return err +} + +// RenameTokudbTable TODO +func (h *DbWorker) RenameTokudbTable(ver string, file *os.File) (err error) { + dbs, err := h.ShowDatabases() + if err != nil { + return err + } + _, err = file.WriteString("set sql_log_bin=off;\n") + if err != nil { + return fmt.Errorf("wirte sql_log_bin=off failed %w", err) + } + for _, db := range cmutil.FilterOutStringSlice(dbs, cmutil.GetGcsSystemDatabases(ver)) { + var tables []TableInfo + err = h.Queryx(&tables, + "select TABLE_SCHEMA, TABLE_NAME, ENGINE, ROW_FORMAT from information_schema.TABLES where TABLE_SCHEMA = ?", db) + if err != nil { + return fmt.Errorf("when get tables in %s,failed %w", db, err) + } + for _, tb := range tables { + if strings.ToLower(tb.Engine) != "tokudb" { + continue + } + renamesSqls := []string{} + renamesSqls = append(renamesSqls, fmt.Sprintf("rename table `%s`.`%s` to `%s`.`%s`;\n", tb.TableSchema, tb.TableName, + tb.TableSchema, tb.TableName+"_tokudb_backup_tmp")) + renamesSqls = append(renamesSqls, fmt.Sprintf("rename table `%s`.`%s` to `%s`.`%s`;\n", tb.TableSchema, + tb.TableName+"_tokudb_backup_tmp", + tb.TableSchema, tb.TableName)) + for _, ql := range renamesSqls { + _, werr := file.WriteString(ql) + if werr != nil { + return werr + } + } + } + } + return err + +} + +// udfCheck 检查是否能存在自定义函数 +func (h *DbWorker) udfCheck() (err error) { + // sql_check_udf="select dl from mysql.func;" + var count int + err = h.Queryxs(&count, "select count(dl) from mysql.func") + if err != nil { + return err + } + if count > 0 { + return fmt.Errorf("found udf,but it is not allowed") + } + return nil +} + +// passwordCheck per-4.1 password check +func (h *DbWorker) passwordCheck() (err error) { + var accounts []string + err = h.Queryx(&accounts, "SELECT concat(user,'@',host) as account FROM mysql.user WHERE LENGTH(password) = 16") + if err != nil { + return err + } + if len(accounts) > 0 { + return fmt.Errorf("%v found password length 16,but it is not allowed", accounts) + } + return +} + +// partitionCheck https://dev.mysql.com/doc/refman/5.7/en/upgrading-from-previous-series.html +// ----MySQL 5.7 specific checks +// Beginning with MySQL 5.7.6, the InnoDB storage engine uses its own built-in (“native”) partitioning +// handler for any new partitioned tables created using InnoDB. +// Partitioned InnoDB tables created in previous versions of MySQL are not automatically upgraded. +// You can easily upgrade such tables to use InnoDB native partitioning +// in MySQL 5.7.9 or later using either of the following methods: +// 如果mysql5.7 升级分区表 +// 执行alter 分区表可能会花费很多时间,暂时不支持 +// To upgrade an individual table from the generic partitioning handler to InnoDB native partitioning, +// execute the statement ALTER TABLE table_name UPGRADE PARTITIONING. +// To upgrade all InnoDB tables that use the generic partitioning +// handler to use the native partitioning handler instead, run mysql_upgrade. +func (h *DbWorker) partitionCheck() (err error) { + var data []TableInfo + q := ` + select TABLE_SCHEMA, + TABLE_NAME, + count(*) +from INFORMATION_SCHEMA.PARTITIONS +where PARTITION_NAME is not NULL +group by 1, + 2; + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("%v found partition name,but it is not allowed", data) + } + return nil +} + +func (h *DbWorker) tokudbEngineCheck() (err error) { + var data []TableInfo + q := ` + select TABLE_SCHEMA, + TABLE_NAME, + ENGINE +from information_schema.TABLES +where ENGINE = 'TokuDB' + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("exists TokuDB table,but it is not allowed,%v", data) + } + return nil +} + +// ----MySQL 8.0 specific checks +// https://dev.mysql.com/doc/refman/8.0/en/upgrading-from-previous-series.html#upgrade-configuration-changes + +// datadicTablenameConflictsCheck TODO +// 3.1.2 MySQL 系统数据库中不能有与 MySQL 8.0 数据字典中同名的表 +func (h *DbWorker) datadicTablenameConflictsCheck() (err error) { + var data []TableInfo + q := ` + SELECT TABLE_SCHEMA, + TABLE_NAME +FROM INFORMATION_SCHEMA.TABLES +WHERE LOWER(TABLE_SCHEMA) = 'mysql' + and LOWER(TABLE_NAME) IN ( + 'catalogs', + 'character_sets', + 'check_constraints', + 'collations', + 'column_statistics', + 'column_type_elements', + 'columns', + 'dd_properties', + 'events', + 'foreign_key_column_usage', + 'foreign_keys', + 'index_column_usage', + 'index_partitions', + 'index_stats', + 'indexes', + 'parameter_type_elements', + 'parameters', + 'resource_groups', + 'routines', + 'schemata', + 'st_spatial_reference_systems', + 'table_partition_values', + 'table_partitions', + 'table_stats', + 'tables', + 'tablespace_files', + 'tablespaces', + 'triggers', + 'view_routine_usage', + 'view_table_usage' + ); + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("系统数据库中存在与 MySQL 8.0 数据字典中同名的表,具体有%v", data) + } + return nil +} + +func (h *DbWorker) foreignKeyNameLengthCheck() (err error) { + var data []TableInfo + q := ` + SELECT TABLE_SCHEMA, TABLE_NAME + FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_NAME IN + (SELECT LEFT(SUBSTR(ID,INSTR(ID,'/')+1), + INSTR(SUBSTR(ID,INSTR(ID,'/')+1),'_ibfk_')-1) + FROM INFORMATION_SCHEMA.INNODB_SYS_FOREIGN + WHERE LENGTH(SUBSTR(ID,INSTR(ID,'/')+1))>64); + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("%v found foreign key name length > 64,but it is not allowed", data) + } + return nil +} + +// checkNonNativeSupportParttion TODO +// A MySQL storage engine is now responsible for providing its own partitioning handler, +// and the MySQL server no longer provides generic partitioning support. +// InnoDB and NDB are the only storage engines that provide a native partitioning handler that is supported in MySQL 8.0 +// A partitioned table using any other storage engine must be altered—either to convert it to InnoDB or NDB, +// or to remove its partitioning—before upgrading the server, else it cannot be used afterwards. +func (h *DbWorker) checkNonNativeSupportParttion() (err error) { + var data []TableInfo + q := ` + SELECT TABLE_SCHEMA, + TABLE_NAME +FROM INFORMATION_SCHEMA.TABLES +WHERE ENGINE NOT IN ('innodb', 'ndbcluster') + AND CREATE_OPTIONS LIKE '%partitioned%'; + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("exists non-native support partition,but it is not allowed,%v", data) + } + return nil +} + +// TableSpaceInfo TableSpaceInfo +type TableSpaceInfo struct { + Name string `db:"NAME"` + Space string `db:"SPACE"` + SpaceType string `db:"SPACE_TYPE"` +} + +// checkParttionsInInnoSharedTableSpace TODO +// 3.1.6 没有分区表在共享表空间 +func (h *DbWorker) checkParttionsInInnoSharedTableSpace() (err error) { + var data []TableSpaceInfo + q := ` + SELECT DISTINCT NAME, + SPACE, + SPACE_TYPE +FROM INFORMATION_SCHEMA.INNODB_SYS_TABLES +WHERE NAME LIKE '%#P#%' + AND SPACE_TYPE NOT LIKE 'Single'; + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("exists partition in innodb shared tablespace,but it is not allowed,%v", data) + } + return nil +} + +// enumSetTotalLengthTooLong TODO +// 3.1.5 表或者存储过程的 ENUM/SET 的所有元素总长度超过 255 字符,会导致升级失败 +// EnumSetTotalLenghTooLong TODO +func (h *DbWorker) enumSetTotalLengthTooLong() (err error) { + var data []ColumnInfo + q := ` + select concat(TABLE_SCHEMA, ".", TABLE_NAME, ".", COLUMN_NAME) as schema_table_column + from information_schema.columns + where length(COLUMN_TYPE) > 255 + 8 + and TABLE_SCHEMA NOT IN ( + 'INFORMATION_SCHEMA', + 'SYS', + 'PERFORMANCE_SCHEMA', + 'MYSQL' + ); + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("exists column type length > 255,but it is not allowed,%v", data) + } + return nil +} + +// viewNameLengthTooLong 3.1.11 视图列名不能超过 64 字符 +func (h *DbWorker) viewNameLengthTooLong() (err error) { + var data []ColumnInfo + q := ` + SELECT c.TABLE_SCHEMA, + c.TABLE_NAME, + c.COLUMN_NAME +FROM INFORMATION_SCHEMA.COLUMNS c + JOIN INFORMATION_SCHEMA.TABLES t ON c.TABLE_NAME = t.TABLE_NAME + and c.TABLE_SCHEMA = t.TABLE_SCHEMA +where c.TABLE_SCHEMA NOT IN ( + 'INFORMATION_SCHEMA', + 'SYS', + 'PERFORMANCE_SCHEMA', + 'MYSQL' + ) + AND t.TABLE_COMMENT = 'VIEW' + AND length(c.COLUMN_NAME) >= 64; + ` + if err = h.Queryx(&data, q); err != nil { + return err + } + if len(data) > 0 { + return fmt.Errorf("exists view column name length > 64,but it is not allowed,%v", data) + } + return nil +} + +func (h *DbWorker) tableCommentIllegalChar() (err error) { + conn, err := h.GetSqlxDb().Connx(context.Background()) + if err != nil { + return err + } + defer conn.Close() + rows, err := conn.QueryContext(context.Background(), "SELECT DISTINCT 1 FROM INFORMATION_SCHEMA.COLUMNS;") + if err != nil { + return err + } + defer rows.Close() + var warnings []Warning + if err = conn.SelectContext(context.Background(), &warnings, "show warnings;"); err != nil { + return err + } + if len(warnings) <= 0 { + return nil + } + logger.Error("get warnings %v", warnings) + var errs []error + for _, warning := range warnings { + if warning.Code == 1366 { + errs = append(errs, fmt.Errorf("非法comment %s", warning.Message)) + } + } + return errors.Join(errs...) +} diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/util/osutil/cmdexec.go b/dbm-services/mysql/db-tools/dbactuator/pkg/util/osutil/cmdexec.go index fb7c0854ac..c68467bf9d 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/util/osutil/cmdexec.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/util/osutil/cmdexec.go @@ -1,11 +1,14 @@ package osutil import ( + "bufio" "bytes" "fmt" + "io" "os" "os/exec" "strings" + "sync" "dbm-services/common/go-pubpkg/logger" @@ -167,3 +170,99 @@ func StandardShellCommand(isSudo bool, param string) (stdoutStr string, err erro } return stdout.String(), nil } + +// ComplexCommand 捕获标准错误和标准输出io copy 到需要文件里面 +// 不影响正常的输出 +type ComplexCommand struct { + Command string + WriteStdout bool + WriteStderr bool + StdoutFile string + StderrFile string + Logger bool +} + +// Run Command Run +func (c *ComplexCommand) Run() (err error) { + var stderrBuf bytes.Buffer + var errStdout, errStderr error + var stderrWs, stdoutWs []io.Writer + cmd := exec.Command("/bin/bash", "-c", c.Command) + stdoutIn, _ := cmd.StdoutPipe() + stderrIn, _ := cmd.StderrPipe() + // 写入error 文件 + if c.WriteStderr { + ef, errO := os.OpenFile(c.StderrFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if errO != nil { + logger.Warn("打开日志时失败! %s", errO.Error()) + return errO + } + defer ef.Close() + defer ef.Sync() + stderrWs = append(stderrWs, ef) + } + if c.WriteStdout { + of, errO := os.OpenFile(c.StdoutFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if errO != nil { + logger.Warn("打开日志时失败! %s", errO.Error()) + return errO + } + defer of.Close() + defer of.Sync() + stdoutWs = append(stdoutWs, of) + } + if c.Logger { + reader, writer := io.Pipe() + stderrWs = append(stderrWs, writer) + stdoutWs = append(stdoutWs, writer) + go func() { + buf := []byte{} + sc := bufio.NewScanner(reader) + sc.Buffer(buf, 2048*1024) + lineNumber := 1 + for sc.Scan() { + logger.Info(sc.Text()) + lineNumber++ + } + if err := sc.Err(); err != nil { + logger.Error("something bad happened in the line %v: %v", lineNumber, err) + return + } + }() + } + stdout := io.MultiWriter(stdoutWs...) + stderr := io.MultiWriter(stderrWs...) + + if err = cmd.Start(); err != nil { + logger.Error("start command failed:%s", err.Error()) + return err + } + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + _, errStdout = io.Copy(stdout, stdoutIn) + wg.Done() + }() + + _, errStderr = io.Copy(stderr, stderrIn) + wg.Wait() + + if errStdout != nil { + logger.Error("failed to capture stdout or stderr%v\n", errStdout) + return errStdout + } + if errStderr != nil { + logger.Error("failed to capture stderr or stderr,%v\n", errStderr) + return errStderr + } + + if err = cmd.Wait(); err != nil { + errStr := string(stderrBuf.Bytes()) + logger.Error("exec failed:%s,stderr: %s", err.Error(), errStr) + return err + } + + return nil +} diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/util/osutil/cmdexec_test.go b/dbm-services/mysql/db-tools/dbactuator/pkg/util/osutil/cmdexec_test.go index 68ccf696ce..257335e6cd 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/util/osutil/cmdexec_test.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/util/osutil/cmdexec_test.go @@ -1,3 +1,13 @@ +/* + * TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. + * Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at https://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + package osutil_test import ( @@ -14,3 +24,17 @@ func TestExecShellCommand(t *testing.T) { } t.Log(out) } + +func TestComplexCommand(t *testing.T) { + t.Log("start test complex command") + c := osutil.ComplexCommand{ + Command: "mysqlcheck -uxx -pxx --check-upgrade --grace-print --all-databases --skip-write-binlog ", + Logger: false, + WriteStdout: true, + StdoutFile: "./test.out", + } + if err := c.Run(); err != nil { + t.Fatal(err.Error()) + } + t.Log("end test complex command") +}