Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: preConditionDeadlineSeconds of the opsRequest is invalid when creating some opsRequests together (pick from c65ed8c3) #8541

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions apis/apps/v1alpha1/opsrequest_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (r *OpsRequest) Force() bool {
return r.Spec.Force && r.Spec.Type != StartType
}

// validateClusterPhase validates whether the current cluster state supports the OpsRequest
func (r *OpsRequest) validateClusterPhase(cluster *Cluster) error {
// ValidateClusterPhase validates whether the current cluster state supports the OpsRequest
func (r *OpsRequest) ValidateClusterPhase(cluster *Cluster) error {
opsBehaviour := OpsRequestBehaviourMapper[r.Spec.Type]
// if the OpsType has no cluster phases, ignore it
if len(opsBehaviour.FromClusterPhases) == 0 {
Expand All @@ -134,25 +134,32 @@ func (r *OpsRequest) validateClusterPhase(cluster *Cluster) error {
// validate whether an OpsRequest of the same type already exists
var (
opsRequestValue string
opsRecorder []OpsRecorder
opsRecorders []OpsRecorder
ok bool
)
if opsRequestValue, ok = cluster.Annotations[opsRequestAnnotationKey]; ok {
// opsRequest annotation value in cluster to map
if err := json.Unmarshal([]byte(opsRequestValue), &opsRecorder); err != nil {
if err := json.Unmarshal([]byte(opsRequestValue), &opsRecorders); err != nil {
return err
}
}
// check if the opsRequest can be executed in the current cluster.
if slices.Contains(opsBehaviour.FromClusterPhases, cluster.Status.Phase) {
return nil
}
var opsRecord *OpsRecorder
for _, v := range opsRecorders {
if v.Name == r.Name {
opsRecord = &v
break
}
}
// check if this opsRequest needs to verify cluster phase before opsRequest starts running.
needCheck := len(opsRecorder) == 0 || (opsRecorder[0].Name == r.Name && opsRecorder[0].InQueue)
if !needCheck {
return nil
needCheck := len(opsRecorders) == 0 || (opsRecord != nil && !opsRecord.InQueue)
if needCheck {
return fmt.Errorf("OpsRequest.spec.type=%s is forbidden when Cluster.status.phase=%s", r.Spec.Type, cluster.Status.Phase)
}
return fmt.Errorf("OpsRequest.spec.type=%s is forbidden when Cluster.status.phase=%s", r.Spec.Type, cluster.Status.Phase)
return nil
}

// getCluster gets cluster with webhook client
Expand Down Expand Up @@ -189,11 +196,11 @@ func (r *OpsRequest) Validate(ctx context.Context,
cluster *Cluster,
needCheckClusterPhase bool) error {
if needCheckClusterPhase {
if err := r.validateClusterPhase(cluster); err != nil {
if err := r.ValidateClusterPhase(cluster); err != nil {
return err
}
}
return r.validateOps(ctx, k8sClient, cluster)
return r.ValidateOps(ctx, k8sClient, cluster)
}

// ValidateEntry OpsRequest webhook validate entry
Expand All @@ -215,8 +222,8 @@ func (r *OpsRequest) validateEntry(isCreate bool) error {
return r.Validate(ctx, k8sClient, cluster, isCreate)
}

// validateOps validates ops attributes
func (r *OpsRequest) validateOps(ctx context.Context,
// ValidateOps validates ops attributes
func (r *OpsRequest) ValidateOps(ctx context.Context,
k8sClient client.Client,
cluster *Cluster) error {
// Check whether the corresponding attribute is legal according to the operation type
Expand Down
94 changes: 56 additions & 38 deletions controllers/apps/operations/ops_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,55 +67,26 @@ func (opsMgr *OpsManager) Do(reqCtx intctrlutil.RequestCtx, cli client.Client, o
return &ctrl.Result{}, patchValidateErrorCondition(reqCtx.Ctx, cli, opsRes, err.Error())
}
} else {
// validate entry condition for OpsRequest, check if the cluster is in the right phase
if err = validateOpsWaitingPhase(opsRes.Cluster, opsRequest, opsBehaviour); err != nil {
// check if the error is caused by WaitForClusterPhaseErr error
if _, ok := err.(*WaitForClusterPhaseErr); ok {
return intctrlutil.ResultToP(intctrlutil.RequeueAfter(time.Second, reqCtx.Log, ""))
}
return &ctrl.Result{}, patchValidateErrorCondition(reqCtx.Ctx, cli, opsRes, err.Error())
}
// validate OpsRequest.spec
// if the operation will create a new cluster, don't validate the cluster phase
if err = opsRequest.Validate(reqCtx.Ctx, cli, opsRes.Cluster, !opsBehaviour.IsClusterCreation); err != nil {
return &ctrl.Result{}, patchValidateErrorCondition(reqCtx.Ctx, cli, opsRes, err.Error())
}
err = opsRequest.ValidateOps(reqCtx.Ctx, cli, opsRes.Cluster)
}
if err != nil {
return &ctrl.Result{}, patchValidateErrorCondition(reqCtx.Ctx, cli, opsRes, err.Error())
}

if opsRequest.Status.Phase == appsv1alpha1.OpsPendingPhase {
if opsRequest.Spec.Cancel {
return &ctrl.Result{}, PatchOpsStatus(reqCtx.Ctx, cli, opsRes, appsv1alpha1.OpsCancelledPhase)
}
// TODO: abort last OpsRequest if using 'force' and intersecting with cluster component name or shard name.
if opsBehaviour.QueueByCluster || opsBehaviour.QueueBySelf {
// if ToClusterPhase is not empty, enqueue OpsRequest to the cluster Annotation.
opsRecorde, err := enqueueOpsRequestToClusterAnnotation(reqCtx.Ctx, cli, opsRes, opsBehaviour)
if intctrlutil.IsTargetError(err, intctrlutil.ErrorTypeFatal) {
return &ctrl.Result{}, patchValidateErrorCondition(reqCtx.Ctx, cli, opsRes, err.Error())
} else if err != nil {
return nil, err
}
if opsRecorde != nil && opsRecorde.InQueue {
// if the opsRequest is in the queue, return
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}
}

// validate if the dependent ops have been successful
if pass, err := opsMgr.validateDependOnSuccessfulOps(reqCtx, cli, opsRes); intctrlutil.IsTargetError(err, intctrlutil.ErrorTypeFatal) {
if err = opsMgr.doPreConditionAndTransPhaseToCreating(reqCtx, cli, opsRes, opsBehaviour); intctrlutil.IsTargetError(err, intctrlutil.ErrorTypeFatal) {
return &ctrl.Result{}, patchValidateErrorCondition(reqCtx.Ctx, cli, opsRes, err.Error())
} else if err != nil {
return nil, err
} else if !pass {
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}
opsDeepCopy := opsRequest.DeepCopy()
// save last configuration into status.lastConfiguration
if err = opsBehaviour.OpsHandler.SaveLastConfiguration(reqCtx, cli, opsRes); err != nil {
if _, ok := err.(*WaitForClusterPhaseErr); ok {
return intctrlutil.ResultToP(intctrlutil.RequeueAfter(time.Second, reqCtx.Log, "wait cluster to a right phase"))
}
return nil, err
}

return &ctrl.Result{}, patchOpsRequestToCreating(reqCtx, cli, opsRes, opsDeepCopy, opsBehaviour.OpsHandler)
return intctrlutil.ResultToP(intctrlutil.Reconciled())
}

if err = updateHAConfigIfNecessary(reqCtx, cli, opsRes.OpsRequest, "false"); err != nil {
Expand All @@ -134,6 +105,53 @@ func (opsMgr *OpsManager) Do(reqCtx intctrlutil.RequestCtx, cli client.Client, o
return nil, nil
}

// doPreConditionAndTransPhaseToCreating runs the pre-condition checks for a pending
// OpsRequest and, once they all pass, patches it towards the Creating phase.
//
// The steps run in this order:
//  1. for behaviours with QueueByCluster or QueueBySelf, enqueue the OpsRequest into
//     the cluster annotation and stop (nil) while the returned record is still in queue;
//  2. stop (nil) while the dependent ops validation has not passed;
//  3. when preConditionDeadlineSeconds is set and no queue-end-time annotation exists
//     yet, stamp the annotation with the current time and return the update result;
//  4. unless the behaviour creates a new cluster, validate the cluster phase; a
//     ValidateClusterPhase failure is returned as a fatal error;
//  5. save the last configuration and patch the OpsRequest to Creating.
//
// A nil return therefore does not imply the phase transition happened.
func (opsMgr *OpsManager) doPreConditionAndTransPhaseToCreating(reqCtx intctrlutil.RequestCtx,
	cli client.Client,
	opsRes *OpsResource,
	opsBehaviour OpsBehaviour) error {
	if opsBehaviour.QueueByCluster || opsBehaviour.QueueBySelf {
		// record the OpsRequest in the cluster annotation before anything else.
		recorder, enqueueErr := enqueueOpsRequestToClusterAnnotation(reqCtx.Ctx, cli, opsRes, opsBehaviour)
		switch {
		case enqueueErr != nil:
			return enqueueErr
		case recorder != nil && recorder.InQueue:
			// still waiting in the queue, nothing more to do in this round.
			return nil
		}
	}

	// the ops this request depends on must have succeeded first.
	passed, err := opsMgr.validateDependOnSuccessfulOps(reqCtx, cli, opsRes)
	if err != nil {
		return err
	}
	if !passed {
		return nil
	}

	ops := opsRes.OpsRequest
	if preConditionDeadlineSecondsIsSet(ops) && ops.Annotations[constant.QueueEndTimeAnnotationKey] == "" {
		// remember when the request left the queue; the preConditionDeadline
		// validation uses this time as its starting point.
		if ops.Annotations == nil {
			ops.Annotations = map[string]string{}
		}
		ops.Annotations[constant.QueueEndTimeAnnotationKey] = time.Now().Format(time.RFC3339)
		return cli.Update(reqCtx.Ctx, ops)
	}

	// an operation that creates a new cluster has no cluster phase to validate.
	if !opsBehaviour.IsClusterCreation {
		// entry condition: the cluster must be in (or is waited to reach) a valid phase.
		if waitErr := validateOpsNeedWaitingClusterPhase(opsRes.Cluster, ops, opsBehaviour); waitErr != nil {
			return waitErr
		}
		if phaseErr := ops.ValidateClusterPhase(opsRes.Cluster); phaseErr != nil {
			return intctrlutil.NewFatalError(phaseErr.Error())
		}
	}

	// snapshot the OpsRequest before the handler records status.lastConfiguration.
	snapshot := ops.DeepCopy()
	handler := opsBehaviour.OpsHandler
	if saveErr := handler.SaveLastConfiguration(reqCtx, cli, opsRes); saveErr != nil {
		return saveErr
	}
	return patchOpsRequestToCreating(reqCtx, cli, opsRes, snapshot, handler)
}

// Reconcile entry function when OpsRequest.status.phase is Running.
// loops till the operation is completed.
func (opsMgr *OpsManager) Reconcile(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) (time.Duration, error) {
Expand Down
29 changes: 25 additions & 4 deletions controllers/apps/operations/ops_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ func updateReconfigureStatusByCM(reconfiguringStatus *appsv1alpha1.Reconfiguring
return handleReconfigureStatus(cmStatus)
}

// validateOpsWaitingPhase validates whether the current cluster phase is expected, and whether the waiting time exceeds the limit.
// validateOpsNeedWaitingClusterPhase validates whether the current cluster phase is expected, and whether the waiting time exceeds the limit.
// only requests with `Pending` phase will be validated.
func validateOpsWaitingPhase(cluster *appsv1alpha1.Cluster, ops *appsv1alpha1.OpsRequest, opsBehaviour OpsBehaviour) error {
func validateOpsNeedWaitingClusterPhase(cluster *appsv1alpha1.Cluster, ops *appsv1alpha1.OpsRequest, opsBehaviour OpsBehaviour) error {
if ops.Force() {
return nil
}
Expand All @@ -211,6 +211,16 @@ func validateOpsWaitingPhase(cluster *appsv1alpha1.Cluster, ops *appsv1alpha1.Op
if slices.Contains(opsBehaviour.FromClusterPhases, cluster.Status.Phase) {
return nil
}
opsRequestSlice, err := opsutil.GetOpsRequestSliceFromCluster(cluster)
if err != nil {
return intctrlutil.NewFatalError(err.Error())
}
// skip the preConditionDeadline check if the ops is in queue.
index, opsRecorder := GetOpsRecorderFromSlice(opsRequestSlice, ops.Name)
if index != -1 && opsRecorder.InQueue {
return nil
}

// check if entry-condition is met
// if the cluster is not in the expected phase, we should wait for it for up to PreConditionDeadlineSeconds seconds.
if !needWaitPreConditionDeadline(ops) {
Expand All @@ -224,11 +234,22 @@ func validateOpsWaitingPhase(cluster *appsv1alpha1.Cluster, ops *appsv1alpha1.Op
}
}

// preConditionDeadlineSecondsIsSet reports whether spec.preConditionDeadlineSeconds
// is specified with a non-zero value.
func preConditionDeadlineSecondsIsSet(ops *appsv1alpha1.OpsRequest) bool {
	deadline := ops.Spec.PreConditionDeadlineSeconds
	if deadline == nil {
		return false
	}
	return *deadline != 0
}

// needWaitPreConditionDeadline reports whether the current time is still before the
// pre-condition deadline of the given OpsRequest, i.e. before
// base time + spec.preConditionDeadlineSeconds.
// It returns false when preConditionDeadlineSeconds is not set.
// The base time is the creation timestamp, replaced by the value of the
// queue-end-time annotation when that annotation is present and parses as RFC 3339.
func needWaitPreConditionDeadline(ops *appsv1alpha1.OpsRequest) bool {
if ops.Spec.PreConditionDeadlineSeconds == nil {
if !preConditionDeadlineSecondsIsSet(ops) {
return false
}
return time.Now().Before(ops.GetCreationTimestamp().Add(time.Duration(*ops.Spec.PreConditionDeadlineSeconds) * time.Second))
// start counting from the creation timestamp by default.
baseTime := ops.GetCreationTimestamp()
if queueEndTimeStr, ok := ops.Annotations[constant.QueueEndTimeAnnotationKey]; ok {
// the parse error is deliberately ignored: on failure time.Parse returns the zero
// Time, so the IsZero check below keeps the creation timestamp as the base time.
queueEndTime, _ := time.Parse(time.RFC3339, queueEndTimeStr)
if !queueEndTime.IsZero() {
baseTime = metav1.Time{Time: queueEndTime}
}
}
return time.Now().Before(baseTime.Add(time.Duration(*ops.Spec.PreConditionDeadlineSeconds) * time.Second))
}

func abortEarlierOpsRequestWithSameKind(reqCtx intctrlutil.RequestCtx,
Expand Down
1 change: 1 addition & 0 deletions pkg/constant/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ const (

OpsDependentOnSuccessfulOpsAnnoKey = "ops.kubeblocks.io/dependent-on-successful-ops" // OpsDependentOnSuccessfulOpsAnnoKey wait for the dependent ops to succeed before executing the current ops. If it fails, this ops will also fail.
RelatedOpsAnnotationKey = "ops.kubeblocks.io/related-ops"
QueueEndTimeAnnotationKey = "operations.kubeblocks.io/queue-end-time"
)
Loading