Skip to content

Commit

Permalink
Merge pull request apache#318 from intelligentfu8/clear-resource-refa…
Browse files Browse the repository at this point in the history
…ctor

[Refactor](ddc)refactor clear resource code,fix service label error,use sts replicas…
  • Loading branch information
catpineapple authored Dec 19, 2024
2 parents adcfcd3 + 6960aa6 commit 5ef8f61
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 52 deletions.
23 changes: 23 additions & 0 deletions pkg/common/utils/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@ func ApplyService(ctx context.Context, k8sclient client.Client, svc *corev1.Serv
return PatchClientObject(ctx, k8sclient, svc)
}

func ListServicesInNamespace(ctx context.Context, k8sclient client.Client, namespace string, selector map[string]string) ([]corev1.Service, error) {
var svcList corev1.ServiceList
if err := k8sclient.List(ctx, &svcList, client.InNamespace(namespace), client.MatchingLabels(selector)); err != nil {
return nil, err
}

return svcList.Items, nil
}

func ListStatefulsetInNamespace(ctx context.Context, k8sclient client.Client, namespace string, selector map[string]string) ([]appv1.StatefulSet, error) {
var stsList appv1.StatefulSetList
if err := k8sclient.List(ctx, &stsList, client.InNamespace(namespace), client.MatchingLabels(selector)); err != nil {
return nil, err
}
return stsList.Items, nil
}

// ApplyStatefulSet when the object is not exist, create object. if exist and statefulset have been updated, patch the statefulset.
func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.StatefulSet, equal StatefulSetEqual) error {
var est appv1.StatefulSet
Expand All @@ -93,6 +110,12 @@ func ApplyStatefulSet(ctx context.Context, k8sclient client.Client, st *appv1.St
return err
}

func GetStatefulSet(ctx context.Context, k8sclient client.Client, namespace, name string) (*appv1.StatefulSet, error) {
var est appv1.StatefulSet
err := k8sclient.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &est)
return &est, err
}

func CreateClientObject(ctx context.Context, k8sclient client.Client, object client.Object) error {
klog.Info("Creating resource service ", "namespace ", object.GetNamespace(), " name ", object.GetName(), " kind ", object.GetObjectKind().GroupVersionKind().Kind)
if err := k8sclient.Create(ctx, object); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,79 +259,220 @@ func (dcgs *DisaggregatedComputeGroupsController) validateRegex(cgs []dv1.Comput
func (dcgs *DisaggregatedComputeGroupsController) ClearResources(ctx context.Context, obj client.Object) (bool, error) {
ddc := obj.(*dv1.DorisDisaggregatedCluster)

if !dcgs.feAvailable(ddc) {
return false, nil
}

var clearCGs []dv1.ComputeGroupStatus
var eCGs []dv1.ComputeGroupStatus

for i, cgs := range ddc.Status.ComputeGroupStatuses {
for _, cg := range ddc.Spec.ComputeGroups {
if cgs.UniqueId == cg.UniqueId {
eCGs = append(eCGs, ddc.Status.ComputeGroupStatuses[i])
goto NoNeedAppend
break
}
}

clearCGs = append(clearCGs, ddc.Status.ComputeGroupStatuses[i])
// no need clear should not append.
NoNeedAppend:
}

sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
//list the svcs and stss owner reference to dorisDisaggregatedCluster.
cls := dcgs.GetCG2LayerCommonSchedulerLabels(ddc.Name)
svcs, err := k8s.ListServicesInNamespace(ctx, dcgs.K8sclient, ddc.Namespace, cls)
if err != nil {
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
klog.Errorf("DisaggregatedComputeGroupsController ListServicesInNamespace failed, dorisdisaggregatedcluster name=%s", ddc.Name)
return false, err
}
stss, err := k8s.ListStatefulsetInNamespace(ctx, dcgs.K8sclient, ddc.Namespace, cls)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController ListStatefulsetInNamespace failed, dorisdisaggregatedcluster name=%s", ddc.Name)
return false, err
}
defer sqlClient.Close()

for i := range clearCGs {
cgs := clearCGs[i]
cleared := true
if err := k8s.DeleteStatefulset(ctx, dcgs.K8sclient, ddc.Namespace, cgs.StatefulsetName); err != nil {
cleared = false
klog.Errorf("disaggregatedComputeGroupsController delete statefulset namespace %s name %s failed, err=%s", ddc.Namespace, cgs.StatefulsetName, err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGStatefulsetDeleteFailed), err.Error())
}
//clear unused service and statefulset.
delSvcNames := dcgs.findUnusedSvcs(svcs, ddc)
delStsNames, delUniqueIds := dcgs.findUnusedStssAndUniqueIds(stss, ddc)

if err = dcgs.clearCGInDorisMeta(ctx, delUniqueIds, ddc); err != nil {
return false, err
}
if err = dcgs.clearSvcs(ctx, delSvcNames, ddc); err != nil {
return false, err
}
if err = dcgs.clearStatefulsets(ctx, delStsNames, ddc); err != nil {
return false, err
}

if err := k8s.DeleteService(ctx, dcgs.K8sclient, ddc.Namespace, cgs.ServiceName); err != nil {
cleared = false
klog.Errorf("disaggregatedComputeGroupsController delete service namespace %s name %s failed, err=%s", ddc.Namespace, cgs.ServiceName, err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGServiceDeleteFailed), err.Error())
//clear unused pvc
for i := range eCGs {
err = dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, eCGs[i])
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear ComputeGroup reduced replicas PVC failed, namespace=%s, ddc name=%s, uniqueId=%s err=%s", ddc.Namespace, ddc.Name, eCGs[i].UniqueId, err.Error())
}
if !cleared {
eCGs = append(eCGs, clearCGs[i])
continue
}

for _, uniqueId := range delUniqueIds {
//new fake computeGroup status for clear all pvcs owner reference to deleted compute group.
fakeCgs := dv1.ComputeGroupStatus{
UniqueId: uniqueId,
}
// drop compute group
cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
cgKeepAmount := int32(0)
err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount)
err = dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, fakeCgs)
if err != nil {
klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear deleted compute group failed, namespace=%s, ddc name=%s, uniqueId=%s err=%s", ddc.Namespace, ddc.Name, uniqueId, err.Error())
}
}

ddc.Status.ComputeGroupStatuses = eCGs
return true, nil

//TODO: next pr remove the code
//sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
//if err != nil {
// klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
// return false, err
//}
//defer sqlClient.Close()
//
//for i := range clearCGs {
// cgs := clearCGs[i]
// cleared := true
// if err := k8s.DeleteStatefulset(ctx, dcgs.K8sclient, ddc.Namespace, cgs.StatefulsetName); err != nil {
// cleared = false
// klog.Errorf("disaggregatedComputeGroupsController delete statefulset namespace %s name %s failed, err=%s", ddc.Namespace, cgs.StatefulsetName, err.Error())
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGStatefulsetDeleteFailed), err.Error())
// }
//
// if err := k8s.DeleteService(ctx, dcgs.K8sclient, ddc.Namespace, cgs.ServiceName); err != nil {
// cleared = false
// klog.Errorf("disaggregatedComputeGroupsController delete service namespace %s name %s failed, err=%s", ddc.Namespace, cgs.ServiceName, err.Error())
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGServiceDeleteFailed), err.Error())
// }
// if !cleared {
// eCGs = append(eCGs, clearCGs[i])
// continue
// }
// // drop compute group
// cgName := strings.ReplaceAll(cgs.UniqueId, "_", "-")
// cgKeepAmount := int32(0)
// err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, cgKeepAmount)
// if err != nil {
// klog.Errorf("computeGroupSync ClearResources dropCGBySQLClient failed: %s", err.Error())
// dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
// }
//
//}
//
//for i := range eCGs {
// err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, eCGs[i])
// if err != nil {
// klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear whole ComputeGroup PVC failed, err=%s", err.Error())
// }
//}
//for i := range clearCGs {
// err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, clearCGs[i])
// if err != nil {
// klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear part ComputeGroup PVC failed, err=%s", err.Error())
// }
//}
//
//ddc.Status.ComputeGroupStatuses = eCGs
//
//return true, nil
}

func (dcgs *DisaggregatedComputeGroupsController) clearStatefulsets(ctx context.Context, stsNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
for _, name := range stsNames {
if err := k8s.DeleteStatefulset(ctx, dcgs.K8sclient, ddc.Namespace, name); err != nil {
klog.Errorf("DisaggregatedComputeGroupsController clear statefulset failed, namespace=%s, name =%s, err=%s", ddc.Namespace, name, err.Error())
return err
}
}
return nil
}

for i := range eCGs {
err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, eCGs[i])
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear whole ComputeGroup PVC failed, err=%s", err.Error())
func (dcgs *DisaggregatedComputeGroupsController) clearSvcs(ctx context.Context, svcNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
for _, name := range svcNames {
if err := k8s.DeleteService(ctx, dcgs.K8sclient, ddc.Namespace, name); err != nil {
klog.Errorf("DisaggregatedComputeGroupsController clear service failed, namespace=%s, name =%s, err=%s", ddc.Namespace, name, err.Error())
return err
}
}
for i := range clearCGs {
err := dcgs.ClearStatefulsetUnusedPVCs(ctx, ddc, clearCGs[i])
return nil
}

func (dcgs *DisaggregatedComputeGroupsController) clearCGInDorisMeta(ctx context.Context, cgNames []string, ddc *dv1.DorisDisaggregatedCluster) error {
if len(cgNames) == 0 {
return nil
}

sqlClient, err := dcgs.getMasterSqlClient(ctx, dcgs.K8sclient, ddc)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController clearCGInDorisMeta dropCGBySQLClient getMasterSqlClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
return err
}
defer sqlClient.Close()

for _, name := range cgNames {
//clear cg, the keepAmount = 0
//confirm used the right cgName, as the cgName get from the uniqueid that '-' replaced by '_'.
cgName := strings.ReplaceAll(name, "-", "_")
err = dcgs.scaledOutBENodesByDrop(sqlClient, cgName, 0)
if err != nil {
klog.Errorf("disaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs clear part ComputeGroup PVC failed, err=%s", err.Error())
klog.Errorf("DisaggregatedComputeGroupsController clearCGInDorisMeta dropCGBySQLClient failed: %s", err.Error())
dcgs.K8srecorder.Event(ddc, string(sc.EventWarning), string(sc.CGSqlExecFailed), "computeGroupSync dropCGBySQLClient failed: "+err.Error())
return err
}
}

ddc.Status.ComputeGroupStatuses = eCGs
return nil
}

return true, nil
func (dcgs *DisaggregatedComputeGroupsController) findUnusedSvcs(svcs []corev1.Service, ddc *dv1.DorisDisaggregatedCluster) []string {
var unusedSvcNames []string
for i, _ := range svcs {
own := ownerReference2ddc(&svcs[i], ddc)
if !own {
//not owner reference to ddc, should skip the service.
continue
}

svcUniqueId := getUniqueIdFromClientObject(&svcs[i])
exist := false
for j := 0; j < len(ddc.Spec.ComputeGroups); j++ {
if ddc.Spec.ComputeGroups[j].UniqueId == svcUniqueId {
exist = true
break
}
}

if !exist {
unusedSvcNames = append(unusedSvcNames, svcs[i].Name)
}
}

return unusedSvcNames
}

func (dcgs *DisaggregatedComputeGroupsController) findUnusedStssAndUniqueIds(stss []appv1.StatefulSet, ddc *dv1.DorisDisaggregatedCluster) ([]string /*sts*/, []string /*cgNames*/) {
var unusedStsNames []string
var unusedUniqueIds []string
for i, _ := range stss {
own := ownerReference2ddc(&stss[i], ddc)
if !own {
//not owner reference tto ddc should skip the statefulset.
continue
}

stsUniqueId := getUniqueIdFromClientObject(&stss[i])
exist := false
for j := 0; j < len(ddc.Spec.ComputeGroups); j++ {
if ddc.Spec.ComputeGroups[j].UniqueId == stsUniqueId {
exist = true
break
}
}
if !exist {
unusedStsNames = append(unusedStsNames, stss[i].Name)
unusedUniqueIds = append(unusedUniqueIds, stsUniqueId)
}
}

return unusedStsNames, unusedUniqueIds
}

// ClearStatefulsetUnusedPVCs
Expand Down Expand Up @@ -365,8 +506,17 @@ func (dcgs *DisaggregatedComputeGroupsController) ClearStatefulsetUnusedPVCs(ctx
}

if cg != nil {
replicas := int(*cg.Replicas)
//we should use statefulset replicas for avoiding the phase=scaleDown, when phase `scaleDown` cg' replicas is less than statefuslet.
replicas := 0
stsName := ddc.GetCGStatefulsetName(cg)
sts, err := k8s.GetStatefulSet(ctx, dcgs.K8sclient, ddc.Namespace, stsName)
if err != nil {
klog.Errorf("DisaggregatedComputeGroupsController ClearStatefulsetUnusedPVCs get statefulset namespace=%s, name=%s, failed, err=%s", ddc.Namespace, stsName, err.Error())
//waiting next reconciling.
return nil
}
replicas = int(*sts.Spec.Replicas)

cvs := dcgs.GetConfigValuesFromConfigMaps(ddc.Namespace, resource.BE_RESOLVEKEY, cg.CommonSpec.ConfigMaps)
paths, _ := dcgs.getCacheMaxSizeAndPaths(cvs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (dcgs *DisaggregatedComputeGroupsController) newService(ddc *dv1.DorisDisag

ob := &svc.ObjectMeta
ob.Name = ddc.GetCGServiceName(cg)
ob.Labels = dcgs.newCG2LayerSchedulerLabels(ddc.Namespace, uniqueId)
ob.Labels = dcgs.newCG2LayerSchedulerLabels(ddc.Name, uniqueId)

spec := &svc.Spec
spec.Selector = dcgs.newCGPodsSelector(ddc.Name, uniqueId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,17 @@ const (

// generate statefulset or service labels
func (dcgs *DisaggregatedComputeGroupsController) newCG2LayerSchedulerLabels(ddcName /*DisaggregatedClusterName*/, uniqueId string) map[string]string {
labels := dcgs.GetCG2LayerCommonSchedulerLabels(ddcName)
labels[dv1.DorisDisaggregatedComputeGroupUniqueId] = uniqueId
return labels
}

func (dcgs *DisaggregatedComputeGroupsController) GetCG2LayerCommonSchedulerLabels(ddcName string) map[string]string {
return map[string]string{
dv1.DorisDisaggregatedClusterName: ddcName,
dv1.DorisDisaggregatedComputeGroupUniqueId: uniqueId,
dv1.DorisDisaggregatedOwnerReference: ddcName,
dv1.DorisDisaggregatedClusterName: ddcName,
dv1.DorisDisaggregatedOwnerReference: ddcName,
}
}

func (dcgs *DisaggregatedComputeGroupsController) newCGPodsSelector(ddcName /*DisaggregatedClusterName*/, uniqueId string) map[string]string {
return map[string]string{
dv1.DorisDisaggregatedClusterName: ddcName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,36 @@

package computegroups

import (
dv1 "github.com/apache/doris-operator/api/disaggregated/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// regex
var (
compute_group_name_regex = "[a-zA-Z](_?[0-9a-zA-Z])*"
compute_group_id_regex = "[a-zA-Z](_?[0-9a-zA-Z])*"
)

func ownerReference2ddc(obj client.Object, cluster *dv1.DorisDisaggregatedCluster) bool {
if obj == nil {
return false
}

ors := obj.GetOwnerReferences()
for _, or := range ors {
if or.Name == cluster.Name && or.UID == cluster.UID {
return true
}
}

return false
}

func getUniqueIdFromClientObject(obj client.Object) string {
if obj == nil {
return ""
}
labels := obj.GetLabels()
return labels[dv1.DorisDisaggregatedComputeGroupUniqueId]
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (dfc *DisaggregatedFEController) newService(ddc *dv1.DorisDisaggregatedClus
svc := dfc.NewDefaultService(ddc)
om := &svc.ObjectMeta
om.Name = ddc.GetFEServiceName()
om.Labels = dfc.newFESchedulerLabels(ddc.Namespace)
om.Labels = dfc.newFESchedulerLabels(ddc.Name)

spec := &svc.Spec
spec.Selector = dfc.newFEPodsSelector(ddc.Name)
Expand Down

0 comments on commit 5ef8f61

Please sign in to comment.