Skip to content

Commit

Permalink
rename functions and args
Browse files Browse the repository at this point in the history
  • Loading branch information
sophon-zt committed Nov 20, 2024
1 parent 134e658 commit e0f6768
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 86 deletions.
15 changes: 10 additions & 5 deletions controllers/parameters/combine_upgrade_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,29 @@ import (
parametersv1alpha1 "github.com/apecloud/kubeblocks/apis/parameters/v1alpha1"
)

var combineUpgradePolicyInstance = &combineUpgradePolicy{
policyExecutors: []reconfigurePolicy{
syncPolicyInstance,
simplePolicyInstance,
},
}

type combineUpgradePolicy struct {
policyExecutors []reconfigurePolicy
}

func init() {
RegisterPolicy(parametersv1alpha1.DynamicReloadAndRestartPolicy, &combineUpgradePolicy{
policyExecutors: []reconfigurePolicy{&syncPolicy{}, &simplePolicy{}},
})
RegisterPolicy(parametersv1alpha1.DynamicReloadAndRestartPolicy, combineUpgradePolicyInstance)
}

func (h *combineUpgradePolicy) GetPolicyName() string {
return string(parametersv1alpha1.DynamicReloadAndRestartPolicy)
}

func (h *combineUpgradePolicy) Upgrade(params reconfigureContext) (ReturnedStatus, error) {
func (h *combineUpgradePolicy) Upgrade(rctx reconfigureContext) (ReturnedStatus, error) {
var ret ReturnedStatus
for _, executor := range h.policyExecutors {
retStatus, err := executor.Upgrade(params)
retStatus, err := executor.Upgrade(rctx)
if err != nil {
return retStatus, err
}
Expand Down
23 changes: 12 additions & 11 deletions controllers/parameters/parallel_upgrade_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,40 @@ import (
podutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

type parallelUpgradePolicy struct {
}
var parallelUpgradePolicyInstance = &parallelUpgradePolicy{}

type parallelUpgradePolicy struct{}

func init() {
RegisterPolicy(parametersv1alpha1.RestartPolicy, &parallelUpgradePolicy{})
RegisterPolicy(parametersv1alpha1.RestartPolicy, parallelUpgradePolicyInstance)
}

func (p *parallelUpgradePolicy) Upgrade(params reconfigureContext) (ReturnedStatus, error) {
func (p *parallelUpgradePolicy) Upgrade(rctx reconfigureContext) (ReturnedStatus, error) {
funcs := GetInstanceSetRollingUpgradeFuncs()
pods, err := funcs.GetPodsFunc(params)
pods, err := funcs.GetPodsFunc(rctx)
if err != nil {
return makeReturnedStatus(ESFailedAndRetry), err
}

return p.restartPods(params, pods, funcs)
return p.restartPods(rctx, pods, funcs)
}

func (p *parallelUpgradePolicy) GetPolicyName() string {
return string(parametersv1alpha1.RestartPolicy)
}

func (p *parallelUpgradePolicy) restartPods(params reconfigureContext, pods []corev1.Pod, funcs RollingUpgradeFuncs) (ReturnedStatus, error) {
var configKey = params.getConfigKey()
var configVersion = params.getTargetVersionHash()
func (p *parallelUpgradePolicy) restartPods(rctx reconfigureContext, pods []corev1.Pod, funcs RollingUpgradeFuncs) (ReturnedStatus, error) {
var configKey = rctx.getConfigKey()
var configVersion = rctx.getTargetVersionHash()

for _, pod := range pods {
if podutil.IsMatchConfigVersion(&pod, configKey, configVersion) {
continue
}
if err := funcs.RestartContainerFunc(&pod, params.Ctx, params.ContainerNames, params.ReconfigureClientFactory); err != nil {
if err := funcs.RestartContainerFunc(&pod, rctx.Ctx, rctx.ContainerNames, rctx.ReconfigureClientFactory); err != nil {
return makeReturnedStatus(ESFailedAndRetry), err
}
if err := updatePodLabelsWithConfigVersion(&pod, configKey, configVersion, params.Client, params.Ctx); err != nil {
if err := updatePodLabelsWithConfigVersion(&pod, configKey, configVersion, rctx.Client, rctx.Ctx); err != nil {
return makeReturnedStatus(ESFailedAndRetry), err
}
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/parameters/reconfigure_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type ReturnedStatus struct {

type reconfigurePolicy interface {
// Upgrade is to enable the configuration to take effect.
Upgrade(params reconfigureContext) (ReturnedStatus, error)
Upgrade(rctx reconfigureContext) (ReturnedStatus, error)

// GetPolicyName returns name of policy.
GetPolicyName() string
Expand Down
57 changes: 25 additions & 32 deletions controllers/parameters/rolling_upgrade_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package parameters

import (
"context"
"os"
"log"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -33,24 +33,24 @@ import (
)

const (
// StatefulSetSpec.Spec.MinReadySeconds
// units: s
defaultMinReadySeconds = 10
)

type rollingUpgradePolicy struct {
}
var rollingUpgradePolicyInstance = &rollingUpgradePolicy{}

type rollingUpgradePolicy struct{}

func init() {
RegisterPolicy(parametersv1alpha1.RollingPolicy, &rollingUpgradePolicy{})
RegisterPolicy(parametersv1alpha1.RollingPolicy, rollingUpgradePolicyInstance)
if err := viper.BindEnv(constant.PodMinReadySecondsEnv); err != nil {
os.Exit(-1)
log.Fatalf("failed to bind environment variable: %v", err)
}
viper.SetDefault(constant.PodMinReadySecondsEnv, defaultMinReadySeconds)
}

func (r *rollingUpgradePolicy) Upgrade(params reconfigureContext) (ReturnedStatus, error) {
return performRollingUpgrade(params, GetInstanceSetRollingUpgradeFuncs())
// Upgrade performs a rolling upgrade based on the provided parameters.
func (r *rollingUpgradePolicy) Upgrade(rctx reconfigureContext) (ReturnedStatus, error) {
return performRollingUpgrade(rctx, GetInstanceSetRollingUpgradeFuncs())
}

func (r *rollingUpgradePolicy) GetPolicyName() string {
Expand All @@ -61,26 +61,26 @@ func canPerformUpgrade(pods []corev1.Pod, params reconfigureContext) bool {
return len(pods) == params.getTargetReplicas()
}

func performRollingUpgrade(params reconfigureContext, funcs RollingUpgradeFuncs) (ReturnedStatus, error) {
pods, err := funcs.GetPodsFunc(params)
func performRollingUpgrade(rctx reconfigureContext, funcs RollingUpgradeFuncs) (ReturnedStatus, error) {
pods, err := funcs.GetPodsFunc(rctx)
if err != nil {
return makeReturnedStatus(ESFailedAndRetry), err
}

var (
rollingReplicas = params.maxRollingReplicas()
configKey = params.getConfigKey()
configVersion = params.getTargetVersionHash()
rollingReplicas = rctx.maxRollingReplicas()
configKey = rctx.getConfigKey()
configVersion = rctx.getTargetVersionHash()
)

if !canPerformUpgrade(pods, params) {
if !canPerformUpgrade(pods, rctx) {
return makeReturnedStatus(ESRetry), nil
}

podStats := classifyPodByStats(pods, params.getTargetReplicas(), params.podMinReadySeconds())
podStats := classifyPodByStats(pods, rctx.getTargetReplicas(), rctx.podMinReadySeconds())
podSwitchWindow := markDynamicSwitchWindow(pods, podStats, configKey, configVersion, rollingReplicas)
if !canSafeUpdatePods(podSwitchWindow) {
params.Log.Info("wait for pod stat ready.")
rctx.Log.Info("wait for pod stat ready.")
return makeReturnedStatus(ESRetry), nil
}

Expand All @@ -91,13 +91,13 @@ func performRollingUpgrade(params reconfigureContext, funcs RollingUpgradeFuncs)

for _, pod := range waitUpgradingPods {
if podStats.isUpdating(&pod) {
params.Log.Info("pod is in rolling update.", "pod name", pod.Name)
rctx.Log.Info("pod is in rolling update.", "pod name", pod.Name)
continue
}
if err := funcs.RestartContainerFunc(&pod, params.Ctx, params.ContainerNames, params.ReconfigureClientFactory); err != nil {
if err := funcs.RestartContainerFunc(&pod, rctx.Ctx, rctx.ContainerNames, rctx.ReconfigureClientFactory); err != nil {
return makeReturnedStatus(ESFailedAndRetry), err
}
if err := updatePodLabelsWithConfigVersion(&pod, configKey, configVersion, params.Client, params.Ctx); err != nil {
if err := updatePodLabelsWithConfigVersion(&pod, configKey, configVersion, rctx.Client, rctx.Ctx); err != nil {
return makeReturnedStatus(ESFailedAndRetry), err
}
}
Expand Down Expand Up @@ -125,7 +125,6 @@ func markDynamicSwitchWindow(pods []corev1.Pod, podsStats *componentPodStats, co
componentPodStats: podsStats,
}

// find update last
for i := podsStats.targetReplica - 1; i >= 0; i-- {
pod := &pods[i]
if !podutil.IsMatchConfigVersion(pod, configKey, currentVersion) {
Expand Down Expand Up @@ -173,15 +172,10 @@ func classifyPodByStats(pods []corev1.Pod, targetReplicas int, minReadySeconds i
}

type componentPodStats struct {
// failed to start pod
ready map[string]*corev1.Pod
available map[string]*corev1.Pod

// updated pod count
updated map[string]*corev1.Pod
updating map[string]*corev1.Pod

// expected pod
ready map[string]*corev1.Pod
available map[string]*corev1.Pod
updated map[string]*corev1.Pod
updating map[string]*corev1.Pod
targetReplica int
}

Expand All @@ -203,8 +197,7 @@ func (s *componentPodStats) isUpdating(pod *corev1.Pod) bool {
type switchWindow struct {
begin int
end int

pods []corev1.Pod
pods []corev1.Pod
*componentPodStats
}

Expand Down
31 changes: 16 additions & 15 deletions controllers/parameters/simple_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,45 +28,46 @@ import (
"github.com/apecloud/kubeblocks/pkg/configuration/core"
)

type simplePolicy struct {
}
var simplePolicyInstance = &simplePolicy{}

type simplePolicy struct{}

func init() {
RegisterPolicy(parametersv1alpha1.NormalPolicy, &simplePolicy{})
RegisterPolicy(parametersv1alpha1.NormalPolicy, simplePolicyInstance)
}

func (s *simplePolicy) Upgrade(params reconfigureContext) (ReturnedStatus, error) {
params.Log.V(1).Info("simple policy begin....")
func (s *simplePolicy) Upgrade(rctx reconfigureContext) (ReturnedStatus, error) {
rctx.Log.V(1).Info("simple policy begin....")

return restartAndCheckComponent(params, GetInstanceSetRollingUpgradeFuncs(), fromWorkloadObjects(params))
return restartAndCheckComponent(rctx, GetInstanceSetRollingUpgradeFuncs(), fromWorkloadObjects(rctx))
}

func (s *simplePolicy) GetPolicyName() string {
return string(parametersv1alpha1.NormalPolicy)
}

func restartAndCheckComponent(param reconfigureContext, funcs RollingUpgradeFuncs, objs []client.Object) (ReturnedStatus, error) {
func restartAndCheckComponent(rctx reconfigureContext, funcs RollingUpgradeFuncs, objs []client.Object) (ReturnedStatus, error) {
var (
newVersion = param.getTargetVersionHash()
configKey = param.getConfigKey()
newVersion = rctx.getTargetVersionHash()
configKey = rctx.getConfigKey()

retStatus = ESRetry
progress = core.NotStarted
)

recordEvent := func(obj client.Object) {
param.Recorder.Eventf(obj,
rctx.Recorder.Eventf(obj,
corev1.EventTypeNormal, appsv1alpha1.ReasonReconfigureRestart,
"restarting component[%s] in cluster[%s], version: %s", param.ClusterComponent.Name, param.Cluster.Name, newVersion)
"restarting component[%s] in cluster[%s], version: %s", rctx.ClusterComponent.Name, rctx.Cluster.Name, newVersion)
}
if obj, err := funcs.RestartComponent(param.Client, param.RequestCtx, configKey, newVersion, objs, recordEvent); err != nil {
param.Recorder.Eventf(obj,
if obj, err := funcs.RestartComponent(rctx.Client, rctx.RequestCtx, configKey, newVersion, objs, recordEvent); err != nil {
rctx.Recorder.Eventf(obj,
corev1.EventTypeWarning, appsv1alpha1.ReasonReconfigureRestartFailed,
"failed to restart component[%s] in cluster[%s], version: %s", client.ObjectKeyFromObject(obj), param.Cluster.Name, newVersion)
"failed to restart component[%s] in cluster[%s], version: %s", client.ObjectKeyFromObject(obj), rctx.Cluster.Name, newVersion)
return makeReturnedStatus(ESFailedAndRetry), err
}

pods, err := funcs.GetPodsFunc(param)
pods, err := funcs.GetPodsFunc(rctx)
if err != nil {
return makeReturnedStatus(ESFailedAndRetry), err
}
Expand Down
39 changes: 20 additions & 19 deletions controllers/parameters/sync_upgrade_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,30 @@ import (
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

type syncPolicy struct {
}
var syncPolicyInstance = &syncPolicy{}

type syncPolicy struct{}

func init() {
RegisterPolicy(parametersv1alpha1.SyncDynamicReloadPolicy, &syncPolicy{})
RegisterPolicy(parametersv1alpha1.SyncDynamicReloadPolicy, syncPolicyInstance)
}

func (o *syncPolicy) GetPolicyName() string {
return string(parametersv1alpha1.SyncDynamicReloadPolicy)
}

func (o *syncPolicy) Upgrade(params reconfigureContext) (ReturnedStatus, error) {
updatedParameters := params.UpdatedParameters
func (o *syncPolicy) Upgrade(rctx reconfigureContext) (ReturnedStatus, error) {
updatedParameters := rctx.UpdatedParameters
if len(updatedParameters) == 0 {
return makeReturnedStatus(ESNone), nil
}

funcs := GetInstanceSetRollingUpgradeFuncs()
pods, err := funcs.GetPodsFunc(params)
pods, err := funcs.GetPodsFunc(rctx)
if err != nil {
return makeReturnedStatus(ESFailedAndRetry), err
}
return sync(params, updatedParameters, pods, funcs)
return sync(rctx, updatedParameters, pods, funcs)
}

func matchLabel(pods []corev1.Pod, selector *metav1.LabelSelector) ([]corev1.Pod, error) {
Expand All @@ -71,18 +72,18 @@ func matchLabel(pods []corev1.Pod, selector *metav1.LabelSelector) ([]corev1.Pod
return result, nil
}

func sync(params reconfigureContext, updatedParameters map[string]string, pods []corev1.Pod, funcs RollingUpgradeFuncs) (ReturnedStatus, error) {
func sync(rctx reconfigureContext, updatedParameters map[string]string, pods []corev1.Pod, funcs RollingUpgradeFuncs) (ReturnedStatus, error) {
var (
r = ESNone
total = int32(len(pods))
replicas = int32(params.getTargetReplicas())
replicas = int32(rctx.getTargetReplicas())
progress = core.NotStarted

err error
ctx = params.Ctx
configKey = params.getConfigKey()
versionHash = params.getTargetVersionHash()
selector = intctrlutil.GetPodSelector(params.ParametersDef)
ctx = rctx.Ctx
configKey = rctx.getConfigKey()
versionHash = rctx.getTargetVersionHash()
selector = intctrlutil.GetPodSelector(rctx.ParametersDef)
fileName string
)

Expand All @@ -93,28 +94,28 @@ func sync(params reconfigureContext, updatedParameters map[string]string, pods [
return makeReturnedStatus(ESFailedAndRetry), err
}
if len(pods) == 0 {
params.Log.Info(fmt.Sprintf("no pods to update, and retry, selector: %v", selector))
rctx.Log.Info(fmt.Sprintf("no pods to update, and retry, selector: %v", selector))
return makeReturnedStatus(ESRetry), nil
}
if params.ConfigDescription != nil {
fileName = params.ConfigDescription.Name
if rctx.ConfigDescription != nil {
fileName = rctx.ConfigDescription.Name
}

requireUpdatedCount := int32(len(pods))
for _, pod := range pods {
params.Log.V(1).Info(fmt.Sprintf("sync pod: %s", pod.Name))
rctx.Log.V(1).Info(fmt.Sprintf("sync pod: %s", pod.Name))
if intctrlutil.IsMatchConfigVersion(&pod, configKey, versionHash) {
progress++
continue
}
if !intctrlutil.PodIsReady(&pod) {
continue
}
err = funcs.OnlineUpdatePodFunc(&pod, ctx, params.ReconfigureClientFactory, params.ConfigTemplate.Name, fileName, updatedParameters)
err = funcs.OnlineUpdatePodFunc(&pod, ctx, rctx.ReconfigureClientFactory, rctx.ConfigTemplate.Name, fileName, updatedParameters)
if err != nil {
return makeReturnedStatus(ESFailedAndRetry), err
}
err = updatePodLabelsWithConfigVersion(&pod, configKey, versionHash, params.Client, ctx)
err = updatePodLabelsWithConfigVersion(&pod, configKey, versionHash, rctx.Client, ctx)
if err != nil {
return makeReturnedStatus(ESFailedAndRetry), err
}
Expand Down
Loading

0 comments on commit e0f6768

Please sign in to comment.