Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add feature to allow waiting for cleanup to finish after ScaleUp #723

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti
## unreleased

* [FEATURE] [#651](https://github.com/k8ssandra/cass-operator/issues/651) Add tsreload task for DSE deployments and ability to check if sync operation is available on the mgmt-api side
* [ENHANCEMENT] [#722](https://github.com/k8ssandra/cass-operator/issues/722) Add back the ability to track the cleanup task before marking a scale up as done. This is controlled by the annotation `cassandra.datastax.com/track-cleanup-tasks`
* [BUGFIX] [#705](https://github.com/k8ssandra/cass-operator/issues/705) Ensure ConfigSecret has annotations map before trying to set a value

## v1.22.4
Expand Down
23 changes: 23 additions & 0 deletions apis/cassandra/v1beta1/cassandradatacenter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ const (
// UseClientBuilderAnnotation enforces the usage of new config builder from k8ssandra-client for versions that would otherwise use the cass-config-builder
UseClientBuilderAnnotation = "cassandra.datastax.com/use-new-config-builder"

// TrackCleanupTasksAnnotation makes the operator track cleanup tasks after a scale up. This prevents other operations from taking place until the cleanup
// task has completed.
TrackCleanupTasksAnnotation = "cassandra.datastax.com/track-cleanup-tasks"

AllowUpdateAlways AllowUpdateType = "always"
AllowUpdateOnce AllowUpdateType = "once"

Expand Down Expand Up @@ -563,6 +567,25 @@ func (status *CassandraDatacenterStatus) GetConditionStatus(conditionType Datace
return corev1.ConditionUnknown
}

// AddTaskToTrack appends a reference to the given task to status.TrackedTasks.
// Only the Name and Namespace of objectMeta are recorded in the reference.
// No de-duplication is performed: adding the same task twice yields two entries.
func (status *CassandraDatacenterStatus) AddTaskToTrack(objectMeta metav1.ObjectMeta) {
	ref := corev1.ObjectReference{
		Name:      objectMeta.Name,
		Namespace: objectMeta.Namespace,
	}

	tracked := status.TrackedTasks
	if tracked == nil {
		// Start from an empty, non-nil list sized for the single entry added below.
		tracked = make([]corev1.ObjectReference, 0, 1)
	}

	status.TrackedTasks = append(tracked, ref)
}

// RemoveTrackedTask drops every entry of status.TrackedTasks whose Name and
// Namespace match objectMeta. Entries that do not match keep their relative
// order. Calling it with a task that is not tracked is a no-op.
//
// The list is filtered in a single pass rather than re-sliced while it is being
// ranged over: a range expression is evaluated once, so shrinking the slice
// mid-loop leaves the loop indexing past the new length, which panics with
// "slice bounds out of range" as soon as the same task is tracked twice.
func (status *CassandraDatacenterStatus) RemoveTrackedTask(objectMeta metav1.ObjectMeta) {
	// Reuse the existing backing array; kept never grows past the read position.
	kept := status.TrackedTasks[:0]
	for _, task := range status.TrackedTasks {
		if task.Name == objectMeta.Name && task.Namespace == objectMeta.Namespace {
			continue
		}
		kept = append(kept, task)
	}
	status.TrackedTasks = kept
}

func (dc *CassandraDatacenter) GetConditionStatus(conditionType DatacenterConditionType) corev1.ConditionStatus {
return (&dc.Status).GetConditionStatus(conditionType)
}
Expand Down
60 changes: 59 additions & 1 deletion pkg/reconciliation/reconcile_racks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2274,10 +2274,27 @@ func (rc *ReconciliationContext) CheckCassandraNodeStatuses() result.ReconcileRe

func (rc *ReconciliationContext) cleanupAfterScaling() result.ReconcileResult {
if !metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.NoAutomatedCleanupAnnotation) {

if metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation) {
// Verify that the cleanup task has completed before treating ScalingUp as finished
task, err := rc.findActiveTask(taskapi.CommandCleanup)
if err != nil {
return result.Error(err)
}

if task != nil {
return rc.activeTaskCompleted(task)
}
}

// Create the cleanup task
if err := rc.createTask(taskapi.CommandCleanup); err != nil {
return result.Error(err)
}

if metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation) {
return result.RequeueSoon(10)
}
}

return result.Continue()
Expand Down Expand Up @@ -2319,7 +2336,48 @@ func (rc *ReconciliationContext) createTask(command taskapi.CassandraCommand) er
return err
}

return nil
if !metav1.HasAnnotation(rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation) {
return nil
}

dcPatch := client.MergeFrom(dc.DeepCopy())

rc.Datacenter.Status.AddTaskToTrack(task.ObjectMeta)

return rc.Client.Status().Patch(rc.Ctx, dc, dcPatch)
}

// activeTaskCompleted checks whether the given tracked task has finished.
//
// While task.Status.CompletionTime is unset it returns result.RequeueSoon(10)
// so the task is checked again later. Once a completion time is present, the
// task is removed from the datacenter's tracked tasks, the status change is
// patched through the client, and result.Continue() is returned. A failed
// status patch is returned as result.Error.
func (rc *ReconciliationContext) activeTaskCompleted(task *taskapi.CassandraTask) result.ReconcileResult {
	if task.Status.CompletionTime == nil {
		// Not finished yet.
		return result.RequeueSoon(10)
	}

	// Snapshot the datacenter before mutating it so the patch carries only the
	// tracked-task removal.
	datacenter := rc.Datacenter
	statusPatch := client.MergeFrom(datacenter.DeepCopy())
	rc.Datacenter.Status.RemoveTrackedTask(task.ObjectMeta)

	err := rc.Client.Status().Patch(rc.Ctx, datacenter, statusPatch)
	if err != nil {
		return result.Error(err)
	}

	return result.Continue()
}

// findActiveTask looks through the datacenter's tracked tasks and returns the
// first one that contains a job with the given command.
//
// It returns (nil, nil) when no tracked task carries the command. Any error
// from fetching a tracked task is returned as-is, except NotFound: a tracked
// task that no longer exists is skipped, because otherwise a deleted task
// would make every reconcile fail and the datacenter could never progress.
// The stale reference is left in the status; this function only reads it.
func (rc *ReconciliationContext) findActiveTask(command taskapi.CassandraCommand) (*taskapi.CassandraTask, error) {
	// Ranging over a nil or empty slice is a no-op, so no length guard is needed.
	for _, taskMeta := range rc.Datacenter.Status.TrackedTasks {
		taskKey := types.NamespacedName{Name: taskMeta.Name, Namespace: taskMeta.Namespace}
		task := &taskapi.CassandraTask{}
		if err := rc.Client.Get(rc.Ctx, taskKey, task); err != nil {
			if client.IgnoreNotFound(err) == nil {
				// The tracked task was deleted; it cannot be active.
				continue
			}
			return nil, err
		}

		for _, job := range task.Spec.Jobs {
			if job.Command == command {
				return task, nil
			}
		}
	}

	return nil, nil
}

func (rc *ReconciliationContext) CheckClearActionConditions() result.ReconcileResult {
Expand Down
78 changes: 78 additions & 0 deletions pkg/reconciliation/reconcile_racks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1631,6 +1631,49 @@ func TestCleanupAfterScaling(t *testing.T) {
r := rc.cleanupAfterScaling()
assert.Equal(result.Continue(), r, "expected result of result.Continue()")
assert.Equal(taskapi.CommandCleanup, task.Spec.Jobs[0].Command)
assert.Equal(0, len(rc.Datacenter.Status.TrackedTasks))
}

// TestCleanupAfterScalingWithTracker exercises cleanupAfterScaling with the
// TrackCleanupTasksAnnotation set. The first call must create a cleanup task,
// track it in the datacenter status and ask for a requeue; the second call,
// once the task reports a completion time, must untrack it and continue.
func TestCleanupAfterScalingWithTracker(t *testing.T) {
rc, _, cleanupMockScr := setupTest()
defer cleanupMockScr()
assert := assert.New(t)

// Replace the client with a mock and enable cleanup tracking via the annotation

mockClient := mocks.NewClient(t)
rc.Client = mockClient

metav1.SetMetaDataAnnotation(&rc.Datacenter.ObjectMeta, api.TrackCleanupTasksAnnotation, "true")

var task *taskapi.CassandraTask
// 1. Create task - return ok, and capture the created task for later assertions
k8sMockClientCreate(rc.Client.(*mocks.Client), nil).
Run(func(args mock.Arguments) {
arg := args.Get(1).(*taskapi.CassandraTask)
task = arg
}).
Times(1)

// 2. Patch to datacenter status - expected once, when the new task is added to the tracked list
k8sMockClientStatusPatch(mockClient.Status().(*mocks.SubResourceClient), nil).Once()

// First pass: the task is created and tracked, so the reconcile must requeue
r := rc.cleanupAfterScaling()
assert.Equal(taskapi.CommandCleanup, task.Spec.Jobs[0].Command)
assert.Equal(result.RequeueSoon(10), r, "expected result of result.RequeueSoon(10)")
assert.Equal(1, len(rc.Datacenter.Status.TrackedTasks))
// 3. GET - return the captured task with a completion time set, i.e. a completed task
k8sMockClientGet(rc.Client.(*mocks.Client), nil).
Run(func(args mock.Arguments) {
arg := args.Get(2).(*taskapi.CassandraTask)
task.DeepCopyInto(arg)
timeNow := metav1.Now()
arg.Status.CompletionTime = &timeNow
}).Once()
// 4. Patch to datacenter status - expected once, when the completed task is untracked
k8sMockClientStatusPatch(mockClient.Status().(*mocks.SubResourceClient), nil).Once()
// Second pass: the completed task is removed from tracking and reconciliation continues
r = rc.cleanupAfterScaling()
assert.Equal(result.Continue(), r, "expected result of result.Continue()")
assert.Equal(0, len(rc.Datacenter.Status.TrackedTasks))
}

func TestStripPassword(t *testing.T) {
Expand Down Expand Up @@ -2874,3 +2917,38 @@ func TestDatacenterPodsOldLabels(t *testing.T) {
// We should still find the pods
assert.Equal(int(*desiredStatefulSet.Spec.Replicas), len(rc.datacenterPods()))
}

// TestCheckRackLabels verifies that CheckRackLabels keeps the rack labels on a
// StatefulSet in place. It first checks that a freshly built StatefulSet already
// carries the labels for rack "default", then overwrites the rack label and
// checks that a second CheckRackLabels call puts the expected labels back.
func TestCheckRackLabels(t *testing.T) {
	rc, _, cleanupMockScr := setupTest()
	defer cleanupMockScr()

	req := require.New(t)
	req.NoError(rc.CalculateRackInformation())

	sts, err := newStatefulSetForCassandraDatacenter(nil, "default", rc.Datacenter, 3)
	req.NoErrorf(err, "error occurred creating statefulset")

	// Report every replica as ready.
	sts.Status.ReadyReplicas = *sts.Spec.Replicas

	// Back the reconciliation context with a fake client that holds both objects.
	rc.Client = fake.NewClientBuilder().
		WithStatusSubresource(rc.Datacenter).
		WithRuntimeObjects(sts, rc.Datacenter).
		Build()
	rc.statefulSets = []*appsv1.StatefulSet{sts}

	// Initial state: nothing to fix, labels already match.
	req.Equal(result.Continue(), rc.CheckRackLabels(), "Label updates should not cause errors")
	req.Subset(sts.Labels, rc.Datacenter.GetRackLabels("default"))

	// Break the rack label and confirm the mismatch is visible.
	sts.Labels[api.RackLabel] = "r1"
	req.NotSubset(sts.Labels, rc.Datacenter.GetRackLabels("default"))

	// A second pass must restore the expected labels.
	req.Equal(result.Continue(), rc.CheckRackLabels(), "Label updates should not cause errors")
	req.Subset(sts.Labels, rc.Datacenter.GetRackLabels("default"))
}
2 changes: 1 addition & 1 deletion tests/testdata/default-two-rack-two-node-dc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ spec:
clusterName: cluster1
datacenterName: My_Super_Dc
serverType: cassandra
serverVersion: "4.0.10"
serverVersion: "4.1.7"
managementApiAuth:
insecure: {}
size: 2
Expand Down
Loading