Skip to content

Commit

Permalink
DEVP-3508 activity_log_events
Browse files Browse the repository at this point in the history
  • Loading branch information
MickStanciu committed Oct 15, 2024
1 parent 9e6c19a commit 6d60ff1
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 32 deletions.
3 changes: 3 additions & 0 deletions pkg/internal/feed/drain_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,7 @@ type GetFeedParams struct {
// Applicable only for course progress
Offset int `url:"offset,omitempty"`
CompletionStatus string `url:"completion_status,omitempty"`

// Applicable only for account history
CreatedAfter time.Time `url:"created_after,omitempty"`
}
4 changes: 2 additions & 2 deletions pkg/internal/feed/exporter_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (e *SQLExporter) LastModifiedAt(feed Feed, modifiedAfter time.Time, columnN
}

// LastRecord returns the latest stored record the feed
func (e *SQLExporter) LastRecord(feed Feed, modifiedAfter time.Time, orgID string, sortColumn string) time.Time {
func (e *SQLExporter) LastRecord(feed Feed, fallbackTime time.Time, orgID string, sortColumn string) time.Time {
type Ts struct {
TimeValue time.Time
}
Expand All @@ -205,7 +205,7 @@ func (e *SQLExporter) LastRecord(feed Feed, modifiedAfter time.Time, orgID strin
return latestRow.TimeValue
}

return modifiedAfter
return fallbackTime
}

// FinaliseExport closes out an export
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/feed/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Exporter interface {

FinaliseExport(feed Feed, rows interface{}) error
LastModifiedAt(feed Feed, modifiedAfter time.Time, columnName string, orgID string) (time.Time, error)
LastRecord(feed Feed, modifiedAfter time.Time, orgID string, sortColumn string) time.Time
LastRecord(feed Feed, fallbackTime time.Time, orgID string, sortColumn string) time.Time
WriteMedia(auditID string, mediaID string, contentType string, body []byte) error
DeleteRowsIfExist(feed Feed, query string, args ...interface{}) error
GetDuration() time.Duration
Expand Down
23 changes: 13 additions & 10 deletions pkg/internal/feed/feed_account_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

const feedPath = "/accounts/history/v2/feed/activity_log_events"
const accountHistorySortingColumn = "event_at"

// AccountHistory represents a row from the account history feed
type AccountHistory struct {
Expand All @@ -30,10 +31,9 @@ type AccountHistory struct {

// AccountHistoryFeed is a representation of the account history feed
type AccountHistoryFeed struct {
ExportedAt time.Time
Incremental bool
Limit int
SortingColumn string
ExportedAt time.Time
Incremental bool
Limit int
}

// Name is the name of the feed
Expand Down Expand Up @@ -88,6 +88,11 @@ func (f *AccountHistoryFeed) CreateSchema(exporter Exporter) error {
// Export exports the feed to the supplied exporter
func (f *AccountHistoryFeed) Export(ctx context.Context, apiClient *httpapi.Client, exporter Exporter, orgID string) error {
l := logger.GetLogger().With("feed", f.Name(), "org_id", orgID)
s12OrgID := util.ConvertS12ToUUID(orgID)
if s12OrgID.IsNil() {
return fmt.Errorf("cannot convert organisation ID to UUID")
}

status := GetExporterStatus()

if err := exporter.InitFeed(f, &InitFeedOptions{
Expand All @@ -98,10 +103,8 @@ func (f *AccountHistoryFeed) Export(ctx context.Context, apiClient *httpapi.Clie
return events.WrapEventError(err, "init feed")
}

var err error
f.ExportedAt, err = exporter.LastModifiedAt(f, f.ExportedAt, f.SortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
if f.Incremental {
f.ExportedAt = exporter.LastRecord(f, f.ExportedAt, s12OrgID.String(), accountHistorySortingColumn)
}

drainFn := func(resp *GetFeedResponse) error {
Expand Down Expand Up @@ -140,8 +143,8 @@ func (f *AccountHistoryFeed) Export(ctx context.Context, apiClient *httpapi.Clie
req := &GetFeedRequest{
InitialURL: feedPath,
Params: GetFeedParams{
Limit: f.Limit,
ModifiedAfter: f.ExportedAt,
Limit: f.Limit,
CreatedAfter: f.ExportedAt,
},
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/internal/feed/feed_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type ActionFeed struct {
ModifiedAfter time.Time
Incremental bool
Limit int
SortingColumn string
}

// Name is the name of the feed
Expand Down Expand Up @@ -123,7 +122,7 @@ func (f *ActionFeed) Export(ctx context.Context, apiClient *httpapi.Client, expo
}

var err error
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID)
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, DefaultSortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/internal/feed/feed_action_assignees.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type ActionAssignee struct {
type ActionAssigneeFeed struct {
ModifiedAfter time.Time
Incremental bool
SortingColumn string
}

// Name is the name of the feed
Expand Down Expand Up @@ -119,7 +118,7 @@ func (f *ActionAssigneeFeed) Export(ctx context.Context, apiClient *httpapi.Clie
}

var err error
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID)
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, DefaultSortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
}
Expand Down
11 changes: 3 additions & 8 deletions pkg/internal/feed/feed_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,7 @@ func (e *ExporterFeedClient) GetFeeds() []Feed {
e.getInspectionFeed(),
&UserFeed{},
&TemplateFeed{
Incremental: e.configuration.ExportIncremental,
SortingColumn: DefaultSortingColumn,
Incremental: e.configuration.ExportIncremental,
},
&TemplatePermissionFeed{
Incremental: e.configuration.ExportIncremental,
Expand All @@ -273,12 +272,10 @@ func (e *ExporterFeedClient) GetFeeds() []Feed {
ModifiedAfter: e.configuration.ExportModifiedAfterTime,
Incremental: e.configuration.ExportIncremental,
Limit: e.configuration.ExportActionLimit,
SortingColumn: DefaultSortingColumn,
},
&ActionAssigneeFeed{
ModifiedAfter: e.configuration.ExportModifiedAfterTime,
Incremental: e.configuration.ExportIncremental,
SortingColumn: DefaultSortingColumn,
},
&ActionTimelineItemFeed{
ModifiedAfter: e.configuration.ExportModifiedAfterTime,
Expand All @@ -296,7 +293,6 @@ func (e *ExporterFeedClient) GetFeeds() []Feed {
Incremental: e.configuration.ExportIncremental,
Limit: e.configuration.ExportInspectionLimit,
ExportMedia: e.configuration.ExportMedia,
SortingColumn: DefaultSortingColumn,
},
&IssueFeed{
Incremental: false, // this was disabled on request. Issues API doesn't support modified After filters
Expand All @@ -320,9 +316,8 @@ func (e *ExporterFeedClient) GetFeeds() []Feed {
Limit: e.configuration.ExportIssueLimit,
},
&AccountHistoryFeed{
Incremental: true,
Limit: 250,
SortingColumn: "event_at",
Incremental: true,
Limit: 250,
},
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/internal/feed/feed_inspection_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type InspectionItemFeed struct {
Incremental bool
ExportMedia bool
Limit int
SortingColumn string
}

// Name is the name of the feed
Expand Down Expand Up @@ -273,7 +272,7 @@ func (f *InspectionItemFeed) Export(ctx context.Context, apiClient *httpapi.Clie
})

var err error
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID)
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, DefaultSortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/internal/feed/feed_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type Template struct {
type TemplateFeed struct {
ModifiedAfter time.Time
Incremental bool
SortingColumn string
}

// Name is the name of the feed
Expand Down Expand Up @@ -133,7 +132,7 @@ func (f *TemplateFeed) Export(ctx context.Context, apiClient *httpapi.Client, ex
}

var err error
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID)
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, DefaultSortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/feed/mocks/exporter_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions pkg/internal/util/uuid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package util

import (
"github.com/gofrs/uuid"
"strings"
)

func ConvertS12ToUUID(s string) uuid.UUID {
idx := strings.LastIndex(s, "_")
if idx == -1 {
return uuid.Nil
}
return uuid.FromStringOrNil(s[idx+1:])
}

0 comments on commit 6d60ff1

Please sign in to comment.