Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support recommended backup strategies and row backups #2786

Merged
merged 14 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sqle/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func StartApi(net *gracenet.Net, exitChan chan struct{}, config *config.SqleOpti
v1ProjectOpRouter.PATCH("/:project_name/workflows/:workflow_name/", DeprecatedBy(apiV2))
v1ProjectOpRouter.POST("/:project_name/workflows/:workflow_id/tasks/:task_id/order_file", v1.UpdateSqlFileOrderByWorkflowV1)
v1ProjectOpRouter.GET("/:project_name/workflows/:workflow_id/backup_sqls", v1.GetBackupSqlList)
v1ProjectOpRouter.POST("/:project_name/workflows/:workflow_id/rollback_workflows", v1.CreateRollbackWorkflow)
v1ProjectOpRouter.POST("/:project_name/workflows/:workflow_id/create_rollback_workflow", v1.CreateRollbackWorkflow)

// sql version
v1ProjectOpRouter.POST("/:project_name/sql_versions/:sql_version_id/batch_release_workflows", v1.BatchReleaseWorkflows)
Expand Down
34 changes: 0 additions & 34 deletions sqle/api/controller/v1/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,37 +102,3 @@ type BackupSqlListRes struct {
func GetBackupSqlList(c echo.Context) error {
return getBackupSqlList(c)
}

type CreateRollbackWorkflowReq struct {
Subject string `json:"workflow_subject" form:"workflow_subject" valid:"required,name"`
Desc string `json:"desc" form:"desc"`
SqlVersionID *uint `json:"sql_version_id" form:"sql_version_id"`
TaskIds []uint `json:"task_ids" form:"task_ids" valid:"required"`
RollbackSqlIds []uint `json:"rollback_sql_ids" form:"rollback_sql_ids" valid:"required"`
}

type CreateRollbackWorkflowRes struct {
controller.BaseRes
Data *CreateRollbackWorkflowResData `json:"data"`
}

type CreateRollbackWorkflowResData struct {
WorkflowID string `json:"workflow_id"`
}

// CreateRollbackWorkflow
// @Summary 创建回滚工单
// @Description create rollback workflow
// @Accept json
// @Produce json
// @Tags workflow
// @Id CreateRollbackWorkflow
// @Security ApiKeyAuth
// @Param instance body v1.CreateRollbackWorkflowReq true "create rollback workflow request"
// @Param project_name path string true "project name"
// @Param workflow_id path string true "origin workflow id to rollback"
// @Success 200 {object} CreateRollbackWorkflowRes
// @router /v1/projects/{project_name}/workflows/{workflow_id}/rollback_workflows [post]
func CreateRollbackWorkflow(c echo.Context) error {
return createRollbackWorkflow(c)
}
2 changes: 1 addition & 1 deletion sqle/api/controller/v1/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type AuditTaskResV1 struct {
FileOrderMethod string `json:"file_order_method,omitempty"`
ExecMode string `json:"exec_mode,omitempty"`
EnableBackup bool `json:"enable_backup"`
BackupConflictWithInstance bool `json:"backup_conflict_with_instance"`
BackupConflictWithInstance bool `json:"backup_conflict_with_instance"` // 当数据源备份开启,工单备份关闭,则需要提示审核人工单备份策略与数据源备份策略不一致
AuditFiles []AuditFileResp `json:"audit_files,omitempty"`
}

Expand Down
34 changes: 34 additions & 0 deletions sqle/api/controller/v1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1669,3 +1669,37 @@ func GetWorkflowTaskAuditFile(c echo.Context) error {
}
return c.NoContent(http.StatusOK)
}

type CreateRollbackWorkflowReq struct {
Subject string `json:"workflow_subject" form:"workflow_subject" valid:"required,name"`
Desc string `json:"desc" form:"desc"`
SqlVersionID *uint `json:"sql_version_id" form:"sql_version_id"`
TaskIds []uint `json:"task_ids" form:"task_ids" valid:"required"`
RollbackSqlIds []uint `json:"rollback_sql_ids" form:"rollback_sql_ids" valid:"required"`
}

type CreateRollbackWorkflowRes struct {
controller.BaseRes
Data *CreateRollbackWorkflowResData `json:"data"`
}

type CreateRollbackWorkflowResData struct {
WorkflowID string `json:"workflow_id"`
}

// CreateRollbackWorkflow
// @Summary 创建回滚工单
// @Description create rollback workflow
// @Accept json
// @Produce json
// @Tags workflow
// @Id CreateRollbackWorkflow
// @Security ApiKeyAuth
// @Param instance body v1.CreateRollbackWorkflowReq true "create rollback workflow request"
// @Param project_name path string true "project name"
// @Param workflow_id path string true "origin workflow id to rollback"
// @Success 200 {object} CreateRollbackWorkflowRes
// @router /v1/projects/{project_name}/workflows/{workflow_id}/create_rollback_workflow [post]
func CreateRollbackWorkflow(c echo.Context) error {
return createRollbackWorkflow(c)
}
3 changes: 2 additions & 1 deletion sqle/docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7197,7 +7197,7 @@ var doc = `{
}
}
},
"/v1/projects/{project_name}/workflows/{workflow_id}/rollback_workflows": {
"/v1/projects/{project_name}/workflows/{workflow_id}/create_rollback_workflow": {
"post": {
"security": [
{
Expand Down Expand Up @@ -11639,6 +11639,7 @@ var doc = `{
]
},
"backup_conflict_with_instance": {
"description": "当数据源备份开启,工单备份关闭,则需要提示审核人工单备份策略与数据源备份策略不一致",
"type": "boolean"
},
"enable_backup": {
Expand Down
3 changes: 2 additions & 1 deletion sqle/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -7181,7 +7181,7 @@
}
}
},
"/v1/projects/{project_name}/workflows/{workflow_id}/rollback_workflows": {
"/v1/projects/{project_name}/workflows/{workflow_id}/create_rollback_workflow": {
"post": {
"security": [
{
Expand Down Expand Up @@ -11623,6 +11623,7 @@
]
},
"backup_conflict_with_instance": {
"description": "当数据源备份开启,工单备份关闭,则需要提示审核人工单备份策略与数据源备份策略不一致",
"type": "boolean"
},
"enable_backup": {
Expand Down
3 changes: 2 additions & 1 deletion sqle/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ definitions:
- ""
type: string
backup_conflict_with_instance:
description: 当数据源备份开启,工单备份关闭,则需要提示审核人工单备份策略与数据源备份策略不一致
type: boolean
enable_backup:
type: boolean
Expand Down Expand Up @@ -10524,7 +10525,7 @@ paths:
summary: 获取工单下所有回滚SQL的列表
tags:
- workflow
/v1/projects/{project_name}/workflows/{workflow_id}/rollback_workflows:
/v1/projects/{project_name}/workflows/{workflow_id}/create_rollback_workflow:
post:
consumes:
- application/json
Expand Down
21 changes: 21 additions & 0 deletions sqle/driver/mysql/backup_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//go:build !enterprise
// +build !enterprise

package mysql

import (
"context"
"fmt"

"github.com/actiontech/sqle/sqle/driver"
)

var ErrUnsupportedBackup error = fmt.Errorf("backup is unsupported for sqle community version")

func (i *MysqlDriverImpl) Backup(ctx context.Context, backupStrategy string, sql string) (BackupSql []string, ExecuteInfo string, err error) {
return nil, "", ErrUnsupportedBackup
}

func (i *MysqlDriverImpl) RecommendBackupStrategy(ctx context.Context, sql string) (*driver.RecommendBackupStrategyRes, error) {
return nil, ErrUnsupportedBackup
}
11 changes: 4 additions & 7 deletions sqle/driver/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,10 @@ func (inspect *MysqlDriverImpl) applyConfig(cfg *driverV2.Config) {
inspect.isOfflineAudit = cfg.DSN == nil

inspect.cnf = &Config{
DMLRollbackMaxRows: 1000,// TODO 暂时将备份影响行数上限设置为1000,后续需要将对应该配置项的规则移除。备份影响行数上限设置将会在备份任务中设置。
DDLOSCMinSize: -1,
DDLGhostMinSize: -1,
}
for _, rule := range cfg.Rules {
if rule.Name == rulepkg.ConfigDMLRollbackMaxRows {
max := rule.Params.GetParam(rulepkg.DefaultSingleParamKeyName).Int()
inspect.cnf.DMLRollbackMaxRows = int64(max)
}
if rule.Name == rulepkg.ConfigDDLOSCMinSize {
min := rule.Params.GetParam(rulepkg.DefaultSingleParamKeyName).Int()
inspect.cnf.DDLOSCMinSize = int64(min)
Expand Down Expand Up @@ -631,7 +626,7 @@ func (p *PluginProcessor) GetDriverMetas() (*driverV2.DriverMetas, error) {
for i := range rulepkg.RuleHandlers {
allRules[i] = &rulepkg.RuleHandlers[i].Rule
}
return &driverV2.DriverMetas{
metas := &driverV2.DriverMetas{
PluginName: driverV2.DriverTypeMySQL,
DatabaseDefaultPort: 3306,
Logo: logo,
Expand All @@ -648,7 +643,9 @@ func (p *PluginProcessor) GetDriverMetas() (*driverV2.DriverMetas, error) {
driverV2.OptionalExecBatch,
driverV2.OptionalModuleI18n,
},
}, nil
}
addOptionModules(metas)
return metas, nil
}

func (p *PluginProcessor) Open(l *logrus.Entry, cfg *driverV2.Config) (driver.Plugin, error) {
Expand Down
2 changes: 2 additions & 0 deletions sqle/driver/mysql/mysql_ce.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ func (i *MysqlDriverImpl) GetDatabaseDiffModifySQL(ctx context.Context, calibrat
func (i *MysqlDriverImpl) GetDatabaseObjectDDL(ctx context.Context, objInfos []*driverV2.DatabasSchemaInfo) ([]*driverV2.DatabaseSchemaObjectResult, error) {
return nil, fmt.Errorf("only support Query in enterprise edition")
}

func addOptionModules(metas *driverV2.DriverMetas) {}
winfredLIN marked this conversation as resolved.
Show resolved Hide resolved
1 change: 0 additions & 1 deletion sqle/driver/mysql/rule/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ const (

// inspector config code
const (
ConfigDMLRollbackMaxRows = "dml_rollback_max_rows"
ConfigDDLOSCMinSize = "ddl_osc_min_size"
ConfigDDLGhostMinSize = "ddl_ghost_min_size"
ConfigOptimizeIndexEnabled = "optimize_index_enabled"
Expand Down
19 changes: 0 additions & 19 deletions sqle/driver/mysql/rule/rule_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,6 @@ var sourceRuleHandlers = []*SourceHandler{
AllowOffline: false,
Func: notAllowInsertAutoincrement,
},
{
Rule: SourceRule{
Name: ConfigDMLRollbackMaxRows,
Desc: plocale.ConfigDMLRollbackMaxRowsDesc,
Annotation: plocale.ConfigDMLRollbackMaxRowsAnnotation,
//Value: "1000",
Level: driverV2.RuleLevelNotice,
Category: plocale.RuleTypeGlobalConfig,
Params: []*SourceParam{
{
Key: DefaultSingleParamKeyName,
Value: "1000",
Desc: plocale.ConfigDMLRollbackMaxRowsParams1,
Type: params.ParamTypeInt,
},
},
},
Func: nil,
},
{
Rule: SourceRule{
Name: ConfigDDLOSCMinSize,
Expand Down
8 changes: 8 additions & 0 deletions sqle/driver/plugin_adapter_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ type PluginImplV1 struct {
driverV1.DriverManager
}

func (d *PluginImplV1) Backup(ctx context.Context, backupStrategy string, sql string) (backupSql []string, executeInfo string, err error) {
return nil, "", nil
}

func (d *PluginImplV1) RecommendBackupStrategy(ctx context.Context, sql string) (*RecommendBackupStrategyRes, error) {
return nil, nil
}

func (p *PluginImplV1) Close(ctx context.Context) {
p.DriverManager.Close(ctx)
}
Expand Down
48 changes: 48 additions & 0 deletions sqle/driver/plugin_adapter_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,54 @@ type PluginImplV2 struct {
meta *driverV2.DriverMetas
}

func (s *PluginImplV2) Backup(ctx context.Context, backupStrategy string, sql string) (BackupSql []string, ExecuteInfo string, err error) {
api := "Backup"
s.preLog(api)
var strategy protoV2.BackupStrategy
switch backupStrategy {
case driverV2.BackupStrategyReverseSql:
strategy = protoV2.BackupStrategy_ReverseSql
case driverV2.BackupStrategyNone:
strategy = protoV2.BackupStrategy_None
case driverV2.BackupStrategyOriginalRow:
strategy = protoV2.BackupStrategy_OriginalRow
case driverV2.BackupStrategyManually:
strategy = protoV2.BackupStrategy_Manually
default:
return []string{}, "", fmt.Errorf("unsupported strategy %v", backupStrategy)
}
resp, err := s.client.Backup(ctx, &protoV2.BackupReq{
Session: s.Session,
BackupStrategy: strategy,
Sql: sql,
})
s.afterLog(api, err)
if err != nil {
return nil, "", err
}
return resp.GetBackupSql(), resp.GetExecuteInfo(), nil
}

func (p *PluginImplV2) RecommendBackupStrategy(ctx context.Context, sql string) (*RecommendBackupStrategyRes, error) {
api := "RecommendBackupStrategy"
p.preLog(api)

resp, err := p.client.RecommendBackupStrategy(ctx, &protoV2.RecommendBackupStrategyReq{
Session: p.Session,
Sql: sql,
})
p.afterLog(api, err)
if err != nil {
return nil, err
}
return &RecommendBackupStrategyRes{
BackupStrategy: resp.BackupStrategy.String(),
BackupStrategyTip: resp.BackupStrategyTip,
TablesRefer: resp.TablesRefer,
SchemasRefer: resp.SchemasRefer,
}, nil
}

func (s *PluginImplV2) preLog(ApiName string) {
s.l.Infof("starting call plugin interface [%s]", ApiName)
}
Expand Down
11 changes: 11 additions & 0 deletions sqle/driver/plugin_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ type Plugin interface {
GetDatabaseObjectDDL(ctx context.Context, objInfos []*driverV2.DatabasSchemaInfo) ([]*driverV2.DatabaseSchemaObjectResult, error)

GetDatabaseDiffModifySQL(ctx context.Context, calibratedDSN *driverV2.DSN, objInfos []*driverV2.DatabasCompareSchemaInfo) ([]*driverV2.DatabaseDiffModifySQLResult, error)

Backup(ctx context.Context, backupStrategy string, sql string) (BackupSql []string, ExecuteInfo string, err error)

RecommendBackupStrategy(ctx context.Context, sql string) (*RecommendBackupStrategyRes, error)
}

type RecommendBackupStrategyRes struct {
BackupStrategy string
BackupStrategyTip string
TablesRefer []string
SchemasRefer []string
}

type PluginProcessor interface {
Expand Down
51 changes: 51 additions & 0 deletions sqle/driver/v2/driver_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,57 @@ func (d *DriverGrpcServer) getDriverBySession(session *protoV2.Session) (Driver,
return nil, fmt.Errorf("session %s not found", session.Id)
}
return driver, nil

}

func (d *DriverGrpcServer) Backup(ctx context.Context, req *protoV2.BackupReq) (*protoV2.BackupRes, error) {
driver, err := d.getDriverBySession(req.Session)
if err != nil {
return &protoV2.BackupRes{}, err
}
res, err := driver.Backup(ctx, &BackupReq{
BackupStrategy: req.BackupStrategy.String(),
Sql: req.Sql,
})
if err != nil {
return nil, errors.Wrap(err, "backup")
}
return &protoV2.BackupRes{
BackupSql: res.BackupSql,
ExecuteInfo: res.ExecuteInfo,
}, nil
}

func (d *DriverGrpcServer) RecommendBackupStrategy(ctx context.Context, req *protoV2.RecommendBackupStrategyReq) (*protoV2.RecommendBackupStrategyRes, error) {
driver, err := d.getDriverBySession(req.Session)
if err != nil {
return &protoV2.RecommendBackupStrategyRes{}, err
}
res, err := driver.RecommendBackupStrategy(ctx, &RecommendBackupStrategyReq{
Sql: req.Sql,
})
if err != nil {
return &protoV2.RecommendBackupStrategyRes{}, errors.Wrap(err, "backup")
}
var backupStrategyProtoV2 protoV2.BackupStrategy
switch res.BackupStrategy {
case BackupStrategyReverseSql:
backupStrategyProtoV2 = protoV2.BackupStrategy_ReverseSql
case BackupStrategyManually:
backupStrategyProtoV2 = protoV2.BackupStrategy_Manually
case BackupStrategyNone:
backupStrategyProtoV2 = protoV2.BackupStrategy_None
case BackupStrategyOriginalRow:
backupStrategyProtoV2 = protoV2.BackupStrategy_OriginalRow
default:
return nil, fmt.Errorf("unsupported strategy %v", res.BackupStrategy)
}
return &protoV2.RecommendBackupStrategyRes{
BackupStrategy: backupStrategyProtoV2,
BackupStrategyTip: res.BackupStrategyTip,
TablesRefer: res.TablesRefer,
SchemasRefer: res.SchemasRefer,
}, nil
}

func (d *DriverGrpcServer) Metas(ctx context.Context, req *protoV2.Empty) (*protoV2.MetasResponse, error) {
Expand Down
Loading