From 5fb49380349c44d4d9baa49feb58c9afb9510fab Mon Sep 17 00:00:00 2001 From: iwanghc Date: Thu, 12 Dec 2024 12:27:51 +0800 Subject: [PATCH 1/7] feat: optimize the audit sqls function of the audit plan to avoid sql stacking problems --- sqle/api/controller/v1/instance_audit_plan.go | 18 +- sqle/model/instance_audit_plan.go | 36 +++- .../auditplan/job_task_aggregate_sql.go | 95 ++++++++++ sqle/server/auditplan/job_task_alert.go | 23 --- sqle/server/auditplan/job_task_handler.go | 174 ++++++------------ sqle/server/auditplan/manager.go | 2 +- sqle/server/auditplan/task_v2.go | 2 +- 7 files changed, 197 insertions(+), 153 deletions(-) create mode 100644 sqle/server/auditplan/job_task_aggregate_sql.go delete mode 100644 sqle/server/auditplan/job_task_alert.go diff --git a/sqle/api/controller/v1/instance_audit_plan.go b/sqle/api/controller/v1/instance_audit_plan.go index 0f9559e7b..b5809f40e 100644 --- a/sqle/api/controller/v1/instance_audit_plan.go +++ b/sqle/api/controller/v1/instance_audit_plan.go @@ -1421,10 +1421,22 @@ func AuditPlanTriggerSqlAudit(c echo.Context) error { if err != nil { return controller.JSONBaseErrorReq(c, err) } - auditedSqlList, err := auditplan.BatchAuditSQLs(auditPlanSqls, false) + auditedSqlList, err := auditplan.BatchAuditSQLs(auditPlanSqls) if err != nil { return controller.JSONBaseErrorReq(c, err) } - - return controller.JSONBaseErrorReq(c, s.BatchSave(auditedSqlList, 50)) + recordIds := make([]uint, len(auditPlanSqls)) + for i, sqlId := range auditedSqlList { + recordIds[i] = sqlId.ID + } + err = s.BatchSave(auditedSqlList, 50) + if err != nil { + return controller.JSONBaseErrorReq(c, err) + } + // 更新最后审核时间 + err = s.UpdateManageSQLStatusByManageIDs(recordIds, map[string]interface{}{"last_audit_time": time.Now()}) + if err != nil { + return controller.JSONBaseErrorReq(c, err) + } + return controller.JSONBaseErrorReq(c, nil) } diff --git a/sqle/model/instance_audit_plan.go b/sqle/model/instance_audit_plan.go index 259609b1f..0ad34e4b7 100644 --- a/sqle/model/instance_audit_plan.go +++ b/sqle/model/instance_audit_plan.go @@ -185,7 +185,7 @@ type SQLManageRecord struct { SqlText string `json:"sql_text" gorm:"type:mediumtext;not null"` Info JSON `gorm:"type:json"` // 慢日志的 执行时间等特殊属性 AuditLevel string `json:"audit_level" gorm:"type:varchar(255)"` - AuditResults AuditResults `json:"audit_results" gorm:"type:json"` + AuditResults *AuditResults `json:"audit_results" gorm:"type:json"` SQLID string `json:"sql_id" gorm:"type:varchar(255);unique;not null"` Priority sql.NullString `json:"priority" gorm:"type:varchar(255)"` @@ -276,7 +276,8 @@ func (s *Storage) GetManagerSqlRuleTipsByAuditPlan(auditPlanId uint) ([]*SqlMana type SQLManageRecordProcess struct { Model - SQLManageRecordID *uint `json:"sql_manage_record_id" gorm:"unique;not null"` + SQLManageRecordID *uint `json:"sql_manage_record_id" gorm:"unique;not null"` + LastAuditTime *time.Time `json:"last_audit_time" gorm:"type:datetime(3)"` // 任务属性字段 Assignees string `json:"assignees" gorm:"type:varchar(2000)"` Status ProcessStatus `json:"status" gorm:"default:\"unhandled\""` @@ -523,10 +524,10 @@ func (s *Storage) DeleteSQLManageRecordBySourceId(sourceId string) error { } func (s *Storage) SaveManagerSQL(txDB *gorm.DB, sql *SQLManageRecord) error { - const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`) " + - "VALUES (?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), " + - "`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info)" - return txDB.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info).Error + const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`, `audit_level`, `audit_results`,`priority`) " + + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), `priority` = VALUES(priority), " + + "`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info), `audit_level`= VALUES(audit_level), `audit_results`= VALUES(audit_results)" + return s.db.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info, sql.AuditLevel, sql.AuditResults, sql.Priority).Error } func (s *Storage) UpdateManagerSQLStatus(txDB *gorm.DB, sql *SQLManageRecord) error { @@ -536,7 +537,7 @@ func (s *Storage) UpdateManagerSQLStatus(txDB *gorm.DB, sql *SQLManageRecord) er return txDB.Exec(query, sql.SQLID).Error } -func (s *Storage) UpdateManagerSQLBySqlId(sqlManageMap map[string]interface{}, sqlId string) error { +func (s *Storage) UpdateManagerSQLBySqlId(sqlId string, sqlManageMap map[string]interface{}) error { err := s.db.Model(&SQLManageRecord{}).Where("sql_id = ?", sqlId). Updates(sqlManageMap).Error if err != nil { @@ -558,3 +559,24 @@ func (s *Storage) GetAuditPlansByProjectId(projectID string) ([]*InstanceAuditPl err := s.db.Model(InstanceAuditPlan{}).Where("project_id = ?", projectID).Find(&instanceAuditPlan).Error return instanceAuditPlan, err } + +func (s *Storage) GetSQLsToAuditFromManage() ([]*SQLManageRecord, error) { + manageRecords := []*SQLManageRecord{} + err := s.db.Limit(1000).Model(SQLManageRecord{}). + Joins("JOIN audit_plans_v2 apv ON sql_manage_records.source_id = apv.instance_audit_plan_id AND sql_manage_records.source = apv.type AND apv.deleted_at IS NULL"). + Joins("JOIN sql_manage_record_processes smrp ON sql_manage_records.id =smrp.sql_manage_record_id"). + Where("sql_manage_records.updated_at > smrp.last_audit_time OR smrp.last_audit_time IS NULL"). + Find(&manageRecords).Error + if err == gorm.ErrRecordNotFound { + return manageRecords, nil + } + return manageRecords, err +} + +func (s *Storage) UpdateManageSQLStatusByManageIDs(ids []uint, attrs map[string]interface{}) error { + if len(ids) == 0 { + return nil + } + err := s.db.Model(SQLManageRecordProcess{}).Where("sql_manage_record_id IN (?)", ids).Updates(attrs).Error + return errors.New(errors.ConnectStorageError, err) +} diff --git a/sqle/server/auditplan/job_task_aggregate_sql.go b/sqle/server/auditplan/job_task_aggregate_sql.go new file mode 100644 index 000000000..bf9bcffe7 --- /dev/null +++ b/sqle/server/auditplan/job_task_aggregate_sql.go @@ -0,0 +1,95 @@ +package auditplan + +import ( + "time" + + "github.com/actiontech/sqle/sqle/model" + "github.com/actiontech/sqle/sqle/server" + "github.com/sirupsen/logrus" + "gorm.io/gorm" +) + +type AuditPlanAggregateSQLJob struct { + server.BaseJob +} + +func NewAuditPlanAggregateSQLJob(entry *logrus.Entry) server.ServerJob { + entry = entry.WithField("job", "audit_plan_aggregate_sql") + j := &AuditPlanAggregateSQLJob{} + j.BaseJob = *server.NewBaseJob(entry, 5*time.Second, j.AggregateSQL) + return j +} + +func (j *AuditPlanAggregateSQLJob) AggregateSQL(entry *logrus.Entry) { + s := model.GetStorage() + queues, err := s.PullSQLFromManagerSQLQueue() + if err != nil { + entry.Warnf("pull sql from queue failed, error: %v", err) + return + } + if len(queues) == 0 { + return + } + cache := NewSQLV2CacheWithPersist(s) + for _, sql := range queues { + sqlV2 := ConvertMangerSQLQueueToSQLV2(sql) + meta, err := GetMeta(sqlV2.Source) + if err != nil { + entry.Warnf("get meta failed, error: %v", err) + // todo: 有错误咋处理 + continue + } + if meta.Handler == nil { + entry.Warnf("do not support this type (%s), error: %v", sqlV2.Source, err) + // todo: 没有处理器咋办 + continue + } + err = meta.Handler.AggregateSQL(cache, sqlV2) + if err != nil { + entry.Warnf("aggregate sql failed, error: %v", err) + // todo: 有错误咋处理 + continue + } + + } + + sqlList := make([]*model.SQLManageRecord, 0, len(cache.GetSQLs())) + for _, sql := range cache.GetSQLs() { + sqlList = append(sqlList, ConvertSQLV2ToMangerSQL(sql)) + } + + if len(sqlList) == 0 { + return + } + // todo: 错误处理 + if err = s.Tx(func(txDB *gorm.DB) error { + for _, sql := range sqlList { + err := s.SaveManagerSQL(txDB, sql) + if err != nil { + entry.Warnf("update manager sql failed, error: %v", err) + return err + } + + // 更新状态表 + err = s.UpdateManagerSQLStatus(txDB, sql) + if err != nil { + entry.Warnf("update manager sql status failed, error: %v", err) + return err + } + } + + for _, sql := range queues { + err := s.RemoveSQLFromQueue(txDB, sql) + if err != nil { + entry.Warnf("remove manager sql queue failed, error: %v", err) + return err + } + } + + return nil + + }); err != nil { + entry.Warnf("audit plan aggregate sql job failed, error: %v", err) + return + } +} diff --git a/sqle/server/auditplan/job_task_alert.go b/sqle/server/auditplan/job_task_alert.go deleted file mode 100644 index 207ff5f86..000000000 --- a/sqle/server/auditplan/job_task_alert.go +++ /dev/null @@ -1,23 +0,0 @@ -package auditplan - -import ( - "time" - - "github.com/actiontech/sqle/sqle/server" - "github.com/sirupsen/logrus" -) - -type AuditPlanAlertJob struct { - server.BaseJob -} - -func NewAuditPlanAlertJob(entry *logrus.Entry) server.ServerJob { - entry = entry.WithField("job", "audit_plan_alert") - j := &AuditPlanAlertJob{} - j.BaseJob = *server.NewBaseJob(entry, 5*time.Second, j.Alert) - return j -} - -func (j *AuditPlanAlertJob) Alert(entry *logrus.Entry) { - return -} diff --git a/sqle/server/auditplan/job_task_handler.go b/sqle/server/auditplan/job_task_handler.go index 745d0e5af..cb1ccc187 100644 --- a/sqle/server/auditplan/job_task_handler.go +++ b/sqle/server/auditplan/job_task_handler.go @@ -4,7 +4,7 @@ import ( "database/sql" "errors" "fmt" - "strings" + "sync" "time" driverV2 "github.com/actiontech/sqle/sqle/driver/v2" @@ -12,7 +12,6 @@ import ( "github.com/actiontech/sqle/sqle/model" "github.com/actiontech/sqle/sqle/server" "github.com/sirupsen/logrus" - "gorm.io/gorm" ) type AuditPlanHandlerJob struct { @@ -28,82 +27,15 @@ func NewAuditPlanHandlerJob(entry *logrus.Entry) server.ServerJob { func (j *AuditPlanHandlerJob) HandlerSQL(entry *logrus.Entry) { s := model.GetStorage() - queues, err := s.PullSQLFromManagerSQLQueue() + sqlList, err := s.GetSQLsToAuditFromManage() if err != nil { - entry.Warnf("pull sql from queue failed, error: %v", err) + entry.Warnf("get sqls to audit failed, error: %v", err) return } - cache := NewSQLV2CacheWithPersist(s) - for _, sql := range queues { - sqlV2 := ConvertMangerSQLQueueToSQLV2(sql) - meta, err := GetMeta(sqlV2.Source) - if err != nil { - entry.Warnf("get meta failed, error: %v", err) - // todo: 有错误咋处理 - continue - } - if meta.Handler == nil { - entry.Warnf("do not support this type (%s), error: %v", sqlV2.Source, err) - // todo: 没有处理器咋办 - continue - } - err = meta.Handler.AggregateSQL(cache, sqlV2) - if err != nil { - entry.Warnf("aggregate sql failed, error: %v", err) - // todo: 有错误咋处理 - continue - } - - } - - sqlList := make([]*model.SQLManageRecord, 0, len(cache.GetSQLs())) - for _, sql := range cache.GetSQLs() { - sqlList = append(sqlList, ConvertSQLV2ToMangerSQL(sql)) - } - if len(sqlList) == 0 { return } - - // todo: 错误处理 - if err = s.Tx(func(txDB *gorm.DB) error { - for _, sql := range sqlList { - err := s.SaveManagerSQL(txDB, sql) - if err != nil { - entry.Warnf("update manager sql failed, error: %v", err) - return err - } - - // 更新状态表 - err = s.UpdateManagerSQLStatus(txDB, sql) - if err != nil { - entry.Warnf("update manager sql status failed, error: %v", err) - return err - } - } - - for _, sql := range queues { - err := s.RemoveSQLFromQueue(txDB, sql) - if err != nil { - entry.Warnf("remove manager sql queue failed, error: %v", err) - return err - } - } - - return nil - - }); err != nil { - return - } - - go handlerSQLAudit(entry, sqlList) - -} - -// todo: 错误处理 -func handlerSQLAudit(entry *logrus.Entry, sqlList []*model.SQLManageRecord) { - s := model.GetStorage() - sqlList, err := BatchAuditSQLs(sqlList, true) + sqlList, err = BatchAuditSQLs(sqlList) if err != nil { entry.Warnf("batch audit manager sql failed, error: %v", err) } @@ -112,73 +44,79 @@ func handlerSQLAudit(entry *logrus.Entry, sqlList []*model.SQLManageRecord) { if err != nil { entry.Warnf("set sql priority sql failed, error: %v", err) } - for _, sql := range sqlList { - manageSqlParam := make(map[string]interface{}, 3) - manageSqlParam["audit_level"] = sql.AuditLevel - manageSqlParam["audit_results"] = sql.AuditResults - manageSqlParam["priority"] = sql.Priority - err = s.UpdateManagerSQLBySqlId(manageSqlParam, sql.SQLID) + // 更新审核结果和优先级 + recordIds := make([]uint, len(sqlList)) + for i, sql := range sqlList { + recordIds[i] = sql.ID + err = s.UpdateManagerSQLBySqlId(sql.SQLID, map[string]interface{}{"audit_level": sql.AuditLevel, "audit_results": sql.AuditResults, "priority": sql.Priority}) if err != nil { entry.Warnf("update manager sql failed, error: %v", err) continue } } + // 更新最后审核时间 + err = s.UpdateManageSQLStatusByManageIDs(recordIds, map[string]interface{}{"last_audit_time": time.Now()}) + if err != nil { + entry.Warnf("update manage record process failed, error: %v", err) + } } -func BatchAuditSQLs(sqlList []*model.SQLManageRecord, isSkipAuditedSql bool) ([]*model.SQLManageRecord, error) { +func BatchAuditSQLs(sqlList []*model.SQLManageRecord) ([]*model.SQLManageRecord, error) { s := model.GetStorage() - // SQL聚合 sqlMap := make(map[string][]*model.SQLManageRecord) - for _, sql := range sqlList { - if isSkipAuditedSql && sql.AuditLevel != "" { - continue - } - // 根据source id和schema name 聚合sqls,避免task内需要切换schema上下文审核 - key := fmt.Sprintf("%s:%s", sql.SourceId, sql.SchemaName) - _, ok := sqlMap[key] - if !ok { - sqlMap[key] = make([]*model.SQLManageRecord, 0) - } + for _, sql := range sqlList { + // 根据扫描任务和 schema name 聚合 sqls,避免 task 内需要切换 schema 上下文审核 + key := fmt.Sprintf("%s:%s:%s", sql.SourceId, sql.Source, sql.SchemaName) sqlMap[key] = append(sqlMap[key], sql) - } - auditSQLs := make([]*model.SQLManageRecord, 0) - // 聚合的SQL批量审核 + var ( + auditSQLs []*model.SQLManageRecord + mu sync.Mutex + wg sync.WaitGroup + ) + for _, sqls := range sqlMap { - // get audit plan by source id - auditPlanType := sqls[0].Source + wg.Add(1) + go func(sqls []*model.SQLManageRecord) { + defer wg.Done() - meta, err := GetMeta(auditPlanType) - if err != nil { - return nil, err - } - resp, err := meta.Handler.Audit(sqls) - // 当管控队列表中sql出栈审核时扫描任务被删除,则清空已经save到管控表的sql。 - if err != nil && errors.Is(err, model.ErrAuditPlanNotFound) { - log.NewEntry().Warnf("audit sqls in task fail %v,cant find audit plan by id %s", err, sqls[0].SourceId) - err := s.DeleteSQLManageRecordBySourceId(sqls[0].SourceId) + auditPlanType := sqls[0].Source + meta, err := GetMeta(auditPlanType) if err != nil { - log.NewEntry().Errorf("delete sql manage record fail %v", err) + mu.Lock() + defer mu.Unlock() + auditSQLs = append(auditSQLs, sqls...) + log.NewEntry().Errorf("get meta to audit sql fail %v", err) + return } - for k := range sqlMap { - if strings.HasPrefix(k, sqls[0].SourceId+":") { - delete(sqlMap, k) + + resp, err := meta.Handler.Audit(sqls) + if err != nil { + if errors.Is(err, model.ErrAuditPlanNotFound) { + log.NewEntry().Warnf("audit sqls in task fail %v, can't find audit plan by id %s", err, sqls[0].SourceId) + // TODO 调整值clear中清理未关联扫描任务的sql + if err := s.DeleteSQLManageRecordBySourceId(sqls[0].SourceId); err != nil { + log.NewEntry().Errorf("delete sql manage record fail %v", err) + } + return } + log.NewEntry().Errorf("audit sqls in task fail %v, ignore audit result", err) + mu.Lock() + auditSQLs = append(auditSQLs, sqls...) + mu.Unlock() + return } - continue - } - if err != nil { - log.NewEntry().Errorf("audit sqls in task fail %v,ignore audit result", err) - auditSQLs = append(auditSQLs, sqls...) - continue - } - - // 更新原值 - auditSQLs = append(auditSQLs, resp.AuditedSqls...) + mu.Lock() + auditSQLs = append(auditSQLs, resp.AuditedSqls...) + mu.Unlock() + }(sqls) } + + wg.Wait() + return auditSQLs, nil } diff --git a/sqle/server/auditplan/manager.go b/sqle/server/auditplan/manager.go index a7c70213f..3d9c8c0c8 100644 --- a/sqle/server/auditplan/manager.go +++ b/sqle/server/auditplan/manager.go @@ -150,7 +150,7 @@ func GetSQLData(ctx context.Context, ap *AuditPlan, persist *model.Storage, filt } func init() { - server.OnlyRunOnLeaderJobs = append(server.OnlyRunOnLeaderJobs, NewManager, NewAuditPlanHandlerJob, NewAuditPlanAlertJob) + server.OnlyRunOnLeaderJobs = append(server.OnlyRunOnLeaderJobs, NewManager, NewAuditPlanHandlerJob, NewAuditPlanAggregateSQLJob) } func NewManager(entry *logrus.Entry) server.ServerJob { diff --git a/sqle/server/auditplan/task_v2.go b/sqle/server/auditplan/task_v2.go index 588076221..1c47ac9dc 100644 --- a/sqle/server/auditplan/task_v2.go +++ b/sqle/server/auditplan/task_v2.go @@ -140,7 +140,7 @@ func auditSQLs(sqls []*model.SQLManageRecord) (*AuditResultResp, error) { // update sql audit result for i, sql := range task.ExecuteSQLs { - sqls[i].AuditResults = sql.AuditResults + sqls[i].AuditResults = &sql.AuditResults sqls[i].AuditLevel = sql.AuditLevel } From 8f17951b13258ef0bbc1edb8976f15c8e3bb3f9c Mon Sep 17 00:00:00 2001 From: iwanghc Date: Thu, 12 Dec 2024 13:55:38 +0800 Subject: [PATCH 2/7] add: add sql comment --- sqle/model/instance_audit_plan.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sqle/model/instance_audit_plan.go b/sqle/model/instance_audit_plan.go index 0ad34e4b7..c8b7e267d 100644 --- a/sqle/model/instance_audit_plan.go +++ b/sqle/model/instance_audit_plan.go @@ -560,6 +560,8 @@ func (s *Storage) GetAuditPlansByProjectId(projectID string) ([]*InstanceAuditPl return instanceAuditPlan, err } +// 获取需要审核的sql, +// 当更新时间大于最后审核时间或最后审核时间为空时需要重新审核(采集或重新采集到的sql) func (s *Storage) GetSQLsToAuditFromManage() ([]*SQLManageRecord, error) { manageRecords := []*SQLManageRecord{} err := s.db.Limit(1000).Model(SQLManageRecord{}). From bc2a31cea183db584276bad44b52bcc5a49afee5 Mon Sep 17 00:00:00 2001 From: iwanghc Date: Thu, 12 Dec 2024 18:41:49 +0800 Subject: [PATCH 3/7] fix: use database connection in parameters --- sqle/api/controller/v1/instance_audit_plan.go | 2 +- sqle/model/instance_audit_plan.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sqle/api/controller/v1/instance_audit_plan.go b/sqle/api/controller/v1/instance_audit_plan.go index b5809f40e..fcadee966 100644 --- a/sqle/api/controller/v1/instance_audit_plan.go +++ b/sqle/api/controller/v1/instance_audit_plan.go @@ -1434,7 +1434,7 @@ func AuditPlanTriggerSqlAudit(c echo.Context) error { return controller.JSONBaseErrorReq(c, err) } // 更新最后审核时间 - err = s.UpdateManageSQLStatusByManageIDs(recordIds, map[string]interface{}{"last_audit_time": time.Now()}) + err = s.UpdateManageSQLProcessByManageIDs(recordIds, map[string]interface{}{"last_audit_time": time.Now()}) if err != nil { return controller.JSONBaseErrorReq(c, err) } diff --git a/sqle/model/instance_audit_plan.go b/sqle/model/instance_audit_plan.go index c8b7e267d..9e474663d 100644 --- a/sqle/model/instance_audit_plan.go +++ b/sqle/model/instance_audit_plan.go @@ -527,7 +527,7 @@ func (s *Storage) SaveManagerSQL(txDB *gorm.DB, sql *SQLManageRecord) error { const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`, `audit_level`, `audit_results`,`priority`) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), `priority` = VALUES(priority), " + "`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info), `audit_level`= VALUES(audit_level), `audit_results`= VALUES(audit_results)" - return s.db.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info, sql.AuditLevel, sql.AuditResults, sql.Priority).Error + return txDB.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info, sql.AuditLevel, sql.AuditResults, sql.Priority).Error } func (s *Storage) UpdateManagerSQLStatus(txDB *gorm.DB, sql *SQLManageRecord) error { @@ -575,7 +575,7 @@ func (s *Storage) GetSQLsToAuditFromManage() ([]*SQLManageRecord, error) { return manageRecords, err } -func (s *Storage) UpdateManageSQLStatusByManageIDs(ids []uint, attrs map[string]interface{}) error { +func (s *Storage) UpdateManageSQLProcessByManageIDs(ids []uint, attrs map[string]interface{}) error { if len(ids) == 0 { return nil } From 3725a4ef991fe7fdaf86423eb00b730e8e3c9107 Mon Sep 17 00:00:00 2001 From: iwanghc Date: Thu, 12 Dec 2024 18:43:11 +0800 Subject: [PATCH 4/7] =?UTF-8?q?optimization=EF=BC=9Aaudit=20plan=20sql=20a?= =?UTF-8?q?udit=20code=20simplification?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/job_task_handler.go | 36 ++++++++++------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/sqle/server/auditplan/job_task_handler.go b/sqle/server/auditplan/job_task_handler.go index cb1ccc187..54db3bec1 100644 --- a/sqle/server/auditplan/job_task_handler.go +++ b/sqle/server/auditplan/job_task_handler.go @@ -55,7 +55,7 @@ func (j *AuditPlanHandlerJob) HandlerSQL(entry *logrus.Entry) { } } // 更新最后审核时间 - err = s.UpdateManageSQLStatusByManageIDs(recordIds, map[string]interface{}{"last_audit_time": time.Now()}) + err = s.UpdateManageSQLProcessByManageIDs(recordIds, map[string]interface{}{"last_audit_time": time.Now()}) if err != nil { entry.Warnf("update manage record process failed, error: %v", err) } @@ -82,33 +82,27 @@ func BatchAuditSQLs(sqlList []*model.SQLManageRecord) ([]*model.SQLManageRecord, go func(sqls []*model.SQLManageRecord) { defer wg.Done() + var resp *AuditResultResp auditPlanType := sqls[0].Source meta, err := GetMeta(auditPlanType) + // 当无法获取meta时,不执行审核,直接返回原始sql if err != nil { - mu.Lock() - defer mu.Unlock() - auditSQLs = append(auditSQLs, sqls...) log.NewEntry().Errorf("get meta to audit sql fail %v", err) - return - } - - resp, err := meta.Handler.Audit(sqls) - if err != nil { - if errors.Is(err, model.ErrAuditPlanNotFound) { - log.NewEntry().Warnf("audit sqls in task fail %v, can't find audit plan by id %s", err, sqls[0].SourceId) - // TODO 调整值clear中清理未关联扫描任务的sql - if err := s.DeleteSQLManageRecordBySourceId(sqls[0].SourceId); err != nil { - log.NewEntry().Errorf("delete sql manage record fail %v", err) + } else { + resp, err = meta.Handler.Audit(sqls) + if err != nil { + if errors.Is(err, model.ErrAuditPlanNotFound) { + log.NewEntry().Warnf("audit sqls in task fail %v, can't find audit plan by id %s", err, sqls[0].SourceId) + // TODO 调整至clean中清理未关联扫描任务的sql + // 扫描任务已被删除的sql不需要save到管控中 + if err := s.DeleteSQLManageRecordBySourceId(sqls[0].SourceId); err != nil { + log.NewEntry().Errorf("delete sql manage record fail %v", err) + } + return } - return + log.NewEntry().Errorf("audit sqls in task fail %v, ignore audit result", err) } - log.NewEntry().Errorf("audit sqls in task fail %v, ignore audit result", err) - mu.Lock() - auditSQLs = append(auditSQLs, sqls...) - mu.Unlock() - return } - mu.Lock() auditSQLs = append(auditSQLs, resp.AuditedSqls...) mu.Unlock() From ff272b9efa625d4de7776e7090d9311451548c52 Mon Sep 17 00:00:00 2001 From: iwanghc Date: Fri, 13 Dec 2024 14:17:31 +0800 Subject: [PATCH 5/7] add: add sql and model comment for audit plan --- sqle/model/instance_audit_plan.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sqle/model/instance_audit_plan.go b/sqle/model/instance_audit_plan.go index 9e474663d..85f01cffd 100644 --- a/sqle/model/instance_audit_plan.go +++ b/sqle/model/instance_audit_plan.go @@ -173,6 +173,8 @@ func (s *Storage) GetLatestStartTimeAuditPlanSQLV2(sourceId uint) (string, error return info.StartTime, err } +// 此表对于来源是扫描任务的相关sql, 目前仅在采集和审核时会更新, 如有其他场景更新此表, 需要考虑更新后会触发审核影响 +// 如有其他sql业务相关字段补充, 可新增至SQLManageRecordProcess中 type SQLManageRecord struct { Model @@ -562,6 +564,7 @@ func (s *Storage) GetAuditPlansByProjectId(projectID string) ([]*InstanceAuditPl // 获取需要审核的sql, // 当更新时间大于最后审核时间或最后审核时间为空时需要重新审核(采集或重新采集到的sql) +// 需要注意:当前仅在采集和审核时会更sql_manage_records中扫描任务相关的sql,所以使用了updated_at > last_audit_time条件。 func (s *Storage) GetSQLsToAuditFromManage() ([]*SQLManageRecord, error) { manageRecords := []*SQLManageRecord{} err := s.db.Limit(1000).Model(SQLManageRecord{}). From 5657f2770faca01607365732c3beb566b19beea5 Mon Sep 17 00:00:00 2001 From: iwanghc Date: Fri, 13 Dec 2024 14:18:04 +0800 Subject: [PATCH 6/7] modify: use context log --- sqle/api/controller/v1/instance_audit_plan.go | 2 +- sqle/server/auditplan/job_task_handler.go | 23 +++++++++---------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/sqle/api/controller/v1/instance_audit_plan.go b/sqle/api/controller/v1/instance_audit_plan.go index fcadee966..efb5bf4f5 100644 --- a/sqle/api/controller/v1/instance_audit_plan.go +++ b/sqle/api/controller/v1/instance_audit_plan.go @@ -1421,7 +1421,7 @@ func AuditPlanTriggerSqlAudit(c echo.Context) error { if err != nil { return controller.JSONBaseErrorReq(c, err) } - auditedSqlList, err := auditplan.BatchAuditSQLs(auditPlanSqls) + auditedSqlList, err := auditplan.BatchAuditSQLs(log.NewEntry(), auditPlanSqls) if err != nil { return controller.JSONBaseErrorReq(c, err) } diff --git a/sqle/server/auditplan/job_task_handler.go b/sqle/server/auditplan/job_task_handler.go index 54db3bec1..65a7b255b 100644 --- a/sqle/server/auditplan/job_task_handler.go +++ b/sqle/server/auditplan/job_task_handler.go @@ -8,7 +8,6 @@ import ( "time" driverV2 "github.com/actiontech/sqle/sqle/driver/v2" - "github.com/actiontech/sqle/sqle/log" "github.com/actiontech/sqle/sqle/model" "github.com/actiontech/sqle/sqle/server" "github.com/sirupsen/logrus" @@ -35,7 +34,7 @@ func (j *AuditPlanHandlerJob) HandlerSQL(entry *logrus.Entry) { if len(sqlList) == 0 { return } - sqlList, err = BatchAuditSQLs(sqlList) + sqlList, err = BatchAuditSQLs(entry, sqlList) if err != nil { entry.Warnf("batch audit manager sql failed, error: %v", err) } @@ -61,7 +60,7 @@ func (j *AuditPlanHandlerJob) HandlerSQL(entry *logrus.Entry) { } } -func BatchAuditSQLs(sqlList []*model.SQLManageRecord) ([]*model.SQLManageRecord, error) { +func BatchAuditSQLs(l *logrus.Entry, sqlList []*model.SQLManageRecord) ([]*model.SQLManageRecord, error) { s := model.GetStorage() sqlMap := make(map[string][]*model.SQLManageRecord) @@ -72,9 +71,9 @@ func BatchAuditSQLs(sqlList []*model.SQLManageRecord) ([]*model.SQLManageRecord, } var ( - auditSQLs []*model.SQLManageRecord - mu sync.Mutex - wg sync.WaitGroup + auditedSQLs []*model.SQLManageRecord + mu sync.Mutex + wg sync.WaitGroup ) for _, sqls := range sqlMap { @@ -87,31 +86,31 @@ func BatchAuditSQLs(sqlList []*model.SQLManageRecord) ([]*model.SQLManageRecord, meta, err := GetMeta(auditPlanType) // 当无法获取meta时,不执行审核,直接返回原始sql if err != nil { - log.NewEntry().Errorf("get meta to audit sql fail %v", err) + l.Errorf("get meta to audit sql fail %v", err) } else { resp, err = meta.Handler.Audit(sqls) if err != nil { if errors.Is(err, model.ErrAuditPlanNotFound) { - log.NewEntry().Warnf("audit sqls in task fail %v, can't find audit plan by id %s", err, sqls[0].SourceId) + l.Warnf("audit sqls in task fail %v, can't find audit plan by id %s", err, sqls[0].SourceId) // TODO 调整至clean中清理未关联扫描任务的sql // 扫描任务已被删除的sql不需要save到管控中 if err := s.DeleteSQLManageRecordBySourceId(sqls[0].SourceId); err != nil { - log.NewEntry().Errorf("delete sql manage record fail %v", err) + l.Errorf("delete sql manage record fail %v", err) } return } - log.NewEntry().Errorf("audit sqls in task fail %v, ignore audit result", err) + l.Errorf("audit sqls in task fail %v, ignore audit result", err) } } mu.Lock() - auditSQLs = append(auditSQLs, resp.AuditedSqls...) + auditedSQLs = append(auditedSQLs, resp.AuditedSqls...) mu.Unlock() }(sqls) } wg.Wait() - return auditSQLs, nil + return auditedSQLs, nil } func SetSQLPriority(sqlList []*model.SQLManageRecord) ([]*model.SQLManageRecord, error) { From cad8a11accdbfc44fda8370e162cdd099724df0a Mon Sep 17 00:00:00 2001 From: iwanghc Date: Fri, 13 Dec 2024 15:01:23 +0800 Subject: [PATCH 7/7] =?UTF-8?q?chore=EF=BC=9Aoptimize=20audit=20plan=20log?= =?UTF-8?q?=20content?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/server/auditplan/job_task_handler.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sqle/server/auditplan/job_task_handler.go b/sqle/server/auditplan/job_task_handler.go index 65a7b255b..e112a6122 100644 --- a/sqle/server/auditplan/job_task_handler.go +++ b/sqle/server/auditplan/job_task_handler.go @@ -81,9 +81,12 @@ func BatchAuditSQLs(l *logrus.Entry, sqlList []*model.SQLManageRecord) ([]*model go func(sqls []*model.SQLManageRecord) { defer wg.Done() + sourceType := sqls[0].Source + sourceId := sqls[0].SourceId + l.Infof("processing audit for source id %s, type %s", sourceId, sourceType) + var resp *AuditResultResp - auditPlanType := sqls[0].Source - meta, err := GetMeta(auditPlanType) + meta, err := GetMeta(sourceType) // 当无法获取meta时,不执行审核,直接返回原始sql if err != nil { l.Errorf("get meta to audit sql fail %v", err) @@ -91,10 +94,10 @@ func BatchAuditSQLs(l *logrus.Entry, sqlList []*model.SQLManageRecord) ([]*model resp, err = meta.Handler.Audit(sqls) if err != nil { if errors.Is(err, model.ErrAuditPlanNotFound) { - l.Warnf("audit sqls in task fail %v, can't find audit plan by id %s", err, sqls[0].SourceId) + l.Warnf("audit sqls in task fail %v, can't find source", err) // TODO 调整至clean中清理未关联扫描任务的sql // 扫描任务已被删除的sql不需要save到管控中 - if err := s.DeleteSQLManageRecordBySourceId(sqls[0].SourceId); err != nil { + if err := s.DeleteSQLManageRecordBySourceId(sourceId); err != nil { l.Errorf("delete sql manage record fail %v", err) } return