Skip to content

Commit

Permalink
Merge pull request #196 from actiontech/issue_190_achieve
Browse files Browse the repository at this point in the history
Issue 190 achieve
  • Loading branch information
sjjian authored Jan 7, 2022
2 parents d830c23 + 19435d5 commit 9f70a95
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 28 deletions.
1 change: 1 addition & 0 deletions sqle/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func StartApi(net *gracenet.Net, exitChan chan struct{}, config config.SqleConfi
v1Router.GET("/instances/:instance_name/schemas", v1.GetInstanceSchemas)
v1Router.GET("/instance_tips", v1.GetInstanceTips)
v1Router.GET("/instances/:instance_name/rules", v1.GetInstanceRules)
v1Router.GET("/instances/:instance_name/workflow_template", v1.GetInstanceWorkflowTemplate)

// rule template
v1Router.GET("/rule_templates", v1.GetRuleTemplates)
Expand Down
31 changes: 29 additions & 2 deletions sqle/api/controller/v1/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,33 @@ type GetInstanceWorkflowTemplateResV1 struct {
// @Param instance_name path string true "instance name"
// @Success 200 {object} v1.GetInstanceWorkflowTemplateResV1
// @router /v1/instances/{instance_name}/workflow_template [get]
func GetInstanceWorkflowTemplate(c *echo.Context) error {
return nil
func GetInstanceWorkflowTemplate(c echo.Context) error {
s := model.GetStorage()
instanceName := c.Param("instance_name")
instance, exist, err := s.GetInstanceByName(instanceName)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
if !exist {
return controller.JSONBaseErrorReq(c, errors.New(errors.DataNotExist, fmt.Errorf("instance is not exist")))
}

template, exist, err := s.GetWorkflowTemplateById(instance.WorkflowTemplateId)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
if !exist {
return controller.JSONBaseErrorReq(c, errors.New(errors.DataNotExist,
fmt.Errorf("the instance is not bound workflow template")))
}

res, err := getWorkflowTemplateDetailByTemplate(template)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

return c.JSON(http.StatusOK, &GetInstanceWorkflowTemplateResV1{
BaseRes: controller.NewBaseReq(nil),
Data: res,
})
}
2 changes: 2 additions & 0 deletions sqle/api/controller/v1/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func convertTaskToRes(task *model.Task) *AuditTaskResV1 {
Id: task.ID,
InstanceName: task.InstanceName(),
InstanceSchema: task.Schema,
AuditLevel: task.AuditLevel,
PassRate: task.PassRate,
Status: task.Status,
SQLSource: task.SQLSource,
Expand Down Expand Up @@ -315,6 +316,7 @@ func GetTaskSQLs(c echo.Context) error {
"task_id": taskId,
"filter_exec_status": req.FilterExecStatus,
"filter_audit_status": req.FilterAuditStatus,
"filter_audit_level": req.FilterAuditLevel,
"no_duplicate": req.NoDuplicate,
"limit": req.PageSize,
"offset": offset,
Expand Down
88 changes: 70 additions & 18 deletions sqle/api/controller/v1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1

import (
"fmt"
"github.com/actiontech/sqle/sqle/driver"
"net/http"
"strconv"
"time"
Expand Down Expand Up @@ -29,7 +30,7 @@ type GetWorkflowTemplateResV1 struct {
type WorkflowTemplateDetailResV1 struct {
Name string `json:"workflow_template_name"`
Desc string `json:"desc,omitempty"`
AllowSubmitWhenLessAuditLevel *string `json:"allow_submit_when_less_audit_level" enums:"normal,notice,warn,error"`
AllowSubmitWhenLessAuditLevel string `json:"allow_submit_when_less_audit_level" enums:"normal,notice,warn,error"`
Steps []*WorkFlowStepTemplateResV1 `json:"workflow_step_template_list"`
Instances []string `json:"instance_name_list,omitempty"`
}
Expand Down Expand Up @@ -60,14 +61,27 @@ func GetWorkflowTemplate(c echo.Context) error {
return controller.JSONBaseErrorReq(c, errors.New(errors.DataNotExist,
fmt.Errorf("workflow template is not exist")))
}
steps, err := s.GetWorkflowStepsDetailByTemplateId(template.ID)
res, err := getWorkflowTemplateDetailByTemplate(template)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
return c.JSON(http.StatusOK, &GetWorkflowTemplateResV1{
BaseRes: controller.NewBaseReq(nil),
Data: res,
})
}

func getWorkflowTemplateDetailByTemplate(template *model.WorkflowTemplate) (*WorkflowTemplateDetailResV1, error) {
s := model.GetStorage()
steps, err := s.GetWorkflowStepsDetailByTemplateId(template.ID)
if err != nil {
return nil, err
}
template.Steps = steps
res := &WorkflowTemplateDetailResV1{
Name: template.Name,
Desc: template.Desc,
Name: template.Name,
Desc: template.Desc,
AllowSubmitWhenLessAuditLevel: template.AllowSubmitWhenLessAuditLevel,
}
stepsRes := make([]*WorkFlowStepTemplateResV1, 0, len(steps))
for _, step := range steps {
Expand All @@ -89,20 +103,16 @@ func GetWorkflowTemplate(c echo.Context) error {

instanceNames, err := s.GetInstanceNamesByWorkflowTemplateId(template.ID)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
return nil, err
}
res.Instances = instanceNames

return c.JSON(http.StatusOK, &GetWorkflowTemplateResV1{
BaseRes: controller.NewBaseReq(nil),
Data: res,
})
return res, nil
}

type CreateWorkflowTemplateReqV1 struct {
Name string `json:"workflow_template_name" form:"workflow_template_name" valid:"required,name"`
Desc string `json:"desc" form:"desc"`
AllowSubmitWhenLessAuditLevel *string `json:"allow_submit_when_less_audit_level" enums:"normal,notice,warn,error"`
AllowSubmitWhenLessAuditLevel string `json:"allow_submit_when_less_audit_level" enums:"normal,notice,warn,error"`
Steps []*WorkFlowStepTemplateReqV1 `json:"workflow_step_template_list" form:"workflow_step_template_list" valid:"required,dive,required"`
Instances []string `json:"instance_name_list" form:"instance_name_list"`
}
Expand Down Expand Up @@ -185,10 +195,14 @@ func CreateWorkflowTemplate(c echo.Context) error {
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

allowSubmitWhenLessAuditLevel := string(driver.RuleLevelWarn)
if req.AllowSubmitWhenLessAuditLevel != "" {
allowSubmitWhenLessAuditLevel = req.AllowSubmitWhenLessAuditLevel
}
workflowTemplate := &model.WorkflowTemplate{
Name: req.Name,
Desc: req.Desc,
Name: req.Name,
Desc: req.Desc,
AllowSubmitWhenLessAuditLevel: allowSubmitWhenLessAuditLevel,
}
steps := make([]*model.WorkflowStepTemplate, 0, len(req.Steps))
for i, step := range req.Steps {
Expand Down Expand Up @@ -297,12 +311,18 @@ func UpdateWorkflowTemplate(c echo.Context) error {
return controller.JSONBaseErrorReq(c, err)
}
}

if req.Desc != nil {
workflowTemplate.Desc = *req.Desc
err = s.Save(workflowTemplate)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
}

if req.AllowSubmitWhenLessAuditLevel != nil {
workflowTemplate.AllowSubmitWhenLessAuditLevel = *req.AllowSubmitWhenLessAuditLevel
}

err = s.Save(workflowTemplate)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

if req.Instances != nil {
Expand Down Expand Up @@ -512,6 +532,12 @@ func CreateWorkflow(c echo.Context) error {
return controller.JSONBaseErrorReq(c, errors.New(errors.DataNotExist,
fmt.Errorf("the task instance is not bound workflow template")))
}

err = checkWorkflowCanCommit(template, task)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

stepTemplates, err := s.GetWorkflowStepsByTemplateId(template.ID)
if err != nil {
return err
Expand All @@ -535,6 +561,18 @@ func CreateWorkflow(c echo.Context) error {
return c.JSON(http.StatusOK, controller.NewBaseReq(nil))
}

func checkWorkflowCanCommit(template *model.WorkflowTemplate, task *model.Task) error {
allowLevel := driver.RuleLevelError
if template.AllowSubmitWhenLessAuditLevel != "" {
allowLevel = driver.RuleLevel(template.AllowSubmitWhenLessAuditLevel)
}
if driver.RuleLevel(task.AuditLevel).More(allowLevel) {
return errors.New(errors.DataInvalid,
fmt.Errorf("there is an audit result with an error level higher than the allowable submission level(%v), please modify it before submitting", allowLevel))
}
return nil
}

type GetWorkflowResV1 struct {
controller.BaseRes
Data *WorkflowResV1 `json:"data"`
Expand Down Expand Up @@ -1250,6 +1288,20 @@ func UpdateWorkflow(c echo.Context) error {
fmt.Errorf("you are not allow to operate the workflow")))
}

template, exist, err := s.GetWorkflowTemplateById(task.Instance.WorkflowTemplateId)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
if !exist {
return controller.JSONBaseErrorReq(c, errors.New(errors.DataConflict,
fmt.Errorf("failed to find the corresponding workflow template based on the task id")))
}

err = checkWorkflowCanCommit(template, task)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

err = s.UpdateWorkflowRecord(workflow, task)
if err != nil {
return c.JSON(http.StatusOK, controller.NewBaseReq(err))
Expand Down
8 changes: 8 additions & 0 deletions sqle/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ var ruleLevelMap = map[RuleLevel]int{
RuleLevelError: 3,
}

func (r RuleLevel) LessOrEqual(l RuleLevel) bool {
return ruleLevelMap[r] <= ruleLevelMap[l]
}

func (r RuleLevel) More(l RuleLevel) bool {
return ruleLevelMap[r] > ruleLevelMap[l]
}

type RuleParamType string

const (
Expand Down
5 changes: 5 additions & 0 deletions sqle/model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Task struct {
InstanceId uint `json:"instance_id"`
Schema string `json:"instance_schema" gorm:"column:instance_schema" example:"db1"`
PassRate float64 `json:"pass_rate"`
AuditLevel string `json:"audit_level"`
SQLSource string `json:"sql_source" gorm:"column:sql_source"`
DBType string `json:"db_type" gorm:"default:'mysql'" example:"mysql"`
Status string `json:"status" gorm:"default:\"initialized\""`
Expand Down Expand Up @@ -373,6 +374,10 @@ AND e_sql.exec_status = :filter_exec_status
AND e_sql.audit_status = :filter_audit_status
{{- end }}
{{- if .filter_audit_level}}
AND e_sql.audit_level = :filter_audit_level
{{- end}}
{{- if .no_duplicate }}
AND e_sql.id IN (
SELECT SQL_BIG_RESULT MIN(id) AS id FROM execute_sql_detail WHERE task_id = :task_id
Expand Down
9 changes: 5 additions & 4 deletions sqle/model/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import (

type WorkflowTemplate struct {
Model
Name string
Desc string
Name string
Desc string
AllowSubmitWhenLessAuditLevel string

Steps []*WorkflowStepTemplate `json:"-" gorm:"foreignkey:workflowTemplateId"`
Instances []*Instance `gorm:"foreignkey:WorkflowTemplateId"`
Expand Down Expand Up @@ -78,8 +79,8 @@ func (s *Storage) GetWorkflowStepsDetailByTemplateId(id uint) ([]*WorkflowStepTe

func (s *Storage) SaveWorkflowTemplate(template *WorkflowTemplate) error {
return s.TxExec(func(tx *sql.Tx) error {
result, err := tx.Exec("INSERT INTO workflow_templates (name, `desc`) values (?, ?)",
template.Name, template.Desc)
result, err := tx.Exec("INSERT INTO workflow_templates (name, `desc`, `allow_submit_when_less_audit_level`) values (?, ?, ?)",
template.Name, template.Desc, template.AllowSubmitWhenLessAuditLevel)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion sqle/model/workflow_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type WorkflowListDetail struct {
CurrentStepType sql.NullString `json:"current_step_type" enums:"sql_review,sql_execute"`
CurrentStepAssigneeUser RowList `json:"current_step_assignee_user_name_list"`
Status string `json:"status"`
ScheduleTime *time.Time `json:"schedule_time"`
ScheduleTime *time.Time `json:"schedule_time"`
}

var workflowsQueryTpl = `SELECT w.id AS workflow_id, w.subject, w.desc, wr.status,
Expand Down
10 changes: 8 additions & 2 deletions sqle/server/sqled.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,17 +332,23 @@ func (a *action) audit() (err error) {
}

var normalCount float64
maxAuditLevel := driver.RuleLevelNormal
for _, executeSQL := range task.ExecuteSQLs {
if executeSQL.AuditLevel == string(driver.RuleLevelNormal) {
normalCount += 1
}
if driver.RuleLevel(executeSQL.AuditLevel).More(maxAuditLevel) {
maxAuditLevel = driver.RuleLevel(executeSQL.AuditLevel)
}
}
task.PassRate = utils.Round(normalCount/float64(len(task.ExecuteSQLs)), 4)
task.AuditLevel = string(maxAuditLevel)

task.Status = model.TaskStatusAudited
if err = st.UpdateTask(task, map[string]interface{}{
"pass_rate": task.PassRate,
"status": task.Status,
"pass_rate": task.PassRate,
"audit_level": task.AuditLevel,
"status": task.Status,
}); err != nil {
a.entry.Errorf("update task error:%v", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion sqle/server/sqled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func Test_action_audit_UpdateTask(t *testing.T) {

mock.ExpectBegin()
mock.ExpectExec(regexp.QuoteMeta("UPDATE `tasks`")).
WithArgs(float64(1), model.TaskStatusAudited, act.task.ID).
WithArgs(driver.RuleLevelNormal, float64(1), model.TaskStatusAudited, act.task.ID).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

Expand Down

0 comments on commit 9f70a95

Please sign in to comment.