From e1dc26b62c4c927bf07745a766346ce59f521019 Mon Sep 17 00:00:00 2001 From: Derek Buitenhuis Date: Tue, 30 Apr 2024 18:45:34 +0100 Subject: [PATCH 1/4] gcp/saver: Only return errors.KindAlreadyExists if all three exist In #1124, a GCP lock type was added as a singleflight backend. As part of this work, the GCP backend's Save() was made serial, likely because moduploader.Upload requires a call to Exists() before it, rendering the GCP lock less useful, by doubling the calls to GCS. However, by doing this, the existence check was now only checking the existence of the mod file, and not the info or zip. This meant that if during a Save, the zip or info uploads failed, on subsequent requests, that when using the GCP singleflight backend, Athens would assume everything had been stashed and saved properly, and then fail to serve up the info or zip that had failed upload, meaning the cache was in an unhealable broken state, requiring a manual intervention. To fix this, without breaking the singleflight behavior, introduce a metadata key that is set on the mod file during its initial upload, indicating that a Stash is still in progress on subsequent files, which gets removed once all three files are uploaded successfully, which can be checked if it is determined that the mod file already exists. That way we can return an errors.KindAlreadyExists if a Stash is in progress, but also properly return it when a Stash is *not* currently in progress if and only if all three files exist on GCS, which prevents the cache from becoming permanently poisoned. One note is that it is possible the GCS call to remove the metadata key fails, which would mean it is left on the mod object forever. To avoid this, consider it stale after 2 minutes. 
Signed-off-by: Derek Buitenhuis --- pkg/storage/gcp/saver.go | 109 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 102 insertions(+), 7 deletions(-) diff --git a/pkg/storage/gcp/saver.go b/pkg/storage/gcp/saver.go index 4298aaa17..ec24fc4b1 100644 --- a/pkg/storage/gcp/saver.go +++ b/pkg/storage/gcp/saver.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "time" "cloud.google.com/go/storage" "github.com/gomods/athens/pkg/config" @@ -12,6 +13,10 @@ import ( googleapi "google.golang.org/api/googleapi" ) +// After how long we consider an "in_progress" metadata key stale, +// due to failure to remove it. +const inProgressStaleThreshold = 2 * time.Minute + // Save uploads the module's .mod, .zip and .info files for a given version // It expects a context, which can be provided using context.Background // from the standard library until context has been threaded down the stack. @@ -20,28 +25,111 @@ import ( // Uploaded files are publicly accessible in the storage bucket as per // an ACL rule. func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { - const op errors.Op = "gcp.Save" + const op errors.Op = "gcp.save" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() gomodPath := config.PackageVersionedName(module, version, "mod") - err := s.upload(ctx, gomodPath, bytes.NewReader(mod)) - if err != nil { + innerErr := s.save(ctx, module, version, mod, zip, info) + if errors.Is(innerErr, errors.KindAlreadyExists) { + // Cache hit. + return errors.E(op, innerErr) + } + // No cache hit. Remove the metadata lock if it is there. 
+ inProgress, outerErr := s.checkUploadInProgress(ctx, gomodPath) + if outerErr != nil { + return errors.E(op, outerErr) + } + if inProgress { + outerErr = s.removeInProgressMetadata(ctx, gomodPath) + if outerErr != nil { + return errors.E(op, outerErr) + } + } + return innerErr +} + +func (s *Storage) save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { + const op errors.Op = "gcp.save" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + gomodPath := config.PackageVersionedName(module, version, "mod") + seenAlreadyExists := 0 + err := s.upload(ctx, gomodPath, bytes.NewReader(mod), true) + // If it already exists, check the object metadata to see if the + // other two are still uploading in progress somewhere else. If they + // are, return a cache hit. If not, continue on to the other two, + // and only return a cache hit if all three exist. + if errors.Is(err, errors.KindAlreadyExists) { + inProgress, progressErr := s.checkUploadInProgress(ctx, gomodPath) + if progressErr != nil { + return errors.E(op, progressErr) + } + if inProgress { + // err is known to be errors.KindAlreadyExists at this point, so + // this is a cache hit return. + return errors.E(op, err) + } + seenAlreadyExists++ + } else if err != nil { + // Other errors return errors.E(op, err) } zipPath := config.PackageVersionedName(module, version, "zip") - err = s.upload(ctx, zipPath, zip) - if err != nil { + err = s.upload(ctx, zipPath, zip, false) + if errors.Is(err, errors.KindAlreadyExists) { + seenAlreadyExists++ + } else if err != nil { return errors.E(op, err) } infoPath := config.PackageVersionedName(module, version, "info") - err = s.upload(ctx, infoPath, bytes.NewReader(info)) + err = s.upload(ctx, infoPath, bytes.NewReader(info), false) + // Have all three returned errors.KindAlreadyExists? 
+ if errors.Is(err, errors.KindAlreadyExists) { + if seenAlreadyExists == 2 { + return errors.E(op, err) + } + } else if err != nil { + return errors.E(op, err) + } + return nil +} + +func (s *Storage) removeInProgressMetadata(ctx context.Context, gomodPath string) error { + const op errors.Op = "gcp.removeInProgressMetadata" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + _, err := s.bucket.Object(gomodPath).Update(ctx, storage.ObjectAttrsToUpdate{ + Metadata: map[string]string{}, + }) if err != nil { return errors.E(op, err) } return nil } -func (s *Storage) upload(ctx context.Context, path string, stream io.Reader) error { +func (s *Storage) checkUploadInProgress(ctx context.Context, gomodPath string) (bool, error) { + const op errors.Op = "gcp.checkUploadInProgress" + ctx, span := observ.StartSpan(ctx, op.String()) + defer span.End() + attrs, err := s.bucket.Object(gomodPath).Attrs(ctx) + if err != nil { + return false, errors.E(op, err) + } + if attrs.Metadata != nil { + _, ok := attrs.Metadata["in_progress"] + if ok { + // In case the final call to remove the metadata fails for some reason, + // we have a threshold after which we consider this to be stale. + if time.Since(attrs.Created) > inProgressStaleThreshold { + return false, nil + } + return true, nil + } + } + return false, nil +} + +func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, first bool) error { const op errors.Op = "gcp.upload" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() @@ -49,6 +137,13 @@ func (s *Storage) upload(ctx context.Context, path string, stream io.Reader) err DoesNotExist: true, }).NewWriter(ctx) + // We set this metadata only for the first of the three files uploaded, + // for use as a singleflight lock. 
+ if first { + wc.ObjectAttrs.Metadata = make(map[string]string) + wc.ObjectAttrs.Metadata["in_progress"] = "true" + } + // NOTE: content type is auto detected on GCP side and ACL defaults to public // Once we support private storage buckets this may need refactoring // unless there is a way to set the default perms in the project. From 58548a51d218cb9c791fb83f678b91dc1ffd3034 Mon Sep 17 00:00:00 2001 From: Derek Buitenhuis Date: Thu, 2 May 2024 20:43:10 +0100 Subject: [PATCH 2/4] gcp/saver: Cancel the context on io.Copy failure during upload This is the proper way to make sure we do not write a partial file. Signed-off-by: Derek Buitenhuis --- pkg/storage/gcp/saver.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/storage/gcp/saver.go b/pkg/storage/gcp/saver.go index ec24fc4b1..9936c294d 100644 --- a/pkg/storage/gcp/saver.go +++ b/pkg/storage/gcp/saver.go @@ -133,9 +133,12 @@ func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, fir const op errors.Op = "gcp.upload" ctx, span := observ.StartSpan(ctx, op.String()) defer span.End() + cancelCtx, cancel := context.WithCancel(ctx) + defer cancel() + wc := s.bucket.Object(path).If(storage.Conditions{ DoesNotExist: true, - }).NewWriter(ctx) + }).NewWriter(cancelCtx) // We set this metadata only for the first of the three files uploaded, // for use as a singleflight lock. @@ -148,7 +151,7 @@ func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, fir // Once we support private storage buckets this may need refactoring // unless there is a way to set the default perms in the project. if _, err := io.Copy(wc, stream); err != nil { - _ = wc.Close() + // Purposely do not close it to avoid creating a partial file. 
+ return err } From aa9281351fd204a89e6057b964145b4c4a2224e5 Mon Sep 17 00:00:00 2001 From: Derek Buitenhuis Date: Thu, 2 May 2024 20:44:48 +0100 Subject: [PATCH 3/4] Add test for partial upload failure using GCP singleflight Signed-off-by: Derek Buitenhuis --- pkg/stash/with_gcs_test.go | 69 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/pkg/stash/with_gcs_test.go b/pkg/stash/with_gcs_test.go index 3a309cac0..4bae5b440 100644 --- a/pkg/stash/with_gcs_test.go +++ b/pkg/stash/with_gcs_test.go @@ -3,6 +3,7 @@ package stash import ( "bytes" "context" + "fmt" "io" "os" "strings" @@ -17,6 +18,12 @@ import ( "golang.org/x/sync/errgroup" ) +type failReader int + +func (f *failReader) Read([]byte) (int, error) { + return 0, fmt.Errorf("failure") +} + // TestWithGCS requires a real GCP backend implementation // and it will ensure that saving to modules at the same time // is done synchronously so that only the first module gets saved. @@ -79,6 +86,68 @@ func TestWithGCS(t *testing.T) { } } +// TestWithGCSPartialFailure requires a real GCP backend implementation +// and ensures that if one of the non-singleflight-lock files fails to +// upload, that the cache does not remain poisoned. +func TestWithGCSPartialFailure(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) + defer cancel() + const ( + mod = "stashmod" + ver = "v1.0.0" + ) + strg := getStorage(t) + strg.Delete(ctx, mod, ver) + defer strg.Delete(ctx, mod, ver) + + // sanity check + _, err := strg.GoMod(ctx, mod, ver) + if !errors.Is(err, errors.KindNotFound) { + t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err) + } + + content := uuid.New().String() + ms := &mockGCPStasher{strg, content} + fr := new(failReader) + // We simulate a failure by manually passing an io.Reader that will fail. 
+ err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content)) + if err == nil { + // We *want* to fail. + t.Fatal(err) + } + + // Now try a Stash. This should upload the missing files. + s := WithGCSLock(ms) + _, err = s.Stash(ctx, "stashmod", "v1.0.0") + if err != nil { + t.Fatal(err) + } + + info, err := strg.Info(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + modContent, err := strg.GoMod(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + zip, err := strg.Zip(ctx, mod, ver) + if err != nil { + t.Fatal(err) + } + defer zip.Close() + zipContent, err := io.ReadAll(zip) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(info, modContent) { + t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent)) + } + if !bytes.Equal(info, zipContent) { + t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent)) + } +} + // mockGCPStasher is like mockStasher // but leverages in memory storage // so that redis can determine From 1814b4101e9545960e08524b80e130bd6a1c564b Mon Sep 17 00:00:00 2001 From: Derek Buitenhuis Date: Wed, 8 May 2024 17:50:43 +0100 Subject: [PATCH 4/4] singleflight/gcs: Add an option to set the stale threshold This is useful if, for example, the user is on a slow network. 
Signed-off-by: Derek Buitenhuis --- cmd/proxy/actions/app_proxy.go | 6 +++--- config.dev.toml | 4 ++++ docs/content/configuration/storage.md | 11 +++++++++++ pkg/config/config.go | 1 + pkg/config/config_test.go | 3 +++ pkg/config/singleflight.go | 13 +++++++++++++ pkg/stash/with_gcs.go | 21 +++++++++++++++++++-- pkg/stash/with_gcs_test.go | 12 ++++++++++-- pkg/storage/gcp/gcp.go | 5 +++-- pkg/storage/gcp/saver.go | 19 ++++++++++++++++--- 10 files changed, 83 insertions(+), 12 deletions(-) diff --git a/cmd/proxy/actions/app_proxy.go b/cmd/proxy/actions/app_proxy.go index 777ad4ca6..0117bc9db 100644 --- a/cmd/proxy/actions/app_proxy.go +++ b/cmd/proxy/actions/app_proxy.go @@ -101,7 +101,7 @@ func addProxyRoutes( lister := module.NewVCSLister(c.GoBinary, c.GoBinaryEnvVars, fs) checker := storage.WithChecker(s) - withSingleFlight, err := getSingleFlight(l, c, checker) + withSingleFlight, err := getSingleFlight(l, c, s, checker) if err != nil { return err } @@ -137,7 +137,7 @@ func (l *athensLoggerForRedis) Printf(ctx context.Context, format string, v ...a l.logger.WithContext(ctx).Printf(format, v...) 
} -func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (stash.Wrapper, error) { +func getSingleFlight(l *log.Logger, c *config.Config, s storage.Backend, checker storage.Checker) (stash.Wrapper, error) { switch c.SingleFlightType { case "", "memory": return stash.WithSingleflight, nil @@ -173,7 +173,7 @@ func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) ( if c.StorageType != "gcp" { return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType) } - return stash.WithGCSLock, nil + return stash.WithGCSLock(c.SingleFlight.GCP.StaleThreshold, s) case "azureblob": if c.StorageType != "azureblob" { return nil, fmt.Errorf("azureblob SingleFlight only works with a azureblob storage type and not: %v", c.StorageType) } diff --git a/config.dev.toml b/config.dev.toml index d04df2fea..aab468dfc 100755 --- a/config.dev.toml +++ b/config.dev.toml @@ -377,6 +377,10 @@ ShutdownTimeout = 60 # Max retries while acquiring the lock. Defaults to 10. # Env override: ATHENS_REDIS_LOCK_MAX_RETRIES MaxRetries = 10 + [SingleFlight.GCP] + # Threshold for how long to wait in seconds for an in-progress GCP upload to + # be considered to have failed to unlock. + StaleThreshold = 120 [Storage] # Only storage backends that are specified in Proxy.StorageType are required here [Storage.CDN] diff --git a/docs/content/configuration/storage.md b/docs/content/configuration/storage.md index ab63ea653..81ef957d2 100644 --- a/docs/content/configuration/storage.md +++ b/docs/content/configuration/storage.md @@ -492,3 +492,14 @@ Optionally, like `redis`, you can also specify a password to connect to the `red SentinelPassword = "sekret" Distributed lock options can be customised for redis sentinal as well, in a similar manner as described above for redis. + + +### Using GCP as a singleflight mechanism + +The GCP singleflight mechanism does not require configuration, and works out of the box. 
It has a +single option with which it can be customized: + + [SingleFlight.GCP] + # Threshold for how long to wait in seconds for an in-progress GCP upload to + # be considered to have failed to unlock. + StaleThreshold = 120 diff --git a/pkg/config/config.go b/pkg/config/config.go index 9caed7fee..b591b949a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -181,6 +181,7 @@ func defaultConfig() *Config { SentinelPassword: "sekret", LockConfig: DefaultRedisLockConfig(), }, + GCP: DefaultGCPConfig(), }, Index: &Index{ MySQL: &MySQL{ diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 6c741a04a..b57445e7d 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -255,6 +255,7 @@ func TestParseExampleConfig(t *testing.T) { LockConfig: DefaultRedisLockConfig(), }, Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"}, + GCP: DefaultGCPConfig(), } expConf := &Config{ @@ -391,6 +392,8 @@ func getEnvMap(config *Config) map[string]string { } else if singleFlight.Etcd != nil { envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd" envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints + } else if singleFlight.GCP != nil { + envVars["ATHENS_GCP_STALE_THRESHOLD"] = strconv.Itoa(singleFlight.GCP.StaleThreshold) } } return envVars diff --git a/pkg/config/singleflight.go b/pkg/config/singleflight.go index 69049b10a..b3b6fe048 100644 --- a/pkg/config/singleflight.go +++ b/pkg/config/singleflight.go @@ -7,6 +7,7 @@ type SingleFlight struct { Etcd *Etcd Redis *Redis RedisSentinel *RedisSentinel + GCP *GCP } // Etcd holds client side configuration @@ -48,3 +49,15 @@ func DefaultRedisLockConfig() *RedisLockConfig { MaxRetries: 10, } } + +// GCP is the configuration for GCP locking. +type GCP struct { + StaleThreshold int `envconfig:"ATHENS_GCP_STALE_THRESHOLD"` +} + +// DefaultGCPConfig returns the default GCP locking configuration. 
+func DefaultGCPConfig() *GCP { + return &GCP{ + StaleThreshold: 120, + } +} diff --git a/pkg/stash/with_gcs.go b/pkg/stash/with_gcs.go index 3e2386e5c..788251f1c 100644 --- a/pkg/stash/with_gcs.go +++ b/pkg/stash/with_gcs.go @@ -2,15 +2,32 @@ package stash import ( "context" + "fmt" + "time" "github.com/gomods/athens/pkg/errors" "github.com/gomods/athens/pkg/observ" + "github.com/gomods/athens/pkg/storage" + "github.com/gomods/athens/pkg/storage/gcp" ) // WithGCSLock returns a distributed singleflight // using a GCS backend. See the config.toml documentation for details. -func WithGCSLock(s Stasher) Stasher { - return &gcsLock{s} +func WithGCSLock(staleThreshold int, s storage.Backend) (Wrapper, error) { + if staleThreshold <= 0 { + return nil, errors.E("stash.WithGCSLock", fmt.Errorf("invalid stale threshold")) + } + // Since we *must* be using a GCP storage backend, we can abuse this + // fact to mutate it, so that we can get our threshold into Save(). + // Your instincts are correct, this is kind of gross. 
+ gs, ok := s.(*gcp.Storage) + if !ok { + return nil, errors.E("stash.WithGCSLock", fmt.Errorf("GCP singleflight can only be used with GCP storage")) + } + gs.SetStaleThreshold(time.Duration(staleThreshold) * time.Second) + return func(s Stasher) Stasher { + return &gcsLock{s} + }, nil } type gcsLock struct { diff --git a/pkg/stash/with_gcs_test.go b/pkg/stash/with_gcs_test.go index 4bae5b440..d738a26d0 100644 --- a/pkg/stash/with_gcs_test.go +++ b/pkg/stash/with_gcs_test.go @@ -48,7 +48,11 @@ func TestWithGCS(t *testing.T) { for i := 0; i < 5; i++ { content := uuid.New().String() ms := &mockGCPStasher{strg, content} - s := WithGCSLock(ms) + gs, err := WithGCSLock(120, strg) + if err != nil { + t.Fatal(err) + } + s := gs(ms) eg.Go(func() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -109,6 +113,11 @@ func TestWithGCSPartialFailure(t *testing.T) { content := uuid.New().String() ms := &mockGCPStasher{strg, content} fr := new(failReader) + gs, err := WithGCSLock(120, strg) + if err != nil { + t.Fatal(err) + } + s := gs(ms) // We simulate a failure by manually passing an io.Reader that will fail. err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content)) if err == nil { @@ -117,7 +126,6 @@ func TestWithGCSPartialFailure(t *testing.T) { } // Now try a Stash. This should upload the missing files. - s := WithGCSLock(ms) _, err = s.Stash(ctx, "stashmod", "v1.0.0") if err != nil { t.Fatal(err) diff --git a/pkg/storage/gcp/gcp.go b/pkg/storage/gcp/gcp.go index a15d25d1d..b502fe588 100644 --- a/pkg/storage/gcp/gcp.go +++ b/pkg/storage/gcp/gcp.go @@ -15,8 +15,9 @@ import ( // Storage implements the (./pkg/storage).Backend interface. type Storage struct { - bucket *storage.BucketHandle - timeout time.Duration + bucket *storage.BucketHandle + timeout time.Duration + staleThreshold time.Duration } // New returns a new Storage instance backed by a Google Cloud Storage bucket. 
diff --git a/pkg/storage/gcp/saver.go b/pkg/storage/gcp/saver.go index 9936c294d..b430d64d7 100644 --- a/pkg/storage/gcp/saver.go +++ b/pkg/storage/gcp/saver.go @@ -13,9 +13,9 @@ import ( googleapi "google.golang.org/api/googleapi" ) -// After how long we consider an "in_progress" metadata key stale, +// Fallback for how long we consider an "in_progress" metadata key stale, // due to failure to remove it. -const inProgressStaleThreshold = 2 * time.Minute +const fallbackInProgressStaleThreshold = 2 * time.Minute // Save uploads the module's .mod, .zip and .info files for a given version // It expects a context, which can be provided using context.Background // from the standard library until context has been threaded down the stack. @@ -48,6 +48,12 @@ func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, return innerErr } +// SetStaleThreshold sets the threshold of how long we consider +// a lock metadata stale after. +func (s *Storage) SetStaleThreshold(threshold time.Duration) { + s.staleThreshold = threshold +} + func (s *Storage) save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error { const op errors.Op = "gcp.save" ctx, span := observ.StartSpan(ctx, op.String()) @@ -115,12 +121,19 @@ func (s *Storage) checkUploadInProgress(ctx context.Context, gomodPath string) ( if err != nil { return false, errors.E(op, err) } + // If we have a config-set lock threshold, i.e. we are using the GCP + // singleflight backend, use it. Otherwise, use the fallback, which + // is arguably irrelevant when not using GCP for singleflighting. + threshold := fallbackInProgressStaleThreshold + if s.staleThreshold > 0 { + threshold = s.staleThreshold + } if attrs.Metadata != nil { _, ok := attrs.Metadata["in_progress"] if ok { // In case the final call to remove the metadata fails for some reason, // we have a threshold after which we consider this to be stale. 
- if time.Since(attrs.Created) > inProgressStaleThreshold { + if time.Since(attrs.Created) > threshold { return false, nil } return true, nil