diff --git a/.github/workflows/kind-e2e.yaml b/.github/workflows/kind-e2e.yaml index 0b5c429e5..0711e2ab0 100644 --- a/.github/workflows/kind-e2e.yaml +++ b/.github/workflows/kind-e2e.yaml @@ -64,11 +64,10 @@ jobs: # Build and Publish our containers to the docker daemon (including test assets) export GO111MODULE=on export GOFLAGS=-mod=vendor - ko apply -PRf config/ -f test/config + ko apply -PRf config/ - # Build Knative plugin and create Secret + # Build Knative plugin go build -o kn-vsphere ./plugins/vsphere/cmd/vsphere - ./kn-vsphere login --secret-name vsphere-credentials --username user --password pass kubectl -n vmware-sources wait --timeout=10s --for=condition=Available deploy/webhook diff --git a/go.mod b/go.mod index 44226a2cd..a60cfe4fd 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( k8s.io/client-go v0.21.4 k8s.io/code-generator v0.21.4 k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 + k8s.io/utils v0.0.0-20201110183641-67b214c5f920 knative.dev/eventing v0.27.0 knative.dev/hack v0.0.0-20211101195839-11d193bf617b knative.dev/pkg v0.0.0-20211101212339-96c0204a70dc diff --git a/test/e2e/binding_test.go b/test/e2e/binding_test.go index 94fa108ff..58bf70baa 100644 --- a/test/e2e/binding_test.go +++ b/test/e2e/binding_test.go @@ -26,6 +26,10 @@ func TestBindingGOVC(t *testing.T) { clients := test.Setup(t) + //create vcsim + cleanupVcsim := CreateSimulator(t, clients) + defer cleanupVcsim() + selector, cancel := CreateJobBinding(t, clients) defer cancel() @@ -50,6 +54,10 @@ func TestBindingPowerCLICore(t *testing.T) { clients := test.Setup(t) + //create vcsim + cleanupVcsim := CreateSimulator(t, clients) + defer cleanupVcsim() + selector, cancel := CreateJobBinding(t, clients) defer cancel() diff --git a/test/e2e/source_test.go b/test/e2e/source_test.go index 5a46e65ee..c41b97604 100644 --- a/test/e2e/source_test.go +++ b/test/e2e/source_test.go @@ -25,6 +25,10 @@ func TestSource(t *testing.T) { clients := test.Setup(t) + //create vcsim + cleanupVcsim := CreateSimulator(t, clients) + defer cleanupVcsim() + // Create a job/svc that listens for events and then quits N seconds after the first is received. name, wait, cancelListener := RunJobListener(t, clients) defer cancelListener() diff --git a/test/e2e/util.go b/test/e2e/util.go index a02647bb5..25fa52e17 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -7,24 +7,38 @@ package e2e import ( "context" + "fmt" "testing" + "time" "github.com/vmware-tanzu/sources-for-knative/plugins/vsphere/pkg/command" "github.com/davecgh/go-spew/spew" "github.com/vmware-tanzu/sources-for-knative/test" + appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/pointer" "knative.dev/pkg/apis" pkgtest "knative.dev/pkg/test" "knative.dev/pkg/test/helpers" ) +const ( + vcsim = "vcsim" + ns = "default" + vsphereCreds = "vsphere-credentials" + user = "user" + password = "password" + jobNameKey = "job-name" +) + func CreateJobBinding(t *testing.T, clients *test.Clients) (map[string]string, context.CancelFunc) { + ctx := context.Background() t.Helper() name := helpers.ObjectNameForTest(t) @@ -39,20 +53,20 @@ func CreateJobBinding(t *testing.T, clients *test.Clients) (map[string]string, c "--name", name, "--address", "https://vcsim.default.svc.cluster.local", "--skip-tls-verify", "true", - "--secret-ref", "vsphere-credentials", + "--secret-ref", vsphereCreds, "--subject-api-version", "batch/v1", "--subject-kind", "Job", "--subject-selector", metav1.FormatLabelSelector(&metav1.LabelSelector{MatchLabels: selector}), }) - pkgtest.CleanupOnInterrupt(func() { clients.VMWareClient.Bindings.Delete(context.Background(), name, metav1.DeleteOptions{}) }, t.Logf) + pkgtest.CleanupOnInterrupt(func() { clients.VMWareClient.Bindings.Delete(ctx, name, metav1.DeleteOptions{}) }, t.Logf) if err := knativePlugin.Execute(); err != nil { t.Fatalf("Error creating binding: %v", err) } // Wait for the Binding to become "Ready" waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { - b, err := clients.VMWareClient.Bindings.Get(context.Background(), name, metav1.GetOptions{}) + b, err := clients.VMWareClient.Bindings.Get(ctx, name, metav1.GetOptions{}) if apierrs.IsNotFound(err) { return false, nil } else if err != nil { @@ -67,7 +81,7 @@ func CreateJobBinding(t *testing.T, clients *test.Clients) (map[string]string, c } return selector, func() { - err := clients.VMWareClient.Bindings.Delete(context.Background(), name, metav1.DeleteOptions{}) + err := clients.VMWareClient.Bindings.Delete(ctx, name, metav1.DeleteOptions{}) if err != nil { t.Errorf("Error cleaning up binding %s", name) } @@ -83,6 +97,7 @@ func RunPowershellJob(t *testing.T, clients *test.Clients, image, script string, } func RunJobScript(t *testing.T, clients *test.Clients, image string, command []string, script string, selector map[string]string) { + ctx := context.Background() job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: helpers.ObjectNameForTest(t), @@ -105,9 +120,9 @@ func RunJobScript(t *testing.T, clients *test.Clients, image string, command []s }, } pkgtest.CleanupOnInterrupt(func() { - clients.KubeClient.BatchV1().Jobs(job.Namespace).Delete(context.Background(), job.Name, metav1.DeleteOptions{}) + clients.KubeClient.BatchV1().Jobs(job.Namespace).Delete(ctx, job.Name, metav1.DeleteOptions{}) }, t.Logf) - job, err := clients.KubeClient.BatchV1().Jobs(job.Namespace).Create(context.Background(), job, metav1.CreateOptions{}) + job, err := clients.KubeClient.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}) if err != nil { t.Fatalf("Error creating Job: %v", err) } @@ -117,15 +132,19 @@ func RunJobScript(t *testing.T, clients *test.Clients, image string, command []s t.Log("", "job", spew.Sprint(job)) defer func() { - err := clients.KubeClient.BatchV1().Jobs(job.Namespace).Delete(context.Background(), job.Name, metav1.DeleteOptions{}) + err := clients.KubeClient.BatchV1().Jobs(job.Namespace).Delete(ctx, job.Name, metav1.DeleteOptions{}) if err != nil { t.Errorf("Error cleaning up Job %s", job.Name) } + err = clients.KubeClient.CoreV1().Pods(job.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", jobNameKey, job.Name)}) + if err != nil { + t.Errorf("Error cleaning up pods for Job %s", job.Name) + } }() // Wait for the Job to report a successful execution. waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { - js, err := clients.KubeClient.BatchV1().Jobs(job.Namespace).Get(context.Background(), job.Name, metav1.GetOptions{}) + js, err := clients.KubeClient.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{}) if apierrs.IsNotFound(err) { return false, nil } else if err != nil { @@ -143,6 +162,7 @@ func RunJobScript(t *testing.T, clients *test.Clients, image string, command []s } func RunJobListener(t *testing.T, clients *test.Clients) (string, context.CancelFunc, context.CancelFunc) { + ctx := context.Background() name := helpers.ObjectNameForTest(t) selector := map[string]string{ @@ -183,9 +203,9 @@ func RunJobListener(t *testing.T, clients *test.Clients) (string, context.Cancel }, } pkgtest.CleanupOnInterrupt(func() { - clients.KubeClient.BatchV1().Jobs(job.Namespace).Delete(context.Background(), job.Name, metav1.DeleteOptions{}) + clients.KubeClient.BatchV1().Jobs(job.Namespace).Delete(ctx, job.Name, metav1.DeleteOptions{}) }, t.Logf) - job, err := clients.KubeClient.BatchV1().Jobs(job.Namespace).Create(context.Background(), job, metav1.CreateOptions{}) + job, err := clients.KubeClient.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}) if err != nil { t.Fatalf("Error creating Job: %v", err) } @@ -195,15 +215,19 @@ func RunJobListener(t *testing.T, clients *test.Clients) (string, context.Cancel t.Log("", "job", spew.Sprint(job)) cancel := func() { - err := clients.KubeClient.BatchV1().Jobs(job.Namespace).Delete(context.Background(), job.Name, metav1.DeleteOptions{}) + err := clients.KubeClient.BatchV1().Jobs(job.Namespace).Delete(ctx, job.Name, metav1.DeleteOptions{}) if err != nil { t.Errorf("Error cleaning up Job %s", job.Name) } + err = clients.KubeClient.CoreV1().Pods(job.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", jobNameKey, name)}) + if err != nil { + t.Errorf("Error cleaning up pods for Job %s", job.Name) + } } waiter := func() { // Wait for the Job to report a successful execution. waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { - js, err := clients.KubeClient.BatchV1().Jobs(test.Namespace).Get(context.Background(), name, metav1.GetOptions{}) + js, err := clients.KubeClient.BatchV1().Jobs(test.Namespace).Get(ctx, name, metav1.GetOptions{}) if apierrs.IsNotFound(err) { t.Logf("Not found: %v", err) return false, nil @@ -237,9 +261,9 @@ func RunJobListener(t *testing.T, clients *test.Clients) (string, context.Cancel }, } pkgtest.CleanupOnInterrupt(func() { - clients.KubeClient.CoreV1().Services(svc.Namespace).Delete(context.Background(), svc.Name, metav1.DeleteOptions{}) + clients.KubeClient.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) }, t.Logf) - svc, err = clients.KubeClient.CoreV1().Services(svc.Namespace).Create(context.Background(), svc, metav1.CreateOptions{}) + svc, err = clients.KubeClient.CoreV1().Services(svc.Namespace).Create(ctx, svc, metav1.CreateOptions{}) if err != nil { cancel() t.Fatalf("Error creating Service: %v", err) @@ -247,7 +271,7 @@ func RunJobListener(t *testing.T, clients *test.Clients) (string, context.Cancel // Wait for pods to show up in the Endpoints resource. waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { - ep, err := clients.KubeClient.CoreV1().Endpoints(svc.Namespace).Get(context.Background(), svc.Name, metav1.GetOptions{}) + ep, err := clients.KubeClient.CoreV1().Endpoints(svc.Namespace).Get(ctx, svc.Name, metav1.GetOptions{}) if apierrs.IsNotFound(err) { return false, nil } else if err != nil { @@ -266,7 +290,7 @@ func RunJobListener(t *testing.T, clients *test.Clients) (string, context.Cancel } return name, waiter, func() { - err := clients.KubeClient.CoreV1().Services(svc.Namespace).Delete(context.Background(), svc.Name, metav1.DeleteOptions{}) + err := clients.KubeClient.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}) if err != nil { t.Errorf("Error cleaning up Service %s: %v", svc.Name, err) } @@ -275,8 +299,24 @@ func RunJobListener(t *testing.T, clients *test.Clients) (string, context.Cancel } func CreateSource(t *testing.T, clients *test.Clients, name string) context.CancelFunc { + ctx := context.Background() t.Helper() + //Set a checkpoint in the past in case test creates events before vsphere source is ready + checkpointTime := time.Now().Add(time.Minute * -9) + checkpointConfigmap, err := clients.KubeClient.CoreV1().ConfigMaps(ns).Create( + ctx, + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("%s-configmap", name)}, + Data: map[string]string{"checkpoint": fmt.Sprintf(`{"lastEventKeyTimestamp": "%s"}`, checkpointTime.UTC().Format(time.RFC3339))}, + }, + metav1.CreateOptions{}, + ) + + if err != nil { + t.Fatalf("Error creating Configmap: %v", err) + } + knativePlugin := command.NewRootCommand(clients.AsPluginClients()) knativePlugin.SetArgs([]string{ "source", @@ -284,20 +324,25 @@ func CreateSource(t *testing.T, clients *test.Clients, name string) context.Canc "--name", name, "--address", "https://vcsim.default.svc.cluster.local", "--skip-tls-verify", "true", - "--secret-ref", "vsphere-credentials", + "--secret-ref", vsphereCreds, "--sink-api-version", "v1", "--sink-kind", "Service", "--sink-name", name, + "--checkpoint-age", "10m", }) - pkgtest.CleanupOnInterrupt(func() { clients.VMWareClient.Sources.Delete(context.Background(), name, metav1.DeleteOptions{}) }, t.Logf) + pkgtest.CleanupOnInterrupt(func() { + clients.VMWareClient.Sources.Delete(ctx, name, metav1.DeleteOptions{}) + clients.KubeClient.CoreV1().ConfigMaps(ns).Delete(ctx, checkpointConfigmap.Name, metav1.DeleteOptions{}) + }, t.Logf) + if err := knativePlugin.Execute(); err != nil { t.Fatalf("Error creating source: %v", err) } // Wait for the Source to become "Ready" waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { - src, err := clients.VMWareClient.Sources.Get(context.Background(), name, metav1.GetOptions{}) + src, err := clients.VMWareClient.Sources.Get(ctx, name, metav1.GetOptions{}) if apierrs.IsNotFound(err) { return false, nil } else if err != nil { @@ -312,9 +357,155 @@ func CreateSource(t *testing.T, clients *test.Clients, name string) context.Canc } return func() { - err := clients.VMWareClient.Sources.Delete(context.Background(), name, metav1.DeleteOptions{}) + err := clients.VMWareClient.Sources.Delete(ctx, name, metav1.DeleteOptions{}) if err != nil { t.Errorf("Error cleaning up source %s", name) } + err = clients.KubeClient.CoreV1().ConfigMaps(ns).Delete(ctx, checkpointConfigmap.Name, metav1.DeleteOptions{}) + if err != nil { + t.Errorf("Error cleaning up configmap %s", name) + } + } +} + +func CreateSimulator(t *testing.T, clients *test.Clients) context.CancelFunc { + ctx := context.Background() + simDeployment, simService := newSimulator(ns) + simSecret := newVCSecret(ns, vsphereCreds, user, password) + + pkgtest.CleanupOnInterrupt(func() { + clients.KubeClient.AppsV1().Deployments(simDeployment.Namespace).Delete(ctx, simDeployment.Name, metav1.DeleteOptions{}) + clients.KubeClient.CoreV1().Services(simService.Namespace).Delete(ctx, simService.Name, metav1.DeleteOptions{}) + clients.KubeClient.CoreV1().Secrets(simSecret.Namespace).Delete(ctx, simSecret.Name, metav1.DeleteOptions{}) + }, t.Logf) + secret, err := clients.KubeClient.CoreV1().Secrets(simSecret.Namespace).Create(ctx, simSecret, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating Secret: %v", err) + } + deployment, err := clients.KubeClient.AppsV1().Deployments(simDeployment.Namespace).Create(ctx, simDeployment, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating VCSIM Deployment: %v", err) + } + service, err := clients.KubeClient.CoreV1().Services(simService.Namespace).Create(ctx, simService, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating VCSIM Service: %v", err) + } + + cancel := func() { + err := clients.KubeClient.AppsV1().Deployments(deployment.Namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{}) + if err != nil { + t.Errorf("Error cleaning up Deployment %s", deployment.Name) + } + err = clients.KubeClient.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{}) + if err != nil { + t.Errorf("Error cleaning up Service %s", service.Name) + } + err = clients.KubeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx, secret.Name, metav1.DeleteOptions{}) + if err != nil { + t.Errorf("Error cleaning up Secret %s", secret.Name) + } + } + + waitErr := wait.PollImmediate(test.PollInterval, test.PollTimeout, func() (bool, error) { + depl, err := clients.KubeClient.AppsV1().Deployments(ns).Get(ctx, simDeployment.Name, metav1.GetOptions{}) + if err != nil { + return false, err + } + + status := depl.Status + for i := range status.Conditions { + c := status.Conditions[i] + if c.Type == appsv1.DeploymentAvailable && c.Status == corev1.ConditionTrue { + return true, nil + } + } + return false, nil + }) + if waitErr != nil { + cancel() + t.Fatalf("Error waiting for VCSIM deployment to be ready: %v", waitErr) + } + + t.Log("vcsim ready") + + return cancel +} + +func newSimulator(namespace string) (*appsv1.Deployment, *corev1.Service) { + l := map[string]string{ + "app": vcsim, + } + + sim := appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: vcsim, + Namespace: namespace, + Labels: l, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: pointer.Int32Ptr(1), + Selector: &metav1.LabelSelector{ + MatchLabels: l, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: l, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: vcsim, + Image: "vmware/vcsim:latest", + Args: []string{ + "/vcsim", + "-l", + ":8989", + }, + ImagePullPolicy: corev1.PullIfNotPresent, + Ports: []corev1.ContainerPort{ + { + Name: "https", + ContainerPort: 8989, + }, + }, + }}, + }, + }, + }, + } + + svc := corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: vcsim, + Namespace: namespace, + Labels: l, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "https", + Port: 443, + TargetPort: intstr.IntOrString{ + IntVal: 8989, + }, + }, + }, + Selector: l, + }, + } + + return &sim, &svc +} + +func newVCSecret(namespace, name, username, password string) *corev1.Secret { + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Data: map[string][]byte{ + corev1.BasicAuthUsernameKey: []byte(username), + corev1.BasicAuthPasswordKey: []byte(password), + }, + Type: corev1.SecretTypeBasicAuth, } } diff --git a/vendor/modules.txt b/vendor/modules.txt index d30abb0be..a0f4dc071 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -999,6 +999,7 @@ k8s.io/kube-openapi/pkg/generators/rules k8s.io/kube-openapi/pkg/util/proto k8s.io/kube-openapi/pkg/util/sets # k8s.io/utils v0.0.0-20201110183641-67b214c5f920 +## explicit k8s.io/utils/buffer k8s.io/utils/integer k8s.io/utils/pointer