Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the Restarts Queue #134

Merged
137 changes: 137 additions & 0 deletions pkg/restart/staggerrestarts/staggerrestarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ package staggerrestarts
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"math/big"
"strconv"
"strings"
"time"

current "github.com/IBM-Blockchain/fabric-operator/api/v1beta1"
"github.com/IBM-Blockchain/fabric-operator/pkg/action"
k8sclient "github.com/IBM-Blockchain/fabric-operator/pkg/k8s/controllerclient"
"github.com/IBM-Blockchain/fabric-operator/pkg/restart/configmap"
"github.com/IBM-Blockchain/fabric-operator/pkg/util"
"github.com/pkg/errors"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -164,6 +167,119 @@ func (s *StaggerRestartsService) RestartImmediately(componentType string, instan
return nil
}

// this method checks if actually optimization is possible on the components and if restarts can be clubbed.
func isOptimizePossible(restartConfig *RestartConfig) bool {
canOptimize := false
var listOfMspCRName []string
for mspid, queue := range restartConfig.Queues {
for i := 0; i < len(queue); i++ {
// we dont want to consider waiting pods
if queue[i].Status == "waiting" {
continue
}

if util.ContainsValue(mspid+queue[i].CRName, listOfMspCRName) == true {
log.Info(fmt.Sprintf("We Can Optimize Restarts for '%s'", mspid+queue[i].CRName))
canOptimize = true
break
} else {
listOfMspCRName = append(listOfMspCRName, mspid+queue[i].CRName)
}
}
}
return canOptimize
}

// optimizeRestart is called by the ca/peer/orderer reconcile loops via the restart
// this method combines restart requests into one and reduces the number
// of restarts that is required for the components

// returns the Restart Config with Optimized Queues for Restarts
func optimizeRestart(restartConfig *RestartConfig) *RestartConfig {
optimizedMap := map[string]map[string]string{}
for mspid, queue := range restartConfig.Queues {
for i := 0; i < len(queue); i++ {
// if the pod is already in waiting state, do not combine the restart
if queue[i].Status == "waiting" {
tempqueue := map[string]string{}
tempqueue["reason"] = queue[i].Reason
tempqueue["status"] = string(queue[i].Status)
tempqueue["count"] = "1"
tempqueue["checkuntilltimestamp"] = queue[i].CheckUntilTimestamp
tempqueue["lastcheckedtimestamp"] = queue[i].LastCheckedTimestamp
tempqueue["podname"] = queue[i].PodName
tempqueue["mspid"] = mspid

optimizedMap[queue[i].CRName+"~wait"] = tempqueue
continue
}

// if the restart for that CRName already exist, increase the restart count and combine the reason
// else add it to the new map with the CRName and count as 1
if _, ok := optimizedMap[queue[i].CRName]; ok && optimizedMap[queue[i].CRName]["status"] != "waiting" {
existingCount := optimizedMap[queue[i].CRName]["count"]
newCount, _ := strconv.Atoi(existingCount)
newCount++
optimizedMap[queue[i].CRName]["count"] = strconv.Itoa(newCount)

existingReason := optimizedMap[queue[i].CRName]["reason"]
newReason := queue[i].Reason
newReason = existingReason + "~" + newReason
optimizedMap[queue[i].CRName]["reason"] = newReason
optimizedMap[queue[i].CRName]["status"] = "pending"
optimizedMap[queue[i].CRName]["mspid"] = mspid

} else {
tempqueue := map[string]string{}
tempqueue["reason"] = queue[i].Reason
tempqueue["count"] = "1"
tempqueue["status"] = "pending"
tempqueue["mspid"] = mspid
optimizedMap[queue[i].CRName] = tempqueue
}
}
}

f := map[string][]*Component{}
tempComponentArray := []*Component{}
currComponent := []*Component{}

// Merge the restart queues such that waiting restart requests are at 0 index of the slice
for mspid, queue := range restartConfig.Queues {
_ = queue
for k := range optimizedMap {
if optimizedMap[k]["mspid"] == mspid {
component := Component{}
component.Reason = optimizedMap[k]["reason"]
component.CheckUntilTimestamp = optimizedMap[k]["checkuntilltimestamp"]
component.LastCheckedTimestamp = optimizedMap[k]["lastcheckedtimestamp"]
component.Status = Status(optimizedMap[k]["status"])
component.PodName = (optimizedMap[k]["podname"])
k = strings.ReplaceAll(k, "~wait", "")
component.CRName = k
tempComponentArray = append(tempComponentArray, &component)
if f[mspid] == nil {
f[mspid] = tempComponentArray
} else {
tempComponentArray = f[mspid]
currComponent = append(currComponent, &component)
if component.Status == "waiting" {
tempComponentArray = append(currComponent, tempComponentArray...)
} else {
tempComponentArray = append(tempComponentArray, currComponent...)
}
f[mspid] = tempComponentArray
}
tempComponentArray = []*Component{}
currComponent = []*Component{}
}
}
}

restartConfig.Queues = f
return restartConfig
}

// Reconcile is called by the ca/peer/orderer reconcile loops via the restart
// manager when an update to the <ca/peer/orderer>-restart-config CM is detected
// and handles the different states of the first component of each queue.
Expand All @@ -177,6 +293,27 @@ func (s *StaggerRestartsService) Reconcile(componentType, namespace string) (boo
return requeue, err
}

isOptimizePossibleFlag := isOptimizePossible(restartConfig)
if isOptimizePossibleFlag {
u, err := json.Marshal(restartConfig.Queues)
if err != nil {
panic(err)
}
fmt.Println("Restart Config Before optimized", string(u))

restartConfig = optimizeRestart(restartConfig)
err = s.UpdateConfig(componentType, namespace, restartConfig)
if err != nil {
return requeue, err
}
u, err = json.Marshal(restartConfig.Queues)
if err != nil {
panic(err)
}
fmt.Println("Restart Config After optimized", string(u))

}

updated := false
// Check front component of each queue
for mspid, queue := range restartConfig.Queues {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ var _ = Describe("Util", func() {
It("Use default Image with registry URL when image is missing", func() {
defaultImg = "fabric-peer"
resultImg := image.GetImage(registryURL, "", defaultImg)
Expect(resultImg).To(Equal(registryURL+defaultImg))
Expect(resultImg).To(Equal(registryURL + defaultImg))
})
})
})
Loading