Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use AsyncBackup to create the backups #1049

Merged
merged 1 commit into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ ENVTEST ?= $(LOCALBIN)/setup-envtest
## Tool Versions
CERT_MANAGER_VERSION ?= v1.9.1
KUSTOMIZE_VERSION ?= v4.5.7
CONTROLLER_TOOLS_VERSION ?= v0.10.0
CONTROLLER_TOOLS_VERSION ?= v0.11.4

cert-manager: ## Install cert-manager to the cluster
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/$(CERT_MANAGER_VERSION)/cert-manager.yaml
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/config.k8ssandra.io_clientconfigs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: clientconfigs.config.k8ssandra.io
spec:
group: config.k8ssandra.io
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/control.k8ssandra.io_k8ssandratasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: k8ssandratasks.control.k8ssandra.io
spec:
group: control.k8ssandra.io
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/k8ssandra.io_k8ssandraclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: k8ssandraclusters.k8ssandra.io
spec:
group: k8ssandra.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: medusabackupjobs.medusa.k8ssandra.io
spec:
group: medusa.k8ssandra.io
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/medusa.k8ssandra.io_medusabackups.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: medusabackups.medusa.k8ssandra.io
spec:
group: medusa.k8ssandra.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: medusabackupschedules.medusa.k8ssandra.io
spec:
group: medusa.k8ssandra.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: medusarestorejobs.medusa.k8ssandra.io
spec:
group: medusa.k8ssandra.io
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/medusa.k8ssandra.io_medusatasks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: medusatasks.medusa.k8ssandra.io
spec:
group: medusa.k8ssandra.io
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/reaper.k8ssandra.io_reapers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: reapers.reaper.k8ssandra.io
spec:
group: reaper.k8ssandra.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: replicatedsecrets.replication.k8ssandra.io
spec:
group: replication.k8ssandra.io
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/stargate.k8ssandra.io_stargates.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.10.0
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: stargates.stargate.k8ssandra.io
spec:
group: stargate.k8ssandra.io
Expand Down
1 change: 0 additions & 1 deletion config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
creationTimestamp: null
name: k8ssandra-operator
namespace: k8ssandra
rules:
Expand Down
2 changes: 0 additions & 2 deletions config/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
creationTimestamp: null
name: mutating-webhook-configuration
webhooks:
- admissionReviewVersions:
Expand All @@ -29,7 +28,6 @@ webhooks:
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
creationTimestamp: null
name: validating-webhook-configuration
webhooks:
- admissionReviewVersions:
Expand Down
124 changes: 77 additions & 47 deletions controllers/medusa/medusabackupjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"
"net"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -40,7 +39,6 @@ import (
"github.com/k8ssandra/k8ssandra-operator/pkg/config"
"github.com/k8ssandra/k8ssandra-operator/pkg/medusa"
"github.com/k8ssandra/k8ssandra-operator/pkg/shared"
"github.com/k8ssandra/k8ssandra-operator/pkg/utils"
)

// MedusaBackupJobReconciler reconciles a MedusaBackupJob object
Expand Down Expand Up @@ -105,10 +103,47 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{RequeueAfter: r.DefaultDelay}, err
}

// If there is anything in progress, simply requeue the request
// If there is anything in progress, simply requeue the request until each pod has finished or errored
if len(backup.Status.InProgress) > 0 {
logger.Info("MedusaBackupJob is being processed already", "Backup", req.NamespacedName)
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
logger.Info("There are backups in progress, checking them..")
progress := make([]string, 0, len(backup.Status.InProgress))
patch := client.MergeFrom(backup.DeepCopy())

StatusCheck:
for _, podName := range backup.Status.InProgress {
for _, pod := range pods {
if podName == pod.Name {
status, err := backupStatus(ctx, backup.ObjectMeta.Name, &pod, r.ClientFactory, logger)
if err != nil {
return ctrl.Result{}, err
}

if status == medusa.StatusType_IN_PROGRESS {
progress = append(progress, podName)
} else if status == medusa.StatusType_SUCCESS {
backup.Status.Finished = append(backup.Status.Finished, podName)
} else if status == medusa.StatusType_FAILED || status == medusa.StatusType_UNKNOWN {
backup.Status.Failed = append(backup.Status.Failed, podName)
}

continue StatusCheck
}
}
}

if len(backup.Status.InProgress) != len(progress) {
backup.Status.InProgress = progress
if err := r.Status().Patch(ctx, backup, patch); err != nil {
logger.Error(err, "failed to patch status")
return ctrl.Result{}, err
}
}

if len(progress) > 0 {
logger.Info("MedusaBackupJob is still being processed", "Backup", req.NamespacedName)
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
}
return ctrl.Result{Requeue: true}, nil
}

// If the backup is already finished, there is nothing to do.
Expand All @@ -121,7 +156,7 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ
if !backup.Status.StartTime.IsZero() {
// If there is anything in progress, simply requeue the request
if len(backup.Status.InProgress) > 0 {
logger.Info("Backups already in progress")
logger.Info("Backup is still in progress")
return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
}

Expand Down Expand Up @@ -158,9 +193,6 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ
patch := client.MergeFromWithOptions(backup.DeepCopy(), client.MergeFromWithOptimisticLock{})

backup.Status.StartTime = metav1.Now()
for _, pod := range pods {
backup.Status.InProgress = append(backup.Status.InProgress, pod.Name)
}

if err := r.Status().Patch(ctx, backup, patch); err != nil {
logger.Error(err, "Failed to patch status")
Expand All @@ -169,43 +201,21 @@ func (r *MedusaBackupJobReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

logger.Info("Starting backups")
// Do the actual backup in the background
go func() {
wg := sync.WaitGroup{}

// Mutex to prevent concurrent updates to the backup.Status object
backupMutex := sync.Mutex{}
patch := client.MergeFrom(backup.DeepCopy())
patch = client.MergeFrom(backup.DeepCopy())

for _, p := range pods {
pod := p
wg.Add(1)
go func() {
logger.Info("starting backup", "CassandraPod", pod.Name)
succeeded := false
if err := doMedusaBackup(ctx, backup.ObjectMeta.Name, backup.Spec.Type, &pod, r.ClientFactory, logger); err == nil {
logger.Info("finished backup", "CassandraPod", pod.Name)
succeeded = true
} else {
logger.Error(err, "backup failed", "CassandraPod", pod.Name)
}
backupMutex.Lock()
defer backupMutex.Unlock()
defer wg.Done()
backup.Status.InProgress = utils.RemoveValue(backup.Status.InProgress, pod.Name)
if succeeded {
backup.Status.Finished = append(backup.Status.Finished, pod.Name)
} else {
backup.Status.Failed = append(backup.Status.Failed, pod.Name)
}
}()
}
wg.Wait()
logger.Info("finished backup operations")
if err := r.Status().Patch(context.Background(), backup, patch); err != nil {
logger.Error(err, "failed to patch status", "Backup", fmt.Sprintf("%s/%s", backup.Name, backup.Namespace))
for _, p := range pods {
logger.Info("starting backup", "CassandraPod", p.Name)
_, err := doMedusaBackup(ctx, backup.ObjectMeta.Name, backup.Spec.Type, &p, r.ClientFactory, logger)
if err != nil {
logger.Error(err, "backup failed", "CassandraPod", p.Name)
}
}()

backup.Status.InProgress = append(backup.Status.InProgress, p.Name)
}
// logger.Info("finished backup operations")
if err := r.Status().Patch(context.Background(), backup, patch); err != nil {
logger.Error(err, "failed to patch status", "Backup", fmt.Sprintf("%s/%s", backup.Name, backup.Namespace))
}

return ctrl.Result{RequeueAfter: r.DefaultDelay}, nil
}
Expand Down Expand Up @@ -249,15 +259,35 @@ func (r *MedusaBackupJobReconciler) createMedusaBackup(ctx context.Context, back
return nil
}

func doMedusaBackup(ctx context.Context, name string, backupType shared.BackupType, pod *corev1.Pod, clientFactory medusa.ClientFactory, logger logr.Logger) error {
func doMedusaBackup(ctx context.Context, name string, backupType shared.BackupType, pod *corev1.Pod, clientFactory medusa.ClientFactory, logger logr.Logger) (string, error) {
addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
logger.Info("connecting to backup sidecar", "Pod", pod.Name, "Address", addr)
if medusaClient, err := clientFactory.NewClient(addr); err != nil {
return err
return "", err
} else {
logger.Info("successfully connected to backup sidecar", "Pod", pod.Name, "Address", addr)
defer medusaClient.Close()
return medusaClient.CreateBackup(ctx, name, string(backupType))
resp, err := medusaClient.CreateBackup(ctx, name, string(backupType))
if err != nil {
return "", err
}

return resp.BackupName, nil
}
}

// backupStatus queries the Medusa backup sidecar of pod for the status of the
// backup called name.
//
// The sidecar address is built from the pod's IP and shared.BackupSidecarPort,
// and the client is obtained from clientFactory. On success the Status field
// of the sidecar's response is returned. If the client cannot be created or
// the status request fails, medusa.StatusType_UNKNOWN is returned together
// with the underlying error.
func backupStatus(ctx context.Context, name string, pod *corev1.Pod, clientFactory medusa.ClientFactory, logger logr.Logger) (medusa.StatusType, error) {
	addr := net.JoinHostPort(pod.Status.PodIP, fmt.Sprint(shared.BackupSidecarPort))
	logger.Info("connecting to backup sidecar", "Pod", pod.Name, "Address", addr)

	medusaClient, err := clientFactory.NewClient(addr)
	if err != nil {
		return medusa.StatusType_UNKNOWN, err
	}
	// Close the client once the status has been read, as doMedusaBackup does.
	// The reconciler calls this function for every in-progress pod on each
	// pass, so a client that is never closed would accumulate over time.
	defer medusaClient.Close()

	resp, err := medusaClient.BackupStatus(ctx, name)
	if err != nil {
		return medusa.StatusType_UNKNOWN, err
	}

	return resp.Status, nil
}

Expand Down
8 changes: 5 additions & 3 deletions controllers/medusa/medusabackupjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,9 @@ func (c *fakeMedusaClient) Close() error {
return nil
}

func (c *fakeMedusaClient) CreateBackup(ctx context.Context, name string, backupType string) error {
func (c *fakeMedusaClient) CreateBackup(ctx context.Context, name string, backupType string) (*medusa.BackupResponse, error) {
c.RequestedBackups = append(c.RequestedBackups, name)
return nil
return &medusa.BackupResponse{BackupName: name, Status: medusa.StatusType_IN_PROGRESS}, nil
}

func (c *fakeMedusaClient) GetBackups(ctx context.Context) ([]*medusa.BackupSummary, error) {
Expand All @@ -350,7 +350,9 @@ func (c *fakeMedusaClient) GetBackups(ctx context.Context) ([]*medusa.BackupSumm
}

func (c *fakeMedusaClient) BackupStatus(ctx context.Context, name string) (*medusa.BackupStatusResponse, error) {
return nil, nil
return &medusa.BackupStatusResponse{
Status: medusa.StatusType_SUCCESS,
}, nil
}

func (c *fakeMedusaClient) PurgeBackups(ctx context.Context) (*medusa.PurgeBackupsResponse, error) {
Expand Down
4 changes: 2 additions & 2 deletions controllers/medusa/medusarestorejob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ func (c *fakeMedusaRestoreClient) Close() error {
return nil
}

func (c *fakeMedusaRestoreClient) CreateBackup(ctx context.Context, name string, backupType string) error {
return nil
func (c *fakeMedusaRestoreClient) CreateBackup(ctx context.Context, name string, backupType string) (*medusa.BackupResponse, error) {
return nil, nil
}

func (c *fakeMedusaRestoreClient) GetBackups(ctx context.Context) ([]*medusa.BackupSummary, error) {
Expand Down
12 changes: 8 additions & 4 deletions pkg/medusa/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (f *DefaultFactory) NewClient(address string) (Client, error) {
type Client interface {
Close() error

CreateBackup(ctx context.Context, name string, backupType string) error
CreateBackup(ctx context.Context, name string, backupType string) (*BackupResponse, error)

GetBackups(ctx context.Context) ([]*BackupSummary, error)

Expand All @@ -48,7 +48,7 @@ func (c *defaultClient) Close() error {
return c.connection.Close()
}

func (c *defaultClient) CreateBackup(ctx context.Context, name string, backupType string) error {
func (c *defaultClient) CreateBackup(ctx context.Context, name string, backupType string) (*BackupResponse, error) {
backupMode := BackupRequest_DIFFERENTIAL
if backupType == "full" {
backupMode = BackupRequest_FULL
Expand All @@ -58,9 +58,13 @@ func (c *defaultClient) CreateBackup(ctx context.Context, name string, backupTyp
Name: name,
Mode: backupMode,
}
_, err := c.grpcClient.Backup(ctx, &request)

return err
resp, err := c.grpcClient.AsyncBackup(ctx, &request)
if err != nil {
return nil, err
}

return resp, err
}

func (c *defaultClient) GetBackups(ctx context.Context) ([]*BackupSummary, error) {
Expand Down
Loading