diff --git a/sqle/api/controller/v2/sql_manage.go b/sqle/api/controller/v2/sql_manage.go index 122a6ae54..d83007d7e 100644 --- a/sqle/api/controller/v2/sql_manage.go +++ b/sqle/api/controller/v2/sql_manage.go @@ -22,6 +22,7 @@ type SqlManage struct { InstanceName string `json:"instance_name"` SchemaName string `json:"schema_name"` AuditResult []*v1.AuditResult `json:"audit_result"` + AuditStatus string `json:"audit_status" enums:"being_audited"` FirstAppearTimeStamp string `json:"first_appear_timestamp"` LastReceiveTimeStamp string `json:"last_receive_timestamp"` FpCount uint64 `json:"fp_count"` diff --git a/sqle/docs/docs.go b/sqle/docs/docs.go index e3707eb89..3c1cefa13 100644 --- a/sqle/docs/docs.go +++ b/sqle/docs/docs.go @@ -17015,6 +17015,12 @@ var doc = `{ "$ref": "#/definitions/v1.AuditResult" } }, + "audit_status": { + "type": "string", + "enum": [ + "being_audited" + ] + }, "endpoints": { "type": "string" }, diff --git a/sqle/docs/swagger.json b/sqle/docs/swagger.json index 5efe6701c..7aef077d8 100644 --- a/sqle/docs/swagger.json +++ b/sqle/docs/swagger.json @@ -16999,6 +16999,12 @@ "$ref": "#/definitions/v1.AuditResult" } }, + "audit_status": { + "type": "string", + "enum": [ + "being_audited" + ] + }, "endpoints": { "type": "string" }, diff --git a/sqle/docs/swagger.yaml b/sqle/docs/swagger.yaml index 03c32eaaa..865d8a186 100644 --- a/sqle/docs/swagger.yaml +++ b/sqle/docs/swagger.yaml @@ -4947,6 +4947,10 @@ definitions: items: $ref: '#/definitions/v1.AuditResult' type: array + audit_status: + enum: + - being_audited + type: string endpoints: type: string first_appear_timestamp: diff --git a/sqle/model/instance_audit_plan.go b/sqle/model/instance_audit_plan.go index 82b387a46..58fea1b0c 100644 --- a/sqle/model/instance_audit_plan.go +++ b/sqle/model/instance_audit_plan.go @@ -481,22 +481,31 @@ func (s *Storage) PullSQLFromManagerSQLQueue() ([]*SQLManageQueue, error) { return sqls, err } -func (s *Storage) RemoveSQLFromQueue(sql *SQLManageQueue) error { - return s.db.Unscoped().Delete(sql).Error +func (s *Storage) RemoveSQLFromQueue(txDB *gorm.DB, sql *SQLManageQueue) error { + return txDB.Unscoped().Delete(sql).Error } -func (s *Storage) UpdateManagerSQL(sql *SQLManageRecord) error { - const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`, `audit_level`, `audit_results`,`priority`) " + - "VALUES (?,?,?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), `priority` = VALUES(priority), " + - "`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info), `audit_level`= VALUES(audit_level), `audit_results`= VALUES(audit_results)" - return s.db.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info, sql.AuditLevel, sql.AuditResults, sql.Priority).Error +func (s *Storage) SaveManagerSQL(txDB *gorm.DB, sql *SQLManageRecord) error { + const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`) " + + "VALUES (?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), " + + "`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info)" + return txDB.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info).Error } -func (s *Storage) UpdateManagerSQLStatus(sql *SQLManageRecord) error { +func (s *Storage) UpdateManagerSQLStatus(txDB *gorm.DB, sql *SQLManageRecord) error { const query = ` INSERT INTO sql_manage_record_processes (sql_manage_record_id) SELECT smr.id FROM sql_manage_records smr WHERE smr.sql_id = ? ON DUPLICATE KEY UPDATE sql_manage_record_id = VALUES(sql_manage_record_id);` - return s.db.Exec(query, sql.SQLID).Error + return txDB.Exec(query, sql.SQLID).Error +} + +func (s *Storage) UpdateManagerSQLBySqlId(sqlManageMap map[string]interface{}, sqlId string) error { + err := s.db.Model(&SQLManageRecord{}).Where("sql_id = ?", sqlId). + Updates(sqlManageMap).Error + if err != nil { + return err + } + return nil } func (s *Storage) UpdateAuditPlanLastCollectionTime(auditPlanID uint, collectionTime time.Time) error { diff --git a/sqle/model/instance_audit_plan_list.go b/sqle/model/instance_audit_plan_list.go index 2d7efdb47..834b3e980 100644 --- a/sqle/model/instance_audit_plan_list.go +++ b/sqle/model/instance_audit_plan_list.go @@ -169,7 +169,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority {{- template "body" . -}} diff --git a/sqle/model/instance_audit_plan_list_test.go b/sqle/model/instance_audit_plan_list_test.go index 51fbe62d3..f7acf1139 100644 --- a/sqle/model/instance_audit_plan_list_test.go +++ b/sqle/model/instance_audit_plan_list_test.go @@ -32,7 +32,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls @@ -59,7 +59,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls @@ -87,7 +87,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls @@ -115,7 +115,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls @@ -144,7 +144,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls @@ -172,7 +172,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls diff --git a/sqle/server/auditplan/job_task_handler.go b/sqle/server/auditplan/job_task_handler.go index 7aa9f8aba..6999e3da3 100644 --- a/sqle/server/auditplan/job_task_handler.go +++ b/sqle/server/auditplan/job_task_handler.go @@ -10,6 +10,7 @@ import ( "github.com/actiontech/sqle/sqle/model" "github.com/actiontech/sqle/sqle/server" "github.com/sirupsen/logrus" + "gorm.io/gorm" ) type AuditPlanHandlerJob struct { @@ -61,38 +62,63 @@ func (j *AuditPlanHandlerJob) HandlerSQL(entry *logrus.Entry) { if len(sqlList) == 0 { return } - // 审核 - sqlList, err = BatchAuditSQLs(sqlList, true) - if err != nil { - entry.Warnf("batch audit origin manager sql failed, error: %v", err) + + // todo: 错误处理 + if err = s.Tx(func(txDB *gorm.DB) error { + for _, sql := range sqlList { + err := s.SaveManagerSQL(txDB, sql) + if err != nil { + entry.Warnf("update manager sql failed, error: %v", err) + return err + } + + // 更新状态表 + err = s.UpdateManagerSQLStatus(txDB, sql) + if err != nil { + entry.Warnf("update manager sql status failed, error: %v", err) + return err + } + } + + for _, sql := range queues { + err := s.RemoveSQLFromQueue(txDB, sql) + if err != nil { + entry.Warnf("remove manager sql queue failed, error: %v", err) + return err + } + } + + return nil + + }); err != nil { return } + + go handlerSQLAudit(entry, sqlList) + +} + +// todo: 错误处理 +func handlerSQLAudit(entry *logrus.Entry, sqlList []*model.SQLManageRecord) { + s := model.GetStorage() + sqlList, err := BatchAuditSQLs(sqlList, true) + if err != nil { + entry.Warnf("batch audit manager sql failed, error: %v", err) + } + // 设置高优先级 sqlList, err = SetSQLPriority(sqlList) if err != nil { - entry.Warnf("check sql priority sql failed, error: %v", err) - return + entry.Warnf("set sql priority sql failed, error: %v", err) } - // todo: 保证事务和错误处理 for _, sql := range sqlList { - err := s.UpdateManagerSQL(sql) + manageSqlParam := make(map[string]interface{}, 3) + manageSqlParam["audit_level"] = sql.AuditLevel + manageSqlParam["audit_results"] = sql.AuditResults + manageSqlParam["priority"] = sql.Priority + err = s.UpdateManagerSQLBySqlId(manageSqlParam, sql.SQLID) if err != nil { entry.Warnf("update manager sql failed, error: %v", err) - return - } - - // 同时更新状态表 - err = s.UpdateManagerSQLStatus(sql) - if err != nil { - entry.Warnf("update manager sql status failed, error: %v", err) - return - } - - } - for _, sql := range queues { - err := s.RemoveSQLFromQueue(sql) - if err != nil { - entry.Warnf("remove manager sql queue failed, error: %v", err) - return + continue } } }