From 19a5c721da93e7e5e34c5996eeea99aa6119c787 Mon Sep 17 00:00:00 2001 From: Ratnakar Date: Wed, 20 Sep 2023 09:54:14 -0400 Subject: [PATCH] Mark the item as deleted and remove from the Queue if deleted (#129) https://github.com/hyperledger-labs/fabric-operator/issues/128 Signed-off-by: asararatnakar --- .../staggerrestarts/staggerrestarts.go | 56 ++++++++++++++--- .../staggerrestarts_structs.go | 1 + .../staggerrestarts/staggerrestarts_test.go | 63 ++++++++++++++++++- 3 files changed, 110 insertions(+), 10 deletions(-) diff --git a/pkg/restart/staggerrestarts/staggerrestarts.go b/pkg/restart/staggerrestarts/staggerrestarts.go index 9167e4a2..0d1e3812 100644 --- a/pkg/restart/staggerrestarts/staggerrestarts.go +++ b/pkg/restart/staggerrestarts/staggerrestarts.go @@ -32,6 +32,7 @@ import ( "github.com/IBM-Blockchain/fabric-operator/pkg/restart/configmap" "github.com/pkg/errors" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -201,16 +202,27 @@ func (s *StaggerRestartsService) Reconcile(componentType, namespace string) (boo component.PodName = pods[0].Name } - // Restart component - err = s.RestartDeployment(name, namespace) - if err != nil { - return requeue, errors.Wrapf(err, "failed to restart deployment %s", name) - } + deployExists, _ := s.CheckDeployments(name, namespace) + if deployExists { + // Restart component + err = s.RestartDeployment(name, namespace) + if err != nil { + return requeue, errors.Wrapf(err, "failed to restart deployment %s", name) + } - // Update config - component.Status = Waiting - component.LastCheckedTimestamp = time.Now().UTC().String() - component.CheckUntilTimestamp = time.Now().Add(s.Timeout).UTC().String() + // Update config + component.Status = Waiting + component.LastCheckedTimestamp = time.Now().UTC().String() + component.CheckUntilTimestamp = time.Now().Add(s.Timeout).UTC().String() + } else { // if deployment doesn't exist then the cr spec 
might have been deleted + // deployment has been deleted, remove the entry from the queue + component.Status = Deleted + log.Info(fmt.Sprintf("%s restart status is %s, removing from %s restart queue", component.CRName, component.Status, mspid)) + component.LastCheckedTimestamp = time.Now().UTC().String() + component.CheckUntilTimestamp = time.Now().Add(s.Timeout).UTC().String() + restartConfig.AddToLog(component) + restartConfig.PopFromQueue(mspid) + } updated = true @@ -330,6 +342,32 @@ func (s *StaggerRestartsService) RestartDeployment(name, namespace string) error return nil } +func (s *StaggerRestartsService) CheckDeployments(name, namespace string) (bool, error) { + deploymentsExists := false + + labelSelector, err := labels.Parse(fmt.Sprintf("app=%s", name)) + if err != nil { + return false, errors.Wrap(err, "failed to parse label selector for app name") + } + + listOptions := &client.ListOptions{ + LabelSelector: labelSelector, + Namespace: namespace, + } + deployList := &appsv1.DeploymentList{} + err = s.Client.List(context.TODO(), deployList, listOptions) + + if err != nil { + log.Error(err, "failed to get deployment list for %s", name) + return deploymentsExists, nil + } + if len(deployList.Items) > 0 { + deploymentsExists = true + } + + return deploymentsExists, nil +} + func (s *StaggerRestartsService) GetRunningPods(name, namespace string) ([]corev1.Pod, error) { pods := []corev1.Pod{} diff --git a/pkg/restart/staggerrestarts/staggerrestarts_structs.go b/pkg/restart/staggerrestarts/staggerrestarts_structs.go index ba0fcbae..b7538a9e 100644 --- a/pkg/restart/staggerrestarts/staggerrestarts_structs.go +++ b/pkg/restart/staggerrestarts/staggerrestarts_structs.go @@ -33,6 +33,7 @@ const ( Waiting Status = "waiting" Completed Status = "completed" Expired Status = "expired" + Deleted Status = "deleted" Restarted Status = "restarted" ) diff --git a/pkg/restart/staggerrestarts/staggerrestarts_test.go b/pkg/restart/staggerrestarts/staggerrestarts_test.go index 
739170fe..0a66af84 100644 --- a/pkg/restart/staggerrestarts/staggerrestarts_test.go +++ b/pkg/restart/staggerrestarts/staggerrestarts_test.go @@ -96,6 +96,7 @@ var _ = Describe("Staggerrestarts", func() { component3 *staggerrestarts.Component pod *corev1.Pod + dep *appsv1.Deployment ) BeforeEach(func() { @@ -138,7 +139,21 @@ var _ = Describe("Staggerrestarts", func() { Phase: corev1.PodRunning, }, } - + replicas := int32(1) + dep = &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + corev1.Container{ + Name: "org1peer1", + }, + }, + }, + }, + }, + } bytes, err := json.Marshal(restartConfig) Expect(err).NotTo(HaveOccurred()) @@ -165,6 +180,9 @@ var _ = Describe("Staggerrestarts", func() { case *corev1.PodList: pods := obj.(*corev1.PodList) pods.Items = []corev1.Pod{*pod} + case *appsv1.DeploymentList: + deployments := obj.(*appsv1.DeploymentList) + deployments.Items = []appsv1.Deployment{*dep} } return nil } @@ -173,6 +191,14 @@ var _ = Describe("Staggerrestarts", func() { Context("pending", func() { It("returns empty pod list if failed to get running pods", func() { mockClient.ListReturns(errors.New("list error")) + mockClient.ListStub = func(ctx context.Context, obj client.ObjectList, opts ...k8sclient.ListOption) error { + switch obj.(type) { + case *appsv1.DeploymentList: + deployments := obj.(*appsv1.DeploymentList) + deployments.Items = []appsv1.Deployment{*dep} + } + return nil + } requeue, err := service.Reconcile("peer", "namespace") Expect(err).NotTo(HaveOccurred()) Expect(requeue).To(Equal(false)) @@ -187,6 +213,38 @@ var _ = Describe("Staggerrestarts", func() { }) }) + It("check deleted status when pods/deployments list is empty", func() { + mockClient.ListReturns(errors.New("list error")) + mockClient.ListStub = func(ctx context.Context, obj client.ObjectList, opts ...k8sclient.ListOption) error { + switch obj.(type) { + case 
*appsv1.DeploymentList: + deployments := obj.(*appsv1.DeploymentList) + deployments.Items = []appsv1.Deployment{} + } + return nil + } + requeue, err := service.Reconcile("peer", "namespace") + Expect(err).NotTo(HaveOccurred()) + Expect(requeue).To(Equal(false)) + + _, cm, _ := mockClient.CreateOrUpdateArgsForCall(0) + cfg := getRestartConfig(cm.(*corev1.ConfigMap)) + By("deleting first component from queue, immediate second component will be in pending state", func() { + Expect(cfg.Queues["org1"][0].CRName).To(Equal("org1peer2")) + Expect(cfg.Queues["org1"][0].Status).To(Equal(staggerrestarts.Pending)) + Expect(cfg.Queues["org1"][0].PodName).To(Equal("")) + }) + + By("moving the component to the log and setting status to deleted", func() { + Expect(len(cfg.Log)).To(Equal(2)) // since org1peer1 and org2peer1 have been deleted + + for _, components := range cfg.Log { + Expect(components[0].CRName).To(ContainSubstring("peer1")) // org1peer1 and org2peer1 + Expect(components[0].Status).To(Equal(staggerrestarts.Deleted)) + } + }) + }) + It("returns error if fails to restart deployment", func() { mockClient.PatchReturns(errors.New("patch error")) requeue, err := service.Reconcile("peer", "namespace") @@ -286,6 +344,9 @@ var _ = Describe("Staggerrestarts", func() { case *corev1.PodList: pods := obj.(*corev1.PodList) pods.Items = []corev1.Pod{*pod, *pod2} + case *appsv1.DeploymentList: + deployments := obj.(*appsv1.DeploymentList) + deployments.Items = []appsv1.Deployment{*dep} } return nil }