diff --git a/api/disaggregated/v1/types.go b/api/disaggregated/v1/types.go index 2143901..f63b130 100644 --- a/api/disaggregated/v1/types.go +++ b/api/disaggregated/v1/types.go @@ -295,10 +295,6 @@ type Phase string const ( Ready Phase = "Ready" - //Upgrading represents the spec of the service changed, service in smoothing upgrade. - Upgrading Phase = "Upgrading" - //Failed represents service failed to start, can't be accessed. - Failed Phase = "Failed" //Creating represents service in creating stage. Reconciling Phase = "Reconciling" diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go index d2c8de6..4c7013c 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/controller.go @@ -176,12 +176,12 @@ func (dcgs *DisaggregatedComputeGroupsController) reconcileStatefulset(ctx conte err := dcgs.preApplyStatefulSet(ctx, st, &est, cluster, cg) if err != nil { - if skipApplyStatefulset(err) { - return nil, nil - } klog.Errorf("disaggregatedComputeGroupsController reconcileStatefulset preApplyStatefulSet namespace=%s name=%s failed, err=%s", st.Namespace, st.Name, err.Error()) return &sc.Event{Type: sc.EventWarning, Reason: sc.CGSqlExecFailed, Message: err.Error()}, err } + if skipApplyStatefulset(cluster, cg) { + return nil, nil + } if err := k8s.ApplyStatefulSet(ctx, dcgs.K8sclient, st, func(st, est *appv1.StatefulSet) bool { return resource.StatefulsetDeepEqualWithKey(st, est, dv1.DisaggregatedSpecHashValueAnnotation, false) @@ -209,9 +209,7 @@ func (dcgs *DisaggregatedComputeGroupsController) initialCGStatus(ddc *dv1.Doris for i := range cgss { if cgss[i].UniqueId == uniqueId { - if cgss[i].Phase == dv1.ScaleDownFailed || cgss[i].Phase == dv1.Suspended || - cgss[i].Phase == dv1.SuspendFailed || cgss[i].Phase == dv1.ResumeFailed || - cgss[i].Phase == dv1.Scaling || cgss[i].Phase == dv1.Decommissioning { + if cgss[i].Phase != dv1.Ready { defaultStatus.Phase = cgss[i].Phase } defaultStatus.SuspendReplicas = cgss[i].SuspendReplicas @@ -292,22 +290,23 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Con } if !cleared { eCGs = append(eCGs, clearCGs[i]) - } else { - // drop compute group - cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-") - cgKeepAmount := int32(0) - sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc) - if err != nil { - klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error()) - dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) - } - defer sqlClient.Close() - err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount) - if err != nil { - klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error()) - dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) - } + continue } + // drop compute group + cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-") + cgKeepAmount := int32(0) + sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc) + if err != nil { + klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error()) + dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) + } + defer sqlClient.Close() + err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount) + if err != nil { + klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error()) + dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error()) + } + } for i := range eCGs { diff --git a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go index d04d71f..b3162ad 100644 --- a/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go +++ b/pkg/controller/sub_controller/disaggregated_cluster/computegroups/prepare_modify.go @@ -19,7 +19,6 @@ package computegroups import ( "context" - "errors" dv1 "github.com/apache/doris-operator/api/disaggregated/v1" "github.com/apache/doris-operator/pkg/common/utils/k8s" "github.com/apache/doris-operator/pkg/common/utils/mysql" @@ -31,8 +30,6 @@ import ( "strings" ) -const decommissioningMessage = "decommissionBENodes in progress" - func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx context.Context, st, est *appv1.StatefulSet, cluster *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) error { var cgStatus *dv1.ComputeGroupStatus uniqueId := cg.UniqueId @@ -50,6 +47,8 @@ func (dcgs *DisaggregatedComputeGroupsController) preApplyStatefulSet(ctx contex if err != nil { return err } + default: + // default do nothing, not need pre ApplyStatefulSet } return nil @@ -68,22 +67,22 @@ func (dcgs *DisaggregatedComputeGroupsController) scaleOut(ctx context.Context, cgName := cluster.GetCGName(cg) if cluster.Spec.EnableDecommission { - if err := dcgs.scaledOutBENodesByDecommission(cgStatus, sqlClient, cgName, cgKeepAmount); err != nil { + if err := dcgs.scaledOutBENodesByDecommission(cluster, cgStatus, sqlClient, cgName, cgKeepAmount); err != nil { return err } } else { // not decommission , drop node if err := dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount); err != nil { cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("ScaleOut scaledOutBENodesByDrop failed, err:%s ", err.Error()) + klog.Errorf("ScaleOut scaledOutBENodesByDrop ddcName:%s, namespace:%s, computeGroupName:%s, drop nodes failed:%s ", cluster.Name, cluster.Namespace, cgName, err.Error()) return err } + cgStatus.Phase = dv1.Scaling } - cgStatus.Phase = dv1.Scaling // return nil will apply sts return nil } -func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error { +func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission(cluster *dv1.DorisDisaggregatedCluster, cgStatus *dv1.ComputeGroupStatus, sqlClient *mysql.DB, cgName string, cgKeepAmount int32) error { decommissionPhase, err := dcgs.decommissionProgressCheck(sqlClient, cgName, cgKeepAmount) if err != nil { return err @@ -93,18 +92,19 @@ func (dcgs *DisaggregatedComputeGroupsController) scaledOutBENodesByDecommission err = dcgs.decommissionBENodes(sqlClient, cgName, cgKeepAmount) if err != nil { cgStatus.Phase = dv1.ScaleDownFailed - klog.Errorf("scaledOutBENodesByDecommission failed, err:%s ", err.Error()) + klog.Errorf("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s , Decommission failed, err:%s ", cluster.Name, cluster.Namespace, cgName, err.Error()) return err } cgStatus.Phase = dv1.Decommissioning - return errors.New(decommissioningMessage) + return nil case resource.Decommissioning, resource.DecommissionPhaseUnknown: cgStatus.Phase = dv1.Decommissioning - klog.Infof("scaledOutBENodesByDecommission in progress") - return errors.New(decommissioningMessage) + klog.Infof("scaledOutBENodesByDecommission ddcName:%s, namespace:%s, computeGroupName:%s, Decommission in progress", cluster.Name, cluster.Namespace, cgName) + return nil case resource.Decommissioned: dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount) } + cgStatus.Phase = dv1.Scaling return nil } @@ -229,8 +229,17 @@ func getScaledOutBENode( return dropNodes, nil } -func skipApplyStatefulset(err error) bool { - if err == nil || err.Error() == decommissioningMessage { +func skipApplyStatefulset(ddc *dv1.DorisDisaggregatedCluster, cg *dv1.ComputeGroup) bool { + var cgStatus *dv1.ComputeGroupStatus + uniqueId := cg.UniqueId + for i := range ddc.Status.ComputeGroupStatuses { + if ddc.Status.ComputeGroupStatuses[i].UniqueId == uniqueId { + cgStatus = &ddc.Status.ComputeGroupStatuses[i] + break + } + } + + if cgStatus.Phase == dv1.Decommissioning { return true } return false