diff --git a/api/disaggregated/v1/types.go b/api/disaggregated/v1/types.go
index d82b2c2..f63b130 100644
--- a/api/disaggregated/v1/types.go
+++ b/api/disaggregated/v1/types.go
@@ -39,6 +39,11 @@ type DorisDisaggregatedClusterSpec struct {
 	// the name of secret that type is `kubernetes.io/basic-auth` and contains keys username, password for management doris node in cluster as fe, be register.
 	// the password key is `password`. the username defaults to `root` and is omitempty.
 	AuthSecret string `json:"authSecret,omitempty"`
+
+	// EnableDecommission controls how BE nodes are removed when a compute group is scaled down. default value is false.
+	// if true, BE nodes are decommissioned (tablets are migrated off) before removal.
+	// if false, BE nodes are dropped directly.
+	EnableDecommission bool `json:"enableDecommission,omitempty"`
 }
 
 type MetaService struct {
@@ -290,15 +295,13 @@ type Phase string
 
 const (
 	Ready Phase = "Ready"
-	//Upgrading represents the spec of the service changed, service in smoothing upgrade.
-	Upgrading Phase = "Upgrading"
-	//Failed represents service failed to start, can't be accessed.
-	Failed Phase = "Failed"
 	//Creating represents service in creating stage.
 	Reconciling Phase = "Reconciling"
 	//Scaling represents service in Scaling.
 	Scaling Phase = "Scaling"
+	//Decommissioning represents BE nodes of a compute group in decommissioning progress.
+	Decommissioning Phase = "Decommissioning"
 	ScaleDownFailed Phase = "ScaleDownFailed"
 	ResumeFailed Phase = "ResumeFailed"
 	SuspendFailed Phase = "SuspendFailed"
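Reviewer note: the new `EnableDecommission` switch is opt-in and defaults to the old drop-on-scale-down behavior. A minimal sketch of setting it from Go; the secret name and the rest of the spec are illustrative only, not part of this change:

```go
package main

import (
	"fmt"

	dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
)

func main() {
	spec := dv1.DorisDisaggregatedClusterSpec{
		AuthSecret:         "doris-admin-auth", // hypothetical secret holding the FE login
		EnableDecommission: true,               // drain tablets before BE pods are removed
	}
	fmt.Println(spec.EnableDecommission)
}
```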
diff --git a/pkg/common/utils/resource/decommission.go b/pkg/common/utils/resource/decommission.go
new file mode 100644
index 0000000..8c5f85c
--- /dev/null
+++ b/pkg/common/utils/resource/decommission.go
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package resource
+
+import (
+	"github.com/apache/doris-operator/pkg/common/utils/mysql"
+)
+
+type DecommissionPhase string
+
+const (
+	Decommissioned           DecommissionPhase = "Decommissioned"
+	Decommissioning          DecommissionPhase = "Decommissioning"
+	DecommissionAcceptable   DecommissionPhase = "DecommissionAcceptable"
+	DecommissionPhaseUnknown DecommissionPhase = "Unknown"
+)
+
+// DecommissionTaskStatus classifies the backends of a compute group while it is being scaled down.
+type DecommissionTaskStatus struct {
+	AllBackendsSize       int
+	UnDecommissionedCount int
+	DecommissioningCount  int
+	DecommissionedCount   int
+	BeKeepAmount          int
+}
+
+func ConstructDecommissionTaskStatus(allBackends []*mysql.Backend, cgKeepAmount int32) DecommissionTaskStatus {
+	var unDecommissionedCount, decommissioningCount, decommissionedCount int
+
+	for i := range allBackends {
+		node := allBackends[i]
+		if !node.SystemDecommissioned {
+			unDecommissionedCount++
+		} else {
+			if node.TabletNum == 0 {
+				decommissionedCount++
+			} else {
+				decommissioningCount++
+			}
+		}
+	}
+
+	return DecommissionTaskStatus{
+		AllBackendsSize:       len(allBackends),
+		UnDecommissionedCount: unDecommissionedCount,
+		DecommissioningCount:  decommissioningCount,
+		DecommissionedCount:   decommissionedCount,
+		BeKeepAmount:          int(cgKeepAmount),
+	}
+}
+
+// GetDecommissionPhase derives the overall phase:
+// DecommissionAcceptable: no decommission has started yet and there are surplus backends to remove;
+// Decommissioning: exactly the keep amount is still in service and some backends still hold tablets;
+// Decommissioned: exactly the keep amount is still in service and every surplus backend is drained.
+func (d *DecommissionTaskStatus) GetDecommissionPhase() DecommissionPhase {
+	if d.DecommissioningCount == 0 && d.DecommissionedCount == 0 && d.UnDecommissionedCount > d.BeKeepAmount {
+		return DecommissionAcceptable
+	}
+
+	if d.UnDecommissionedCount == d.BeKeepAmount && d.DecommissioningCount > 0 {
+		return Decommissioning
+	}
+
+	if d.UnDecommissionedCount == d.BeKeepAmount && d.UnDecommissionedCount+d.DecommissionedCount == d.AllBackendsSize {
+		return Decommissioned
+	}
+	return DecommissionPhaseUnknown
+}
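To make the classification above concrete, here is a self-contained sketch; the backend values are invented, and only the `mysql.Backend` fields that `ConstructDecommissionTaskStatus` actually reads are assumed:

```go
package main

import (
	"fmt"

	"github.com/apache/doris-operator/pkg/common/utils/mysql"
	"github.com/apache/doris-operator/pkg/common/utils/resource"
)

func main() {
	// Five backends, scaling down to keep three: one surplus backend is
	// already drained (0 tablets), the other still holds tablets.
	backends := []*mysql.Backend{
		{Host: "cg-0"},
		{Host: "cg-1"},
		{Host: "cg-2"},
		{Host: "cg-3", SystemDecommissioned: true, TabletNum: 42},
		{Host: "cg-4", SystemDecommissioned: true, TabletNum: 0},
	}
	dts := resource.ConstructDecommissionTaskStatus(backends, 3)
	fmt.Println(dts.GetDecommissionPhase()) // Decommissioning
}
```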
diff --git a/pkg/controller/disaggregated_cluster_controller.go b/pkg/controller/disaggregated_cluster_controller.go
index d8565c2..90c5831 100644
--- a/pkg/controller/disaggregated_cluster_controller.go
+++ b/pkg/controller/disaggregated_cluster_controller.go
@@ -202,21 +202,12 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec
 	hv := hash.HashObject(ddc.Spec)
 
 	var res ctrl.Result
-	////TODO: deprecated.
-	//cmnn := types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Spec.InstanceConfigMap}
-	//ddcnn := types.NamespacedName{Namespace: ddc.Namespace, Name: ddc.Name}
-	//cmnnStr := cmnn.String()
-	//ddcnnStr := ddcnn.String()
-	//if _, ok := dc.wcms[cmnnStr]; !ok {
-	//	dc.wcms[cmnnStr] = ddcnnStr
-	//}
-
-	//sync resource.
-	//recall all errors
 	var msg string
 	reconRes, reconErr := dc.reconcileSub(ctx, &ddc)
 	if reconErr != nil {
 		msg = msg + reconErr.Error()
+	}
+	if !reconRes.IsZero() {
 		res = reconRes
 	}
 
@@ -224,30 +215,39 @@ func (dc *DisaggregatedClusterReconciler) Reconcile(ctx context.Context, req rec
 	clearRes, clearErr := dc.clearUnusedResources(ctx, &ddc)
 	if clearErr != nil {
-		msg = msg + reconErr.Error()
+		msg = msg + clearErr.Error()
+	}
+
+	if !clearRes.IsZero() {
 		res = clearRes
 	}
 
 	//display new status.
 	disRes, disErr := func() (ctrl.Result, error) {
 		//reorganize status.
-		if res, err = dc.reorganizeStatus(&ddc); err != nil {
-			return res, err
+		var stsRes ctrl.Result
+		var stsErr error
+		if stsRes, stsErr = dc.reorganizeStatus(&ddc); stsErr != nil {
+			return stsRes, stsErr
 		}
 		//update cr or status
-		if res, err = dc.updateObjectORStatus(ctx, &ddc, hv); err != nil {
-			return res, err
+		if stsRes, stsErr = dc.updateObjectORStatus(ctx, &ddc, hv); stsErr != nil {
+			return stsRes, stsErr
 		}
-		return ctrl.Result{}, nil
+		return stsRes, stsErr
 	}()
-
 	if disErr != nil {
 		msg = msg + disErr.Error()
+	}
+
+	if !disRes.IsZero() {
 		res = disRes
 	}
 
-	return res, err
+	if msg != "" {
+		return res, errors.New(msg)
+	}
+	return res, nil
 }
 
 func (dc *DisaggregatedClusterReconciler) clearUnusedResources(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
@@ -312,7 +312,19 @@ func (dc *DisaggregatedClusterReconciler) updateObjectORStatus(ctx context.Conte
 			//return ctrl.Result{}, err
 		}
 	}
-	return dc.updateDorisDisaggregatedClusterStatus(ctx, deepCopyDDC)
+
+	res, err := dc.updateDorisDisaggregatedClusterStatus(ctx, deepCopyDDC)
+	if err != nil {
+		return res, err
+	}
+
+	// requeue while any compute group is decommissioning so the progress keeps being checked.
+	for _, cgs := range ddc.Status.ComputeGroupStatuses {
+		if cgs.Phase == dv1.Decommissioning {
+			return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
+		}
+	}
+	return res, nil
 }
 
 func (dc *DisaggregatedClusterReconciler) updateDorisDisaggregatedClusterStatus(ctx context.Context, ddc *dv1.DorisDisaggregatedCluster) (ctrl.Result, error) {
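The error handling above concatenates sub-reconciler messages into one string, and the 10-second requeue drives the decommission state machine forward between reconciles. If the module targets Go 1.20+, `errors.Join` would be an alternative that keeps the individual errors unwrappable; a sketch of the idea, not what this PR does:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	reconErr := errors.New("compute group cg1 sync failed")
	clearErr := errors.New("clear unused resources failed")

	// errors.Join skips nil members and renders one message per line.
	if err := errors.Join(reconErr, nil, clearErr); err != nil {
		fmt.Println(err)
	}
}
```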
diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go
index cb0193d..4c7013c 100644
--- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go
+++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go
@@ -24,7 +24,6 @@ import (
 	dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
 	"github.com/apache/doris-operator/pkg/common/utils"
 	"github.com/apache/doris-operator/pkg/common/utils/k8s"
-	"github.com/apache/doris-operator/pkg/common/utils/mysql"
 	"github.com/apache/doris-operator/pkg/common/utils/resource"
 	"github.com/apache/doris-operator/pkg/common/utils/set"
 	sc "github.com/apache/doris-operator/pkg/controller/sub_controller"
@@ -81,6 +80,7 @@ func (dcgs *DisaggregatedComputeGroupsController) Sync(ctx context.Context, obj
 		return errors.New("validating compute group failed")
 	}
 
+	var errs []error
 	cgs := ddc.Spec.ComputeGroups
 	for i, _ := range cgs {
@@ -88,10 +88,19 @@
 			if event != nil {
 				dcgs.K8srecorder.Event(ddc, string(event.Type), string(event.Reason), event.Message)
 			}
+			errs = append(errs, err)
 			klog.Errorf("disaggregatedComputeGroupsController computeGroups sync failed, compute group Uniqueid %s sync failed, err=%s", cgs[i].UniqueId, sc.EventString(event))
 		}
 	}
 
+	if len(errs) != 0 {
+		msg := fmt.Sprintf("disaggregatedComputeGroupsController sync namespace: %s, ddc name: %s, compute groups failed with the following errors: ", ddc.Namespace, ddc.Name)
+		for _, err := range errs {
+			msg += err.Error()
+		}
+		return errors.New(msg)
+	}
+
 	return nil
 }
@@ -150,6 +159,7 @@
 	return event, err
 }
 
+// reconcileStatefulset applies the compute group's statefulset; on failure the returned event carries the message to record on the cluster resource.
 func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx context.Context, st *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) (*sc.Event, error) {
 	var est appv1.StatefulSet
 	if err := dcgs.K8sclient.Get(ctx, types.NamespacedName{Namespace: st.Namespace, Name: st.Name}, &est); apierrors.IsNotFound(err) {
@@ -164,16 +174,14 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte
 		return nil, err
 	}
 
-	var cgStatus *dv1.ComputeGroupStatus
-
-	uniqueId := cg.UniqueId
-	for i := range cluster.Status.ComputeGroupStatuses {
-		if cluster.Status.ComputeGroupStatuses[i].UniqueId == uniqueId {
-			cgStatus = &cluster.Status.ComputeGroupStatuses[i]
-			break
-		}
+	err := dcgs.preApplyStatefulSet(ctx, st, &est, cluster, cg)
+	if err != nil {
+		klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset preApplyStatefulSet namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error())
+		return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err
+	}
+	if skipApplyStatefulset(cluster, cg) {
+		return nil, nil
 	}
-	scaleType := getScaleType(st, &est, cgStatus.Phase)
 
 	if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool {
 		return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false)
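Why the early return on `skipApplyStatefulset` matters: applying the shrunken statefulset while tablets are still migrating would delete BE pods before they are drained. A toy rendering of the gate, with stub types rather than the operator's API:

```go
package main

import "fmt"

type phase string

const decommissioning phase = "Decommissioning"

// applyReplicas mimics the gate: the statefulset is only shrunk once the
// compute group has left the Decommissioning phase.
func applyReplicas(current, desired int32, p phase) int32 {
	if p == decommissioning {
		return current // keep pods alive until tablets are drained
	}
	return desired
}

func main() {
	fmt.Println(applyReplicas(5, 3, decommissioning)) // 5
	fmt.Println(applyReplicas(5, 3, "Ready"))         // 3
}
```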
@@ -182,32 +190,9 @@
 		return &sc.Event{Type: sc.EventWarning, Reason: sc.CGApplyResourceFailed, Message: err.Error()}, err
 	}
 
-	switch scaleType {
-	//case "resume":
-	//case "suspend":
-	case "scaleDown":
-		cgKeepAmount := *cg.Replicas
-		cgName := cluster.GetCGName(cg)
-		if err := dcgs.scaledOutBENodesBySQL(ctx, dcgs.K8sclient, cluster, cgName, cgKeepAmount); err != nil {
-			cgStatus.Phase = dv1.ScaleDownFailed
-			klog.Errorf("ScaleDownBE failed, err:%s ", err.Error())
-			return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()},
-				err
-		}
-		cgStatus.Phase = dv1.Scaling
-
-	}
-
 	return nil, nil
 }
 
-func getScaleType(st, est *appv1.StatefulSet, phase dv1.Phase) string {
-	if *(st.Spec.Replicas) < *(est.Spec.Replicas) || phase == dv1.ScaleDownFailed {
-		return "scaleDown"
-	}
-	return ""
-}
-
 // initial compute group status before sync resources. status changing with sync steps, and generate the last status by classify pods.
 func (dcgs *DisaggregatedComputeGroupsController) initialCGStatus(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) {
 	cgss := ddc.Status.ComputeGroupStatuses
@@ -224,9 +209,7 @@
 
 	for i := range cgss {
 		if cgss[i].UniqueId == uniqueId {
-			if cgss[i].Phase == dv1.ScaleDownFailed || cgss[i].Phase == dv1.Suspended ||
-				cgss[i].Phase == dv1.SuspendFailed || cgss[i].Phase == dv1.ResumeFailed ||
-				cgss[i].Phase == dv1.Scaling {
+			if cgss[i].Phase != dv1.Ready {
 				defaultStatus.Phase = cgss[i].Phase
 			}
 			defaultStatus.SuspendReplicas = cgss[i].SuspendReplicas
@@ -307,16 +290,24 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Con
 		}
 		if !cleared {
 			eCGs = append(eCGs, clearCGs[i])
-		} else {
-			// drop compute group
-			cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
-			cgKeepAmount := int32(0)
-			err := dcgs.scaledOutBENodesBySQL(ctx, dcgs.K8sclient, ddc, cgName, cgKeepAmount)
-			if err != nil {
-				klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
-				dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
-			}
+			continue
 		}
+
+		// drop compute group
+		cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
+		cgKeepAmount := int32(0)
+		sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
+		if err != nil {
+			klog.Errorf("computeGroupSync ClearResources getMasterSqlClient failed: %s", err.Error())
+			dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync getMasterSqlClient failed: "+err.Error())
+			continue
+		}
+		err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount)
+		sqlClient.Close()
+		if err != nil {
+			klog.Errorf("computeGroupSync ClearResources scaledOutBENodesByDrop failed: %s", err.Error())
+			dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync scaledOutBENodesByDrop failed: "+err.Error())
+		}
 	}
 
 	for i := range eCGs {
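One reason the rewritten loop above closes the SQL client explicitly (and `continue`s on a failed connection instead of using a nil client): deferred calls run only when the whole function returns, so a `defer` inside the loop would keep every connection open until `ClearResources` finishes. A minimal illustration:

```go
package main

import "fmt"

type conn struct{ id int }

func (c *conn) Close() { fmt.Println("closed", c.id) }

func main() {
	for i := 0; i < 3; i++ {
		c := &conn{id: i}
		// defer c.Close() here would hold all three connections open
		// until main returns; closing explicitly frees one per iteration.
		c.Close()
	}
}
```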
@@ -453,7 +443,6 @@ func (dcgs *DisaggregatedComputeGroupsController) UpdateComponentStatus(obj clie
 	if errMs == "" {
 		return nil
 	}
-
 	return errors.New(errMs)
 }
 
@@ -484,68 +473,3 @@ func (dcgs *DisaggregatedComputeGroupsController) updateCGStatus(ddc *dv1.DorisD
 	}
 	return nil
 }
-
-func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesBySQL(
-	ctx context.Context, k8sclient client.Client,
-	cluster *dv1.DorisDisaggregatedCluster,
-	cgName string,
-	cgKeepAmount int32) error {
-
-	// get user and password
-	secret, _ := k8s.GetSecret(ctx, k8sclient, cluster.Namespace, cluster.Spec.AuthSecret)
-	adminUserName, password := resource.GetDorisLoginInformation(secret)
-
-	// get host and port
-	// When the operator and dcr are deployed in different namespace, it will be inaccessible, so need to add the dcr svc namespace
-	host := cluster.GetFEVIPAddresss()
-	confMap := dcgs.GetConfigValuesFromConfigMaps(cluster.Namespace, resource.FE_RESOLVEKEY, cluster.Spec.FeSpec.ConfigMaps)
-	queryPort := resource.GetPort(confMap, resource.QUERY_PORT)
-
-	// connect to doris sql to get master node
-	// It may not be the master, or even the node that needs to be deleted, causing the deletion SQL to fail.
-	dbConf := mysql.DBConfig{
-		User:     adminUserName,
-		Password: password,
-		Host:     host,
-		Port:     strconv.FormatInt(int64(queryPort), 10),
-		Database: "mysql",
-	}
-	// Connect to the master and run the SQL statement of system admin, because it is not excluded that the user can shrink be and fe at the same time
-	masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
-	if err != nil {
-		klog.Errorf("dropNodeBySQLClient NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
-		return err
-	}
-	defer masterDBClient.Close()
-
-	allBackends, err := masterDBClient.GetBackendsByCGName(cgName)
-	if err != nil {
-		klog.Errorf("dropNodeBySQLClient failed, ShowBackends err:%s", err.Error())
-		return err
-	}
-
-	var dropNodes []*mysql.Backend
-	for i := range allBackends {
-		node := allBackends[i]
-		split := strings.Split(node.Host, ".")
-		splitCGIDArr := strings.Split(split[0], "-")
-		podNum, err := strconv.Atoi(splitCGIDArr[len(splitCGIDArr)-1])
-		if err != nil {
-			klog.Errorf("dropNodeBySQLClient splitCGIDArr can not split host : %s,err:%s", node.Host, err.Error())
-			return err
-		}
-		if podNum >= int(cgKeepAmount) {
-			dropNodes = append(dropNodes, node)
-		}
-	}
-
-	if len(dropNodes) == 0 {
-		return nil
-	}
-	err = masterDBClient.DropBE(dropNodes)
-	if err != nil {
-		klog.Errorf("dropNodeBySQLClient DropBENodes failed, err:%s ", err.Error())
-		return err
-	}
-	return nil
-}
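For reviewers unfamiliar with the FE side: `DropBE` and `DecommissionBE` in the operator's mysql package presumably wrap Doris's `ALTER SYSTEM` statements, where DECOMMISSION migrates tablets off the backend first and DROPP (the double P is Doris's spelling) removes it immediately. A hedged sketch of the statements involved; the wrapper's exact formatting may differ:

```go
package main

import (
	"fmt"
	"strings"
)

// Presumed renderings of the FE statements behind DropBE/DecommissionBE.
func dropSQL(endpoints []string) string {
	return fmt.Sprintf(`ALTER SYSTEM DROPP BACKEND "%s"`, strings.Join(endpoints, `", "`))
}

func decommissionSQL(endpoints []string) string {
	return fmt.Sprintf(`ALTER SYSTEM DECOMMISSION BACKEND "%s"`, strings.Join(endpoints, `", "`))
}

func main() {
	eps := []string{"be-0.be.ns.svc:9050", "be-1.be.ns.svc:9050"}
	fmt.Println(dropSQL(eps))
	fmt.Println(decommissionSQL(eps))
}
```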
diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go
new file mode 100644
index 0000000..b3162ad
--- /dev/null
+++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package computegroups
+
+import (
+	"context"
+	dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
+	"github.com/apache/doris-operator/pkg/common/utils/k8s"
+	"github.com/apache/doris-operator/pkg/common/utils/mysql"
+	"github.com/apache/doris-operator/pkg/common/utils/resource"
+	appv1 "k8s.io/api/apps/v1"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"strconv"
+	"strings"
+)
+
+// preApplyStatefulSet runs the SQL side of a scale down (dropping or decommissioning surplus BE nodes) before the statefulset is applied.
+func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx context.Context, st, est *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error {
+	var cgStatus *dv1.ComputeGroupStatus
+	uniqueId := cg.UniqueId
+	for i := range cluster.Status.ComputeGroupStatuses {
+		if cluster.Status.ComputeGroupStatuses[i].UniqueId == uniqueId {
+			cgStatus = &cluster.Status.ComputeGroupStatuses[i]
+			break
+		}
+	}
+	optType := getOperationType(st, est, cgStatus.Phase)
+
+	switch optType {
+	case "scaleDown":
+		if err := dcgs.scaleDown(ctx, cgStatus, cluster, cg); err != nil {
+			return err
+		}
+	default:
+		// nothing to prepare before ApplyStatefulSet.
+	}
+
+	return nil
+}
+
+func (dcgs *DisaggregatedComputeGroupsController) scaleDown(ctx context.Context, cgStatus *dv1.ComputeGroupStatus, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error {
+	sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, cluster)
+	if err != nil {
+		klog.Errorf("scaleDown getMasterSqlClient failed, get fe master node connection err:%s", err.Error())
+		return err
+	}
+	defer sqlClient.Close()
+
+	cgKeepAmount := *cg.Replicas
+	cgName := cluster.GetCGName(cg)
+
+	if cluster.Spec.EnableDecommission {
+		if err := dcgs.scaledOutBENodesByDecommission(cluster, cgStatus, sqlClient, cgName, cgKeepAmount); err != nil {
+			return err
+		}
+	} else { // decommission disabled: drop the surplus BE nodes directly
+		if err := dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount); err != nil {
+			cgStatus.Phase = dv1.ScaleDownFailed
+			klog.Errorf("scaleDown scaledOutBENodesByDrop ddcName:%s, namespace:%s, computeGroupName:%s, drop nodes failed:%s ", cluster.Name, cluster.Namespace, cgName, err.Error())
+			return err
+		}
+		cgStatus.Phase = dv1.Scaling
+	}
+	// returning nil lets reconcileStatefulset apply the statefulset.
+	return nil
+}
+
+// scaledOutBENodesByDecommission advances the decommission state machine one step per reconcile:
+// start the decommission, wait while tablets drain, and finally drop the drained backends.
+func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cluster *dv1.DorisDisaggregatedCluster, cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error {
+	decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgName, cgKeepAmount)
+	if err != nil {
+		return err
+	}
+
+	switch decommissionPhase {
+	case resource.DecommissionAcceptable:
+		err = dcgs.decommissionBENodes(sqlClient, cgName, cgKeepAmount)
+		if err != nil {
+			cgStatus.Phase = dv1.ScaleDownFailed
+			klog.Errorf("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s, Decommission failed, err:%s ", cluster.Name, cluster.Namespace, cgName, err.Error())
+			return err
+		}
+		cgStatus.Phase = dv1.Decommissioning
+		return nil
+	case resource.Decommissioning, resource.DecommissionPhaseUnknown:
+		cgStatus.Phase = dv1.Decommissioning
+		klog.Infof("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s, Decommission in progress", cluster.Name, cluster.Namespace, cgName)
+		return nil
+	case resource.Decommissioned:
+		if err := dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount); err != nil {
+			cgStatus.Phase = dv1.ScaleDownFailed
+			klog.Errorf("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s, drop decommissioned nodes failed, err:%s ", cluster.Name, cluster.Namespace, cgName, err.Error())
+			return err
+		}
+	}
+
+	cgStatus.Phase = dv1.Scaling
+	return nil
+}
+
+func getOperationType(st, est *appv1.StatefulSet, phase dv1.Phase) string {
+	// Do not check 'phase == dv1.Ready' here: the phase is re-initialized to Reconciling at the start of every reconcile.
+	if *(st.Spec.Replicas) < *(est.Spec.Replicas) || phase == dv1.Decommissioning || phase == dv1.ScaleDownFailed {
+		return "scaleDown"
+	}
+	return ""
+}
+
+func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDrop(
+	masterDBClient *mysql.DB,
+	cgName string,
+	cgKeepAmount int32) error {
+
+	dropNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount)
+	if err != nil {
+		klog.Errorf("scaledOutBENodesByDrop getScaledOutBENode failed, err:%s ", err.Error())
+		return err
+	}
+
+	if len(dropNodes) == 0 {
+		return nil
+	}
+	err = masterDBClient.DropBE(dropNodes)
+	if err != nil {
+		klog.Errorf("scaledOutBENodesByDrop DropBE failed, err:%s ", err.Error())
+		return err
+	}
+	return nil
+}
+
+func (dcgs *DisaggregatedComputeGroupsController) decommissionBENodes(
+	masterDBClient *mysql.DB,
+	cgName string,
+	cgKeepAmount int32) error {
+
+	decommissionNodes, err := getScaledOutBENode(masterDBClient, cgName, cgKeepAmount)
+	if err != nil {
+		klog.Errorf("decommissionBENodes getScaledOutBENode failed, err:%s ", err.Error())
+		return err
+	}
+
+	if len(decommissionNodes) == 0 {
+		return nil
+	}
+	err = masterDBClient.DecommissionBE(decommissionNodes)
+	if err != nil {
+		klog.Errorf("decommissionBENodes DecommissionBE failed, err:%s ", err.Error())
+		return err
+	}
+	return nil
+}
+
+func (dcgs *DisaggregatedComputeGroupsController) getMasterSqlClient(ctx context.Context, k8sclient client.Client, cluster *dv1.DorisDisaggregatedCluster) (*mysql.DB, error) {
+	// get user and password
+	secret, _ := k8s.GetSecret(ctx, k8sclient, cluster.Namespace, cluster.Spec.AuthSecret)
+	adminUserName, password := resource.GetDorisLoginInformation(secret)
+
+	// get host and port
+	// When the operator and the dcr are deployed in different namespaces the bare service name is unreachable, so the dcr svc namespace is part of the address.
+	host := cluster.GetFEVIPAddresss()
+	confMap := dcgs.GetConfigValuesFromConfigMaps(cluster.Namespace, resource.FE_RESOLVEKEY, cluster.Spec.FeSpec.ConfigMaps)
+	queryPort := resource.GetPort(confMap, resource.QUERY_PORT)
+
+	// The FE VIP may route to any FE node, not necessarily the master, and admin SQL run on a non-master (or on a node that is itself being removed) would fail.
+	dbConf := mysql.DBConfig{
+		User:     adminUserName,
+		Password: password,
+		Host:     host,
+		Port:     strconv.FormatInt(int64(queryPort), 10),
+		Database: "mysql",
+	}
+	// Resolve a connection to the master before running admin statements, since BE and FE may be scaled down at the same time.
+	masterDBClient, err := mysql.NewDorisMasterSqlDB(dbConf)
+	if err != nil {
+		klog.Errorf("getMasterSqlClient NewDorisMasterSqlDB failed, get fe node connection err:%s", err.Error())
+		return nil, err
+	}
+	return masterDBClient, nil
+}
+
+// decommissionProgressCheck reports the decommission phase of the compute group by classifying its backends.
+func (dcgs *DisaggregatedComputeGroupsController) decommissionProgressCheck(masterDBClient *mysql.DB, cgName string, cgKeepAmount int32) (resource.DecommissionPhase, error) {
+	allBackends, err := masterDBClient.GetBackendsByCGName(cgName)
+	if err != nil {
+		klog.Errorf("decommissionProgressCheck failed, ShowBackends err:%s", err.Error())
+		return resource.DecommissionPhaseUnknown, err
+	}
+	dts := resource.ConstructDecommissionTaskStatus(allBackends, cgKeepAmount)
+	return dts.GetDecommissionPhase(), nil
+}
+
+// getScaledOutBENode returns the backends whose pod ordinal is not less than cgKeepAmount, i.e. the nodes to remove.
+func getScaledOutBENode(
+	masterDBClient *mysql.DB,
+	cgName string,
+	cgKeepAmount int32) ([]*mysql.Backend, error) {
+
+	allBackends, err := masterDBClient.GetBackendsByCGName(cgName)
+	if err != nil {
+		klog.Errorf("getScaledOutBENode failed, ShowBackends err:%s", err.Error())
+		return nil, err
+	}
+
+	var dropNodes []*mysql.Backend
+	for i := range allBackends {
+		node := allBackends[i]
+		split := strings.Split(node.Host, ".")
+		splitCGIDArr := strings.Split(split[0], "-")
+		podNum, err := strconv.Atoi(splitCGIDArr[len(splitCGIDArr)-1])
+		if err != nil {
+			klog.Errorf("getScaledOutBENode can not parse pod ordinal from host: %s, err:%s", node.Host, err.Error())
+			return nil, err
+		}
+		if podNum >= int(cgKeepAmount) {
+			dropNodes = append(dropNodes, node)
+		}
+	}
+	return dropNodes, nil
+}
+
+// skipApplyStatefulset reports whether applying the statefulset should be postponed; it is true while the compute group is decommissioning.
+func skipApplyStatefulset(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) bool {
+	var cgStatus *dv1.ComputeGroupStatus
+	uniqueId := cg.UniqueId
+	for i := range ddc.Status.ComputeGroupStatuses {
+		if ddc.Status.ComputeGroupStatuses[i].UniqueId == uniqueId {
+			cgStatus = &ddc.Status.ComputeGroupStatuses[i]
+			break
+		}
+	}
+
+	return cgStatus != nil && cgStatus.Phase == dv1.Decommissioning
+}
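A final sanity sketch of the selection rule in `getScaledOutBENode`: the pod ordinal is the last `-`-separated token of the host's first DNS label, and any backend with ordinal >= `cgKeepAmount` is picked for removal, matching how a statefulset deletes the highest ordinals first. Host names below are invented:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// selectForRemoval mirrors the ordinal rule used by getScaledOutBENode.
func selectForRemoval(hosts []string, keep int32) ([]string, error) {
	var out []string
	for _, h := range hosts {
		label := strings.Split(h, ".")[0]
		parts := strings.Split(label, "-")
		ordinal, err := strconv.Atoi(parts[len(parts)-1])
		if err != nil {
			return nil, err
		}
		if ordinal >= int(keep) {
			out = append(out, h)
		}
	}
	return out, nil
}

func main() {
	hosts := []string{
		"ddc-cg1-0.ddc-cg1.ns.svc.cluster.local",
		"ddc-cg1-1.ddc-cg1.ns.svc.cluster.local",
		"ddc-cg1-2.ddc-cg1.ns.svc.cluster.local",
	}
	removed, _ := selectForRemoval(hosts, 2)
	fmt.Println(removed) // only the ordinal-2 backend is dropped/decommissioned
}
```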