Skip to content

Commit

Permalink
Make the process entirely in the annotations, no more task
Browse files Browse the repository at this point in the history
  • Loading branch information
burmanm committed May 8, 2024
1 parent a7cc053 commit 5801d6a
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 182 deletions.
108 changes: 0 additions & 108 deletions controllers/control/k8ssandratask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/go-logr/logr"
cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
cassapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1"
k8capi "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1"
"github.com/k8ssandra/k8ssandra-operator/pkg/clientcache"
Expand Down Expand Up @@ -55,8 +54,6 @@ import (

const (
k8ssandraTaskFinalizer = "control.k8ssandra.io/finalizer"
InternalTaskAnnotation = "control.k8ssandra.io/internal-command"
internalRefreshCommand = "refresh"
defaultTTL = time.Duration(86400) * time.Second
)

Expand Down Expand Up @@ -152,12 +149,6 @@ func (r *K8ssandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return r.reportInvalidSpec(ctx, kTask, "unknown K8ssandraCluster %s.%s", kcKey.Namespace, kcKey.Name)
}

if kTask.Spec.Template.Jobs[0].Command == "refresh" {
if !metav1.HasAnnotation(kc.ObjectMeta, InternalTaskAnnotation) {
return r.executeRefreshTask(ctx, kTask, kc)
}
}

if dcs, err := filterDcs(kc, kTask.Spec.Datacenters); err != nil {
return r.reportInvalidSpec(ctx, kTask, err.Error())
} else {
Expand Down Expand Up @@ -210,92 +201,6 @@ func (r *K8ssandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
}

func (r *K8ssandraTaskReconciler) executeRefreshTask(ctx context.Context, kTask *api.K8ssandraTask, kc *k8capi.K8ssandraCluster) (ctrl.Result, error) {
if kTask.Status.StartTime == nil {
patch := client.MergeFrom(kTask.DeepCopy())
now := metav1.Now()
kTask.Status.StartTime = &now
kTask.Status.Active = 1
kTask.SetCondition(cassapi.JobRunning, metav1.ConditionTrue)
if err := r.Status().Patch(ctx, kTask, patch); err != nil {
return ctrl.Result{}, err
}
}

// First verify if K8ssandraCluster itself has "UpdateRequired" and process it until it no longer has it.
if kc.Status.GetConditionStatus(k8capi.ClusterRequiresUpdate) == corev1.ConditionTrue {
if metav1.HasAnnotation(kc.ObjectMeta, k8capi.AutomatedUpdateAnnotation) {
return ctrl.Result{Requeue: true}, nil
} else {
patch := client.MergeFrom(kc.DeepCopy())
metav1.SetMetaDataAnnotation(&kc.ObjectMeta, k8capi.AutomatedUpdateAnnotation, string(k8capi.AllowUpdateOnce))
if err := r.Patch(ctx, kc, patch); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}
}

internalTask := &api.K8ssandraTask{}
err := r.Get(ctx, types.NamespacedName{Namespace: kTask.Namespace, Name: kTask.Name + "-refresh-internal"}, internalTask)
// If task wasn't found, create it and if task is still running, requeue
if k8serrors.IsNotFound(err) {
// Then verify that no CassandraDatacenter has "UpdateRequired" and if they do, create new tasks to execute them
dcs, err := r.datacenters(ctx, kc)
if err != nil {
return ctrl.Result{}, err
}

dcsRequiringUpdate := make([]string, 0, len(dcs))
for _, dc := range dcs {
if dc.Status.GetConditionStatus("RequiresUpdate") == corev1.ConditionTrue { // TODO Update this to cassdcapi const when cass-operator is updatedß
dcsRequiringUpdate = append(dcsRequiringUpdate, dc.DatacenterName()) // TODO Ís this the correct reference?
}
}

if len(dcsRequiringUpdate) > 0 {
// Delegate work to the task controller for Datacenter operations
task := &api.K8ssandraTask{
ObjectMeta: metav1.ObjectMeta{
Namespace: kTask.Namespace,
Name: kTask.Name + "-refresh-internal",
Annotations: map[string]string{
InternalTaskAnnotation: internalRefreshCommand,
},
},
Spec: api.K8ssandraTaskSpec{
Datacenters: make([]string, len(dcsRequiringUpdate)),
Template: kTask.Spec.Template,
},
}

if err := r.Create(ctx, task); err != nil {
return ctrl.Result{}, err
}
}
} else if err != nil {
return ctrl.Result{}, err
} else {
// Verify if the job is completed
if internalTask.Status.CompletionTime.IsZero() {
return ctrl.Result{Requeue: true}, nil
}
}

patch := client.MergeFrom(kTask.DeepCopy())
now := metav1.Now()
kTask.Status.CompletionTime = &now
kTask.Status.Succeeded = 1
kTask.Status.Active = 0
kTask.SetCondition(cassapi.JobComplete, metav1.ConditionTrue)
kTask.SetCondition(cassapi.JobRunning, metav1.ConditionFalse)
if err := r.Status().Patch(ctx, kTask, patch); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}

func (r *K8ssandraTaskReconciler) getCluster(ctx context.Context, kcKey client.ObjectKey) (*k8capi.K8ssandraCluster, bool, error) {
kc := &k8capi.K8ssandraCluster{}
if err := r.Get(ctx, kcKey, kc); k8serrors.IsNotFound(err) {
Expand All @@ -306,19 +211,6 @@ func (r *K8ssandraTaskReconciler) getCluster(ctx context.Context, kcKey client.O
return kc, true, nil
}

func (r *K8ssandraTaskReconciler) datacenters(ctx context.Context, kc *k8capi.K8ssandraCluster) ([]cassdcapi.CassandraDatacenter, error) {
dcs := make([]cassdcapi.CassandraDatacenter, 0, len(kc.Spec.Cassandra.Datacenters))
for _, dcTemplate := range kc.Spec.Cassandra.Datacenters {
dcKey := client.ObjectKey{Namespace: utils.FirstNonEmptyString(dcTemplate.Meta.Namespace, kc.Namespace), Name: dcTemplate.Meta.Name}
dc := &cassdcapi.CassandraDatacenter{}
if err := r.Get(ctx, dcKey, dc); err != nil {
return nil, err
}
dcs = append(dcs, *dc)
}
return dcs, nil
}

func (r *K8ssandraTaskReconciler) deleteCassandraTasks(
ctx context.Context,
kTask *api.K8ssandraTask,
Expand Down
74 changes: 0 additions & 74 deletions controllers/control/k8ssandratask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package control

import (
"context"
"fmt"
"testing"
"time"

cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
cassapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1"
api "github.com/k8ssandra/k8ssandra-operator/apis/control/v1alpha1"
k8capi "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1"
"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
"github.com/k8ssandra/k8ssandra-operator/pkg/clientcache"
"github.com/k8ssandra/k8ssandra-operator/pkg/config"
testutils "github.com/k8ssandra/k8ssandra-operator/pkg/test"
Expand Down Expand Up @@ -73,7 +71,6 @@ func TestK8ssandraTask(t *testing.T) {
t.Run("ExecuteSequentialK8ssandraTask", testEnv.ControllerTest(ctx, executeSequentialK8ssandraTask))
t.Run("DeleteK8ssandraTask", testEnv.ControllerTest(ctx, deleteK8ssandraTask))
t.Run("ExpireK8ssandraTask", testEnv.ControllerTest(ctx, expireK8ssandraTask))
t.Run("RefreshK8ssandraCluster", testEnv.ControllerTest(ctx, refreshK8ssandraTask))
}

// executeParallelK8ssandraTask creates and runs a K8ssandraTask with parallel DC processing.
Expand Down Expand Up @@ -381,65 +378,6 @@ func expireK8ssandraTask(t *testing.T, ctx context.Context, f *framework.Framewo
require.Eventually(func() bool { return !f.K8ssandraTaskExists(ctx, k8TaskKey)() }, timeout, interval)
}

func refreshK8ssandraTask(t *testing.T, ctx context.Context, f *framework.Framework, namespace string) {
require := require.New(t)

kc := newCluster(namespace, "kc",
newDc("dc1", f.DataPlaneContexts[0]))
require.NoError(f.Client.Create(ctx, kc), "failed to create K8ssandraCluster")

kc.Status.SetConditionStatus(k8capi.ClusterRequiresUpdate, corev1.ConditionTrue)
require.NoError(f.Client.Status().Update(ctx, kc))

dcConfig := cassandra.Coalesce(kc.CassClusterName(), kc.Spec.Cassandra.DeepCopy(), kc.Spec.Cassandra.Datacenters[0].DeepCopy())
dc, err := cassandra.NewDatacenter(types.NamespacedName{Namespace: kc.Namespace, Name: kc.Name}, dcConfig)
require.NoError(err)
require.NoError(f.Client.Create(ctx, dc))

t.Log("Create a K8ssandraTask with TTL")
ttl := new(int32)
*ttl = 1
k8Task := &api.K8ssandraTask{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "refresh",
},
Spec: api.K8ssandraTaskSpec{
Cluster: corev1.ObjectReference{
Name: "kc",
},
Template: cassapi.CassandraTaskTemplate{
TTLSecondsAfterFinished: ttl,
Jobs: []cassapi.CassandraJob{{
Name: "job1",
Command: "refresh",
}},
},
DcConcurrencyPolicy: batchv1.ForbidConcurrent,
},
}
require.NoError(f.Client.Create(ctx, k8Task), "failed to create K8ssandraTask")

require.Eventually(func() bool {
if err := f.Client.Get(ctx, types.NamespacedName{Namespace: kc.Namespace, Name: kc.Name}, kc); err != nil {
return false
}
if !metav1.HasAnnotation(kc.ObjectMeta, k8capi.AutomatedUpdateAnnotation) {
return false
}
return kc.Annotations[k8capi.AutomatedUpdateAnnotation] == string(k8capi.AllowUpdateOnce)
}, timeout, interval)
// First case, there's only changes in K8ssandraCluster
t.Log("Mark the K8ssandraCluster as updated if the annotation was added")
require.Equal(corev1.ConditionTrue, kc.Status.GetConditionStatus(k8capi.ClusterRequiresUpdate))
kc.Status.SetConditionStatus(k8capi.ClusterRequiresUpdate, corev1.ConditionFalse)
require.NoError(f.Client.Status().Update(ctx, kc))

waitForTaskCompletion(ctx, t, f, newClusterKey(f.ControlPlaneContext, namespace, "refresh"))
// Second case, we also have changes in the CassandraDatacenter, even after updating K8ssandraCluster

}

func newCluster(namespace, name string, dcs ...k8capi.CassandraDatacenterTemplate) *k8capi.K8ssandraCluster {
return &k8capi.K8ssandraCluster{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -472,18 +410,6 @@ func newDc(name string, k8sContext string) k8capi.CassandraDatacenterTemplate {
}
}

func waitForTaskCompletion(ctx context.Context, t *testing.T, f *framework.Framework, taskKey framework.ClusterKey) {
require.Eventually(t, func() bool {
k8Task := &api.K8ssandraTask{}
require.NoError(t, f.Get(ctx, taskKey, k8Task))
fmt.Printf("k8Task.Status: %+v\n", k8Task.Status)
return k8Task.Status.Active == 0 &&
k8Task.Status.Succeeded > 0 &&
k8Task.GetConditionStatus(cassapi.JobRunning) == metav1.ConditionFalse &&
k8Task.GetConditionStatus(cassapi.JobComplete) == metav1.ConditionTrue
}, timeout, interval)
}

func newClusterKey(k8sContext, namespace, name string) framework.ClusterKey {
return framework.ClusterKey{
NamespacedName: types.NamespacedName{Namespace: namespace, Name: name},
Expand Down
52 changes: 52 additions & 0 deletions controllers/k8ssandra/datacenters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@ import (
"sort"
"strconv"
"strings"
"time"

"github.com/Masterminds/semver/v3"
"github.com/go-logr/logr"
cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
cassctlapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1"
ktaskapi "github.com/k8ssandra/k8ssandra-operator/apis/control/v1alpha1"
api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1"
telemetryapi "github.com/k8ssandra/k8ssandra-operator/apis/telemetry/v1alpha1"
"github.com/k8ssandra/k8ssandra-operator/pkg/annotations"
"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
"github.com/k8ssandra/k8ssandra-operator/pkg/labels"
"github.com/k8ssandra/k8ssandra-operator/pkg/result"
"github.com/k8ssandra/k8ssandra-operator/pkg/secret"
agent "github.com/k8ssandra/k8ssandra-operator/pkg/telemetry/cassandra_agent"
"github.com/k8ssandra/k8ssandra-operator/pkg/utils"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -243,6 +247,54 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k
}
}

if AllowUpdate(kc) {
dcsRequiringUpdate := make([]string, 0, len(actualDcs))
for _, dc := range actualDcs {
if dc.Status.GetConditionStatus("RequiresUpdate") == corev1.ConditionTrue { // TODO Update this to cassdcapi const when cass-operator is updatedß
dcsRequiringUpdate = append(dcsRequiringUpdate, dc.DatacenterName())
}
}

if len(dcsRequiringUpdate) > 0 {
generatedName := fmt.Sprintf("refresh-%d", time.Now().Unix())
internalTask := &ktaskapi.K8ssandraTask{}
err := r.Get(ctx, types.NamespacedName{Namespace: kc.Namespace, Name: generatedName}, internalTask)
// If task wasn't found, create it and if task is still running, requeue
if errors.IsNotFound(err) {
// Delegate work to the task controller for Datacenter operations
task := &ktaskapi.K8ssandraTask{
ObjectMeta: metav1.ObjectMeta{
Namespace: kc.Namespace,
Name: generatedName,
Labels: labels.WatchedByK8ssandraClusterLabels(kcKey),
},
Spec: ktaskapi.K8ssandraTaskSpec{
Cluster: corev1.ObjectReference{
Name: kc.Name,
},
Datacenters: make([]string, len(dcsRequiringUpdate)),
Template: cassctlapi.CassandraTaskTemplate{
Jobs: []cassctlapi.CassandraJob{{
Name: fmt.Sprintf("refresh-%s", kc.Name),
Command: "refresh",
}},
},
DcConcurrencyPolicy: batchv1.ForbidConcurrent,
},
}

if err := r.Create(ctx, task); err != nil {
return result.Error(err), actualDcs
}

return result.RequeueSoon(r.DefaultDelay), actualDcs

} else if internalTask.Status.CompletionTime.IsZero() {
return result.RequeueSoon(r.DefaultDelay), actualDcs
}
}
}

// If we reach this point all CassandraDatacenters are ready. We only set the
// CassandraInitialized condition if it is unset, i.e., only once. This allows us to
// distinguish whether we are deploying a CassandraDatacenter as part of a new cluster
Expand Down
47 changes: 47 additions & 0 deletions controllers/k8ssandra/k8ssandracluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2721,3 +2721,50 @@ func testGenerationCheck(t *testing.T, ctx context.Context, f *framework.Framewo
require.NoError(f.Get(ctx, dcKey, dc), "failed to get CassandraDatacenter dc1")
require.NotEqual("gibberish", dc.Annotations[api.ResourceHashAnnotation])
}

// func refreshK8ssandraTask(t *testing.T, ctx context.Context, f *framework.Framework, namespace string) {
// require := require.New(t)

// kc := newCluster(namespace, "kc",
// newDc("dc1", f.DataPlaneContexts[0]))
// require.NoError(f.Client.Create(ctx, kc), "failed to create K8ssandraCluster")

// kcKey := types.NamespacedName{Namespace: kc.Namespace, Name: kc.Name}
// kc.Status.SetConditionStatus(k8capi.ClusterRequiresUpdate, corev1.ConditionTrue)
// require.NoError(f.Client.Status().Update(ctx, kc))

// dcKey := types.NamespacedName{Namespace: kc.Namespace, Name: kc.Name}
// dcConfig := cassandra.Coalesce(kc.CassClusterName(), kc.Spec.Cassandra.DeepCopy(), kc.Spec.Cassandra.Datacenters[0].DeepCopy())
// dc, err := cassandra.NewDatacenter(dcKey, dcConfig)
// require.NoError(err)
// require.NoError(f.Client.Create(ctx, dc))

// require.NoError(f.Client.Get(ctx, dcKey, dc))
// dc.Status.SetCondition(*cassdcapi.NewDatacenterCondition("RequiresUpdate", corev1.ConditionTrue))
// require.NoError(f.Client.Status().Update(ctx, dc))

// require.NoError(f.Client.Get(ctx, kcKey, kc))
// metav1.SetMetaDataAnnotation(&kc.ObjectMeta, k8capi.AutomatedUpdateAnnotation, string(k8capi.AllowUpdateOnce))

// require.Eventually(func() bool {
// // Find the correct task
// tasks := &api.K8ssandraTaskList{}
// }, timeout, interval)

// require.Eventually(func() bool {
// if err := f.Client.Get(ctx, dcKey, dc); err != nil {
// return false
// }
// return metav1.HasAnnotation(dc.ObjectMeta, "cassandra.datastax.com/autoupdate-spec")
// }, timeout, interval)

// // First case, there's only changes in K8ssandraCluster
// t.Log("Mark the K8ssandraCluster as updated if the annotation was added")
// require.Equal(corev1.ConditionTrue, kc.Status.GetConditionStatus(k8capi.ClusterRequiresUpdate))
// kc.Status.SetConditionStatus(k8capi.ClusterRequiresUpdate, corev1.ConditionFalse)
// require.NoError(f.Client.Status().Update(ctx, kc))

// waitForTaskCompletion(ctx, t, f, newClusterKey(f.ControlPlaneContext, namespace, "refresh"))
// // Second case, we also have changes in the CassandraDatacenter, even after updating K8ssandraCluster

// }

0 comments on commit 5801d6a

Please sign in to comment.