diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml
new file mode 100644
index 00000000..b67215e4
--- /dev/null
+++ b/.github/workflows/e2e.yaml
@@ -0,0 +1,21 @@
+name: E2E
+on:
+  pull_request:
+    branches:
+      - main
+jobs:
+  create-cluster:
+    name: Create Kind cluster
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+      - name: Install Go
+        uses: actions/setup-go@v4
+        with:
+          go-version-file: go.mod
+          cache: false
+      - name: Create kind cluster
+        uses: helm/kind-action@v1.10.0
+      - run: kind get clusters
+      - run: go test -v -race ./pkg/cache/... -tags="e2e"
diff --git a/Makefile b/Makefile
index 509b7929..7363d262 100644
--- a/Makefile
+++ b/Makefile
@@ -132,7 +132,7 @@ velero-crds:
 
 .PHONY: test
 test: envtest ## run tests
-	@KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(GOPATH)/bin -p path)" go test $$(go list ./... | grep -v /e2e) -v
+	@KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(GOPATH)/bin -p path)" go test $$(go list ./... | grep -v /e2e) -v -tags="cache"
 
 .PHONY: lint
 lint: $(PRE) ## run linters
diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go
new file mode 100644
index 00000000..2ef3264a
--- /dev/null
+++ b/pkg/cache/cache_test.go
@@ -0,0 +1,70 @@
+//go:build cache
+
+package cache
+
+import (
+    "context"
+    "testing"
+    "time"
+
+    . "github.com/onsi/ginkgo/v2"
+    . "github.com/onsi/gomega"
+    v1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime"
+)
+
+func TestCache(t *testing.T) {
+    RegisterFailHandler(Fail)
+    RunSpecs(t, "Test cache")
+}
+
+var _ = Describe("Resource cache", Ordered, func() {
+    Context("Resource cache", func() {
+        const (
+            resourceName = "default"
+            namespace    = "default"
+            key          = "key"
+        )
+        rce := &ResourceCacheEntry{}
+        pod := v1.Pod{
+            ObjectMeta: metav1.ObjectMeta{
+                Name:      resourceName,
+                Namespace: namespace,
+            },
+            Spec: v1.PodSpec{
+                Containers: []v1.Container{
+                    {
+                        Name:  "test",
+                        Image: "test",
+                    },
+                },
+            },
+        }
+        // cache with a 1s TTL so entries expire quickly in the test
+        cache := NewCache[*ResourceCacheEntry](context.Background(), time.Second)
+
+        It("check cache", func() {
+            res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod)
+            Expect(err).ToNot(HaveOccurred())
+            unstructuredPod := unstructured.Unstructured{Object: res}
+            Expect(rce.SetSHA(unstructuredPod, ApplySHA)).ToNot(HaveOccurred())
+            Expect(rce.SetSHA(unstructuredPod, ManifestSHA)).ToNot(HaveOccurred())
+            Expect(rce.SetSHA(unstructuredPod, ServerSHA)).ToNot(HaveOccurred())
+
+            cache.Set(key, rce)
+            cachedResource, ok := cache.Get(key)
+            Expect(ok).To(BeTrue())
+            Expect(cachedResource).To(Equal(rce))
+            // the entry should expire and clear applySHA and manifestSHA,
+            // while serverSHA is kept
+            time.Sleep(1 * time.Second)
+            cachedResource, ok = cache.Get(key)
+            Expect(ok).To(BeTrue())
+            Expect(cachedResource.applySHA).Should(BeNil())
+            Expect(cachedResource.manifestSHA).Should(BeNil())
+            Expect(cachedResource.serverSHA).ShouldNot(BeNil())
+        })
+    })
+})
diff --git a/pkg/cache/resource_cache_entry_test.go b/pkg/cache/resource_cache_entry_test.go
new file mode 100644
index 00000000..4de9d6b9
--- /dev/null
+++ b/pkg/cache/resource_cache_entry_test.go
@@ -0,0 +1,55 @@
+//go:build cache
+
+package cache
+
+import (
+    . "github.com/onsi/ginkgo/v2"
+    . "github.com/onsi/gomega"
"github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" +) + +var _ = Describe("Resource cache entry", Ordered, func() { + Context("Resource cache entry", func() { + const ( + resourceName = "default" + namespace = "default" + ) + rce := ResourceCacheEntry{} + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + Namespace: namespace, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test", + Image: "test", + }, + }, + }, + } + + It("check ResourceCacheEntry", func() { + res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod) + Expect(err).ToNot(HaveOccurred()) + unstructuredPod := unstructured.Unstructured{Object: res} + Expect(rce.SetSHA(unstructuredPod, ApplySHA)).ToNot(HaveOccurred()) + Expect(rce.SetSHA(unstructuredPod, ManifestSHA)).ToNot(HaveOccurred()) + Expect(rce.SetSHA(unstructuredPod, ServerSHA)).ToNot(HaveOccurred()) + + Expect(rce.RequiresApply("test")).Should(BeTrue()) + Expect(rce.RequiresApply("U33NQLAAPDEC5RDDKQ2KUHCUHIQUOC4PLMCQ5QVBYZ53B6V5UI5A====")).Should(BeFalse()) + + rce.Expire() + Expect(rce.applySHA).Should(BeNil()) + Expect(rce.manifestSHA).Should(BeNil()) + Expect(rce.serverSHA).ShouldNot(BeNil()) + }) + + }) + +}) diff --git a/pkg/cache/resource_cache_test.go b/pkg/cache/resource_cache_test.go new file mode 100644 index 00000000..9ada5f9d --- /dev/null +++ b/pkg/cache/resource_cache_test.go @@ -0,0 +1,256 @@ +//go:build e2e + +package cache + +import ( + "context" + "fmt" + "io" + "os" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1" + "github.com/pluralsh/deployment-operator/pkg/common" + "github.com/pluralsh/polly/containers" + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ = Describe("Resource cache", Ordered, func() { + Context("When reconciling a resource", func() { + const ( + resourceName = "default" + namespace = "default" + key = "default_default_apps_Deployment" + crdObjectKey = "default_default_deployments.plural.sh_CustomHealth" + crdDefinitionKey = "_customhealths.deployments.plural.sh_apiextensions.k8s.io_CustomResourceDefinition" + ) + + ctx := context.Background() + + typeNamespacedName := types.NamespacedName{ + Name: resourceName, + Namespace: "default", + } + + deployment := &appsv1.Deployment{} + + BeforeAll(func() { + By("creating test Deployment") + err := kClient.Get(ctx, typeNamespacedName, deployment) + if err != nil && errors.IsNotFound(err) { + resource := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + Namespace: namespace, + Labels: map[string]string{ + common.ManagedByLabel: common.AgentLabelValue, + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: lo.ToPtr(int32(3)), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "nginx", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "nginx", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + 
+                                        Name:  "nginx",
+                                        Image: "nginx:1.14.2",
+                                        Ports: []v1.ContainerPort{
+                                            {
+                                                ContainerPort: 80,
+                                            },
+                                        },
+                                    },
+                                },
+                            },
+                        },
+                    },
+                }
+
+                Expect(kClient.Create(ctx, resource)).To(Succeed())
+            }
+        })
+
+        AfterAll(func() {
+            By("cleaning up the created resources")
+            resource := &appsv1.Deployment{}
+            Expect(kClient.Get(ctx, typeNamespacedName, resource)).NotTo(HaveOccurred())
+            Expect(kClient.Delete(ctx, resource)).To(Succeed())
+
+            customHealth := &deploymentsv1alpha1.CustomHealth{}
+            Expect(kClient.Get(ctx, typeNamespacedName, customHealth)).NotTo(HaveOccurred())
+            Expect(kClient.Delete(ctx, customHealth)).To(Succeed())
+        })
+
+        It("should successfully create resource cache", func() {
+            Init(ctx, cfg, 100*time.Second)
+            toAdd := containers.NewSet[ResourceKey]()
+
+            // register the resource and watch for changes
+            rk, err := ResourceKeyFromString(key)
+            Expect(err).NotTo(HaveOccurred())
+            toAdd.Add(rk)
+            GetResourceCache().Register(toAdd)
+
+            // get the resource
+            resource := &appsv1.Deployment{}
+            Expect(kClient.Get(ctx, typeNamespacedName, resource)).To(Succeed())
+
+            // update the resource so the watch produces a cache entry
+            Expect(updateWithRetry(ctx, kClient, resource, func(obj client.Object) client.Object {
+                deployment := obj.(*appsv1.Deployment)
+                deployment.Spec.Replicas = lo.ToPtr(int32(1))
+                return deployment
+            })).To(Succeed())
+
+            rce, err := getResourceCacheWithRetry(5, key)
+            Expect(err).NotTo(HaveOccurred())
+            Expect(rce.serverSHA).NotTo(BeNil())
+
+            GetResourceCache().Unregister(toAdd)
+            GetResourceCache().SetCacheEntry(key, ResourceCacheEntry{})
+        })
+
+        It("should successfully watch CRD object", func() {
+            Init(ctx, cfg, 100*time.Second)
+            toAdd := containers.NewSet[ResourceKey]()
+
+            err := applyYamlFile(ctx, kClient, "../../config/crd/bases/deployments.plural.sh_customhealths.yaml")
+            Expect(err).NotTo(HaveOccurred())
+            crdList := &extv1.CustomResourceDefinitionList{}
+            Expect(kClient.List(ctx, crdList)).NotTo(HaveOccurred())
+            Expect(crdList.Items).To(HaveLen(1))
+            time.Sleep(time.Second)
+
+            // register the CRD object first
+            crdObjKey, err := ResourceKeyFromString(crdObjectKey)
+            Expect(err).NotTo(HaveOccurred())
+            toAdd.Add(crdObjKey)
+            GetResourceCache().Register(toAdd)
+
+            // then register the CRD definition
+            crdDefKey, err := ResourceKeyFromString(crdDefinitionKey)
+            Expect(err).NotTo(HaveOccurred())
+            toAdd.Add(crdDefKey)
+            GetResourceCache().Register(toAdd)
+
+            customHealth := &deploymentsv1alpha1.CustomHealth{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:      resourceName,
+                    Namespace: namespace,
+                    Labels: map[string]string{
+                        common.ManagedByLabel: common.AgentLabelValue,
+                    },
+                },
+                Spec: deploymentsv1alpha1.CustomHealthSpec{
+                    Script: "test",
+                },
+            }
+            Expect(kClient.Create(ctx, customHealth)).To(Succeed())
+
+            // update the resource
+            Expect(updateWithRetry(ctx, kClient, customHealth, func(obj client.Object) client.Object {
+                ch := obj.(*deploymentsv1alpha1.CustomHealth)
+                ch.Spec.Script = "new"
+                return ch
+            })).To(Succeed())
+
+            rce, err := getResourceCacheWithRetry(5, crdObjectKey)
+            Expect(err).NotTo(HaveOccurred())
+            Expect(rce.serverSHA).NotTo(BeNil())
+        })
+    })
+})
+
+func getResourceCacheWithRetry(attempts int, key string) (ResourceCacheEntry, error) {
+    for i := 0; i < attempts; i++ {
+        rce, ok := GetResourceCache().GetCacheEntry(key)
+        if ok {
+            return rce, nil
+        }
+        time.Sleep(time.Second)
+    }
+    return ResourceCacheEntry{}, fmt.Errorf("couldn't get resource cache item after %d attempts", attempts)
+}
+
+func updateWithRetry(ctx context.Context, k8sClient client.Client, obj client.Object, updateFunc func(client.Object) client.Object) error {
+    attempts := 5
+    for i := 0; i < attempts; i++ {
+        // Apply the update function to a deep copy of the resource
+        updatedObj := updateFunc(obj.DeepCopyObject().(client.Object))
+
+        // Attempt to update the resource
+        err := k8sClient.Update(ctx, updatedObj)
+        if err == nil {
+            GinkgoWriter.Println("Resource updated successfully")
+            return nil
+        }
+
+        if !errors.IsConflict(err) {
+            return fmt.Errorf("failed to update resource: %w", err)
+        }
+
+        GinkgoWriter.Println("Conflict detected, retrying...")
+
+        // Fetch the latest version of the resource before retrying
+        err = k8sClient.Get(ctx, client.ObjectKeyFromObject(obj), obj)
+        if err != nil {
+            return fmt.Errorf("failed to get resource: %w", err)
+        }
+    }
+    return fmt.Errorf("couldn't update resource after %d attempts", attempts)
+}
+
+func applyYamlFile(ctx context.Context, k8sClient client.Client, filename string) error {
+    yamlFile, err := os.Open(filename)
+    if err != nil {
+        return fmt.Errorf("failed to open YAML file: %v", err)
+    }
+    defer yamlFile.Close()
+
+    // Decode the YAML file into a runtime.RawExtension
+    decoder := yaml.NewYAMLOrJSONDecoder(io.NopCloser(yamlFile), 4096)
+    ext := runtime.RawExtension{}
+    if err := decoder.Decode(&ext); err != nil {
+        return fmt.Errorf("failed to decode YAML: %v", err)
+    }
+
+    // Decode the RawExtension into a known type
+    obj, gvk, err := scheme.Codecs.UniversalDeserializer().Decode(ext.Raw, nil, nil)
+    if err != nil {
+        return fmt.Errorf("failed to decode object: %v", err)
+    }
+    clientObj, ok := obj.(client.Object)
+    if !ok {
+        return fmt.Errorf("object is not a client.Object")
+    }
+
+    // Apply the object to the Kubernetes cluster, falling back to create
+    err = k8sClient.Patch(ctx, clientObj, client.Apply, client.FieldOwner("example-controller"))
+    if err != nil {
+        if errors.IsNotFound(err) {
+            if err = k8sClient.Create(ctx, clientObj); err != nil {
+                return fmt.Errorf("failed to create object: %v", err)
+            }
+        } else {
+            return fmt.Errorf("failed to patch object: %v", err)
+        }
+    }
+
+    GinkgoWriter.Printf("Applied resource: %s\n", gvk.String())
+    return nil
+}
diff --git a/pkg/cache/suite_test.go b/pkg/cache/suite_test.go
new file mode 100644
index 00000000..5ad50160
--- /dev/null
+++ b/pkg/cache/suite_test.go
@@ -0,0 +1,67 @@
+//go:build e2e
+
+/*
+Copyright 2024.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+    "testing"
+
+    . "github.com/onsi/ginkgo/v2"
+    . "github.com/onsi/gomega"
+    deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1"
+    extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+    "k8s.io/client-go/kubernetes/scheme"
+    "k8s.io/client-go/rest"
+    ctrl "sigs.k8s.io/controller-runtime"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    logf "sigs.k8s.io/controller-runtime/pkg/log"
+    "sigs.k8s.io/controller-runtime/pkg/log/zap"
+)
+
+// These tests use Ginkgo (BDD-style Go testing framework). Refer to
+// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
+
+var kClient client.Client
+var cfg *rest.Config
+var err error
+
+func TestE2ECache(t *testing.T) {
+    RegisterFailHandler(Fail)
+    RunSpecs(t, "Resource Cache Suite")
+}
+
+var _ = BeforeSuite(func() {
+    logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
+
+    By("bootstrapping test environment")
+    cfg = ctrl.GetConfigOrDie()
+    Expect(cfg).NotTo(BeNil())
+
+    Expect(deploymentsv1alpha1.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())
+    Expect(extv1.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())
+
+    kClient, err = client.New(cfg, client.Options{
+        Scheme: scheme.Scheme,
+    })
+    Expect(err).NotTo(HaveOccurred())
+    Expect(kClient).NotTo(BeNil())
+})
+
+var _ = AfterSuite(func() {
+    By("tearing down the test environment")
+})