From d5102b976a69a8c4b0277c2f896c2e58a223a36c Mon Sep 17 00:00:00 2001 From: catpineapple Date: Tue, 3 Dec 2024 15:35:58 +0800 Subject: [PATCH 1/7] ddc decommission cg --- api/disaggregated/v1/types.go | 5 + pkg/common/utils/resource/decommission.go | 61 ++++++ .../disaggregated_cluster_controller.go | 13 +- .../computegroups/controller.go | 202 ++++++++++++++---- 4 files changed, 229 insertions(+), 52 deletions(-) create mode 100644 pkg/common/utils/resource/decommission.go diff --git a/api/disaggregated/v1/types.go b/api/disaggregated/v1/types.go index d82b2c2..7d7ae44 100644 --- a/api/disaggregated/v1/types.go +++ b/api/disaggregated/v1/types.go @@ -39,6 +39,11 @@ type DorisDisaggregatedClusterSpec struct { // the name of secret that type is `kubernetes.io/basic-auth` and contains keys username, password for management doris node in cluster as fe, be register. // the password key is `password`. the username defaults to `root` and is omitempty. AuthSecret string `json:"authSecret,omitempty"` + + // decommission be or not. default value is false. + // if true, will decommission be node when scale down compute group. + // if false, will drop be node when scale down compute group. 
+ EnableDecommission bool `json:"enableDecommission,omitempty"` } type MetaService struct { diff --git a/pkg/common/utils/resource/decommission.go b/pkg/common/utils/resource/decommission.go new file mode 100644 index 0000000..762bc57 --- /dev/null +++ b/pkg/common/utils/resource/decommission.go @@ -0,0 +1,61 @@ +package resource + +import ( + "github.com/apache/doris-operator/pkg/common/utils/mysql" +) + +type DecommissionPhase string + +const ( + Decommissioned DecommissionPhase = "Decommissioned" + Decommissioning DecommissionPhase = "Decommissioning" + DecommissionPhaseSteady DecommissionPhase = "Steady" + DecommissionPhaseUnknown DecommissionPhase = "Unknown" +) + +type DecommissionDetail struct { + AllBackendsSize int + UnDecommissionedCount int + DecommissioningCount int + DecommissionedCount int + BeKeepAmount int +} + +func ConstructDecommissionDetail(allBackends []*mysql.Backend, cgKeepAmount int32) DecommissionDetail { + var unDecommissionedCount, decommissioningCount, decommissionedCount int + + for i := range allBackends { + node := allBackends[i] + if !node.SystemDecommissioned { + unDecommissionedCount++ + } else { + if node.TabletNum == 0 { + decommissionedCount++ + } else { + decommissioningCount++ + } + } + } + + return DecommissionDetail{ + AllBackendsSize: len(allBackends), + UnDecommissionedCount: unDecommissionedCount, + DecommissioningCount: decommissioningCount, + DecommissionedCount: decommissionedCount, + BeKeepAmount: int(cgKeepAmount), + } +} + +func (d *DecommissionDetail) GetDecommissionDetailStatus() DecommissionPhase { + if d.DecommissioningCount == 0 && d.DecommissionedCount == 0 && d.UnDecommissionedCount > d.BeKeepAmount { + return DecommissionPhaseSteady + } + if d.UnDecommissionedCount == d.BeKeepAmount && d.DecommissioningCount > 0 { + return Decommissioning + } + + if d.UnDecommissionedCount == d.BeKeepAmount && d.UnDecommissionedCount+d.DecommissionedCount == d.AllBackendsSize { + return Decommissioned + } + return 
DecommissionPhaseUnknown +} diff --git a/pkg/controller/disaggregated_cluster_controller.go b/pkg/controller/disaggregated_cluster_controller.go index d8565c2..68b3ea8 100644 --- a/pkg/controller/disaggregated_cluster_controller.go +++ b/pkg/controller/disaggregated_cluster_controller.go @@ -230,13 +230,13 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec //display new status. disRes, disErr := func() (ctrl.Result, error) { //reorganize status. - if res, err = dc.reorganizeStatus(&ddc); err != nil { - return res, err + if rsRes, rsErr := dc.reorganizeStatus(&ddc); rsErr != nil { + return rsRes, rsErr } //update cr or status - if res, err = dc.updateObjectORStatus(ctx, &ddc, hv); err != nil { - return res, err + if updRes, updErr := dc.updateObjectORStatus(ctx, &ddc, hv); updErr != nil { + return updRes, updErr } return ctrl.Result{}, nil @@ -247,7 +247,10 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec res = disRes } - return res, err + if msg != "" { + return res, errors.New(msg) + } + return res, nil } func (dc *DisaggregatedClusterReconciler) clearUnusedResources(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) { diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index cb0193d..7fdfc57 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -81,6 +81,7 @@ func (dcgs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj return errors.New("validating compute group failed") } + var errs []error cgs := ddc.Spec.ComputeGroups for i, _ := range cgs { @@ -88,8 +89,24 @@ func (dcgs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj if event != nil { dcgs.K8srecorder.Event(ddc, 
string(event.Type), string(event.Reason), event.Message) } - klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event)) + errs = append(errs, err) + if err.Error() != "" { + klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event)) + } + } + } + + if len(errs) != 0 { + msgHead := fmt.Sprintf("disaggregatedComputeGroupsController sync namespace: %s ,ddc name: %s, compute group has the following error: ", ddc.Namespace, ddc.Name) + msg := "" + for _, err := range errs { + msg += err.Error() } + if msg != "" { + return errors.New(msgHead + msg) + } + // msg is "" , means Decommissioning + return errors.New("scaleDown Decommissioning, will Reconcile again, may not be an error, if you meet this error, please ignore it") } return nil @@ -143,13 +160,14 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C return event, err } event, err = dcgs.reconcileStatefulset(ctx, st, ddc, cg) - if err != nil { + if err != nil && err.Error() != "" { klog.Errorf("disaggregatedComputeGroupsController reconcile statefulset namespace %s name %s failed, err=%s", st.Namespace, st.Name, err.Error()) } return event, err } +// reconcileStatefulset reconciles the compute group's statefulset; it returns the event to record (if any) and an error when the reconcile did not complete. 
func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) { var est appv1.StatefulSet if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) { @@ -175,27 +193,60 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte } scaleType := getScaleType(st, &est, cgStatus.Phase) - if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool { - return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false) - }); err != nil { - klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) - return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err - } - switch scaleType { //case "resume": //case "suspend": case "scaleDown": - cgKeepAmount := *cg.Replicas - cgName := cluster.GetCGName(cg) - if err := dcgs.scaledOutBENodesBySQL(ctx, dcgs.K8sclient, cluster, cgName, cgKeepAmount); err != nil { - cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("ScaleDownBE failed, err:%s ", err.Error()) + sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, cluster) + if err != nil { + klog.Errorf("scaleDown getMasterSqlClient failed, get fe master node connection err:%s", err.Error()) return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err } + defer sqlClient.Close() + + cgKeepAmount := *cg.Replicas + cgName := cluster.GetCGName(cg) + + decommissionPhase, err := dcgs.decommissionProgressCheck(cluster, sqlClient, cgName, cgKeepAmount) + if err != nil { + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err + } + + switch decommissionPhase { + case 
resource.DecommissionPhaseSteady: + err = dcgs.decommissionBENodesBySQL(sqlClient, cgName, cgKeepAmount) + if err != nil { + cgStatus.Phase = dv1.ScaleDownFailed + klog.Errorf("ScaleDownBE decommissionBENodesBySQL failed, err:%s ", err.Error()) + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, + err + } + cgStatus.Phase = dv1.Scaling + return nil, errors.New("") + case resource.Decommissioning: + cgStatus.Phase = dv1.Scaling + klog.Infof("ScaleDownBE decommissionBENodesBySQL in progress") + return nil, errors.New("") + case resource.Decommissioned: + dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount) + default: + // default is DecommissionPhaseUnknown, drop be , not decommission + if err := dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount); err != nil { + cgStatus.Phase = dv1.ScaleDownFailed + klog.Errorf("ScaleDownBE scaledOutBENodesBySQL failed, err:%s ", err.Error()) + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, + err + } + } cgStatus.Phase = dv1.Scaling + } + if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool { + return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false) + }); err != nil { + klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset apply statefulset namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err } return nil, nil @@ -311,7 +362,13 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Con // drop compute group cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-") cgKeepAmount := int32(0) - err := dcgs.scaledOutBENodesBySQL(ctx, dcgs.K8sclient, ddc, cgName, cgKeepAmount) + sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc) + if err != nil { + klog.Errorf("computeGroupSync 
ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error()) + dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) + } + defer sqlClient.Close() + err = dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount) if err != nil { klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error()) dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) @@ -485,12 +542,79 @@ func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisD return nil } +func getScaledOutBENode( + masterDBClient *mysql.DB, + cgName string, + cgKeepAmount int32) ([]*mysql.Backend, error) { + + allBackends, err := masterDBClient.GetBackendsByCGName(cgName) + if err != nil { + klog.Errorf("scaledOutBEPreprocessing failed, ShowBackends err:%s", err.Error()) + return nil, err + } + + var dropNodes []*mysql.Backend + for i := range allBackends { + node := allBackends[i] + split := strings.Split(node.Host, ".") + splitCGIDArr := strings.Split(split[0], "-") + podNum, err := strconv.Atoi(splitCGIDArr[len(splitCGIDArr)-1]) + if err != nil { + klog.Errorf("scaledOutBEPreprocessing splitCGIDArr can not split host : %s,err:%s", node.Host, err.Error()) + return nil, err + } + if podNum >= int(cgKeepAmount) { + dropNodes = append(dropNodes, node) + } + } + return dropNodes, nil +} + func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesBySQL( - ctx context.Context, k8sclient client.Client, - cluster *dv1.DorisDisaggregatedCluster, + masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) error { + dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount) + if err != nil { + klog.Errorf("scaledOutBENodesBySQL getScaledOutBENode failed, err:%s ", err.Error()) + return err + } + + if len(dropNodes) == 0 { + return nil + } + err = 
masterDBClient.DropBE(dropNodes) + if err != nil { + klog.Errorf("scaledOutBENodesBySQL DropBENodes failed, err:%s ", err.Error()) + return err + } + return nil +} + +func (dcgs *DisaggregatedComputeGroupsController) decommissionBENodesBySQL( + masterDBClient *mysql.DB, + cgName string, + cgKeepAmount int32) error { + + dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount) + if err != nil { + klog.Errorf("decommissionBENodesBySQL getScaledOutBENode failed, err:%s ", err.Error()) + return err + } + + if len(dropNodes) == 0 { + return nil + } + err = masterDBClient.DecommissionBE(dropNodes) + if err != nil { + klog.Errorf("decommissionBENodesBySQL DropBENodes failed, err:%s ", err.Error()) + return err + } + return nil +} + +func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context.Context, k8sclient client.Client, cluster *dv1.DorisDisaggregatedCluster) (*mysql.DB, error) { // get user and password secret, _ := k8s.GetSecret(ctx, k8sclient, cluster.Namespace, cluster.Spec.AuthSecret) adminUserName, password := resource.GetDorisLoginInformation(secret) @@ -513,39 +637,23 @@ func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesBySQL( // Connect to the master and run the SQL statement of system admin, because it is not excluded that the user can shrink be and fe at the same time masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf) if err != nil { - klog.Errorf("dropNodeBySQLClient NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error()) - return err - } - defer masterDBClient.Close() - - allBackends, err := masterDBClient.GetBackendsByCGName(cgName) - if err != nil { - klog.Errorf("dropNodeBySQLClient failed, ShowBackends err:%s", err.Error()) - return err - } - - var dropNodes []*mysql.Backend - for i := range allBackends { - node := allBackends[i] - split := strings.Split(node.Host, ".") - splitCGIDArr := strings.Split(split[0], "-") - podNum, err := 
strconv.Atoi(splitCGIDArr[len(splitCGIDArr)-1]) - if err != nil { - klog.Errorf("dropNodeBySQLClient splitCGIDArr can not split host : %s,err:%s", node.Host, err.Error()) - return err - } - if podNum >= int(cgKeepAmount) { - dropNodes = append(dropNodes, node) - } + klog.Errorf("getMasterSqlClient NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error()) + return nil, err } + return masterDBClient, nil +} - if len(dropNodes) == 0 { - return nil +// isDecommissionProgressFinished check decommission status +// if not start decomission or decommission succeed return true +func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(cluster *dv1.DorisDisaggregatedCluster, masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) { + if !cluster.Spec.EnableDecommission { + return resource.DecommissionPhaseUnknown, nil } - err = masterDBClient.DropBE(dropNodes) + allBackends, err := masterDBClient.GetBackendsByCGName(cgName) if err != nil { - klog.Errorf("dropNodeBySQLClient DropBENodes failed, err:%s ", err.Error()) - return err + klog.Errorf("decommissionProgressCheck failed, ShowBackends err:%s", err.Error()) + return resource.DecommissionPhaseUnknown, err } - return nil + decommissionDetail := resource.ConstructDecommissionDetail(allBackends, cgKeepAmount) + return decommissionDetail.GetDecommissionDetailStatus(), nil } From 36a8ff7b9c300ba4855b0120e3c8b75125a7f31d Mon Sep 17 00:00:00 2001 From: catpineapple Date: Tue, 3 Dec 2024 15:52:45 +0800 Subject: [PATCH 2/7] add license --- pkg/common/utils/resource/decommission.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/pkg/common/utils/resource/decommission.go b/pkg/common/utils/resource/decommission.go index 762bc57..c4b68b5 100644 --- a/pkg/common/utils/resource/decommission.go +++ b/pkg/common/utils/resource/decommission.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package resource import ( From fd931ea5ffea76fea1b89df1abfaa57e42e9ed5e Mon Sep 17 00:00:00 2001 From: catpineapple Date: Wed, 4 Dec 2024 16:37:57 +0800 Subject: [PATCH 3/7] modify status for decommission --- api/disaggregated/v1/types.go | 1 + pkg/common/utils/resource/decommission.go | 12 ++-- .../computegroups/controller.go | 65 +++++++++---------- 3 files changed, 39 insertions(+), 39 deletions(-) diff --git a/api/disaggregated/v1/types.go b/api/disaggregated/v1/types.go index 7d7ae44..2143901 100644 --- a/api/disaggregated/v1/types.go +++ b/api/disaggregated/v1/types.go @@ -304,6 +304,7 @@ const ( //Scaling represents service in Scaling. 
Scaling Phase = "Scaling" + Decommissioning Phase = "Decommissioning" ScaleDownFailed Phase = "ScaleDownFailed" ResumeFailed Phase = "ResumeFailed" SuspendFailed Phase = "SuspendFailed" diff --git a/pkg/common/utils/resource/decommission.go b/pkg/common/utils/resource/decommission.go index c4b68b5..600f51b 100644 --- a/pkg/common/utils/resource/decommission.go +++ b/pkg/common/utils/resource/decommission.go @@ -26,11 +26,11 @@ type DecommissionPhase string const ( Decommissioned DecommissionPhase = "Decommissioned" Decommissioning DecommissionPhase = "Decommissioning" - DecommissionPhaseSteady DecommissionPhase = "Steady" + DecommissionBefore DecommissionPhase = "DecommissionBefore" DecommissionPhaseUnknown DecommissionPhase = "Unknown" ) -type DecommissionDetail struct { +type ComputeNodeStatusCounts struct { AllBackendsSize int UnDecommissionedCount int DecommissioningCount int @@ -38,7 +38,7 @@ type DecommissionDetail struct { BeKeepAmount int } -func ConstructDecommissionDetail(allBackends []*mysql.Backend, cgKeepAmount int32) DecommissionDetail { +func ConstructComputeNodeStatusCounts(allBackends []*mysql.Backend, cgKeepAmount int32) ComputeNodeStatusCounts { var unDecommissionedCount, decommissioningCount, decommissionedCount int for i := range allBackends { @@ -54,7 +54,7 @@ func ConstructDecommissionDetail(allBackends []*mysql.Backend, cgKeepAmount int3 } } - return DecommissionDetail{ + return ComputeNodeStatusCounts{ AllBackendsSize: len(allBackends), UnDecommissionedCount: unDecommissionedCount, DecommissioningCount: decommissioningCount, @@ -63,9 +63,9 @@ func ConstructDecommissionDetail(allBackends []*mysql.Backend, cgKeepAmount int3 } } -func (d *DecommissionDetail) GetDecommissionDetailStatus() DecommissionPhase { +func (d *ComputeNodeStatusCounts) GetDecommissionStatus() DecommissionPhase { if d.DecommissioningCount == 0 && d.DecommissionedCount == 0 && d.UnDecommissionedCount > d.BeKeepAmount { - return DecommissionPhaseSteady + return 
DecommissionBefore } if d.UnDecommissionedCount == d.BeKeepAmount && d.DecommissioningCount > 0 { return Decommissioning diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index 7fdfc57..4025d5d 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -191,7 +191,7 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte break } } - scaleType := getScaleType(st, &est, cgStatus.Phase) + scaleType := getOperationType(st, &est, cgStatus.Phase) switch scaleType { //case "resume": @@ -208,30 +208,31 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte cgKeepAmount := *cg.Replicas cgName := cluster.GetCGName(cg) - decommissionPhase, err := dcgs.decommissionProgressCheck(cluster, sqlClient, cgName, cgKeepAmount) - if err != nil { - return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err - } - - switch decommissionPhase { - case resource.DecommissionPhaseSteady: - err = dcgs.decommissionBENodesBySQL(sqlClient, cgName, cgKeepAmount) + if cluster.Spec.EnableDecommission { + decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgName, cgKeepAmount) if err != nil { - cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("ScaleDownBE decommissionBENodesBySQL failed, err:%s ", err.Error()) - return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, - err + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err + } + + switch decommissionPhase { + case resource.DecommissionBefore: + err = dcgs.decommissionBENodesBySQL(sqlClient, cgName, cgKeepAmount) + if err != nil { + cgStatus.Phase = dv1.ScaleDownFailed + klog.Errorf("ScaleDownBE decommissionBENodesBySQL 
failed, err:%s ", err.Error()) + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, + err + } + cgStatus.Phase = dv1.Decommissioning + return nil, errors.New("") + case resource.Decommissioning, resource.DecommissionPhaseUnknown: + cgStatus.Phase = dv1.Decommissioning + klog.Infof("ScaleDownBE decommissionBENodesBySQL in progress") + return nil, errors.New("") + case resource.Decommissioned: + dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount) } - cgStatus.Phase = dv1.Scaling - return nil, errors.New("") - case resource.Decommissioning: - cgStatus.Phase = dv1.Scaling - klog.Infof("ScaleDownBE decommissionBENodesBySQL in progress") - return nil, errors.New("") - case resource.Decommissioned: - dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount) - default: - // default is DecommissionPhaseUnknown, drop be , not decommission + } else { // not decommission , drop node if err := dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount); err != nil { cgStatus.Phase = dv1.ScaleDownFailed klog.Errorf("ScaleDownBE scaledOutBENodesBySQL failed, err:%s ", err.Error()) @@ -252,8 +253,9 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte return nil, nil } -func getScaleType(st, est *appv1.StatefulSet, phase dv1.Phase) string { - if *(st.Spec.Replicas) < *(est.Spec.Replicas) || phase == dv1.ScaleDownFailed { +func getOperationType(st, est *appv1.StatefulSet, phase dv1.Phase) string { + //Should not check 'phase == dv1.Ready', because the default value of the state initialization is Reconciling in the new Reconcile + if *(st.Spec.Replicas) < *(est.Spec.Replicas) || phase == dv1.Decommissioning || phase == dv1.ScaleDownFailed { return "scaleDown" } return "" @@ -277,7 +279,7 @@ func (dcgs *DisaggregatedComputeGroupsController) initialCGStatus(ddc *dv1.Doris if cgss[i].UniqueId == uniqueId { if cgss[i].Phase == dv1.ScaleDownFailed || cgss[i].Phase == dv1.Suspended || cgss[i].Phase == 
dv1.SuspendFailed || cgss[i].Phase == dv1.ResumeFailed || - cgss[i].Phase == dv1.Scaling { + cgss[i].Phase == dv1.Scaling || cgss[i].Phase == dv1.Decommissioning { defaultStatus.Phase = cgss[i].Phase } defaultStatus.SuspendReplicas = cgss[i].SuspendReplicas @@ -644,16 +646,13 @@ func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context } // isDecommissionProgressFinished check decommission status -// if not start decomission or decommission succeed return true -func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(cluster *dv1.DorisDisaggregatedCluster, masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) { - if !cluster.Spec.EnableDecommission { - return resource.DecommissionPhaseUnknown, nil - } +// if not start decommission or decommission succeed return true +func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) { allBackends, err := masterDBClient.GetBackendsByCGName(cgName) if err != nil { klog.Errorf("decommissionProgressCheck failed, ShowBackends err:%s", err.Error()) return resource.DecommissionPhaseUnknown, err } - decommissionDetail := resource.ConstructDecommissionDetail(allBackends, cgKeepAmount) - return decommissionDetail.GetDecommissionDetailStatus(), nil + decommissionDetail := resource.ConstructComputeNodeStatusCounts(allBackends, cgKeepAmount) + return decommissionDetail.GetDecommissionStatus(), nil } From 5f75294a883dc07268f2b05b7f1ba54fe80dafc2 Mon Sep 17 00:00:00 2001 From: catpineapple Date: Fri, 6 Dec 2024 15:01:14 +0800 Subject: [PATCH 4/7] modify decommission --- pkg/common/utils/resource/decommission.go | 12 +- .../disaggregated_cluster_controller.go | 45 ++-- .../computegroups/controller.go | 204 +-------------- .../computegroups/prepare_modify.go | 239 ++++++++++++++++++ 4 files changed, 283 insertions(+), 217 deletions(-) create 
mode 100644 pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go diff --git a/pkg/common/utils/resource/decommission.go b/pkg/common/utils/resource/decommission.go index 600f51b..8c5f85c 100644 --- a/pkg/common/utils/resource/decommission.go +++ b/pkg/common/utils/resource/decommission.go @@ -26,11 +26,11 @@ type DecommissionPhase string const ( Decommissioned DecommissionPhase = "Decommissioned" Decommissioning DecommissionPhase = "Decommissioning" - DecommissionBefore DecommissionPhase = "DecommissionBefore" + DecommissionAcceptable DecommissionPhase = "DecommissionAcceptable" DecommissionPhaseUnknown DecommissionPhase = "Unknown" ) -type ComputeNodeStatusCounts struct { +type DecommissionTaskStatus struct { AllBackendsSize int UnDecommissionedCount int DecommissioningCount int @@ -38,7 +38,7 @@ type ComputeNodeStatusCounts struct { BeKeepAmount int } -func ConstructComputeNodeStatusCounts(allBackends []*mysql.Backend, cgKeepAmount int32) ComputeNodeStatusCounts { +func ConstructDecommissionTaskStatus(allBackends []*mysql.Backend, cgKeepAmount int32) DecommissionTaskStatus { var unDecommissionedCount, decommissioningCount, decommissionedCount int for i := range allBackends { @@ -54,7 +54,7 @@ func ConstructComputeNodeStatusCounts(allBackends []*mysql.Backend, cgKeepAmount } } - return ComputeNodeStatusCounts{ + return DecommissionTaskStatus{ AllBackendsSize: len(allBackends), UnDecommissionedCount: unDecommissionedCount, DecommissioningCount: decommissioningCount, @@ -63,9 +63,9 @@ func ConstructComputeNodeStatusCounts(allBackends []*mysql.Backend, cgKeepAmount } } -func (d *ComputeNodeStatusCounts) GetDecommissionStatus() DecommissionPhase { +func (d *DecommissionTaskStatus) GetDecommissionPhase() DecommissionPhase { if d.DecommissioningCount == 0 && d.DecommissionedCount == 0 && d.UnDecommissionedCount > d.BeKeepAmount { - return DecommissionBefore + return DecommissionAcceptable } if d.UnDecommissionedCount == d.BeKeepAmount && 
d.DecommissioningCount > 0 { return Decommissioning diff --git a/pkg/controller/disaggregated_cluster_controller.go b/pkg/controller/disaggregated_cluster_controller.go index 68b3ea8..90c5831 100644 --- a/pkg/controller/disaggregated_cluster_controller.go +++ b/pkg/controller/disaggregated_cluster_controller.go @@ -202,21 +202,12 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec hv := hash.HashObject(ddc.Spec) var res ctrl.Result - ////TODO: deprecated. - //cmnn := types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Spec.InstanceConfigMap} - //ddcnn := types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name} - //cmnnStr := cmnn.String() - //ddcnnStr := ddcnn.String() - //if _, ok := dc.wcms[cmnnStr]; !ok { - // dc.wcms[cmnnStr] = ddcnnStr - //} - - //sync resource. - //recall all errors var msg string reconRes, reconErr := dc.reconcileSub(ctx, &ddc) if reconErr != nil { msg = msg + reconErr.Error() + } + if !reconRes.IsZero() { res = reconRes } @@ -224,26 +215,32 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec clearRes, clearErr := dc.clearUnusedResources(ctx, &ddc) if clearErr != nil { msg = msg + reconErr.Error() + } + + if !clearRes.IsZero() { res = clearRes } //display new status. disRes, disErr := func() (ctrl.Result, error) { //reorganize status. 
- if rsRes, rsErr := dc.reorganizeStatus(&ddc); rsErr != nil { - return rsRes, rsErr + var stsRes ctrl.Result + var stsErr error + if stsRes, stsErr = dc.reorganizeStatus(&ddc); stsErr != nil { + return stsRes, stsErr } //update cr or status - if updRes, updErr := dc.updateObjectORStatus(ctx, &ddc, hv); updErr != nil { - return updRes, updErr + if stsRes, stsErr = dc.updateObjectORStatus(ctx, &ddc, hv); stsErr != nil { + return stsRes, stsErr } - return ctrl.Result{}, nil + return stsRes, stsErr }() - if disErr != nil { msg = msg + disErr.Error() + } + if !disRes.IsZero() { res = disRes } @@ -315,7 +312,19 @@ func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Conte //return ctrl.Result{}, err } } - return dc.updateDorisDisaggregatedClusterStatus(ctx, deepCopyDDC) + res, err := dc.updateDorisDisaggregatedClusterStatus(ctx, deepCopyDDC) + + if err != nil { + return res, err + } + + for _, cgs := range ddc.Status.ComputeGroupStatuses { + if cgs.Phase == dv1.Decommissioning { + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + } + return res, nil + } func (dc *DisaggregatedClusterReconciler) updateDorisDisaggregatedClusterStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) { diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index 4025d5d..d2c8de6 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -24,7 +24,6 @@ import ( dv1 "github.com/apache/doris-operator/api/disaggregated/v1" "github.com/apache/doris-operator/pkg/common/utils" "github.com/apache/doris-operator/pkg/common/utils/k8s" - "github.com/apache/doris-operator/pkg/common/utils/mysql" "github.com/apache/doris-operator/pkg/common/utils/resource" 
"github.com/apache/doris-operator/pkg/common/utils/set" sc "github.com/apache/doris-operator/pkg/controller/sub_controller" @@ -90,23 +89,16 @@ func (dcgs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj dcgs.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message) } errs = append(errs, err) - if err.Error() != "" { - klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event)) - } + klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event)) } } if len(errs) != 0 { - msgHead := fmt.Sprintf("disaggregatedComputeGroupsController sync namespace: %s ,ddc name: %s, compute group has the following error: ", ddc.Namespace, ddc.Name) - msg := "" + msg := fmt.Sprintf("disaggregatedComputeGroupsController sync namespace: %s ,ddc name: %s, compute group has the following error: ", ddc.Namespace, ddc.Name) for _, err := range errs { msg += err.Error() } - if msg != "" { - return errors.New(msgHead + msg) - } - // msg is "" , means Decommissioning - return errors.New("scaleDown Decommissioning, will Reconcile again, may not be an error, if you meet this error, please ignore it") + return errors.New(msg) } return nil @@ -160,7 +152,7 @@ func (dcgs *DisaggregatedComputeGroupsController) computeGroupSync(ctx context.C return event, err } event, err = dcgs.reconcileStatefulset(ctx, st, ddc, cg) - if err != nil && err.Error() != "" { + if err != nil { klog.Errorf("disaggregatedComputeGroupsController reconcile statefulset namespace %s name %s failed, err=%s", st.Namespace, st.Name, err.Error()) } @@ -182,65 +174,13 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte return nil, err } - var cgStatus *dv1.ComputeGroupStatus - - uniqueId := cg.UniqueId - for i := range cluster.Status.ComputeGroupStatuses { 
- if cluster.Status.ComputeGroupStatuses[i].UniqueId == uniqueId { - cgStatus = &cluster.Status.ComputeGroupStatuses[i] - break - } - } - scaleType := getOperationType(st, &est, cgStatus.Phase) - - switch scaleType { - //case "resume": - //case "suspend": - case "scaleDown": - sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, cluster) - if err != nil { - klog.Errorf("scaleDown getMasterSqlClient failed, get fe master node connection err:%s", err.Error()) - return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, - err - } - defer sqlClient.Close() - - cgKeepAmount := *cg.Replicas - cgName := cluster.GetCGName(cg) - - if cluster.Spec.EnableDecommission { - decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgName, cgKeepAmount) - if err != nil { - return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err - } - - switch decommissionPhase { - case resource.DecommissionBefore: - err = dcgs.decommissionBENodesBySQL(sqlClient, cgName, cgKeepAmount) - if err != nil { - cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("ScaleDownBE decommissionBENodesBySQL failed, err:%s ", err.Error()) - return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, - err - } - cgStatus.Phase = dv1.Decommissioning - return nil, errors.New("") - case resource.Decommissioning, resource.DecommissionPhaseUnknown: - cgStatus.Phase = dv1.Decommissioning - klog.Infof("ScaleDownBE decommissionBENodesBySQL in progress") - return nil, errors.New("") - case resource.Decommissioned: - dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount) - } - } else { // not decommission , drop node - if err := dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount); err != nil { - cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("ScaleDownBE scaledOutBENodesBySQL failed, err:%s ", err.Error()) - return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: 
err.Error()}, - err - } + err := dcgs.preApplyStatefulSet(ctx, st, &est, cluster, cg) + if err != nil { + if skipApplyStatefulset(err) { + return nil, nil } - cgStatus.Phase = dv1.Scaling + klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset preApplyStatefulSet namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) + return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err } if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool { @@ -253,14 +193,6 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte return nil, nil } -func getOperationType(st, est *appv1.StatefulSet, phase dv1.Phase) string { - //Should not check 'phase == dv1.Ready', because the default value of the state initialization is Reconciling in the new Reconcile - if *(st.Spec.Replicas) < *(est.Spec.Replicas) || phase == dv1.Decommissioning || phase == dv1.ScaleDownFailed { - return "scaleDown" - } - return "" -} - // initial compute group status before sync resources. status changing with sync steps, and generate the last status by classify pods. 
func (dcgs *DisaggregatedComputeGroupsController) initialCGStatus(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) { cgss := ddc.Status.ComputeGroupStatuses @@ -370,7 +302,7 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Con dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) } defer sqlClient.Close() - err = dcgs.scaledOutBENodesBySQL(sqlClient, cgName, cgKeepAmount) + err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount) if err != nil { klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error()) dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) @@ -512,7 +444,6 @@ func (dcgs *DisaggregatedComputeGroupsController) UpdateComponentStatus(obj clie if errMs == "" { return nil } - return errors.New(errMs) } @@ -543,116 +474,3 @@ func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisD } return nil } - -func getScaledOutBENode( - masterDBClient *mysql.DB, - cgName string, - cgKeepAmount int32) ([]*mysql.Backend, error) { - - allBackends, err := masterDBClient.GetBackendsByCGName(cgName) - if err != nil { - klog.Errorf("scaledOutBEPreprocessing failed, ShowBackends err:%s", err.Error()) - return nil, err - } - - var dropNodes []*mysql.Backend - for i := range allBackends { - node := allBackends[i] - split := strings.Split(node.Host, ".") - splitCGIDArr := strings.Split(split[0], "-") - podNum, err := strconv.Atoi(splitCGIDArr[len(splitCGIDArr)-1]) - if err != nil { - klog.Errorf("scaledOutBEPreprocessing splitCGIDArr can not split host : %s,err:%s", node.Host, err.Error()) - return nil, err - } - if podNum >= int(cgKeepAmount) { - dropNodes = append(dropNodes, node) - } - } - return dropNodes, nil -} - -func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesBySQL( - 
masterDBClient *mysql.DB, - cgName string, - cgKeepAmount int32) error { - - dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount) - if err != nil { - klog.Errorf("scaledOutBENodesBySQL getScaledOutBENode failed, err:%s ", err.Error()) - return err - } - - if len(dropNodes) == 0 { - return nil - } - err = masterDBClient.DropBE(dropNodes) - if err != nil { - klog.Errorf("scaledOutBENodesBySQL DropBENodes failed, err:%s ", err.Error()) - return err - } - return nil -} - -func (dcgs *DisaggregatedComputeGroupsController) decommissionBENodesBySQL( - masterDBClient *mysql.DB, - cgName string, - cgKeepAmount int32) error { - - dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount) - if err != nil { - klog.Errorf("decommissionBENodesBySQL getScaledOutBENode failed, err:%s ", err.Error()) - return err - } - - if len(dropNodes) == 0 { - return nil - } - err = masterDBClient.DecommissionBE(dropNodes) - if err != nil { - klog.Errorf("decommissionBENodesBySQL DropBENodes failed, err:%s ", err.Error()) - return err - } - return nil -} - -func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context.Context, k8sclient client.Client, cluster *dv1.DorisDisaggregatedCluster) (*mysql.DB, error) { - // get user and password - secret, _ := k8s.GetSecret(ctx, k8sclient, cluster.Namespace, cluster.Spec.AuthSecret) - adminUserName, password := resource.GetDorisLoginInformation(secret) - - // get host and port - // When the operator and dcr are deployed in different namespace, it will be inaccessible, so need to add the dcr svc namespace - host := cluster.GetFEVIPAddresss() - confMap := dcgs.GetConfigValuesFromConfigMaps(cluster.Namespace, resource.FE_RESOLVEKEY, cluster.Spec.FeSpec.ConfigMaps) - queryPort := resource.GetPort(confMap, resource.QUERY_PORT) - - // connect to doris sql to get master node - // It may not be the master, or even the node that needs to be deleted, causing the deletion SQL to fail. 
- dbConf := mysql.DBConfig{ - User: adminUserName, - Password: password, - Host: host, - Port: strconv.FormatInt(int64(queryPort), 10), - Database: "mysql", - } - // Connect to the master and run the SQL statement of system admin, because it is not excluded that the user can shrink be and fe at the same time - masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf) - if err != nil { - klog.Errorf("getMasterSqlClient NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error()) - return nil, err - } - return masterDBClient, nil -} - -// isDecommissionProgressFinished check decommission status -// if not start decommission or decommission succeed return true -func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) { - allBackends, err := masterDBClient.GetBackendsByCGName(cgName) - if err != nil { - klog.Errorf("decommissionProgressCheck failed, ShowBackends err:%s", err.Error()) - return resource.DecommissionPhaseUnknown, err - } - decommissionDetail := resource.ConstructComputeNodeStatusCounts(allBackends, cgKeepAmount) - return decommissionDetail.GetDecommissionStatus(), nil -} diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go new file mode 100644 index 0000000..a2119d8 --- /dev/null +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go @@ -0,0 +1,239 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package computegroups + +import ( + "context" + "errors" + dv1 "github.com/apache/doris-operator/api/disaggregated/v1" + "github.com/apache/doris-operator/pkg/common/utils/k8s" + "github.com/apache/doris-operator/pkg/common/utils/mysql" + "github.com/apache/doris-operator/pkg/common/utils/resource" + appv1 "k8s.io/api/apps/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" + "strings" +) + +const decommissioningMessage = "decommissionBENodes in progress" + +func skipApplyStatefulset(err error) bool { + if err == nil || err.Error() == decommissioningMessage { + return true + } + return false +} + +func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx context.Context, st, est *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error { + + var cgStatus *dv1.ComputeGroupStatus + + uniqueId := cg.UniqueId + for i := range cluster.Status.ComputeGroupStatuses { + if cluster.Status.ComputeGroupStatuses[i].UniqueId == uniqueId { + cgStatus = &cluster.Status.ComputeGroupStatuses[i] + break + } + } + optType := getOperationType(st, est, cgStatus.Phase) + + switch optType { + case "scaleDown": + err := dcgs.PreScaleOut(ctx, cgStatus, cluster, cg) + if err != nil { + return err + } + } + + return nil + +} + +func (dcgs *DisaggregatedComputeGroupsController) PreScaleOut(ctx context.Context, cgStatus *dv1.ComputeGroupStatus, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error { + sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, cluster) + if err != nil 
{ + klog.Errorf("PreScaleOut getMasterSqlClient failed, get fe master node connection err:%s", err.Error()) + return err + } + defer sqlClient.Close() + + cgKeepAmount := *cg.Replicas + cgName := cluster.GetCGName(cg) + + if cluster.Spec.EnableDecommission { + if err := dcgs.scaledOutBENodesByDecommission(cgStatus, sqlClient, cgName, cgKeepAmount); err != nil { + return err + } + } else { // not decommission , drop node + if err := dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount); err != nil { + cgStatus.Phase = dv1.ScaleDownFailed + klog.Errorf("PreScaleOut scaledOutBENodesByDrop failed, err:%s ", err.Error()) + return err + } + } + cgStatus.Phase = dv1.Scaling + // return nil will apply sts + return nil +} + +func (dcgs DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error { + decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgName, cgKeepAmount) + if err != nil { + return err + } + switch decommissionPhase { + case resource.DecommissionAcceptable: + err = dcgs.decommissionBENodes(sqlClient, cgName, cgKeepAmount) + if err != nil { + cgStatus.Phase = dv1.ScaleDownFailed + klog.Errorf("PreScaleOut decommissionBENodes failed, err:%s ", err.Error()) + return err + } + cgStatus.Phase = dv1.Decommissioning + return errors.New(decommissioningMessage) + case resource.Decommissioning, resource.DecommissionPhaseUnknown: + cgStatus.Phase = dv1.Decommissioning + klog.Infof("PreScaleOut decommissionBENodes in progress") + return errors.New(decommissioningMessage) + case resource.Decommissioned: + dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount) + } + return nil +} + +func getOperationType(st, est *appv1.StatefulSet, phase dv1.Phase) string { + //Should not check 'phase == dv1.Ready', because the default value of the state initialization is Reconciling in the new Reconcile + if *(st.Spec.Replicas) < *(est.Spec.Replicas) || 
phase == dv1.Decommissioning || phase == dv1.ScaleDownFailed { + return "scaleDown" + } + return "" +} + +func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDrop( + masterDBClient *mysql.DB, + cgName string, + cgKeepAmount int32) error { + + dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount) + if err != nil { + klog.Errorf("scaledOutBENodesByDrop getScaledOutBENode failed, err:%s ", err.Error()) + return err + } + + if len(dropNodes) == 0 { + return nil + } + err = masterDBClient.DropBE(dropNodes) + if err != nil { + klog.Errorf("scaledOutBENodesByDrop DropBENodes failed, err:%s ", err.Error()) + return err + } + return nil +} + +func (dcgs *DisaggregatedComputeGroupsController) decommissionBENodes( + masterDBClient *mysql.DB, + cgName string, + cgKeepAmount int32) error { + + dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount) + if err != nil { + klog.Errorf("decommissionBENodes getScaledOutBENode failed, err:%s ", err.Error()) + return err + } + + if len(dropNodes) == 0 { + return nil + } + err = masterDBClient.DecommissionBE(dropNodes) + if err != nil { + klog.Errorf("decommissionBENodes DropBENodes failed, err:%s ", err.Error()) + return err + } + return nil +} + +func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context.Context, k8sclient client.Client, cluster *dv1.DorisDisaggregatedCluster) (*mysql.DB, error) { + // get user and password + secret, _ := k8s.GetSecret(ctx, k8sclient, cluster.Namespace, cluster.Spec.AuthSecret) + adminUserName, password := resource.GetDorisLoginInformation(secret) + + // get host and port + // When the operator and dcr are deployed in different namespace, it will be inaccessible, so need to add the dcr svc namespace + host := cluster.GetFEVIPAddresss() + confMap := dcgs.GetConfigValuesFromConfigMaps(cluster.Namespace, resource.FE_RESOLVEKEY, cluster.Spec.FeSpec.ConfigMaps) + queryPort := resource.GetPort(confMap, resource.QUERY_PORT) + + // 
connect to doris sql to get master node + // It may not be the master, or even the node that needs to be deleted, causing the deletion SQL to fail. + dbConf := mysql.DBConfig{ + User: adminUserName, + Password: password, + Host: host, + Port: strconv.FormatInt(int64(queryPort), 10), + Database: "mysql", + } + // Connect to the master and run the SQL statement of system admin, because it is not excluded that the user can shrink be and fe at the same time + masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf) + if err != nil { + klog.Errorf("getMasterSqlClient NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error()) + return nil, err + } + return masterDBClient, nil +} + +// isDecommissionProgressFinished check decommission status +// if not start decommission or decommission succeed return true +func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) { + allBackends, err := masterDBClient.GetBackendsByCGName(cgName) + if err != nil { + klog.Errorf("decommissionProgressCheck failed, ShowBackends err:%s", err.Error()) + return resource.DecommissionPhaseUnknown, err + } + dts := resource.ConstructDecommissionTaskStatus(allBackends, cgKeepAmount) + return dts.GetDecommissionPhase(), nil +} + +func getScaledOutBENode( + masterDBClient *mysql.DB, + cgName string, + cgKeepAmount int32) ([]*mysql.Backend, error) { + + allBackends, err := masterDBClient.GetBackendsByCGName(cgName) + if err != nil { + klog.Errorf("scaledOutBEPreprocessing failed, ShowBackends err:%s", err.Error()) + return nil, err + } + + var dropNodes []*mysql.Backend + for i := range allBackends { + node := allBackends[i] + split := strings.Split(node.Host, ".") + splitCGIDArr := strings.Split(split[0], "-") + podNum, err := strconv.Atoi(splitCGIDArr[len(splitCGIDArr)-1]) + if err != nil { + klog.Errorf("scaledOutBEPreprocessing splitCGIDArr can not split host : 
%s,err:%s", node.Host, err.Error()) + return nil, err + } + if podNum >= int(cgKeepAmount) { + dropNodes = append(dropNodes, node) + } + } + return dropNodes, nil +} From e3ab93ec77bba2d237fe844152ae93af1d511c41 Mon Sep 17 00:00:00 2001 From: catpineapple Date: Fri, 6 Dec 2024 15:12:08 +0800 Subject: [PATCH 5/7] modify decommission --- .../disaggregated_cluster/computegroups/prepare_modify.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go index a2119d8..bb8452d 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go @@ -92,7 +92,7 @@ func (dcgs *DisaggregatedComputeGroupsController) PreScaleOut(ctx context.Contex return nil } -func (dcgs DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error { +func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error { decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgName, cgKeepAmount) if err != nil { return err From 8c08871d00a60b1bcb0e5805c35ced38b64c86c7 Mon Sep 17 00:00:00 2001 From: catpineapple Date: Fri, 6 Dec 2024 15:18:13 +0800 Subject: [PATCH 6/7] modify decommission --- .../computegroups/prepare_modify.go | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go index bb8452d..d04d71f 100644 --- 
a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go @@ -33,17 +33,8 @@ import ( const decommissioningMessage = "decommissionBENodes in progress" -func skipApplyStatefulset(err error) bool { - if err == nil || err.Error() == decommissioningMessage { - return true - } - return false -} - func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx context.Context, st, est *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error { - var cgStatus *dv1.ComputeGroupStatus - uniqueId := cg.UniqueId for i := range cluster.Status.ComputeGroupStatuses { if cluster.Status.ComputeGroupStatuses[i].UniqueId == uniqueId { @@ -55,7 +46,7 @@ func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx contex switch optType { case "scaleDown": - err := dcgs.PreScaleOut(ctx, cgStatus, cluster, cg) + err := dcgs.scaleOut(ctx, cgStatus, cluster, cg) if err != nil { return err } @@ -65,10 +56,10 @@ func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx contex } -func (dcgs *DisaggregatedComputeGroupsController) PreScaleOut(ctx context.Context, cgStatus *dv1.ComputeGroupStatus, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error { +func (dcgs *DisaggregatedComputeGroupsController) scaleOut(ctx context.Context, cgStatus *dv1.ComputeGroupStatus, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error { sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, cluster) if err != nil { - klog.Errorf("PreScaleOut getMasterSqlClient failed, get fe master node connection err:%s", err.Error()) + klog.Errorf("ScaleOut getMasterSqlClient failed, get fe master node connection err:%s", err.Error()) return err } defer sqlClient.Close() @@ -83,7 +74,7 @@ func (dcgs *DisaggregatedComputeGroupsController) PreScaleOut(ctx context.Contex } else { // not decommission , drop 
node if err := dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount); err != nil { cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("PreScaleOut scaledOutBENodesByDrop failed, err:%s ", err.Error()) + klog.Errorf("ScaleOut scaledOutBENodesByDrop failed, err:%s ", err.Error()) return err } } @@ -102,14 +93,14 @@ func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission err = dcgs.decommissionBENodes(sqlClient, cgName, cgKeepAmount) if err != nil { cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("PreScaleOut decommissionBENodes failed, err:%s ", err.Error()) + klog.Errorf("scaledOutBENodesByDecommission failed, err:%s ", err.Error()) return err } cgStatus.Phase = dv1.Decommissioning return errors.New(decommissioningMessage) case resource.Decommissioning, resource.DecommissionPhaseUnknown: cgStatus.Phase = dv1.Decommissioning - klog.Infof("PreScaleOut decommissionBENodes in progress") + klog.Infof("scaledOutBENodesByDecommission in progress") return errors.New(decommissioningMessage) case resource.Decommissioned: dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount) @@ -237,3 +228,10 @@ func getScaledOutBENode( } return dropNodes, nil } + +func skipApplyStatefulset(err error) bool { + if err == nil || err.Error() == decommissioningMessage { + return true + } + return false +} From 2d6d9ded9afb3e69754dab111501f63ad82ac035 Mon Sep 17 00:00:00 2001 From: catpineapple Date: Mon, 9 Dec 2024 17:16:46 +0800 Subject: [PATCH 7/7] modify decommission --- api/disaggregated/v1/types.go | 4 -- .../computegroups/controller.go | 41 +++++++++---------- .../computegroups/prepare_modify.go | 35 ++++++++++------ 3 files changed, 42 insertions(+), 38 deletions(-) diff --git a/api/disaggregated/v1/types.go b/api/disaggregated/v1/types.go index 2143901..f63b130 100644 --- a/api/disaggregated/v1/types.go +++ b/api/disaggregated/v1/types.go @@ -295,10 +295,6 @@ type Phase string const ( Ready Phase = "Ready" - //Upgrading represents the spec of the 
service changed, service in smoothing upgrade. - Upgrading Phase = "Upgrading" - //Failed represents service failed to start, can't be accessed. - Failed Phase = "Failed" //Creating represents service in creating stage. Reconciling Phase = "Reconciling" diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index d2c8de6..4c7013c 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -176,12 +176,12 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte err := dcgs.preApplyStatefulSet(ctx, st, &est, cluster, cg) if err != nil { - if skipApplyStatefulset(err) { - return nil, nil - } klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset preApplyStatefulSet namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err } + if skipApplyStatefulset(cluster, cg) { + return nil, nil + } if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool { return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false) @@ -209,9 +209,7 @@ func (dcgs *DisaggregatedComputeGroupsController) initialCGStatus(ddc *dv1.Doris for i := range cgss { if cgss[i].UniqueId == uniqueId { - if cgss[i].Phase == dv1.ScaleDownFailed || cgss[i].Phase == dv1.Suspended || - cgss[i].Phase == dv1.SuspendFailed || cgss[i].Phase == dv1.ResumeFailed || - cgss[i].Phase == dv1.Scaling || cgss[i].Phase == dv1.Decommissioning { + if cgss[i].Phase != dv1.Ready { defaultStatus.Phase = cgss[i].Phase } defaultStatus.SuspendReplicas = cgss[i].SuspendReplicas @@ -292,22 +290,23 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx 
context.Con } if !cleared { eCGs = append(eCGs, clearCGs[i]) - } else { - // drop compute group - cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-") - cgKeepAmount := int32(0) - sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc) - if err != nil { - klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error()) - dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) - } - defer sqlClient.Close() - err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount) - if err != nil { - klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error()) - dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) - } + continue } + // drop compute group + cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-") + cgKeepAmount := int32(0) + sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc) + if err != nil { + klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error()) + dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) + } + defer sqlClient.Close() + err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount) + if err != nil { + klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error()) + dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) + } + } for i := range eCGs { diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go index d04d71f..b3162ad 100644 --- 
a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go @@ -19,7 +19,6 @@ package computegroups import ( "context" - "errors" dv1 "github.com/apache/doris-operator/api/disaggregated/v1" "github.com/apache/doris-operator/pkg/common/utils/k8s" "github.com/apache/doris-operator/pkg/common/utils/mysql" @@ -31,8 +30,6 @@ import ( "strings" ) -const decommissioningMessage = "decommissionBENodes in progress" - func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx context.Context, st, est *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error { var cgStatus *dv1.ComputeGroupStatus uniqueId := cg.UniqueId @@ -50,6 +47,8 @@ func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx contex if err != nil { return err } + default: + // default do nothing, not need pre ApplyStatefulSet } return nil @@ -68,22 +67,22 @@ func (dcgs *DisaggregatedComputeGroupsController) scaleOut(ctx context.Context, cgName := cluster.GetCGName(cg) if cluster.Spec.EnableDecommission { - if err := dcgs.scaledOutBENodesByDecommission(cgStatus, sqlClient, cgName, cgKeepAmount); err != nil { + if err := dcgs.scaledOutBENodesByDecommission(cluster, cgStatus, sqlClient, cgName, cgKeepAmount); err != nil { return err } } else { // not decommission , drop node if err := dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount); err != nil { cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("ScaleOut scaledOutBENodesByDrop failed, err:%s ", err.Error()) + klog.Errorf("ScaleOut scaledOutBENodesByDrop ddcName:%s, namespace:%s, computeGroupName:%s, drop nodes failed:%s ", cluster.Name, cluster.Namespace, cgName, err.Error()) return err } + cgStatus.Phase = dv1.Scaling } - cgStatus.Phase = dv1.Scaling // return nil will apply sts return nil } -func (dcgs *DisaggregatedComputeGroupsController) 
scaledOutBENodesByDecommission(cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error { +func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cluster *dv1.DorisDisaggregatedCluster, cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error { decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgName, cgKeepAmount) if err != nil { return err @@ -93,18 +92,19 @@ func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission err = dcgs.decommissionBENodes(sqlClient, cgName, cgKeepAmount) if err != nil { cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("scaledOutBENodesByDecommission failed, err:%s ", err.Error()) + klog.Errorf("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s , Decommission failed, err:%s ", cluster.Name, cluster.Namespace, cgName, err.Error()) return err } cgStatus.Phase = dv1.Decommissioning - return errors.New(decommissioningMessage) + return nil case resource.Decommissioning, resource.DecommissionPhaseUnknown: cgStatus.Phase = dv1.Decommissioning - klog.Infof("scaledOutBENodesByDecommission in progress") - return errors.New(decommissioningMessage) + klog.Infof("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s, Decommission in progress", cluster.Name, cluster.Namespace, cgName) + return nil case resource.Decommissioned: dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount) } + cgStatus.Phase = dv1.Scaling return nil } @@ -229,8 +229,17 @@ func getScaledOutBENode( return dropNodes, nil } -func skipApplyStatefulset(err error) bool { - if err == nil || err.Error() == decommissioningMessage { +func skipApplyStatefulset(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) bool { + var cgStatus *dv1.ComputeGroupStatus + uniqueId := cg.UniqueId + for i := range ddc.Status.ComputeGroupStatuses { + if ddc.Status.ComputeGroupStatuses[i].UniqueId == 
uniqueId { + cgStatus = &ddc.Status.ComputeGroupStatuses[i] + break + } + } + + if cgStatus.Phase == dv1.Decommissioning { return true } return false