diff --git a/controllers/apps/transformer_component_reconfigure.go b/controllers/apps/transformer_component_reconfigure.go index fc4478ef89b..842bb83ee8f 100644 --- a/controllers/apps/transformer_component_reconfigure.go +++ b/controllers/apps/transformer_component_reconfigure.go @@ -79,7 +79,7 @@ func (t *componentReloadActionSidecarTransformer) Transform(ctx graph.TransformC } graphCli, _ := transCtx.Client.(model.GraphClient) - if err := checkAndCreateConfigRelatedObjs(transCtx, graphCli, dag, configmaps...); err != nil { + if err := ensureConfigMapsPresence(transCtx, graphCli, dag, configmaps...); err != nil { return err } if err := updatePodVolumes(synthesizeComp.PodSpec, synthesizeComp); err != nil { @@ -92,7 +92,7 @@ func (t *componentReloadActionSidecarTransformer) Transform(ctx graph.TransformC return configctrl.BuildReloadActionContainer(reconcileCtx, cluster, synthesizeComp, transCtx.CompDef, configmaps) } -func checkAndCreateConfigRelatedObjs(ctx context.Context, cli model.GraphClient, dag *graph.DAG, configmaps ...*corev1.ConfigMap) error { +func ensureConfigMapsPresence(ctx context.Context, cli model.GraphClient, dag *graph.DAG, configmaps ...*corev1.ConfigMap) error { for _, configmap := range configmaps { var cm = &corev1.ConfigMap{} if err := cli.Get(ctx, client.ObjectKeyFromObject(configmap), cm); err != nil { diff --git a/controllers/parameters/policy_util.go b/controllers/parameters/policy_util.go index 85af0797267..e153cba5d5c 100644 --- a/controllers/parameters/policy_util.go +++ b/controllers/parameters/policy_util.go @@ -96,7 +96,7 @@ func getPodsForOnlineUpdate(params reconfigureContext) ([]corev1.Pod, error) { // TODO commonOnlineUpdateWithPod migrate to sql command pipeline func commonOnlineUpdateWithPod(pod *corev1.Pod, ctx context.Context, createClient createReconfigureClient, configSpec string, configFile string, updatedParams map[string]string) error { - address, err := cfgManagerGrpcURL(pod) + address, err := resolveReloadServerGrpcURL(pod) if err != nil { return err } @@ -131,7 +131,7 @@ func commonStopContainerWithPod(pod *corev1.Pod, ctx context.Context, containerN containerIDs = append(containerIDs, containerID) } - address, err := cfgManagerGrpcURL(pod) + address, err := resolveReloadServerGrpcURL(pod) if err != nil { return err } @@ -155,19 +155,19 @@ func commonStopContainerWithPod(pod *corev1.Pod, ctx context.Context, containerN return nil } -func cfgManagerGrpcURL(pod *corev1.Pod) (string, error) { +func resolveReloadServerGrpcURL(pod *corev1.Pod) (string, error) { podPort := viper.GetInt(constant.ConfigManagerGPRCPortEnv) if pod.Spec.HostNetwork { - containerPort, err := configuration.GetConfigManagerGRPCPort(pod.Spec.Containers) + containerPort, err := configuration.ResolveReloadServerGRPCPort(pod.Spec.Containers) if err != nil { return "", err } podPort = int(containerPort) } - return getURLFromPod(pod, podPort) + return generateGrpcURL(pod, podPort) } -func getURLFromPod(pod *corev1.Pod, portPort int) (string, error) { +func generateGrpcURL(pod *corev1.Pod, portPort int) (string, error) { ip, err := ipAddressFromPod(pod.Status) if err != nil { return "", err @@ -286,22 +286,24 @@ func resolveReloadActionPolicy(jsonPatch string, return policy, nil } +// genReconfigureActionTasks generates a list of reconfiguration tasks based on the provided templateSpec, +// reconfiguration context, configuration patch, and a restart flag. func genReconfigureActionTasks(templateSpec *appsv1.ComponentTemplateSpec, rctx *ReconcileContext, patch *core.ConfigPatchInfo, restart bool) ([]ReloadAction, error) { var tasks []ReloadAction + // If the patch or ConfigRender is nil, return a single restart task. if patch == nil || rctx.ConfigRender == nil { return []ReloadAction{buildRestartTask(templateSpec, rctx)}, nil } - checkNeedReloadAction := func(pd *parametersv1alpha1.ParametersDefinition, policy parametersv1alpha1.ReloadPolicy) bool { - if restart { - return policy == parametersv1alpha1.SyncDynamicReloadPolicy && intctrlutil.NeedDynamicReloadAction(&pd.Spec) - } - return true + // needReloadAction determines if a reload action is needed based on the ParametersDefinition and ReloadPolicy. + needReloadAction := func(pd *parametersv1alpha1.ParametersDefinition, policy parametersv1alpha1.ReloadPolicy) bool { + return !restart || (policy == parametersv1alpha1.SyncDynamicReloadPolicy && intctrlutil.NeedDynamicReloadAction(&pd.Spec)) } for key, jsonPatch := range patch.UpdateConfig { pd, ok := rctx.ParametersDefs[key] + // If the ParametersDefinition or its ReloadAction is nil, continue to the next iteration. if !ok || pd.Spec.ReloadAction == nil { continue } @@ -309,15 +311,18 @@ func genReconfigureActionTasks(templateSpec *appsv1.ComponentTemplateSpec, rctx if configFormat == nil || configFormat.FileFormatConfig == nil { continue } + // Determine the appropriate ReloadPolicy. policy, err := resolveReloadActionPolicy(string(jsonPatch), configFormat.FileFormatConfig, &pd.Spec) if err != nil { return nil, err } - if checkNeedReloadAction(pd, policy) { + // If a reload action is needed, append a new reload action task to the tasks slice. + if needReloadAction(pd, policy) { tasks = append(tasks, buildReloadActionTask(policy, templateSpec, rctx, pd, configFormat, patch)) } } + // If no tasks were added, return a single restart task. if len(tasks) == 0 { return []ReloadAction{buildRestartTask(templateSpec, rctx)}, nil } @@ -342,7 +347,7 @@ func buildReloadActionTask(reloadPolicy parametersv1alpha1.ReloadPolicy, templat } if reloadPolicy == parametersv1alpha1.SyncDynamicReloadPolicy { - reCtx.UpdatedParameters = getOnlineUpdateParams(patch, &pd.Spec, *configDescription) + reCtx.UpdatedParameters = generateOnlineUpdateParams(patch, &pd.Spec, *configDescription) } return reconfigureTask{ReloadPolicy: reloadPolicy, taskCtx: reCtx} diff --git a/controllers/parameters/reconcile_task.go b/controllers/parameters/reconcile_task.go index 0f3286d22d3..e6a62fc03c1 100644 --- a/controllers/parameters/reconcile_task.go +++ b/controllers/parameters/reconcile_task.go @@ -181,6 +181,10 @@ func syncImpl(taskCtx *TaskContext, return nil } +// persistUpdatedParameters merges and updates parameter-related configmaps. +// It first calls mergeAndApplyConfig to merge and update the configmap. +// If the updatedConfig is nil, it returns nil. Otherwise, it calls updateInjectedEnvVars +// to check and update the injected environment variables. func persistUpdatedParameters(rctx *configctrl.ResourceCtx, comp *component.SynthesizedComponent, configRender *parametersv1alpha1.ParameterDrivenConfigRender, @@ -189,16 +193,16 @@ func persistUpdatedParameters(rctx *configctrl.ResourceCtx, owner client.Object, item parametersv1alpha1.ConfigTemplateItemDetail, revision string) error { - if err := mergeAndUpdate(rctx, updatedConfig, original, owner, item, revision); err != nil { + if err := mergeAndApplyConfig(rctx, updatedConfig, original, owner, item, revision); err != nil { return err } if updatedConfig == nil { return nil } - return checkAndUpdateInjectedEnv(rctx, comp, configRender, updatedConfig, owner, item, revision) + return updateInjectedEnvVars(rctx, comp, configRender, updatedConfig, owner, item, revision) } -func checkAndUpdateInjectedEnv(rctx *configctrl.ResourceCtx, +func updateInjectedEnvVars(rctx *configctrl.ResourceCtx, comp *component.SynthesizedComponent, configRender *parametersv1alpha1.ParameterDrivenConfigRender, config *corev1.ConfigMap, @@ -234,7 +238,7 @@ func checkAndUpdateInjectedEnv(rctx *configctrl.ResourceCtx, return err } -func mergeAndUpdate(resourceCtx *configctrl.ResourceCtx, +func mergeAndApplyConfig(resourceCtx *configctrl.ResourceCtx, expected *corev1.ConfigMap, running *corev1.ConfigMap, owner client.Object, diff --git a/controllers/parameters/reconfigure_controller.go b/controllers/parameters/reconfigure_controller.go index 47a08e3bb5e..96eaa4bd1f4 100644 --- a/controllers/parameters/reconfigure_controller.go +++ b/controllers/parameters/reconfigure_controller.go @@ -234,65 +234,41 @@ func (r *ReconfigureReconciler) performUpgrade(rctx *ReconcileContext, reloadTas var reloadType string for _, task := range reloadTasks { - returnedStatus, err = task.ExecReload() reloadType = task.ReloadType() + returnedStatus, err = task.ExecReload() if err != nil || returnedStatus.Status != ESNone { return r.status(rctx, returnedStatus, reloadType, err) } } - - rctx.Recorder.Eventf(rctx.ConfigMap, - corev1.EventTypeNormal, - appsv1alpha1.ReasonReconfigureSucceed, - "the reconfigure[%s] has been processed successfully", - reloadType) - - result := reconciled(returnedStatus, reloadType, parametersv1alpha1.CFinishedPhase) - return r.updateConfigCMStatus(rctx.RequestCtx, rctx.ConfigMap, reloadType, &result) + return r.succeed(rctx, reloadType, returnedStatus) } func (r *ReconfigureReconciler) status(rctx *ReconcileContext, returnedStatus ReturnedStatus, policy string, err error) (ctrl.Result, error) { + updatePhase := func(phase parametersv1alpha1.ParameterPhase, options ...options) (ctrl.Result, error) { + return updateConfigPhaseWithResult(rctx.Client, rctx.RequestCtx, rctx.ConfigMap, reconciled(returnedStatus, policy, phase, options...)) + } + switch returnedStatus.Status { - default: - return updateConfigPhaseWithResult( - rctx.Client, - rctx.RequestCtx, - rctx.ConfigMap, - reconciled(returnedStatus, policy, parametersv1alpha1.CFailedAndPausePhase, - withFailed(core.MakeError("unknown status"), false)), - ) case ESFailedAndRetry: - return updateConfigPhaseWithResult( - rctx.Client, - rctx.RequestCtx, - rctx.ConfigMap, - reconciled(returnedStatus, policy, parametersv1alpha1.CFailedPhase, - withFailed(err, true)), - ) + return updatePhase(parametersv1alpha1.CFailedPhase, withFailed(err, true)) case ESRetry: - return updateConfigPhaseWithResult( - rctx.Client, - rctx.RequestCtx, - rctx.ConfigMap, - reconciled(returnedStatus, policy, parametersv1alpha1.CUpgradingPhase), - ) + return updatePhase(parametersv1alpha1.CUpgradingPhase) case ESFailed: - return updateConfigPhaseWithResult( - rctx.Client, - rctx.RequestCtx, - rctx.ConfigMap, - reconciled(returnedStatus, policy, parametersv1alpha1.CFailedAndPausePhase, - withFailed(err, false)), - ) + return updatePhase(parametersv1alpha1.CFailedAndPausePhase, withFailed(err, false)) case ESNone: - rctx.Recorder.Eventf( - rctx.ConfigMap, - corev1.EventTypeNormal, - appsv1alpha1.ReasonReconfigureSucceed, - "the reconfigure[%s] has been processed successfully", - policy, - ) - result := reconciled(returnedStatus, policy, parametersv1alpha1.CFinishedPhase) - return r.updateConfigCMStatus(rctx.RequestCtx, rctx.ConfigMap, policy, &result) + return r.succeed(rctx, policy, returnedStatus) + default: + return updatePhase(parametersv1alpha1.CFailedAndPausePhase, withFailed(core.MakeError("unknown status"), false)) } } + +func (r *ReconfigureReconciler) succeed(rctx *ReconcileContext, reloadType string, returnedStatus ReturnedStatus) (ctrl.Result, error) { + rctx.Recorder.Eventf(rctx.ConfigMap, + corev1.EventTypeNormal, + appsv1alpha1.ReasonReconfigureSucceed, + "the reconfigure[%s] has been processed successfully", + reloadType) + + result := reconciled(returnedStatus, reloadType, parametersv1alpha1.CFinishedPhase) + return r.updateConfigCMStatus(rctx.RequestCtx, rctx.ConfigMap, reloadType, &result) +} diff --git a/controllers/parameters/reconfigure_policy.go b/controllers/parameters/reconfigure_policy.go index 35bc5ef5ff6..5bcaea5c9c9 100644 --- a/controllers/parameters/reconfigure_policy.go +++ b/controllers/parameters/reconfigure_policy.go @@ -130,7 +130,12 @@ func GetClientFactory() createReconfigureClient { } func (param *reconfigureContext) getConfigKey() string { - return param.ConfigTemplate.Name + key := param.ConfigTemplate.Name + if param.ConfigDescription != nil && param.ConfigDescription.Name != "" { + hash, _ := util.ComputeHash(param.ConfigDescription.Name) + key = key + "/" + hash + } + return key } func (param *reconfigureContext) getTargetVersionHash() string { diff --git a/controllers/parameters/rolling_upgrade_policy.go b/controllers/parameters/rolling_upgrade_policy.go index 6798a7d03a1..910a3b51710 100644 --- a/controllers/parameters/rolling_upgrade_policy.go +++ b/controllers/parameters/rolling_upgrade_policy.go @@ -77,20 +77,20 @@ func performRollingUpgrade(rctx reconfigureContext, funcs RollingUpgradeFuncs) ( return makeReturnedStatus(ESRetry), nil } - podStats := classifyPodByStats(pods, rctx.getTargetReplicas(), rctx.podMinReadySeconds()) - podSwitchWindow := markDynamicSwitchWindow(pods, podStats, configKey, configVersion, rollingReplicas) - if !canSafeUpdatePods(podSwitchWindow) { + podStatus := classifyPodByStats(pods, rctx.getTargetReplicas(), rctx.podMinReadySeconds()) + updateWindow := markDynamicSwitchWindow(pods, podStatus, configKey, configVersion, rollingReplicas) + if !canSafeUpdatePods(updateWindow) { rctx.Log.Info("wait for pod stat ready.") return makeReturnedStatus(ESRetry), nil } - waitUpgradingPods := podSwitchWindow.getWaitUpgradePods() - if len(waitUpgradingPods) == 0 { - return makeReturnedStatus(ESNone, withSucceed(int32(podStats.targetReplica)), withExpected(int32(podStats.targetReplica))), nil + podsToUpgrade := updateWindow.getPendingUpgradePods() + if len(podsToUpgrade) == 0 { + return makeReturnedStatus(ESNone, withSucceed(int32(podStatus.targetReplica)), withExpected(int32(podStatus.targetReplica))), nil } - for _, pod := range waitUpgradingPods { - if podStats.isUpdating(&pod) { + for _, pod := range podsToUpgrade { + if podStatus.isUpdating(&pod) { rctx.Log.Info("pod is in rolling update.", "pod name", pod.Name) continue } @@ -103,8 +103,8 @@ func performRollingUpgrade(rctx reconfigureContext, funcs RollingUpgradeFuncs) ( } return makeReturnedStatus(ESRetry, - withExpected(int32(podStats.targetReplica)), - withSucceed(int32(len(podStats.updated)+len(podStats.updating)))), nil + withExpected(int32(podStatus.targetReplica)), + withSucceed(int32(len(podStatus.updated)+len(podStatus.updating)))), nil } func canSafeUpdatePods(wind switchWindow) bool { @@ -201,7 +201,7 @@ type switchWindow struct { *componentPodStats } -func (w *switchWindow) getWaitUpgradePods() []corev1.Pod { +func (w *switchWindow) getPendingUpgradePods() []corev1.Pod { return w.pods[w.begin:w.end] } diff --git a/controllers/parameters/rolling_upgrade_policy_test.go b/controllers/parameters/rolling_upgrade_policy_test.go index bb13eee8e77..2c614a0103e 100644 --- a/controllers/parameters/rolling_upgrade_policy_test.go +++ b/controllers/parameters/rolling_upgrade_policy_test.go @@ -60,6 +60,7 @@ var _ = Describe("Reconfigure RollingPolicy", func() { withConfigSpec("for_test", map[string]string{ "key": "value", }), + withConfigDescription(¶metersv1alpha1.FileFormatConfig{Format: parametersv1alpha1.Properties}), withGRPCClient(func(addr string) (cfgproto.ReconfigureClient, error) { return reconfigureClient, nil }), diff --git a/controllers/parameters/simple_policy.go b/controllers/parameters/simple_policy.go index 012511d30a9..4b908f75afb 100644 --- a/controllers/parameters/simple_policy.go +++ b/controllers/parameters/simple_policy.go @@ -39,14 +39,14 @@ func init() { func (s *simplePolicy) Upgrade(rctx reconfigureContext) (ReturnedStatus, error) { rctx.Log.V(1).Info("simple policy begin....") - return restartAndCheckComponent(rctx, GetInstanceSetRollingUpgradeFuncs(), fromWorkloadObjects(rctx)) + return restartAndVerifyComponent(rctx, GetInstanceSetRollingUpgradeFuncs(), fromWorkloadObjects(rctx)) } func (s *simplePolicy) GetPolicyName() string { return string(parametersv1alpha1.NormalPolicy) } -func restartAndCheckComponent(rctx reconfigureContext, funcs RollingUpgradeFuncs, objs []client.Object) (ReturnedStatus, error) { +func restartAndVerifyComponent(rctx reconfigureContext, funcs RollingUpgradeFuncs, objs []client.Object) (ReturnedStatus, error) { var ( newVersion = rctx.getTargetVersionHash() configKey = rctx.getConfigKey() diff --git a/controllers/parameters/sync_upgrade_policy.go b/controllers/parameters/sync_upgrade_policy.go index ffc22fef960..c47fd188e61 100644 --- a/controllers/parameters/sync_upgrade_policy.go +++ b/controllers/parameters/sync_upgrade_policy.go @@ -128,7 +128,7 @@ func sync(rctx reconfigureContext, updatedParameters map[string]string, pods []c return makeReturnedStatus(r, withExpected(requireUpdatedCount), withSucceed(progress)), nil } -func getOnlineUpdateParams(configPatch *core.ConfigPatchInfo, paramDef *parametersv1alpha1.ParametersDefinitionSpec, description parametersv1alpha1.ComponentConfigDescription) map[string]string { +func generateOnlineUpdateParams(configPatch *core.ConfigPatchInfo, paramDef *parametersv1alpha1.ParametersDefinitionSpec, description parametersv1alpha1.ComponentConfigDescription) map[string]string { r := make(map[string]string) dynamicAction := intctrlutil.NeedDynamicReloadAction(paramDef) needReloadStaticParameters := intctrlutil.ReloadStaticParameters(paramDef) diff --git a/pkg/controller/configuration/config_utils.go b/pkg/controller/configuration/config_utils.go index 4a812f9e38c..7790ff722de 100644 --- a/pkg/controller/configuration/config_utils.go +++ b/pkg/controller/configuration/config_utils.go @@ -243,12 +243,12 @@ func buildConfigManagerParams(cli client.Client, ctx context.Context, cluster *a return cfgManagerParams, nil } -func GetConfigManagerGRPCPort(containers []corev1.Container) (int32, error) { +func ResolveReloadServerGRPCPort(containers []corev1.Container) (int32, error) { for _, container := range containers { if container.Name != constant.ConfigSidecarName { continue } - if port, ok := findPortByPortName(container); ok { + if port, ok := resolveReloadContainerPort(container); ok { return port, nil } } @@ -265,7 +265,7 @@ func allocConfigManagerHostPort(comp *component.SynthesizedComponent) (int32, er return port, nil } -func findPortByPortName(container corev1.Container) (int32, bool) { +func resolveReloadContainerPort(container corev1.Container) (int32, bool) { for _, port := range container.Ports { if port.Name == constant.ConfigManagerPortName { return port.ContainerPort, true