Skip to content

Commit

Permalink
feat: merge correlated events into one change record
Browse files Browse the repository at this point in the history
* aggregate timestamps of the events
  • Loading branch information
adityathebe committed Dec 5, 2023
1 parent 7b316a0 commit 9895d6f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
3 changes: 3 additions & 0 deletions api/v1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type ChangeResult struct {
CreatedBy *string `json:"created_by"`
CreatedAt *time.Time `json:"created_at"`
Details map[string]interface{} `json:"details"`

// UpdateExisting indicates whether to update an existing change
UpdateExisting bool `json:"update_existing"`
}

func (r ChangeResult) AsMap() map[string]any {
Expand Down
20 changes: 13 additions & 7 deletions db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,19 @@ func updateCI(ctx api.ScrapeContext, ci models.ConfigItem) error {
}

func updateChange(ctx api.ScrapeContext, result *v1.ScrapeResult) error {
for _, change := range result.Changes {
if change.Action == v1.Ignore {
for _, changeResult := range result.Changes {
if changeResult.Action == v1.Ignore {
continue
}

if change.Action == v1.Delete {
if err := deleteChangeHandler(ctx, change); err != nil {
if changeResult.Action == v1.Delete {
if err := deleteChangeHandler(ctx, changeResult); err != nil {
return err
}
continue
}

change := models.NewConfigChangeFromV1(*result, change)
change := models.NewConfigChangeFromV1(*result, changeResult)

if change.CreatedBy != nil {
person, err := FindPersonByEmail(ctx, ptr.ToString(change.CreatedBy))
Expand All @@ -174,8 +174,14 @@ func updateChange(ctx api.ScrapeContext, result *v1.ScrapeResult) error {

change.ConfigID = *id

if err := db.Create(change).Error; err != nil {
return err
if changeResult.UpdateExisting {
if err := db.Save(change).Error; err != nil {
return err
}
} else {
if err := db.Create(change).Error; err != nil {
return err
}
}
}

Expand Down
47 changes: 45 additions & 2 deletions scrapers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ var azureActivityExcludeOperations = []string{
"Microsoft.ContainerService/managedClusters/listClusterUserCredential/action",
}

// activityChangeRecord holds together the change result generated from an activity log
// along with the original activity log event.
type activityChangeRecord struct {
Result v1.ChangeResult
EventData *armmonitor.EventData
}

func (azure Scraper) fetchActivityLogs() v1.ScrapeResults {
logger.Debugf("fetching activity logs for subscription %s", azure.config.SubscriptionID)

Expand All @@ -197,6 +204,10 @@ func (azure Scraper) fetchActivityLogs() v1.ScrapeResults {
return append(results, v1.ScrapeResult{Error: fmt.Errorf("failed to initiate arm monitor client: %w", err)})
}

// corelatedActivities keeps together events that belong to the same "uber operation"
// https://learn.microsoft.com/en-us/rest/api/monitor/activity-logs/list?view=rest-monitor-2015-04-01&tabs=HTTP#eventdata
var corelatedActivities = map[string][]activityChangeRecord{}

filter := fmt.Sprintf("eventTimestamp ge '%s'", time.Now().UTC().Add(activityLogTimespan).Format(time.RFC3339))
pager := clientFactory.NewActivityLogsClient().NewListPager(filter, &armmonitor.ActivityLogsClientListOptions{Select: &activityLogFilter})
for pager.More() {
Expand All @@ -215,15 +226,47 @@ func (azure Scraper) fetchActivityLogs() v1.ScrapeResults {
ChangeType: utils.Deref(v.OperationName.Value),
CreatedAt: v.EventTimestamp,
Details: v1.NewJSON(*v),
ExternalChangeID: utils.Deref(v.EventDataID),
ExternalChangeID: utils.Deref(v.CorrelationID),
ExternalID: strings.ToLower(utils.Deref(v.ResourceID)),
ConfigType: getARMType(v.ResourceType.Value),
Severity: string(getSeverityFromReason(v)),
Source: ConfigTypePrefix + "ActivityLog",
Summary: utils.Deref(v.OperationName.LocalizedValue),
}
results = append(results, v1.ScrapeResult{Changes: []v1.ChangeResult{change}})
corelatedActivities[utils.Deref(v.CorrelationID)] = append(corelatedActivities[utils.Deref(v.CorrelationID)], activityChangeRecord{
Result: change,
EventData: v,
})
}

// Stop the pager because there's a bug in the SDK itself
// error: failed to read activity logs next page: invalid semicolon separator in query
// TODO:
break
}

// For the correlated activities, we merge some fields and aggregate the timestamps into a single change record
for _, changeRecords := range corelatedActivities {
change := changeRecords[0].Result // Use the latest event as the base
change.UpdateExisting = true // New events might have arrived, so we want to update the existing change in db

statusTimestamps := map[string]*time.Time{}
for i := range changeRecords {
change.CreatedAt = changeRecords[i].EventData.EventTimestamp // Use the oldest event's timestamp

status := utils.Deref(changeRecords[i].EventData.Status.Value)
if status == "" {
continue
}
statusTimestamps[strings.ToLower(status)] = changeRecords[i].EventData.EventTimestamp

if _, ok := change.Details["httpRequest"]; !ok {
change.Details["httpRequest"] = changeRecords[i].EventData
}
}

change.Details["timestamps"] = statusTimestamps
results = append(results, v1.ScrapeResult{Changes: []v1.ChangeResult{change}})
}

return results
Expand Down

0 comments on commit 9895d6f

Please sign in to comment.