Skip to content

Commit

Permalink
Merge pull request #2640 from actiontech/sql_version_manage
Browse files Browse the repository at this point in the history
Sql version manage
  • Loading branch information
ColdWaterLW authored Sep 27, 2024
2 parents 909a7f7 + 54eadec commit 02773af
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 44 deletions.
6 changes: 3 additions & 3 deletions sqle/api/controller/v2/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func ExecuteOneTaskOnWorkflowV2(c echo.Context) error {
return controller.JSONBaseErrorReq(c, fmt.Errorf("task has no need to be executed. taskId=%v workflowId=%v", taskId, workflow.WorkflowId))
}

err = server.ExecuteWorkflow(workflow, map[uint]string{uint(taskId): user.GetIDStr()})
_, err = server.ExecuteWorkflow(workflow, map[uint]string{uint(taskId): user.GetIDStr()})
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
Expand Down Expand Up @@ -742,7 +742,7 @@ func CreateWorkflowV2(c echo.Context) error {
}
}

err = s.CreateWorkflowV2(req.Subject, workflowId, req.Desc, user, tasks, stepTemplates, model.ProjectUID(projectUid), req.SqlVersionID, nil, func(tasks []*model.Task) (auditWorkflowUsers, canExecUser [][]*model.User) {
err = s.CreateWorkflowV2(req.Subject, workflowId, req.Desc, user, tasks, stepTemplates, model.ProjectUID(projectUid), req.SqlVersionID, nil, nil, func(tasks []*model.Task) (auditWorkflowUsers, canExecUser [][]*model.User) {
auditWorkflowUsers = make([][]*model.User, len(tasks))
executorWorkflowUsers := make([][]*model.User, len(tasks))
for i, task := range tasks {
Expand Down Expand Up @@ -1079,7 +1079,7 @@ func ExecuteTasksOnWorkflowV2(c echo.Context) error {
return err
}

err = server.ExecuteTasksProcess(workflow.WorkflowId, projectUid, user)
_, err = server.ExecuteTasksProcess(workflow.WorkflowId, projectUid, user)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
Expand Down
22 changes: 9 additions & 13 deletions sqle/model/sql_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const (

type SqlVersionStage struct {
Model
SqlVersionID uint `json:"sql_version_id" gorm:"not null"`
Name string `json:"name" gorm:"type:varchar(255) not null"`
StageSequence int `json:"stage_sequence" gorm:"type:int not null"`
SqlVersionID uint `json:"sql_version_id" gorm:"not null"`
Name string `json:"name" gorm:"type:varchar(255) not null"`
// stage_sequence标识版本阶段的顺序,是一段从1开始连续的int值
StageSequence int `json:"stage_sequence" gorm:"type:int not null"`

SqlVersionStagesDependency []*SqlVersionStagesDependency
WorkflowVersionStage []*WorkflowVersionStage
Expand All @@ -40,9 +41,11 @@ type SqlVersionStagesDependency struct {

type WorkflowVersionStage struct {
Model
WorkflowID string `json:"workflow_id" gorm:"not null"`
SqlVersionID uint `json:"sql_version_id"`
SqlVersionStageID uint `json:"sql_version_stage_id"`
WorkflowID string `json:"workflow_id" gorm:"not null"`
SqlVersionID uint `json:"sql_version_id"`
SqlVersionStageID uint `json:"sql_version_stage_id"`
// workflow_sequence标识工单所处阶段的顺序及占位,当工单不需要发布到下一阶段时(如工单关闭),下一阶段的workflow_sequence可能是不连续的
// 同一版本每个阶段之间的工单占位和顺序都相互对应,如:开发阶段发布到测试阶段,workflow_sequence为1的工单发布成功后workflow_sequence也为1
WorkflowSequence int `json:"workflow_sequence" gorm:"type:int"`
WorkflowReleaseStatus string `json:"workflow_release_status" gorm:"type:varchar(255) not null"`
WorkflowExecTime *time.Time `json:"workflow_exec_time" gorm:"type:datetime(3)"`
Expand All @@ -55,10 +58,3 @@ const (
WorkflowReleaseStatusHaveBeenReleased = "released"
WorkflowReleaseStatusNotNeedReleased = "not_need_release"
)

func (stage SqlVersionStage) InitialStatusOfWorkflow() string {
if len(stage.SqlVersionStagesDependency) > 0 && stage.SqlVersionStagesDependency[0].NextStageID == 0 {
return WorkflowReleaseStatusNotNeedReleased
}
return WorkflowReleaseStatusIsBingReleased
}
12 changes: 12 additions & 0 deletions sqle/model/sql_version_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//go:build !enterprise
// +build !enterprise

package model

func (s *Storage) UpdateStageWorkflowExecTimeIfNeed(workflowId string) error {
return nil
}

func (stage SqlVersionStage) InitialStatusOfWorkflow() string {
return ""
}
15 changes: 11 additions & 4 deletions sqle/model/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"database/sql"
e "errors"
"fmt"
"github.com/actiontech/sqle/sqle/locale"
"github.com/nicksnyder/go-i18n/v2/i18n"
"strings"
"time"

"github.com/actiontech/sqle/sqle/locale"
"github.com/nicksnyder/go-i18n/v2/i18n"

driverV2 "github.com/actiontech/sqle/sqle/driver/v2"
"github.com/actiontech/sqle/sqle/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -472,7 +473,7 @@ func (w *Workflow) GetNeedSendOATaskIds(entry *logrus.Entry) ([]uint, error) {
return taskIds, nil
}

func (s *Storage) CreateWorkflowV2(subject, workflowId, desc string, user *User, tasks []*Task, stepTemplates []*WorkflowStepTemplate, projectId ProjectUID, sqlVersionId, versionStageId *uint, getOpExecUser func([]*Task) (canAuditUsers [][]*User, canExecUsers [][]*User)) error {
func (s *Storage) CreateWorkflowV2(subject, workflowId, desc string, user *User, tasks []*Task, stepTemplates []*WorkflowStepTemplate, projectId ProjectUID, sqlVersionId, versionStageId *uint, workflowStageSequence *int, getOpExecUser func([]*Task) (canAuditUsers [][]*User, canExecUsers [][]*User)) error {
if len(tasks) <= 0 {
return errors.New(errors.DataConflict, fmt.Errorf("there is no task for creating workflow"))
}
Expand Down Expand Up @@ -619,10 +620,16 @@ func (s *Storage) CreateWorkflowV2(subject, workflowId, desc string, user *User,
WorkflowID: workflowId,
SqlVersionID: *sqlVersionId,
SqlVersionStageID: stage.ID,
WorkflowSequence: len(stage.WorkflowVersionStage) + 1,
WorkflowReleaseStatus: stage.InitialStatusOfWorkflow(),
}

if workflowStageSequence != nil {
// 当在版本中发布工单时,工单发布到下一阶段所在的占位由当前阶段决定
workflowVersionStageRelation.WorkflowSequence = *workflowStageSequence
} else {
// 当在版本中新建工单时,该工单的顺序为该阶段的最后一条工单
workflowVersionStageRelation.WorkflowSequence = len(stage.WorkflowVersionStage) + 1
}
err = tx.Create(workflowVersionStageRelation).Error
if err != nil {
tx.Rollback()
Expand Down
54 changes: 30 additions & 24 deletions sqle/server/workflow_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (j *WorkflowScheduleJob) WorkflowSchedule(entry *logrus.Entry) {
if len(needExecuteTaskIds) == 0 {
entry.Warnf("workflow %s need to execute scheduled, but no task find", w.Subject)
}
err = ExecuteWorkflow(w, needExecuteTaskIds)
_, err = ExecuteWorkflow(w, needExecuteTaskIds)
if err != nil {
entry.Errorf("execute scheduled workflow %s error: %v", w.Subject, err)
} else {
Expand All @@ -64,41 +64,45 @@ func (j *WorkflowScheduleJob) WorkflowSchedule(entry *logrus.Entry) {
}
}

func ExecuteWorkflow(workflow *model.Workflow, needExecTaskIdToUserId map[uint]string) error {
func ExecuteWorkflow(workflow *model.Workflow, needExecTaskIdToUserId map[uint]string) (chan string, error) {
s := model.GetStorage()

l := log.NewEntry()
err := s.UpdateStageWorkflowExecTimeIfNeed(workflow.WorkflowId)
if err != nil {
l.Errorf("update workflow execute time for version stage error: %v", err)
}
// get task and check connection before to execute it.
for taskId := range needExecTaskIdToUserId {
taskId := fmt.Sprintf("%d", taskId)
task, exist, err := s.GetTaskDetailById(taskId)
if err != nil {
return err
return nil, err
}
if !exist {
return errors.New(errors.DataNotExist, fmt.Errorf("task is not exist. taskID=%v", taskId))
return nil, errors.New(errors.DataNotExist, fmt.Errorf("task is not exist. taskID=%v", taskId))
}
instance, exist, err := dms.GetInstancesById(context.Background(), fmt.Sprintf("%d", task.InstanceId))
if err != nil {
return err
return nil, err
}
if !exist {
return errors.New(errors.DataNotExist, fmt.Errorf("instance is not exist. instanceId=%v", task.InstanceId))
return nil, errors.New(errors.DataNotExist, fmt.Errorf("instance is not exist. instanceId=%v", task.InstanceId))
}
task.Instance = instance
if task.Instance == nil {
return errors.New(errors.DataNotExist, fmt.Errorf("instance is not exist"))
return nil, errors.New(errors.DataNotExist, fmt.Errorf("instance is not exist"))
}

// if instance is not connectable, exec sql must be failed;
// commit action unable to retry, so don't to exec it.
if err = common.CheckInstanceIsConnectable(task.Instance); err != nil {
return errors.New(errors.ConnectRemoteDatabaseError, err)
return nil, errors.New(errors.ConnectRemoteDatabaseError, err)
}
}

currentStep := workflow.CurrentStep()
if currentStep == nil {
return fmt.Errorf("workflow current step not found")
return nil, fmt.Errorf("workflow current step not found")
}

// update workflow
Expand Down Expand Up @@ -128,12 +132,11 @@ func ExecuteWorkflow(workflow *model.Workflow, needExecTaskIdToUserId map[uint]s
operateStep = nil
}

err := s.UpdateWorkflowExecInstanceRecord(workflow, operateStep, needExecTaskRecords)
err = s.UpdateWorkflowExecInstanceRecord(workflow, operateStep, needExecTaskRecords)
if err != nil {
return err
return nil, err
}

l := log.NewEntry()
workflowStatusChan := make(chan string)
var lock sync.Mutex
for taskId := range needExecTaskIdToUserId {
id := taskId
Expand All @@ -143,7 +146,7 @@ func ExecuteWorkflow(workflow *model.Workflow, needExecTaskIdToUserId map[uint]s

{ // NOTE: Update the workflow status before sending notifications to ensure that the notification content reflects the latest information.
lock.Lock()
updateStatus(s, workflow, l)
updateStatus(s, workflow, l, workflowStatusChan)
lock.Unlock()
}

Expand All @@ -156,10 +159,10 @@ func ExecuteWorkflow(workflow *model.Workflow, needExecTaskIdToUserId map[uint]s
}()
}

return nil
return workflowStatusChan, nil
}

func updateStatus(s *model.Storage, workflow *model.Workflow, l *logrus.Entry) {
func updateStatus(s *model.Storage, workflow *model.Workflow, l *logrus.Entry, workflowStatusChan chan string) {
tasks, err := s.GetTasksByWorkFlowRecordID(workflow.Record.ID)
if err != nil {
l.Errorf("get tasks by workflow record id error: %v", err)
Expand Down Expand Up @@ -207,6 +210,9 @@ func updateStatus(s *model.Storage, workflow *model.Workflow, l *logrus.Entry) {
if err != nil {
l.Errorf("update workflow record status failed: %v", err)
}
if workflowStatusChan != nil && workFlowStatus != model.WorkflowStatusExecuting {
workflowStatusChan <- workFlowStatus
}
}
}

Expand Down Expand Up @@ -258,28 +264,28 @@ func RejectWorkflowProcess(workflow *model.Workflow, reason string, user *model.
return nil
}

func ExecuteTasksProcess(workflowId string, projectUid string, user *model.User) error {
func ExecuteTasksProcess(workflowId string, projectUid string, user *model.User) (chan string, error) {
s := model.GetStorage()
workflow, err := dms.GetWorkflowDetailByWorkflowId(projectUid, workflowId, s.GetWorkflowDetailWithoutInstancesByWorkflowID)
if err != nil {
return err
return nil, err
}

if err = PrepareForWorkflowExecution(projectUid, workflow, user); err != nil {
return err
return nil, err
}

needExecTaskIds, err := GetNeedExecTaskIds(workflow, user)
if err != nil {
return err
return nil, err
}

err = ExecuteWorkflow(workflow, needExecTaskIds)
workflowExecResultChan, err := ExecuteWorkflow(workflow, needExecTaskIds)
if err != nil {
return err
return nil, err
}

return nil
return workflowExecResultChan, nil
}

func PrepareForWorkflowExecution(projectUid string, workflow *model.Workflow, user *model.User) error {
Expand Down

0 comments on commit 02773af

Please sign in to comment.