From 1a447f473ccd9c9c20f07ea4d4a129678dd73aba Mon Sep 17 00:00:00 2001 From: Supriya Kharade Date: Fri, 9 Nov 2018 14:33:33 -0800 Subject: [PATCH 1/9] Copy kio/blockstorage/awsebs to kanister/pkg/blockstorage (#4376) * Copy kio/bolckstorage/aws to kanister --- glide.lock | 39 +- glide.yaml | 2 + pkg/blockstorage/awsebs/awsebs.go | 545 ++++++++++++++++++++++++++ pkg/blockstorage/awsebs/error.go | 28 ++ pkg/blockstorage/blockstorage.go | 25 ++ pkg/blockstorage/blockstorage_test.go | 255 ++++++++++++ pkg/blockstorage/const.go | 21 + pkg/blockstorage/getter/getter.go | 42 ++ pkg/blockstorage/helpers.go | 30 ++ pkg/blockstorage/models.go | 86 ++++ pkg/blockstorage/tags/tags.go | 76 ++++ 11 files changed, 1140 insertions(+), 9 deletions(-) create mode 100644 pkg/blockstorage/awsebs/awsebs.go create mode 100644 pkg/blockstorage/awsebs/error.go create mode 100644 pkg/blockstorage/blockstorage.go create mode 100644 pkg/blockstorage/blockstorage_test.go create mode 100644 pkg/blockstorage/const.go create mode 100644 pkg/blockstorage/getter/getter.go create mode 100644 pkg/blockstorage/helpers.go create mode 100644 pkg/blockstorage/models.go create mode 100644 pkg/blockstorage/tags/tags.go diff --git a/glide.lock b/glide.lock index 2c39034909..cb90eb0735 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 0683291a7097fc386e9d79872e95ae819523b93b199f6a88d91d0c8175df5308 -updated: 2018-08-22T15:50:21.370824639-07:00 +hash: 09eb11ac814f3773149d7aff5c898c2960f3c6936f3534791a23842988a8022c +updated: 2018-10-31T16:24:08.342234-07:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -9,6 +9,8 @@ imports: - internal - name: github.com/aokoli/goutils version: 3391d3790d23d03408670993e957e8f408993c34 +- name: github.com/asaskevich/govalidator + version: 593d64559f7600f29581a3ee42177f5dbded27a9 - name: github.com/aws/aws-sdk-go version: bc3f534c19ffdf835e524e11f0f825b3eaf541c3 subpackages: @@ -34,6 +36,7 @@ imports: - internal/sdkuri - internal/shareddefaults - private/protocol + - private/protocol/ec2query - private/protocol/eventstream - private/protocol/eventstream/eventstreamapi - private/protocol/query @@ -41,12 +44,13 @@ imports: - private/protocol/rest - private/protocol/restxml - private/protocol/xml/xmlutil + - service/ec2 - service/s3 - service/s3/s3iface - service/s3/s3manager - service/sts - name: github.com/Azure/azure-sdk-for-go - version: 767429fcb996dad413936d682c28301e6739bade + version: 100a309838ef6b0878b014e5ae541c83a7cbdfaa subpackages: - storage - version @@ -70,7 +74,11 @@ imports: - name: github.com/ghodss/yaml version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee - name: github.com/go-ini/ini - version: 32e4be5f41bb918afb6e37c07426e2ddbcb6647e + version: 9c8236e659b76e87bf02044d06fde8683008ff3e +- name: github.com/go-openapi/errors + version: d24ebc2075bad502fac3a8ae27aa6dd58e1952dc +- name: github.com/go-openapi/strfmt + version: 610b6cacdcde6852f4de68998bd20ce1dac85b22 - name: github.com/gogo/protobuf version: c0656edd0d9eab7c66d1eb0c568f9039345796f7 subpackages: @@ -119,19 +127,25 @@ imports: subpackages: - simplelru - name: github.com/huandu/xstrings - version: 2bf18b218c51864a87384c06996e40ff9dcff8e1 + version: 8bbcf2f9ccb55755e748b7644164cd4bdce94c1d - name: github.com/imdario/mergo version: 6633656539c1639d9d78127b7d47c622b5d7b6dc - name: github.com/jmespath/go-jmespath - version: 3433f3ea46d9f8019119e7dd41274e112a2359a9 + version: c2b33e8439af944379acbdd9c3a5fe0bc44bd8a5 - name: github.com/jpillora/backoff version: 
8eab2debe79d12b7bd3d10653910df25fa9552ba - name: github.com/json-iterator/go version: f2b4162afba35581b6d4a50d3b8f34e33c144682 +- name: github.com/mailru/easyjson + version: 2f5df55504ebc322e4d52d34df6a1f5b503bf26d + subpackages: + - buffer + - jlexer + - jwriter - name: github.com/marstr/guid version: 8bdf7d1a087ccc975cf37dd6507da50698fd19ca - name: github.com/Masterminds/semver - version: 59c29afe1a994eacb71c833025ca7acf874bb1da + version: c84ddcca87bf5a941b138dde832a7e20b0159ad8 - name: github.com/Masterminds/sprig version: 6b2a58267f6a8b1dc8e2eb5519b984008fa85e8c - name: github.com/mitchellh/mapstructure @@ -149,7 +163,7 @@ imports: repo: https://github.com/kastenhq/operator-kit.git vcs: git - name: github.com/satori/go.uuid - version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 + version: b2ce2384e17bbe0c6d34077efa39dbab3e09123b - name: github.com/sirupsen/logrus version: 3e01752db0189b9157070a0e1668a620f9a85da2 - name: github.com/spf13/cobra @@ -181,7 +195,7 @@ imports: - jws - jwt - name: golang.org/x/sys - version: 95c6576299259db960f6c5b9b69ea52422860fce + version: 9b800f95dbbc54abff0acf7ee32d88ba4e328c89 subpackages: - unix - name: golang.org/x/text @@ -207,6 +221,11 @@ imports: version: 64131543e7896d5bcc6bd5a76287eb75ea96c673 - name: gopkg.in/inf.v0 version: 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4 +- name: gopkg.in/mgo.v2 + version: 9856a29383ce1c59f308dd1cf0363a79b5bef6b5 + subpackages: + - bson + - internal/json - name: gopkg.in/yaml.v2 version: 670d4cfef0544295bc27a114dbac37980d83185a - name: k8s.io/api @@ -284,6 +303,7 @@ imports: - pkg/util/json - pkg/util/mergepatch - pkg/util/net + - pkg/util/rand - pkg/util/remotecommand - pkg/util/runtime - pkg/util/sets @@ -302,6 +322,7 @@ imports: subpackages: - discovery - discovery/fake + - dynamic - kubernetes - kubernetes/fake - kubernetes/scheme diff --git a/glide.yaml b/glide.yaml index 779b5fc83e..ed1240b938 100644 --- a/glide.yaml +++ b/glide.yaml @@ -5,6 +5,8 @@ import: repo: https://github.com/GoogleCloudPlatform/google-cloud-go.git - package: github.com/aws/aws-sdk-go version: v1.14.31 +- package: github.com/go-openapi/strfmt + version: 610b6cacdcde6852f4de68998bd20ce1dac85b22 - package: github.com/graymeta/stow vcs: git version: master diff --git a/pkg/blockstorage/awsebs/awsebs.go b/pkg/blockstorage/awsebs/awsebs.go new file mode 100644 index 0000000000..495a60e60a --- /dev/null +++ b/pkg/blockstorage/awsebs/awsebs.go @@ -0,0 +1,545 @@ +package awsebs + +// AWS EBS Volume storage + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/jpillora/backoff" + "github.com/kanisterio/kanister/pkg/poll" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/kanisterio/kanister/pkg/blockstorage" + ktags "github.com/kanisterio/kanister/pkg/blockstorage/tags" +) + +var _ blockstorage.Provider = (*ebsStorage)(nil) + +type ebsStorage struct { + ec2Cli *EC2 +} + +// EC2 is kasten's wrapper around ec2.EC2 structs +type EC2 struct { + *ec2.EC2 + DryRun bool +} + +const ( + maxRetries = 10 + // ConfigRegion represents region key required in the map "config" + ConfigRegion = "region" + // AccessKeyID represents AWS Access key ID + AccessKeyID = "AWS_ACCESS_KEY_ID" + // SecretAccessKey represents AWS Secret Access Key + SecretAccessKey = "AWS_SECRET_ACCESS_KEY" +) + +func (s *ebsStorage) Type() blockstorage.Type { + return blockstorage.TypeEBS 
+} + +// NewProvider returns a provider for the EBS storage type in the specified region +func NewProvider(config map[string]string) (blockstorage.Provider, error) { + awsConfig, region, err := getConfig(config) + if err != nil { + return nil, err + } + ec2Cli, err := newEC2Client(region, awsConfig) + if err != nil { + return nil, errors.Wrapf(err, "Could not get EC2 client") + } + return &ebsStorage{ec2Cli: ec2Cli}, nil +} + +func getConfig(config map[string]string) (*aws.Config, string, error) { + region, ok := config[ConfigRegion] + if !ok { + return nil, "", errors.New("region required for storage type EBS") + } + accessKey, ok := config[AccessKeyID] + if !ok { + return nil, "", errors.New("AWS_ACCESS_KEY_ID required for storage type EBS") + } + secretAccessKey, ok := config[SecretAccessKey] + if !ok { + return nil, "", errors.New("AWS_SECRET_ACCESS_KEY required for storage type EBS") + } + return &aws.Config{Credentials: credentials.NewStaticCredentials(accessKey, secretAccessKey, "")}, region, nil +} + +// newEC2Client returns ec2 client struct. +func newEC2Client(awsRegion string, config *aws.Config) (*EC2, error) { + httpClient := &http.Client{Transport: http.DefaultTransport} + s, err := session.NewSession(config) + if err != nil { + return nil, err + } + return &EC2{EC2: ec2.New(s, &aws.Config{MaxRetries: aws.Int(maxRetries), + Region: aws.String(awsRegion), HTTPClient: httpClient})}, nil +} + +func (s *ebsStorage) VolumeCreate(ctx context.Context, volume blockstorage.Volume) (*blockstorage.Volume, error) { + cvi := &ec2.CreateVolumeInput{ + AvailabilityZone: aws.String(volume.Az), + VolumeType: aws.String(string(volume.VolumeType)), + Encrypted: aws.Bool(volume.Encrypted), + Size: aws.Int64(volume.Size), + } + // io1 type *requires* IOPS. Others *cannot* specify them. 
+ if volume.VolumeType == ec2.VolumeTypeIo1 { + cvi.Iops = aws.Int64(volume.Iops) + } + + tags := make(map[string]string, len(volume.Tags)) + for _, tag := range volume.Tags { + tags[tag.Key] = tag.Value + } + + volID, err := createVolume(ctx, s.ec2Cli, cvi, ktags.GetTags(tags)) + if err != nil { + return nil, err + } + + return s.VolumeGet(ctx, volID, volume.Az) +} + +func (s *ebsStorage) VolumeGet(ctx context.Context, id string, zone string) (*blockstorage.Volume, error) { + volIDs := []*string{aws.String(id)} + dvi := &ec2.DescribeVolumesInput{VolumeIds: volIDs} + dvo, err := s.ec2Cli.DescribeVolumesWithContext(ctx, dvi) + if err != nil { + log.Errorf("Failed to get volumes %v Error: %+v", aws.StringValueSlice(volIDs), err) + return nil, err + } + if len(dvo.Volumes) != len(volIDs) { + return nil, errors.New("Object not found") + } + vols := dvo.Volumes + if len(vols) == 0 { + return nil, errors.New("Volume with volume_id: " + id + " not found") + } + if len(vols) > 1 { + return nil, errors.Errorf("Found an unexpected number of volumes: volume_id=%s result_count=%d", id, len(vols)) + } + vol := vols[0] + mv := s.volumeParse(ctx, vol) + return mv, nil +} + +func (s *ebsStorage) volumeParse(ctx context.Context, volume interface{}) *blockstorage.Volume { + vol := volume.(*ec2.Volume) + tags := []*blockstorage.KeyValue(nil) + for _, tag := range vol.Tags { + tags = append(tags, &blockstorage.KeyValue{Key: aws.StringValue(tag.Key), Value: aws.StringValue(tag.Value)}) + } + return &blockstorage.Volume{ + Type: s.Type(), + ID: aws.StringValue(vol.VolumeId), + Az: aws.StringValue(vol.AvailabilityZone), + Encrypted: aws.BoolValue(vol.Encrypted), + VolumeType: aws.StringValue(vol.VolumeType), + Size: aws.Int64Value(vol.Size), + Tags: tags, + Iops: aws.Int64Value(vol.Iops), + CreationTime: blockstorage.TimeStamp(aws.TimeValue(vol.CreateTime)), + } +} + +func (s *ebsStorage) VolumesList(ctx context.Context, tags map[string]string) ([]*blockstorage.Volume, error) { + var vols []*blockstorage.Volume + var fltrs []*ec2.Filter + dvi := &ec2.DescribeVolumesInput{} + for k, v := range tags { + fltr := ec2.Filter{Name: &k, Values: []*string{&v}} + fltrs = append(fltrs, &fltr) + } + + dvi.SetFilters(fltrs) + dvo, err := s.ec2Cli.DescribeVolumesWithContext(ctx, dvi) + if err != nil { + return nil, err + } + for _, v := range dvo.Volumes { + vols = append(vols, s.volumeParse(ctx, v)) + } + return vols, nil +} + +func (s *ebsStorage) snapshotParse(ctx context.Context, snap *ec2.Snapshot) *blockstorage.Snapshot { + tags := []*blockstorage.KeyValue(nil) + for _, tag := range snap.Tags { + tags = append(tags, &blockstorage.KeyValue{Key: *tag.Key, Value: *tag.Value}) + } + vol := &blockstorage.Volume{ + Type: s.Type(), + ID: aws.StringValue(snap.VolumeId), + } + // TODO: fix getting region from zone + return &blockstorage.Snapshot{ + ID: aws.StringValue(snap.SnapshotId), + Tags: tags, + Type: s.Type(), + Encrypted: aws.BoolValue(snap.Encrypted), + Size: aws.Int64Value(snap.VolumeSize), + Region: aws.StringValue(s.ec2Cli.Config.Region), + Volume: vol, + CreationTime: blockstorage.TimeStamp(aws.TimeValue(snap.StartTime)), + } +} + +func (s *ebsStorage) SnapshotsList(ctx context.Context, tags map[string]string) ([]*blockstorage.Snapshot, error) { + var snaps []*blockstorage.Snapshot + var fltrs []*ec2.Filter + dsi := &ec2.DescribeSnapshotsInput{} + for k, v := range tags { + fltr := ec2.Filter{Name: &k, Values: []*string{&v}} + fltrs = append(fltrs, &fltr) + } + + dsi.SetFilters(fltrs) + dso, err := 
s.ec2Cli.DescribeSnapshotsWithContext(ctx, dsi) + if err != nil { + return nil, err + } + for _, snap := range dso.Snapshots { + snaps = append(snaps, s.snapshotParse(ctx, snap)) + } + return snaps, nil +} + +// SnapshotCopy copies snapshot 'from' to 'to'. Follows aws restrictions regarding encryption; + +// i.e., copying unencrypted to encrypted snapshot is allowed but not vice versa. +func (s *ebsStorage) SnapshotCopy(ctx context.Context, from, to blockstorage.Snapshot) (*blockstorage.Snapshot, error) { + if to.Region == "" { + return nil, errors.New("Destination snapshot AvailabilityZone must be specified") + } + if to.ID != "" { + return nil, errors.Errorf("Snapshot %v destination ID must be empty", to) + } + + // Copy operation must be initiated from the destination region. + ec2Cli, err := newEC2Client(to.Region, nil) + if err != nil { + return nil, errors.Wrapf(err, "Could not get EC2 client") + } + // Copy tags from source snap to dest. + tags := make(map[string]string, len(from.Tags)) + for _, tag := range from.Tags { + tags[tag.Key] = tag.Value + } + + csi := ec2.CopySnapshotInput{ + Description: aws.String("Copy of " + from.ID), + SourceSnapshotId: aws.String(from.ID), + SourceRegion: aws.String(from.Region), + DestinationRegion: ec2Cli.Config.Region, + } + cso, err := ec2Cli.CopySnapshotWithContext(ctx, &csi) + if err != nil { + return nil, errors.Wrapf(err, "Failed to copy snapshot %v", csi) + } + snapID := aws.StringValue(cso.SnapshotId) + if err = setResourceTags(ctx, ec2Cli, snapID, ktags.GetTags(tags)); err != nil { + return nil, err + } + if err = waitOnSnapshotID(ctx, ec2Cli, snapID); err != nil { + return nil, errors.Wrapf(err, "Snapshot %s did not complete", snapID) + } + snaps, err := getSnapshots(ctx, ec2Cli, []*string{aws.String(snapID)}) + if err != nil { + return nil, err + } + + // aws: Snapshots created by the CopySnapshot action have an arbitrary volume ID + // that should not be used for any purpose. 
+ rs := s.snapshotParse(ctx, snaps[0]) + *rs.Volume = *from.Volume + rs.Region = to.Region + rs.Size = from.Size + return rs, nil +} + +func (s *ebsStorage) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) { + // Snapshot the EBS volume + csi := (&ec2.CreateSnapshotInput{}).SetVolumeId(volume.ID) + var snapID string + alltags := ktags.GetTags(tags) + log.Infof("Snapshotting EBS volume: %s", *csi.VolumeId) + csi.SetDryRun(s.ec2Cli.DryRun) + snap, err := s.ec2Cli.CreateSnapshotWithContext(ctx, csi) + if isDryRunErr(err) { + snapID = "" + } else { + if err != nil { + return nil, errors.Wrapf(err, "Failed to create snapshot, volume_id: %s", *csi.VolumeId) + } + if err = setResourceTags(ctx, s.ec2Cli, aws.StringValue(snap.SnapshotId), alltags); err != nil { + return nil, err + } + err = waitOnSnapshot(ctx, s.ec2Cli, snap) + if err != nil { + return nil, errors.Wrapf(err, "Waiting on snapshot %v", snap) + } + snapID = aws.StringValue(snap.SnapshotId) + } + + snaps, err := getSnapshots(ctx, s.ec2Cli, []*string{&snapID}) + if err != nil { + return nil, err + } + + ebssnap := snaps[0] + region, err := availabilityZoneToRegion(ctx, s.ec2Cli, volume.Az) + if err != nil { + return nil, err + } + + ms := s.snapshotParse(ctx, ebssnap) + ms.Region = region + for _, tag := range ebssnap.Tags { + ms.Tags = append(ms.Tags, &blockstorage.KeyValue{Key: aws.StringValue(tag.Key), Value: aws.StringValue(tag.Value)}) + } + ms.Volume = &volume + return ms, nil +} + +func (s *ebsStorage) SnapshotDelete(ctx context.Context, snapshot *blockstorage.Snapshot) error { + log.Infof("EBS Snapshot ID %s", snapshot.ID) + rmsi := &ec2.DeleteSnapshotInput{} + rmsi.SetSnapshotId(snapshot.ID) + rmsi.SetDryRun(s.ec2Cli.DryRun) + _, err := s.ec2Cli.DeleteSnapshotWithContext(ctx, rmsi) + if isSnapNotFoundErr(err) { + // If the snapshot is already deleted, we log, but don't return an error. + log.Debugf("Snapshot already deleted") + return nil + } + if err != nil && !isDryRunErr(err) { + return errors.Wrap(err, "Failed to delete snapshot") + } + return nil +} + +func (s *ebsStorage) SnapshotGet(ctx context.Context, id string) (*blockstorage.Snapshot, error) { + snaps, err := getSnapshots(ctx, s.ec2Cli, []*string{&id}) + if err != nil { + return nil, err + } + snap := snaps[0] + ms := s.snapshotParse(ctx, snap) + for _, tag := range snap.Tags { + ms.Tags = append(ms.Tags, &blockstorage.KeyValue{Key: aws.StringValue(tag.Key), Value: aws.StringValue(tag.Value)}) + } + + return ms, nil +} + +func (s *ebsStorage) VolumeDelete(ctx context.Context, volume *blockstorage.Volume) error { + rmvi := &ec2.DeleteVolumeInput{} + rmvi.SetVolumeId(volume.ID) + rmvi.SetDryRun(s.ec2Cli.DryRun) + _, err := s.ec2Cli.DeleteVolumeWithContext(ctx, rmvi) + if isVolNotFoundErr(err) { + // If the volume is already deleted, we log, but don't return an error. 
+ log.Debugf("Volume already deleted") + return nil + } + if err != nil && !isDryRunErr(err) { + return errors.Wrapf(err, "Failed to delete volume volID: %s", volume.ID) + } + return nil +} + +func (s *ebsStorage) SetTags(ctx context.Context, resource interface{}, tags map[string]string) error { + switch res := resource.(type) { + case *blockstorage.Volume: + return setResourceTags(ctx, s.ec2Cli, res.ID, tags) + case *blockstorage.Snapshot: + return setResourceTags(ctx, s.ec2Cli, res.ID, tags) + default: + return errors.Wrapf(nil, "Unknown resource type: %v", res) + } +} + +// setResourceTags sets tags on the specified resource +func setResourceTags(ctx context.Context, ec2Cli *EC2, resourceID string, tags map[string]string) error { + cti := &ec2.CreateTagsInput{Resources: []*string{&resourceID}, Tags: mapToEC2Tags(tags)} + if _, err := ec2Cli.CreateTags(cti); err != nil { + return errors.Wrapf(err, "Failed to set tags, resource_id:%s", resourceID) + } + return nil +} + +func (s *ebsStorage) VolumeCreateFromSnapshot(ctx context.Context, snapshot blockstorage.Snapshot, tags map[string]string) (*blockstorage.Volume, error) { + if snapshot.Volume == nil { + return nil, errors.New("Snapshot volume information not available") + } + if snapshot.Volume.VolumeType == "" || snapshot.Volume.Az == "" || snapshot.Volume.Tags == nil { + return nil, errors.Errorf("Required volume fields not available, volumeType: %s, Az: %s, VolumeTags: %v", snapshot.Volume.VolumeType, snapshot.Volume.Az, snapshot.Volume.Tags) + } + + cvi := &ec2.CreateVolumeInput{ + AvailabilityZone: aws.String(snapshot.Volume.Az), + SnapshotId: aws.String(snapshot.ID), + VolumeType: aws.String(string(snapshot.Volume.VolumeType)), + } + // io1 type *requires* IOPS. Others *cannot* specify them. + if snapshot.Volume.VolumeType == ec2.VolumeTypeIo1 { + cvi.Iops = aws.Int64(snapshot.Volume.Iops) + } + // Incorporate pre-existing tags. 
+ for _, tag := range snapshot.Volume.Tags { + if _, found := tags[tag.Key]; !found { + tags[tag.Key] = tag.Value + } + } + + volID, err := createVolume(ctx, s.ec2Cli, cvi, ktags.GetTags(tags)) + if err != nil { + return nil, err + } + return s.VolumeGet(ctx, volID, snapshot.Volume.Az) +} + +// createVolume creates an EBS volume using the specified parameters +func createVolume(ctx context.Context, ec2Cli *EC2, cvi *ec2.CreateVolumeInput, tags map[string]string) (string, error) { + // Set tags + awsTags := mapToEC2Tags(tags) + ts := []*ec2.TagSpecification{&ec2.TagSpecification{ResourceType: aws.String(ec2.ResourceTypeVolume), Tags: awsTags}} + cvi.SetTagSpecifications(ts) + cvi.SetDryRun(ec2Cli.DryRun) + vol, err := ec2Cli.CreateVolumeWithContext(ctx, cvi) + if isDryRunErr(err) { + return "", nil + } + if err != nil { + log.Errorf("Failed to create volume for %v Error: %+v", *cvi, err) + return "", err + } + + err = waitOnVolume(ctx, ec2Cli, vol) + if err != nil { + return "", err + } + return aws.StringValue(vol.VolumeId), nil +} + +// getSnapshots returns the snapshot metadata for the specified snapshot ids +func getSnapshots(ctx context.Context, ec2Cli *EC2, snapIDs []*string) ([]*ec2.Snapshot, error) { + dsi := &ec2.DescribeSnapshotsInput{SnapshotIds: snapIDs} + dso, err := ec2Cli.DescribeSnapshotsWithContext(ctx, dsi) + if err != nil { + return nil, errors.Wrapf(err, "Failed to get snapshot, snapshot_ids: %p", snapIDs) + } + // TODO: handle paging and continuation + if len(dso.Snapshots) != len(snapIDs) { + log.Errorf("Did not find all requested snapshots, snapshots_requested: %p, snapshots_found: %p", snapIDs, dso.Snapshots) + // TODO: Move mapping to HTTP error to the caller + return nil, errors.New("Object not found") + } + return dso.Snapshots, nil +} + +// availabilityZoneToRegion converts from Az to Region +func availabilityZoneToRegion(ctx context.Context, awsCli *EC2, az string) (ar string, err error) { + azi := &ec2.DescribeAvailabilityZonesInput{ + ZoneNames: []*string{&az}, + } + + azo, err := awsCli.DescribeAvailabilityZonesWithContext(ctx, azi) + if err != nil { + return "", errors.Wrapf(err, "Could not determine region for availability zone (AZ) %s", az) + } + + if len(azo.AvailabilityZones) == 0 { + return "", errors.New("Region unavailable for availability zone" + az) + } + + return aws.StringValue(azo.AvailabilityZones[0].RegionName), nil +} + +func mapToEC2Tags(tags map[string]string) []*ec2.Tag { + // Set tags + awsTags := make([]*ec2.Tag, 0, len(tags)) + for k, v := range tags { + awsTags = append(awsTags, &ec2.Tag{Key: aws.String(k), Value: aws.String(v)}) + } + return awsTags +} + +// waitOnVolume waits for the volume to be created +func waitOnVolume(ctx context.Context, ec2Cli *EC2, vol *ec2.Volume) error { + volWaitBackoff := backoff.Backoff{ + Factor: 2, + Jitter: false, + Min: 10 * time.Millisecond, + Max: 10 * time.Second, + } + dvi := &ec2.DescribeVolumesInput{} + dvi = dvi.SetVolumeIds([]*string{vol.VolumeId}) + for { + dvo, err := ec2Cli.DescribeVolumesWithContext(ctx, dvi) + if err != nil { + log.Errorf("Failed to describe volume %s Error: %+v", aws.StringValue(vol.VolumeId), err) + return err + } + if len(dvo.Volumes) != 1 { + return errors.New("Object not found") + } + s := dvo.Volumes[0] + if *s.State == ec2.VolumeStateError { + return errors.New("Creating EBS volume failed") + } + if *s.State == ec2.VolumeStateAvailable { + log.Infof("Volume %s complete", *vol.VolumeId) + return nil + } + log.Infof("Volume %s state: %s", *vol.VolumeId, *s.State) + 
time.Sleep(volWaitBackoff.Duration()) + } +} + +// waitOnSnapshot waits for the snapshot to be created +func waitOnSnapshot(ctx context.Context, ec2Cli *EC2, snap *ec2.Snapshot) error { + return waitOnSnapshotID(ctx, ec2Cli, *snap.SnapshotId) +} + +func waitOnSnapshotID(ctx context.Context, ec2Cli *EC2, snapID string) error { + snapWaitBackoff := backoff.Backoff{ + Factor: 2, + Jitter: false, + Min: 1 * time.Second, + Max: 10 * time.Second, + } + dsi := &ec2.DescribeSnapshotsInput{} + dsi = dsi.SetSnapshotIds([]*string{&snapID}) + return poll.WaitWithBackoff(ctx, snapWaitBackoff, func(ctx context.Context) (bool, error) { + dso, err := ec2Cli.DescribeSnapshotsWithContext(ctx, dsi) + if err != nil { + return false, errors.Wrapf(err, "Failed to describe snapshot, snapshot_id: %s", snapID) + } + if len(dso.Snapshots) != 1 { + return false, errors.New("Object not found") + } + s := dso.Snapshots[0] + if *s.State == ec2.SnapshotStateError { + return false, errors.New("Snapshot EBS volume failed") + } + if *s.State == ec2.SnapshotStateCompleted { + log.Infof("Snapshot with snapshot_id: %s completed", snapID) + return true, nil + } + log.Debugf("Snapshot progress: snapshot_id: %s, progress: %s", snapID, fmt.Sprintf("%+v", *s.Progress)) + return false, nil + }) +} diff --git a/pkg/blockstorage/awsebs/error.go b/pkg/blockstorage/awsebs/error.go new file mode 100644 index 0000000000..cd9fc862cc --- /dev/null +++ b/pkg/blockstorage/awsebs/error.go @@ -0,0 +1,28 @@ +package awsebs + +import ( + "github.com/aws/aws-sdk-go/aws/awserr" +) + +const ( + dryRunOperationCode = "DryRunOperation" + snapshotNotFoundCode = "InvalidSnapshot.NotFound" + volumeNotFoundCode = "InvalidVolume.NotFound" +) + +func isDryRunErr(err error) bool { + return isError(err, dryRunOperationCode) +} + +func isSnapNotFoundErr(err error) bool { + return isError(err, snapshotNotFoundCode) +} + +func isError(err error, code string) bool { + awsErr, ok := err.(awserr.Error) + return ok && awsErr.Code() == code +} + +func isVolNotFoundErr(err error) bool { + return isError(err, volumeNotFoundCode) +} diff --git a/pkg/blockstorage/blockstorage.go b/pkg/blockstorage/blockstorage.go new file mode 100644 index 0000000000..422536f02e --- /dev/null +++ b/pkg/blockstorage/blockstorage.go @@ -0,0 +1,25 @@ +package blockstorage + +import ( + "context" +) + +// Provider abstracts actions on underlying storage +type Provider interface { + // Type returns the underlying storage type + Type() Type + // Volume operations + VolumeCreate(context.Context, Volume) (*Volume, error) + VolumeCreateFromSnapshot(ctx context.Context, snapshot Snapshot, tags map[string]string) (*Volume, error) + VolumeDelete(context.Context, *Volume) error + VolumeGet(ctx context.Context, id string, zone string) (*Volume, error) + // Snapshot operations + SnapshotCopy(ctx context.Context, from Snapshot, to Snapshot) (*Snapshot, error) + SnapshotCreate(ctx context.Context, volume Volume, tags map[string]string) (*Snapshot, error) + SnapshotDelete(context.Context, *Snapshot) error + SnapshotGet(ctx context.Context, id string) (*Snapshot, error) + // Others + SetTags(ctx context.Context, resource interface{}, tags map[string]string) error + VolumesList(ctx context.Context, tags map[string]string) ([]*Volume, error) + SnapshotsList(ctx context.Context, tags map[string]string) ([]*Snapshot, error) +} diff --git a/pkg/blockstorage/blockstorage_test.go b/pkg/blockstorage/blockstorage_test.go new file mode 100644 index 0000000000..a8bcaede27 --- /dev/null +++ 
b/pkg/blockstorage/blockstorage_test.go @@ -0,0 +1,255 @@ +package blockstorage_test + +import ( + "context" + "os" + "testing" + + log "github.com/sirupsen/logrus" + . "gopkg.in/check.v1" + + "github.com/kanisterio/kanister/pkg/blockstorage" + "github.com/kanisterio/kanister/pkg/blockstorage/awsebs" + "github.com/kanisterio/kanister/pkg/blockstorage/getter" + ktags "github.com/kanisterio/kanister/pkg/blockstorage/tags" +) + +const ( + clusterRegionAWS = "us-west-2" + testTagKey = "kanister.io/testid" + testTagValue = "unittest" +) + +func Test(t *testing.T) { TestingT(t) } + +type BlockStorageProviderSuite struct { + storageType blockstorage.Type + storageRegion string + provider blockstorage.Provider + volumes []*blockstorage.Volume + snapshots []*blockstorage.Snapshot +} + +var _ = Suite(&BlockStorageProviderSuite{storageType: blockstorage.TypeEBS, storageRegion: clusterRegionAWS}) + +func (s *BlockStorageProviderSuite) SetUpSuite(c *C) { + config := make(map[string]string) + var err error + if s.storageType == blockstorage.TypeEBS { + config[awsebs.ConfigRegion] = s.storageRegion + accessKey := os.Getenv(awsebs.AccessKeyID) + c.Assert(len(accessKey) > 0, Equals, true) + secretAccessKey := os.Getenv(awsebs.SecretAccessKey) + c.Assert(len(secretAccessKey) > 0, Equals, true) + config[awsebs.AccessKeyID] = accessKey + config[awsebs.SecretAccessKey] = secretAccessKey + } + s.provider, err = getter.New().Get(s.storageType, config) + c.Assert(err, IsNil) +} + +func (s *BlockStorageProviderSuite) TearDownTest(c *C) { + for _, snapshot := range s.snapshots { + c.Assert(s.provider.SnapshotDelete(context.Background(), snapshot), IsNil) + } + s.snapshots = nil + + for _, volume := range s.volumes { + c.Assert(s.provider.VolumeDelete(context.Background(), volume), IsNil) + } + s.volumes = nil +} + +func (s *BlockStorageProviderSuite) TestCreateVolume(c *C) { + vol := s.createVolume(c) + // Check setting tags on the volume + tags := map[string]string{"testtag": "testtagvalue"} + err := s.provider.SetTags(context.Background(), vol, tags) + c.Assert(err, IsNil) + volUpdated, err := s.provider.VolumeGet(context.Background(), vol.ID, vol.Az) + c.Assert(err, IsNil) + // Check previously set tags still exist + s.checkTagsExist(c, blockstorage.KeyValueToMap(volUpdated.Tags), blockstorage.KeyValueToMap(vol.Tags)) + // Check new tags were set + s.checkTagsExist(c, blockstorage.KeyValueToMap(volUpdated.Tags), tags) + // Check std tags + s.checkStdTagsExist(c, blockstorage.KeyValueToMap(volUpdated.Tags)) + + // Test VolumesList + s.testVolumesList(c) + + err = s.provider.VolumeDelete(context.Background(), volUpdated) + c.Assert(err, IsNil) + // We ensure that multiple deletions are handled. 
+ err = s.provider.VolumeDelete(context.Background(), volUpdated) + c.Assert(err, IsNil) + s.volumes = nil +} + +func (s *BlockStorageProviderSuite) TestCreateSnapshot(c *C) { + snapshot := s.createSnapshot(c) + // Check setting tags on the snapshot + tags := map[string]string{"testtag": "testtagvalue"} + err := s.provider.SetTags(context.Background(), snapshot, tags) + c.Assert(err, IsNil) + snap, err := s.provider.SnapshotGet(context.Background(), snapshot.ID) + c.Assert(err, IsNil) + // Check previously set tags still exist + s.checkTagsExist(c, blockstorage.KeyValueToMap(snap.Tags), blockstorage.KeyValueToMap(snapshot.Tags)) + // Check new tags were set + s.checkTagsExist(c, blockstorage.KeyValueToMap(snap.Tags), tags) + // Check std tags exist + s.checkStdTagsExist(c, blockstorage.KeyValueToMap(snap.Tags)) + + snapshotGet, err := s.provider.SnapshotGet(context.Background(), snapshot.ID) + c.Assert(err, IsNil) + c.Assert(snapshotGet.ID, Equals, snapshot.ID) + + // Also test creating a volume from this snapshot + tags = map[string]string{testTagKey: testTagValue, "kanister.io/testname": c.TestName()} + vol, err := s.provider.VolumeCreateFromSnapshot(context.Background(), *snapshot, tags) + c.Assert(err, IsNil) + s.volumes = append(s.volumes, vol) + for _, tag := range snapshot.Volume.Tags { + if _, found := tags[tag.Key]; !found { + tags[tag.Key] = tag.Value + } + } + // Check tags were merged + s.checkTagsExist(c, blockstorage.KeyValueToMap(vol.Tags), tags) + s.checkStdTagsExist(c, blockstorage.KeyValueToMap(vol.Tags)) + + err = s.provider.SnapshotDelete(context.Background(), snapshot) + c.Assert(err, IsNil) + // We ensure that multiple deletions are handled. + err = s.provider.SnapshotDelete(context.Background(), snapshot) + c.Assert(err, IsNil) + s.snapshots = nil +} + +func (s *BlockStorageProviderSuite) TestSnapshotCopy(c *C) { + c.Skip("Sometimes, snapcopy takes over 10 minutes. 
go test declares failure if tests are that slow.") + + srcSnapshot := s.createSnapshot(c) + dstSnapshot := &blockstorage.Snapshot{ + Type: srcSnapshot.Type, + Encrypted: false, + Size: srcSnapshot.Size, + Region: "us-east-1", + Volume: nil, + } + snap, err := s.provider.SnapshotCopy(context.TODO(), *srcSnapshot, *dstSnapshot) + c.Assert(err, IsNil) + + log.Infof("Copied snapshot %v to %v", srcSnapshot.ID, snap.ID) + + config := make(map[string]string) + if s.storageType == blockstorage.TypeEBS { + config[awsebs.ConfigRegion] = dstSnapshot.Region + accessKey := os.Getenv(awsebs.AccessKeyID) + c.Assert(len(accessKey) > 0, Equals, true) + secretAccessKey := os.Getenv(awsebs.SecretAccessKey) + c.Assert(len(secretAccessKey) > 0, Equals, true) + config[awsebs.AccessKeyID] = accessKey + config[awsebs.SecretAccessKey] = secretAccessKey + } + provider, err := getter.New().Get(s.storageType, config) + c.Assert(err, IsNil) + + snapDetails, err := provider.SnapshotGet(context.TODO(), snap.ID) + c.Assert(err, IsNil) + + c.Check(snapDetails.Region, Equals, dstSnapshot.Region) + c.Check(snapDetails.Size, Equals, srcSnapshot.Size) + + err = provider.SnapshotDelete(context.TODO(), snap) + c.Assert(err, IsNil) +} + +func (s *BlockStorageProviderSuite) testVolumesList(c *C) { + var tags map[string]string + if s.provider.Type() == blockstorage.TypeGPD { + tags = map[string]string{"name": "*"} + } else { + tags = map[string]string{"status": "available"} + } + vols, err := s.provider.VolumesList(context.Background(), tags) + c.Assert(err, IsNil) + c.Assert(vols, NotNil) + c.Assert(vols, FitsTypeOf, []*blockstorage.Volume{}) + c.Assert(vols, Not(HasLen), 0) + c.Assert(vols[0].Type, Equals, s.provider.Type()) +} + +func (s *BlockStorageProviderSuite) TestSnapshotsList(c *C) { + var tags map[string]string + testSnaphot := s.createSnapshot(c) + if s.provider.Type() != blockstorage.TypeEBS { + tags = map[string]string{"labels." 
+ ktags.SanitizeValueForGCP(testTagKey): testTagValue} + } else { + tags = map[string]string{"tag-key": testTagKey, "tag-value": testTagValue} + } + snaps, err := s.provider.SnapshotsList(context.Background(), tags) + c.Assert(err, IsNil) + c.Assert(snaps, NotNil) + c.Assert(snaps, FitsTypeOf, []*blockstorage.Snapshot{}) + c.Assert(snaps, Not(HasLen), 0) + c.Assert(snaps[0].Type, Equals, s.provider.Type()) + s.provider.SnapshotDelete(context.Background(), testSnaphot) +} + +// Helpers +func (s *BlockStorageProviderSuite) createVolume(c *C) *blockstorage.Volume { + tags := []*blockstorage.KeyValue{ + {Key: testTagKey, Value: testTagValue}, + {Key: "kanister.io/testname", Value: c.TestName()}, + } + vol := blockstorage.Volume{ + Size: 1, + Tags: tags, + } + switch s.storageType { + case blockstorage.TypeGPD: + vol.Az = "us-west1-b" + case blockstorage.TypeEBS: + vol.Az = "us-west-2b" + case blockstorage.TypeAD: + vol.Az = "centralus" + } + + ret, err := s.provider.VolumeCreate(context.Background(), vol) + c.Assert(err, IsNil) + s.volumes = append(s.volumes, ret) + c.Assert(ret.Size, Equals, int64(1)) + s.checkTagsExist(c, blockstorage.KeyValueToMap(ret.Tags), blockstorage.KeyValueToMap(tags)) + s.checkStdTagsExist(c, blockstorage.KeyValueToMap(ret.Tags)) + return ret +} + +func (s *BlockStorageProviderSuite) createSnapshot(c *C) *blockstorage.Snapshot { + vol := s.createVolume(c) + tags := map[string]string{testTagKey: testTagValue, "kanister.io/testname": c.TestName()} + ret, err := s.provider.SnapshotCreate(context.Background(), *vol, tags) + c.Assert(err, IsNil) + s.snapshots = append(s.snapshots, ret) + s.checkTagsExist(c, blockstorage.KeyValueToMap(ret.Tags), tags) + return ret +} + +func (s *BlockStorageProviderSuite) checkTagsExist(c *C, actual map[string]string, expected map[string]string) { + if s.provider.Type() != blockstorage.TypeEBS { + expected = blockstorage.SanitizeTags(expected) + } + + for k, v := range expected { + c.Check(actual[k], Equals, v) + + } +} + +func (s *BlockStorageProviderSuite) checkStdTagsExist(c *C, actual map[string]string) { + stdTags := ktags.GetStdTags() + for k := range stdTags { + c.Check(actual[k], NotNil) + } +} diff --git a/pkg/blockstorage/const.go b/pkg/blockstorage/const.go new file mode 100644 index 0000000000..66d5f35ab8 --- /dev/null +++ b/pkg/blockstorage/const.go @@ -0,0 +1,21 @@ +package blockstorage + +// Type is the type of storage supported +type Type string + +const ( + // TypeAD captures enum value "AD" + TypeAD Type = "AD" + // TypeEBS captures enum value "EBS" + TypeEBS Type = "EBS" + // TypeGPD captures enum value "GPD" + TypeGPD Type = "GPD" + // TypeCinder captures enum value "Cinder" + TypeCinder Type = "Cinder" + // TypeGeneric captures enum value "Generic" + TypeGeneric Type = "Generic" + // TypeCeph captures enum value "Ceph" + TypeCeph Type = "Ceph" + // TypeSoftlayerBlock captures enum value "SoftlayerBlock" + TypeSoftlayerBlock Type = "SoftlayerBlock" +) diff --git a/pkg/blockstorage/getter/getter.go b/pkg/blockstorage/getter/getter.go new file mode 100644 index 0000000000..214fe31fd8 --- /dev/null +++ b/pkg/blockstorage/getter/getter.go @@ -0,0 +1,42 @@ +package getter + +import ( + "github.com/pkg/errors" + + "github.com/kanisterio/kanister/pkg/blockstorage" + "github.com/kanisterio/kanister/pkg/blockstorage/awsebs" +) + +// Getter is a resolver for a storage provider. 
+type Getter interface { + Get(blockstorage.Type, map[string]string) (blockstorage.Provider, error) +} + +var _ Getter = (*getter)(nil) + +type getter struct{} + +// New retuns a new Getter +func New() Getter { + return &getter{} +} + +// Get returns a provider for the requested storage type in the specified region +func (*getter) Get(storageType blockstorage.Type, config map[string]string) (blockstorage.Provider, error) { + switch storageType { + case blockstorage.TypeEBS: + return awsebs.NewProvider(config) + default: + return nil, errors.Errorf("Unsupported storage type %v", storageType) + } +} + +// Supported returns true if the storage type is supported +func Supported(st blockstorage.Type) bool { + switch st { + case blockstorage.TypeEBS: + return true + default: + return false + } +} diff --git a/pkg/blockstorage/helpers.go b/pkg/blockstorage/helpers.go new file mode 100644 index 0000000000..6692795855 --- /dev/null +++ b/pkg/blockstorage/helpers.go @@ -0,0 +1,30 @@ +package blockstorage + +import ( + ktags "github.com/kanisterio/kanister/pkg/blockstorage/tags" +) + +// SanitizeTags are used to sanitize the tags +func SanitizeTags(tags map[string]string) map[string]string { + // From https://cloud.google.com/compute/docs/labeling-resources + // - Keys and values cannot be longer than 63 characters each. + // - Keys and values can only contain lowercase letters, numeric + // characters, underscores, and dashes. International characters + // are allowed. + // - Label keys must start with a lowercase letter and international + // characters are allowed. + fixedTags := make(map[string]string) + for k, v := range tags { + fixedTags[ktags.SanitizeValueForGCP(k)] = ktags.SanitizeValueForGCP(v) + } + return fixedTags +} + +// KeyValueToMap converts a KeyValue slice to a map +func KeyValueToMap(kv []*KeyValue) map[string]string { + stringMap := make(map[string]string) + for _, t := range kv { + stringMap[t.Key] = t.Value + } + return stringMap +} diff --git a/pkg/blockstorage/models.go b/pkg/blockstorage/models.go new file mode 100644 index 0000000000..a1dea7ff14 --- /dev/null +++ b/pkg/blockstorage/models.go @@ -0,0 +1,86 @@ +package blockstorage + +import ( + strfmt "github.com/go-openapi/strfmt" +) + +// Volume A storage provider volume +type Volume struct { + + // Availability zone + Az string + + // Time stamp when volume creation was initiated + CreationTime TimeStamp + + // Volume is encrypted + Encrypted bool + + // A unique identifier generated by the storage provider + ID string + + // Volume IOPS, if specified for this volume + Iops int64 + + // The size of the volume, in GiB + Size int64 + + // tags + Tags VolumeTags + + // Storage type for this volume + Type Type + + // Volume type + VolumeType string + + // Attributes specific to the provider + Attributes map[string]string +} + +// Snapshot of Volume +type Snapshot struct { + + // Time stamp when snapshot creation was initiated + CreationTime TimeStamp + + // Snapshot is encrypted + Encrypted bool + + // A unique identifier generated by the storage provider + ID string + + // Snapshot availability region + Region string + + // The size of the snapshot, in GiB + Size int64 + + // tags + Tags SnapshotTags + + // Storage type of the source volume for this snapshot + Type Type + + // volume + Volume *Volume +} + +// TimeStamp Time stamp for an event related to an object, for example when the object was created. 
+type TimeStamp strfmt.DateTime + +// VolumeTags volume tags +type VolumeTags []*KeyValue + +// SnapshotTags snapshot tags +type SnapshotTags []*KeyValue + +// KeyValue String key-value pairs +type KeyValue struct { + + // Key or index name + Key string + + // Value string + Value string +} diff --git a/pkg/blockstorage/tags/tags.go b/pkg/blockstorage/tags/tags.go new file mode 100644 index 0000000000..74e96f98f0 --- /dev/null +++ b/pkg/blockstorage/tags/tags.go @@ -0,0 +1,76 @@ +package tags + +import ( + "os" + "regexp" + "strings" + + log "github.com/sirupsen/logrus" +) + +const ( + // ClusterTagKey is used to tag resources with the cluster name + ClusterTagKey = "kanister.io/clustername" + // VersionTagKey is used to tag resources with the K10 version + VersionTagKey = "kanister.io/version" + // AppNameTag is used to tag volumes with the app they belong to + AppNameTag = "kanister.io/appname" +) + +// GetTags returns the tags to set on a resource +func GetTags(inputTags map[string]string) map[string]string { + tags := GetStdTags() + + // inputTags could've be derived from an existing object so only add tags that are + // missing (ignore ones that already exist) + return AddMissingTags(tags, inputTags) +} + +// GetStdTags returns a set of standard tags to use for tagging resources +func GetStdTags() map[string]string { + version := os.Getenv("VERSION") + clustername := os.Getenv("CLUSTER_NAME") + + stdTags := map[string]string{ + ClusterTagKey: clustername, + VersionTagKey: version, + } + return stdTags +} + +// AddMissingTags returns a new map which contains 'existing' + any tags +// in 'tagsToAdd' that did not exist +func AddMissingTags(existingTags map[string]string, tagsToAdd map[string]string) map[string]string { + ret := make(map[string]string, len(existingTags)) + for k, v := range existingTags { + ret[k] = v + } + // Add missing tags + for k, v := range tagsToAdd { + if val, ok := ret[k]; ok { + log.Infof("Ignoring duplicate tag: %s:%s. Retained value: %s", k, v, val) + } else { + ret[k] = v + } + } + return ret +} + +// SanitizeValueForGCP shrink value if needed and change prohibited chars +func SanitizeValueForGCP(value string) string { + // From https://cloud.google.com/compute/docs/labeling-resources + // - Keys and values cannot be longer than 63 characters each. + // - Keys and values can only contain lowercase letters, numeric + // characters, underscores, and dashes. International characters + // are allowed. + // - Label keys must start with a lowercase letter and international + // characters are allowed. 
+ re := regexp.MustCompile("[^a-z0-9_-]") + sanitizedVal := value + if len(sanitizedVal) > 63 { + sanitizedVal = sanitizedVal[0:63] + } + sanitizedVal = strings.ToLower(sanitizedVal) + sanitizedVal = re.ReplaceAllString(sanitizedVal, "_") + return sanitizedVal +} From 711af013466e3b4aa8fc92fb34f7f18d5d7b46c9 Mon Sep 17 00:00:00 2001 From: Supriya Kharade Date: Fri, 16 Nov 2018 09:39:14 -0800 Subject: [PATCH 2/9] Add helper functions for Volume snapshot (#4406) * Add helper functions for Volume snapshot * Address review suggestions * address review comments --- pkg/kube/volume.go | 175 ++++++++++++++++++++++++++++++++++++++++ pkg/kube/volume_test.go | 42 ++++++++++ 2 files changed, 217 insertions(+) create mode 100644 pkg/kube/volume.go create mode 100644 pkg/kube/volume_test.go diff --git a/pkg/kube/volume.go b/pkg/kube/volume.go new file mode 100644 index 0000000000..50ab5fa6b3 --- /dev/null +++ b/pkg/kube/volume.go @@ -0,0 +1,175 @@ +package kube + +import ( + "context" + "fmt" + "path/filepath" + "regexp" + "strings" + "time" + + "github.com/pkg/errors" + "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + "github.com/kanisterio/kanister/pkg/blockstorage" + "github.com/kanisterio/kanister/pkg/poll" +) + +const ( + pvMatchLabelName = "kanisterpvmatchid" + pvcGenerateName = "kanister-pvc-" + // PVZoneLabelName is a known k8s label. used to specify volume zone + PVZoneLabelName = "failure-domain.beta.kubernetes.io/zone" + // PVRegionLabelName is a known k8s label + PVRegionLabelName = "failure-domain.beta.kubernetes.io/region" + // NoPVCNameSpecified is used by the caller to indicate that the PVC name + // should be auto-generated + NoPVCNameSpecified = "" +) + +// CreatePVC creates a PersistentVolumeClaim and returns its name +// An empty 'targetVolID' indicates the caller would like the PV to be dynamically provisioned +// An empty 'name' indicates the caller would like the name to be auto-generated +// An error indicating that the PVC already exists is ignored (for idempotency) +func CreatePVC(ctx context.Context, kubeCli kubernetes.Interface, ns string, name string, sizeGB int64, targetVolID string, annotations map[string]string) (string, error) { + sizeFmt := fmt.Sprintf("%dGi", sizeGB) + size, err := resource.ParseQuantity(sizeFmt) + emptyStorageClass := "" + if err != nil { + return "", errors.Wrapf(err, "Unable to parse sizeFmt %s", sizeFmt) + } + pvc := v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: annotations, + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): size, + }, + }, + }, + } + if name != "" { + pvc.ObjectMeta.Name = name + } else { + pvc.ObjectMeta.GenerateName = pvcGenerateName + } + + // If targetVolID is set, static provisioning is desired + if targetVolID != "" { + pvc.Spec.Selector = &metav1.LabelSelector{ + MatchLabels: map[string]string{pvMatchLabelName: filepath.Base(targetVolID)}, + } + // Disable dynamic provisioning by setting an empty storage + pvc.Spec.StorageClassName = &emptyStorageClass + } + createdPVC, err := kubeCli.CoreV1().PersistentVolumeClaims(ns).Create(&pvc) + if err != nil { + if name != "" && apierrors.IsAlreadyExists(err) { + return name, nil + } + return "", errors.Wrapf(err, "Unable to create PVC %v", pvc) + } + 
return createdPVC.Name, nil +} + +// CreatePV creates a PersistentVolume and returns its name +// For retry idempotency, checks whether PV associated with volume already exists +func CreatePV(ctx context.Context, kubeCli kubernetes.Interface, vol *blockstorage.Volume, volType blockstorage.Type, annotations map[string]string) (string, error) { + sizeFmt := fmt.Sprintf("%dGi", vol.Size) + size, err := resource.ParseQuantity(sizeFmt) + if err != nil { + return "", errors.Wrapf(err, "Unable to parse sizeFmt %s", sizeFmt) + } + matchLabels := map[string]string{pvMatchLabelName: filepath.Base(vol.ID)} + + // Since behavior and error returned from repeated create might vary, check first + sel := labelSelector(matchLabels) + options := metav1.ListOptions{LabelSelector: sel} + pvl, err := kubeCli.CoreV1().PersistentVolumes().List(options) + if err == nil && len(pvl.Items) == 1 { + return pvl.Items[0].Name, nil + } + + pv := v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "kanister-pv-", + Labels: matchLabels, + Annotations: annotations, + }, + Spec: v1.PersistentVolumeSpec{ + Capacity: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): size, + }, + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete, + }, + } + switch volType { + case blockstorage.TypeEBS: + pv.Spec.PersistentVolumeSource.AWSElasticBlockStore = &v1.AWSElasticBlockStoreVolumeSource{ + VolumeID: vol.ID, + } + pv.ObjectMeta.Labels[PVZoneLabelName] = vol.Az + pv.ObjectMeta.Labels[PVRegionLabelName] = zoneToRegion(vol.Az) + default: + return "", errors.Errorf("Volume type %v(%T) not supported ", volType, volType) + } + + createdPV, err := kubeCli.CoreV1().PersistentVolumes().Create(&pv) + if err != nil { + return "", errors.Wrapf(err, "Unable to create PV for volume %v", pv) + } + return createdPV.Name, nil +} + +// DeletePVC deletes the given PVC immediately and waits with timeout until it is returned as deleted +func DeletePVC(cli kubernetes.Interface, namespace, pvcName string) error { + var now int64 + if err := cli.Core().PersistentVolumeClaims(namespace).Delete(pvcName, &metav1.DeleteOptions{GracePeriodSeconds: &now}); err != nil { + // If the PVC does not exist, that's an acceptable error + if !apierrors.IsNotFound(err) { + return err + } + } + + // Check the pvc is not returned. If the expected condition is not met in time, PollImmediate will + // return ErrWaitTimeout + ctx, c := context.WithTimeout(context.TODO(), time.Minute) + defer c() + return poll.Wait(ctx, func(context.Context) (bool, error) { + _, err := cli.Core().PersistentVolumeClaims(namespace).Get(pvcName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return true, nil + } + return false, err + }) +} + +var labelBlackList = map[string]struct{}{ + "chart": struct{}{}, + "heritage": struct{}{}, +} + +func labelSelector(labels map[string]string) string { + ls := make([]string, 0, len(labels)) + for k, v := range labels { + if _, ok := labelBlackList[k]; ok { + continue + } + ls = append(ls, fmt.Sprintf("%s=%s", k, v)) + } + return strings.Join(ls, ",") +} + +// zoneToRegion removes -latter or just last latter from provided zone. 
+func zoneToRegion(zone string) string { + r, _ := regexp.Compile("-?[a-z]$") + return r.ReplaceAllString(zone, "") +} diff --git a/pkg/kube/volume_test.go b/pkg/kube/volume_test.go new file mode 100644 index 0000000000..4aa913629f --- /dev/null +++ b/pkg/kube/volume_test.go @@ -0,0 +1,42 @@ +package kube + +import ( + "context" + "path/filepath" + "reflect" + + . "gopkg.in/check.v1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +type TestVolSuite struct{} + +var _ = Suite(&TestVolSuite{}) + +func (s *TestVolSuite) TestCreatePVC(c *C) { + // Create PVC + ctx := context.Background() + pvcSize := int64(1) + ns := "kanister-pvc-test" + targetVolID := "testVolID" + annotations := map[string]string{"a1": "foo"} + cli := fake.NewSimpleClientset() + pvcName, err := CreatePVC(ctx, cli, ns, NoPVCNameSpecified, pvcSize, targetVolID, annotations) + c.Assert(err, IsNil) + pvc, err := cli.Core().PersistentVolumeClaims(ns).Get(pvcName, metav1.GetOptions{}) + c.Assert(err, IsNil) + + c.Assert(len(pvc.Spec.AccessModes) >= 1, Equals, true) + accessMode := pvc.Spec.AccessModes[0] + c.Assert(accessMode, Equals, v1.ReadWriteOnce) + capacity, ok := pvc.Spec.Resources.Requests[v1.ResourceStorage] + c.Assert(ok, Equals, true) + c.Assert(capacity.Value() >= int64(pvcSize*1024*1024*1024), Equals, true) + eq := reflect.DeepEqual(annotations, pvc.ObjectMeta.Annotations) + c.Assert(eq, Equals, true) + c.Assert(len(pvc.Spec.Selector.MatchLabels) >= 1, Equals, true) + label := pvc.Spec.Selector.MatchLabels[pvMatchLabelName] + c.Assert(label, Equals, filepath.Base(targetVolID)) +} From 8db13c454f0239cbf93108828a3c74e3d9647062 Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Fri, 16 Nov 2018 11:26:13 -0800 Subject: [PATCH 3/9] Switch Kanister KubeTask from job to pod (#4387) 1. Switch KubeTask to spin up a pod instead of a Kubernetes Job 2. Parse the pods logs and capture output 3. Modify unit tests --- docs/functions.rst | 2 +- pkg/function/copy_volume_data.go | 5 +++ pkg/function/delete_data.go | 2 +- pkg/function/kube_task.go | 42 ++++++++++++------ pkg/function/kube_task_test.go | 22 ++++++---- pkg/kube/pod.go | 74 ++++++++++++++++++++++++-------- pkg/kube/pod_test.go | 21 ++++++++- 7 files changed, 126 insertions(+), 42 deletions(-) diff --git a/docs/functions.rst b/docs/functions.rst index 3bb7b29dd2..0e37bb1d95 100644 --- a/docs/functions.rst +++ b/docs/functions.rst @@ -110,7 +110,7 @@ Example: KubeTask -------- -KubeTask spins up a new container and executes a command via a Kubernetes job. +KubeTask spins up a new container and executes a command via a Pod. This allows you to run a new Pod from a Blueprint. .. 
csv-table:: diff --git a/pkg/function/copy_volume_data.go b/pkg/function/copy_volume_data.go index 4c8ebb89b5..3170eb2398 100644 --- a/pkg/function/copy_volume_data.go +++ b/pkg/function/copy_volume_data.go @@ -60,6 +60,11 @@ func copyVolumeData(ctx context.Context, cli kubernetes.Interface, tp param.Temp } defer kube.DeletePod(context.Background(), cli, pod) + // Wait for pod to reach running state + if err := kube.WaitForPodReady(ctx, cli, pod.Namespace, pod.Name); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to be ready", pod.Name) + } + // Get restic repository if err = restic.GetOrCreateRepository(cli, namespace, pod.Name, pod.Spec.Containers[0].Name, targetPath, encryptionKey, tp.Profile); err != nil { return nil, err diff --git a/pkg/function/delete_data.go b/pkg/function/delete_data.go index 1f48a75705..44a886d1fe 100644 --- a/pkg/function/delete_data.go +++ b/pkg/function/delete_data.go @@ -63,7 +63,7 @@ func (*deleteDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args m // Generate delete command cmd := generateDeleteCommand(artifact, tp.Profile) // Use KubeTask to delete the artifact - return nil, kubeTask(ctx, namespace, "kanisterio/kanister-tools:0.13.0", cmd) + return kubeTask(ctx, namespace, "kanisterio/kanister-tools:0.13.0", cmd) } func (*deleteDataFunc) RequiredArgs() []string { diff --git a/pkg/function/kube_task.go b/pkg/function/kube_task.go index 0500d74963..4e6971c670 100644 --- a/pkg/function/kube_task.go +++ b/pkg/function/kube_task.go @@ -6,6 +6,7 @@ import ( "strconv" kanister "github.com/kanisterio/kanister/pkg" + "github.com/kanisterio/kanister/pkg/format" "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/param" "github.com/pkg/errors" @@ -35,36 +36,49 @@ func generateJobName(jobPrefix string) string { return jobPrefix + jobNameSuffix } -func kubeTask(ctx context.Context, namespace, image string, command []string) error { +func kubeTask(ctx context.Context, namespace, image string, command []string) (map[string]interface{}, error) { var serviceAccount string var err error clientset, err := kube.NewClient() if err != nil { - return errors.Wrapf(err, "Failed to create Kubernetes client") + return nil, errors.Wrapf(err, "Failed to create Kubernetes client") } if namespace == "" { namespace, err = kube.GetControllerNamespace() if err != nil { - return errors.Wrapf(err, "Failed to get controller namespace") + return nil, errors.Wrapf(err, "Failed to get controller namespace") } serviceAccount, err = kube.GetControllerServiceAccount(clientset) if err != nil { - return errors.Wrap(err, "Failed to get Controller Service Account") + return nil, errors.Wrap(err, "Failed to get Controller Service Account") } } - jobName := generateJobName(jobPrefix) - job, err := kube.NewJob(clientset, jobName, namespace, serviceAccount, image, nil, command...) 
+ // Create a pod to run the command + pod, err := kube.CreatePod(ctx, clientset, &kube.PodOptions{ + Namespace: namespace, + GenerateName: jobPrefix, + Image: image, + Command: command, + ServiceAccountName: serviceAccount, + }) if err != nil { - return errors.Wrap(err, "Failed to create job") + return nil, errors.Wrapf(err, "Failed to create pod for KubeTask") } - if err := job.Create(); err != nil { - return errors.Wrapf(err, "Failed to create job %s in Kubernetes", jobName) + defer kube.DeletePod(context.Background(), clientset, pod) + + // Wait for pod completion + if err := kube.WaitForPodCompletion(ctx, clientset, pod.Namespace, pod.Name); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name) } - defer job.Delete() - if err := job.WaitForCompletion(ctx); err != nil { - return errors.Wrapf(err, "Failed while waiting for job %s to complete", jobName) + // Fetch logs from the pod + logs, err := kube.GetPodLogs(ctx, clientset, pod.Namespace, pod.Name) + if err != nil { + return nil, errors.Wrapf(err, "Failed to fetch logs from the pod") } - return nil + format.Log(pod.Name, pod.Spec.Containers[0].Name, logs) + + out, err := parseLogAndCreateOutput(logs) + return out, errors.Wrap(err, "Failed to generate output") } func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { @@ -80,7 +94,7 @@ func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args if err = OptArg(args, KubeTaskNamespaceArg, &namespace, ""); err != nil { return nil, err } - return nil, kubeTask(ctx, namespace, image, command) + return kubeTask(ctx, namespace, image, command) } func (*kubeTaskFunc) RequiredArgs() []string { diff --git a/pkg/function/kube_task_test.go b/pkg/function/kube_task_test.go index 1f5d93face..2da131afeb 100644 --- a/pkg/function/kube_task_test.go +++ b/pkg/function/kube_task_test.go @@ -3,6 +3,7 @@ package function import ( "context" "os" + "time" . 
"gopkg.in/check.v1" "k8s.io/api/core/v1" @@ -53,19 +54,20 @@ func newTaskBlueprint(namespace string) *crv1alpha1.Blueprint { Kind: "StatefulSet", Phases: []crv1alpha1.BlueprintPhase{ { - Name: "test", + Name: "testOutput", Func: "KubeTask", Args: map[string]interface{}{ KubeTaskNamespaceArg: namespace, - KubeTaskImageArg: "busybox", + KubeTaskImageArg: "kanisterio/kanister-tools:0.13.0", KubeTaskCommandArg: []string{ - "sleep", - "2", + "sh", + "-c", + "kando output version 0.13.0", }, }, }, { - Name: "test2", + Name: "testSleep", Func: "KubeTask", Args: map[string]interface{}{ KubeTaskNamespaceArg: namespace, @@ -83,7 +85,8 @@ func newTaskBlueprint(namespace string) *crv1alpha1.Blueprint { } func (s *KubeTaskSuite) TestKubeTask(c *C) { - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() tp := param.TemplateParams{ StatefulSet: ¶m.StatefulSetParams{ Namespace: s.namespace, @@ -95,7 +98,10 @@ func (s *KubeTaskSuite) TestKubeTask(c *C) { phases, err := kanister.GetPhases(*bp, action, tp) c.Assert(err, IsNil) for _, p := range phases { - _, err = p.Exec(ctx, *bp, action, tp) - c.Assert(err, IsNil) + out, err := p.Exec(ctx, *bp, action, tp) + c.Assert(err, IsNil, Commentf("Phase %s failed", p.Name())) + if out != nil { + c.Assert(out["version"], NotNil) + } } } diff --git a/pkg/kube/pod.go b/pkg/kube/pod.go index f19ebe9444..b58e4ae5fd 100644 --- a/pkg/kube/pod.go +++ b/pkg/kube/pod.go @@ -2,6 +2,7 @@ package kube import ( "context" + "io/ioutil" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -14,11 +15,12 @@ import ( // PodOptions specifies options for `CreatePod` type PodOptions struct { - Namespace string - GenerateName string - Image string - Command []string - Volumes map[string]string + Namespace string + GenerateName string + Image string + Command []string + Volumes map[string]string + ServiceAccountName string } // CreatePod creates a pod with a single container based on the specified image @@ -39,24 +41,18 @@ func CreatePod(ctx context.Context, cli kubernetes.Interface, opts *PodOptions) VolumeMounts: volumeMounts, }, }, - Volumes: podVolumes, + // RestartPolicy dictates when the containers of the pod should be restarted. + // The possible values include Always, OnFailure and Never with Always being the default. + // OnFailure policy will result in failed containers being restarted with an exponential back-off delay. + RestartPolicy: v1.RestartPolicyOnFailure, + Volumes: podVolumes, + ServiceAccountName: opts.ServiceAccountName, }, } pod, err := cli.Core().Pods(opts.Namespace).Create(pod) if err != nil { return nil, errors.Wrapf(err, "Failed to create pod. Namespace: %s, NameFmt: %s", opts.Namespace, opts.GenerateName) } - err = poll.Wait(ctx, func(ctx context.Context) (bool, error) { - p, err := cli.Core().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{}) - if err != nil { - return true, err - } - return (p.Status.Phase == v1.PodRunning), nil - }) - if err != nil { - defer DeletePod(context.Background(), cli, pod) - return nil, errors.Wrapf(err, "Pod did not transition to running state. 
Namespace:%s, Name:%s", pod.Namespace, pod.Name) - } return pod, nil } @@ -67,3 +63,47 @@ func DeletePod(ctx context.Context, cli kubernetes.Interface, pod *v1.Pod) error } return nil } + +// GetPodLogs fetches the logs from the given pod +func GetPodLogs(ctx context.Context, cli kubernetes.Interface, namespace, name string) (string, error) { + reader, err := cli.Core().Pods(namespace).GetLogs(name, &v1.PodLogOptions{}).Stream() + if err != nil { + return "", err + } + defer reader.Close() + bytes, err := ioutil.ReadAll(reader) + if err != nil { + return "", err + } + return string(bytes), nil +} + +// WaitForPodReady waits for a pod to reach Running state +func WaitForPodReady(ctx context.Context, cli kubernetes.Interface, namespace, name string) error { + err := poll.Wait(ctx, func(ctx context.Context) (bool, error) { + p, err := cli.Core().Pods(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return true, err + } + return (p.Status.Phase == v1.PodRunning), nil + }) + if err == nil { + return nil + } + return errors.Wrapf(err, "Pod did not transition into running state. Namespace:%s, Name:%s", namespace, name) +} + +// WaitForPodCompletion waits for a pod to reach a terminal state +func WaitForPodCompletion(ctx context.Context, cli kubernetes.Interface, namespace, name string) error { + err := poll.Wait(ctx, func(ctx context.Context) (bool, error) { + p, err := cli.Core().Pods(namespace).Get(name, metav1.GetOptions{}) + if err != nil { + return true, err + } + return (p.Status.Phase == v1.PodSucceeded) || (p.Status.Phase == v1.PodFailed), nil + }) + if err == nil { + return nil + } + return errors.Wrapf(err, "Pod did not transition into a terminal state. Namespace:%s, Name:%s", namespace, name) +} diff --git a/pkg/kube/pod_test.go b/pkg/kube/pod_test.go index ec5a905db2..d8f3197239 100644 --- a/pkg/kube/pod_test.go +++ b/pkg/kube/pod_test.go @@ -5,6 +5,7 @@ package kube import ( "context" "fmt" + "strings" "time" . 
"gopkg.in/check.v1" @@ -47,13 +48,14 @@ func (s *PodSuite) TearDownSuite(c *C) { func (s *PodSuite) TestPod(c *C) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() - pod, err := CreatePod(ctx, s.cli, &PodOptions{ + pod, err := CreatePod(context.Background(), s.cli, &PodOptions{ Namespace: s.namespace, GenerateName: "test-", Image: "kanisterio/kanister-tools:0.13.0", Command: []string{"sh", "-c", "tail -f /dev/null"}, }) c.Assert(err, IsNil) + c.Assert(WaitForPodReady(ctx, s.cli, s.namespace, pod.Name), IsNil) c.Assert(DeletePod(context.Background(), s.cli, pod), IsNil) } @@ -85,3 +87,20 @@ func (s *PodSuite) TestPodWithVolumes(c *C) { c.Assert(pod.Spec.Volumes[0].VolumeSource.PersistentVolumeClaim.ClaimName, Equals, "pvc-test") c.Assert(pod.Spec.Containers[0].VolumeMounts[0].MountPath, Equals, "/mnt/data1") } + +func (s *PodSuite) TestGetPodLogs(c *C) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + pod, err := CreatePod(context.Background(), s.cli, &PodOptions{ + Namespace: s.namespace, + GenerateName: "test-", + Image: "kanisterio/kanister-tools:0.13.0", + Command: []string{"sh", "-c", "echo hello"}, + }) + c.Assert(err, IsNil) + c.Assert(WaitForPodCompletion(ctx, s.cli, s.namespace, pod.Name), IsNil) + logs, err := GetPodLogs(ctx, s.cli, s.namespace, pod.Name) + c.Assert(err, IsNil) + c.Assert(strings.Contains(logs, "hello"), Equals, true) + c.Assert(DeletePod(context.Background(), s.cli, pod), IsNil) +} From eb4a83a3614f7f57aed1f9c3e866e8b4af68d068 Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Wed, 21 Nov 2018 13:46:45 -0800 Subject: [PATCH 4/9] Fix kubeTask test image issue (#4441) --- pkg/kube/pod.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kube/pod.go b/pkg/kube/pod.go index b58e4ae5fd..09aa583c19 100644 --- a/pkg/kube/pod.go +++ b/pkg/kube/pod.go @@ -37,7 +37,7 @@ func CreatePod(ctx context.Context, cli kubernetes.Interface, opts *PodOptions) Name: "container", Image: opts.Image, Command: opts.Command, - ImagePullPolicy: v1.PullPolicy(v1.PullIfNotPresent), + ImagePullPolicy: v1.PullPolicy(v1.PullAlways), VolumeMounts: volumeMounts, }, }, From 830c8d48b550b0869727929baf37c98a96d0f51a Mon Sep 17 00:00:00 2001 From: Supriya Kharade Date: Wed, 21 Nov 2018 15:22:25 -0800 Subject: [PATCH 5/9] Snapshot Volume for AWS EBS storage (#4361) * Snapshot Volume for AWS EBS storage * address review suggestions * address review suggestions * Address review comments and add unit tests * Add function for profile validation * fix compilation error * add unit tests --- glide.lock | 12 +- glide.yaml | 2 + pkg/function/create_volume_from_snapshot.go | 85 ++++--- .../create_volume_from_snapshot_test.go | 108 ++++++++ pkg/function/create_volume_snapshot.go | 151 ++++++++--- pkg/function/create_volume_snapshot_test.go | 165 ++++++++++++ pkg/function/delete_volume_snapshot.go | 50 +++- pkg/function/delete_volume_snapshot_test.go | 81 ++++++ ...ot_test.go => e2e_volume_snapshot_test.go} | 120 ++++++--- .../mockblockstorage/mockblockstorage.go | 234 ++++++++++++++++++ .../mockblockstorage/mockblockstorage_test.go | 20 ++ 11 files changed, 924 insertions(+), 104 deletions(-) create mode 100644 pkg/function/create_volume_from_snapshot_test.go create mode 100644 pkg/function/create_volume_snapshot_test.go create mode 100644 pkg/function/delete_volume_snapshot_test.go rename pkg/function/{volume_snapshot_test.go => e2e_volume_snapshot_test.go} (74%) create mode 100644 
pkg/testutil/mockblockstorage/mockblockstorage.go create mode 100644 pkg/testutil/mockblockstorage/mockblockstorage_test.go diff --git a/glide.lock b/glide.lock index cb90eb0735..d4c8903947 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 09eb11ac814f3773149d7aff5c898c2960f3c6936f3534791a23842988a8022c -updated: 2018-10-31T16:24:08.342234-07:00 +hash: f1b78ff0d608fbb38269fc7b0ce7ace939b9afc2ed7c46e6d21786e6e42b3918 +updated: 2018-11-09T10:40:36.46445-08:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -50,7 +50,7 @@ imports: - service/s3/s3manager - service/sts - name: github.com/Azure/azure-sdk-for-go - version: 100a309838ef6b0878b014e5ae541c83a7cbdfaa + version: 9419692eb7ad7f923cca690cc5a6b2c6d22405e1 subpackages: - storage - version @@ -145,7 +145,7 @@ imports: - name: github.com/marstr/guid version: 8bdf7d1a087ccc975cf37dd6507da50698fd19ca - name: github.com/Masterminds/semver - version: c84ddcca87bf5a941b138dde832a7e20b0159ad8 + version: 59c29afe1a994eacb71c833025ca7acf874bb1da - name: github.com/Masterminds/sprig version: 6b2a58267f6a8b1dc8e2eb5519b984008fa85e8c - name: github.com/mitchellh/mapstructure @@ -163,7 +163,7 @@ imports: repo: https://github.com/kastenhq/operator-kit.git vcs: git - name: github.com/satori/go.uuid - version: b2ce2384e17bbe0c6d34077efa39dbab3e09123b + version: f58768cc1a7a7e77a3bd49e98cdd21419399b6a3 - name: github.com/sirupsen/logrus version: 3e01752db0189b9157070a0e1668a620f9a85da2 - name: github.com/spf13/cobra @@ -195,7 +195,7 @@ imports: - jws - jwt - name: golang.org/x/sys - version: 9b800f95dbbc54abff0acf7ee32d88ba4e328c89 + version: 95c6576299259db960f6c5b9b69ea52422860fce subpackages: - unix - name: golang.org/x/text diff --git a/glide.yaml b/glide.yaml index ed1240b938..0a0ee3623c 100644 --- a/glide.yaml +++ b/glide.yaml @@ -23,6 +23,8 @@ import: vcs: git version: rm-k8s-dep repo: https://github.com/kastenhq/operator-kit.git +- package: github.com/satori/go.uuid + version: v1.2.0 - package: github.com/sirupsen/logrus version: v1.0.6 - package: github.com/spf13/cobra diff --git a/pkg/function/create_volume_from_snapshot.go b/pkg/function/create_volume_from_snapshot.go index 8ebf9c985c..60e6f46470 100644 --- a/pkg/function/create_volume_from_snapshot.go +++ b/pkg/function/create_volume_from_snapshot.go @@ -6,9 +6,13 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" + "github.com/kanisterio/kanister/pkg/blockstorage" + "github.com/kanisterio/kanister/pkg/blockstorage/awsebs" + "github.com/kanisterio/kanister/pkg/blockstorage/getter" "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/param" ) @@ -32,43 +36,63 @@ func (*createVolumeFromSnapshotFunc) Name() string { return "CreateVolumeFromSnapshot" } -func createVolumeFromSnapshot(ctx context.Context, cli kubernetes.Interface, namespace, snapshotinfo string, profile *param.Profile) error { +func createVolumeFromSnapshot(ctx context.Context, cli kubernetes.Interface, namespace, snapshotinfo string, profile *param.Profile, getter getter.Getter) (map[string]blockstorage.Provider, error) { PVCData := []VolumeSnapshotInfo{} err := json.Unmarshal([]byte(snapshotinfo), &PVCData) if err != nil { - return errors.Wrapf(err, "Could not decode JSON data") + return nil, errors.Wrapf(err, "Could not decode JSON data") } + // providerList required for unit testing + providerList := 
make(map[string]blockstorage.Provider) for _, pvcInfo := range PVCData { - var storageType string - switch pvcInfo.StorageType { - // TODO: use constants once blockstorage is moved to kanister repo - case "EBS": - storageType = "EBS" - case "GPD": - storageType = "GPD" - case "AD": - storageType = "AD" - case "Cinder": - storageType = "Cinder" - case "Ceph": - storageType = "Ceph" - default: - return errors.Errorf("Storage type %s not supported!", pvcInfo.StorageType) + config := make(map[string]string) + switch pvcInfo.Type { + case blockstorage.TypeEBS: + if err = ValidateProfile(profile); err != nil { + return nil, errors.Wrap(err, "Profile validation failed") + } + config[awsebs.ConfigRegion] = pvcInfo.Region + config[awsebs.AccessKeyID] = profile.Credential.KeyPair.ID + config[awsebs.SecretAccessKey] = profile.Credential.KeyPair.Secret } - log.Infof("snapshotId: %s, StorageType: %s, region: %s", pvcInfo.SnapshotID, storageType, pvcInfo.Region) - if err := createPVCFromSnapshot(); err != nil { - return errors.Wrapf(err, "Could not create PVC") + provider, err := getter.Get(pvcInfo.Type, config) + if err != nil { + return nil, errors.Wrapf(err, "Could not get storage provider %v", pvcInfo.Type) + } + _, err = cli.Core().PersistentVolumeClaims(namespace).Get(pvcInfo.PVCName, metav1.GetOptions{}) + if err == nil { + if err = kube.DeletePVC(cli, namespace, pvcInfo.PVCName); err != nil { + return nil, err + } + } + snapshot, err := provider.SnapshotGet(ctx, pvcInfo.SnapshotID) + if err != nil { + return nil, errors.Wrapf(err, "Failed to get Snapshot from Provider") + } + tags := map[string]string{ + "pvcname": pvcInfo.PVCName, + } + snapshot.Volume.VolumeType = pvcInfo.VolumeType + snapshot.Volume.Az = pvcInfo.Az + snapshot.Volume.Tags = pvcInfo.Tags + vol, err := provider.VolumeCreateFromSnapshot(ctx, *snapshot, tags) + if err != nil { + return nil, errors.Wrapf(err, "Failed to create volume from snapshot, snapID: %s", snapshot.ID) } - } - return nil -} - -func createPVCFromSnapshot() error { - return errors.Wrapf(createPV(), "Could not create PV") -} -func createPV() error { - return nil + annotations := map[string]string{} + pvc, err := kube.CreatePVC(ctx, cli, namespace, pvcInfo.PVCName, vol.Size, vol.ID, annotations) + if err != nil { + return nil, errors.Wrapf(err, "Unable to create PVC for volume %v", *vol) + } + pv, err := kube.CreatePV(ctx, cli, vol, vol.Type, annotations) + if err != nil { + return nil, errors.Wrapf(err, "Unable to create PV for volume %v", *vol) + } + log.Infof("Restore/Create volume from snapshot completed for pvc: %s, volume: %s", pvc, pv) + providerList[pvcInfo.PVCName] = provider + } + return providerList, nil } func (kef *createVolumeFromSnapshotFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { @@ -83,7 +107,8 @@ func (kef *createVolumeFromSnapshotFunc) Exec(ctx context.Context, tp param.Temp if err = Arg(args, CreateVolumeFromSnapshotManifestArg, &snapshotinfo); err != nil { return nil, err } - return nil, createVolumeFromSnapshot(ctx, cli, namespace, snapshotinfo, tp.Profile) + _, err = createVolumeFromSnapshot(ctx, cli, namespace, snapshotinfo, tp.Profile, getter.New()) + return nil, err } func (*createVolumeFromSnapshotFunc) RequiredArgs() []string { diff --git a/pkg/function/create_volume_from_snapshot_test.go b/pkg/function/create_volume_from_snapshot_test.go new file mode 100644 index 0000000000..5001f982bc --- /dev/null +++ b/pkg/function/create_volume_from_snapshot_test.go @@ -0,0 
+1,108 @@ +package function + +import ( + "context" + "encoding/json" + "fmt" + + . "gopkg.in/check.v1" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/testing" + + crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" + "github.com/kanisterio/kanister/pkg/blockstorage" + "github.com/kanisterio/kanister/pkg/param" + "github.com/kanisterio/kanister/pkg/testutil/mockblockstorage" +) + +type CreateVolumeFromSnapshotTestSuite struct{} + +var _ = Suite(&CreateVolumeFromSnapshotTestSuite{}) + +func (s *CreateVolumeFromSnapshotTestSuite) TestCreateVolumeFromSnapshot(c *C) { + ctx := context.Background() + ns := "ns" + mockGetter := mockblockstorage.NewGetter() + profile := ¶m.Profile{ + Location: crv1alpha1.Location{ + Type: crv1alpha1.LocationTypeS3Compliant, + S3Compliant: &crv1alpha1.S3CompliantLocation{ + Region: "us-west-2"}, + }, + Credential: param.Credential{ + Type: param.CredentialTypeKeyPair, + KeyPair: ¶m.KeyPair{ + ID: "foo", + Secret: "bar", + }, + }, + } + cli := fake.NewSimpleClientset() + // fake doesn't handle generated names for PVs, so ... + var i int + pvl := &v1.PersistentVolumeList{} + // kube.CreatePV() calls create() and list() which is to be handled for fake client + cli.PrependReactor("create", "persistentvolumes", + func(action testing.Action) (handled bool, ret runtime.Object, err error) { + ca := action.(testing.CreateAction) + pv := ca.GetObject().(*v1.PersistentVolume) + pvl.Items = append(pvl.Items, *pv) + if pv.ObjectMeta.Name == "" && pv.ObjectMeta.GenerateName != "" { + pv.ObjectMeta.Name = fmt.Sprintf("%s%d", pv.ObjectMeta.GenerateName, i) + i++ + return true, pv, nil + } + return false, nil, nil + }) + cli.PrependReactor("list", "persistentvolumes", + func(action testing.Action) (handled bool, ret runtime.Object, err error) { + return true, pvl, nil + }) + tags := []*blockstorage.KeyValue{ + {Key: "testkey", Value: "testval"}, + } + volInfo1 := VolumeSnapshotInfo{SnapshotID: "snap-1", Type: blockstorage.TypeEBS, Region: "us-west-2", PVCName: "pvc-1", Az: "us-west-2a", Tags: tags, VolumeType: "ssd"} + volInfo2 := VolumeSnapshotInfo{SnapshotID: "snap-2", Type: blockstorage.TypeEBS, Region: "us-west-2", PVCName: "pvc-2", Az: "us-west-2a", Tags: tags, VolumeType: "ssd"} + var PVCData1 []VolumeSnapshotInfo + PVCData1 = append(PVCData1, volInfo1) + PVCData1 = append(PVCData1, volInfo2) + info, err := json.Marshal(PVCData1) + c.Assert(err, IsNil) + snapinfo := string(info) + for _, tc := range []struct { + snapshotinfo string + check Checker + }{ + { + snapshotinfo: snapinfo, + check: IsNil, + }, + } { + providerList, err := createVolumeFromSnapshot(ctx, cli, ns, tc.snapshotinfo, profile, mockGetter) + c.Assert(providerList, Not(Equals), tc.check) + c.Assert(err, tc.check) + if err != nil { + continue + } + c.Assert(len(providerList) == 2, Equals, true) + provider, ok := providerList["pvc-1"] + c.Assert(ok, Equals, true) + c.Assert(len(provider.(*mockblockstorage.Provider).SnapIDList) == 1, Equals, true) + c.Assert(mockblockstorage.CheckID("snap-1", provider.(*mockblockstorage.Provider).SnapIDList), Equals, true) + c.Assert(len(provider.(*mockblockstorage.Provider).VolIDList) == 1, Equals, true) + + provider, ok = providerList["pvc-2"] + c.Assert(ok, Equals, true) + c.Assert(len(provider.(*mockblockstorage.Provider).SnapIDList) == 1, Equals, true) + c.Assert(mockblockstorage.CheckID("snap-2", 
provider.(*mockblockstorage.Provider).SnapIDList), Equals, true) + c.Assert(len(provider.(*mockblockstorage.Provider).VolIDList) == 1, Equals, true) + + _, err = cli.Core().PersistentVolumeClaims(ns).Get("pvc-1", metav1.GetOptions{}) + c.Assert(err, IsNil) + _, err = cli.Core().PersistentVolumeClaims(ns).Get("pvc-2", metav1.GetOptions{}) + c.Assert(err, IsNil) + } +} diff --git a/pkg/function/create_volume_snapshot.go b/pkg/function/create_volume_snapshot.go index 42a19ad102..d463785749 100644 --- a/pkg/function/create_volume_snapshot.go +++ b/pkg/function/create_volume_snapshot.go @@ -3,6 +3,10 @@ package function import ( "context" "encoding/json" + "fmt" + "path/filepath" + "strings" + "sync" "github.com/pkg/errors" "k8s.io/api/core/v1" @@ -10,6 +14,10 @@ import ( "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" + crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" + "github.com/kanisterio/kanister/pkg/blockstorage" + "github.com/kanisterio/kanister/pkg/blockstorage/awsebs" + "github.com/kanisterio/kanister/pkg/blockstorage/getter" "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/param" ) @@ -34,34 +42,80 @@ func (*createVolumeSnapshotFunc) Name() string { } type VolumeSnapshotInfo struct { - SnapshotID string - StorageType string - Region string + SnapshotID string + Type blockstorage.Type + Region string + PVCName string + Az string + Tags blockstorage.VolumeTags + VolumeType string } type volumeInfo struct { - provider string - volumeID string - storageType string - volZone string - pvc string - size int64 + provider blockstorage.Provider + volumeID string + sType blockstorage.Type + volZone string + pvc string + size int64 + region string } -func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kubernetes.Interface, namespace string, pvcs []string) (map[string]interface{}, error) { +func ValidateProfile(profile *param.Profile) error { + if profile == nil { + return errors.New("Profile must be non-nil") + } + if profile.Location.Type != crv1alpha1.LocationTypeS3Compliant { + return errors.New("Location type not supported") + } + if len(profile.Location.S3Compliant.Region) == 0 { + return errors.New("Region is not set") + } + if profile.Credential.Type != param.CredentialTypeKeyPair { + return errors.New("Credential type not supported") + } + if len(profile.Credential.KeyPair.ID) == 0 { + return errors.New("Region is not set") + } + if len(profile.Credential.KeyPair.Secret) == 0 { + return errors.New("Secret access key is not set") + } + return nil +} - PVCData := make([]VolumeSnapshotInfo, 0, len(pvcs)) +func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kubernetes.Interface, namespace string, pvcs []string, getter getter.Getter) (map[string]interface{}, error) { + vols := make([]volumeInfo, 0, len(pvcs)) for _, pvc := range pvcs { - volInfo, err := getPVCInfo(cli, namespace, pvc) + volInfo, err := getPVCInfo(ctx, cli, namespace, pvc, tp, getter) if err != nil { return nil, errors.Wrapf(err, "Failed to get PVC info") } - volSnapInfo, err := snapshotVolume(ctx, cli, volInfo, namespace) - if err != nil { - return nil, errors.Wrapf(err, "Failed to snapshot volume") - } - PVCData = append(PVCData, *volSnapInfo) + vols = append(vols, *volInfo) + } + + var PVCData []VolumeSnapshotInfo + var wg sync.WaitGroup + var errstrings []string + for _, vol := range vols { + wg.Add(1) + go func(volInfo volumeInfo) { + defer wg.Done() + volSnapInfo, err := snapshotVolume(ctx, volInfo, 
namespace) + if err != nil { + errstrings = append(errstrings, err.Error()) + } else { + PVCData = append(PVCData, *volSnapInfo) + } + return + }(vol) } + wg.Wait() + + err := fmt.Errorf(strings.Join(errstrings, "\n")) + if len(err.Error()) > 0 { + return nil, errors.Wrapf(err, "Failed to snapshot one of the volumes") + } + manifestData, err := json.Marshal(PVCData) if err != nil { return nil, errors.Wrapf(err, "Failed to encode JSON data") @@ -70,11 +124,34 @@ func createVolumeSnapshot(ctx context.Context, tp param.TemplateParams, cli kube return map[string]interface{}{"volumeSnapshotInfo": string(manifestData)}, nil } -func snapshotVolume(ctx context.Context, cli kubernetes.Interface, vol *volumeInfo, namespace string) (*VolumeSnapshotInfo, error) { - return &VolumeSnapshotInfo{SnapshotID: vol.volumeID, StorageType: vol.storageType, Region: ""}, nil +func snapshotVolume(ctx context.Context, volume volumeInfo, namespace string) (*VolumeSnapshotInfo, error) { + provider := volume.provider + vol, err := provider.VolumeGet(ctx, volume.volumeID, volume.volZone) + if err != nil { + return nil, errors.Wrapf(err, "Volume unavailable, volumeID: %s", volume.volumeID) + } + if vol.Encrypted { + return nil, errors.New("Encrypted volumes are unsupported") + } + + // Snapshot the volume. + tags := map[string]string{ + "pvcname": volume.pvc, + } + if err = provider.SetTags(ctx, vol, tags); err != nil { + return nil, err + } + snap, err := provider.SnapshotCreate(ctx, *vol, tags) + if err != nil { + return nil, err + } + return &VolumeSnapshotInfo{SnapshotID: snap.ID, Type: volume.sType, Region: volume.region, PVCName: volume.pvc, Az: snap.Volume.Az, Tags: snap.Volume.Tags, VolumeType: snap.Volume.VolumeType}, nil } -func getPVCInfo(kubeCli kubernetes.Interface, namespace string, name string) (*volumeInfo, error) { +func getPVCInfo(ctx context.Context, kubeCli kubernetes.Interface, namespace string, name string, tp param.TemplateParams, getter getter.Getter) (*volumeInfo, error) { + _ = ctx + var region string + var provider blockstorage.Provider pvc, err := kubeCli.Core().PersistentVolumeClaims(namespace).Get(name, metav1.GetOptions{}) if err != nil { return nil, errors.Wrapf(err, "Failed to get PVC, PVC name: %s, namespace: %s", name, namespace) @@ -87,26 +164,30 @@ func getPVCInfo(kubeCli kubernetes.Interface, namespace string, name string) (*v if err != nil { return nil, errors.Wrapf(err, "Failed to get PV %s, namespace: %s", pvName, namespace) } + pvLabels := pv.GetObjectMeta().GetLabels() var size int64 if cap, ok := pv.Spec.Capacity[v1.ResourceStorage]; ok { size = cap.Value() } // Check to see which provider is the source. 
Spec mandates only one of the provider // fields will be set + config := make(map[string]string) if ebs := pv.Spec.AWSElasticBlockStore; ebs != nil { - return &volumeInfo{provider: "EBS", volumeID: pvName, storageType: "EBS", volZone: "", pvc: name, size: size}, nil - } - if gpd := pv.Spec.GCEPersistentDisk; gpd != nil { - return &volumeInfo{provider: "GPD", volumeID: pvName, storageType: "GPD", volZone: "", pvc: name, size: size}, nil - } - if ad := pv.Spec.AzureDisk; ad != nil { - return &volumeInfo{provider: "AD", volumeID: pvName, storageType: "AD", volZone: "", pvc: name, size: size}, nil - } - if cinder := pv.Spec.Cinder; cinder != nil { - return &volumeInfo{provider: "Cinder", volumeID: pvName, storageType: "Cinder", volZone: "", pvc: name, size: size}, nil - } - if ceph := pv.Spec.RBD; ceph != nil { - return &volumeInfo{provider: "Ceph", volumeID: pvName, storageType: "Ceph", volZone: "", pvc: name, size: size}, nil + if err = ValidateProfile(tp.Profile); err != nil { + return nil, errors.Wrap(err, "Profile validation failed") + } + region = tp.Profile.Location.S3Compliant.Region + if pvZone, ok := pvLabels[kube.PVZoneLabelName]; ok { + config[awsebs.ConfigRegion] = region + config[awsebs.AccessKeyID] = tp.Profile.Credential.KeyPair.ID + config[awsebs.SecretAccessKey] = tp.Profile.Credential.KeyPair.Secret + provider, err = getter.Get(blockstorage.TypeEBS, config) + if err != nil { + return nil, errors.Wrap(err, "Could not get storage provider") + } + return &volumeInfo{provider: provider, volumeID: filepath.Base(ebs.VolumeID), sType: blockstorage.TypeEBS, volZone: pvZone, pvc: name, size: size, region: region}, nil + } + return nil, errors.Errorf("PV zone label is empty, pvName: %s, namespace: %s", pvName, namespace) } return nil, errors.New("Storage type not supported!") } @@ -143,7 +224,7 @@ func (kef *createVolumeSnapshotFunc) Exec(ctx context.Context, tp param.Template if err = Arg(args, CreateVolumeSnapshotNamespaceArg, &namespace); err != nil { return nil, err } - if err = OptArg(args, RestoreDataVolsArg, &pvcs, nil); err != nil { + if err = OptArg(args, CreateVolumeSnapshotPVCsArg, &pvcs, nil); err != nil { return nil, err } if len(pvcs) == 0 { @@ -153,7 +234,7 @@ func (kef *createVolumeSnapshotFunc) Exec(ctx context.Context, tp param.Template return nil, err } } - return createVolumeSnapshot(ctx, tp, cli, namespace, pvcs) + return createVolumeSnapshot(ctx, tp, cli, namespace, pvcs, getter.New()) } func (*createVolumeSnapshotFunc) RequiredArgs() []string { diff --git a/pkg/function/create_volume_snapshot_test.go b/pkg/function/create_volume_snapshot_test.go new file mode 100644 index 0000000000..74c5ec200c --- /dev/null +++ b/pkg/function/create_volume_snapshot_test.go @@ -0,0 +1,165 @@ +package function + +import ( + "context" + + . 
"gopkg.in/check.v1" + "k8s.io/api/core/v1" + k8sresource "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + + crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" + "github.com/kanisterio/kanister/pkg/blockstorage" + "github.com/kanisterio/kanister/pkg/kube" + "github.com/kanisterio/kanister/pkg/param" + "github.com/kanisterio/kanister/pkg/testutil/mockblockstorage" +) + +type CreateVolumeSnapshotTestSuite struct{} + +var _ = Suite(&CreateVolumeSnapshotTestSuite{}) + +func (s *CreateVolumeSnapshotTestSuite) TestGetPVCInfo(c *C) { + ctx := context.Background() + ns := "ns" + mockGetter := mockblockstorage.NewGetter() + tp := param.TemplateParams{ + Profile: ¶m.Profile{ + Location: crv1alpha1.Location{ + Type: crv1alpha1.LocationTypeS3Compliant, + S3Compliant: &crv1alpha1.S3CompliantLocation{ + Region: "us-west-2"}, + }, + Credential: param.Credential{ + Type: param.CredentialTypeKeyPair, + KeyPair: ¶m.KeyPair{ + ID: "foo", + Secret: "bar", + }, + }, + }, + } + cli := fake.NewSimpleClientset( + &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-test-1", + Namespace: ns, + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "pv-test-1", + }, + }, + &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pv-test-1", + Labels: map[string]string{ + kube.PVZoneLabelName: "us-west-2a", + }, + }, + Spec: v1.PersistentVolumeSpec{ + Capacity: v1.ResourceList{ + v1.ResourceStorage: *k8sresource.NewQuantity(1, k8sresource.BinarySI), + }, + PersistentVolumeSource: v1.PersistentVolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ + VolumeID: "vol-abc123", + }, + }, + }, + }, + &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-test-2", + Namespace: ns, + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "pv-test-2", + }, + }, + &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pv-test-2", + }, + Spec: v1.PersistentVolumeSpec{ + Capacity: v1.ResourceList{ + v1.ResourceStorage: *k8sresource.NewQuantity(1, k8sresource.BinarySI), + }, + PersistentVolumeSource: v1.PersistentVolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ + VolumeID: "vol-abc123", + }, + }, + }, + }, + &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc-test-3", + Namespace: ns, + }, + Spec: v1.PersistentVolumeClaimSpec{ + VolumeName: "pv-test-3", + }, + }, + &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pv-test-3", + Labels: map[string]string{ + kube.PVZoneLabelName: "us-west-2a", + }, + }, + Spec: v1.PersistentVolumeSpec{ + Capacity: v1.ResourceList{ + v1.ResourceStorage: *k8sresource.NewQuantity(1, k8sresource.BinarySI), + }, + }, + }, + ) + _, err := cli.Core().PersistentVolumeClaims(ns).Get("pvc-test-1", metav1.GetOptions{}) + c.Assert(err, IsNil) + _, err = cli.Core().PersistentVolumes().Get("pv-test-1", metav1.GetOptions{}) + c.Assert(err, IsNil) + + for _, tc := range []struct { + pvc string + wantVolumeID string + wantType blockstorage.Type + wantVolZone string + wantPVC string + wantSize int64 + wantRegion string + check Checker + }{ + { + pvc: "pvc-test-1", + wantVolumeID: "vol-abc123", + wantType: blockstorage.TypeEBS, + wantVolZone: "us-west-2a", + wantPVC: "pvc-test-1", + wantSize: int64(1), + wantRegion: "us-west-2", + check: IsNil, + }, + { + pvc: "pvc-test-2", + check: NotNil, + }, + { + pvc: "pvc-test-3", + check: NotNil, + }, + } { + volInfo, err := getPVCInfo(ctx, cli, ns, 
tc.pvc, tp, mockGetter) + c.Assert(err, tc.check) + c.Assert(volInfo, Not(Equals), tc.check) + if err != nil { + continue + } + c.Assert(volInfo.volumeID, Equals, tc.wantVolumeID) + c.Assert(volInfo.sType, Equals, tc.wantType) + c.Assert(volInfo.volZone, Equals, tc.wantVolZone) + c.Assert(volInfo.pvc, Equals, tc.wantPVC) + c.Assert(volInfo.size, Equals, tc.wantSize) + c.Assert(volInfo.region, Equals, tc.wantRegion) + } +} diff --git a/pkg/function/delete_volume_snapshot.go b/pkg/function/delete_volume_snapshot.go index b3b43451ad..2caf64e7fd 100644 --- a/pkg/function/delete_volume_snapshot.go +++ b/pkg/function/delete_volume_snapshot.go @@ -2,11 +2,17 @@ package function import ( "context" + "encoding/json" + "strings" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" + "github.com/kanisterio/kanister/pkg/blockstorage" + "github.com/kanisterio/kanister/pkg/blockstorage/awsebs" + "github.com/kanisterio/kanister/pkg/blockstorage/getter" "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/param" ) @@ -22,6 +28,7 @@ var ( const ( DeleteVolumeSnapshotNamespaceArg = "namespace" DeleteVolumeSnapshotManifestArg = "snapshots" + SnapshotDoesNotExistError = "does not exist" ) type deleteVolumeSnapshotFunc struct{} @@ -30,8 +37,44 @@ func (*deleteVolumeSnapshotFunc) Name() string { return "DeleteVolumeSnapshot" } -func deleteVolumeSnapshot(ctx context.Context, cli kubernetes.Interface, namespace, snapshotinfo string, profile *param.Profile) error { - return nil +func deleteVolumeSnapshot(ctx context.Context, cli kubernetes.Interface, namespace, snapshotinfo string, profile *param.Profile, getter getter.Getter) (map[string]blockstorage.Provider, error) { + PVCData := []VolumeSnapshotInfo{} + err := json.Unmarshal([]byte(snapshotinfo), &PVCData) + if err != nil { + return nil, errors.Wrapf(err, "Could not decode JSON data") + } + // providerList required for unit testing + providerList := make(map[string]blockstorage.Provider) + for _, pvcInfo := range PVCData { + config := make(map[string]string) + switch pvcInfo.Type { + case blockstorage.TypeEBS: + if err = ValidateProfile(profile); err != nil { + return nil, errors.Wrap(err, "Profile validation failed") + } + config[awsebs.ConfigRegion] = pvcInfo.Region + config[awsebs.AccessKeyID] = profile.Credential.KeyPair.ID + config[awsebs.SecretAccessKey] = profile.Credential.KeyPair.Secret + } + provider, err := getter.Get(pvcInfo.Type, config) + if err != nil { + return nil, errors.Wrapf(err, "Could not get storage provider") + } + snapshot, err := provider.SnapshotGet(ctx, pvcInfo.SnapshotID) + if err != nil { + if strings.Contains(err.Error(), SnapshotDoesNotExistError) { + log.Debugf("Snapshot %s already deleted", pvcInfo.SnapshotID) + } else { + return nil, errors.Wrapf(err, "Failed to get Snapshot from Provider") + } + } + if err = provider.SnapshotDelete(ctx, snapshot); err != nil { + return nil, err + } + log.Infof("Successfully deleted snapshot %s", pvcInfo.SnapshotID) + providerList[pvcInfo.PVCName] = provider + } + return providerList, nil } func (kef *deleteVolumeSnapshotFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { @@ -46,7 +89,8 @@ func (kef *deleteVolumeSnapshotFunc) Exec(ctx context.Context, tp param.Template if err = Arg(args, DeleteVolumeSnapshotManifestArg, &snapshotinfo); err != nil { return nil, err } - return nil, deleteVolumeSnapshot(ctx, cli, namespace, 
snapshotinfo, tp.Profile) + _, err = deleteVolumeSnapshot(ctx, cli, namespace, snapshotinfo, tp.Profile, getter.New()) + return nil, err } func (*deleteVolumeSnapshotFunc) RequiredArgs() []string { diff --git a/pkg/function/delete_volume_snapshot_test.go b/pkg/function/delete_volume_snapshot_test.go new file mode 100644 index 0000000000..ab4e69fb8b --- /dev/null +++ b/pkg/function/delete_volume_snapshot_test.go @@ -0,0 +1,81 @@ +package function + +import ( + "context" + "encoding/json" + + . "gopkg.in/check.v1" + "k8s.io/client-go/kubernetes/fake" + + crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" + "github.com/kanisterio/kanister/pkg/blockstorage" + "github.com/kanisterio/kanister/pkg/param" + "github.com/kanisterio/kanister/pkg/testutil/mockblockstorage" +) + +type DeleteVolumeSnapshotTestSuite struct{} + +var _ = Suite(&DeleteVolumeSnapshotTestSuite{}) + +func (s *DeleteVolumeSnapshotTestSuite) TestDeleteVolumeSnapshot(c *C) { + ctx := context.Background() + ns := "ns" + mockGetter := mockblockstorage.NewGetter() + profile := ¶m.Profile{ + Location: crv1alpha1.Location{ + Type: crv1alpha1.LocationTypeS3Compliant, + S3Compliant: &crv1alpha1.S3CompliantLocation{ + Region: "us-west-2"}, + }, + Credential: param.Credential{ + Type: param.CredentialTypeKeyPair, + KeyPair: ¶m.KeyPair{ + ID: "foo", + Secret: "bar", + }, + }, + } + cli := fake.NewSimpleClientset() + + tags := []*blockstorage.KeyValue{ + {Key: "testkey", Value: "testval"}, + } + volInfo1 := VolumeSnapshotInfo{SnapshotID: "snap-1", Type: blockstorage.TypeEBS, Region: "us-west-2", PVCName: "pvc-1", Az: "us-west-2a", Tags: tags, VolumeType: "ssd"} + volInfo2 := VolumeSnapshotInfo{SnapshotID: "snap-2", Type: blockstorage.TypeEBS, Region: "us-west-2", PVCName: "pvc-2", Az: "us-west-2a", Tags: tags, VolumeType: "ssd"} + var PVCData1 []VolumeSnapshotInfo + PVCData1 = append(PVCData1, volInfo1) + PVCData1 = append(PVCData1, volInfo2) + info, err := json.Marshal(PVCData1) + c.Assert(err, IsNil) + snapinfo := string(info) + for _, tc := range []struct { + snapshotinfo string + check Checker + }{ + { + snapshotinfo: snapinfo, + check: IsNil, + }, + } { + providerList, err := deleteVolumeSnapshot(ctx, cli, ns, tc.snapshotinfo, profile, mockGetter) + c.Assert(providerList, Not(Equals), tc.check) + c.Assert(err, tc.check) + if err != nil { + continue + } + c.Assert(len(providerList) == 2, Equals, true) + provider, ok := providerList["pvc-1"] + c.Assert(ok, Equals, true) + c.Assert(len(provider.(*mockblockstorage.Provider).SnapIDList) == 1, Equals, true) + c.Assert(mockblockstorage.CheckID("snap-1", provider.(*mockblockstorage.Provider).SnapIDList), Equals, true) + c.Assert(len(provider.(*mockblockstorage.Provider).DeletedSnapIDList) == 1, Equals, true) + c.Assert(mockblockstorage.CheckID("snap-1", provider.(*mockblockstorage.Provider).DeletedSnapIDList), Equals, true) + + provider, ok = providerList["pvc-2"] + c.Assert(ok, Equals, true) + c.Assert(len(provider.(*mockblockstorage.Provider).SnapIDList) == 1, Equals, true) + c.Assert(mockblockstorage.CheckID("snap-2", provider.(*mockblockstorage.Provider).SnapIDList), Equals, true) + c.Assert(len(provider.(*mockblockstorage.Provider).DeletedSnapIDList) == 1, Equals, true) + c.Assert(mockblockstorage.CheckID("snap-2", provider.(*mockblockstorage.Provider).DeletedSnapIDList), Equals, true) + } +} diff --git a/pkg/function/volume_snapshot_test.go b/pkg/function/e2e_volume_snapshot_test.go similarity index 74% rename from pkg/function/volume_snapshot_test.go rename to 
pkg/function/e2e_volume_snapshot_test.go index 119cd1588e..437170a8d1 100644 --- a/pkg/function/volume_snapshot_test.go +++ b/pkg/function/e2e_volume_snapshot_test.go @@ -2,6 +2,7 @@ package function import ( "context" + "os" "strings" . "gopkg.in/check.v1" @@ -13,11 +14,14 @@ import ( kanister "github.com/kanisterio/kanister/pkg" crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1" + "github.com/kanisterio/kanister/pkg/blockstorage/awsebs" + "github.com/kanisterio/kanister/pkg/blockstorage/getter" "github.com/kanisterio/kanister/pkg/client/clientset/versioned" "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/param" "github.com/kanisterio/kanister/pkg/resource" "github.com/kanisterio/kanister/pkg/testutil" + "github.com/kanisterio/kanister/pkg/testutil/mockblockstorage" ) const ( @@ -25,12 +29,15 @@ const ( manifestKey = "manifest" backupInfoKey = "backupInfo" skipTestErrorMsg = "Storage type not supported" + AWSRegion = "AWS_REGION" ) type VolumeSnapshotTestSuite struct { - cli kubernetes.Interface - crCli versioned.Interface - namespace string + cli kubernetes.Interface + crCli versioned.Interface + namespace string + mockGetter getter.Getter + tp *param.TemplateParams } var _ = Suite(&VolumeSnapshotTestSuite{}) @@ -56,13 +63,78 @@ func (s *VolumeSnapshotTestSuite) SetUpTest(c *C) { c.Assert(err, IsNil) s.namespace = cns.GetName() - sec := testutil.NewTestProfileSecret() + sec := NewTestProfileSecret() sec, err = s.cli.Core().Secrets(s.namespace).Create(sec) c.Assert(err, IsNil) - p := testutil.NewTestProfile(s.namespace, sec.GetName()) + p := NewTestProfile(s.namespace, sec.GetName()) _, err = s.crCli.CrV1alpha1().Profiles(s.namespace).Create(p) c.Assert(err, IsNil) + + s.mockGetter = mockblockstorage.NewGetter() + + ctx := context.Background() + ss, err := s.cli.AppsV1().StatefulSets(s.namespace).Create(newStatefulSet(s.namespace)) + c.Assert(err, IsNil) + err = kube.WaitOnStatefulSetReady(ctx, s.cli, ss.GetNamespace(), ss.GetName()) + c.Assert(err, IsNil) + + as := crv1alpha1.ActionSpec{ + Object: crv1alpha1.ObjectReference{ + Kind: param.StatefulSetKind, + Name: ss.GetName(), + Namespace: s.namespace, + }, + Profile: &crv1alpha1.ObjectReference{ + Name: testutil.TestProfileName, + Namespace: s.namespace, + }, + } + + tp, err := param.New(ctx, s.cli, s.crCli, as) + c.Assert(err, IsNil) + s.tp = tp + +} + +// NewTestProfileSecret function returns a pointer to a new Secret test object. +func NewTestProfileSecret() *v1.Secret { + return &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-secret-", + }, + StringData: map[string]string{ + "id": os.Getenv(awsebs.AccessKeyID), + "secret": os.Getenv(awsebs.SecretAccessKey), + }, + } +} + +// NewTestProfile function returns a pointer to a new Profile test object that +// passes validation. 
+func NewTestProfile(namespace string, secretName string) *crv1alpha1.Profile { + return &crv1alpha1.Profile{ + ObjectMeta: metav1.ObjectMeta{ + Name: testutil.TestProfileName, + Namespace: namespace, + }, + Location: crv1alpha1.Location{ + Type: crv1alpha1.LocationTypeS3Compliant, + S3Compliant: &crv1alpha1.S3CompliantLocation{ + Region: os.Getenv(AWSRegion)}, + }, + Credential: crv1alpha1.Credential{ + Type: crv1alpha1.CredentialTypeKeyPair, + KeyPair: &crv1alpha1.KeyPair{ + Secret: crv1alpha1.ObjectReference{ + Name: secretName, + Namespace: namespace, + }, + IDField: "id", + SecretField: "secret", + }, + }, + } } func (s *VolumeSnapshotTestSuite) TearDownTest(c *C) { @@ -200,35 +272,23 @@ func newStatefulSet(namespace string) *appsv1.StatefulSet { } func (s *VolumeSnapshotTestSuite) TestVolumeSnapshot(c *C) { - ctx := context.Background() - - ss, err := s.cli.AppsV1().StatefulSets(s.namespace).Create(newStatefulSet(s.namespace)) - c.Assert(err, IsNil) - err = kube.WaitOnStatefulSetReady(ctx, s.cli, ss.GetNamespace(), ss.GetName()) - c.Assert(err, IsNil) - - as := crv1alpha1.ActionSpec{ - Object: crv1alpha1.ObjectReference{ - Kind: param.StatefulSetKind, - Name: ss.GetName(), - Namespace: s.namespace, - }, - Profile: &crv1alpha1.ObjectReference{ - Name: testutil.TestProfileName, - Namespace: s.namespace, - }, + if len(os.Getenv(AWSRegion)) == 0 { + c.Skip("Skipping the test since env variable AWS_REGION is not set") } - - tp, err := param.New(ctx, s.cli, s.crCli, as) - c.Assert(err, IsNil) - + if len(os.Getenv(awsebs.AccessKeyID)) == 0 { + c.Skip("Skipping the test since env variable AWS_ACCESS_KEY_ID is not set") + } + if len(os.Getenv(awsebs.SecretAccessKey)) == 0 { + c.Skip("Skipping the test since env variable AWS_SECRET_ACCESS_KEY is not set") + } + ctx := context.Background() actions := []string{"backup", "restore", "delete"} bp := newVolumeSnapshotBlueprint() for _, action := range actions { - phases, err := kanister.GetPhases(*bp, action, *tp) + phases, err := kanister.GetPhases(*bp, action, *s.tp) c.Assert(err, IsNil) for _, p := range phases { - output, err := p.Exec(ctx, *bp, action, *tp) + output, err := p.Exec(ctx, *bp, action, *s.tp) if err != nil && strings.Contains(err.Error(), skipTestErrorMsg) { c.Skip("Skipping the test since storage type not supported") } @@ -241,8 +301,8 @@ func (s *VolumeSnapshotTestSuite) TestVolumeSnapshot(c *C) { artifact := crv1alpha1.Artifact{ KeyValue: keyval, } - tp.ArtifactsIn = make(map[string]crv1alpha1.Artifact) - tp.ArtifactsIn[backupInfoKey] = artifact + s.tp.ArtifactsIn = make(map[string]crv1alpha1.Artifact) + s.tp.ArtifactsIn[backupInfoKey] = artifact } } } diff --git a/pkg/testutil/mockblockstorage/mockblockstorage.go b/pkg/testutil/mockblockstorage/mockblockstorage.go new file mode 100644 index 0000000000..016bd61f28 --- /dev/null +++ b/pkg/testutil/mockblockstorage/mockblockstorage.go @@ -0,0 +1,234 @@ +package mockblockstorage + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + uuid "github.com/satori/go.uuid" + + "github.com/kanisterio/kanister/pkg/blockstorage" + "github.com/kanisterio/kanister/pkg/blockstorage/getter" +) + +var _ blockstorage.Provider = (*Provider)(nil) + +// Provider implements a mock storage provider +type Provider struct { + storageType blockstorage.Type + volume blockstorage.Volume + snapshot blockstorage.Snapshot + failPoints map[string]error + SnapIDList []string + DeletedSnapIDList []string + VolIDList []string +} + +var _ getter.Getter = (*mockGetter)(nil) + +type mockGetter struct{} 
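The two interface assertions above, var _ blockstorage.Provider = (*Provider)(nil) and var _ getter.Getter = (*mockGetter)(nil), are compile-time checks that keep this mock in step with the interfaces the snapshot functions consume; they are what let the unit tests in this series pass mockblockstorage.NewGetter() wherever production code passes getter.New(). The failPoints map feeds the InjectFailure/checkFailPoint hooks defined further down in this file, which no test in the series exercises yet. The snippet below is only a sketch of how a failure-path test could look: the test name and error text are illustrative, the MockSuite receiver refers to the suite declared in mockblockstorage_test.go later in this patch, and it assumes the context and pkg/errors packages are imported in that test file. Only Get, InjectFailure, VolumeGet and the gocheck checkers are taken from the actual code.

// Sketch only: drives VolumeGet through an injected fail point.
// Get, InjectFailure and VolumeGet are defined in this package; the test
// itself is illustrative and not part of the patch series.
func (s *MockSuite) TestInjectedFailure(c *C) {
    p := Get(blockstorage.TypeEBS)
    p.InjectFailure("vol-bad", errors.New("simulated EBS outage"))
    _, err := p.VolumeGet(context.Background(), "vol-bad", "us-west-2a")
    c.Assert(err, ErrorMatches, "simulated EBS outage")
}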
+ +// NewGetter retuns a new mockGetter +func NewGetter() getter.Getter { + return &mockGetter{} +} + +// Get returns a provider for the requested storage type in the specified region +func (*mockGetter) Get(storageType blockstorage.Type, config map[string]string) (blockstorage.Provider, error) { + // TODO(tom): we might want to honor these settings. + switch storageType { + case blockstorage.TypeEBS: + fallthrough + case blockstorage.TypeGPD: + return Get(storageType), nil + default: + return nil, errors.New("Get failed") + } +} + +// Get returns a mock storage provider +func Get(storageType blockstorage.Type) *Provider { + volume := blockstorage.Volume{ + Type: storageType, + ID: fmt.Sprintf("vol-%s", uuid.NewV1().String()), + Az: "AZ", + Encrypted: false, + VolumeType: "ssd", + Size: 1, + Iops: 0, + Tags: []*blockstorage.KeyValue{ + {Key: "kanister.io/jobid", Value: "unittest"}, + {Key: "kanister.io/volid", Value: "vol"}, + }, + CreationTime: blockstorage.TimeStamp(time.Time{}), + } + snapVol := volume + snapshot := blockstorage.Snapshot{ + Type: storageType, + ID: fmt.Sprintf("snap-%s", uuid.NewV1().String()), + Size: 1, + Tags: []*blockstorage.KeyValue{ + {Key: "kanister.io/jobid", Value: "unittest"}, + {Key: "kanister.io/snapid", Value: "snap"}, + }, + Volume: &snapVol, + CreationTime: blockstorage.TimeStamp(time.Time{}), + } + return &Provider{ + storageType: storageType, + volume: volume, + snapshot: snapshot, + failPoints: make(map[string]error), + SnapIDList: make([]string, 0), + DeletedSnapIDList: make([]string, 0), + VolIDList: make([]string, 0), + } +} + +// Type mock +func (p *Provider) Type() blockstorage.Type { + return p.storageType +} + +// VolumeCreate mock +func (p *Provider) VolumeCreate(context.Context, blockstorage.Volume) (*blockstorage.Volume, error) { + return p.MockVolume(), nil +} + +// VolumeCreateFromSnapshot mock +func (p *Provider) VolumeCreateFromSnapshot(ctx context.Context, snapshot blockstorage.Snapshot, tags map[string]string) (*blockstorage.Volume, error) { + vol := blockstorage.Volume{ + Type: snapshot.Type, + ID: fmt.Sprintf("vol-%s", uuid.NewV1().String()), + Az: "AZ", + Encrypted: false, + VolumeType: "ssd", + Size: 1, + Iops: 0, + Tags: []*blockstorage.KeyValue{ + {Key: "kanister.io/jobid", Value: "unittest"}, + {Key: "kanister.io/volid", Value: "vol"}, + }, + CreationTime: blockstorage.TimeStamp(time.Time{}), + } + p.AddVolID(vol.ID) + return &vol, nil +} + +// VolumeDelete mock +func (p *Provider) VolumeDelete(context.Context, *blockstorage.Volume) error { + return nil +} + +// VolumeGet mock +func (p *Provider) VolumeGet(ctx context.Context, id string, zone string) (*blockstorage.Volume, error) { + if err := p.checkFailPoint(id); err != nil { + return nil, err + } + return p.MockVolume(), nil +} + +// SnapshotCopy mock +func (p *Provider) SnapshotCopy(ctx context.Context, from, to blockstorage.Snapshot) (*blockstorage.Snapshot, error) { + return p.MockSnapshot(), nil +} + +// SnapshotCreate mock +func (p *Provider) SnapshotCreate(ctx context.Context, volume blockstorage.Volume, tags map[string]string) (*blockstorage.Snapshot, error) { + return p.MockSnapshot(), nil +} + +// SnapshotDelete mock +func (p *Provider) SnapshotDelete(ctx context.Context, snapshot *blockstorage.Snapshot) error { + p.AddDeletedSnapID(snapshot.ID) + return nil +} + +// SnapshotGet mock +func (p *Provider) SnapshotGet(ctx context.Context, id string) (*blockstorage.Snapshot, error) { + ret := p.snapshot + ret.ID = id + p.AddSnapID(id) + return &ret, nil +} + +// SetTags 
mock +func (p *Provider) SetTags(ctx context.Context, resource interface{}, tags map[string]string) error { + switch res := resource.(type) { + case *blockstorage.Volume: + return nil + case *blockstorage.Snapshot: + return nil + default: + return errors.Errorf("Unsupported resource type %v(%T)", res, res) + } +} + +// MockVolume returns the mock volume used in the provider +func (p *Provider) MockVolume() *blockstorage.Volume { + ret := p.volume + return &ret +} + +// MockSnapshot returns the mock snapshot used in the provider +func (p *Provider) MockSnapshot() *blockstorage.Snapshot { + ret := p.snapshot + return &ret +} + +// VolumesList mock +func (p *Provider) VolumesList(ctx context.Context, tags map[string]string) ([]*blockstorage.Volume, error) { + return []*blockstorage.Volume{p.MockVolume(), p.MockVolume()}, nil +} + +// SnapshotsList mock +func (p *Provider) SnapshotsList(ctx context.Context, tags map[string]string) ([]*blockstorage.Snapshot, error) { + return []*blockstorage.Snapshot{p.MockSnapshot(), p.MockSnapshot()}, nil +} + +// InjectFailure adds an id that provider operations should be failed on +func (p *Provider) InjectFailure(id string, err error) { + p.failPoints[id] = err +} + +func (p *Provider) checkFailPoint(id string) error { + if err, ok := p.failPoints[id]; ok { + return err + } + return nil +} + +// AddSnapID adds id to the list of snapshot id's +func (p *Provider) AddSnapID(id string) { + if present := CheckID(id, p.SnapIDList); !present { + p.SnapIDList = append(p.SnapIDList, id) + } + return +} + +// AddDeletedSnapID adds id to the list of delted snapshot id's +func (p *Provider) AddDeletedSnapID(id string) { + if present := CheckID(id, p.DeletedSnapIDList); !present { + p.DeletedSnapIDList = append(p.DeletedSnapIDList, id) + } + return +} + +// AddVolID adds id to the list of volume id's +func (p *Provider) AddVolID(id string) { + if present := CheckID(id, p.VolIDList); !present { + p.VolIDList = append(p.VolIDList, id) + } + return +} + +// CheckID checks if the id is present in the list +func CheckID(id string, list []string) bool { + for _, i := range list { + if i == id { + return true + } + } + return false +} diff --git a/pkg/testutil/mockblockstorage/mockblockstorage_test.go b/pkg/testutil/mockblockstorage/mockblockstorage_test.go new file mode 100644 index 0000000000..412c7c73af --- /dev/null +++ b/pkg/testutil/mockblockstorage/mockblockstorage_test.go @@ -0,0 +1,20 @@ +package mockblockstorage + +import ( + "testing" + + . 
"gopkg.in/check.v1" + + "github.com/kanisterio/kanister/pkg/blockstorage" +) + +func Test(t *testing.T) { TestingT(t) } + +type MockSuite struct{} + +var _ = Suite(&MockSuite{}) + +func (s *MockSuite) TestMockStorage(c *C) { + mock := Get(blockstorage.TypeEBS) + c.Assert(mock.Type(), Equals, blockstorage.TypeEBS) +} From 3383bd5590a19d29078c317994aa499afeff5216 Mon Sep 17 00:00:00 2001 From: Pavan Navarathna Devaraj Date: Mon, 26 Nov 2018 12:14:13 -0800 Subject: [PATCH 6/9] Switch Kanister PrepareData to use pod instead of a job (#4411) * Modify PrepareData to use pod - K10-1711 * Update prepareData to create pod * Minor changes * Doc update * Apply Suggestions --- docs/functions.rst | 12 +++++----- pkg/function/data_test.go | 2 +- pkg/function/kube_task.go | 9 +++++-- pkg/function/kube_task_test.go | 4 ++-- pkg/function/prepare_data.go | 43 ++++++++++++++++++++++++---------- pkg/function/restore_data.go | 2 +- pkg/kube/pod.go | 7 ++++-- 7 files changed, 53 insertions(+), 26 deletions(-) diff --git a/docs/functions.rst b/docs/functions.rst index 0e37bb1d95..05773b051b 100644 --- a/docs/functions.rst +++ b/docs/functions.rst @@ -197,7 +197,7 @@ Example of scaling up: PrepareData ----------- -This function allows running a Kubernetes Job that will mount one or more PVCs +This function allows running a new Pod that will mount one or more PVCs and execute a command or script that manipulates the data on the PVCs. The function can be useful when it is necessary to perform operations on the @@ -302,13 +302,13 @@ RestoreData ----------- This function restores data backed up by the BackupData function. -It creates a Kubernetes Job that mounts the PVCs referenced -by the specified Pod and restores data to the specified path. +It creates a new Pod that mounts the PVCs referenced by the specified Pod +and restores data to the specified path. .. note:: It is extremely important that, the PVCs are not be currently - in use by an active application container, as the Kubernetes Job - requires to mount the PVCs to a new Pod (ensure by using + in use by an active application container, as they are required + to be mounted to the new Pod (ensure by using ScaleWorkload with replicas=0 first). For advanced use cases, it is possible to have concurrent access but the PV needs to have RWX mode enabled and the volume needs to use a @@ -406,7 +406,7 @@ If the ActionSet `Object` is a PersistentVolumeClaim: DeleteData ---------- -This function uses a Kubernetes Job to delete the specified artifact +This function uses a new Pod to delete the specified artifact from an S3 compatible object store. .. 
csv-table:: diff --git a/pkg/function/data_test.go b/pkg/function/data_test.go index bb0fa7a697..af30d6e360 100644 --- a/pkg/function/data_test.go +++ b/pkg/function/data_test.go @@ -82,7 +82,7 @@ func newRestoreDataBlueprint(pvc string) *crv1alpha1.Blueprint { RestoreDataNamespaceArg: "{{ .StatefulSet.Namespace }}", RestoreDataImageArg: "kanisterio/kanister-tools:0.13.0", RestoreDataBackupArtifactPrefixArg: "{{ .Profile.Location.S3Compliant.Bucket }}/{{ .Profile.Location.S3Compliant.Prefix }}", - RestoreDataRestorePathArg: "/", + RestoreDataRestorePathArg: "/mnt/data", RestoreDataBackupIdentifierArg: "{{ .Time }}", RestoreDataEncryptionKeyArg: "{{ .Secrets.backupKey.Data.password | toString }}", RestoreDataVolsArg: map[string]string{ diff --git a/pkg/function/kube_task.go b/pkg/function/kube_task.go index 4e6971c670..0918997307 100644 --- a/pkg/function/kube_task.go +++ b/pkg/function/kube_task.go @@ -10,6 +10,7 @@ import ( "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/param" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" ) const ( @@ -64,7 +65,11 @@ func kubeTask(ctx context.Context, namespace, image string, command []string) (m if err != nil { return nil, errors.Wrapf(err, "Failed to create pod for KubeTask") } - defer kube.DeletePod(context.Background(), clientset, pod) + defer func() { + if err := kube.DeletePod(context.Background(), clientset, pod); err != nil { + log.Error("Failed to delete pod ", err.Error()) + } + }() // Wait for pod completion if err := kube.WaitForPodCompletion(ctx, clientset, pod.Namespace, pod.Name); err != nil { @@ -78,7 +83,7 @@ func kubeTask(ctx context.Context, namespace, image string, command []string) (m format.Log(pod.Name, pod.Spec.Containers[0].Name, logs) out, err := parseLogAndCreateOutput(logs) - return out, errors.Wrap(err, "Failed to generate output") + return out, errors.Wrap(err, "Failed to parse phase output") } func (ktf *kubeTaskFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { diff --git a/pkg/function/kube_task_test.go b/pkg/function/kube_task_test.go index 2da131afeb..9859e940d8 100644 --- a/pkg/function/kube_task_test.go +++ b/pkg/function/kube_task_test.go @@ -30,7 +30,7 @@ func (s *KubeTaskSuite) SetUpSuite(c *C) { ns := &v1.Namespace{ ObjectMeta: metav1.ObjectMeta{ - GenerateName: "kanisterdeletetest-", + GenerateName: "kanisterkubetasktest-", }, } cns, err := s.cli.Core().Namespaces().Create(ns) @@ -85,7 +85,7 @@ func newTaskBlueprint(namespace string) *crv1alpha1.Blueprint { } func (s *KubeTaskSuite) TestKubeTask(c *C) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() tp := param.TemplateParams{ StatefulSet: ¶m.StatefulSetParams{ diff --git a/pkg/function/prepare_data.go b/pkg/function/prepare_data.go index e950381a6c..31b4f4cac6 100644 --- a/pkg/function/prepare_data.go +++ b/pkg/function/prepare_data.go @@ -5,10 +5,12 @@ import ( "fmt" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" kanister "github.com/kanisterio/kanister/pkg" + "github.com/kanisterio/kanister/pkg/format" "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/param" ) @@ -57,26 +59,43 @@ func getVolumes(tp param.TemplateParams) (map[string]string, error) { return vols, nil } -func prepareData(ctx context.Context, cli 
kubernetes.Interface, namespace, serviceAccount, image string, vols map[string]string, command ...string) error { +func prepareData(ctx context.Context, cli kubernetes.Interface, namespace, serviceAccount, image string, vols map[string]string, command ...string) (map[string]interface{}, error) { // Validate volumes for pvc := range vols { if _, err := cli.CoreV1().PersistentVolumeClaims(namespace).Get(pvc, metav1.GetOptions{}); err != nil { - return errors.Wrapf(err, "Failed to retrieve PVC. Namespace %s, Name %s", namespace, pvc) + return nil, errors.Wrapf(err, "Failed to retrieve PVC. Namespace %s, Name %s", namespace, pvc) } } - jobName := generateJobName(prepareDataJobPrefix) - job, err := kube.NewJob(cli, jobName, namespace, serviceAccount, image, vols, command...) + pod, err := kube.CreatePod(ctx, cli, &kube.PodOptions{ + Namespace: namespace, + GenerateName: prepareDataJobPrefix, + Image: image, + Command: command, + Volumes: vols, + ServiceAccountName: serviceAccount, + }) if err != nil { - return errors.Wrap(err, "Failed to create prepare data job") + return nil, errors.Wrapf(err, "Failed to create pod to run prepare data job") } - if err := job.Create(); err != nil { - return errors.Wrapf(err, "Failed to create job %s in Kubernetes", jobName) + defer func() { + if err := kube.DeletePod(context.Background(), cli, pod); err != nil { + log.Error("Failed to delete pod ", err.Error()) + } + }() + + // Wait for pod completion + if err := kube.WaitForPodCompletion(ctx, cli, pod.Namespace, pod.Name); err != nil { + return nil, errors.Wrapf(err, "Failed while waiting for Pod %s to complete", pod.Name) } - defer job.Delete() - if err := job.WaitForCompletion(ctx); err != nil { - return errors.Wrapf(err, "Failed while waiting for job %s to complete", jobName) + // Fetch logs from the pod + logs, err := kube.GetPodLogs(ctx, cli, pod.Namespace, pod.Name) + if err != nil { + return nil, errors.Wrapf(err, "Failed to fetch logs from the pod") } - return nil + format.Log(pod.Name, pod.Spec.Containers[0].Name, logs) + + out, err := parseLogAndCreateOutput(logs) + return out, errors.Wrap(err, "Failed to parse phase output") } func (*prepareDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) { @@ -108,7 +127,7 @@ func (*prepareDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args return nil, err } } - return nil, prepareData(ctx, cli, namespace, serviceAccount, image, vols, command...) + return prepareData(ctx, cli, namespace, serviceAccount, image, vols, command...) } func (*prepareDataFunc) RequiredArgs() []string { diff --git a/pkg/function/restore_data.go b/pkg/function/restore_data.go index a4b48e31a7..7fa514e4c4 100644 --- a/pkg/function/restore_data.go +++ b/pkg/function/restore_data.go @@ -115,7 +115,7 @@ func (*restoreDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args if err != nil { return nil, errors.Wrapf(err, "Failed to create Kubernetes client") } - return nil, prepareData(ctx, cli, namespace, "", image, vols, cmd...) + return prepareData(ctx, cli, namespace, "", image, vols, cmd...) 
 }
 
 func (*restoreDataFunc) RequiredArgs() []string {
diff --git a/pkg/kube/pod.go b/pkg/kube/pod.go
index 09aa583c19..00291b14ce 100644
--- a/pkg/kube/pod.go
+++ b/pkg/kube/pod.go
@@ -100,10 +100,13 @@ func WaitForPodCompletion(ctx context.Context, cli kubernetes.Interface, namespa
 		if err != nil {
 			return true, err
 		}
-		return (p.Status.Phase == v1.PodSucceeded) || (p.Status.Phase == v1.PodFailed), nil
+		if p.Status.Phase == v1.PodFailed {
+			return false, errors.Errorf("Pod %s failed", name)
+		}
+		return (p.Status.Phase == v1.PodSucceeded), nil
 	})
 	if err == nil {
 		return nil
 	}
-	return errors.Wrapf(err, "Pod did not transition into a terminal state. Namespace:%s, Name:%s", namespace, name)
+	return errors.Wrap(err, "Pod did not transition into complete state")
 }

From c10c2f327e63b51cc551b93c8ad34aa8f9d4e8c8 Mon Sep 17 00:00:00 2001
From: Pavan Navarathna Devaraj
Date: Tue, 27 Nov 2018 20:03:52 -0800
Subject: [PATCH 7/9] Fix time-log example blueprint (#4467)

---
 examples/time-log/blueprint.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/time-log/blueprint.yaml b/examples/time-log/blueprint.yaml
index 25731c897f..d71e55dd2c 100644
--- a/examples/time-log/blueprint.yaml
+++ b/examples/time-log/blueprint.yaml
@@ -22,7 +22,7 @@ actions:
         container: test-container
         includePath: /var/log
         backupArtifactPrefix: "{{ .Profile.Location.S3Compliant.Bucket }}/time-log"
-        backupIdentifier: "{{ toDate "2006-01-02T15:04:05.999999999Z07:00" .Time | date "2006-01-02" }}"
+        backupIdentifier: '{{ toDate "2006-01-02T15:04:05.999999999Z07:00" .Time | date "2006-01-02" }}'
   restore:
     type: Deployment
     inputArtifactNames:

From 1e5ac5bcb86292146d45c4858d4c652907be9846 Mon Sep 17 00:00:00 2001
From: Supriya Kharade
Date: Wed, 28 Nov 2018 13:41:13 -0800
Subject: [PATCH 8/9] Trivial: error msg modified (#4469)

---
 pkg/function/create_volume_snapshot.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/function/create_volume_snapshot.go b/pkg/function/create_volume_snapshot.go
index d463785749..5528d19539 100644
--- a/pkg/function/create_volume_snapshot.go
+++ b/pkg/function/create_volume_snapshot.go
@@ -75,7 +75,7 @@ func ValidateProfile(profile *param.Profile) error {
 		return errors.New("Credential type not supported")
 	}
 	if len(profile.Credential.KeyPair.ID) == 0 {
-		return errors.New("Region is not set")
+		return errors.New("AWS access key id is not set")
 	}
 	if len(profile.Credential.KeyPair.Secret) == 0 {
 		return errors.New("Secret access key is not set")

From 660cf95aeefceecda19a1b50555e277c15b548bd Mon Sep 17 00:00:00 2001
From: Supriya Kharade
Date: Wed, 28 Nov 2018 14:36:02 -0800
Subject: [PATCH 9/9] Skip blockstorage_test if env not set (#4472)

---
 pkg/blockstorage/blockstorage_test.go | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/pkg/blockstorage/blockstorage_test.go b/pkg/blockstorage/blockstorage_test.go
index a8bcaede27..8d9dbf8dab 100644
--- a/pkg/blockstorage/blockstorage_test.go
+++ b/pkg/blockstorage/blockstorage_test.go
@@ -37,10 +37,14 @@ func (s *BlockStorageProviderSuite) SetUpSuite(c *C) {
 	var err error
 	if s.storageType == blockstorage.TypeEBS {
 		config[awsebs.ConfigRegion] = s.storageRegion
-		accessKey := os.Getenv(awsebs.AccessKeyID)
-		c.Assert(len(accessKey) > 0, Equals, true)
-		secretAccessKey := os.Getenv(awsebs.SecretAccessKey)
-		c.Assert(len(secretAccessKey) > 0, Equals, true)
+		accessKey, ok := os.LookupEnv(awsebs.AccessKeyID)
+		if !ok {
+			c.Skip("The necessary env variable AWS_ACCESS_KEY_ID is not set.")
+		}
+		secretAccessKey, ok := os.LookupEnv(awsebs.SecretAccessKey)
+		if !ok {
+			c.Skip("The necessary env variable AWS_SECRET_ACCESS_KEY is not set.")
+		}
 		config[awsebs.AccessKeyID] = accessKey
 		config[awsebs.SecretAccessKey] = secretAccessKey
 	}
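
The SetUpSuite change above makes the EBS provider tests skip, rather than fail, when AWS credentials are absent from the environment. Below is a minimal sketch of the same lookup-or-skip pattern, written against the standard library testing package instead of the gocheck suite used in blockstorage_test.go; the helper and test names are illustrative and are not part of the patch.

package blockstorage_test

import (
	"os"
	"testing"
)

// getEnvOrSkip returns the value of key, or skips the calling test when the
// environment variable is not set. It mirrors the os.LookupEnv plus c.Skip
// pattern introduced in PATCH 9/9.
func getEnvOrSkip(t *testing.T, key string) string {
	t.Helper()
	v, ok := os.LookupEnv(key)
	if !ok {
		t.Skipf("The necessary env variable %s is not set.", key)
	}
	return v
}

// TestEBSCredentialsPresent shows the helper in use: the test is skipped,
// not failed, on machines without AWS credentials.
func TestEBSCredentialsPresent(t *testing.T) {
	accessKey := getEnvOrSkip(t, "AWS_ACCESS_KEY_ID")
	secretAccessKey := getEnvOrSkip(t, "AWS_SECRET_ACCESS_KEY")
	if accessKey == "" || secretAccessKey == "" {
		t.Fatal("credentials resolved to empty values")
	}
}

Skipping keeps the suite green on developer machines and CI runners that have no cloud credentials, while still exercising the provider wherever the variables are exported.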