From 767f81d4f547dfc8a168e01823cecc61204a5177 Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar <30438425+vrajashkr@users.noreply.github.com> Date: Fri, 31 May 2024 21:55:34 +0530 Subject: [PATCH] feat(sync): support for periodic repo sync in scale-out cluster (#2424) This commit includes support for periodic repo sync in a scale-out cluster. Before this commit, all cluster members would sync all the repos as the config is shared. With this change, in periodic sync, the cluster member checks whether it manages the repo. If it does not manage the repo, it will skip the sync. This commit also includes a unit test to test on-demand sync too, but there are no logic changes for it as it is implicitly handled by the proxying logic. Signed-off-by: Vishwas Rajashekar --- pkg/api/proxy.go | 20 +- pkg/cluster/cluster.go | 17 + pkg/cluster/cluster_test.go | 25 ++ pkg/extensions/extension_sync.go | 3 +- pkg/extensions/sync/service.go | 26 ++ pkg/extensions/sync/sync_internal_test.go | 4 +- pkg/extensions/sync/sync_test.go | 447 ++++++++++++++++++++++ 7 files changed, 521 insertions(+), 21 deletions(-) create mode 100644 pkg/cluster/cluster.go create mode 100644 pkg/cluster/cluster_test.go diff --git a/pkg/api/proxy.go b/pkg/api/proxy.go index d3c6224da..96d35bfb9 100644 --- a/pkg/api/proxy.go +++ b/pkg/api/proxy.go @@ -8,11 +8,10 @@ import ( "net" "net/http" - "github.com/dchest/siphash" "github.com/gorilla/mux" - "zotregistry.dev/zot/pkg/api/config" "zotregistry.dev/zot/pkg/api/constants" + "zotregistry.dev/zot/pkg/cluster" "zotregistry.dev/zot/pkg/common" ) @@ -46,8 +45,7 @@ func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc { // the target member is the only one which should do read/write for the dist-spec APIs // for the given repository. - targetMemberIndex, targetMember := computeTargetMember(config, name) - + targetMemberIndex, targetMember := cluster.ComputeTargetMember(config.Cluster.HashKey, config.Cluster.Members, name) logger.Debug().Str(constants.RepositoryLogKey, name). Msg(fmt.Sprintf("target member socket: %s index: %d", targetMember, targetMemberIndex)) @@ -86,20 +84,6 @@ func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc { } } -// computes the target member using siphash and returns the index and the member -// siphash was chosen to prevent against hash attacks where an attacker -// can target all requests to one given instance instead of balancing across the cluster -// resulting in a Denial-of-Service (DOS). -// ref: https://en.wikipedia.org/wiki/SipHash -func computeTargetMember(config *config.Config, name string) (uint64, string) { - h := siphash.New([]byte(config.Cluster.HashKey)) - h.Write([]byte(name)) - sum64 := h.Sum64() - targetIdx := sum64 % uint64(len(config.Cluster.Members)) - - return targetIdx, config.Cluster.Members[targetIdx] -} - // gets all the server sockets of a target member - IP:Port. // for IPv6, the socket is [IPv6]:Port. // if the input is an IP address, returns the same targetMember in an array. diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go new file mode 100644 index 000000000..c97395d41 --- /dev/null +++ b/pkg/cluster/cluster.go @@ -0,0 +1,17 @@ +package cluster + +import "github.com/dchest/siphash" + +// computes the target member using siphash and returns the index and the member +// siphash was chosen to prevent against hash attacks where an attacker +// can target all requests to one given instance instead of balancing across the cluster +// resulting in a Denial-of-Service (DOS). +// ref: https://en.wikipedia.org/wiki/SipHash +func ComputeTargetMember(hashKey string, members []string, repoName string) (uint64, string) { + h := siphash.New([]byte(hashKey)) + h.Write([]byte(repoName)) + sum64 := h.Sum64() + targetIdx := sum64 % uint64(len(members)) + + return targetIdx, members[targetIdx] +} diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go new file mode 100644 index 000000000..9252688e3 --- /dev/null +++ b/pkg/cluster/cluster_test.go @@ -0,0 +1,25 @@ +package cluster_test + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + + "zotregistry.dev/zot/pkg/cluster" +) + +func TestComputeTargetMember(t *testing.T) { + Convey("Should panic when the hashKey is not long enough", t, func() { + So(func() { cluster.ComputeTargetMember("lorem", []string{"member1", "member2"}, "zot-test") }, ShouldPanic) + }) + + Convey("Should panic when there are no members", t, func() { + So(func() { cluster.ComputeTargetMember("loremipsumdolors", []string{}, "zot-test") }, ShouldPanic) + }) + + Convey("Should return a valid result when input is valid", t, func() { + index, member := cluster.ComputeTargetMember("loremipsumdolors", []string{"member1", "member2"}, "zot-test") + So(index, ShouldEqual, 1) + So(member, ShouldEqual, "member2") + }) +} diff --git a/pkg/extensions/extension_sync.go b/pkg/extensions/extension_sync.go index 9bf86214e..3e96902d3 100644 --- a/pkg/extensions/extension_sync.go +++ b/pkg/extensions/extension_sync.go @@ -47,8 +47,9 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, tmpDir := config.Extensions.Sync.DownloadDir credsPath := config.Extensions.Sync.CredentialsFile + clusterCfg := config.Cluster - service, err := sync.New(registryConfig, credsPath, tmpDir, storeController, metaDB, log) + service, err := sync.New(registryConfig, credsPath, clusterCfg, tmpDir, storeController, metaDB, log) if err != nil { log.Error().Err(err).Msg("failed to initialize sync extension") diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index a8cc10e22..1e3f1b87c 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -13,6 +13,9 @@ import ( "github.com/opencontainers/go-digest" zerr "zotregistry.dev/zot/errors" + "zotregistry.dev/zot/pkg/api/config" + "zotregistry.dev/zot/pkg/api/constants" + "zotregistry.dev/zot/pkg/cluster" "zotregistry.dev/zot/pkg/common" syncconf "zotregistry.dev/zot/pkg/extensions/config/sync" client "zotregistry.dev/zot/pkg/extensions/sync/httpclient" @@ -25,6 +28,7 @@ import ( type BaseService struct { config syncconf.RegistryConfig credentials syncconf.CredentialsFile + clusterConfig *config.ClusterConfig remote Remote destination Destination retryOptions *retry.RetryOptions @@ -40,6 +44,7 @@ type BaseService struct { func New( opts syncconf.RegistryConfig, credentialsFilepath string, + clusterConfig *config.ClusterConfig, tmpDir string, storeController storage.StoreController, metadb mTypes.MetaDB, @@ -64,6 +69,10 @@ func New( service.credentials = credentialsFile + // load the cluster config into the object + // can be nil if the user did not configure cluster config + service.clusterConfig = clusterConfig + service.contentManager = NewContentManager(opts.Content, log) if len(tmpDir) == 0 { @@ -229,6 +238,23 @@ func (service *BaseService) GetNextRepo(lastRepo string) (string, error) { break } + if service.clusterConfig != nil { + targetIdx, targetMember := cluster.ComputeTargetMember( + service.clusterConfig.HashKey, service.clusterConfig.Members, lastRepo) + + // if the target index does not match with the local socket index, + // then the local instance is not responsible for syncing the repo and should skip the sync + if targetIdx != service.clusterConfig.Proxy.LocalMemberClusterSocketIndex { + service.log.Debug(). + Str(constants.RepositoryLogKey, lastRepo). + Str("targetMemberIndex", fmt.Sprintf("%d", targetIdx)). + Str("targetMember", targetMember). + Msg("skipping sync of repo not managed by local instance") + + continue + } + } + matches = service.contentManager.MatchesContent(lastRepo) } diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 42ccdf9dd..bb5368dc9 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -162,7 +162,7 @@ func TestService(t *testing.T) { URLs: []string{"http://localhost"}, } - service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{}) + service, err := New(conf, "", nil, os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{}) So(err, ShouldBeNil) err = service.SyncRepo(context.Background(), "repo") @@ -176,7 +176,7 @@ func TestSyncRepo(t *testing.T) { URLs: []string{"http://localhost"}, } - service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{}) + service, err := New(conf, "", nil, os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{}) So(err, ShouldBeNil) service.remote = mocks.SyncRemote{ diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index d11265f0f..7bf98f2b3 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -227,6 +227,41 @@ func makeDownstreamServer( return dctlr, destBaseURL, destDir, client } +func makeInsecureDownstreamServerFixedPort( + t *testing.T, port string, syncConfig *syncconf.Config, clusterConfig *config.ClusterConfig, +) (*api.Controller, string, string, *resty.Client) { + t.Helper() + + destPort := port + destConfig := config.New() + client := resty.New() + + destBaseURL := test.GetBaseURL(destPort) + + destConfig.HTTP.Port = destPort + + destDir := t.TempDir() + + destConfig.Storage.RootDirectory = destDir + destConfig.Storage.Dedupe = false + destConfig.Storage.GC = false + + destConfig.Extensions = &extconf.ExtensionConfig{} + defVal := true + destConfig.Extensions.Search = &extconf.SearchConfig{ + BaseConfig: extconf.BaseConfig{Enable: &defVal}, + } + destConfig.Extensions.Sync = syncConfig + destConfig.Log.Output = path.Join(destDir, "sync.log") + destConfig.Log.Level = "debug" + + destConfig.Cluster = clusterConfig + + dctlr := api.NewController(destConfig) + + return dctlr, destBaseURL, destDir, client +} + func TestOnDemand(t *testing.T) { Convey("Verify sync on demand feature", t, func() { sctlr, srcBaseURL, _, _, srcClient := makeUpstreamServer(t, false, false) @@ -729,6 +764,305 @@ func TestOnDemand(t *testing.T) { }) } +func TestOnDemandWithScaleOutCluster(t *testing.T) { + Convey("Given 2 downstream zots and one upstream, test that the cluster can sync images", t, func() { + sctlr, srcBaseURL, _, _, srcClient := makeUpstreamServer(t, false, false) + scm := test.NewControllerManager(sctlr) + scm.StartAndWait(sctlr.Config.HTTP.Port) + defer scm.StopServer() + + // sync config for both downstreams. + tlsVerify := false + syncRegistryConfig := syncconf.RegistryConfig{ + Content: []syncconf.Content{ + { + Prefix: testImage, + }, + { + Prefix: testCveImage, + }, + }, + URLs: []string{srcBaseURL}, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + defaultVal := true + syncConfig := &syncconf.Config{ + Enable: &defaultVal, + Registries: []syncconf.RegistryConfig{syncRegistryConfig}, + } + + // cluster config for member 1. + clusterCfgDownstream1 := config.ClusterConfig{ + Members: []string{ + "127.0.0.1:43222", + "127.0.0.1:43223", + }, + HashKey: "loremipsumdolors", + } + + // cluster config copied for member 2. + clusterCfgDownstream2 := clusterCfgDownstream1 + + dctrl1, dctrl1BaseURL, destDir1, dstClient1 := makeInsecureDownstreamServerFixedPort( + t, "43222", syncConfig, &clusterCfgDownstream1) + dctrl1Scm := test.NewControllerManager(dctrl1) + + dctrl2, dctrl2BaseURL, destDir2, dstClient2 := makeInsecureDownstreamServerFixedPort( + t, "43223", syncConfig, &clusterCfgDownstream2) + dctrl2Scm := test.NewControllerManager(dctrl2) + + dctrl1Scm.StartAndWait(dctrl1.Config.HTTP.Port) + defer dctrl1Scm.StopServer() + + dctrl2Scm.StartAndWait(dctrl2.Config.HTTP.Port) + defer dctrl2Scm.StopServer() + + // verify that all servers are up. + clients := []*resty.Client{srcClient, dstClient1, dstClient2} + baseURLs := []string{srcBaseURL, dctrl1BaseURL, dctrl2BaseURL} + + for clientIdx, client := range clients { + resp, err := client.R().Get(fmt.Sprintf("%s/v2/", baseURLs[clientIdx])) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + + // storage for each downstream should not have image data at the start. + destDirs := []string{destDir1, destDir2} + images := []string{testImage, testCveImage} + for _, image := range images { + for _, destDir := range destDirs { + _, err := os.Stat(path.Join(destDir, image)) + So(err, ShouldNotBeNil) + So(os.IsNotExist(err), ShouldBeTrue) + } + } + + repos := []string{testImage, testCveImage} + + // tags list for both images should return 404 at the start. + // only hit one instance as the request will get proxied anyway. + for _, repo := range repos { + resp, err := dstClient1.R().Get( + fmt.Sprintf("%s/v2/%s/tags/list", dctrl1BaseURL, repo), + ) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNotFound) + } + + // should successfully sync zot-test image when trying to load manifest. + // only hit one instance as the request will get proxied anyway. + resp, err := dstClient1.R().Get( + fmt.Sprintf("%s/v2/%s/manifests/%s", dctrl1BaseURL, testImage, testImageTag), + ) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + // tags list for test image should return data after the sync. + // only hit one instance as the request will get proxied anyway. + // get manifest is hit with a GET request. + resp, err = dstClient1.R().Get( + fmt.Sprintf("%s/v2/%s/tags/list", dctrl1BaseURL, testImage), + ) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + var initialTags TagsList + err = json.Unmarshal(resp.Body(), &initialTags) + So(err, ShouldBeNil) + So(initialTags, ShouldEqual, TagsList{ + Name: testImage, + Tags: []string{testImageTag}, + }) + + // should successfully sync test vulnerable image when trying to check manifest. + // check manifest is hit with a HEAD or OPTIONS request. + resp, err = dstClient1.R().Head( + fmt.Sprintf("%s/v2/%s/manifests/%s", dctrl1BaseURL, testCveImage, testImageTag), + ) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + // tags list for test CVE image should return data after the sync. + // only hit one instance as the request will get proxied anyway. + // get manifest is hit with a GET request. + resp, err = dstClient1.R().Get( + fmt.Sprintf("%s/v2/%s/tags/list", dctrl1BaseURL, testCveImage), + ) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + + var cveTagsList TagsList + err = json.Unmarshal(resp.Body(), &cveTagsList) + So(err, ShouldBeNil) + So(cveTagsList, ShouldEqual, TagsList{ + Name: testCveImage, + Tags: []string{testImageTag}, + }) + + // storage for only one downstream should have the data for test image. + // with loremipsumdolors as the hashKey, + // zot-test is managed by member index 1. + // zot-cve-test is managed by member index 0. + + _, err = os.Stat(path.Join(destDir1, testImage)) + So(err, ShouldNotBeNil) + So(os.IsNotExist(err), ShouldBeTrue) + + _, err = os.Stat(path.Join(destDir2, testImage)) + So(err, ShouldBeNil) + + // storage for only one downstream should have the data for the test cve image. + // with loremipsumdolors as the hashKey, + // zot-test is managed by member index 1. + // zot-cve-test is managed by member index 0. + + _, err = os.Stat(path.Join(destDir1, testCveImage)) + So(err, ShouldBeNil) + + _, err = os.Stat(path.Join(destDir2, testCveImage)) + So(err, ShouldNotBeNil) + So(os.IsNotExist(err), ShouldBeTrue) + }) +} + +func TestOnDemandWithScaleOutClusterWithReposNotAddedForSync(t *testing.T) { + Convey("When repos are not added for sync, cluster should not sync images", t, func() { + sctlr, srcBaseURL, _, _, srcClient := makeUpstreamServer(t, false, false) + scm := test.NewControllerManager(sctlr) + scm.StartAndWait(sctlr.Config.HTTP.Port) + defer scm.StopServer() + + // sync config for both downstreams. + // there is a dummy entry in the Content array + tlsVerify := false + syncRegistryConfig := syncconf.RegistryConfig{ + Content: []syncconf.Content{ + { + Prefix: "doesnotexist", + }, + }, + URLs: []string{srcBaseURL}, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + defaultVal := true + syncConfig := &syncconf.Config{ + Enable: &defaultVal, + Registries: []syncconf.RegistryConfig{syncRegistryConfig}, + } + + // cluster config for member 1. + clusterCfgDownstream1 := config.ClusterConfig{ + Members: []string{ + "127.0.0.1:43222", + "127.0.0.1:43223", + }, + HashKey: "loremipsumdolors", + } + + // cluster config copied for member 2. + clusterCfgDownstream2 := clusterCfgDownstream1 + + dctrl1, dctrl1BaseURL, destDir1, dstClient1 := makeInsecureDownstreamServerFixedPort( + t, "43222", syncConfig, &clusterCfgDownstream1) + dctrl1Scm := test.NewControllerManager(dctrl1) + + dctrl2, dctrl2BaseURL, destDir2, dstClient2 := makeInsecureDownstreamServerFixedPort( + t, "43223", syncConfig, &clusterCfgDownstream2) + dctrl2Scm := test.NewControllerManager(dctrl2) + + dctrl1Scm.StartAndWait(dctrl1.Config.HTTP.Port) + defer dctrl1Scm.StopServer() + + dctrl2Scm.StartAndWait(dctrl2.Config.HTTP.Port) + defer dctrl2Scm.StopServer() + + // verify that all servers are up. + clients := []*resty.Client{srcClient, dstClient1, dstClient2} + baseURLs := []string{srcBaseURL, dctrl1BaseURL, dctrl2BaseURL} + + for clientIdx, client := range clients { + resp, err := client.R().Get(fmt.Sprintf("%s/v2/", baseURLs[clientIdx])) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + + // storage for each downstream should not have image data at the start. + destDirs := []string{destDir1, destDir2} + images := []string{testImage, testCveImage} + for _, image := range images { + for _, destDir := range destDirs { + _, err := os.Stat(path.Join(destDir, image)) + So(err, ShouldNotBeNil) + So(os.IsNotExist(err), ShouldBeTrue) + } + } + + repos := []string{testImage, testCveImage} + + // tags list for both images should return 404 at the start. + // only hit one instance as the request will get proxied anyway. + for _, repo := range repos { + resp, err := dstClient1.R().Get( + fmt.Sprintf("%s/v2/%s/tags/list", dctrl1BaseURL, repo), + ) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNotFound) + } + + // should not sync zot-test image when trying to load manifest. + // only hit one instance as the request will get proxied anyway. + resp, err := dstClient1.R().Get( + fmt.Sprintf("%s/v2/%s/manifests/%s", dctrl1BaseURL, testImage, testImageTag), + ) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNotFound) + + // should not sync test vulnerable image when trying to check manifest. + // check manifest is hit with a HEAD or OPTIONS request. + resp, err = dstClient1.R().Head( + fmt.Sprintf("%s/v2/%s/manifests/%s", dctrl1BaseURL, testCveImage, testImageTag), + ) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNotFound) + + // tags list for both images should return 404 after the sync as well. + // only hit one instance as the request will get proxied anyway. + for _, repo := range repos { + resp, err := dstClient1.R().Get( + fmt.Sprintf("%s/v2/%s/tags/list", dctrl1BaseURL, repo), + ) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNotFound) + } + + // storage for neither downstream should have the data for images. + // with loremipsumdolors as the hashKey, + // zot-test is managed by member index 1. + // zot-cve-test is managed by member index 0. + for _, repo := range repos { + for _, destDir := range destDirs { + _, err = os.Stat(path.Join(destDir, repo)) + So(err, ShouldNotBeNil) + So(os.IsNotExist(err), ShouldBeTrue) + } + } + }) +} + func TestSyncReferenceInLoop(t *testing.T) { Convey("Verify sync doesn't end up in an infinite loop when syncing image references", t, func() { sctlr, srcBaseURL, srcDir, _, _ := makeUpstreamServer(t, false, false) @@ -1418,6 +1752,119 @@ func TestPeriodically(t *testing.T) { }) } +func TestPeriodicallyWithScaleOutCluster(t *testing.T) { + Convey("Given a zot cluster with periodic sync enabled, test that instances sync only managed repos", t, func() { + updateDuration, _ := time.ParseDuration("30m") + const zotAlpineTestImageName = "zot-alpine-test" + + sctlr, srcBaseURL, _, _, _ := makeUpstreamServer(t, false, false) + + scm := test.NewControllerManager(sctlr) + scm.StartAndWait(sctlr.Config.HTTP.Port) + defer scm.StopServer() + + // upload additional image to the upstream. + // upload has to be done before starting the downstreams. + sampleImage := CreateRandomImage() + err := UploadImage(sampleImage, srcBaseURL, zotAlpineTestImageName, "0.0.1") + So(err, ShouldBeNil) + + tlsVerify := false + maxRetries := 2 + delay := 2 * time.Second + + syncRegistryConfig := syncconf.RegistryConfig{ + Content: []syncconf.Content{ + { + Prefix: zotAlpineTestImageName, + }, + { + Prefix: testImage, + }, + { + Prefix: testCveImage, + }, + }, + URLs: []string{srcBaseURL}, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + MaxRetries: &maxRetries, + RetryDelay: &delay, + } + + defaultVal := true + syncConfig := &syncconf.Config{ + Enable: &defaultVal, + Registries: []syncconf.RegistryConfig{syncRegistryConfig}, + } + + // add scale out cluster config. + // we don't need to start multiple downstream instances as we want to just check that + // a given downstream instance skips images that it does not manage. + + // with loremipsumdolors as the hashKey, + // zot-test is managed by member index 1. + // zot-cve-test is managed by member index 0. + // zot-alpine-test is managed by member index 1. + clusterCfg := config.ClusterConfig{ + Members: []string{ + "127.0.0.1:100", + "127.0.0.1:42000", + }, + HashKey: "loremipsumdolors", + } + + dctlr, destBaseURL, destDir, destClient := makeInsecureDownstreamServerFixedPort(t, "42000", syncConfig, &clusterCfg) + + dcm := test.NewControllerManager(dctlr) + dcm.StartAndWait(dctlr.Config.HTTP.Port) + defer dcm.StopServer() + + // downstream should not have any of the images in its storage. + images := []string{testImage, testCveImage, zotAlpineTestImageName} + + for _, image := range images { + _, err := os.Stat(path.Join(destDir, image)) + So(err, ShouldNotBeNil) + So(os.IsNotExist(err), ShouldBeTrue) + } + + // wait for generator to complete. + waitSyncFinish(dctlr.Config.Log.Output) + + // downstream should sync only expected images from the upstream. + expectedImages := []string{zotAlpineTestImageName, testImage} + + for _, expected := range expectedImages { + for { + resp, err := destClient.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", destBaseURL, expected)) + So(err, ShouldBeNil) + + var destTagsList TagsList + err = json.Unmarshal(resp.Body(), &destTagsList) + So(err, ShouldBeNil) + + if len(destTagsList.Tags) > 0 { + break + } + + time.Sleep(500 * time.Millisecond) + } + } + + // only the zot-test and zot-alpine-test images should be downloaded. + for _, expected := range expectedImages { + _, err = os.Stat(path.Join(destDir, expected)) + So(err, ShouldBeNil) + } + + // the test cve image should not be downloaded. + _, err = os.Stat(path.Join(destDir, testCveImage)) + So(err, ShouldNotBeNil) + So(os.IsNotExist(err), ShouldBeTrue) + }) +} + func TestPermsDenied(t *testing.T) { Convey("Verify sync feature without perm on sync cache", t, func() { updateDuration, _ := time.ParseDuration("30m")