Skip to content

Commit

Permalink
Merge pull request #2113 from actiontech/issue-1229-4
Browse files Browse the repository at this point in the history
Revert "Revert "chore:move unused field""
  • Loading branch information
ColdWaterLW authored Dec 7, 2023
2 parents be67d04 + 06d346f commit 05e0469
Show file tree
Hide file tree
Showing 11 changed files with 698 additions and 43 deletions.
2 changes: 2 additions & 0 deletions sqle/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,9 @@ func StartApi(net *gracenet.Net, exitChan chan struct{}, config config.SqleConfi

v1Router.GET("/projects/:project_name/audit_plans/:audit_plan_name/sqls", v1.GetAuditPlanSQLs)
v1Router.POST("/projects/:project_name/audit_plans/:audit_plan_name/sqls/full", v1.FullSyncAuditPlanSQLs, sqleMiddleware.ScannerVerifier())
v2Router.POST("/projects/:project_name/audit_plans/:audit_plan_name/sqls/full", v2.FullSyncAuditPlanSQLs, sqleMiddleware.ScannerVerifier())
v1Router.POST("/projects/:project_name/audit_plans/:audit_plan_name/sqls/partial", v1.PartialSyncAuditPlanSQLs, sqleMiddleware.ScannerVerifier())
v2Router.POST("/projects/:project_name/audit_plans/:audit_plan_name/sqls/partial", v2.PartialSyncAuditPlanSQLs, sqleMiddleware.ScannerVerifier())
v1Router.POST("/projects/:project_name/audit_plans/:audit_plan_name/trigger", v1.TriggerAuditPlan)
v1Router.PATCH("/projects/:project_name/audit_plans/:audit_plan_name/notify_config", v1.UpdateAuditPlanNotifyConfig)
v1Router.GET("/projects/:project_name/audit_plans/:audit_plan_name/notify_config", v1.GetAuditPlanNotifyConfig)
Expand Down
51 changes: 26 additions & 25 deletions sqle/api/controller/v1/audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,8 @@ type AuditPlanSQLReqV1 struct {
Endpoint string `json:"endpoint" from:"endpoint" example:"10.186.1.2"`
}

// todo: 后续该接口会废弃
// @Deprecated
// @Summary 全量同步SQL到扫描任务
// @Description full sync audit plan SQLs
// @Id fullSyncAuditPlanSQLsV1
Expand All @@ -845,21 +847,10 @@ func FullSyncAuditPlanSQLs(c echo.Context) error {
apName := c.Param("audit_plan_name")

s := model.GetStorage()
archived, err := s.IsProjectArchived(projectName)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
if archived {
return controller.JSONBaseErrorReq(c, ErrProjectArchived)
}

ap, exist, err := s.GetAuditPlanFromProjectByName(projectName, apName)
ap, err := CheckProjectAndAuditPlan(s, projectName, apName)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
if !exist {
return controller.JSONBaseErrorReq(c, errAuditPlanNotExist)
}

l := log.NewEntry()
reqSQLs := req.SQLs
Expand All @@ -882,6 +873,8 @@ type PartialSyncAuditPlanSQLsReqV1 struct {
SQLs []*AuditPlanSQLReqV1 `json:"audit_plan_sql_list" form:"audit_plan_sql_list" valid:"dive"`
}

// todo: 后续该接口会废弃
// @Deprecated
// @Summary 增量同步SQL到扫描任务
// @Description partial sync audit plan SQLs
// @Id partialSyncAuditPlanSQLsV1
Expand All @@ -901,21 +894,10 @@ func PartialSyncAuditPlanSQLs(c echo.Context) error {
apName := c.Param("audit_plan_name")

s := model.GetStorage()
archived, err := s.IsProjectArchived(projectName)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
if archived {
return controller.JSONBaseErrorReq(c, ErrProjectArchived)
}

ap, exist, err := s.GetAuditPlanFromProjectByName(projectName, apName)
ap, err := CheckProjectAndAuditPlan(s, projectName, apName)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
if !exist {
return controller.JSONBaseErrorReq(c, errAuditPlanNotExist)
}

l := log.NewEntry()
reqSQLs := req.SQLs
Expand All @@ -933,6 +915,26 @@ func PartialSyncAuditPlanSQLs(c echo.Context) error {
return controller.JSONBaseErrorReq(c, auditplan.UploadSQLs(l, ap, sqls, true))
}

func CheckProjectAndAuditPlan(s *model.Storage, projectName string, apName string) (*model.AuditPlan, error) {
archived, err := s.IsProjectArchived(projectName)
if err != nil {
return nil, err
}
if archived {
return nil, ErrProjectArchived
}

ap, exist, err := s.GetAuditPlanFromProjectByName(projectName, apName)
if err != nil {
return nil, err
}
if !exist {
return nil, errors.NewAuditPlanNotExistErr()
}

return ap, nil
}

func convertToModelAuditPlanSQL(c echo.Context, auditPlan *model.AuditPlan, reqSQLs []*AuditPlanSQLReqV1) ([]*auditplan.SQL, error) {
var p driver.Plugin
var err error
Expand Down Expand Up @@ -1006,7 +1008,6 @@ func convertToModelAuditPlanSQL(c echo.Context, auditPlan *model.AuditPlan, reqS
SQLContent: reqSQL.LastReceiveText,
Info: info,
Schema: reqSQL.Schema,
Endpoint: reqSQL.Endpoint,
})
}
return sqls, nil
Expand Down
208 changes: 208 additions & 0 deletions sqle/api/controller/v2/audit_plan.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package v2

import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/actiontech/sqle/sqle/common"
"github.com/actiontech/sqle/sqle/driver"
"github.com/actiontech/sqle/sqle/log"
"github.com/actiontech/sqle/sqle/server"
"github.com/labstack/echo/v4"

"github.com/actiontech/sqle/sqle/api/controller"
Expand Down Expand Up @@ -257,3 +264,204 @@ type GetAuditPlanAnalysisDataResV2 struct {
func GetAuditPlanAnalysisData(c echo.Context) error {
return getAuditPlanAnalysisData(c)
}

type AuditPlanSQLReqV2 struct {
Fingerprint string `json:"audit_plan_sql_fingerprint" form:"audit_plan_sql_fingerprint" example:"select * from t1 where id = ?"`
Counter string `json:"audit_plan_sql_counter" form:"audit_plan_sql_counter" example:"6" valid:"required"`
LastReceiveText string `json:"audit_plan_sql_last_receive_text" form:"audit_plan_sql_last_receive_text" example:"select * from t1 where id = 1"`
LastReceiveTimestamp string `json:"audit_plan_sql_last_receive_timestamp" form:"audit_plan_sql_last_receive_timestamp" example:"RFC3339"`
Schema string `json:"audit_plan_sql_schema" from:"audit_plan_sql_schema" example:"db1"`
QueryTimeAvg *float64 `json:"query_time_avg" from:"query_time_avg" example:"3.22"`
QueryTimeMax *float64 `json:"query_time_max" from:"query_time_max" example:"5.22"`
FirstQueryAt time.Time `json:"first_query_at" from:"first_query_at" example:"2023-09-12T02:48:01.317880Z"`
DBUser string `json:"db_user" from:"db_user" example:"database_user001"`
Endpoints []string `json:"endpoints" from:"endpoints"`
}

func filterSQLsByBlackList(sqls []*AuditPlanSQLReqV2, blackList []*model.BlackListAuditPlanSQL) []*AuditPlanSQLReqV2 {
filteredSQLs := []*AuditPlanSQLReqV2{}
for _, sql := range sqls {
var match bool
for _, blackSQL := range blackList {
// todo: ee issue1119, 临时使用strings.Contains判断子字符串
match = strings.Contains(strings.ToUpper(sql.LastReceiveText), strings.ToUpper(blackSQL.FilterSQL))
if match {
break
}
}
if !match {
filteredSQLs = append(filteredSQLs, sql)
}
}
return filteredSQLs
}

func convertToModelAuditPlanSQL(c echo.Context, auditPlan *model.AuditPlan, reqSQLs []*AuditPlanSQLReqV2) ([]*auditplan.SQL, error) {
var p driver.Plugin
var err error

// lazy load driver
initDriver := func() error {
if p == nil {
p, err = common.NewDriverManagerWithoutCfg(log.NewEntry(), auditPlan.DBType)
if err != nil {
return err
}
}
return nil
}
defer func() {
if p != nil {
p.Close(context.TODO())
}
}()

sqls := make([]*auditplan.SQL, 0, len(reqSQLs))
for _, reqSQL := range reqSQLs {
if reqSQL.LastReceiveText == "" {
continue
}
fp := reqSQL.Fingerprint
// the caller may be written in a different language, such as (Java, Bash, Python), so the fingerprint is
// generated in different ways. In order to maintain th same fingerprint generation logic, we provide a way to
// generate it by sqle, if the request fingerprint is empty.
if fp == "" {
err := initDriver()
if err != nil {
return nil, err
}
nodes, err := p.Parse(context.TODO(), reqSQL.LastReceiveText)
if err != nil {
return nil, err
}
if len(nodes) > 0 {
fp = nodes[0].Fingerprint
} else {
fp = reqSQL.LastReceiveText
}
}
counter, err := strconv.ParseUint(reqSQL.Counter, 10, 64)
if err != nil {
return nil, err
}
info := map[string]interface{}{
"counter": counter,
"last_receive_timestamp": reqSQL.LastReceiveTimestamp,
server.AuditSchema: reqSQL.Schema,
"endpoints": reqSQL.Endpoints,
}
// 兼容老版本的Scannerd
// 老版本Scannerd不传输这两个字段,不记录到数据库中
// 并且这里避免记录0值到数据库中,导致后续计算出的平均时间出错
if reqSQL.QueryTimeAvg != nil {
info["query_time_avg"] = utils.Round(*reqSQL.QueryTimeAvg, 4)
}
if reqSQL.QueryTimeMax != nil {
info["query_time_max"] = utils.Round(*reqSQL.QueryTimeMax, 4)
}
if !reqSQL.FirstQueryAt.IsZero() {
info["first_query_at"] = reqSQL.FirstQueryAt
}
if reqSQL.DBUser != "" {
info["db_user"] = reqSQL.DBUser
}
sqls = append(sqls, &auditplan.SQL{
Fingerprint: fp,
SQLContent: reqSQL.LastReceiveText,
Info: info,
Schema: reqSQL.Schema,
})
}

return sqls, nil
}

type PartialSyncAuditPlanSQLsReqV2 struct {
SQLs []*AuditPlanSQLReqV2 `json:"audit_plan_sql_list" form:"audit_plan_sql_list" valid:"dive"`
}

// PartialSyncAuditPlanSQLs
// @Summary 增量同步SQL到扫描任务
// @Description partial sync audit plan SQLs
// @Id partialSyncAuditPlanSQLsV2
// @Tags audit_plan
// @Security ApiKeyAuth
// @Param project_name path string true "project name"
// @Param audit_plan_name path string true "audit plan name"
// @Param sqls body v2.PartialSyncAuditPlanSQLsReqV2 true "partial sync audit plan SQLs request"
// @Success 200 {object} controller.BaseRes
// @router /v2/projects/{project_name}/audit_plans/{audit_plan_name}/sqls/partial [post]
func PartialSyncAuditPlanSQLs(c echo.Context) error {
req := new(PartialSyncAuditPlanSQLsReqV2)
if err := controller.BindAndValidateReq(c, req); err != nil {
return err
}
projectName := c.Param("project_name")
apName := c.Param("audit_plan_name")

s := model.GetStorage()
ap, err := v1.CheckProjectAndAuditPlan(s, projectName, apName)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

l := log.NewEntry()
reqSQLs := req.SQLs
blackList, err := s.GetBlackListAuditPlanSQLs()
if err == nil {
reqSQLs = filterSQLsByBlackList(reqSQLs, blackList)
} else {
l.Warnf("blacklist is not used, err:%v", err)
}

sqls, err := convertToModelAuditPlanSQL(c, ap, reqSQLs)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
return controller.JSONBaseErrorReq(c, auditplan.UploadSQLs(l, ap, sqls, true))
}

type FullSyncAuditPlanSQLsReqV2 struct {
SQLs []*AuditPlanSQLReqV2 `json:"audit_plan_sql_list" form:"audit_plan_sql_list" valid:"dive"`
}

// @Summary 全量同步SQL到扫描任务
// @Description full sync audit plan SQLs
// @Id fullSyncAuditPlanSQLsV2
// @Tags audit_plan
// @Security ApiKeyAuth
// @Param project_name path string true "project name"
// @Param audit_plan_name path string true "audit plan name"
// @Param sqls body v2.FullSyncAuditPlanSQLsReqV2 true "full sync audit plan SQLs request"
// @Success 200 {object} controller.BaseRes
// @router /v2/projects/{project_name}/audit_plans/{audit_plan_name}/sqls/full [post]
func FullSyncAuditPlanSQLs(c echo.Context) error {
req := new(FullSyncAuditPlanSQLsReqV2)
if err := controller.BindAndValidateReq(c, req); err != nil {
return err
}
projectName := c.Param("project_name")
apName := c.Param("audit_plan_name")

s := model.GetStorage()
ap, err := v1.CheckProjectAndAuditPlan(s, projectName, apName)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

l := log.NewEntry()
reqSQLs := req.SQLs
blackList, err := s.GetBlackListAuditPlanSQLs()
if err == nil {
reqSQLs = filterSQLsByBlackList(reqSQLs, blackList)
} else {
l.Warnf("blacklist is not used, err:%v", err)
}

sqls, err := convertToModelAuditPlanSQL(c, ap, reqSQLs)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

return controller.JSONBaseErrorReq(c, auditplan.UploadSQLs(l, ap, sqls, false))
}
Loading

0 comments on commit 05e0469

Please sign in to comment.