Skip to content

Commit

Permalink
Enhance the MedusaBackupSchedule API to trigger purge tasks (#1357)
Browse files Browse the repository at this point in the history
  • Loading branch information
adejanovski authored Jun 27, 2024
1 parent 3f9daf4 commit dc6a87a
Show file tree
Hide file tree
Showing 11 changed files with 486 additions and 178 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG/CHANGELOG-1.18.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ Changelog for the K8ssandra Operator, new PRs should update the `unreleased` sec
When cutting a new release, update the `unreleased` heading to the tag being generated and date, like `## vX.Y.Z - YYYY-MM-DD` and create a new placeholder section for `unreleased` entries.

## unreleased

* [FEATURE] [#1310](https://github.com/k8ssandra/k8ssandra-operator/issues/1310) Enhance the MedusaBackupSchedule API to allow scheduling purge tasks
8 changes: 7 additions & 1 deletion apis/medusa/v1alpha1/medusaschedule_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ type MedusaBackupScheduleSpec struct {
// The "Allow" property is only valid if all the other active Tasks have "Allow" as well.
// +optional
ConcurrencyPolicy batchv1.ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`

// Specifies the type of operation to be performed
// +kubebuilder:validation:Enum=backup;purge
// +kubebuilder:default=backup
// +optional
OperationType string `json:"operationType,omitempty"`
}

// MedusaBackupScheduleStatus defines the observed state of MedusaBackupSchedule
Expand All @@ -56,7 +62,7 @@ type MedusaBackupScheduleStatus struct {
// +kubebuilder:printcolumn:name="Datacenter",type=string,JSONPath=".spec.backupSpec.cassandraDatacenter",description="Datacenter which the task targets"
// +kubebuilder:printcolumn:name="ScheduledExecution",type="date",JSONPath=".status.nextSchedule",description="Next scheduled execution time"
// +kubebuilder:printcolumn:name="LastExecution",type="date",JSONPath=".status.lastExecution",description="Previous execution time"
// +kubebuilder:printcolumn:name="BackupType",type="string",JSONPath=".spec.backupSpec.backupType",description="Type of backup"
// +kubebuilder:printcolumn:name="OperationType",type="string",JSONPath=".spec.operationType",description="Type of scheduled operation"
// MedusaBackupSchedule is the Schema for the medusabackupschedules API
type MedusaBackupSchedule struct {
metav1.TypeMeta `json:",inline"`
Expand Down
4 changes: 4 additions & 0 deletions apis/medusa/v1alpha1/medusatask_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ const (
//+kubebuilder:subresource:status

// MedusaTask is the Schema for the MedusaTasks API
// +kubebuilder:printcolumn:name="Datacenter",type=string,JSONPath=".spec.cassandraDatacenter",description="Datacenter which the task targets"
// +kubebuilder:printcolumn:name="Operation",type="string",JSONPath=".spec.operation",description="Type of operation"
// +kubebuilder:printcolumn:name="Start",type="date",JSONPath=".status.startTime",description="Start time"
// +kubebuilder:printcolumn:name="Finish",type="date",JSONPath=".status.finishTime",description="Finish time"
type MedusaTask struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Expand Down
32 changes: 28 additions & 4 deletions charts/k8ssandra-operator/crds/k8ssandra-operator-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31708,9 +31708,9 @@ spec:
jsonPath: .status.lastExecution
name: LastExecution
type: date
- description: Type of backup
jsonPath: .spec.backupSpec.backupType
name: BackupType
- description: Type of scheduled operation
jsonPath: .spec.operationType
name: OperationType
type: string
name: v1alpha1
schema:
Expand Down Expand Up @@ -31771,6 +31771,13 @@ spec:
disabled:
description: Disabled if set ensures this job is not scheduling anything
type: boolean
operationType:
default: backup
description: Specifies the type of operation to be performed
enum:
- backup
- purge
type: string
required:
- backupSpec
- cronSchedule
Expand Down Expand Up @@ -32184,7 +32191,24 @@ spec:
singular: medusatask
scope: Namespaced
versions:
- name: v1alpha1
- additionalPrinterColumns:
- description: Datacenter which the task targets
jsonPath: .spec.cassandraDatacenter
name: Datacenter
type: string
- description: Type of operation
jsonPath: .spec.operation
name: Operation
type: string
- description: Start time
jsonPath: .status.startTime
name: Start
type: date
- description: Finish time
jsonPath: .status.finishTime
name: Finish
type: date
name: v1alpha1
schema:
openAPIV3Schema:
description: MedusaTask is the Schema for the MedusaTasks API
Expand Down
13 changes: 10 additions & 3 deletions config/crd/bases/medusa.k8ssandra.io_medusabackupschedules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ spec:
jsonPath: .status.lastExecution
name: LastExecution
type: date
- description: Type of backup
jsonPath: .spec.backupSpec.backupType
name: BackupType
- description: Type of scheduled operation
jsonPath: .spec.operationType
name: OperationType
type: string
name: v1alpha1
schema:
Expand Down Expand Up @@ -90,6 +90,13 @@ spec:
disabled:
description: Disabled if set ensures this job is not scheduling anything
type: boolean
operationType:
default: backup
description: Specifies the type of operation to be performed
enum:
- backup
- purge
type: string
required:
- backupSpec
- cronSchedule
Expand Down
19 changes: 18 additions & 1 deletion config/crd/bases/medusa.k8ssandra.io_medusatasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,24 @@ spec:
singular: medusatask
scope: Namespaced
versions:
- name: v1alpha1
- additionalPrinterColumns:
- description: Datacenter which the task targets
jsonPath: .spec.cassandraDatacenter
name: Datacenter
type: string
- description: Type of operation
jsonPath: .spec.operation
name: Operation
type: string
- description: Start time
jsonPath: .status.startTime
name: Start
type: date
- description: Finish time
jsonPath: .status.finishTime
name: Finish
type: date
name: v1alpha1
schema:
openAPIV3Schema:
description: MedusaTask is the Schema for the MedusaTasks API
Expand Down
84 changes: 69 additions & 15 deletions controllers/medusa/medusabackupschedule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package medusa
import (
"context"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"time"

batchv1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -80,6 +81,20 @@ func (r *MedusaBackupScheduleReconciler) Reconcile(ctx context.Context, req ctrl
return ctrl.Result{}, err
}

// Set an owner reference on the task so that it can be cleaned up when the cassandra datacenter is deleted
if backupSchedule.OwnerReferences == nil {
if err = controllerutil.SetControllerReference(dc, backupSchedule, r.Scheme); err != nil {
logger.Error(err, "failed to set controller reference", "CassandraDatacenter", dcKey)
return ctrl.Result{}, err
}
if err = r.Update(ctx, backupSchedule); err != nil {
logger.Error(err, "failed to update task with owner reference", "CassandraDatacenter", dcKey)
return ctrl.Result{}, err
} else {
logger.Info("updated task with owner reference", "CassandraDatacenter", dcKey)
}
}

defaults(backupSchedule)

previousExecution, err := getPreviousExecutionTime(ctx, backupSchedule)
Expand All @@ -93,22 +108,24 @@ func (r *MedusaBackupScheduleReconciler) Reconcile(ctx context.Context, req ctrl
nextExecution := sched.Next(previousExecution).UTC()

createBackup := false
createPurge := false

if nextExecution.Before(now) {
if backupSchedule.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent {
if activeTasks, err := r.activeTasks(backupSchedule, dc); err != nil {
if activeTasks, err := r.activeTasks(backupSchedule, dc, backupSchedule.Spec.OperationType); err != nil {
logger.V(1).Info("failed to get activeTasks", "error", err)
return ctrl.Result{}, err
} else {
if len(activeTasks) > 0 {
logger.V(1).Info("Postponing backup schedule due to existing active backups", "MedusaBackupSchedule", req.NamespacedName)
if activeTasks > 0 {
logger.V(1).Info("Postponing backup schedule due to an unfinished existing job", "MedusaBackupSchedule", req.NamespacedName)
return ctrl.Result{RequeueAfter: 1 * time.Minute}, nil
}
}
}
nextExecution = sched.Next(now)
previousExecution = now
createBackup = true && !backupSchedule.Spec.Disabled
createBackup = true && !backupSchedule.Spec.Disabled && (backupSchedule.Spec.OperationType == "backup" || backupSchedule.Spec.OperationType == "")
createPurge = true && !backupSchedule.Spec.Disabled && (backupSchedule.Spec.OperationType == string(medusav1alpha1.OperationTypePurge))
}

// Update the status if there are modifications
Expand Down Expand Up @@ -140,6 +157,27 @@ func (r *MedusaBackupScheduleReconciler) Reconcile(ctx context.Context, req ctrl
}
}

if createPurge {
logger.V(1).Info("Scheduled time has been reached, creating a backup purge job", "MedusaBackupSchedule", req.NamespacedName)
generatedName := fmt.Sprintf("%s-%d", backupSchedule.Name, now.Unix())
purgeJob := &medusav1alpha1.MedusaTask{
ObjectMeta: metav1.ObjectMeta{
Name: generatedName,
Namespace: backupSchedule.Namespace,
Labels: dc.GetDatacenterLabels(),
},
Spec: medusav1alpha1.MedusaTaskSpec{
CassandraDatacenter: backupSchedule.Spec.BackupSpec.CassandraDatacenter,
Operation: medusav1alpha1.OperationTypePurge,
},
}

if err := r.Client.Create(ctx, purgeJob); err != nil {
// We've already updated the Status times.. we'll miss this job now?
return ctrl.Result{}, err
}
}

nextRunTime := nextExecution.Sub(now)
logger.V(1).Info("Requeing for next scheduled event", "nextRuntime", nextRunTime.String())
return ctrl.Result{RequeueAfter: nextRunTime}, nil
Expand All @@ -162,19 +200,35 @@ func defaults(backupSchedule *medusav1alpha1.MedusaBackupSchedule) {
}
}

func (r *MedusaBackupScheduleReconciler) activeTasks(backupSchedule *medusav1alpha1.MedusaBackupSchedule, dc *cassdcapi.CassandraDatacenter) ([]medusav1alpha1.MedusaBackupJob, error) {
backupJobs := &medusav1alpha1.MedusaBackupJobList{}
if err := r.Client.List(context.Background(), backupJobs, client.InNamespace(backupSchedule.Namespace), client.MatchingLabels(dc.GetDatacenterLabels())); err != nil {
return nil, err
}
activeJobs := make([]medusav1alpha1.MedusaBackupJob, 0)
for _, job := range backupJobs.Items {
if job.Status.FinishTime.IsZero() {
activeJobs = append(activeJobs, job)
func (r *MedusaBackupScheduleReconciler) activeTasks(backupSchedule *medusav1alpha1.MedusaBackupSchedule, dc *cassdcapi.CassandraDatacenter, operationType string) (int, error) {
if operationType == "" || operationType == "backup" {
backupJobs := &medusav1alpha1.MedusaBackupJobList{}
if err := r.Client.List(context.Background(), backupJobs, client.InNamespace(backupSchedule.Namespace), client.MatchingLabels(dc.GetDatacenterLabels())); err != nil {
return 0, err
}
activeJobs := make([]medusav1alpha1.MedusaBackupJob, 0)
for _, job := range backupJobs.Items {
if job.Status.FinishTime.IsZero() {
activeJobs = append(activeJobs, job)
}
}
}

return activeJobs, nil
return len(activeJobs), nil
} else if operationType == string(medusav1alpha1.OperationTypePurge) {
medusaTasks := &medusav1alpha1.MedusaTaskList{}
if err := r.Client.List(context.Background(), medusaTasks, client.InNamespace(backupSchedule.Namespace), client.MatchingLabels(dc.GetDatacenterLabels())); err != nil {
return 0, err
}
activeJobs := make([]medusav1alpha1.MedusaTask, 0)
for _, job := range medusaTasks.Items {
if job.Spec.Operation == medusav1alpha1.OperationTypePurge && job.Status.FinishTime.IsZero() {
activeJobs = append(activeJobs, job)
}
}

return len(activeJobs), nil
}
return 0, fmt.Errorf("unknown operation type %s", operationType)
}

// SetupWithManager sets up the controller with the Manager.
Expand Down
Loading

0 comments on commit dc6a87a

Please sign in to comment.