Pass size of source while calling put for storage bucket from kando (#809)

* Pass size of source while calling put for storage bucket

We were passing an object size of 0 when calling `.put` on the object
storage bucket. Because of that, `stow` could not determine the source
size, and multipart upload was never triggered.
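
The gist of the change, distilled from the diff below (a sketch, not a verbatim excerpt):

	// Before: a zero size hint meant stow never saw a large source
	err := bucket.Put(ctx, path, in, 0, nil)

	// After (Azure only): probe the reader first and report a size big
	// enough to push stow onto its multipart upload path
	in, size, err := readerSize(in, buffSize)
	if err != nil {
		return err
	}
	err = bucket.Put(ctx, path, in, size, nil)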

* Read the io.Reader to figure out the size of the content

* Figure out the correct size using a multipart reader, only in the case of Azure storage

* Revert changes from mod files

* Add TestReaderSize test case for 0 size

Signed-off-by: PrasadG193 <[email protected]>

* Add unit tests for multipart upload

Signed-off-by: PrasadG193 <[email protected]>

* Do not return if delete fails

Signed-off-by: PrasadG193 <[email protected]>

* Add comments

Signed-off-by: PrasadG193 <[email protected]>

* Addressed review comments

Signed-off-by: PrasadG193 <[email protected]>

* Reorder test cases

Signed-off-by: PrasadG193 <[email protected]>

Co-authored-by: PrasadG193 <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
3 people authored Nov 11, 2020
1 parent 4fa923d commit 629d229
Showing 2 changed files with 148 additions and 11 deletions.
54 changes: 53 additions & 1 deletion pkg/location/location.go
@@ -15,6 +15,7 @@
package location

import (
"bytes"
"context"
"io"
"path/filepath"
@@ -35,6 +36,11 @@ const (
	GoogleProjectId     = "GOOGLE_PROJECT_ID"
	AzureStorageAccount = "AZURE_ACCOUNT_NAME"
	AzureStorageKey     = "AZURE_ACCOUNT_KEY"

	// buffSize is the maximum size of an object that can be Put to an Azure container in a single request
	// https://github.com/kastenhq/stow/blob/v0.2.6-kasten/azure/container.go#L14
	buffSize = 256 * 1024 * 1024
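	// buffSizeLimit is the size reported for sources of at least buffSize; being
	// larger than the single-request maximum, it steers stow onto its multipart path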
	buffSizeLimit = 1 * 1024 * 1024 * 1024
)

// Write pipes data from `in` into the location specified by `profile` and `suffix`.
@@ -93,16 +99,62 @@ func readData(ctx context.Context, pType objectstore.ProviderType, profile param
}

func writeData(ctx context.Context, pType objectstore.ProviderType, profile param.Profile, in io.Reader, path string) error {
	var input io.Reader
	var size int64
	bucket, err := getBucket(ctx, pType, profile)
	if err != nil {
		return err
	}

	var r io.Reader
	var n int64
	if pType == objectstore.ProviderTypeAzure {
		// Switch to multipart upload based on data size
		r, n, err = readerSize(in, buffSize)
		if err != nil {
			return err
		}

		input = r
		size = n
	} else {
		input = in
		size = 0
	}

	if err := bucket.Put(ctx, path, input, size, nil); err != nil {
		return errors.Wrapf(err, "failed to write contents to bucket '%s'", profile.Location.Bucket)
	}

	return nil
}
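
Only the Azure path needs an explicit size hint here; the other stow backends can presumably stream a reader of unknown length, which is why size stays 0 for them. On the stow side, the hint feeds a decision roughly like the following (an illustrative sketch based on the linked container.go, not stow's actual code):

	// Illustrative only: the Azure container switches on the size hint,
	// since buffSize (256 MiB) is the single-request upload limit
	if size >= buffSize {
		// upload the blob in chunks (multipart)
	} else {
		// upload in a single request
	}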

// readerSize checks whether the data size is at least buffSize, i.e. the maximum size of an
// object that can be Put to an Azure container in a single request.
// If size >= buffSize, it returns buffSizeLimit as the size to enable multipart upload;
// otherwise it returns a size of 0.
func readerSize(in io.Reader, buffSize int64) (io.Reader, int64, error) {
	var n int64
	var err error
	var r io.Reader

	// Read first buffSize bytes into a buffer
	buf := make([]byte, buffSize)
	m, err := in.Read(buf)
	if err != nil && err != io.EOF {
		return nil, 0, err
	}

	// If the first buffSize bytes were read in full, the data size is >= buffSize
	if int64(m) == buffSize {
		r = io.MultiReader(bytes.NewReader(buf), in)
		n = buffSizeLimit
	} else {
		buf = buf[:m]
		r = bytes.NewReader(buf)
	}

	return r, n, nil
}
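
One caveat: io.Reader.Read may legally return fewer than buffSize bytes even when more data is available. In that case readerSize would classify the source as small, and since the else branch wraps only buf[:m], the rest of the stream would never reach the returned reader. The TestReaderSize cases below sidestep this because a bytes.Buffer fills the buffer in a single Read; a stricter variant (an assumption, not the committed code) could use io.ReadFull:

	// Sketch (not the committed code): io.ReadFull retries until buf is full
	// or the source is exhausted, so a short read from a pipe or network
	// stream cannot be mistaken for a small source.
	m, err := io.ReadFull(in, buf)
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return nil, 0, err
	}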

func deleteData(ctx context.Context, pType objectstore.ProviderType, profile param.Profile, path string) error {
	bucket, err := getBucket(ctx, pType, profile)
	if err != nil {
105 changes: 95 additions & 10 deletions pkg/location/location_test.go
@@ -17,7 +17,10 @@ package location
import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"os"
"testing"
"time"

@@ -34,14 +37,15 @@ import (
func Test(t *testing.T) { TestingT(t) }

type LocationSuite struct {
	osType            objectstore.ProviderType
	provider          objectstore.Provider
	rand              *rand.Rand
	root              objectstore.Bucket // root of the default test bucket
	suiteDirPrefix    string             // directory name prefix for all tests in this suite
	testpath          string
	testMultipartPath string
	region            string // bucket region
	profile           param.Profile
}

const (
@@ -98,6 +102,7 @@ func (s *LocationSuite) SetUpSuite(c *C) {
	c.Assert(s.root, NotNil)
	s.suiteDirPrefix = time.Now().UTC().Format(time.RFC3339Nano)
	s.testpath = s.suiteDirPrefix + "/testlocation.txt"
	s.testMultipartPath = s.suiteDirPrefix + "/testchunk.txt"
}

func (s *LocationSuite) TearDownTest(c *C) {
@@ -107,18 +112,98 @@ func (s *LocationSuite) TearDownTest(c *C) {
		err := s.root.Delete(ctx, s.testpath)
		if err != nil {
			c.Log("Cannot cleanup test directory: ", s.testpath)
		}
	}
	if s.testMultipartPath != "" {
		c.Assert(s.root, NotNil)
		ctx := context.Background()
		err := s.root.Delete(ctx, s.testMultipartPath)
		if err != nil {
			c.Log("Cannot cleanup test directory: ", s.testMultipartPath)
		}
	}
}

func (s *LocationSuite) TestWriteAndReadData(c *C) {
	ctx := context.Background()
	teststring := "test-content-check"
	err := writeData(ctx, s.osType, s.profile, bytes.NewBufferString(teststring), s.testpath)
	c.Check(err, IsNil)
	buf := bytes.NewBuffer(nil)
	err = readData(ctx, s.osType, s.profile, buf, s.testpath)
	c.Check(err, IsNil)
	c.Check(buf.String(), Equals, teststring)
}

func (s *LocationSuite) TestAzMultipartUpload(c *C) {
	if s.osType != objectstore.ProviderTypeAzure {
		c.Skip(fmt.Sprintf("Not applicable for location type %s", s.osType))
	}

	// Create the directory if it does not exist
	_, err := os.Stat(s.suiteDirPrefix)
	if os.IsNotExist(err) {
		err := os.MkdirAll(s.suiteDirPrefix, 0755)
		c.Check(err, IsNil)
	}
	// Create test file
	f, err := os.Create(s.testMultipartPath)
	c.Check(err, IsNil)
	defer f.Close()

	ctx := context.Background()
	for _, fileSize := range []int64{
		0,                 // empty file
		100 * 1024 * 1024, // 100M, i.e. < buffSize
		buffSize - 1,
		buffSize,
		buffSize + 1,
		300 * 1024 * 1024, // 300M, i.e. > buffSize
	} {
		_, err := f.Seek(0, io.SeekStart)
		c.Assert(err, IsNil)

		// Create dump file
		err = os.Truncate(s.testMultipartPath, fileSize)
		c.Assert(err, IsNil)
		err = writeData(ctx, s.osType, s.profile, f, s.testMultipartPath)
		c.Check(err, IsNil)
		buf := bytes.NewBuffer(nil)
		err = readData(ctx, s.osType, s.profile, buf, s.testMultipartPath)
		c.Check(err, IsNil)
		c.Check(int64(buf.Len()), Equals, fileSize)
	}
}
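
Note that growing a file with os.Truncate extends it with zero bytes without writing any data, so each iteration materializes a file of exactly fileSize almost instantly (as a sparse file on most filesystems).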

func (s *LocationSuite) TestReaderSize(c *C) {
	for _, tc := range []struct {
		input        string
		buffSize     int64
		expectedSize int64
	}{
		{
			input:        "dummy-string-1",
			buffSize:     4,
			expectedSize: 1073741824, // buffSizeLimit = 1 * 1024 * 1024 * 1024
		},
		{
			input:        "dummy-string-1",
			buffSize:     14,
			expectedSize: 1073741824,
		},
		{
			input:        "dummy-string-1",
			buffSize:     44,
			expectedSize: 0,
		},
		{
			input:        "",
			buffSize:     4,
			expectedSize: 0,
		},
	} {
		_, size, err := readerSize(bytes.NewBufferString(tc.input), tc.buffSize)
		c.Assert(err, IsNil)
		c.Assert(size, Equals, tc.expectedSize)
	}
}
