From 095a257c60e6e7265aec1a175d1d838c1af5b2e3 Mon Sep 17 00:00:00 2001 From: Vanja Pejovic Date: Mon, 28 Oct 2024 13:43:45 -0400 Subject: [PATCH] Include stdout, stderr, and auxiliary logs in file upload metrics (#7799) Now the `file_upload_count` and `file_upload_size_bytes` metrics will include these files and bytes. Also addresses the last comment from buildbuddy-io/buildbuddy/pull/7788 --- .../remote_execution/workspace/workspace.go | 6 ++ proto/remote_execution.proto | 2 +- server/cache/dirtools/dirtools.go | 36 +++++------ server/cache/dirtools/dirtools_test.go | 62 +++++++++---------- server/remote_cache/cachetools/cachetools.go | 15 +++++ 5 files changed, 70 insertions(+), 51 deletions(-) diff --git a/enterprise/server/remote_execution/workspace/workspace.go b/enterprise/server/remote_execution/workspace/workspace.go index d051e5f8421..a64b00898da 100644 --- a/enterprise/server/remote_execution/workspace/workspace.go +++ b/enterprise/server/remote_execution/workspace/workspace.go @@ -324,6 +324,12 @@ func (ws *Workspace) UploadOutputs(ctx context.Context, cmd *repb.Command, execu if err := eg.Wait(); err != nil { return nil, err } + txInfo.FileCount += 2 // for stdout and stderr + txInfo.BytesTransferred += int64(len(cmdResult.Stdout) + len(cmdResult.Stderr)) + txInfo.FileCount += int64(len(cmdResult.AuxiliaryLogs)) + for _, b := range cmdResult.AuxiliaryLogs { + txInfo.BytesTransferred += int64(len(b)) + } executeResponse.Result.StdoutDigest = stdoutDigest executeResponse.Result.StderrDigest = stderrDigest executeResponse.ServerLogs = serverLogs diff --git a/proto/remote_execution.proto b/proto/remote_execution.proto index 54c77e0ca62..59b12b215d5 100644 --- a/proto/remote_execution.proto +++ b/proto/remote_execution.proto @@ -2186,7 +2186,7 @@ message IOStats { // the end time of the last link operation. google.protobuf.Duration local_cache_link_duration = 8; - // The number of files uploaded in this tree. + // The number of files (and dirs and trees) uploaded in this tree. int64 file_upload_count = 4; // The total size of uploaded data. diff --git a/server/cache/dirtools/dirtools.go b/server/cache/dirtools/dirtools.go index 7d92f41e41c..6ca027114e5 100644 --- a/server/cache/dirtools/dirtools.go +++ b/server/cache/dirtools/dirtools.go @@ -273,13 +273,13 @@ func (f *fileToUpload) FileNode() *repb.FileNode { } } -func uploadMissingFiles(ctx context.Context, uploader *cachetools.BatchCASUploader, env environment.Env, filesToUpload []*fileToUpload, instanceName string, digestFunction repb.DigestFunction_Value) (skippedFiles, skippedBytes int64, _ error) { +func uploadMissingFiles(ctx context.Context, uploader *cachetools.BatchCASUploader, env environment.Env, filesToUpload []*fileToUpload, instanceName string, digestFunction repb.DigestFunction_Value) (alreadyPresentBytes int64, _ error) { ctx, cancel := context.WithCancel(ctx) defer cancel() type batchResult struct { - files []*fileToUpload - skippedFiles, skippedBytes int64 + files []*fileToUpload + presentBytes int64 } batches := make(chan batchResult, 1) var wg sync.WaitGroup @@ -296,7 +296,7 @@ func uploadMissingFiles(ctx context.Context, uploader *cachetools.BatchCASUpload for _, f := range batch { req.BlobDigests = append(req.BlobDigests, f.digest) } - var skippedFiles, skippedBytes int64 + var presentBytes int64 resp, err := cas.FindMissingBlobs(ctx, req) if err != nil { log.CtxWarningf(ctx, "Failed to find missing output blobs: %s", err) @@ -311,8 +311,7 @@ func uploadMissingFiles(ctx context.Context, uploader *cachetools.BatchCASUpload batch[missingLen] = uploadableFile missingLen++ } else { - skippedFiles++ - skippedBytes += uploadableFile.digest.GetSizeBytes() + presentBytes += uploadableFile.digest.GetSizeBytes() } } batch = batch[:missingLen] @@ -320,7 +319,7 @@ func uploadMissingFiles(ctx context.Context, uploader *cachetools.BatchCASUpload select { case <-ctx.Done(): // If the reader errored and returned, don't block forever - case batches <- batchResult{files: batch, skippedFiles: skippedFiles, skippedBytes: skippedBytes}: + case batches <- batchResult{files: batch, presentBytes: presentBytes}: } }() } @@ -332,13 +331,12 @@ func uploadMissingFiles(ctx context.Context, uploader *cachetools.BatchCASUpload fc := env.GetFileCache() for batch := range batches { - skippedFiles += batch.skippedFiles - skippedBytes += batch.skippedBytes + alreadyPresentBytes += batch.presentBytes if err := uploadFiles(ctx, uploader, fc, batch.files); err != nil { - return 0, 0, err + return 0, err } } - return skippedFiles, skippedBytes, nil + return alreadyPresentBytes, nil } func uploadFiles(ctx context.Context, uploader *cachetools.BatchCASUploader, fc interfaces.FileCache, filesToUpload []*fileToUpload) error { @@ -503,7 +501,6 @@ func UploadTree(ctx context.Context, env environment.Env, dirHelper *DirHelper, if err != nil { return nil, err } - txInfo.FileCount += 1 txInfo.BytesTransferred += fileNode.GetDigest().GetSizeBytes() directory.Files = append(directory.Files, fileNode) @@ -529,13 +526,10 @@ func UploadTree(ctx context.Context, env environment.Env, dirHelper *DirHelper, // Upload output files to the remote cache and also add them to the local // cache since they are likely to be used as inputs to subsequent actions. - skippedFiles, skippedBytes, err := uploadMissingFiles(ctx, uploader, env, filesToUpload, instanceName, digestFunction) + alreadyPresentBytes, err := uploadMissingFiles(ctx, uploader, env, filesToUpload, instanceName, digestFunction) if err != nil { return nil, err } - metrics.SkippedOutputBytes.Add(float64(skippedBytes)) - txInfo.FileCount -= skippedFiles - txInfo.BytesTransferred -= skippedBytes // Upload Directory protos. // TODO: skip uploading Directory protos which are not part of any tree? @@ -579,9 +573,13 @@ func UploadTree(ctx context.Context, env environment.Env, dirHelper *DirHelper, if err := uploader.Wait(); err != nil { return nil, err } - endTime := time.Now() - txInfo.TransferDuration = endTime.Sub(startTime) - return txInfo, nil + uploadStats := uploader.Stats() + metrics.SkippedOutputBytes.Add(float64(alreadyPresentBytes + uploadStats.DuplicateBytes)) + return &TransferInfo{ + TransferDuration: time.Since(startTime), + FileCount: uploadStats.UploadedObjects, + BytesTransferred: uploadStats.UploadedBytes, + }, nil } type FilePointer struct { diff --git a/server/cache/dirtools/dirtools_test.go b/server/cache/dirtools/dirtools_test.go index c27d3486d29..50e103073d3 100644 --- a/server/cache/dirtools/dirtools_test.go +++ b/server/cache/dirtools/dirtools_test.go @@ -50,7 +50,7 @@ func TestUploadTree(t *testing.T) { symlinkPaths: map[string]string{}, expectedResult: &repb.ActionResult{}, expectedInfo: &dirtools.TransferInfo{ - FileCount: 0, + FileCount: 1, BytesTransferred: 0, }, }, @@ -76,8 +76,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 1, - BytesTransferred: 1, + FileCount: 2, + BytesTransferred: 84, }, }, { @@ -113,8 +113,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 2, - BytesTransferred: 84, + FileCount: 4, + BytesTransferred: 244, }, }, { @@ -150,8 +150,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 1, - BytesTransferred: 1, + FileCount: 2, + BytesTransferred: 108, }, }, { @@ -193,8 +193,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 1, - BytesTransferred: 1, + FileCount: 2, + BytesTransferred: 108, }, }, { @@ -234,8 +234,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 1, - BytesTransferred: 1, + FileCount: 2, + BytesTransferred: 108, }, }, { @@ -282,8 +282,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 2, - BytesTransferred: 84, + FileCount: 4, + BytesTransferred: 256, }, }, { @@ -336,8 +336,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 2, - BytesTransferred: 84, + FileCount: 4, + BytesTransferred: 256, }, }, { @@ -388,8 +388,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 2, - BytesTransferred: 84, + FileCount: 4, + BytesTransferred: 256, }, }, { @@ -426,8 +426,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 2, - BytesTransferred: 104, + FileCount: 4, + BytesTransferred: 284, }, }, { @@ -447,8 +447,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 0, - BytesTransferred: 0, + FileCount: 1, + BytesTransferred: 8, }, }, { @@ -468,8 +468,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 0, - BytesTransferred: 0, + FileCount: 1, + BytesTransferred: 8, }, }, { @@ -494,8 +494,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 1, - BytesTransferred: 1, + FileCount: 3, + BytesTransferred: 84, }, }, { @@ -534,8 +534,8 @@ func TestUploadTree(t *testing.T) { // Dir: a/b/e/g // File: a/b/c/fileA.txt // - FileCount: 6, - BytesTransferred: 381, + FileCount: 7, + BytesTransferred: 849, }, }, { @@ -571,8 +571,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 2, - BytesTransferred: 84, + FileCount: 5, + BytesTransferred: 244, }, }, { @@ -592,8 +592,8 @@ func TestUploadTree(t *testing.T) { }, }, expectedInfo: &dirtools.TransferInfo{ - FileCount: 0, - BytesTransferred: 0, + FileCount: 1, + BytesTransferred: 8, }, }, } { diff --git a/server/remote_cache/cachetools/cachetools.go b/server/remote_cache/cachetools/cachetools.go index 5883d052612..300bafcb4ab 100644 --- a/server/remote_cache/cachetools/cachetools.go +++ b/server/remote_cache/cachetools/cachetools.go @@ -565,6 +565,7 @@ type BatchCASUploader struct { instanceName string digestFunction repb.DigestFunction_Value unsentBatchSize int64 + stats UploadStats } // NewBatchCASUploader returns an uploader to be used only for the given request @@ -616,9 +617,12 @@ func (ul *BatchCASUploader) Upload(d *repb.Digest, rsc io.ReadSeekCloser) error // De-dupe uploads by digest. dk := digest.NewKey(d) if _, ok := ul.uploads[dk]; ok { + ul.stats.DuplicateBytes += d.GetSizeBytes() return rsc.Close() } ul.uploads[dk] = struct{}{} + ul.stats.UploadedObjects++ + ul.stats.UploadedBytes += d.GetSizeBytes() rsc.Seek(0, 0) r := io.ReadCloser(rsc) @@ -750,6 +754,17 @@ func (ul *BatchCASUploader) Wait() error { return ul.eg.Wait() } +// UploadStats contains the statistics for a batch of uploads. +type UploadStats struct { + UploadedObjects, UploadedBytes, DuplicateBytes int64 +} + +// Stats returns information about all the uploads in this BatchCASUploader. +// It's only correct to call it after Wait. +func (ul *BatchCASUploader) Stats() UploadStats { + return ul.stats +} + type bytesReadSeekCloser struct { io.ReadSeeker }