diff --git a/cmd/proxy/actions/app_proxy.go b/cmd/proxy/actions/app_proxy.go
index 777ad4ca6..0117bc9db 100644
--- a/cmd/proxy/actions/app_proxy.go
+++ b/cmd/proxy/actions/app_proxy.go
@@ -101,7 +101,7 @@ func addProxyRoutes(
 	lister := module.NewVCSLister(c.GoBinary, c.GoBinaryEnvVars, fs)
 	checker := storage.WithChecker(s)
-	withSingleFlight, err := getSingleFlight(l, c, checker)
+	withSingleFlight, err := getSingleFlight(l, c, s, checker)
 	if err != nil {
 		return err
 	}
@@ -137,7 +137,7 @@ func (l *athensLoggerForRedis) Printf(ctx context.Context, format string, v ...a
 	l.logger.WithContext(ctx).Printf(format, v...)
 }
 
-func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (stash.Wrapper, error) {
+func getSingleFlight(l *log.Logger, c *config.Config, s storage.Backend, checker storage.Checker) (stash.Wrapper, error) {
 	switch c.SingleFlightType {
 	case "", "memory":
 		return stash.WithSingleflight, nil
@@ -173,7 +173,7 @@ func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (
 		if c.StorageType != "gcp" {
 			return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType)
 		}
-		return stash.WithGCSLock, nil
+		return stash.WithGCSLock(c.SingleFlight.GCP.StaleThreshold, s)
 	case "azureblob":
 		if c.StorageType != "azureblob" {
 			return nil, fmt.Errorf("azureblob SingleFlight only works with a azureblob storage type and not: %v", c.StorageType)
diff --git a/config.dev.toml b/config.dev.toml
index d04df2fea..aab468dfc 100755
--- a/config.dev.toml
+++ b/config.dev.toml
@@ -377,6 +377,10 @@ ShutdownTimeout = 60
         # Max retries while acquiring the lock. Defaults to 10.
         # Env override: ATHENS_REDIS_LOCK_MAX_RETRIES
         MaxRetries = 10
+    [SingleFlight.GCP]
+        # Threshold for how long to wait in seconds for an in-progress GCP upload to
+        # be considered to have failed to unlock.
+        StaleThreshold = 120
 [Storage]
     # Only storage backends that are specified in Proxy.StorageType are required here
     [Storage.CDN]
diff --git a/docs/content/configuration/storage.md b/docs/content/configuration/storage.md
index ab63ea653..81ef957d2 100644
--- a/docs/content/configuration/storage.md
+++ b/docs/content/configuration/storage.md
@@ -492,3 +492,14 @@ Optionally, like `redis`, you can also specify a password to connect to the `red
     SentinelPassword = "sekret"
 
 Distributed lock options can be customised for redis sentinal as well, in a similar manner as described above for redis.
+
+
+### Using GCP as a singleflight mechanism
+
+The GCP singleflight mechanism does not require configuration, and works out of the box. It has a
+single option with which it can be customized:
+
+    [SingleFlight.GCP]
+    # Threshold for how long to wait in seconds for an in-progress GCP upload to
+    # be considered to have failed to unlock.
+    StaleThreshold = 120
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 9caed7fee..b591b949a 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -181,6 +181,7 @@ func defaultConfig() *Config {
 			SentinelPassword: "sekret",
 			LockConfig:       DefaultRedisLockConfig(),
 		},
+		GCP: DefaultGCPConfig(),
 	},
 	Index: &Index{
 		MySQL: &MySQL{
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 6c741a04a..b57445e7d 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -255,6 +255,7 @@ func TestParseExampleConfig(t *testing.T) {
 			LockConfig: DefaultRedisLockConfig(),
 		},
 		Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"},
+		GCP:  DefaultGCPConfig(),
 	}
 
 	expConf := &Config{
@@ -391,6 +392,8 @@ func getEnvMap(config *Config) map[string]string {
 		} else if singleFlight.Etcd != nil {
 			envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd"
 			envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints
+		} else if singleFlight.GCP != nil {
+			envVars["ATHENS_GCP_STALE_THRESHOLD"] = strconv.Itoa(singleFlight.GCP.StaleThreshold)
 		}
 	}
 	return envVars
diff --git a/pkg/config/singleflight.go b/pkg/config/singleflight.go
index 69049b10a..b3b6fe048 100644
--- a/pkg/config/singleflight.go
+++ b/pkg/config/singleflight.go
@@ -7,6 +7,7 @@ type SingleFlight struct {
 	Etcd          *Etcd
 	Redis         *Redis
 	RedisSentinel *RedisSentinel
+	GCP           *GCP
 }
 
 // Etcd holds client side configuration
@@ -48,3 +49,15 @@ func DefaultRedisLockConfig() *RedisLockConfig {
 		MaxRetries: 10,
 	}
 }
+
+// GCP is the configuration for GCP locking.
+type GCP struct {
+	StaleThreshold int `envconfig:"ATHENS_GCP_STALE_THRESHOLD"`
+}
+
+// DefaultGCPConfig returns the default GCP locking configuration.
+func DefaultGCPConfig() *GCP {
+	return &GCP{
+		StaleThreshold: 120,
+	}
+}
diff --git a/pkg/stash/with_gcs.go b/pkg/stash/with_gcs.go
index 3e2386e5c..788251f1c 100644
--- a/pkg/stash/with_gcs.go
+++ b/pkg/stash/with_gcs.go
@@ -2,15 +2,32 @@ package stash
 
 import (
 	"context"
+	"fmt"
+	"time"
 
 	"github.com/gomods/athens/pkg/errors"
 	"github.com/gomods/athens/pkg/observ"
+	"github.com/gomods/athens/pkg/storage"
+	"github.com/gomods/athens/pkg/storage/gcp"
 )
 
 // WithGCSLock returns a distributed singleflight
 // using a GCS backend. See the config.toml documentation for details.
-func WithGCSLock(s Stasher) Stasher {
-	return &gcsLock{s}
+func WithGCSLock(staleThreshold int, s storage.Backend) (Wrapper, error) {
+	if staleThreshold <= 0 {
+		return nil, errors.E("stash.WithGCSLock", fmt.Errorf("invalid stale threshold"))
+	}
+	// Since we *must* be using a GCP storage backend, we can abuse this
+	// fact to mutate it, so that we can get our threshold into Save().
+	// Your instincts are correct, this is kind of gross.
+	gs, ok := s.(*gcp.Storage)
+	if !ok {
+		return nil, errors.E("stash.WithGCSLock", fmt.Errorf("GCP singleflight can only be used with GCP storage"))
+	}
+	gs.SetStaleThreshold(time.Duration(staleThreshold) * time.Second)
+	return func(s Stasher) Stasher {
+		return &gcsLock{s}
+	}, nil
 }
 
 type gcsLock struct {
diff --git a/pkg/stash/with_gcs_test.go b/pkg/stash/with_gcs_test.go
index 3a309cac0..d738a26d0 100644
--- a/pkg/stash/with_gcs_test.go
+++ b/pkg/stash/with_gcs_test.go
@@ -3,6 +3,7 @@ package stash
 import (
 	"bytes"
 	"context"
+	"fmt"
 	"io"
 	"os"
 	"strings"
@@ -17,6 +18,12 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
+type failReader int
+
+func (f *failReader) Read([]byte) (int, error) {
+	return 0, fmt.Errorf("failure")
+}
+
 // TestWithGCS requires a real GCP backend implementation
 // and it will ensure that saving to modules at the same time
 // is done synchronously so that only the first module gets saved.
@@ -41,7 +48,11 @@ func TestWithGCS(t *testing.T) {
 	for i := 0; i < 5; i++ {
 		content := uuid.New().String()
 		ms := &mockGCPStasher{strg, content}
-		s := WithGCSLock(ms)
+		gs, err := WithGCSLock(120, strg)
+		if err != nil {
+			t.Fatal(err)
+		}
+		s := gs(ms)
 		eg.Go(func() error {
 			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 			defer cancel()
@@ -79,6 +90,72 @@ func TestWithGCS(t *testing.T) {
 	}
 }
 
+// TestWithGCSPartialFailure requires a real GCP backend implementation
+// and ensures that if one of the non-singleflight-lock files fails to
+// upload, the cache does not remain poisoned.
+func TestWithGCSPartialFailure(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
+	defer cancel()
+	const (
+		mod = "stashmod"
+		ver = "v1.0.0"
+	)
+	strg := getStorage(t)
+	strg.Delete(ctx, mod, ver)
+	defer strg.Delete(ctx, mod, ver)
+
+	// sanity check
+	_, err := strg.GoMod(ctx, mod, ver)
+	if !errors.Is(err, errors.KindNotFound) {
+		t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err)
+	}
+
+	content := uuid.New().String()
+	ms := &mockGCPStasher{strg, content}
+	fr := new(failReader)
+	gs, err := WithGCSLock(120, strg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	s := gs(ms)
+	// We simulate a failure by manually passing an io.Reader that will fail.
+	err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content))
+	if err == nil {
+		// We *want* this Save to fail.
+		t.Fatal("expected Save to fail, but it succeeded")
+	}
+
+	// Now try a Stash. This should upload the missing files.
+	_, err = s.Stash(ctx, "stashmod", "v1.0.0")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	info, err := strg.Info(ctx, mod, ver)
+	if err != nil {
+		t.Fatal(err)
+	}
+	modContent, err := strg.GoMod(ctx, mod, ver)
+	if err != nil {
+		t.Fatal(err)
+	}
+	zip, err := strg.Zip(ctx, mod, ver)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer zip.Close()
+	zipContent, err := io.ReadAll(zip)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !bytes.Equal(info, modContent) {
+		t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent))
+	}
+	if !bytes.Equal(info, zipContent) {
+		t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent))
+	}
+}
+
 // mockGCPStasher is like mockStasher
 // but leverages in memory storage
 // so that redis can determine
diff --git a/pkg/storage/gcp/gcp.go b/pkg/storage/gcp/gcp.go
index a15d25d1d..b502fe588 100644
--- a/pkg/storage/gcp/gcp.go
+++ b/pkg/storage/gcp/gcp.go
@@ -15,8 +15,9 @@ import (
 
 // Storage implements the (./pkg/storage).Backend interface.
 type Storage struct {
-	bucket  *storage.BucketHandle
-	timeout time.Duration
+	bucket         *storage.BucketHandle
+	timeout        time.Duration
+	staleThreshold time.Duration
 }
 
 // New returns a new Storage instance backed by a Google Cloud Storage bucket.
diff --git a/pkg/storage/gcp/saver.go b/pkg/storage/gcp/saver.go
index 4298aaa17..b430d64d7 100644
--- a/pkg/storage/gcp/saver.go
+++ b/pkg/storage/gcp/saver.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"io"
+	"time"
 
 	"cloud.google.com/go/storage"
 	"github.com/gomods/athens/pkg/config"
@@ -12,6 +13,10 @@ import (
 	googleapi "google.golang.org/api/googleapi"
 )
 
+// fallbackInProgressStaleThreshold is how long an "in_progress" metadata
+// key is considered valid before it is treated as stale and ignored.
+const fallbackInProgressStaleThreshold = 2 * time.Minute
+
 // Save uploads the module's .mod, .zip and .info files for a given version
 // It expects a context, which can be provided using context.Background
 // from the standard library until context has been threaded down the stack.
@@ -20,40 +25,146 @@ import (
 // Uploaded files are publicly accessible in the storage bucket as per
 // an ACL rule.
 func (s *Storage) Save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
-	const op errors.Op = "gcp.Save"
+	const op errors.Op = "gcp.save"
 	ctx, span := observ.StartSpan(ctx, op.String())
 	defer span.End()
 	gomodPath := config.PackageVersionedName(module, version, "mod")
-	err := s.upload(ctx, gomodPath, bytes.NewReader(mod))
-	if err != nil {
+	innerErr := s.save(ctx, module, version, mod, zip, info)
+	if errors.Is(innerErr, errors.KindAlreadyExists) {
+		// Cache hit.
+		return errors.E(op, innerErr)
+	}
+	// No cache hit. Remove the metadata lock if it is there.
+	inProgress, outerErr := s.checkUploadInProgress(ctx, gomodPath)
+	if outerErr != nil {
+		return errors.E(op, outerErr)
+	}
+	if inProgress {
+		outerErr = s.removeInProgressMetadata(ctx, gomodPath)
+		if outerErr != nil {
+			return errors.E(op, outerErr)
+		}
+	}
+	return innerErr
+}
+
+// SetStaleThreshold sets how long in-progress lock metadata is
+// considered valid before it is treated as stale.
+func (s *Storage) SetStaleThreshold(threshold time.Duration) {
+	s.staleThreshold = threshold
+}
+
+func (s *Storage) save(ctx context.Context, module, version string, mod []byte, zip io.Reader, info []byte) error {
+	const op errors.Op = "gcp.save"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	gomodPath := config.PackageVersionedName(module, version, "mod")
+	seenAlreadyExists := 0
+	err := s.upload(ctx, gomodPath, bytes.NewReader(mod), true)
+	// If the .mod file already exists, check the object metadata to see if
+	// the other two files are still being uploaded in progress somewhere
+	// else. If they are, return a cache hit. If not, continue on to the
+	// other two, and only return a cache hit if all three exist.
+	if errors.Is(err, errors.KindAlreadyExists) {
+		inProgress, progressErr := s.checkUploadInProgress(ctx, gomodPath)
+		if progressErr != nil {
+			return errors.E(op, progressErr)
+		}
+		if inProgress {
+			// err is known to be errors.KindAlreadyExists at this point, so
+			// this is a cache hit return.
+			return errors.E(op, err)
+		}
+		seenAlreadyExists++
+	} else if err != nil {
+		// Other errors are genuine failures.
 		return errors.E(op, err)
 	}
 	zipPath := config.PackageVersionedName(module, version, "zip")
-	err = s.upload(ctx, zipPath, zip)
-	if err != nil {
+	err = s.upload(ctx, zipPath, zip, false)
+	if errors.Is(err, errors.KindAlreadyExists) {
+		seenAlreadyExists++
+	} else if err != nil {
 		return errors.E(op, err)
 	}
 	infoPath := config.PackageVersionedName(module, version, "info")
-	err = s.upload(ctx, infoPath, bytes.NewReader(info))
+	err = s.upload(ctx, infoPath, bytes.NewReader(info), false)
+	// Did all three uploads return errors.KindAlreadyExists?
+	if errors.Is(err, errors.KindAlreadyExists) {
+		if seenAlreadyExists == 2 {
+			return errors.E(op, err)
+		}
+	} else if err != nil {
+		return errors.E(op, err)
+	}
+	return nil
+}
+
+func (s *Storage) removeInProgressMetadata(ctx context.Context, gomodPath string) error {
+	const op errors.Op = "gcp.removeInProgressMetadata"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	_, err := s.bucket.Object(gomodPath).Update(ctx, storage.ObjectAttrsToUpdate{
+		Metadata: map[string]string{},
+	})
 	if err != nil {
 		return errors.E(op, err)
 	}
 	return nil
 }
 
-func (s *Storage) upload(ctx context.Context, path string, stream io.Reader) error {
+func (s *Storage) checkUploadInProgress(ctx context.Context, gomodPath string) (bool, error) {
+	const op errors.Op = "gcp.checkUploadInProgress"
+	ctx, span := observ.StartSpan(ctx, op.String())
+	defer span.End()
+	attrs, err := s.bucket.Object(gomodPath).Attrs(ctx)
+	if err != nil {
+		return false, errors.E(op, err)
+	}
+	// If we have a config-set lock threshold, i.e. we are using the GCP
+	// singleflight backend, use it. Otherwise, use the fallback, which
+	// is arguably irrelevant when not using GCP for singleflighting.
+	threshold := fallbackInProgressStaleThreshold
+	if s.staleThreshold > 0 {
+		threshold = s.staleThreshold
+	}
+	if attrs.Metadata != nil {
+		_, ok := attrs.Metadata["in_progress"]
+		if ok {
+			// In case the final call to remove the metadata fails for some reason,
+			// we have a threshold after which we consider this to be stale.
+			if time.Since(attrs.Created) > threshold {
+				return false, nil
+			}
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+func (s *Storage) upload(ctx context.Context, path string, stream io.Reader, first bool) error {
 	const op errors.Op = "gcp.upload"
 	ctx, span := observ.StartSpan(ctx, op.String())
 	defer span.End()
+	cancelCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	wc := s.bucket.Object(path).If(storage.Conditions{
 		DoesNotExist: true,
-	}).NewWriter(ctx)
+	}).NewWriter(cancelCtx)
+
+	// We set this metadata only for the first of the three files uploaded,
+	// for use as a singleflight lock.
+	if first {
+		wc.ObjectAttrs.Metadata = make(map[string]string)
+		wc.ObjectAttrs.Metadata["in_progress"] = "true"
+	}
 	// NOTE: content type is auto detected on GCP side and ACL defaults to public
 	// Once we support private storage buckets this may need refactoring
 	// unless there is a way to set the default perms in the project.
 	if _, err := io.Copy(wc, stream); err != nil {
-		_ = wc.Close()
+		// Purposely do not close it to avoid creating a partial file.
 		return err
 	}
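
Reviewer note: a minimal sketch of how the new pieces fit together, mirroring the wiring that getSingleFlight now performs for the "gcp" case above. The names strg and baseStasher are hypothetical stand-ins for the proxy's *gcp.Storage backend and its underlying stasher; they are not identifiers introduced by this change:

    // Sketch only. WithGCSLock validates the threshold and mutates the
    // *gcp.Storage backend so the stale threshold reaches Save().
    wrapper, err := stash.WithGCSLock(c.SingleFlight.GCP.StaleThreshold, strg)
    if err != nil {
        return err
    }
    // stash.Wrapper is a func(Stasher) Stasher, so the lock can wrap
    // whatever stasher the proxy already uses.
    stasher := wrapper(baseStasher)

The effective lock is the "in_progress" metadata key on the .mod object: concurrent saves that see it treat the module as a cache hit, and a save that crashed before removing it stops blocking new uploads once the object is older than StaleThreshold seconds.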