Skip to content

Commit

Permalink
chore: Adjusted the code according to ai suggestion
Browse files Browse the repository at this point in the history
  • Loading branch information
sophon-zt committed Nov 20, 2024
1 parent e0f6768 commit 115978a
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 84 deletions.
4 changes: 2 additions & 2 deletions controllers/apps/transformer_component_reconfigure.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (t *componentReloadActionSidecarTransformer) Transform(ctx graph.TransformC
}

graphCli, _ := transCtx.Client.(model.GraphClient)
if err := checkAndCreateConfigRelatedObjs(transCtx, graphCli, dag, configmaps...); err != nil {
if err := ensureConfigMapsPresence(transCtx, graphCli, dag, configmaps...); err != nil {
return err
}
if err := updatePodVolumes(synthesizeComp.PodSpec, synthesizeComp); err != nil {
Expand All @@ -92,7 +92,7 @@ func (t *componentReloadActionSidecarTransformer) Transform(ctx graph.TransformC
return configctrl.BuildReloadActionContainer(reconcileCtx, cluster, synthesizeComp, transCtx.CompDef, configmaps)
}

func checkAndCreateConfigRelatedObjs(ctx context.Context, cli model.GraphClient, dag *graph.DAG, configmaps ...*corev1.ConfigMap) error {
func ensureConfigMapsPresence(ctx context.Context, cli model.GraphClient, dag *graph.DAG, configmaps ...*corev1.ConfigMap) error {
for _, configmap := range configmaps {
var cm = &corev1.ConfigMap{}
if err := cli.Get(ctx, client.ObjectKeyFromObject(configmap), cm); err != nil {
Expand Down
31 changes: 18 additions & 13 deletions controllers/parameters/policy_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func getPodsForOnlineUpdate(params reconfigureContext) ([]corev1.Pod, error) {

// TODO commonOnlineUpdateWithPod migrate to sql command pipeline
func commonOnlineUpdateWithPod(pod *corev1.Pod, ctx context.Context, createClient createReconfigureClient, configSpec string, configFile string, updatedParams map[string]string) error {
address, err := cfgManagerGrpcURL(pod)
address, err := resolveReloadServerGrpcURL(pod)
if err != nil {
return err
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func commonStopContainerWithPod(pod *corev1.Pod, ctx context.Context, containerN
containerIDs = append(containerIDs, containerID)
}

address, err := cfgManagerGrpcURL(pod)
address, err := resolveReloadServerGrpcURL(pod)
if err != nil {
return err
}
Expand All @@ -155,19 +155,19 @@ func commonStopContainerWithPod(pod *corev1.Pod, ctx context.Context, containerN
return nil
}

func cfgManagerGrpcURL(pod *corev1.Pod) (string, error) {
func resolveReloadServerGrpcURL(pod *corev1.Pod) (string, error) {
podPort := viper.GetInt(constant.ConfigManagerGPRCPortEnv)
if pod.Spec.HostNetwork {
containerPort, err := configuration.GetConfigManagerGRPCPort(pod.Spec.Containers)
containerPort, err := configuration.ResolveReloadServerGRPCPort(pod.Spec.Containers)
if err != nil {
return "", err
}
podPort = int(containerPort)
}
return getURLFromPod(pod, podPort)
return generateGrpcURL(pod, podPort)
}

func getURLFromPod(pod *corev1.Pod, portPort int) (string, error) {
func generateGrpcURL(pod *corev1.Pod, portPort int) (string, error) {
ip, err := ipAddressFromPod(pod.Status)
if err != nil {
return "", err
Expand Down Expand Up @@ -286,38 +286,43 @@ func resolveReloadActionPolicy(jsonPatch string,
return policy, nil
}

// genReconfigureActionTasks generates a list of reconfiguration tasks based on the provided templateSpec,
// reconfiguration context, configuration patch, and a restart flag.
func genReconfigureActionTasks(templateSpec *appsv1.ComponentTemplateSpec, rctx *ReconcileContext, patch *core.ConfigPatchInfo, restart bool) ([]ReloadAction, error) {
var tasks []ReloadAction

// If the patch or ConfigRender is nil, return a single restart task.
if patch == nil || rctx.ConfigRender == nil {
return []ReloadAction{buildRestartTask(templateSpec, rctx)}, nil
}

checkNeedReloadAction := func(pd *parametersv1alpha1.ParametersDefinition, policy parametersv1alpha1.ReloadPolicy) bool {
if restart {
return policy == parametersv1alpha1.SyncDynamicReloadPolicy && intctrlutil.NeedDynamicReloadAction(&pd.Spec)
}
return true
// needReloadAction determines if a reload action is needed based on the ParametersDefinition and ReloadPolicy.
needReloadAction := func(pd *parametersv1alpha1.ParametersDefinition, policy parametersv1alpha1.ReloadPolicy) bool {
return !restart || (policy == parametersv1alpha1.SyncDynamicReloadPolicy && intctrlutil.NeedDynamicReloadAction(&pd.Spec))
}

for key, jsonPatch := range patch.UpdateConfig {
pd, ok := rctx.ParametersDefs[key]
// If the ParametersDefinition or its ReloadAction is nil, continue to the next iteration.
if !ok || pd.Spec.ReloadAction == nil {
continue
}
configFormat := intctrlutil.GetComponentConfigDescription(&rctx.ConfigRender.Spec, key)
if configFormat == nil || configFormat.FileFormatConfig == nil {
continue
}
// Determine the appropriate ReloadPolicy.
policy, err := resolveReloadActionPolicy(string(jsonPatch), configFormat.FileFormatConfig, &pd.Spec)
if err != nil {
return nil, err
}
if checkNeedReloadAction(pd, policy) {
// If a reload action is needed, append a new reload action task to the tasks slice.
if needReloadAction(pd, policy) {
tasks = append(tasks, buildReloadActionTask(policy, templateSpec, rctx, pd, configFormat, patch))
}
}

// If no tasks were added, return a single restart task.
if len(tasks) == 0 {
return []ReloadAction{buildRestartTask(templateSpec, rctx)}, nil
}
Expand All @@ -342,7 +347,7 @@ func buildReloadActionTask(reloadPolicy parametersv1alpha1.ReloadPolicy, templat
}

if reloadPolicy == parametersv1alpha1.SyncDynamicReloadPolicy {
reCtx.UpdatedParameters = getOnlineUpdateParams(patch, &pd.Spec, *configDescription)
reCtx.UpdatedParameters = generateOnlineUpdateParams(patch, &pd.Spec, *configDescription)
}

return reconfigureTask{ReloadPolicy: reloadPolicy, taskCtx: reCtx}
Expand Down
12 changes: 8 additions & 4 deletions controllers/parameters/reconcile_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func syncImpl(taskCtx *TaskContext,
return nil
}

// persistUpdatedParameters merges and updates parameter-related configmaps.
// It first calls mergeAndApplyConfig to merge and update the configmap.
// If the updatedConfig is nil, it returns nil. Otherwise, it calls updateInjectedEnvVars
// to check and update the injected environment variables.
func persistUpdatedParameters(rctx *configctrl.ResourceCtx,
comp *component.SynthesizedComponent,
configRender *parametersv1alpha1.ParameterDrivenConfigRender,
Expand All @@ -189,16 +193,16 @@ func persistUpdatedParameters(rctx *configctrl.ResourceCtx,
owner client.Object,
item parametersv1alpha1.ConfigTemplateItemDetail,
revision string) error {
if err := mergeAndUpdate(rctx, updatedConfig, original, owner, item, revision); err != nil {
if err := mergeAndApplyConfig(rctx, updatedConfig, original, owner, item, revision); err != nil {
return err
}
if updatedConfig == nil {
return nil
}
return checkAndUpdateInjectedEnv(rctx, comp, configRender, updatedConfig, owner, item, revision)
return updateInjectedEnvVars(rctx, comp, configRender, updatedConfig, owner, item, revision)
}

func checkAndUpdateInjectedEnv(rctx *configctrl.ResourceCtx,
func updateInjectedEnvVars(rctx *configctrl.ResourceCtx,
comp *component.SynthesizedComponent,
configRender *parametersv1alpha1.ParameterDrivenConfigRender,
config *corev1.ConfigMap,
Expand Down Expand Up @@ -234,7 +238,7 @@ func checkAndUpdateInjectedEnv(rctx *configctrl.ResourceCtx,
return err
}

func mergeAndUpdate(resourceCtx *configctrl.ResourceCtx,
func mergeAndApplyConfig(resourceCtx *configctrl.ResourceCtx,
expected *corev1.ConfigMap,
running *corev1.ConfigMap,
owner client.Object,
Expand Down
70 changes: 23 additions & 47 deletions controllers/parameters/reconfigure_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,65 +234,41 @@ func (r *ReconfigureReconciler) performUpgrade(rctx *ReconcileContext, reloadTas
var reloadType string

for _, task := range reloadTasks {
returnedStatus, err = task.ExecReload()
reloadType = task.ReloadType()
returnedStatus, err = task.ExecReload()
if err != nil || returnedStatus.Status != ESNone {
return r.status(rctx, returnedStatus, reloadType, err)
}
}

rctx.Recorder.Eventf(rctx.ConfigMap,
corev1.EventTypeNormal,
appsv1alpha1.ReasonReconfigureSucceed,
"the reconfigure[%s] has been processed successfully",
reloadType)

result := reconciled(returnedStatus, reloadType, parametersv1alpha1.CFinishedPhase)
return r.updateConfigCMStatus(rctx.RequestCtx, rctx.ConfigMap, reloadType, &result)
return r.succeed(rctx, reloadType, returnedStatus)
}

func (r *ReconfigureReconciler) status(rctx *ReconcileContext, returnedStatus ReturnedStatus, policy string, err error) (ctrl.Result, error) {
updatePhase := func(phase parametersv1alpha1.ParameterPhase, options ...options) (ctrl.Result, error) {
return updateConfigPhaseWithResult(rctx.Client, rctx.RequestCtx, rctx.ConfigMap, reconciled(returnedStatus, policy, phase, options...))
}

switch returnedStatus.Status {
default:
return updateConfigPhaseWithResult(
rctx.Client,
rctx.RequestCtx,
rctx.ConfigMap,
reconciled(returnedStatus, policy, parametersv1alpha1.CFailedAndPausePhase,
withFailed(core.MakeError("unknown status"), false)),
)
case ESFailedAndRetry:
return updateConfigPhaseWithResult(
rctx.Client,
rctx.RequestCtx,
rctx.ConfigMap,
reconciled(returnedStatus, policy, parametersv1alpha1.CFailedPhase,
withFailed(err, true)),
)
return updatePhase(parametersv1alpha1.CFailedPhase, withFailed(err, true))
case ESRetry:
return updateConfigPhaseWithResult(
rctx.Client,
rctx.RequestCtx,
rctx.ConfigMap,
reconciled(returnedStatus, policy, parametersv1alpha1.CUpgradingPhase),
)
return updatePhase(parametersv1alpha1.CUpgradingPhase)
case ESFailed:
return updateConfigPhaseWithResult(
rctx.Client,
rctx.RequestCtx,
rctx.ConfigMap,
reconciled(returnedStatus, policy, parametersv1alpha1.CFailedAndPausePhase,
withFailed(err, false)),
)
return updatePhase(parametersv1alpha1.CFailedAndPausePhase, withFailed(err, false))
case ESNone:
rctx.Recorder.Eventf(
rctx.ConfigMap,
corev1.EventTypeNormal,
appsv1alpha1.ReasonReconfigureSucceed,
"the reconfigure[%s] has been processed successfully",
policy,
)
result := reconciled(returnedStatus, policy, parametersv1alpha1.CFinishedPhase)
return r.updateConfigCMStatus(rctx.RequestCtx, rctx.ConfigMap, policy, &result)
return r.succeed(rctx, policy, returnedStatus)
default:
return updatePhase(parametersv1alpha1.CFailedAndPausePhase, withFailed(core.MakeError("unknown status"), false))
}
}

func (r *ReconfigureReconciler) succeed(rctx *ReconcileContext, reloadType string, returnedStatus ReturnedStatus) (ctrl.Result, error) {
rctx.Recorder.Eventf(rctx.ConfigMap,
corev1.EventTypeNormal,
appsv1alpha1.ReasonReconfigureSucceed,
"the reconfigure[%s] has been processed successfully",
reloadType)

result := reconciled(returnedStatus, reloadType, parametersv1alpha1.CFinishedPhase)
return r.updateConfigCMStatus(rctx.RequestCtx, rctx.ConfigMap, reloadType, &result)
}
7 changes: 6 additions & 1 deletion controllers/parameters/reconfigure_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,12 @@ func GetClientFactory() createReconfigureClient {
}

func (param *reconfigureContext) getConfigKey() string {
return param.ConfigTemplate.Name
key := param.ConfigTemplate.Name
if param.ConfigDescription != nil && param.ConfigDescription.Name != "" {
hash, _ := util.ComputeHash(param.ConfigDescription.Name)
key = key + "/" + hash
}
return key
}

func (param *reconfigureContext) getTargetVersionHash() string {
Expand Down
22 changes: 11 additions & 11 deletions controllers/parameters/rolling_upgrade_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,20 @@ func performRollingUpgrade(rctx reconfigureContext, funcs RollingUpgradeFuncs) (
return makeReturnedStatus(ESRetry), nil
}

podStats := classifyPodByStats(pods, rctx.getTargetReplicas(), rctx.podMinReadySeconds())
podSwitchWindow := markDynamicSwitchWindow(pods, podStats, configKey, configVersion, rollingReplicas)
if !canSafeUpdatePods(podSwitchWindow) {
podStatus := classifyPodByStats(pods, rctx.getTargetReplicas(), rctx.podMinReadySeconds())
updateWindow := markDynamicSwitchWindow(pods, podStatus, configKey, configVersion, rollingReplicas)
if !canSafeUpdatePods(updateWindow) {
rctx.Log.Info("wait for pod stat ready.")
return makeReturnedStatus(ESRetry), nil
}

waitUpgradingPods := podSwitchWindow.getWaitUpgradePods()
if len(waitUpgradingPods) == 0 {
return makeReturnedStatus(ESNone, withSucceed(int32(podStats.targetReplica)), withExpected(int32(podStats.targetReplica))), nil
podsToUpgrade := updateWindow.getPendingUpgradePods()
if len(podsToUpgrade) == 0 {
return makeReturnedStatus(ESNone, withSucceed(int32(podStatus.targetReplica)), withExpected(int32(podStatus.targetReplica))), nil
}

for _, pod := range waitUpgradingPods {
if podStats.isUpdating(&pod) {
for _, pod := range podsToUpgrade {
if podStatus.isUpdating(&pod) {
rctx.Log.Info("pod is in rolling update.", "pod name", pod.Name)
continue
}
Expand All @@ -103,8 +103,8 @@ func performRollingUpgrade(rctx reconfigureContext, funcs RollingUpgradeFuncs) (
}

return makeReturnedStatus(ESRetry,
withExpected(int32(podStats.targetReplica)),
withSucceed(int32(len(podStats.updated)+len(podStats.updating)))), nil
withExpected(int32(podStatus.targetReplica)),
withSucceed(int32(len(podStatus.updated)+len(podStatus.updating)))), nil
}

func canSafeUpdatePods(wind switchWindow) bool {
Expand Down Expand Up @@ -201,7 +201,7 @@ type switchWindow struct {
*componentPodStats
}

func (w *switchWindow) getWaitUpgradePods() []corev1.Pod {
func (w *switchWindow) getPendingUpgradePods() []corev1.Pod {
return w.pods[w.begin:w.end]
}

Expand Down
1 change: 1 addition & 0 deletions controllers/parameters/rolling_upgrade_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ var _ = Describe("Reconfigure RollingPolicy", func() {
withConfigSpec("for_test", map[string]string{
"key": "value",
}),
withConfigDescription(&parametersv1alpha1.FileFormatConfig{Format: parametersv1alpha1.Properties}),
withGRPCClient(func(addr string) (cfgproto.ReconfigureClient, error) {
return reconfigureClient, nil
}),
Expand Down
4 changes: 2 additions & 2 deletions controllers/parameters/simple_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func init() {
func (s *simplePolicy) Upgrade(rctx reconfigureContext) (ReturnedStatus, error) {
rctx.Log.V(1).Info("simple policy begin....")

return restartAndCheckComponent(rctx, GetInstanceSetRollingUpgradeFuncs(), fromWorkloadObjects(rctx))
return restartAndVerifyComponent(rctx, GetInstanceSetRollingUpgradeFuncs(), fromWorkloadObjects(rctx))
}

func (s *simplePolicy) GetPolicyName() string {
return string(parametersv1alpha1.NormalPolicy)
}

func restartAndCheckComponent(rctx reconfigureContext, funcs RollingUpgradeFuncs, objs []client.Object) (ReturnedStatus, error) {
func restartAndVerifyComponent(rctx reconfigureContext, funcs RollingUpgradeFuncs, objs []client.Object) (ReturnedStatus, error) {
var (
newVersion = rctx.getTargetVersionHash()
configKey = rctx.getConfigKey()
Expand Down
2 changes: 1 addition & 1 deletion controllers/parameters/sync_upgrade_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func sync(rctx reconfigureContext, updatedParameters map[string]string, pods []c
return makeReturnedStatus(r, withExpected(requireUpdatedCount), withSucceed(progress)), nil
}

func getOnlineUpdateParams(configPatch *core.ConfigPatchInfo, paramDef *parametersv1alpha1.ParametersDefinitionSpec, description parametersv1alpha1.ComponentConfigDescription) map[string]string {
func generateOnlineUpdateParams(configPatch *core.ConfigPatchInfo, paramDef *parametersv1alpha1.ParametersDefinitionSpec, description parametersv1alpha1.ComponentConfigDescription) map[string]string {
r := make(map[string]string)
dynamicAction := intctrlutil.NeedDynamicReloadAction(paramDef)
needReloadStaticParameters := intctrlutil.ReloadStaticParameters(paramDef)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/configuration/config_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,12 @@ func buildConfigManagerParams(cli client.Client, ctx context.Context, cluster *a
return cfgManagerParams, nil
}

func GetConfigManagerGRPCPort(containers []corev1.Container) (int32, error) {
func ResolveReloadServerGRPCPort(containers []corev1.Container) (int32, error) {
for _, container := range containers {
if container.Name != constant.ConfigSidecarName {
continue
}
if port, ok := findPortByPortName(container); ok {
if port, ok := resolveReloadContainerPort(container); ok {
return port, nil
}
}
Expand All @@ -265,7 +265,7 @@ func allocConfigManagerHostPort(comp *component.SynthesizedComponent) (int32, er
return port, nil
}

func findPortByPortName(container corev1.Container) (int32, bool) {
func resolveReloadContainerPort(container corev1.Container) (int32, bool) {
for _, port := range container.Ports {
if port.Name == constant.ConfigManagerPortName {
return port.ContainerPort, true
Expand Down

0 comments on commit 115978a

Please sign in to comment.