From 78bc0dcfbd20bcd1d7519016cb9c49b29fa622a9 Mon Sep 17 00:00:00 2001 From: iwanghc Date: Fri, 6 Sep 2024 16:54:35 +0800 Subject: [PATCH 1/5] modify: audit plan sql audit optimize --- sqle/model/instance_audit_plan.go | 27 ++++++--- sqle/model/instance_audit_plan_list.go | 2 +- sqle/server/auditplan/job_task_handler.go | 70 +++++++++++++++-------- 3 files changed, 65 insertions(+), 34 deletions(-) diff --git a/sqle/model/instance_audit_plan.go b/sqle/model/instance_audit_plan.go index 51ab3c0e4..339288cb5 100644 --- a/sqle/model/instance_audit_plan.go +++ b/sqle/model/instance_audit_plan.go @@ -481,22 +481,31 @@ func (s *Storage) PullSQLFromManagerSQLQueue() ([]*SQLManageQueue, error) { return sqls, err } -func (s *Storage) RemoveSQLFromQueue(sql *SQLManageQueue) error { - return s.db.Unscoped().Delete(sql).Error +func (s *Storage) RemoveSQLFromQueue(txDB *gorm.DB, sql *SQLManageQueue) error { + return txDB.Unscoped().Delete(sql).Error } -func (s *Storage) UpdateManagerSQL(sql *SQLManageRecord) error { - const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`, `audit_level`, `audit_results`,`priority`) " + - "VALUES (?,?,?,?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), `priority` = VALUES(priority), " + - "`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info), `audit_level`= VALUES(audit_level), `audit_results`= VALUES(audit_results)" - return s.db.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info, sql.AuditLevel, sql.AuditResults, sql.Priority).Error +func (s *Storage) SaveManagerSQL(txDB *gorm.DB, sql *SQLManageRecord) error { + const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`) " + + "VALUES (?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), " + + "`schema_name` = VALUES(`schema_name`), `sql_text` = VALUES(sql_text), `sql_fingerprint` = VALUES(sql_fingerprint), `info`= VALUES(info)" + return txDB.Exec(query, sql.SQLID, sql.Source, sql.SourceId, sql.ProjectId, sql.InstanceID, sql.SchemaName, sql.SqlFingerprint, sql.SqlText, sql.Info).Error } -func (s *Storage) UpdateManagerSQLStatus(sql *SQLManageRecord) error { +func (s *Storage) UpdateManagerSQLStatus(txDB *gorm.DB, sql *SQLManageRecord) error { const query = ` INSERT INTO sql_manage_record_processes (sql_manage_record_id) SELECT smr.id FROM sql_manage_records smr WHERE smr.sql_id = ? ON DUPLICATE KEY UPDATE sql_manage_record_id = VALUES(sql_manage_record_id);` - return s.db.Exec(query, sql.SQLID).Error + return txDB.Exec(query, sql.SQLID).Error +} + +func (s *Storage) UpdateManagerSQLBySqlId(sql *SQLManageRecord) error { + err := s.db.Model(&SQLManageRecord{}).Where("sql_id = ?", sql.SQLID). + Updates(map[string]interface{}{"audit_level": sql.AuditLevel, "audit_results": sql.AuditResults, "priority": sql.Priority}).Error + if err != nil { + return err + } + return nil } func (s *Storage) UpdateAuditPlanLastCollectionTime(auditPlanID uint, collectionTime time.Time) error { diff --git a/sqle/model/instance_audit_plan_list.go b/sqle/model/instance_audit_plan_list.go index 9165a2fca..6c70252b4 100644 --- a/sqle/model/instance_audit_plan_list.go +++ b/sqle/model/instance_audit_plan_list.go @@ -168,7 +168,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority {{- template "body" . -}} diff --git a/sqle/server/auditplan/job_task_handler.go b/sqle/server/auditplan/job_task_handler.go index 7aa9f8aba..b74009623 100644 --- a/sqle/server/auditplan/job_task_handler.go +++ b/sqle/server/auditplan/job_task_handler.go @@ -10,6 +10,7 @@ import ( "github.com/actiontech/sqle/sqle/model" "github.com/actiontech/sqle/sqle/server" "github.com/sirupsen/logrus" + "gorm.io/gorm" ) type AuditPlanHandlerJob struct { @@ -61,38 +62,59 @@ func (j *AuditPlanHandlerJob) HandlerSQL(entry *logrus.Entry) { if len(sqlList) == 0 { return } - // 审核 - sqlList, err = BatchAuditSQLs(sqlList, true) - if err != nil { - entry.Warnf("batch audit origin manager sql failed, error: %v", err) + + // todo: 错误处理 + if err = s.Tx(func(txDB *gorm.DB) error { + for _, sql := range sqlList { + err := s.SaveManagerSQL(txDB, sql) + if err != nil { + entry.Warnf("update manager sql failed, error: %v", err) + return err + } + + // 更新状态表 + err = s.UpdateManagerSQLStatus(txDB, sql) + if err != nil { + entry.Warnf("update manager sql status failed, error: %v", err) + return err + } + } + + for _, sql := range queues { + err := s.RemoveSQLFromQueue(txDB, sql) + if err != nil { + entry.Warnf("remove manager sql queue failed, error: %v", err) + return err + } + } + + return nil + + }); err != nil { return } + + go handlerSQLAudit(sqlList, entry) + +} + +// todo: 错误处理 +func handlerSQLAudit(sqlList []*model.SQLManageRecord, entry *logrus.Entry) { + s := model.GetStorage() + sqlList, err := BatchAuditSQLs(sqlList, true) + if err != nil { + entry.Warnf("batch audit manager sql failed, error: %v", err) + } + // 设置高优先级 sqlList, err = SetSQLPriority(sqlList) if err != nil { - entry.Warnf("check sql priority sql failed, error: %v", err) - return + entry.Warnf("set sql priority sql failed, error: %v", err) } - // todo: 保证事务和错误处理 for _, sql := range sqlList { - err := s.UpdateManagerSQL(sql) + err = s.UpdateManagerSQLBySqlId(sql) if err != nil { entry.Warnf("update manager sql failed, error: %v", err) - return - } - - // 同时更新状态表 - err = s.UpdateManagerSQLStatus(sql) - if err != nil { - entry.Warnf("update manager sql status failed, error: %v", err) - return - } - - } - for _, sql := range queues { - err := s.RemoveSQLFromQueue(sql) - if err != nil { - entry.Warnf("remove manager sql queue failed, error: %v", err) - return + continue } } } From a4bf9938d90ebb7055eb360a1aa017701329a2ae Mon Sep 17 00:00:00 2001 From: iwanghc Date: Fri, 6 Sep 2024 16:54:35 +0800 Subject: [PATCH 2/5] =?UTF-8?q?modify=EF=BC=9A=20sql=20manage=20api=20add?= =?UTF-8?q?=20audit=5Fstatus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/api/controller/v2/sql_manage.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sqle/api/controller/v2/sql_manage.go b/sqle/api/controller/v2/sql_manage.go index 122a6ae54..d83007d7e 100644 --- a/sqle/api/controller/v2/sql_manage.go +++ b/sqle/api/controller/v2/sql_manage.go @@ -22,6 +22,7 @@ type SqlManage struct { InstanceName string `json:"instance_name"` SchemaName string `json:"schema_name"` AuditResult []*v1.AuditResult `json:"audit_result"` + AuditStatus string `json:"audit_status" enums:"being_audited"` FirstAppearTimeStamp string `json:"first_appear_timestamp"` LastReceiveTimeStamp string `json:"last_receive_timestamp"` FpCount uint64 `json:"fp_count"` From d3cc9bbbc778c46104cbd774759d277c28a4380e Mon Sep 17 00:00:00 2001 From: iwanghc Date: Fri, 6 Sep 2024 16:54:36 +0800 Subject: [PATCH 3/5] gen swagger: sql manage api add audit_status --- sqle/docs/docs.go | 6 ++++++ sqle/docs/swagger.json | 6 ++++++ sqle/docs/swagger.yaml | 4 ++++ 3 files changed, 16 insertions(+) diff --git a/sqle/docs/docs.go b/sqle/docs/docs.go index e3707eb89..3c1cefa13 100644 --- a/sqle/docs/docs.go +++ b/sqle/docs/docs.go @@ -17015,6 +17015,12 @@ var doc = `{ "$ref": "#/definitions/v1.AuditResult" } }, + "audit_status": { + "type": "string", + "enum": [ + "being_audited" + ] + }, "endpoints": { "type": "string" }, diff --git a/sqle/docs/swagger.json b/sqle/docs/swagger.json index 5efe6701c..7aef077d8 100644 --- a/sqle/docs/swagger.json +++ b/sqle/docs/swagger.json @@ -16999,6 +16999,12 @@ "$ref": "#/definitions/v1.AuditResult" } }, + "audit_status": { + "type": "string", + "enum": [ + "being_audited" + ] + }, "endpoints": { "type": "string" }, diff --git a/sqle/docs/swagger.yaml b/sqle/docs/swagger.yaml index 03c32eaaa..865d8a186 100644 --- a/sqle/docs/swagger.yaml +++ b/sqle/docs/swagger.yaml @@ -4947,6 +4947,10 @@ definitions: items: $ref: '#/definitions/v1.AuditResult' type: array + audit_status: + enum: + - being_audited + type: string endpoints: type: string first_appear_timestamp: From 3a42db3bfcabe6ee772267c0e73eadfe82261277 Mon Sep 17 00:00:00 2001 From: iwanghc Date: Fri, 6 Sep 2024 16:54:36 +0800 Subject: [PATCH 4/5] =?UTF-8?q?unit=20test=EF=BC=9Aaudit=20plan=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sqle/model/instance_audit_plan_list_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sqle/model/instance_audit_plan_list_test.go b/sqle/model/instance_audit_plan_list_test.go index 51fbe62d3..f7acf1139 100644 --- a/sqle/model/instance_audit_plan_list_test.go +++ b/sqle/model/instance_audit_plan_list_test.go @@ -32,7 +32,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls @@ -59,7 +59,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls @@ -87,7 +87,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls @@ -115,7 +115,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls @@ -144,7 +144,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls @@ -172,7 +172,7 @@ audit_plan_sqls.sql_fingerprint, audit_plan_sqls.sql_text, audit_plan_sqls.schema_name, audit_plan_sqls.info, -audit_plan_sqls.audit_results, +IF(audit_plan_sqls.audit_results IS NULL,'being_audited',audit_plan_sqls.audit_results) AS audit_results, audit_plan_sqls.priority FROM sql_manage_records AS audit_plan_sqls From 7133c9f91718779b17ef28aee9123de228cdc339 Mon Sep 17 00:00:00 2001 From: iwanghc Date: Fri, 6 Sep 2024 17:24:21 +0800 Subject: [PATCH 5/5] modify: modify handlerSQLAudit and modify UpdateManagerSQLBySqlId params --- sqle/model/instance_audit_plan.go | 6 +++--- sqle/server/auditplan/job_task_handler.go | 10 +++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sqle/model/instance_audit_plan.go b/sqle/model/instance_audit_plan.go index 339288cb5..39aa139ec 100644 --- a/sqle/model/instance_audit_plan.go +++ b/sqle/model/instance_audit_plan.go @@ -499,9 +499,9 @@ func (s *Storage) UpdateManagerSQLStatus(txDB *gorm.DB, sql *SQLManageRecord) er return txDB.Exec(query, sql.SQLID).Error } -func (s *Storage) UpdateManagerSQLBySqlId(sql *SQLManageRecord) error { - err := s.db.Model(&SQLManageRecord{}).Where("sql_id = ?", sql.SQLID). - Updates(map[string]interface{}{"audit_level": sql.AuditLevel, "audit_results": sql.AuditResults, "priority": sql.Priority}).Error +func (s *Storage) UpdateManagerSQLBySqlId(sqlManageMap map[string]interface{}, sqlId string) error { + err := s.db.Model(&SQLManageRecord{}).Where("sql_id = ?", sqlId). + Updates(sqlManageMap).Error if err != nil { return err } diff --git a/sqle/server/auditplan/job_task_handler.go b/sqle/server/auditplan/job_task_handler.go index b74009623..6999e3da3 100644 --- a/sqle/server/auditplan/job_task_handler.go +++ b/sqle/server/auditplan/job_task_handler.go @@ -94,12 +94,12 @@ func (j *AuditPlanHandlerJob) HandlerSQL(entry *logrus.Entry) { return } - go handlerSQLAudit(sqlList, entry) + go handlerSQLAudit(entry, sqlList) } // todo: 错误处理 -func handlerSQLAudit(sqlList []*model.SQLManageRecord, entry *logrus.Entry) { +func handlerSQLAudit(entry *logrus.Entry, sqlList []*model.SQLManageRecord) { s := model.GetStorage() sqlList, err := BatchAuditSQLs(sqlList, true) if err != nil { @@ -111,7 +111,11 @@ func handlerSQLAudit(sqlList []*model.SQLManageRecord, entry *logrus.Entry) { entry.Warnf("set sql priority sql failed, error: %v", err) } for _, sql := range sqlList { - err = s.UpdateManagerSQLBySqlId(sql) + manageSqlParam := make(map[string]interface{}, 3) + manageSqlParam["audit_level"] = sql.AuditLevel + manageSqlParam["audit_results"] = sql.AuditResults + manageSqlParam["priority"] = sql.Priority + err = s.UpdateManagerSQLBySqlId(manageSqlParam, sql.SQLID) if err != nil { entry.Warnf("update manager sql failed, error: %v", err) continue