Skip to content

Commit

Permalink
Merge pull request #2821 from actiontech/optimize_audit_task_ce
Browse files Browse the repository at this point in the history
Optimize audit task ce
  • Loading branch information
ColdWaterLW authored Dec 13, 2024
2 parents 83848cb + cad8a11 commit a1ecf34
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 156 deletions.
18 changes: 15 additions & 3 deletions sqle/api/controller/v1/instance_audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,10 +1421,22 @@ func AuditPlanTriggerSqlAudit(c echo.Context) error {
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
auditedSqlList, err := auditplan.BatchAuditSQLs(auditPlanSqls, false)
auditedSqlList, err := auditplan.BatchAuditSQLs(log.NewEntry(), auditPlanSqls)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

return controller.JSONBaseErrorReq(c, s.BatchSave(auditedSqlList, 50))
recordIds := make([]uint, len(auditPlanSqls))
for i, sqlId := range auditedSqlList {
recordIds[i] = sqlId.ID
}
err = s.BatchSave(auditedSqlList, 50)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
// 更新最后审核时间
err = s.UpdateManageSQLProcessByManageIDs(recordIds, map[string]interface{}{"last_audit_time": time.Now()})
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
return controller.JSONBaseErrorReq(c, nil)
}
41 changes: 34 additions & 7 deletions sqle/model/instance_audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func (s *Storage) GetLatestStartTimeAuditPlanSQLV2(sourceId uint) (string, error
return info.StartTime, err
}

// 此表对于来源是扫描任务的相关sql, 目前仅在采集和审核时会更新, 如有其他场景更新此表, 需要考虑更新后会触发审核影响
// 如有其他sql业务相关字段补充, 可新增至SQLManageRecordProcess中
type SQLManageRecord struct {
Model

Expand All @@ -185,7 +187,7 @@ type SQLManageRecord struct {
SqlText string `json:"sql_text" gorm:"type:mediumtext;not null"`
Info JSON `gorm:"type:json"` // 慢日志的 执行时间等特殊属性
AuditLevel string `json:"audit_level" gorm:"type:varchar(255)"`
AuditResults AuditResults `json:"audit_results" gorm:"type:json"`
AuditResults *AuditResults `json:"audit_results" gorm:"type:json"`
SQLID string `json:"sql_id" gorm:"type:varchar(255);unique;not null"`
Priority sql.NullString `json:"priority" gorm:"type:varchar(255)"`

Expand Down Expand Up @@ -276,7 +278,8 @@ func (s *Storage) GetManagerSqlRuleTipsByAuditPlan(auditPlanId uint) ([]*SqlMana
type SQLManageRecordProcess struct {
Model

SQLManageRecordID *uint `json:"sql_manage_record_id" gorm:"unique;not null"`
SQLManageRecordID *uint `json:"sql_manage_record_id" gorm:"unique;not null"`
LastAuditTime *time.Time `json:"last_audit_time" gorm:"type:datetime(3)"`
// 任务属性字段
Assignees string `json:"assignees" gorm:"type:varchar(2000)"`
Status ProcessStatus `json:"status" gorm:"default:\"unhandled\""`
Expand Down Expand Up @@ -523,10 +526,10 @@ func (s *Storage) DeleteSQLManageRecordBySourceId(sourceId string) error {
}

func (s *Storage) SaveManagerSQL(txDB *gorm.DB, sql *SQLManageRecord) error {
const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`) " +
"VALUES (?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), " +
"`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info)"
return txDB.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info).Error
const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`, `audit_level`, `audit_results`,`priority`) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), `priority` = VALUES(priority), " +
"`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info), `audit_level`= VALUES(audit_level), `audit_results`= VALUES(audit_results)"
return txDB.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info, sql.AuditLevel, sql.AuditResults, sql.Priority).Error
}

func (s *Storage) UpdateManagerSQLStatus(txDB *gorm.DB, sql *SQLManageRecord) error {
Expand All @@ -536,7 +539,7 @@ func (s *Storage) UpdateManagerSQLStatus(txDB *gorm.DB, sql *SQLManageRecord) er
return txDB.Exec(query, sql.SQLID).Error
}

func (s *Storage) UpdateManagerSQLBySqlId(sqlManageMap map[string]interface{}, sqlId string) error {
func (s *Storage) UpdateManagerSQLBySqlId(sqlId string, sqlManageMap map[string]interface{}) error {
err := s.db.Model(&SQLManageRecord{}).Where("sql_id = ?", sqlId).
Updates(sqlManageMap).Error
if err != nil {
Expand All @@ -558,3 +561,27 @@ func (s *Storage) GetAuditPlansByProjectId(projectID string) ([]*InstanceAuditPl
err := s.db.Model(InstanceAuditPlan{}).Where("project_id = ?", projectID).Find(&instanceAuditPlan).Error
return instanceAuditPlan, err
}

// 获取需要审核的sql,
// 当更新时间大于最后审核时间或最后审核时间为空时需要重新审核(采集或重新采集到的sql)
// 需要注意:当前仅在采集和审核时会更sql_manage_records中扫描任务相关的sql,所以使用了updated_at > last_audit_time条件。
func (s *Storage) GetSQLsToAuditFromManage() ([]*SQLManageRecord, error) {
manageRecords := []*SQLManageRecord{}
err := s.db.Limit(1000).Model(SQLManageRecord{}).
Joins("JOIN audit_plans_v2 apv ON sql_manage_records.source_id = apv.instance_audit_plan_id AND sql_manage_records.source = apv.type AND apv.deleted_at IS NULL").
Joins("JOIN sql_manage_record_processes smrp ON sql_manage_records.id =smrp.sql_manage_record_id").
Where("sql_manage_records.updated_at > smrp.last_audit_time OR smrp.last_audit_time IS NULL").
Find(&manageRecords).Error
if err == gorm.ErrRecordNotFound {
return manageRecords, nil
}
return manageRecords, err
}

func (s *Storage) UpdateManageSQLProcessByManageIDs(ids []uint, attrs map[string]interface{}) error {
if len(ids) == 0 {
return nil
}
err := s.db.Model(SQLManageRecordProcess{}).Where("sql_manage_record_id IN (?)", ids).Updates(attrs).Error
return errors.New(errors.ConnectStorageError, err)
}
95 changes: 95 additions & 0 deletions sqle/server/auditplan/job_task_aggregate_sql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package auditplan

import (
"time"

"github.com/actiontech/sqle/sqle/model"
"github.com/actiontech/sqle/sqle/server"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
)

type AuditPlanAggregateSQLJob struct {
server.BaseJob
}

func NewAuditPlanAggregateSQLJob(entry *logrus.Entry) server.ServerJob {
entry = entry.WithField("job", "audit_plan_aggregate_sql")
j := &AuditPlanAggregateSQLJob{}
j.BaseJob = *server.NewBaseJob(entry, 5*time.Second, j.AggregateSQL)
return j
}

func (j *AuditPlanAggregateSQLJob) AggregateSQL(entry *logrus.Entry) {
s := model.GetStorage()
queues, err := s.PullSQLFromManagerSQLQueue()
if err != nil {
entry.Warnf("pull sql from queue failed, error: %v", err)
return
}
if len(queues) == 0 {
return
}
cache := NewSQLV2CacheWithPersist(s)
for _, sql := range queues {
sqlV2 := ConvertMangerSQLQueueToSQLV2(sql)
meta, err := GetMeta(sqlV2.Source)
if err != nil {
entry.Warnf("get meta failed, error: %v", err)
// todo: 有错误咋处理
continue
}
if meta.Handler == nil {
entry.Warnf("do not support this type (%s), error: %v", sqlV2.Source, err)
// todo: 没有处理器咋办
continue
}
err = meta.Handler.AggregateSQL(cache, sqlV2)
if err != nil {
entry.Warnf("aggregate sql failed, error: %v", err)
// todo: 有错误咋处理
continue
}

}

sqlList := make([]*model.SQLManageRecord, 0, len(cache.GetSQLs()))
for _, sql := range cache.GetSQLs() {
sqlList = append(sqlList, ConvertSQLV2ToMangerSQL(sql))
}

if len(sqlList) == 0 {
return
}
// todo: 错误处理
if err = s.Tx(func(txDB *gorm.DB) error {
for _, sql := range sqlList {
err := s.SaveManagerSQL(txDB, sql)
if err != nil {
entry.Warnf("update manager sql failed, error: %v", err)
return err
}

// 更新状态表
err = s.UpdateManagerSQLStatus(txDB, sql)
if err != nil {
entry.Warnf("update manager sql status failed, error: %v", err)
return err
}
}

for _, sql := range queues {
err := s.RemoveSQLFromQueue(txDB, sql)
if err != nil {
entry.Warnf("remove manager sql queue failed, error: %v", err)
return err
}
}

return nil

}); err != nil {
entry.Warnf("audit plan aggregate sql job failed, error: %v", err)
return
}
}
23 changes: 0 additions & 23 deletions sqle/server/auditplan/job_task_alert.go

This file was deleted.

Loading

0 comments on commit a1ecf34

Please sign in to comment.