Skip to content

Commit

Permalink
Merge pull request apache#306 from catpineapple/disaggregated-be-deco…
Browse files Browse the repository at this point in the history
…mmission

[feature](ddc)ddc decommission cg
  • Loading branch information
intelligentfu8 authored Dec 10, 2024
2 parents 37f4926 + 2d6d9de commit c304420
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 134 deletions.
10 changes: 6 additions & 4 deletions api/disaggregated/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type DorisDisaggregatedClusterSpec struct {
// the name of secret that type is `kubernetes.io/basic-auth` and contains keys username, password for management doris node in cluster as fe, be register.
// the password key is `password`. the username defaults to `root` and is omitempty.
AuthSecret string `json:"authSecret,omitempty"`

// EnableDecommission selects how be nodes are removed when a compute group is scaled down. The default value is false.
// if true, the be nodes are decommissioned when scaling down the compute group.
// if false, the be nodes are dropped when scaling down the compute group.
EnableDecommission bool `json:"enableDecommission,omitempty"`
}

type MetaService struct {
Expand Down Expand Up @@ -290,15 +295,12 @@ type Phase string

const (
Ready Phase = "Ready"
//Upgrading represents the spec of the service changed, service in smoothing upgrade.
Upgrading Phase = "Upgrading"
//Failed represents service failed to start, can't be accessed.
Failed Phase = "Failed"
//Reconciling represents service in creating stage.
Reconciling Phase = "Reconciling"

//Scaling represents service in Scaling.
Scaling Phase = "Scaling"
Decommissioning Phase = "Decommissioning"
ScaleDownFailed Phase = "ScaleDownFailed"
ResumeFailed Phase = "ResumeFailed"
SuspendFailed Phase = "SuspendFailed"
Expand Down
78 changes: 78 additions & 0 deletions pkg/common/utils/resource/decommission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package resource

import (
"github.com/apache/doris-operator/pkg/common/utils/mysql"
)

// DecommissionPhase names the stage of a backend decommission task. Values of
// this type are produced by DecommissionTaskStatus.GetDecommissionPhase.
type DecommissionPhase string

const (
// Decommissioned is reported when the undecommissioned backend count equals
// the keep amount and every backend is either undecommissioned or fully
// decommissioned (none is still in progress).
Decommissioned DecommissionPhase = "Decommissioned"
// Decommissioning is reported when the undecommissioned backend count equals
// the keep amount and at least one backend is still being decommissioned.
Decommissioning DecommissionPhase = "Decommissioning"
// DecommissionAcceptable is reported when no backend is decommissioning or
// decommissioned and the undecommissioned count exceeds the keep amount.
// NOTE(review): presumably this means a new decommission may be started — confirm with callers.
DecommissionAcceptable DecommissionPhase = "DecommissionAcceptable"
// DecommissionPhaseUnknown is reported when none of the other phases match.
DecommissionPhaseUnknown DecommissionPhase = "Unknown"
)

// DecommissionTaskStatus is a snapshot of backend counts, grouped by
// decommission state, together with the number of backends to keep.
// It is built by ConstructDecommissionTaskStatus and interpreted by
// GetDecommissionPhase.
type DecommissionTaskStatus struct {
// AllBackendsSize is the total number of backends that were inspected.
AllBackendsSize int
// UnDecommissionedCount is the number of backends whose SystemDecommissioned flag is false.
UnDecommissionedCount int
// DecommissioningCount is the number of backends flagged SystemDecommissioned
// whose TabletNum is not zero.
DecommissioningCount int
// DecommissionedCount is the number of backends flagged SystemDecommissioned
// whose TabletNum is zero.
DecommissionedCount int
// BeKeepAmount is the keep amount passed to ConstructDecommissionTaskStatus;
// GetDecommissionPhase compares UnDecommissionedCount against it.
BeKeepAmount int
}

// ConstructDecommissionTaskStatus tallies the given backends by decommission
// state and returns the resulting DecommissionTaskStatus.
//
// A backend whose SystemDecommissioned flag is false counts as undecommissioned.
// A flagged backend counts as decommissioned when its TabletNum is zero and as
// decommissioning otherwise. cgKeepAmount is stored as BeKeepAmount and
// AllBackendsSize is the length of allBackends.
func ConstructDecommissionTaskStatus(allBackends []*mysql.Backend, cgKeepAmount int32) DecommissionTaskStatus {
	status := DecommissionTaskStatus{
		AllBackendsSize: len(allBackends),
		BeKeepAmount:    int(cgKeepAmount),
	}

	for _, be := range allBackends {
		switch {
		case !be.SystemDecommissioned:
			status.UnDecommissionedCount++
		case be.TabletNum == 0:
			// Flagged and no tablets left: the decommission has finished.
			status.DecommissionedCount++
		default:
			// Flagged but still holding tablets: the decommission is in progress.
			status.DecommissioningCount++
		}
	}

	return status
}

// GetDecommissionPhase derives the decommission phase from the recorded counts.
//
// The checks are evaluated in order and the first match wins:
//   - DecommissionAcceptable: no backend is decommissioning or decommissioned,
//     and more backends are undecommissioned than BeKeepAmount.
//   - Decommissioning: the undecommissioned count equals BeKeepAmount and at
//     least one backend is still decommissioning.
//   - Decommissioned: the undecommissioned count equals BeKeepAmount and the
//     undecommissioned plus decommissioned counts account for all backends.
//   - DecommissionPhaseUnknown: anything else.
func (d *DecommissionTaskStatus) GetDecommissionPhase() DecommissionPhase {
	nothingStarted := d.DecommissioningCount == 0 && d.DecommissionedCount == 0
	atKeepAmount := d.UnDecommissionedCount == d.BeKeepAmount

	switch {
	case nothingStarted && d.UnDecommissionedCount > d.BeKeepAmount:
		return DecommissionAcceptable
	case atKeepAmount && d.DecommissioningCount > 0:
		return Decommissioning
	case atKeepAmount && d.UnDecommissionedCount+d.DecommissionedCount == d.AllBackendsSize:
		return Decommissioned
	default:
		return DecommissionPhaseUnknown
	}
}
50 changes: 31 additions & 19 deletions pkg/controller/disaggregated_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,52 +202,52 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec
hv := hash.HashObject(ddc.Spec)

var res ctrl.Result
////TODO: deprecated.
//cmnn := types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Spec.InstanceConfigMap}
//ddcnn := types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}
//cmnnStr := cmnn.String()
//ddcnnStr := ddcnn.String()
//if _, ok := dc.wcms[cmnnStr]; !ok {
// dc.wcms[cmnnStr] = ddcnnStr
//}

//sync resource.
//recall all errors
var msg string
reconRes, reconErr := dc.reconcileSub(ctx, &ddc)
if reconErr != nil {
msg = msg + reconErr.Error()
}
if !reconRes.IsZero() {
res = reconRes
}

// clear unused resources.
clearRes, clearErr := dc.clearUnusedResources(ctx, &ddc)
if clearErr != nil {
msg = msg + reconErr.Error()
}

if !clearRes.IsZero() {
res = clearRes
}

//display new status.
disRes, disErr := func() (ctrl.Result, error) {
//reorganize status.
if res, err = dc.reorganizeStatus(&ddc); err != nil {
return res, err
var stsRes ctrl.Result
var stsErr error
if stsRes, stsErr = dc.reorganizeStatus(&ddc); stsErr != nil {
return stsRes, stsErr
}

//update cr or status
if res, err = dc.updateObjectORStatus(ctx, &ddc, hv); err != nil {
return res, err
if stsRes, stsErr = dc.updateObjectORStatus(ctx, &ddc, hv); stsErr != nil {
return stsRes, stsErr
}

return ctrl.Result{}, nil
return stsRes, stsErr
}()

if disErr != nil {
msg = msg + disErr.Error()
}
if !disRes.IsZero() {
res = disRes
}

return res, err
if msg != "" {
return res, errors.New(msg)
}
return res, nil
}

func (dc *DisaggregatedClusterReconciler) clearUnusedResources(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
Expand Down Expand Up @@ -312,7 +312,19 @@ func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Conte
//return ctrl.Result{}, err
}
}
return dc.updateDorisDisaggregatedClusterStatus(ctx, deepCopyDDC)
res, err := dc.updateDorisDisaggregatedClusterStatus(ctx, deepCopyDDC)

if err != nil {
return res, err
}

for _, cgs := range ddc.Status.ComputeGroupStatuses {
if cgs.Phase == dv1.Decommissioning {
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
}
return res, nil

}

func (dc *DisaggregatedClusterReconciler) updateDorisDisaggregatedClusterStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
Expand Down
Loading

0 comments on commit c304420

Please sign in to comment.