From 9895d6f80cfead9810be45ff9da0704ce96c5c79 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 5 Dec 2023 18:41:27 +0545 Subject: [PATCH] feat: merge correlated events into one change record * aggregate timestamps of the events --- api/v1/interface.go | 3 +++ db/update.go | 20 ++++++++++++------ scrapers/azure/azure.go | 47 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 61 insertions(+), 9 deletions(-) diff --git a/api/v1/interface.go b/api/v1/interface.go index 8df3f1d1..7262da96 100644 --- a/api/v1/interface.go +++ b/api/v1/interface.go @@ -78,6 +78,9 @@ type ChangeResult struct { CreatedBy *string `json:"created_by"` CreatedAt *time.Time `json:"created_at"` Details map[string]interface{} `json:"details"` + + // UpdateExisting indicates whether to update an existing change + UpdateExisting bool `json:"update_existing"` } func (r ChangeResult) AsMap() map[string]any { diff --git a/db/update.go b/db/update.go index 9c3dc17b..a205816f 100644 --- a/db/update.go +++ b/db/update.go @@ -138,19 +138,19 @@ func updateCI(ctx api.ScrapeContext, ci models.ConfigItem) error { } func updateChange(ctx api.ScrapeContext, result *v1.ScrapeResult) error { - for _, change := range result.Changes { - if change.Action == v1.Ignore { + for _, changeResult := range result.Changes { + if changeResult.Action == v1.Ignore { continue } - if change.Action == v1.Delete { - if err := deleteChangeHandler(ctx, change); err != nil { + if changeResult.Action == v1.Delete { + if err := deleteChangeHandler(ctx, changeResult); err != nil { return err } continue } - change := models.NewConfigChangeFromV1(*result, change) + change := models.NewConfigChangeFromV1(*result, changeResult) if change.CreatedBy != nil { person, err := FindPersonByEmail(ctx, ptr.ToString(change.CreatedBy)) @@ -174,8 +174,14 @@ func updateChange(ctx api.ScrapeContext, result *v1.ScrapeResult) error { change.ConfigID = *id - if err := db.Create(change).Error; err != nil { - return err + if changeResult.UpdateExisting { + if err := db.Save(change).Error; err != nil { + return err + } + } else { + if err := db.Create(change).Error; err != nil { + return err + } } } diff --git a/scrapers/azure/azure.go b/scrapers/azure/azure.go index af97117b..9bda8894 100644 --- a/scrapers/azure/azure.go +++ b/scrapers/azure/azure.go @@ -187,6 +187,13 @@ var azureActivityExcludeOperations = []string{ "Microsoft.ContainerService/managedClusters/listClusterUserCredential/action", } +// activityChangeRecord holds together the change result generated from an activity log +// along with the original activity log event. +type activityChangeRecord struct { + Result v1.ChangeResult + EventData *armmonitor.EventData +} + func (azure Scraper) fetchActivityLogs() v1.ScrapeResults { logger.Debugf("fetching activity logs for subscription %s", azure.config.SubscriptionID) @@ -197,6 +204,10 @@ func (azure Scraper) fetchActivityLogs() v1.ScrapeResults { return append(results, v1.ScrapeResult{Error: fmt.Errorf("failed to initiate arm monitor client: %w", err)}) } + // corelatedActivities keeps together events that belong to the same "uber operation" + // https://learn.microsoft.com/en-us/rest/api/monitor/activity-logs/list?view=rest-monitor-2015-04-01&tabs=HTTP#eventdata + var corelatedActivities = map[string][]activityChangeRecord{} + filter := fmt.Sprintf("eventTimestamp ge '%s'", time.Now().UTC().Add(activityLogTimespan).Format(time.RFC3339)) pager := clientFactory.NewActivityLogsClient().NewListPager(filter, &armmonitor.ActivityLogsClientListOptions{Select: &activityLogFilter}) for pager.More() { @@ -215,15 +226,47 @@ func (azure Scraper) fetchActivityLogs() v1.ScrapeResults { ChangeType: utils.Deref(v.OperationName.Value), CreatedAt: v.EventTimestamp, Details: v1.NewJSON(*v), - ExternalChangeID: utils.Deref(v.EventDataID), + ExternalChangeID: utils.Deref(v.CorrelationID), ExternalID: strings.ToLower(utils.Deref(v.ResourceID)), ConfigType: getARMType(v.ResourceType.Value), Severity: string(getSeverityFromReason(v)), Source: ConfigTypePrefix + "ActivityLog", Summary: utils.Deref(v.OperationName.LocalizedValue), } - results = append(results, v1.ScrapeResult{Changes: []v1.ChangeResult{change}}) + corelatedActivities[utils.Deref(v.CorrelationID)] = append(corelatedActivities[utils.Deref(v.CorrelationID)], activityChangeRecord{ + Result: change, + EventData: v, + }) } + + // Stop the pager because there's a bug in the SDK itself + // error: failed to read activity logs next page: invalid semicolon separator in query + // TODO: + break + } + + // For the correlated activities, we merge some fields and aggregate the timestamps into a single change record + for _, changeRecords := range corelatedActivities { + change := changeRecords[0].Result // Use the latest event as the base + change.UpdateExisting = true // New events might have arrived, so we want to update the existing change in db + + statusTimestamps := map[string]*time.Time{} + for i := range changeRecords { + change.CreatedAt = changeRecords[i].EventData.EventTimestamp // Use the oldest event's timestamp + + status := utils.Deref(changeRecords[i].EventData.Status.Value) + if status == "" { + continue + } + statusTimestamps[strings.ToLower(status)] = changeRecords[i].EventData.EventTimestamp + + if _, ok := change.Details["httpRequest"]; !ok { + change.Details["httpRequest"] = changeRecords[i].EventData + } + } + + change.Details["timestamps"] = statusTimestamps + results = append(results, v1.ScrapeResult{Changes: []v1.ChangeResult{change}}) } return results