From 2c6182ada5d928b27d6ca724763838ed055d96e8 Mon Sep 17 00:00:00 2001 From: WinfredLIN Date: Mon, 2 Dec 2024 13:36:34 +0800 Subject: [PATCH 1/9] modify: check can task backup out side of loop --- sqle/server/sqled.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/sqle/server/sqled.go b/sqle/server/sqled.go index 0558c77bc..30f91772b 100644 --- a/sqle/server/sqled.go +++ b/sqle/server/sqled.go @@ -445,8 +445,8 @@ func (a *action) GetTaskStatus(st *model.Storage) string { } func (a *action) execTask() (err error) { - // TODO if enable backup and plugin support backup - if a.task.EnableBackup { + svc := BackupService{} + if svc.CheckCanTaskBackup(a.task) { err = a.backupAndExecSql() if err != nil { return err @@ -483,16 +483,13 @@ backupAndExecSql() 备份与执行SQL: 按照顺序,先根据一条SQL备份,再执行该SQL。备份过程中涉及连库查询和保存数据。 */ func (a *action) backupAndExecSql() error { - svc := BackupService{} for _, executeSQL := range a.task.ExecuteSQLs { - if svc.CheckCanTaskBackup(a.task) { - backupTask, err := toBackupTask(a.plugin, executeSQL) - if err != nil { - return fmt.Errorf("in backupAndExecSql when convert toBackupTask, err %w , backup task: %v, task: %v", err, executeSQL.BackupTask.ID, a.task.ID) - } - if err = backupTask.Backup(); err != nil { - return fmt.Errorf("in backupAndExecSql when backupTask Backup, err %w, backup task: %v, task: %v", err, executeSQL.BackupTask.ID, a.task.ID) - } + backupTask, err := toBackupTask(a.plugin, executeSQL) + if err != nil { + return fmt.Errorf("in backupAndExecSql when convert toBackupTask, err %w , backup task: %v, task: %v", err, executeSQL.BackupTask.ID, a.task.ID) + } + if err = backupTask.Backup(); err != nil { + return fmt.Errorf("in backupAndExecSql when backupTask Backup, err %w, backup task: %v, task: %v", err, executeSQL.BackupTask.ID, a.task.ID) } if err := a.execSQL(executeSQL); err != nil { return fmt.Errorf("in backupAndExecSql when execSQL %v, err %w, backup task: %v, task: %v", executeSQL, err, executeSQL.BackupTask.ID, a.task.ID) From bf311a1c630a62e8eeea81b1e07138ee51712afc Mon Sep 17 00:00:00 2001 From: WinfredLIN Date: Mon, 2 Dec 2024 16:02:28 +0800 Subject: [PATCH 2/9] modify: compatible with other plugins --- sqle/server/backup.go | 6 +++--- sqle/server/backup_ce.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sqle/server/backup.go b/sqle/server/backup.go index a888729a0..b9d29c436 100644 --- a/sqle/server/backup.go +++ b/sqle/server/backup.go @@ -3,7 +3,7 @@ package server import "github.com/actiontech/sqle/sqle/model" type BackupTask interface { - Backup() error + backup() error } type BackupStrategy string @@ -12,7 +12,7 @@ const ( BackupStrategyNone BackupStrategy = "none" // 不备份(不支持备份、无需备份、选择不备份) BackupStrategyReverseSql BackupStrategy = "reverse_sql" // 备份为反向SQL BackupStrategyOriginalRow BackupStrategy = "original_row" // 备份为原始行 - BackupStrategyManually BackupStrategy = "manual" // 标记为人工备份 + BackupStrategyManually BackupStrategy = "manual" // 标记为人工备份 BackupRowsAffectedLimit int = 1000 // SQL影响行数上限,超过该上限的SQL不进行备份 ) @@ -63,4 +63,4 @@ func (m backupTaskMap) AddBackupTask(backupTask *model.BackupTask) { if _, exist := m[backupTask.ExecuteSqlId]; !exist { m[backupTask.ExecuteSqlId] = backupTask } -} \ No newline at end of file +} diff --git a/sqle/server/backup_ce.go b/sqle/server/backup_ce.go index 4f7708a06..2a6d18199 100644 --- a/sqle/server/backup_ce.go +++ b/sqle/server/backup_ce.go @@ -28,7 +28,7 @@ func initModelBackupTask(p driver.Plugin, task *model.Task, sql *model.ExecuteSQ return &model.BackupTask{} } -func toBackupTask(a driver.Plugin, sql *model.ExecuteSQL) (BackupTask, error) { +func toBackupTask(p driver.Plugin, sql *model.ExecuteSQL) (*BaseBackupTask, error) { return &BaseBackupTask{}, nil } From f21db65e7b2d003e3135de77a66f0a3f09aac83d Mon Sep 17 00:00:00 2001 From: WinfredLIN Date: Mon, 2 Dec 2024 16:17:58 +0800 Subject: [PATCH 3/9] modify: compatible with other plugins --- sqle/server/backup_ce.go | 2 +- sqle/server/sqled.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sqle/server/backup_ce.go b/sqle/server/backup_ce.go index 2a6d18199..18893b69d 100644 --- a/sqle/server/backup_ce.go +++ b/sqle/server/backup_ce.go @@ -28,7 +28,7 @@ func initModelBackupTask(p driver.Plugin, task *model.Task, sql *model.ExecuteSQ return &model.BackupTask{} } -func toBackupTask(p driver.Plugin, sql *model.ExecuteSQL) (*BaseBackupTask, error) { +func toBackupTask(p driver.Plugin, sql *model.ExecuteSQL, dbType string) (*BaseBackupTask, error) { return &BaseBackupTask{}, nil } diff --git a/sqle/server/sqled.go b/sqle/server/sqled.go index 30f91772b..f067b4031 100644 --- a/sqle/server/sqled.go +++ b/sqle/server/sqled.go @@ -484,7 +484,7 @@ backupAndExecSql() 备份与执行SQL: */ func (a *action) backupAndExecSql() error { for _, executeSQL := range a.task.ExecuteSQLs { - backupTask, err := toBackupTask(a.plugin, executeSQL) + backupTask, err := toBackupTask(a.plugin, executeSQL, a.task.DBType) if err != nil { return fmt.Errorf("in backupAndExecSql when convert toBackupTask, err %w , backup task: %v, task: %v", err, executeSQL.BackupTask.ID, a.task.ID) } From 2f52a1baee2e5c568980a46433053185b7bab0d6 Mon Sep 17 00:00:00 2001 From: WinfredLIN Date: Tue, 3 Dec 2024 16:35:54 +0800 Subject: [PATCH 4/9] fix: panic while format subquery --- sqle/driver/mysql/rollback.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sqle/driver/mysql/rollback.go b/sqle/driver/mysql/rollback.go index 0d596db4a..707296be0 100644 --- a/sqle/driver/mysql/rollback.go +++ b/sqle/driver/mysql/rollback.go @@ -415,7 +415,7 @@ func (i *MysqlDriverImpl) generateInsertRollbackSql(stmt *ast.InsertStmt) (strin for n, name := range columnsName { _, isPk := pkColumnsName[name] if isPk { - where = append(where, fmt.Sprintf("%s = '%s'", name, util.ExprFormat(value[n]))) + where = append(where, fmt.Sprintf("%s = '%s'", name, restore(value[n]))) } } if len(where) != len(pkColumnsName) { @@ -437,7 +437,7 @@ func (i *MysqlDriverImpl) generateInsertRollbackSql(stmt *ast.InsertStmt) (strin name := setExpr.Column.Name.String() _, isPk := pkColumnsName[name] if isPk { - where = append(where, fmt.Sprintf("%s = '%s'", name, util.ExprFormat(setExpr.Expr))) + where = append(where, fmt.Sprintf("%s = '%s'", name, restore(setExpr.Expr))) } } if len(where) != len(pkColumnsName) { @@ -617,7 +617,7 @@ func (i *MysqlDriverImpl) generateUpdateRollbackSql(stmt *ast.UpdateStmt) (strin colChanged = true if isPk { isPkChanged = true - pkValue = util.ExprFormat(l.Expr) + pkValue = restore(l.Expr) } } } @@ -702,12 +702,12 @@ func (i *MysqlDriverImpl) generateGetRecordsSql(expr string, tableName *ast.Tabl recordSql = fmt.Sprintf("%s AS %s", recordSql, tableAlias) } if where != nil { - recordSql = fmt.Sprintf("%s WHERE %s", recordSql, util.ExprFormat(where)) + recordSql = fmt.Sprintf("%s WHERE %s", recordSql, restore(where)) } if order != nil { recordSql = fmt.Sprintf("%s ORDER BY", recordSql) for _, item := range order.Items { - recordSql = fmt.Sprintf("%s %s", recordSql, util.ExprFormat(item.Expr)) + recordSql = fmt.Sprintf("%s %s", recordSql, restore(item.Expr)) if item.Desc { recordSql = fmt.Sprintf("%s DESC", recordSql) } From bd8aec1323bd32def009241b22697ccac1ec81ae Mon Sep 17 00:00:00 2001 From: WinfredLIN Date: Tue, 3 Dec 2024 17:31:01 +0800 Subject: [PATCH 5/9] fix: panic when refer nil of executeSQL.BackupTask.ID --- sqle/server/sqled.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sqle/server/sqled.go b/sqle/server/sqled.go index f067b4031..fc0db238c 100644 --- a/sqle/server/sqled.go +++ b/sqle/server/sqled.go @@ -486,13 +486,13 @@ func (a *action) backupAndExecSql() error { for _, executeSQL := range a.task.ExecuteSQLs { backupTask, err := toBackupTask(a.plugin, executeSQL, a.task.DBType) if err != nil { - return fmt.Errorf("in backupAndExecSql when convert toBackupTask, err %w , backup task: %v, task: %v", err, executeSQL.BackupTask.ID, a.task.ID) + return fmt.Errorf("in backupAndExecSql when convert toBackupTask, err %w , backup task: %v, task: %v", err, backupTask, a.task.ID) } if err = backupTask.Backup(); err != nil { - return fmt.Errorf("in backupAndExecSql when backupTask Backup, err %w, backup task: %v, task: %v", err, executeSQL.BackupTask.ID, a.task.ID) + return fmt.Errorf("in backupAndExecSql when backupTask Backup, err %w, backup task: %v, task: %v", err, backupTask, a.task.ID) } if err := a.execSQL(executeSQL); err != nil { - return fmt.Errorf("in backupAndExecSql when execSQL %v, err %w, backup task: %v, task: %v", executeSQL, err, executeSQL.BackupTask.ID, a.task.ID) + return fmt.Errorf("in backupAndExecSql when execSQL %v, err %w, backup task: %v, task: %v", executeSQL, err, backupTask, a.task.ID) } } return nil From 06e1301e676dc704866fc5be0805f4a22aa9b13f Mon Sep 17 00:00:00 2001 From: WinfredLIN Date: Tue, 3 Dec 2024 17:32:06 +0800 Subject: [PATCH 6/9] modify: set connected true, so test case will not try to connect to server --- sqle/driver/mysql/audit_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sqle/driver/mysql/audit_test.go b/sqle/driver/mysql/audit_test.go index 84cdf8225..5f96772f0 100644 --- a/sqle/driver/mysql/audit_test.go +++ b/sqle/driver/mysql/audit_test.go @@ -114,7 +114,8 @@ func NewMockInspectWithIsExecutedSQL(e *executor.Executor) *MysqlDriverImpl { Password: "123456", DatabaseName: "mysql", }, - Ctx: session.NewMockContext(e), + isConnected: true, + Ctx: session.NewMockContext(e), cnf: &Config{ DDLOSCMinSize: 16, DDLGhostMinSize: 16, From 16acbd247a9b4ad313b5772455a97f9608c61309 Mon Sep 17 00:00:00 2001 From: WinfredLIN Date: Thu, 5 Dec 2024 14:01:17 +0800 Subject: [PATCH 7/9] refactor: backup task to backup manager --- sqle/server/backup.go | 4 ++-- sqle/server/backup_ce.go | 8 ++++---- sqle/server/sqled.go | 10 +++++----- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sqle/server/backup.go b/sqle/server/backup.go index b9d29c436..deed25dc8 100644 --- a/sqle/server/backup.go +++ b/sqle/server/backup.go @@ -2,8 +2,8 @@ package server import "github.com/actiontech/sqle/sqle/model" -type BackupTask interface { - backup() error +type backupHandler interface { + backup() (backupResult string, err error) } type BackupStrategy string diff --git a/sqle/server/backup_ce.go b/sqle/server/backup_ce.go index 18893b69d..dedb7af5d 100644 --- a/sqle/server/backup_ce.go +++ b/sqle/server/backup_ce.go @@ -18,9 +18,9 @@ func (BackupService) CheckIsDbTypeSupportEnableBackup(dbType string) error { return nil } -type BaseBackupTask struct{} +type BackupManager struct{} -func (t BaseBackupTask) Backup() error { +func (t BackupManager) Backup() error { return nil } @@ -28,8 +28,8 @@ func initModelBackupTask(p driver.Plugin, task *model.Task, sql *model.ExecuteSQ return &model.BackupTask{} } -func toBackupTask(p driver.Plugin, sql *model.ExecuteSQL, dbType string) (*BaseBackupTask, error) { - return &BaseBackupTask{}, nil +func getBackupManager(p driver.Plugin, sql *model.ExecuteSQL, dbType string) (*BackupManager, error) { + return &BackupManager{}, nil } func (BackupService) GetRollbackSqlsMap(taskId uint) (map[uint][]string, error) { diff --git a/sqle/server/sqled.go b/sqle/server/sqled.go index fc0db238c..43c0a0f13 100644 --- a/sqle/server/sqled.go +++ b/sqle/server/sqled.go @@ -484,15 +484,15 @@ backupAndExecSql() 备份与执行SQL: */ func (a *action) backupAndExecSql() error { for _, executeSQL := range a.task.ExecuteSQLs { - backupTask, err := toBackupTask(a.plugin, executeSQL, a.task.DBType) + backupMng, err := getBackupManager(a.plugin, executeSQL, a.task.DBType) if err != nil { - return fmt.Errorf("in backupAndExecSql when convert toBackupTask, err %w , backup task: %v, task: %v", err, backupTask, a.task.ID) + return fmt.Errorf("in backupAndExecSql when getBackupManager, err %w , task: %v", err, a.task.ID) } - if err = backupTask.Backup(); err != nil { - return fmt.Errorf("in backupAndExecSql when backupTask Backup, err %w, backup task: %v, task: %v", err, backupTask, a.task.ID) + if err = backupMng.Backup(); err != nil { + return fmt.Errorf("in backupAndExecSql when backupMng Backup, err %w, backup manager: %v, task: %v", err, backupMng, a.task.ID) } if err := a.execSQL(executeSQL); err != nil { - return fmt.Errorf("in backupAndExecSql when execSQL %v, err %w, backup task: %v, task: %v", executeSQL, err, backupTask, a.task.ID) + return fmt.Errorf("in backupAndExecSql when execSQL %v, err %w, backup manager: %v, task: %v", executeSQL, err, backupMng, a.task.ID) } } return nil From 6f3fa394e4d6aa34d28c1bc6ffabb5b6ea237b09 Mon Sep 17 00:00:00 2001 From: WinfredLIN Date: Fri, 6 Dec 2024 10:12:37 +0800 Subject: [PATCH 8/9] rename: mng to mgr --- sqle/server/sqled.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sqle/server/sqled.go b/sqle/server/sqled.go index 43c0a0f13..e004464e8 100644 --- a/sqle/server/sqled.go +++ b/sqle/server/sqled.go @@ -484,15 +484,15 @@ backupAndExecSql() 备份与执行SQL: */ func (a *action) backupAndExecSql() error { for _, executeSQL := range a.task.ExecuteSQLs { - backupMng, err := getBackupManager(a.plugin, executeSQL, a.task.DBType) + backupMgr, err := getBackupManager(a.plugin, executeSQL, a.task.DBType) if err != nil { return fmt.Errorf("in backupAndExecSql when getBackupManager, err %w , task: %v", err, a.task.ID) } - if err = backupMng.Backup(); err != nil { - return fmt.Errorf("in backupAndExecSql when backupMng Backup, err %w, backup manager: %v, task: %v", err, backupMng, a.task.ID) + if err = backupMgr.Backup(); err != nil { + return fmt.Errorf("in backupAndExecSql when backupMgr Backup, err %w, backup manager: %v, task: %v", err, backupMgr, a.task.ID) } if err := a.execSQL(executeSQL); err != nil { - return fmt.Errorf("in backupAndExecSql when execSQL %v, err %w, backup manager: %v, task: %v", executeSQL, err, backupMng, a.task.ID) + return fmt.Errorf("in backupAndExecSql when execSQL %v, err %w, backup manager: %v, task: %v", executeSQL, err, backupMgr, a.task.ID) } } return nil From a2e322e2e1da90555f152a57aef07674f5e46ae7 Mon Sep 17 00:00:00 2001 From: WinfredLIN Date: Fri, 6 Dec 2024 10:20:36 +0800 Subject: [PATCH 9/9] modify: move to ee --- sqle/server/backup.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sqle/server/backup.go b/sqle/server/backup.go index deed25dc8..7e96b79c3 100644 --- a/sqle/server/backup.go +++ b/sqle/server/backup.go @@ -2,10 +2,6 @@ package server import "github.com/actiontech/sqle/sqle/model" -type backupHandler interface { - backup() (backupResult string, err error) -} - type BackupStrategy string const (