Skip to content

Commit

Permalink
Merge pull request apache#292 from catpineapple/add-restart-capability
Browse files Browse the repository at this point in the history
[feature](dcr) add rolling restart capability
  • Loading branch information
intelligentfu8 authored Nov 22, 2024
2 parents 7d27da0 + c2cb4eb commit 8b06624
Show file tree
Hide file tree
Showing 14 changed files with 181 additions and 13 deletions.
3 changes: 3 additions & 0 deletions api/doris/v1/doriscluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
const (
//ComponentsResourceHash the component hash
ComponentResourceHash string = "app.doris.components/hash"

FERestartAt string = "apache.doris.fe/restartedAt"
BERestartAt string = "apache.doris.be/restartedAt"
)

// the labels key
Expand Down
8 changes: 8 additions & 0 deletions config/crd/bases/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,14 @@ spec:
type: string
type: object
type: object
enableWorkloadGroup:
description: |-
EnableWorkloadGroup is a switch that determines whether the doris cluster enables the workload group.
Default value is 'false'.
Enabling it means that the container must be started in privileged mode.
Please confirm whether the host machine and k8s cluster allow it.
Doris workloadgroup reference document: https://doris.apache.org/docs/admin-manual/resource-admin/workload-group
type: boolean
envVars:
description: cnEnvVars is a slice of environment variables that
are added to the pods, the default is empty.
Expand Down
8 changes: 8 additions & 0 deletions config/crd/bases/doris.selectdb.com_dorisclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,14 @@ spec:
type: string
type: object
type: object
enableWorkloadGroup:
description: |-
EnableWorkloadGroup is a switch that determines whether the doris cluster enables the workload group.
Default value is 'false'.
Enabling it means that the container must be started in privileged mode.
Please confirm whether the host machine and k8s cluster allow it.
Doris workloadgroup reference document: https://doris.apache.org/docs/admin-manual/resource-admin/workload-group
type: boolean
envVars:
description: cnEnvVars is a slice of environment variables that
are added to the pods, the default is empty.
Expand Down
17 changes: 16 additions & 1 deletion doc/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,21 @@ BaseSpec
<p>the foundation spec for creating be software services.</p>
</td>
</tr>
<tr>
<td>
<code>enableWorkloadGroup</code><br/>
<em>
bool
</em>
</td>
<td>
<p>EnableWorkloadGroup is a switch that determines whether the doris cluster enables the workload group.
Default value is &lsquo;false&rsquo;.
Enabling it means that the container must be started in privileged mode.
Please confirm whether the host machine and k8s cluster allow it.
Doris workloadgroup reference document: <a href="https://doris.apache.org/docs/admin-manual/resource-admin/workload-group">https://doris.apache.org/docs/admin-manual/resource-admin/workload-group</a></p>
</td>
</tr>
</tbody>
</table>
<h3 id="doris.selectdb.com/v1.BrokerSpec">BrokerSpec
Expand Down Expand Up @@ -2604,5 +2619,5 @@ string
<hr/>
<p><em>
Generated with <code>gen-crd-api-reference-docs</code>
on git commit <code>54d1acc</code>.
on git commit <code>1a70fe9</code>.
</em></p>
2 changes: 1 addition & 1 deletion doc/disaggregated_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1441,5 +1441,5 @@ string
<hr/>
<p><em>
Generated with <code>gen-crd-api-reference-docs</code>
on git commit <code>54d1acc</code>.
on git commit <code>7d27da0</code>.
</em></p>
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,14 @@ spec:
type: string
type: object
type: object
enableWorkloadGroup:
description: |-
EnableWorkloadGroup is a switch that determines whether the doris cluster enables the workload group.
Default value is 'false'.
Enabling it means that the container must be started in privileged mode.
Please confirm whether the host machine and k8s cluster allow it.
Doris workloadgroup reference document: https://doris.apache.org/docs/admin-manual/resource-admin/workload-group
type: boolean
envVars:
description: cnEnvVars is a slice of environment variables that
are added to the pods, the default is empty.
Expand Down
11 changes: 10 additions & 1 deletion pkg/controller/sub_controller/be/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (be *Controller) Sync(ctx context.Context, dcr *v1.DorisCluster) error {
if dcr.Spec.BeSpec == nil {
return nil
}

var oldStatus v1.ComponentStatus
if dcr.Status.BEStatus != nil {
oldStatus = *(dcr.Status.BEStatus.DeepCopy())
}
be.InitStatus(dcr, v1.Component_BE)
if !be.FeAvailable(dcr) {
return nil
Expand Down Expand Up @@ -83,6 +88,10 @@ func (be *Controller) Sync(ctx context.Context, dcr *v1.DorisCluster) error {
return err
}

if err = be.prepareStatefulsetApply(ctx, dcr, oldStatus); err != nil {
return err
}

st := be.buildBEStatefulSet(dcr)
if !be.PrepareReconcileResources(ctx, dcr, v1.Component_BE) {
klog.Infof("be controller sync preparing resource for reconciling namespace %s name %s!", dcr.Namespace, dcr.Name)
Expand All @@ -109,7 +118,7 @@ func (be *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error {
return nil
}

return be.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.BEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_BE), *cluster.Spec.BeSpec.Replicas)
return be.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.BEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_BE), *cluster.Spec.BeSpec.Replicas, v1.Component_BE)
}

func (be *Controller) ClearResources(ctx context.Context, dcr *v1.DorisCluster) (bool, error) {
Expand Down
39 changes: 39 additions & 0 deletions pkg/controller/sub_controller/be/prepare_modify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package be

import (
"context"
v1 "github.com/apache/doris-operator/api/doris/v1"
)

// prepareStatefulsetApply means Pre-operation and status control on the client side
func (be *Controller) prepareStatefulsetApply(ctx context.Context, dcr *v1.DorisCluster, oldStatus v1.ComponentStatus) error {

// be rolling restart
// check 1: be Phase is Available
// check 2: be RestartTime is not empty and useful
// check 3: be RestartTime different from old(This condition does not need to be checked here. If it is allowed to pass, it will be processed idempotent when applying sts.)
if oldStatus.ComponentCondition.Phase == v1.Available && be.CheckRestartTimeAndInject(dcr, v1.Component_BE) {
dcr.Status.BEStatus.ComponentCondition.Phase = v1.Restarting
}

//TODO check upgrade

return nil
}
2 changes: 1 addition & 1 deletion pkg/controller/sub_controller/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (bk *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error {

cluster.Status.BrokerStatus = bs
bs.AccessService = v1.GenerateExternalServiceName(cluster, v1.Component_Broker)
return bk.ClassifyPodsByStatus(cluster.Namespace, bs, v1.GenerateStatefulSetSelector(cluster, v1.Component_Broker), *cluster.Spec.BrokerSpec.Replicas)
return bk.ClassifyPodsByStatus(cluster.Namespace, bs, v1.GenerateStatefulSetSelector(cluster, v1.Component_Broker), *cluster.Spec.BrokerSpec.Replicas, v1.Component_Broker)

}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/sub_controller/cn/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (cn *Controller) UpdateComponentStatus(cluster *dorisv1.DorisCluster) error

replicas := *est.Spec.Replicas
cs.AccessService = dorisv1.GenerateExternalServiceName(cluster, dorisv1.Component_CN)
return cn.ClassifyPodsByStatus(cluster.Namespace, &cs.ComponentStatus, dorisv1.GenerateStatefulSetSelector(cluster, dorisv1.Component_CN), replicas)
return cn.ClassifyPodsByStatus(cluster.Namespace, &cs.ComponentStatus, dorisv1.GenerateStatefulSetSelector(cluster, dorisv1.Component_CN), replicas, dorisv1.Component_CN)
}

// autoscaler represents start autoscaler or not.
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/sub_controller/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var (
MSServiceDeletedFailed EventReason = "MSServiceDeletedFailed"
MSStatefulsetDeleteFailed EventReason = "MSStatefulsetDeleteFailed"
FDBAddressNotConfiged EventReason = "FDBAddressNotConfiged"
RestartTimeInvalid EventReason = "RestartTimeInvalid"
)

type Event struct {
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/sub_controller/fe/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (fc *Controller) UpdateComponentStatus(cluster *v1.DorisCluster) error {
return nil
}

return fc.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.FEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_FE), *cluster.Spec.FeSpec.Replicas)
return fc.ClassifyPodsByStatus(cluster.Namespace, cluster.Status.FEStatus, v1.GenerateStatefulSetSelector(cluster, v1.Component_FE), *cluster.Spec.FeSpec.Replicas, v1.Component_FE)
}

// New construct a FeController.
Expand All @@ -80,6 +80,10 @@ func (fc *Controller) Sync(ctx context.Context, cluster *v1.DorisCluster) error
klog.Info("fe Controller Sync ", "the fe component is not needed ", "namespace ", cluster.Namespace, " doris cluster name ", cluster.Name)
return nil
}
var oldStatus v1.ComponentStatus
if cluster.Status.FEStatus != nil {
oldStatus = *(cluster.Status.FEStatus.DeepCopy())
}
fc.InitStatus(cluster, v1.Component_FE)

feSpec := cluster.Spec.FeSpec
Expand Down Expand Up @@ -111,7 +115,7 @@ func (fc *Controller) Sync(ctx context.Context, cluster *v1.DorisCluster) error
return nil
}

if err = fc.prepareStatefulsetApply(ctx, cluster); err != nil {
if err = fc.prepareStatefulsetApply(ctx, cluster, oldStatus); err != nil {
return err
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/controller/sub_controller/fe/prepare_modify.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

// prepareStatefulsetApply means Pre-operation and status control on the client side
func (fc *Controller) prepareStatefulsetApply(ctx context.Context, cluster *v1.DorisCluster) error {
func (fc *Controller) prepareStatefulsetApply(ctx context.Context, cluster *v1.DorisCluster, oldStatus v1.ComponentStatus) error {
var oldSt appv1.StatefulSet
err := fc.K8sclient.Get(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: v1.GenerateComponentStatefulSetName(cluster, v1.Component_FE)}, &oldSt)
if err != nil {
Expand Down Expand Up @@ -63,7 +63,15 @@ func (fc *Controller) prepareStatefulsetApply(ctx context.Context, cluster *v1.D
return nil
}

//TODO check upgrade ,restart
// fe rolling restart
// check 1: fe Phase is Available
// check 2: fe RestartTime is not empty and useful
// check 3: fe RestartTime different from old(This condition does not need to be checked here. If it is allowed to pass, it will be processed idempotent when applying sts.)
if oldStatus.ComponentCondition.Phase == v1.Available && fc.CheckRestartTimeAndInject(cluster, v1.Component_FE) {
cluster.Status.FEStatus.ComponentCondition.Phase = v1.Restarting
}

//TODO check upgrade

return nil
}
Expand Down
73 changes: 69 additions & 4 deletions pkg/controller/sub_controller/sub_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,86 @@ type SubDefaultController struct {
K8srecorder record.EventRecorder
}

func (d *SubDefaultController) CheckRestartTimeAndInject(dcr *dorisv1.DorisCluster, componentType dorisv1.ComponentType) bool {
var baseSpec *dorisv1.BaseSpec
var restartedAt string
var restartAnnotationsKey string
switch componentType {
case dorisv1.Component_FE:
baseSpec = &dcr.Spec.FeSpec.BaseSpec
restartedAt = dcr.Annotations[dorisv1.FERestartAt]
restartAnnotationsKey = dorisv1.FERestartAt
case dorisv1.Component_BE:
baseSpec = &dcr.Spec.BeSpec.BaseSpec
restartedAt = dcr.Annotations[dorisv1.BERestartAt]
restartAnnotationsKey = dorisv1.BERestartAt
default:
klog.Errorf("CheckRestartTimeAndInject dorisClusterName %s, namespace %s componentType %s not supported.", dcr.Name, dcr.Namespace, componentType)
}

if restartedAt == "" {
return false
}

// run shell: date +"%Y-%m-%dT%H:%M:%S%:z"
// "2024-11-21T11:08:56+08:00"
parseTime, err := time.Parse(time.RFC3339, restartedAt)
if err != nil {
checkErr := fmt.Errorf("CheckRestartTimeAndInject error: time format is incorrect. dorisClusterName: %s, namespace: %s, componentType %s, wrong parse 'restartedAt': %s , error: %s", dcr.Name, dcr.Namespace, componentType, restartedAt, err.Error())
klog.Error(checkErr.Error())
d.K8srecorder.Event(dcr, string(EventWarning), string(RestartTimeInvalid), checkErr.Error())
return false
}

effectiveStartTime := time.Now().Add(-10 * time.Minute)

if effectiveStartTime.After(parseTime) {
klog.Errorf("CheckRestartTimeAndInject The time has expired, dorisClusterName: %s, namespace: %s, componentType %s, wrong parse 'restartedAt': %s : The time has expired, if you want to restart doris, please set a future time", dcr.Name, dcr.Namespace, componentType, restartedAt)
d.K8srecorder.Event(dcr, string(EventWarning), string(RestartTimeInvalid), fmt.Sprintf("the %s restart time is not effective. the 'restartedAt' %s can't be earlier than 10 minutes before the current time", componentType, restartedAt))
return false
}

// check passed, set annotations to doriscluster baseSpec
if baseSpec.Annotations == nil {
baseSpec.Annotations = make(map[string]string)
}
baseSpec.Annotations[restartAnnotationsKey] = restartedAt
return true
}

// UpdateStatus update the component status on src.
func (d *SubDefaultController) UpdateStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32) error {
return d.ClassifyPodsByStatus(namespace, status, labels, replicas)
func (d *SubDefaultController) UpdateStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32, componentType dorisv1.ComponentType) error {
return d.ClassifyPodsByStatus(namespace, status, labels, replicas, componentType)
}

func (d *SubDefaultController) ClassifyPodsByStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32) error {
func (d *SubDefaultController) ClassifyPodsByStatus(namespace string, status *dorisv1.ComponentStatus, labels map[string]string, replicas int32, componentType dorisv1.ComponentType) error {
var podList corev1.PodList
if err := d.K8sclient.List(context.Background(), &podList, client.InNamespace(namespace), client.MatchingLabels(labels)); err != nil {
return err
}

var creatings, readys, faileds []string
var firstRestartAnnotation, restartAnnotationsKey string
podmap := make(map[string]corev1.Pod)

if len(podList.Items) == 0 {
return nil
}

switch componentType {
case dorisv1.Component_FE:
restartAnnotationsKey = dorisv1.FERestartAt
case dorisv1.Component_BE:
restartAnnotationsKey = dorisv1.BERestartAt
}
firstRestartAnnotation = podList.Items[0].Annotations[restartAnnotationsKey]

//get all pod status that controlled by st.
stsRollingRestartAnnotationsSameCheck := true
for _, pod := range podList.Items {
if pod.Annotations[restartAnnotationsKey] != firstRestartAnnotation {
stsRollingRestartAnnotationsSameCheck = false
}
podmap[pod.Name] = pod
if ready := k8s.PodIsReady(&pod.Status); ready {
readys = append(readys, pod.Name)
Expand All @@ -81,7 +146,7 @@ func (d *SubDefaultController) ClassifyPodsByStatus(namespace string, status *do
}
}

if len(readys) == int(replicas) {
if len(readys) == int(replicas) && stsRollingRestartAnnotationsSameCheck {
status.ComponentCondition.Phase = dorisv1.Available
} else if len(faileds) != 0 {
status.ComponentCondition.Phase = dorisv1.HaveMemberFailed
Expand Down

0 comments on commit 8b06624

Please sign in to comment.