diff --git a/pkg/internal/feed/drain_feed.go b/pkg/internal/feed/drain_feed.go index b7bb151d..7a16d806 100644 --- a/pkg/internal/feed/drain_feed.go +++ b/pkg/internal/feed/drain_feed.go @@ -78,4 +78,7 @@ type GetFeedParams struct { // Applicable only for course progress Offset int `url:"offset,omitempty"` CompletionStatus string `url:"completion_status,omitempty"` + + // Applicable only for account history + CreatedAfter time.Time `url:"created_after,omitempty"` } diff --git a/pkg/internal/feed/exporter_sql.go b/pkg/internal/feed/exporter_sql.go index 972af93d..8fb7899f 100644 --- a/pkg/internal/feed/exporter_sql.go +++ b/pkg/internal/feed/exporter_sql.go @@ -191,7 +191,7 @@ func (e *SQLExporter) LastModifiedAt(feed Feed, modifiedAfter time.Time, columnN } // LastRecord returns the latest stored record the feed -func (e *SQLExporter) LastRecord(feed Feed, modifiedAfter time.Time, orgID string, sortColumn string) time.Time { +func (e *SQLExporter) LastRecord(feed Feed, fallbackTime time.Time, orgID string, sortColumn string) time.Time { type Ts struct { TimeValue time.Time } @@ -205,7 +205,7 @@ func (e *SQLExporter) LastRecord(feed Feed, modifiedAfter time.Time, orgID strin return latestRow.TimeValue } - return modifiedAfter + return fallbackTime } // FinaliseExport closes out an export diff --git a/pkg/internal/feed/feed.go b/pkg/internal/feed/feed.go index 8ce9909e..378d6533 100644 --- a/pkg/internal/feed/feed.go +++ b/pkg/internal/feed/feed.go @@ -41,7 +41,7 @@ type Exporter interface { FinaliseExport(feed Feed, rows interface{}) error LastModifiedAt(feed Feed, modifiedAfter time.Time, columnName string, orgID string) (time.Time, error) - LastRecord(feed Feed, modifiedAfter time.Time, orgID string, sortColumn string) time.Time + LastRecord(feed Feed, fallbackTime time.Time, orgID string, sortColumn string) time.Time WriteMedia(auditID string, mediaID string, contentType string, body []byte) error DeleteRowsIfExist(feed Feed, query string, args ...interface{}) error GetDuration() time.Duration diff --git a/pkg/internal/feed/feed_account_history.go b/pkg/internal/feed/feed_account_history.go index c1a676f5..85b647b3 100644 --- a/pkg/internal/feed/feed_account_history.go +++ b/pkg/internal/feed/feed_account_history.go @@ -14,6 +14,7 @@ import ( ) const feedPath = "/accounts/history/v2/feed/activity_log_events" +const accountHistorySortingColumn = "event_at" // AccountHistory represents a row from the account history feed type AccountHistory struct { @@ -30,10 +31,9 @@ type AccountHistory struct { // AccountHistoryFeed is a representation of the account history feed type AccountHistoryFeed struct { - ExportedAt time.Time - Incremental bool - Limit int - SortingColumn string + ExportedAt time.Time + Incremental bool + Limit int } // Name is the name of the feed @@ -88,6 +88,11 @@ func (f *AccountHistoryFeed) CreateSchema(exporter Exporter) error { // Export exports the feed to the supplied exporter func (f *AccountHistoryFeed) Export(ctx context.Context, apiClient *httpapi.Client, exporter Exporter, orgID string) error { l := logger.GetLogger().With("feed", f.Name(), "org_id", orgID) + s12OrgID := util.ConvertS12ToUUID(orgID) + if s12OrgID.IsNil() { + return fmt.Errorf("cannot convert organisation ID to UUID") + } + status := GetExporterStatus() if err := exporter.InitFeed(f, &InitFeedOptions{ @@ -98,10 +103,8 @@ func (f *AccountHistoryFeed) Export(ctx context.Context, apiClient *httpapi.Clie return events.WrapEventError(err, "init feed") } - var err error - f.ExportedAt, err = exporter.LastModifiedAt(f, f.ExportedAt, f.SortingColumn, orgID) - if err != nil { - return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after") + if f.Incremental { + f.ExportedAt = exporter.LastRecord(f, f.ExportedAt, s12OrgID.String(), accountHistorySortingColumn) } drainFn := func(resp *GetFeedResponse) error { @@ -140,8 +143,8 @@ func (f *AccountHistoryFeed) Export(ctx context.Context, apiClient *httpapi.Clie req := &GetFeedRequest{ InitialURL: feedPath, Params: GetFeedParams{ - Limit: f.Limit, - ModifiedAfter: f.ExportedAt, + Limit: f.Limit, + CreatedAfter: f.ExportedAt, }, } diff --git a/pkg/internal/feed/feed_action.go b/pkg/internal/feed/feed_action.go index 74c42a56..062e196f 100644 --- a/pkg/internal/feed/feed_action.go +++ b/pkg/internal/feed/feed_action.go @@ -46,7 +46,6 @@ type ActionFeed struct { ModifiedAfter time.Time Incremental bool Limit int - SortingColumn string } // Name is the name of the feed @@ -123,7 +122,7 @@ func (f *ActionFeed) Export(ctx context.Context, apiClient *httpapi.Client, expo } var err error - f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID) + f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, DefaultSortingColumn, orgID) if err != nil { return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after") } diff --git a/pkg/internal/feed/feed_action_assignees.go b/pkg/internal/feed/feed_action_assignees.go index cc4fe026..15956959 100644 --- a/pkg/internal/feed/feed_action_assignees.go +++ b/pkg/internal/feed/feed_action_assignees.go @@ -30,7 +30,6 @@ type ActionAssignee struct { type ActionAssigneeFeed struct { ModifiedAfter time.Time Incremental bool - SortingColumn string } // Name is the name of the feed @@ -119,7 +118,7 @@ func (f *ActionAssigneeFeed) Export(ctx context.Context, apiClient *httpapi.Clie } var err error - f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID) + f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, DefaultSortingColumn, orgID) if err != nil { return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after") } diff --git a/pkg/internal/feed/feed_exporter.go b/pkg/internal/feed/feed_exporter.go index d2ab9791..eab5751e 100644 --- a/pkg/internal/feed/feed_exporter.go +++ b/pkg/internal/feed/feed_exporter.go @@ -245,8 +245,7 @@ func (e *ExporterFeedClient) GetFeeds() []Feed { e.getInspectionFeed(), &UserFeed{}, &TemplateFeed{ - Incremental: e.configuration.ExportIncremental, - SortingColumn: DefaultSortingColumn, + Incremental: e.configuration.ExportIncremental, }, &TemplatePermissionFeed{ Incremental: e.configuration.ExportIncremental, @@ -273,12 +272,10 @@ func (e *ExporterFeedClient) GetFeeds() []Feed { ModifiedAfter: e.configuration.ExportModifiedAfterTime, Incremental: e.configuration.ExportIncremental, Limit: e.configuration.ExportActionLimit, - SortingColumn: DefaultSortingColumn, }, &ActionAssigneeFeed{ ModifiedAfter: e.configuration.ExportModifiedAfterTime, Incremental: e.configuration.ExportIncremental, - SortingColumn: DefaultSortingColumn, }, &ActionTimelineItemFeed{ ModifiedAfter: e.configuration.ExportModifiedAfterTime, @@ -296,7 +293,6 @@ func (e *ExporterFeedClient) GetFeeds() []Feed { Incremental: e.configuration.ExportIncremental, Limit: e.configuration.ExportInspectionLimit, ExportMedia: e.configuration.ExportMedia, - SortingColumn: DefaultSortingColumn, }, &IssueFeed{ Incremental: false, // this was disabled on request. Issues API doesn't support modified After filters @@ -320,9 +316,8 @@ func (e *ExporterFeedClient) GetFeeds() []Feed { Limit: e.configuration.ExportIssueLimit, }, &AccountHistoryFeed{ - Incremental: true, - Limit: 250, - SortingColumn: "event_at", + Incremental: true, + Limit: 250, }, } } diff --git a/pkg/internal/feed/feed_inspection_item.go b/pkg/internal/feed/feed_inspection_item.go index ae13632f..99400826 100644 --- a/pkg/internal/feed/feed_inspection_item.go +++ b/pkg/internal/feed/feed_inspection_item.go @@ -70,7 +70,6 @@ type InspectionItemFeed struct { Incremental bool ExportMedia bool Limit int - SortingColumn string } // Name is the name of the feed @@ -273,7 +272,7 @@ func (f *InspectionItemFeed) Export(ctx context.Context, apiClient *httpapi.Clie }) var err error - f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID) + f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, DefaultSortingColumn, orgID) if err != nil { return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after") } diff --git a/pkg/internal/feed/feed_template.go b/pkg/internal/feed/feed_template.go index 21579a66..dc47a91a 100644 --- a/pkg/internal/feed/feed_template.go +++ b/pkg/internal/feed/feed_template.go @@ -33,7 +33,6 @@ type Template struct { type TemplateFeed struct { ModifiedAfter time.Time Incremental bool - SortingColumn string } // Name is the name of the feed @@ -133,7 +132,7 @@ func (f *TemplateFeed) Export(ctx context.Context, apiClient *httpapi.Client, ex } var err error - f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID) + f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, DefaultSortingColumn, orgID) if err != nil { return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after") } diff --git a/pkg/internal/feed/mocks/exporter_mock.go b/pkg/internal/feed/mocks/exporter_mock.go index 6062cc7c..6c0cf83c 100644 --- a/pkg/internal/feed/mocks/exporter_mock.go +++ b/pkg/internal/feed/mocks/exporter_mock.go @@ -136,8 +136,8 @@ func (_m *Exporter) LastModifiedAt(_a0 feed.Feed, modifiedAfter time.Time, colum } // LastRecord provides a mock function with given fields: _a0, modifiedAfter, orgID, sortColumn -func (_m *Exporter) LastRecord(_a0 feed.Feed, modifiedAfter time.Time, orgID string, sortColumn string) time.Time { - ret := _m.Called(_a0, modifiedAfter, orgID, sortColumn) +func (_m *Exporter) LastRecord(_a0 feed.Feed, fallbackTime time.Time, orgID string, sortColumn string) time.Time { + ret := _m.Called(_a0, fallbackTime, orgID, sortColumn) if len(ret) == 0 { panic("no return value specified for LastRecord") @@ -145,7 +145,7 @@ func (_m *Exporter) LastRecord(_a0 feed.Feed, modifiedAfter time.Time, orgID str var r0 time.Time if rf, ok := ret.Get(0).(func(feed.Feed, time.Time, string, string) time.Time); ok { - r0 = rf(_a0, modifiedAfter, orgID, sortColumn) + r0 = rf(_a0, fallbackTime, orgID, sortColumn) } else { r0 = ret.Get(0).(time.Time) } diff --git a/pkg/internal/util/uuid.go b/pkg/internal/util/uuid.go new file mode 100644 index 00000000..3e1e27bd --- /dev/null +++ b/pkg/internal/util/uuid.go @@ -0,0 +1,14 @@ +package util + +import ( + "github.com/gofrs/uuid" + "strings" +) + +func ConvertS12ToUUID(s string) uuid.UUID { + idx := strings.LastIndex(s, "_") + if idx == -1 { + return uuid.Nil + } + return uuid.FromStringOrNil(s[idx+1:]) +}