Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mysql): db重命名和清档中的触发器 #6662 #6679

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,26 @@ func (c *OnMySQLComponent) instanceRenameTables(port int) error {
fromDB = fmt.Sprintf("%s_%d", fromDB, c.Param.PortShardIdMap[port])
toDB = fmt.Sprintf("%s_%d", toDB, c.Param.PortShardIdMap[port])
}
err := pkg.TransDBTables(c.dbConn, fromDB, toDB, c.dbTablesMap[fromDB])
createTriggers, err := pkg.TransDBTables(c.dbConn, fromDB, toDB, c.dbTablesMap[fromDB])
if err != nil {
return err
}

// 源的触发器已经没了
// 这里要在目的把触发器恢复
for _, trigger := range createTriggers {
_, err = c.dbConn.ExecContext(context.Background(), fmt.Sprintf("USE `%s`", toDB))
if err != nil {
logger.Error("change db to %s failed: %v", toDB, err)
return err
}
_, err = c.dbConn.ExecContext(context.Background(), trigger)
if err != nil {
logger.Error("create trigger %s in %s failed: %v", trigger, toDB, err)
return err
}
logger.Info("create trigger %s in %s success", trigger, toDB)
}
}
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,35 @@ import (
"github.com/jmoiron/sqlx"
)

func TransDBTables(conn *sqlx.Conn, from, to string, tables []string) error {
type createTriggerRow struct {
Trigger string `db:"Trigger"`
SqlMode string `db:"sql_mode"`
Sql string `db:"SQL Original Statement"`
CharsetClient string `db:"character_set_client"`
CollationConnection string `db:"collation_connection"`
Collation string `db:"Database Collation"`
Created string `db:"Created"`
}

// TransDBTables
// 一个表如果有触发器, 必须要 drop 触发器才能 drop/rename
// 如果是 rename db 单据, 源触发器可以不用保留
// 如果是 truncate db, 当清档类型是 truncate table 时, 源触发器需要保留
// 实际是 rename table 前先把触发器删掉, 完事了再看需求重建
// 但是又不能在这个函数重建, 因为表还不在, 所以只能把需求重建的 trigger 返回
func TransDBTables(conn *sqlx.Conn, from, to string, tables []string) ([]string, error) {
var res []string
for _, table := range tables {
err := transDBTable(conn, from, to, table)
createTriggers, err := transDBTable(conn, from, to, table)
if err != nil {
return err
return nil, err
}
res = append(res, createTriggers...)
}
return nil
return res, nil
}

func transDBTable(conn *sqlx.Conn, from, to, tableName string) error {
func transDBTable(conn *sqlx.Conn, from, to, tableName string) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Expand All @@ -29,7 +47,40 @@ func transDBTable(conn *sqlx.Conn, from, to, tableName string) error {
fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`;", to, tableName),
)
if err != nil {
return err
return nil, err
}

// 源触发器
var triggers []string
err = conn.SelectContext(
ctx,
&triggers,
`SELECT TRIGGER_NAME FROM INFORMATION_SCHEMA.TRIGGERS WHERE EVENT_OBJECT_SCHEMA = ? AND EVENT_OBJECT_TABLE = ?`,
from, tableName,
)
if err != nil {
logger.Error("query trigger for %s.%s failed: %s", from, tableName, err)
return nil, err
}
logger.Info("query trigger for %s.%s succeeded: %v", from, tableName, triggers)

var createTriggers []string
for _, trigger := range triggers {
createTrigger := createTriggerRow{}
err = conn.GetContext(ctx, &createTrigger, fmt.Sprintf("SHOW CREATE TRIGGER `%s`.`%s`", from, trigger))
if err != nil {
logger.Error("show create trigger for %s.%s failed: %s", from, trigger, err)
return nil, err
}
createTriggers = append(createTriggers, createTrigger.Sql)
logger.Info("stage create trigger for %s.%s: %s", from, trigger, createTrigger.Sql)

_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP TRIGGER `%s`.`%s`", from, trigger))
if err != nil {
logger.Error("drop trigger for %s.%s failed: %s", from, trigger, err)
return nil, err
}
logger.Info("drop trigger for %s.%s succeeded", from, trigger)
}

_, err = conn.ExecContext(
Expand All @@ -40,9 +91,9 @@ func transDBTable(conn *sqlx.Conn, from, to, tableName string) error {
),
)
if err != nil {
logger.Info("rename %s from %s to %s failed: %s", tableName, from, to, err.Error())
return err
logger.Error("rename %s from %s to %s failed: %s", tableName, from, to, err.Error())
return nil, err
}
logger.Info("rename %s from %s to %s success", tableName, from, to)
return nil
return createTriggers, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *OnMySQLComponent) oneInstance(port int) error {
//}
//logger.Info("rename others to stage on instance %d finished", port)

err = c.instanceRenameTables(port)
dbTriggers, err := c.instanceRenameTables(port)
if err != nil {
logger.Error("rename tables to stage on instance %d failed: %s", port, err.Error())
return err
Expand All @@ -113,6 +113,24 @@ func (c *OnMySQLComponent) oneInstance(port int) error {
return err
}
logger.Info("truncate source table on instance %d finished", port)

for db, triggers := range dbTriggers {
for _, trigger := range triggers {
_, err = c.dbConn.ExecContext(context.Background(), fmt.Sprintf("USE `%s`", db))
if err != nil {
logger.Error("change db to %s failed: %s", db, err.Error())
return err
}
_, err = c.dbConn.ExecContext(context.Background(), trigger)
if err != nil {
logger.Error("create trigger %s in %s on instance %d failed: %s",
trigger, db, port, err.Error())
return err
}
logger.Info("create trigger %s in %s on instance %d success",
trigger, db, port)
}
}
} else if c.Param.TruncateDataType == "drop_database" {
err = c.instanceDropSourceDBs(port)
if err != nil {
Expand Down Expand Up @@ -194,17 +212,19 @@ func (c *OnMySQLComponent) instanceCreateStageDBs(port int) error {
return nil
}

func (c *OnMySQLComponent) instanceRenameTables(port int) error {
func (c *OnMySQLComponent) instanceRenameTables(port int) (map[string][]string, error) {
res := map[string][]string{}
for db := range c.dbTablesMap {
stageDBName := generateStageDBName(c.Param.StageDBHeader, c.Param.FlowTimeStr, db)
err := rpkg.TransDBTables(c.dbConn, db, stageDBName, c.dbTablesMap[db])
triggers, err := rpkg.TransDBTables(c.dbConn, db, stageDBName, c.dbTablesMap[db])
if err != nil {
logger.Error("rename tables to stage on instance %d failed: %s", port, err.Error())
return err
return nil, err
}
res[db] = triggers
}
logger.Info("rename table on instance %d success", port)
return nil
return res, nil
}

func (c *OnMySQLComponent) instanceRenameOthers(port int) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,37 @@ func safeDropSourceTable(conn *sqlx.Conn, dbName, stageDBName, tableName string)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 留着这里, drop table 不需要预处理触发器
//var triggers []string
//err = conn.SelectContext(
// ctx,
// &triggers,
// `SELECT TRIGGER_NAME FROM INFORMATION_SCHEMA.TRIGGERS WHERE EVENT_OBJECT_SCHEMA = ? AND EVENT_OBJECT_TABLE = ?`,
// dbName, tableName,
//)
//if err != nil {
// logger.Error("query trigger for %s.%s failed: %s", dbName, tableName, err)
// return err
//}
//logger.Info("query trigger for %s.%s succeeded: %v", dbName, tableName, triggers)
//
//for _, trigger := range triggers {
// _, err = conn.ExecContext(ctx, fmt.Sprintf("USE `%s`", dbName))
// if err != nil {
// logger.Error("change db to %s failed: %s", dbName, err)
// return err
// }
// logger.Info("change db to %s succeeded", dbName)
//
// // 中控要 change db 再 drop trigger
// _, err = conn.ExecContext(ctx, fmt.Sprintf("DROP TRIGGER `%s`", trigger))
// if err != nil {
// logger.Error("drop trigger for %s.%s failed: %s", dbName, trigger, err)
// return err
// }
// logger.Info("drop trigger for %s.%s succeeded", dbName, trigger)
//}

_, err = conn.ExecContext(
ctx,
fmt.Sprintf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ type Checker struct {
func (c *Checker) Run() (msg string, err error) {
var cnfFile string
if config.MonitorConfig.Port == 3306 {
cnfFile = "/etc/my.cnf"
cnfFile = "etc/my.cnf.3306"
if _, err := os.Stat(cnfFile); os.IsNotExist(err) {
cnfFile = "/etc/my.cnf"
}
} else {
cnfFile = fmt.Sprintf("/etc/my.cnf.%d", config.MonitorConfig.Port)
}
Expand Down
Loading