Skip to content

Commit

Permalink
Move the link loop to a goroutine. (#7244)
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimberezniker authored and bduffany committed Aug 14, 2024
1 parent 47982ba commit 211d3a5
Showing 1 changed file with 31 additions and 28 deletions.
59 changes: 31 additions & 28 deletions enterprise/server/remote_execution/dirtools/dirtools.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,38 +731,41 @@ func (ff *BatchFileFetcher) FetchFiles(filesToFetch FileMap, opts *DownloadTreeO
fetchQueue := make(chan digestToFetch, 100)

linkStart := time.Now()
// Note: filesToFetch is keyed by digest, so all files in `filePointers` have
// the digest represented by dk.
//
// Attempt to link digests from the file cache. Digests that are not
// present in the filecache will be added to the fetchQueue channel.
for dk, filePointers := range filesToFetch {
d := dk.ToDigest()
filePointers := filePointers

rn := digest.NewResourceName(dk.ToDigest(), ff.instanceName, rspb.CacheType_CAS, ff.digestFunction)
// Write empty files directly (skip checking cache and downloading).
if rn.IsEmpty() {
for _, fp := range filePointers {
if err := writeFile(fp, []byte("")); err != nil {
return err
linkEG.Go(func() error {
// Note: filesToFetch is keyed by digest, so all files in `filePointers` have
// the digest represented by dk.
//
// Attempt to link digests from the file cache. Digests that are not
// present in the filecache will be added to the fetchQueue channel.
for dk, filePointers := range filesToFetch {
d := dk.ToDigest()
filePointers := filePointers

rn := digest.NewResourceName(dk.ToDigest(), ff.instanceName, rspb.CacheType_CAS, ff.digestFunction)
// Write empty files directly (skip checking cache and downloading).
if rn.IsEmpty() {
for _, fp := range filePointers {
if err := writeFile(fp, []byte("")); err != nil {
return err
}
}
continue
}
continue
}

linkEG.Go(func() error {
// If we linked the digest from the file cache, there's nothing
// more to do.
if err := ff.linkFromFileCache(filePointers, opts); err == nil {
return nil
}
linkEG.Go(func() error {
// If we linked the digest from the file cache, there's nothing
// more to do.
if err := ff.linkFromFileCache(filePointers, opts); err == nil {
return nil
}

// Otherwise, queue the digest to be fetched.
fetchQueue <- digestToFetch{d: d, fps: filePointers}
return nil
})
}
// Otherwise, queue the digest to be fetched.
fetchQueue <- digestToFetch{d: d, fps: filePointers}
return nil
})
}
return nil
})

// Read work off the fetchQueue channel and generate batch read requests
// to download data.
Expand Down

0 comments on commit 211d3a5

Please sign in to comment.