Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pv cannot be restored correctly with one click #187

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ module github.com/openebs/velero-plugin
go 1.13

require (
cloud.google.com/go v0.58.0 // indirect
cloud.google.com/go/storage v1.9.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.2 // indirect
github.com/Azure/azure-storage-blob-go v0.8.0 // indirect
github.com/aws/aws-sdk-go v1.35.24
Expand All @@ -16,6 +14,7 @@ require (
github.com/onsi/ginkgo v1.15.2
github.com/onsi/gomega v1.10.2
github.com/openebs/api/v2 v2.3.0
github.com/openebs/cstor-csi v1.12.0-RC1.0.20220712095109-ed7121554bd2
github.com/openebs/maya v1.12.1-0.20210416090832-ad9c32f086d5
github.com/openebs/zfs-localpv v1.6.1-0.20210504173514-62b3a0b7fe5d
github.com/pkg/errors v0.9.1
Expand All @@ -40,6 +39,8 @@ replace (
k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.20.2
k8s.io/code-generator => k8s.io/code-generator v0.20.2
k8s.io/component-base => k8s.io/component-base v0.20.2
k8s.io/component-helpers => k8s.io/component-helpers v0.20.0
k8s.io/controller-manager => k8s.io/controller-manager v0.20.0
k8s.io/cri-api => k8s.io/cri-api v0.20.2
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.20.2
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.20.2
Expand All @@ -50,6 +51,7 @@ replace (
k8s.io/kubelet => k8s.io/kubelet v0.20.2
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.20.2
k8s.io/metrics => k8s.io/metrics v0.20.2
k8s.io/mount-utils => k8s.io/mount-utils v0.20.0
k8s.io/node-api => k8s.io/node-api v0.20.2
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.20.2
k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.20.2
Expand Down
147 changes: 99 additions & 48 deletions go.sum

Large diffs are not rendered by default.

22 changes: 21 additions & 1 deletion pkg/cstor/cstor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net"
"os"
"strings"
"time"

Expand Down Expand Up @@ -111,6 +112,9 @@ type Plugin struct {
// namespace in which openebs is installed, default is openebs
namespace string

// nodeID is used to identify the node on which the program is running
nodeID string

// cl stores cloud connection information
cl *cloud.Conn

Expand Down Expand Up @@ -218,6 +222,13 @@ func (p *Plugin) Init(config map[string]string) error {
p.namespace = ns
}

nodeID := os.Getenv("VELERO_NODE_ID")
if nodeID == "" {
return errors.New("env VELERO_NODE_ID not set")
}
p.Log.Infof("env VELERO_NODE_ID: ", nodeID)
p.nodeID = nodeID

conf, err := rest.InClusterConfig()
if err != nil {
p.Log.Errorf("Failed to get cluster config : %s", err.Error())
Expand Down Expand Up @@ -446,11 +457,20 @@ func (p *Plugin) CreateSnapshot(volumeID, volumeAZ string, tags map[string]strin
}

if !p.local {
// If cloud snapshot is configured then we need to backup PVC also
// If cloud snapshot is configured then we need to backup PVC,PV, CVC also
p.Log.Infof("backup PVC, PV, CVC first")
err := p.backupPVC(volumeID)
if err != nil {
return "", errors.Wrapf(err, "failed to create backup for PVC")
}
err = p.backupPV(volumeID)
if err != nil {
return "", errors.Wrapf(err, "failed to create backup for PV")
}
err = p.backupCVC(volumeID)
if err != nil {
return "", errors.Wrapf(err, "failed to create backup for CVC")
}
}

p.Log.Infof("creating snapshot{%s}", bkpname)
Expand Down
140 changes: 140 additions & 0 deletions pkg/cstor/cvc_operation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
Copyright 2019 The OpenEBS Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cstor

import (
"context"
"encoding/json"
"fmt"

cstorv1 "github.com/openebs/api/v2/pkg/apis/cstor/v1"
maya "github.com/openebs/cstor-csi/pkg/utils"
"github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// (Kasakaze)todo: Determine whether it is csiVolume, if so, cvc must be backed up
func (p *Plugin) backupCVC(volumeID string) error {
vol := p.volumes[volumeID]

bkpCvc, err := maya.GetVolume(volumeID)
if err != nil {
if !k8serrors.IsNotFound(err) {
return errors.Cause(err)
}
p.Log.Warnf("failed to get cvc, skip. %v", err)
return nil
}

data, err := json.MarshalIndent(bkpCvc, "", "\t")
if err != nil {
return errors.New("error doing json parsing")
}

// pv backup file name
filename := p.cl.GenerateRemoteFilename(vol.volname, vol.backupName)
if filename == "" {
return errors.New("error creating remote file name for pvc backup")
}
if ok := p.cl.Write(data, filename+".cvc"); !ok {
return errors.New("failed to upload CVC")
}

return nil
}

// restoreCVC create CVC for given volume name
// (Kasakaze)todo: Determine whether it is csiVolume, if so, cvc must be restored
func (p *Plugin) restoreCVC(volumeID, pvcName, pvcNamespace, snapName string) error {
// verify if the volume has already been created
cvc, err := maya.GetVolume(volumeID)
if err != nil {
if !k8serrors.IsNotFound(err) {
return errors.Cause(err)
}
}
if err == nil && cvc != nil && cvc.DeletionTimestamp == nil {
p.Log.Warn("cvc already exists, don't provision volume")
return nil
}

p.Log.Info("cvc does not exist, download and provision")
rcvc, err := p.downloadCVC(volumeID, snapName)
if err != nil {
p.Log.Warnf("failed to download cvc, skip. %v", err)
return nil
}

var (
size, _ = rcvc.Spec.Capacity.Storage().AsInt64()
rCount = fmt.Sprint(rcvc.Spec.Provision.ReplicaCount)
cspcName = rcvc.ObjectMeta.Labels["openebs.io/cstor-pool-cluster"]
snapshotID = ""
nodeID = rcvc.Publish.NodeID
policyName = rcvc.ObjectMeta.Labels["openebs.io/volume-policy"]
)
nodeID, err = p.getValidNodeID(nodeID)
if err != nil {
return errors.Cause(err)
}

err = maya.ProvisionVolume(size, volumeID, rCount,
cspcName, snapshotID,
nodeID, policyName, pvcName, pvcNamespace)
if err != nil {
return errors.Cause(err)
}

return nil
}

func (p *Plugin) downloadCVC(volumeID, snapName string) (*cstorv1.CStorVolumeConfig, error) {
cvc := &cstorv1.CStorVolumeConfig{}

filename := p.cl.GenerateRemoteFilename(volumeID, snapName)
filename += ".cvc"
data, ok := p.cl.Read(filename)
if !ok {
return nil, errors.Errorf("failed to download CVC file=%s", filename)
}

if err := json.Unmarshal(data, cvc); err != nil {
return nil, errors.Errorf("failed to decode CVC file=%s", filename)
}

return cvc, nil
}

// If the backup cvc nodeID does not belong to the current cluster, use the environment variable VELERO_NODE_ID
func (p *Plugin) getValidNodeID(nodeID string) (string, error) {
if nodeID == "" {
return p.nodeID, nil
}

_, err := p.K8sClient.CoreV1().Nodes().Get(context.TODO(), nodeID, metav1.GetOptions{})
if err != nil {
if !k8serrors.IsNotFound(err) {
return "", errors.Cause(err)
}

p.Log.Warnf("invalid nodeID(%s), use env VELERO_NODE_ID(%s)", nodeID, p.nodeID)
nodeID = p.nodeID
}

return nodeID, nil
}
1 change: 1 addition & 0 deletions pkg/cstor/cvr_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (p *Plugin) waitForAllCVRsToBeInValidStatus(vol *Volume, statuses []string)
return errors.Errorf("Failed to fetch replicaCount for volume{%s}", vol.volname)
}

p.Log.Infof("Waiting for all CVRs of PV(%s) to be ready, replicaCount=%d", vol.volname, replicaCount)
if vol.isCSIVolume {
return p.waitForCSIBasedCVRs(vol, replicaCount, statuses)
}
Expand Down
96 changes: 96 additions & 0 deletions pkg/cstor/pv_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
v1alpha1 "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -70,6 +71,7 @@ func (p *Plugin) restoreVolumeFromCloud(vol *Volume, targetBackupName string) er
err error
)

p.Log.Info("Restoring volume data from cloud")
if p.restoreAllSnapshots {
// We are restoring from base backup to targeted Backup
snapshotList, err = p.cl.GetSnapListFromCloud(vol.snapshotTag, p.getScheduleName(targetBackupName))
Expand Down Expand Up @@ -240,3 +242,97 @@ func contains(s []string, target string) bool {

return false
}

// backupPV perform backup for given volume's PV
func (p *Plugin) backupPV(volumeID string) error {
vol := p.volumes[volumeID]

bkpPv, err := p.K8sClient.
CoreV1().
PersistentVolumes().
Get(context.TODO(), vol.volname, metav1.GetOptions{})
if err != nil {
p.Log.Errorf("Error fetching PV(%s): %s", vol.volname, err.Error())
return errors.New("failed to fetch PV")
}

data, err := json.MarshalIndent(bkpPv, "", "\t")
if err != nil {
return errors.New("error doing json parsing")
}

filename := p.cl.GenerateRemoteFilename(vol.volname, vol.backupName)
if filename == "" {
return errors.New("error creating remote file name for pvc backup")
}

if ok := p.cl.Write(data, filename+".pv"); !ok {
return errors.New("failed to upload PV")
}

return nil
}

// restorePV create PV for given volume name
func (p *Plugin) restorePV(volumeID, snapName string) error {
_, err := p.K8sClient.
CoreV1().
PersistentVolumes().
Get(context.TODO(), volumeID, metav1.GetOptions{})
if err == nil {
p.Log.Infof("PV=%s already exists, skip restore", volumeID)
return nil
}
if !k8serrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get PV=%s", volumeID)
}

pv, err := p.downloadPV(volumeID, snapName)
if err != nil {
return errors.Wrapf(err, "failed to download pv")
}

// Add annotation PVCreatedByKey, with value 'restore' to PV
pv.Annotations = make(map[string]string)
pv.Annotations[v1alpha1.PVCreatedByKey] = "restore"
pv.ManagedFields = nil
pv.Finalizers = nil
if pv.Spec.ClaimRef != nil {
pv.Spec.ClaimRef.ResourceVersion = ""
pv.Spec.ClaimRef.UID = ""
}
pv.CreationTimestamp = metav1.Time{}
pv.ResourceVersion = ""
pv.UID = ""
pv.Status = v1.PersistentVolumeStatus{}

_, err = p.K8sClient.
CoreV1().
PersistentVolumes().
Create(context.TODO(), pv, metav1.CreateOptions{})
if err != nil {
if !k8serrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "failed to create PV=%s", pv.Name)
}
p.Log.Infof("PV=%s already exists, skip restore", pv.Name)
}

return nil
}

func (p *Plugin) downloadPV(volumeID, snapName string) (*v1.PersistentVolume, error) {
pv := &v1.PersistentVolume{}

filename := p.cl.GenerateRemoteFilename(volumeID, snapName)

data, ok := p.cl.Read(filename + ".pv")
if !ok {
return nil, errors.Errorf("failed to download PV file=%s", filename+".pv")
}

if err := json.Unmarshal(data, pv); err != nil {
return nil, errors.Errorf("failed to decode pv file=%s", filename+".pv")
}

return pv, nil
}
Loading