From 5cc68e27dc00f99c9d7cf46f3affb88d3c0d6cea Mon Sep 17 00:00:00 2001
From: Keepers
Date: Mon, 23 Oct 2023 17:29:56 -0600
Subject: [PATCH] use count bus in kopia backups (#4482)

Uses the count bus in the kopia backup package. For now this duplicates
the counts we already get from the kopia stats; a later PR will remove
the old stats entirely in favor of the counter.

---

#### Does this PR need a docs update or release note?

- [x] :no_entry: No

#### Type of change

- [x] :broom: Tech Debt/Cleanup

#### Test Plan

- [x] :zap: Unit test
- [x] :green_heart: E2E

---
 src/internal/kopia/data_collection_test.go | 10 +-
 src/internal/kopia/inject/inject.go | 2 +
 src/internal/kopia/merge_collection_test.go | 9 +-
 src/internal/kopia/upload.go | 4 +
 src/internal/kopia/upload_test.go | 23 ++--
 src/internal/kopia/wrapper.go | 16 ++-
 src/internal/kopia/wrapper_scale_test.go | 11 +-
 src/internal/kopia/wrapper_test.go | 134 ++++++++++++++------
 src/internal/operations/backup.go | 18 ++-
 src/internal/operations/backup_test.go | 17 ++-
 src/internal/stats/stats.go | 7 +
 src/internal/streamstore/streamstore.go | 4 +-
 src/pkg/count/count.go | 8 ++
 src/pkg/count/keys.go | 26 +++-
 14 files changed, 220 insertions(+), 69 deletions(-)

diff --git a/src/internal/kopia/data_collection_test.go b/src/internal/kopia/data_collection_test.go
index 4b1b4a4b27..ab49c994db 100644
--- a/src/internal/kopia/data_collection_test.go
+++ b/src/internal/kopia/data_collection_test.go
@@ -16,7 +16,9 @@ import (
 	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	dataMock "github.com/alcionai/corso/src/internal/data/mock"
+	istats "github.com/alcionai/corso/src/internal/stats"
 	"github.com/alcionai/corso/src/internal/tester"
+	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/path"
 )
@@ -395,12 +397,16 @@ func (suite *KopiaDataCollectionUnitSuite) TestFetchItemByName() {
 			defer flush()
 
 			root := getLayout(t, test.inputSerializationVersion)
-			c := &i64counter{}
+
+			counter := count.New()
+			c := istats.ByteCounter{
+				Counter: counter.AdderFor(count.PersistedUploadedBytes),
+			}
 
 			col := &kopiaDataCollection{
 				path:            pth,
 				dir:             root,
-				counter:         c,
+				counter:         &c,
 				expectedVersion: readers.DefaultSerializationVersion,
 			}
 
diff --git a/src/internal/kopia/inject/inject.go b/src/internal/kopia/inject/inject.go
index 3011a79e72..09425bc9bb 100644
--- a/src/internal/kopia/inject/inject.go
+++ b/src/internal/kopia/inject/inject.go
@@ -8,6 +8,7 @@ import (
 	"github.com/alcionai/corso/src/internal/kopia"
 	"github.com/alcionai/corso/src/pkg/backup/details"
 	"github.com/alcionai/corso/src/pkg/backup/identity"
+	"github.com/alcionai/corso/src/pkg/count"
 	"github.com/alcionai/corso/src/pkg/fault"
 	"github.com/alcionai/corso/src/pkg/path"
 )
@@ -23,6 +24,7 @@ type (
 			tags map[string]string,
 			buildTreeWithBase bool,
 			errs *fault.Bus,
+			counter *count.Bus,
 		) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error)
 	}
 
diff --git a/src/internal/kopia/merge_collection_test.go b/src/internal/kopia/merge_collection_test.go
index fefbfbb15b..f18cae256a 100644
--- a/src/internal/kopia/merge_collection_test.go
+++ b/src/internal/kopia/merge_collection_test.go
@@ -15,7 +15,9 @@ import (
 	"github.com/alcionai/corso/src/internal/common/readers"
 	"github.com/alcionai/corso/src/internal/data"
 	"github.com/alcionai/corso/src/internal/m365/service/exchange/mock"
+	istats "github.com/alcionai/corso/src/internal/stats"
 	"github.com/alcionai/corso/src/internal/tester"
+	
"github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -271,7 +273,10 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() { ctx, flush := tester.NewContext(t) defer flush() - c := &i64counter{} + counter := count.New() + c := istats.ByteCounter{ + Counter: counter.AdderFor(count.PersistedUploadedBytes), + } dc := mergeCollection{fullPath: pth} @@ -279,7 +284,7 @@ func (suite *MergeCollectionUnitSuite) TestFetchItemByName() { col := &kopiaDataCollection{ path: pth, dir: layout(t), - counter: c, + counter: &c, expectedVersion: readers.DefaultSerializationVersion, } diff --git a/src/internal/kopia/upload.go b/src/internal/kopia/upload.go index d99fe9cd39..52a06e7328 100644 --- a/src/internal/kopia/upload.go +++ b/src/internal/kopia/upload.go @@ -59,6 +59,7 @@ type corsoProgress struct { mu sync.RWMutex totalBytes int64 errs *fault.Bus + counter *count.Bus // expectedIgnoredErrors is a count of error cases caught in the Error wrapper // which are well known and actually ignorable. At the end of a run, if the // manifest ignored error count is equal to this count, then everything is good. @@ -186,6 +187,7 @@ func (cp *corsoProgress) FinishedHashingFile(fname string, bs int64) { "finished hashing file", "path", clues.Hide(path.Elements(sl[2:]))) + cp.counter.Add(count.PersistedHashedBytes, bs) atomic.AddInt64(&cp.totalBytes, bs) } @@ -214,7 +216,9 @@ func (cp *corsoProgress) Error(relpath string, err error, isIgnored bool) { // delta query and a fetch. This is our next point of error // handling, where we can identify and skip over the case. if clues.HasLabel(err, graph.LabelsSkippable) { + cp.counter.Inc(count.PersistenceExpectedErrors) cp.incExpectedErrs() + return } diff --git a/src/internal/kopia/upload_test.go b/src/internal/kopia/upload_test.go index c0f713bc9a..6512b74cf0 100644 --- a/src/internal/kopia/upload_test.go +++ b/src/internal/kopia/upload_test.go @@ -24,6 +24,7 @@ import ( "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/identity" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -569,6 +570,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() { deets: bd, pending: map[string]*itemDetails{}, errs: fault.New(true), + counter: count.New(), } ci := test.cachedItems(suite.targetFileName, suite.targetFilePath) @@ -579,6 +581,7 @@ func (suite *CorsoProgressUnitSuite) TestFinishedHashingFile() { assert.Empty(t, cp.pending) assert.Equal(t, test.expectedBytes, cp.totalBytes) + assert.Equal(t, test.expectedBytes, cp.counter.Get(count.PersistedHashedBytes)) }) } } @@ -2669,7 +2672,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP getBaseSnapshot := func() (fs.Entry, map[string]*int) { counters := map[string]*int{} - folder, count := newMockStaticDirectory( + folder, dirCount := newMockStaticDirectory( encodeElements(folderID3)[0], []fs.Entry{ virtualfs.StreamingFileWithModTimeFromReader( @@ -2681,9 +2684,9 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP time.Time{}, io.NopCloser(bytes.NewReader(fileData6))), }) - counters[folderID3] = count + counters[folderID3] = dirCount - folder, count = newMockStaticDirectory( + folder, dirCount = newMockStaticDirectory( encodeElements(folderID2)[0], []fs.Entry{ virtualfs.StreamingFileWithModTimeFromReader( @@ 
-2696,14 +2699,14 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP io.NopCloser(bytes.NewReader(fileData4))), folder, }) - counters[folderID2] = count + counters[folderID2] = dirCount - folder4, count := newMockStaticDirectory( + folder4, dirCount := newMockStaticDirectory( encodeElements(folderID4)[0], []fs.Entry{}) - counters[folderID4] = count + counters[folderID4] = dirCount - folder, count = newMockStaticDirectory( + folder, dirCount = newMockStaticDirectory( encodeElements(folderID1)[0], []fs.Entry{ virtualfs.StreamingFileWithModTimeFromReader( @@ -2717,9 +2720,9 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP folder, folder4, }) - counters[folderID1] = count + counters[folderID1] = dirCount - folder5, count := newMockStaticDirectory( + folder5, dirCount := newMockStaticDirectory( encodeElements(folderID5)[0], []fs.Entry{ virtualfs.StreamingFileWithModTimeFromReader( @@ -2731,7 +2734,7 @@ func (suite *HierarchyBuilderUnitSuite) TestBuildDirectoryTree_SelectiveSubtreeP time.Time{}, io.NopCloser(bytes.NewReader(fileData8))), }) - counters[folderID5] = count + counters[folderID5] = dirCount return baseWithChildren( prefixFolders, diff --git a/src/internal/kopia/wrapper.go b/src/internal/kopia/wrapper.go index 10523de6c6..21b5f63dca 100644 --- a/src/internal/kopia/wrapper.go +++ b/src/internal/kopia/wrapper.go @@ -26,6 +26,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/identity" "github.com/alcionai/corso/src/pkg/control/repository" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -76,6 +77,13 @@ func manifestToStats( progress *corsoProgress, uploadCount *stats.ByteCounter, ) BackupStats { + progress.counter.Add(count.PersistedFiles, int64(man.Stats.TotalFileCount)) + progress.counter.Add(count.PersistedCachedFiles, int64(man.Stats.CachedFiles)) + progress.counter.Add(count.PersistedNonCachedFiles, int64(man.Stats.NonCachedFiles)) + progress.counter.Add(count.PersistedDirectories, int64(man.Stats.TotalDirectoryCount)) + progress.counter.Add(count.PersistenceErrors, int64(man.Stats.ErrorCount)) + progress.counter.Add(count.PersistenceIgnoredErrors, int64(man.Stats.IgnoredErrorCount)) + return BackupStats{ SnapshotID: string(man.ID), @@ -145,6 +153,7 @@ func (w Wrapper) ConsumeBackupCollections( additionalTags map[string]string, buildTreeWithBase bool, errs *fault.Bus, + counter *count.Bus, ) (*BackupStats, *details.Builder, DetailsMergeInfoer, error) { if w.c == nil { return nil, nil, nil, clues.Stack(errNotConnected).WithClues(ctx) @@ -163,6 +172,7 @@ func (w Wrapper) ConsumeBackupCollections( deets: &details.Builder{}, toMerge: newMergeDetails(), errs: errs, + counter: counter, } // When running an incremental backup, we need to pass the prior @@ -227,7 +237,11 @@ func (w Wrapper) makeSnapshotWithRoot( ) (*BackupStats, error) { var ( man *snapshot.Manifest - bc = &stats.ByteCounter{} + bc = &stats.ByteCounter{ + // duplicate the count in the progress count.Bus. Later we can + // replace the ByteCounter with the progress counter entirely. 
+ Counter: progress.counter.AdderFor(count.PersistedUploadedBytes), + } ) snapIDs := make([]manifest.ID, 0, len(prevSnapEntries)) // just for logging diff --git a/src/internal/kopia/wrapper_scale_test.go b/src/internal/kopia/wrapper_scale_test.go index 16dbd3c6d0..d6ab99bae5 100644 --- a/src/internal/kopia/wrapper_scale_test.go +++ b/src/internal/kopia/wrapper_scale_test.go @@ -15,6 +15,7 @@ import ( exchMock "github.com/alcionai/corso/src/internal/m365/service/exchange/mock" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/identity" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -99,6 +100,7 @@ func BenchmarkHierarchyMerge(b *testing.B) { base ManifestEntry, ) ManifestEntry { bbs := test.baseBackups(base) + counter := count.New() stats, _, _, err := w.ConsumeBackupCollections( ctx, @@ -108,11 +110,14 @@ func BenchmarkHierarchyMerge(b *testing.B) { nil, nil, true, - fault.New(true)) + fault.New(true), + counter) require.NoError(t, err, clues.ToCore(err)) - assert.Equal(t, 0, stats.IgnoredErrorCount) - assert.Equal(t, 0, stats.ErrorCount) + assert.Zero(t, stats.IgnoredErrorCount) + assert.Zero(t, stats.ErrorCount) + assert.Zero(t, counter.Get(count.PersistenceIgnoredErrors)) + assert.Zero(t, counter.Get(count.PersistenceErrors)) assert.False(t, stats.Incomplete) snap, err := snapshot.LoadSnapshot( diff --git a/src/internal/kopia/wrapper_test.go b/src/internal/kopia/wrapper_test.go index b1578d7461..9ea0936263 100644 --- a/src/internal/kopia/wrapper_test.go +++ b/src/internal/kopia/wrapper_test.go @@ -27,10 +27,12 @@ import ( dataMock "github.com/alcionai/corso/src/internal/data/mock" "github.com/alcionai/corso/src/internal/m365/collection/drive/metadata" exchMock "github.com/alcionai/corso/src/internal/m365/service/exchange/mock" + istats "github.com/alcionai/corso/src/internal/stats" "github.com/alcionai/corso/src/internal/tester" "github.com/alcionai/corso/src/pkg/backup/details" "github.com/alcionai/corso/src/pkg/backup/identity" "github.com/alcionai/corso/src/pkg/control/repository" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/logger" "github.com/alcionai/corso/src/pkg/path" @@ -73,11 +75,11 @@ func testForFiles( ) { t.Helper() - count := 0 + fCount := 0 for _, c := range collections { for s := range c.Items(ctx, fault.New(true)) { - count++ + fCount++ fullPath, err := c.FullPath().AppendItem(s.ID()) require.NoError(t, err, clues.ToCore(err)) @@ -96,7 +98,7 @@ func testForFiles( } } - assert.Equal(t, len(expected), count) + assert.Equal(t, len(expected), fCount) } func checkSnapshotTags( @@ -883,6 +885,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { defer flush() bbs := test.baseBackups(base) + counter := count.New() stats, deets, deetsMerger, err := suite.w.ConsumeBackupCollections( ctx, @@ -892,27 +895,45 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections() { nil, tags, true, - fault.New(true)) + fault.New(true), + counter) require.NoError(t, err, clues.ToCore(err)) assert.Equal(t, test.expectedUploadedFiles, stats.TotalFileCount, "total files") + assert.Equal(t, int64(test.expectedUploadedFiles), counter.Get(count.PersistedFiles), "total files") assert.Equal(t, test.expectedUploadedFiles, stats.UncachedFileCount, "uncached files") + assert.Equal(t, int64(test.expectedUploadedFiles), counter.Get(count.PersistedNonCachedFiles), "uncached files") 
assert.Equal(t, test.expectedCachedFiles, stats.CachedFileCount, "cached files") + assert.Equal(t, int64(test.expectedCachedFiles), counter.Get(count.PersistedCachedFiles), "cached files") assert.Equal(t, 4+len(test.collections), stats.TotalDirectoryCount, "directory count") - assert.Equal(t, 0, stats.IgnoredErrorCount) - assert.Equal(t, 0, stats.ErrorCount) + assert.Equal(t, int64(4+len(test.collections)), counter.Get(count.PersistedDirectories), "directory count") + assert.Zero(t, stats.IgnoredErrorCount, "ignored errors") + assert.Zero(t, counter.Get(count.PersistenceIgnoredErrors), "ignored errors") + assert.Zero(t, stats.ErrorCount, "errors") + assert.Zero(t, counter.Get(count.PersistenceErrors), "errors") assert.False(t, stats.Incomplete) test.hashedBytesCheck(t, stats.TotalHashedBytes, "hashed bytes") + test.hashedBytesCheck(t, counter.Get(count.PersistedHashedBytes), "hashed bytes") assert.LessOrEqual( t, test.uploadedBytes[0], stats.TotalUploadedBytes, "low end of uploaded bytes") + assert.LessOrEqual( + t, + test.uploadedBytes[0], + counter.Get(count.PersistedUploadedBytes), + "low end of uploaded bytes") assert.GreaterOrEqual( t, test.uploadedBytes[1], stats.TotalUploadedBytes, "high end of uploaded bytes") + assert.GreaterOrEqual( + t, + test.uploadedBytes[1], + counter.Get(count.PersistedUploadedBytes), + "high end of uploaded bytes") if test.expectMerge { assert.Empty(t, deets.Details().Entries, "details entries") @@ -1183,6 +1204,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_NoDetailsForMeta() { suite.Run(test.name, func() { t := suite.T() collections := test.cols() + counter := count.New() stats, deets, prevShortRefs, err := suite.w.ConsumeBackupCollections( suite.ctx, @@ -1192,15 +1214,22 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_NoDetailsForMeta() { nil, tags, true, - fault.New(true)) + fault.New(true), + counter) assert.NoError(t, err, clues.ToCore(err)) assert.Equal(t, test.expectedUploadedFiles, stats.TotalFileCount, "total files") + assert.Equal(t, int64(test.expectedUploadedFiles), counter.Get(count.PersistedFiles), "total files") assert.Equal(t, test.expectedUploadedFiles, stats.UncachedFileCount, "uncached files") + assert.Equal(t, int64(test.expectedUploadedFiles), counter.Get(count.PersistedNonCachedFiles), "uncached files") assert.Equal(t, test.expectedCachedFiles, stats.CachedFileCount, "cached files") - assert.Equal(t, 5, stats.TotalDirectoryCount) - assert.Equal(t, 0, stats.IgnoredErrorCount) - assert.Equal(t, 0, stats.ErrorCount) + assert.Equal(t, int64(test.expectedCachedFiles), counter.Get(count.PersistedCachedFiles), "cached files") + assert.Equal(t, 5, stats.TotalDirectoryCount, "uploaded directories") + assert.Equal(t, int64(5), counter.Get(count.PersistedDirectories), "uploaded directories") + assert.Zero(t, stats.IgnoredErrorCount, "ignored errors") + assert.Zero(t, counter.Get(count.PersistenceIgnoredErrors), "ignored errors") + assert.Zero(t, stats.ErrorCount, "errors") + assert.Zero(t, counter.Get(count.PersistenceErrors), "errors") assert.False(t, stats.Incomplete) // 47 file and 1 folder entries. 
@@ -1280,7 +1309,8 @@ func (suite *KopiaIntegrationSuite) TestRestoreAfterCompressionChange() { nil, nil, true, - fault.New(true)) + fault.New(true), + count.New()) require.NoError(t, err, clues.ToCore(err)) err = k.Compression(ctx, "gzip") @@ -1365,6 +1395,7 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { } errs := fault.New(true) + counter := count.New() stats, deets, _, err := suite.w.ConsumeBackupCollections( suite.ctx, @@ -1374,12 +1405,17 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { nil, nil, true, - errs) + errs, + counter) require.Error(t, err, clues.ToCore(err)) - assert.Equal(t, 0, stats.ErrorCount, "error count") + assert.Zero(t, stats.ErrorCount, "error count") + assert.Zero(t, counter.Get(count.PersistenceErrors), "error count") assert.Equal(t, 5, stats.TotalFileCount, "total files") + assert.Equal(t, int64(5), counter.Get(count.PersistedFiles), "total files") assert.Equal(t, 6, stats.TotalDirectoryCount, "total directories") - assert.Equal(t, 0, stats.IgnoredErrorCount, "ignored errors") + assert.Equal(t, int64(6), counter.Get(count.PersistedDirectories), "total directories") + assert.Zero(t, stats.IgnoredErrorCount, "ignored errors") + assert.Zero(t, counter.Get(count.PersistenceIgnoredErrors), "ignored errors") assert.Equal(t, 1, len(errs.Errors().Recovered), "recovered errors") assert.False(t, stats.Incomplete, "incomplete") // 5 file and 2 folder entries. @@ -1388,7 +1424,9 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { failedPath, err := suite.storePath2.AppendItem(testFileName4) require.NoError(t, err, clues.ToCore(err)) - ic := i64counter{} + ic := istats.ByteCounter{ + Counter: counter.AdderFor(count.PersistedUploadedBytes), + } dcs, err := suite.w.ProduceRestoreCollections( suite.ctx, @@ -1411,7 +1449,12 @@ func (suite *KopiaIntegrationSuite) TestBackupCollections_ReaderError() { // Files that had an error shouldn't make a dir entry in kopia. If they do we // may run into kopia-assisted incrementals issues because only mod time and // not file size is checked for StreamingFiles. - assert.ErrorIs(t, errs.Failure(), data.ErrNotFound, "errored file is restorable", clues.ToCore(err)) + assert.ErrorIs( + t, + errs.Failure(), + data.ErrNotFound, + "errored file is restorable", + clues.ToCore(err)) } type backedupFile struct { @@ -1450,7 +1493,8 @@ func (suite *KopiaIntegrationSuite) TestBackupCollectionsHandlesNoCollections() nil, nil, true, - fault.New(true)) + fault.New(true), + count.New()) require.NoError(t, err, clues.ToCore(err)) assert.Equal(t, BackupStats{}, *s) @@ -1600,6 +1644,7 @@ func (suite *KopiaSimpleRepoIntegrationSuite) SetupTest() { } r := identity.NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory) + counter := count.New() // Other tests check basic things about deets so not doing that again here. 
 	stats, _, _, err := suite.w.ConsumeBackupCollections(
@@ -1610,12 +1655,17 @@
 		nil,
 		nil,
 		false,
-		fault.New(true))
+		fault.New(true),
+		counter)
 	require.NoError(t, err, clues.ToCore(err))
-	require.Equal(t, stats.ErrorCount, 0)
-	require.Equal(t, stats.TotalFileCount, expectedFiles)
-	require.Equal(t, stats.TotalDirectoryCount, expectedDirs)
-	require.Equal(t, stats.IgnoredErrorCount, 0)
+	require.Zero(t, stats.ErrorCount)
+	require.Zero(t, counter.Get(count.PersistenceErrors))
+	require.Zero(t, stats.IgnoredErrorCount)
+	require.Zero(t, counter.Get(count.PersistenceIgnoredErrors))
+	require.Equal(t, expectedFiles, stats.TotalFileCount)
+	require.Equal(t, int64(expectedFiles), counter.Get(count.PersistedFiles))
+	require.Equal(t, expectedDirs, stats.TotalDirectoryCount)
+	require.Equal(t, int64(expectedDirs), counter.Get(count.PersistedDirectories))
 	require.False(t, stats.Incomplete)
 
 	suite.snapshotID = manifest.ID(stats.SnapshotID)
@@ -1627,14 +1677,6 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TearDownTest() {
 	logger.Flush(suite.ctx)
 }
 
-type i64counter struct {
-	i int64
-}
-
-func (c *i64counter) Count(i int64) {
-	c.i += i
-}
-
 func (suite *KopiaSimpleRepoIntegrationSuite) TestBackupExcludeItem() {
 	r := identity.NewReason(testTenant, testUser, path.ExchangeService, path.EmailCategory)
 
@@ -1729,6 +1771,8 @@
 			})
 		}
 
+		counter := count.New()
+
 		stats, _, _, err := suite.w.ConsumeBackupCollections(
 			suite.ctx,
 			[]identity.Reasoner{r},
@@ -1741,10 +1785,13 @@
 			excluded,
 			nil,
 			true,
-			fault.New(true))
+			fault.New(true),
+			counter)
 		require.NoError(t, err, clues.ToCore(err))
 		assert.Equal(t, test.expectedCachedItems, stats.CachedFileCount)
 		assert.Equal(t, test.expectedUncachedItems, stats.UncachedFileCount)
+		assert.Equal(t, int64(test.expectedCachedItems), counter.Get(count.PersistedCachedFiles))
+		assert.Equal(t, int64(test.expectedUncachedItems), counter.Get(count.PersistedNonCachedFiles))
 
 		test.backupIDCheck(t, stats.SnapshotID)
 
@@ -1752,7 +1799,9 @@
 			return
 		}
 
-		ic := i64counter{}
+		ic := istats.ByteCounter{
+			Counter: counter.AdderFor(count.PersistedUploadedBytes),
+		}
 
 		dcs, err := suite.w.ProduceRestoreCollections(
 			suite.ctx,
@@ -1871,7 +1920,10 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections() {
 			expected[pth.String()] = item.data
 		}
 
-		ic := i64counter{}
+		counter := count.New()
+		ic := istats.ByteCounter{
+			Counter: counter.AdderFor(count.PersistedUploadedBytes),
+		}
 
 		result, err := suite.w.ProduceRestoreCollections(
 			suite.ctx,
@@ -1902,7 +1954,8 @@
 		}
 
 		assert.Len(t, result, test.expectedCollections)
-		assert.Less(t, int64(0), ic.i)
+		assert.Less(t, int64(0), ic.NumBytes)
+		assert.Less(t, int64(0), counter.Get(count.PersistedUploadedBytes))
 		testForFiles(t, ctx, expected, result)
 	})
 }
@@ -2010,7 +2063,10 @@ func (suite *KopiaSimpleRepoIntegrationSuite) TestProduceRestoreCollections_Path
 			expected[itemPath.String()] = item.data
 		}
 
-		ic := i64counter{}
+		counter := count.New()
+		ic := istats.ByteCounter{
+			Counter: counter.AdderFor(count.PersistedUploadedBytes),
+		}
 
 		result, err := suite.w.ProduceRestoreCollections(
 			suite.ctx,
@@ -2055,9 +2111,11 @@ func (suite *KopiaSimpleRepoIntegrationSuite) 
TestProduceRestoreCollections_Fetc }, } - // Really only interested in getting the collection so we can call fetch on - // it. - ic := i64counter{} + // Really only interested in getting the collection so we can call fetch on it. + counter := count.New() + ic := istats.ByteCounter{ + Counter: counter.AdderFor(count.PersistedUploadedBytes), + } result, err := suite.w.ProduceRestoreCollections( suite.ctx, diff --git a/src/internal/operations/backup.go b/src/internal/operations/backup.go index 60e3e64d34..5635bc6f40 100644 --- a/src/internal/operations/backup.go +++ b/src/internal/operations/backup.go @@ -68,6 +68,8 @@ type BackupResults struct { stats.ReadWrites stats.StartAndEndTime BackupID model.StableID `json:"backupID"` + // keys are found in /pkg/count/keys.go + Counts map[string]int64 `json:"counts"` } // NewBackupOperation constructs and validates a backup operation. @@ -202,6 +204,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { ctx, flushMetrics := events.NewMetrics(ctx, logger.Writer{Ctx: ctx}) defer flushMetrics() + // for cases where we can't pass the counter down as part of a func call. ctx = count.Embed(ctx, op.Counter) // Check if the protected resource has the service enabled in order for us @@ -294,7 +297,7 @@ func (op *BackupOperation) Run(ctx context.Context) (err error) { // Persistence // ----- - err = op.persistResults(startTime, &opStats) + err = op.persistResults(startTime, &opStats, op.Counter) if err != nil { op.Errors.Fail(clues.Wrap(err, "persisting backup results")) return op.Errors.Failure() @@ -347,6 +350,7 @@ func (op *BackupOperation) do( Info("backing up selection") // should always be 1, since backups are 1:1 with resourceOwners. + // TODO: this is outdated and needs to be removed. opStats.resourceCount = 1 kbf, err := op.kopia.NewBaseFinder(op.store) @@ -409,7 +413,8 @@ func (op *BackupOperation) do( ssmb, backupID, op.incremental && canUseMetadata && canUsePreviousBackup, - op.Errors) + op.Errors, + op.Counter) if err != nil { return nil, clues.Wrap(err, "persisting collection backups") } @@ -499,6 +504,7 @@ func consumeBackupCollections( backupID model.StableID, isIncremental bool, errs *fault.Bus, + counter *count.Bus, ) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) { ctx = clues.Add( ctx, @@ -521,7 +527,8 @@ func consumeBackupCollections( pmr, tags, isIncremental, - errs) + errs, + counter) if err != nil { if kopiaStats == nil { return nil, nil, nil, clues.Stack(err) @@ -799,6 +806,7 @@ func mergeDetails( func (op *BackupOperation) persistResults( started time.Time, opStats *backupStats, + counter *count.Bus, ) error { op.Results.StartedAt = started op.Results.CompletedAt = time.Now() @@ -816,6 +824,10 @@ func (op *BackupOperation) persistResults( return clues.New("backup persistence never completed") } + // the summary of all counts collected during backup + op.Results.Counts = counter.TotalValues() + + // legacy counting system op.Results.BytesRead = opStats.k.TotalHashedBytes op.Results.BytesUploaded = opStats.k.TotalUploadedBytes op.Results.ItemsWritten = opStats.k.TotalFileCount diff --git a/src/internal/operations/backup_test.go b/src/internal/operations/backup_test.go index afb81aa293..fda5c5fb2f 100644 --- a/src/internal/operations/backup_test.go +++ b/src/internal/operations/backup_test.go @@ -39,6 +39,7 @@ import ( "github.com/alcionai/corso/src/pkg/backup/metadata" "github.com/alcionai/corso/src/pkg/control" "github.com/alcionai/corso/src/pkg/control/repository" + 
"github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/extensions" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" @@ -136,6 +137,7 @@ func (mbu mockBackupConsumer) ConsumeBackupCollections( tags map[string]string, buildTreeWithBase bool, errs *fault.Bus, + counter *count.Bus, ) (*kopia.BackupStats, *details.Builder, kopia.DetailsMergeInfoer, error) { if mbu.checkFunc != nil { mbu.checkFunc(backupReasons, bases, cs, tags, buildTreeWithBase) @@ -432,14 +434,14 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_PersistResults() { op.Errors.Fail(test.fail) - test.expectErr(t, op.persistResults(now, &test.stats)) + // op.Counter is not incremented in this test. + test.expectErr(t, op.persistResults(now, &test.stats, count.New())) assert.Equal(t, test.expectStatus.String(), op.Status.String(), "status") - assert.Equal(t, test.stats.ctrl.Successes, op.Results.ItemsRead, "items read") - assert.Equal(t, test.stats.k.TotalFileCount, op.Results.ItemsWritten, "items written") - assert.Equal(t, test.stats.k.TotalHashedBytes, op.Results.BytesRead, "bytes read") - assert.Equal(t, test.stats.k.TotalUploadedBytes, op.Results.BytesUploaded, "bytes written") - assert.Equal(t, test.stats.resourceCount, op.Results.ResourceOwners, "resource owners") + assert.Equal(t, op.Results.ItemsRead, test.stats.ctrl.Successes, "items read") + assert.Equal(t, op.Results.ItemsWritten, test.stats.k.TotalFileCount, "items written") + assert.Equal(t, op.Results.BytesRead, test.stats.k.TotalHashedBytes, "bytes read") + assert.Equal(t, op.Results.BytesUploaded, test.stats.k.TotalUploadedBytes, "bytes written") assert.Equal(t, now, op.Results.StartedAt, "started at") assert.Less(t, now, op.Results.CompletedAt, "completed at") }) @@ -525,7 +527,8 @@ func (suite *BackupOpUnitSuite) TestBackupOperation_ConsumeBackupDataCollections nil, backupID, true, - fault.New(true)) + fault.New(true), + count.New()) } func (suite *BackupOpUnitSuite) TestBackupOperation_MergeBackupDetails_AddsItems() { diff --git a/src/internal/stats/stats.go b/src/internal/stats/stats.go index 69f71d65a0..4ecc5a811f 100644 --- a/src/internal/stats/stats.go +++ b/src/internal/stats/stats.go @@ -23,12 +23,19 @@ type StartAndEndTime struct { CompletedAt time.Time `json:"completedAt"` } +type Counter func(numBytes int64) + type ByteCounter struct { NumBytes int64 + Counter Counter } func (bc *ByteCounter) Count(i int64) { atomic.AddInt64(&bc.NumBytes, i) + + if bc.Counter != nil { + bc.Counter(i) + } } type SkippedCounts struct { diff --git a/src/internal/streamstore/streamstore.go b/src/internal/streamstore/streamstore.go index 5e704f9fff..44a5eb986c 100644 --- a/src/internal/streamstore/streamstore.go +++ b/src/internal/streamstore/streamstore.go @@ -15,6 +15,7 @@ import ( "github.com/alcionai/corso/src/internal/kopia" "github.com/alcionai/corso/src/internal/kopia/inject" "github.com/alcionai/corso/src/internal/stats" + "github.com/alcionai/corso/src/pkg/count" "github.com/alcionai/corso/src/pkg/fault" "github.com/alcionai/corso/src/pkg/path" ) @@ -215,7 +216,8 @@ func write( prefixmatcher.NopReader[map[string]struct{}](), nil, false, - errs) + errs, + count.New()) if err != nil { return "", clues.Wrap(err, "storing marshalled bytes in repository") } diff --git a/src/pkg/count/count.go b/src/pkg/count/count.go index cfaa170305..194529ac4b 100644 --- a/src/pkg/count/count.go +++ b/src/pkg/count/count.go @@ -56,6 +56,14 @@ func (b *Bus) Add(k key, n int64) { } } +// AdderFor returns a func that adds any 
value of i +// to the bus using the given key. +func (b *Bus) AdderFor(k key) func(i int64) { + return func(i int64) { + b.Add(k, i) + } +} + // Get returns the local count. func (b *Bus) Get(k key) int64 { if b == nil { diff --git a/src/pkg/count/keys.go b/src/pkg/count/keys.go index 8ebf2e20cb..bdfd03ca75 100644 --- a/src/pkg/count/keys.go +++ b/src/pkg/count/keys.go @@ -5,6 +5,30 @@ type key string const ( // count of bucket-tokens consumed by api calls. APICallTokensConsumed key = "api-call-tokens-consumed" + // count of api calls that resulted in failure due to throttling. + ThrottledAPICalls key = "throttled-api-calls" +) + +// Tracked during backup +const ( + // amounts reported by kopia + PersistedCachedFiles key = "persisted-cached-files" + PersistedDirectories key = "persisted-directories" + PersistedFiles key = "persisted-files" + PersistedHashedBytes key = "persisted-hashed-bytes" + PersistedNonCachedFiles key = "persisted-non-cached-files" + PersistedNonMetaFiles key = "persisted-non-meta-files" + PersistedNonMetaUploadedBytes key = "persisted-non-meta-uploaded-bytes" + PersistedUploadedBytes key = "persisted-uploaded-bytes" + PersistenceErrors key = "persistence-errors" + PersistenceExpectedErrors key = "persistence-expected-errors" + PersistenceIgnoredErrors key = "persistence-ignored-errors" + // amounts reported by data providers + ProviderItemsRead key = "provider-items-read" +) + +// Tracked during restore +const ( // count of times that items had collisions during restore, // and that collision was solved by replacing the item. CollisionReplace key = "collision-replace" @@ -15,6 +39,4 @@ const ( // non-meta item creation counting. IE: use it specifically // for counting new items (no collision) or copied items. NewItemCreated key = "new-item-created" - // count of api calls that resulted in failure due to throttling. - ThrottledAPICalls key = "throttled-api-calls" )
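
---

#### Usage sketch

A minimal sketch of the wiring this change introduces, assuming the post-patch APIs shown above (`count.New`, `Bus.AdderFor`, `Bus.Inc`, `Bus.Get`, `Bus.TotalValues`, and `stats.ByteCounter`). The standalone `main` package is illustrative only; in the real code the bus lives on the backup operation as `op.Counter`.

```go
package main

import (
	"fmt"

	istats "github.com/alcionai/corso/src/internal/stats"
	"github.com/alcionai/corso/src/pkg/count"
)

func main() {
	// One bus per backup operation (op.Counter in operations/backup.go).
	counter := count.New()

	// Bridge the legacy ByteCounter onto the bus: every Count(i) call
	// also adds i under the persisted-uploaded-bytes key, so the old
	// stats and the new counter report the same value.
	bc := istats.ByteCounter{
		Counter: counter.AdderFor(count.PersistedUploadedBytes),
	}

	bc.Count(1024)
	bc.Count(512)

	fmt.Println(bc.NumBytes)                               // 1536 (legacy stat)
	fmt.Println(counter.Get(count.PersistedUploadedBytes)) // 1536 (count bus)

	// Error tallies go directly onto the bus, as corsoProgress.Error does.
	counter.Inc(count.PersistenceExpectedErrors)

	// persistResults surfaces every key via TotalValues, e.g.
	// map[persisted-uploaded-bytes:1536 persistence-expected-errors:1].
	fmt.Println(counter.TotalValues())
}
```

Both counters report the same bytes on purpose: the duplication is the bridge that lets the follow-up PR delete the kopia-stats plumbing without losing any reported numbers.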