Skip to content

Commit

Permalink
Mark the item as deleted and remove from the Queue if deleted (hyperl…
Browse files Browse the repository at this point in the history
…edger-labs#129)

hyperledger-labs#128

Signed-off-by: asararatnakar <[email protected]>
Signed-off-by: Shoaeb Jindani <[email protected]>
  • Loading branch information
asararatnakar authored and Shoaeb Jindani committed Oct 25, 2023
1 parent dd1f673 commit cf785d4
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 10 deletions.
56 changes: 47 additions & 9 deletions pkg/restart/staggerrestarts/staggerrestarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/IBM-Blockchain/fabric-operator/pkg/util"
"github.com/pkg/errors"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -337,16 +338,27 @@ func (s *StaggerRestartsService) Reconcile(componentType, namespace string) (boo
component.PodName = pods[0].Name
}

// Restart component
err = s.RestartDeployment(name, namespace)
if err != nil {
return requeue, errors.Wrapf(err, "failed to restart deployment %s", name)
}
deployExists, _ := s.CheckDeployments(name, namespace)
if deployExists {
// Restart component
err = s.RestartDeployment(name, namespace)
if err != nil {
return requeue, errors.Wrapf(err, "failed to restart deployment %s", name)
}

// Update config
component.Status = Waiting
component.LastCheckedTimestamp = time.Now().UTC().String()
component.CheckUntilTimestamp = time.Now().Add(s.Timeout).UTC().String()
// Update config
component.Status = Waiting
component.LastCheckedTimestamp = time.Now().UTC().String()
component.CheckUntilTimestamp = time.Now().Add(s.Timeout).UTC().String()
} else { // if deployment doesn't exists then the cr spec might have been deleted
// deployment has been deleted, remove the entry from the queue
component.Status = Deleted
log.Info(fmt.Sprintf("%s restart status is %s, removing from %s restart queue", component.CRName, component.Status, mspid))
component.LastCheckedTimestamp = time.Now().UTC().String()
component.CheckUntilTimestamp = time.Now().Add(s.Timeout).UTC().String()
restartConfig.AddToLog(component)
restartConfig.PopFromQueue(mspid)
}

updated = true

Expand Down Expand Up @@ -466,6 +478,32 @@ func (s *StaggerRestartsService) RestartDeployment(name, namespace string) error
return nil
}

func (s *StaggerRestartsService) CheckDeployments(name, namespace string) (bool, error) {
deploymentsExists := false

labelSelector, err := labels.Parse(fmt.Sprintf("app=%s", name))
if err != nil {
return false, errors.Wrap(err, "failed to parse label selector for app name")
}

listOptions := &client.ListOptions{
LabelSelector: labelSelector,
Namespace: namespace,
}
deployList := &appsv1.DeploymentList{}
err = s.Client.List(context.TODO(), deployList, listOptions)

if err != nil {
log.Error(err, "failed to get deployment list for %s", name)
return deploymentsExists, nil
}
if len(deployList.Items) > 0 {
deploymentsExists = true
}

return deploymentsExists, nil
}

func (s *StaggerRestartsService) GetRunningPods(name, namespace string) ([]corev1.Pod, error) {
pods := []corev1.Pod{}

Expand Down
1 change: 1 addition & 0 deletions pkg/restart/staggerrestarts/staggerrestarts_structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
Waiting Status = "waiting"
Completed Status = "completed"
Expired Status = "expired"
Deleted Status = "deleted"

Restarted Status = "restarted"
)
Expand Down
63 changes: 62 additions & 1 deletion pkg/restart/staggerrestarts/staggerrestarts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ var _ = Describe("Staggerrestarts", func() {
component3 *staggerrestarts.Component

pod *corev1.Pod
dep *appsv1.Deployment
)

BeforeEach(func() {
Expand Down Expand Up @@ -138,7 +139,21 @@ var _ = Describe("Staggerrestarts", func() {
Phase: corev1.PodRunning,
},
}

replicas := int32(1)
dep = &appsv1.Deployment{
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
corev1.Container{
Name: "org1peer1",
},
},
},
},
},
}
bytes, err := json.Marshal(restartConfig)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -165,6 +180,9 @@ var _ = Describe("Staggerrestarts", func() {
case *corev1.PodList:
pods := obj.(*corev1.PodList)
pods.Items = []corev1.Pod{*pod}
case *appsv1.DeploymentList:
deployments := obj.(*appsv1.DeploymentList)
deployments.Items = []appsv1.Deployment{*dep}
}
return nil
}
Expand All @@ -173,6 +191,14 @@ var _ = Describe("Staggerrestarts", func() {
Context("pending", func() {
It("returns empty pod list if failed to get running pods", func() {
mockClient.ListReturns(errors.New("list error"))
mockClient.ListStub = func(ctx context.Context, obj client.ObjectList, opts ...k8sclient.ListOption) error {
switch obj.(type) {
case *appsv1.DeploymentList:
deployments := obj.(*appsv1.DeploymentList)
deployments.Items = []appsv1.Deployment{*dep}
}
return nil
}
requeue, err := service.Reconcile("peer", "namespace")
Expect(err).NotTo(HaveOccurred())
Expect(requeue).To(Equal(false))
Expand All @@ -187,6 +213,38 @@ var _ = Describe("Staggerrestarts", func() {
})
})

It("check deleted status when pods/deployments list is empty", func() {
mockClient.ListReturns(errors.New("list error"))
mockClient.ListStub = func(ctx context.Context, obj client.ObjectList, opts ...k8sclient.ListOption) error {
switch obj.(type) {
case *appsv1.DeploymentList:
deployments := obj.(*appsv1.DeploymentList)
deployments.Items = []appsv1.Deployment{}
}
return nil
}
requeue, err := service.Reconcile("peer", "namespace")
Expect(err).NotTo(HaveOccurred())
Expect(requeue).To(Equal(false))

_, cm, _ := mockClient.CreateOrUpdateArgsForCall(0)
cfg := getRestartConfig(cm.(*corev1.ConfigMap))
By("deleting first component from queue, immediate second component will be in pending state", func() {
Expect(cfg.Queues["org1"][0].CRName).To(Equal("org1peer2"))
Expect(cfg.Queues["org1"][0].Status).To(Equal(staggerrestarts.Pending))
Expect(cfg.Queues["org1"][0].PodName).To(Equal(""))
})

By("moving the component to the log and setting status to deleted", func() {
Expect(len(cfg.Log)).To(Equal(2)) // since org1peer1 and org2peer1 has been deleted

for _, components := range cfg.Log {
Expect(components[0].CRName).To(ContainSubstring("peer1")) // org1peer1 and org2peer1
Expect(components[0].Status).To(Equal(staggerrestarts.Deleted))
}
})
})

It("returns error if fails to restart deployment", func() {
mockClient.PatchReturns(errors.New("patch error"))
requeue, err := service.Reconcile("peer", "namespace")
Expand Down Expand Up @@ -286,6 +344,9 @@ var _ = Describe("Staggerrestarts", func() {
case *corev1.PodList:
pods := obj.(*corev1.PodList)
pods.Items = []corev1.Pod{*pod, *pod2}
case *appsv1.DeploymentList:
deployments := obj.(*appsv1.DeploymentList)
deployments.Items = []appsv1.Deployment{*dep}
}
return nil
}
Expand Down

0 comments on commit cf785d4

Please sign in to comment.