Skip to content

Commit

Permalink
DEVP-3508 WIP activity_log_events
Browse files Browse the repository at this point in the history
  • Loading branch information
MickStanciu committed Oct 14, 2024
1 parent 7fa7204 commit 959a3f2
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 35 deletions.
16 changes: 8 additions & 8 deletions pkg/api/exporter_csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,12 @@ func TestCSVExporterLastModifiedAt_should_return_latest_modified_at(t *testing.T
assert.NoError(t, err)

// Check the timestamp for the audits that doesn't have organisation_id
lastModifiedAt, err := exporter.LastModifiedAt(inspectionFeed, time.Now().Add(time.Hour*-30000), "role_123")
lastModifiedAt, err := exporter.LastModifiedAt(inspectionFeed, time.Now().Add(time.Hour*-30000), feed.DefaultSortingColumn, "role_123")
assert.NoError(t, err)
// Times are slightly lossy, convert to ISO string
assert.Equal(t, now.Format(time.RFC3339), lastModifiedAt.Format(time.RFC3339))

lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, time.Now().Add(time.Hour*-30000), "role_1234")
lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, time.Now().Add(time.Hour*-30000), feed.DefaultSortingColumn, "role_1234")
assert.NoError(t, err)
// Times are slightly lossy, convert to ISO string
assert.Equal(t, now.Format(time.RFC3339), lastModifiedAt.Format(time.RFC3339))
Expand Down Expand Up @@ -260,12 +260,12 @@ func TestCSVExporterLastModifiedAt_should_return_latest_modified_at(t *testing.T
assert.NoError(t, err)

// Check the timestamp for the audits that contains organisation_id
lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, time.Now().Add(time.Hour*-30000), "role_123")
lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, time.Now().Add(time.Hour*-30000), feed.DefaultSortingColumn, "role_123")
assert.NoError(t, err)
// Times are slightly lossy, convert to ISO string
assert.Equal(t, now.Format(time.RFC3339), lastModifiedAt.Format(time.RFC3339))

lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, time.Now().Add(time.Hour*-30000), "role_1234")
lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, time.Now().Add(time.Hour*-30000), feed.DefaultSortingColumn, "role_1234")
assert.NoError(t, err)
// Times are slightly lossy, convert to ISO string
assert.Equal(t, now.Add(time.Hour*-2).Format(time.RFC3339), lastModifiedAt.Format(time.RFC3339))
Expand Down Expand Up @@ -306,12 +306,12 @@ func TestCSVExporterLastModifiedAt_should_return_modified_after_if_latest(t *tes
assert.NoError(t, err)

// Check the timestamp for the audits that doesn't have organisation_id
lastModifiedAt, err := exporter.LastModifiedAt(inspectionFeed, now.Add(time.Hour), "role_123")
lastModifiedAt, err := exporter.LastModifiedAt(inspectionFeed, now.Add(time.Hour), feed.DefaultSortingColumn, "role_123")
assert.NoError(t, err)
// Times are slightly lossy, converting to ISO string
assert.Equal(t, now.Add(time.Hour).Format(time.RFC3339), lastModifiedAt.Format(time.RFC3339))

lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, now.Add(time.Hour), "role_124")
lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, now.Add(time.Hour), feed.DefaultSortingColumn, "role_124")
assert.NoError(t, err)
// Times are slightly lossy, converting to ISO string
assert.Equal(t, now.Add(time.Hour).Format(time.RFC3339), lastModifiedAt.Format(time.RFC3339))
Expand Down Expand Up @@ -343,12 +343,12 @@ func TestCSVExporterLastModifiedAt_should_return_modified_after_if_latest(t *tes
assert.NoError(t, err)

// Check the timestamp for the audits that contains organisation_id
lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, now.Add(time.Hour), "role_123")
lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, now.Add(time.Hour), feed.DefaultSortingColumn, "role_123")
assert.NoError(t, err)
// Times are slightly lossy, converting to ISO string
assert.Equal(t, now.Add(time.Hour).Format(time.RFC3339), lastModifiedAt.Format(time.RFC3339))

lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, now.Add(time.Hour), "role_124")
lastModifiedAt, err = exporter.LastModifiedAt(inspectionFeed, now.Add(time.Hour), feed.DefaultSortingColumn, "role_124")
assert.NoError(t, err)
// Times are slightly lossy, converting to ISO string
assert.Equal(t, now.Add(time.Hour).Format(time.RFC3339), lastModifiedAt.Format(time.RFC3339))
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/feed/exporter_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ type modifiedAtRow struct {
}

// LastModifiedAt returns the latest stored modified at date for the feed
func (e *SQLExporter) LastModifiedAt(feed Feed, modifiedAfter time.Time, orgID string) (time.Time, error) {
func (e *SQLExporter) LastModifiedAt(feed Feed, modifiedAfter time.Time, columnName string, orgID string) (time.Time, error) {
latestRow := modifiedAtRow{}

var result *gorm.DB
result = e.DB.Table(feed.Name()).
Where("organisation_id = ?", orgID).
Order("modified_at DESC").
Order(fmt.Sprintf("%s DESC", columnName)).
Limit(1).
First(&latestRow)
if result.RowsAffected == 0 {
Expand All @@ -179,7 +179,7 @@ func (e *SQLExporter) LastModifiedAt(feed Feed, modifiedAfter time.Time, orgID s
// where there is no org_id defined.
result = e.DB.Table(feed.Name()).
Where("organisation_id IS NULL OR organisation_id = ''").
Order("modified_at DESC").
Order(fmt.Sprintf("%s DESC", columnName)).
Limit(1).
First(&latestRow)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/feed/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Exporter interface {
UpdateRows(feed Feed, primaryKeys []string, element map[string]interface{}) (int64, error)

FinaliseExport(feed Feed, rows interface{}) error
LastModifiedAt(feed Feed, modifiedAfter time.Time, orgID string) (time.Time, error)
LastModifiedAt(feed Feed, modifiedAfter time.Time, columnName string, orgID string) (time.Time, error)
LastRecord(feed Feed, modifiedAfter time.Time, orgID string, sortColumn string) time.Time
WriteMedia(auditID string, mediaID string, contentType string, body []byte) error
DeleteRowsIfExist(feed Feed, query string, args ...interface{}) error
Expand Down
22 changes: 17 additions & 5 deletions pkg/internal/feed/feed_account_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"github.com/SafetyCulture/safetyculture-exporter/pkg/internal/events"
)

// const feedPath = "/accounts/history/v2/feed/activity_log_events"
//const feedPath = "/accounts/history/v2/feed/activity_log_events"

//const feedPath = "/accounts/history/v1/feed/activity_log_events_v2"

const feedPath = "/accounts/history/v1/feed/activity_log_events"

// AccountHistory represents a row from the account history feed
Expand All @@ -31,13 +34,15 @@ type AccountHistory struct {

// AccountHistoryFeed is a representation of the account history feed
type AccountHistoryFeed struct {
Incremental bool
Limit int
ExportedAt time.Time
Incremental bool
Limit int
SortingColumn string
}

// Name is the name of the feed
func (f *AccountHistoryFeed) Name() string {
return "activity_log_events"
return "account_histories"
}

// HasRemainingInformation returns true if the feed returns remaining items information
Expand Down Expand Up @@ -97,6 +102,12 @@ func (f *AccountHistoryFeed) Export(ctx context.Context, apiClient *httpapi.Clie
return events.WrapEventError(err, "init feed")
}

var err error
f.ExportedAt, err = exporter.LastModifiedAt(f, f.ExportedAt, f.SortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
}

drainFn := func(resp *GetFeedResponse) error {
fmt.Printf(" > %s \n", resp.Metadata.NextPage)
var rows []*AccountHistory
Expand Down Expand Up @@ -134,7 +145,8 @@ func (f *AccountHistoryFeed) Export(ctx context.Context, apiClient *httpapi.Clie
req := &GetFeedRequest{
InitialURL: feedPath,
Params: GetFeedParams{
Limit: f.Limit,
Limit: f.Limit,
ModifiedAfter: f.ExportedAt,
},
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/internal/feed/feed_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ActionFeed struct {
ModifiedAfter time.Time
Incremental bool
Limit int
SortingColumn string
}

// Name is the name of the feed
Expand Down Expand Up @@ -122,7 +123,7 @@ func (f *ActionFeed) Export(ctx context.Context, apiClient *httpapi.Client, expo
}

var err error
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, orgID)
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/internal/feed/feed_action_assignees.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ActionAssignee struct {
type ActionAssigneeFeed struct {
ModifiedAfter time.Time
Incremental bool
SortingColumn string
}

// Name is the name of the feed
Expand Down Expand Up @@ -118,7 +119,7 @@ func (f *ActionAssigneeFeed) Export(ctx context.Context, apiClient *httpapi.Clie
}

var err error
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, orgID)
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/internal/feed/feed_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ NOTE: these functions were migrated from various feed methods and adapted not to
They are called directly by the CMD from export cmd.package
*/
const maxConcurrentGoRoutines = 10
const DefaultSortingColumn = "modified_at"

// SafetyCultureFeedExporter defines the basic action in regard to the exporter
type SafetyCultureFeedExporter interface {
Expand Down Expand Up @@ -244,7 +245,8 @@ func (e *ExporterFeedClient) GetFeeds() []Feed {
e.getInspectionFeed(),
&UserFeed{},
&TemplateFeed{
Incremental: e.configuration.ExportIncremental,
Incremental: e.configuration.ExportIncremental,
SortingColumn: DefaultSortingColumn,
},
&TemplatePermissionFeed{
Incremental: e.configuration.ExportIncremental,
Expand All @@ -271,10 +273,12 @@ func (e *ExporterFeedClient) GetFeeds() []Feed {
ModifiedAfter: e.configuration.ExportModifiedAfterTime,
Incremental: e.configuration.ExportIncremental,
Limit: e.configuration.ExportActionLimit,
SortingColumn: DefaultSortingColumn,
},
&ActionAssigneeFeed{
ModifiedAfter: e.configuration.ExportModifiedAfterTime,
Incremental: e.configuration.ExportIncremental,
SortingColumn: DefaultSortingColumn,
},
&ActionTimelineItemFeed{
ModifiedAfter: e.configuration.ExportModifiedAfterTime,
Expand All @@ -292,6 +296,7 @@ func (e *ExporterFeedClient) GetFeeds() []Feed {
Incremental: e.configuration.ExportIncremental,
Limit: e.configuration.ExportInspectionLimit,
ExportMedia: e.configuration.ExportMedia,
SortingColumn: DefaultSortingColumn,
},
&IssueFeed{
Incremental: false, // this was disabled on request. Issues API doesn't support modified After filters
Expand All @@ -315,8 +320,9 @@ func (e *ExporterFeedClient) GetFeeds() []Feed {
Limit: e.configuration.ExportIssueLimit,
},
&AccountHistoryFeed{
Incremental: true,
Limit: 250,
Incremental: true,
Limit: 250,
SortingColumn: "event_at",
},
}
}
Expand All @@ -331,6 +337,7 @@ func (e *ExporterFeedClient) getInspectionFeed() *InspectionFeed {
Incremental: e.configuration.ExportIncremental,
Limit: e.configuration.ExportInspectionLimit,
WebReportLink: e.configuration.ExportInspectionWebReportLink,
SortingColumn: DefaultSortingColumn,
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/internal/feed/feed_inspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type InspectionFeed struct {
Incremental bool
Limit int
WebReportLink string
SortingColumn string
}

// Name is the name of the feed
Expand Down Expand Up @@ -175,7 +176,7 @@ func (f *InspectionFeed) Export(ctx context.Context, apiClient *httpapi.Client,
}

var err error
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, orgID)
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/internal/feed/feed_inspection_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type InspectionItemFeed struct {
Incremental bool
ExportMedia bool
Limit int
SortingColumn string
}

// Name is the name of the feed
Expand Down Expand Up @@ -272,7 +273,7 @@ func (f *InspectionItemFeed) Export(ctx context.Context, apiClient *httpapi.Clie
})

var err error
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, orgID)
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/internal/feed/feed_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Template struct {
type TemplateFeed struct {
ModifiedAfter time.Time
Incremental bool
SortingColumn string
}

// Name is the name of the feed
Expand Down Expand Up @@ -132,7 +133,7 @@ func (f *TemplateFeed) Export(ctx context.Context, apiClient *httpapi.Client, ex
}

var err error
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, orgID)
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, f.SortingColumn, orgID)
if err != nil {
return events.NewEventErrorWithMessage(err, events.ErrorSeverityError, events.ErrorSubSystemDB, false, "unable to load modified after")
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/internal/feed/mocks/exporter_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 959a3f2

Please sign in to comment.