From e80bc9409743dba646aaf8a514888afe6eb65bcf Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Mon, 21 Oct 2019 10:24:33 -0700 Subject: [PATCH 1/6] Add the ability to only write if a file doesn't exist to gcs and localfs This commit adds the ability to use O_EXCL and GCS's Generation(0) to only write a file if it doesn't exist. This allows consumer to use files as concurrency locks. --- awss3/store.go | 3 ++- azure/store.go | 2 +- google/store.go | 10 ++++++++-- localfs/store.go | 9 +++++++-- sftp/store.go | 2 +- store.go | 6 +++++- testutils/testutils.go | 13 ++++++++----- 7 files changed, 32 insertions(+), 13 deletions(-) diff --git a/awss3/store.go b/awss3/store.go index 3c06202..bce5275 100644 --- a/awss3/store.go +++ b/awss3/store.go @@ -403,6 +403,7 @@ func (f *FS) Move(ctx context.Context, src, des cloudstorage.Object) error { return oh.Delete(ctx) } */ + // NewReader create file reader. func (f *FS) NewReader(o string) (io.ReadCloser, error) { return f.NewReaderWithContext(context.Background(), o) @@ -430,7 +431,7 @@ func (f *FS) NewWriter(objectName string, metadata map[string]string) (io.WriteC } // NewWriterWithContext create writer with provided context and metadata. -func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metadata map[string]string) (io.WriteCloser, error) { +func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) { // Create an uploader with the session and default options uploader := s3manager.NewUploader(f.sess) diff --git a/azure/store.go b/azure/store.go index af1be8e..a08ce7a 100644 --- a/azure/store.go +++ b/azure/store.go @@ -379,7 +379,7 @@ func (f *FS) NewWriter(objectName string, metadata map[string]string) (io.WriteC } // NewWriterWithContext create writer with provided context and metadata. -func (f *FS) NewWriterWithContext(ctx context.Context, name string, metadata map[string]string) (io.WriteCloser, error) { +func (f *FS) NewWriterWithContext(ctx context.Context, name string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) { name = strings.Replace(name, " ", "+", -1) o := &object{name: name, metadata: metadata} rwc := newAzureWriteCloser(ctx, f, o) diff --git a/google/store.go b/google/store.go index 5d46ad2..dd83e07 100644 --- a/google/store.go +++ b/google/store.go @@ -232,8 +232,14 @@ func (g *GcsFS) NewWriter(o string, metadata map[string]string) (io.WriteCloser, } // NewWriterWithContext create writer with provided context and metadata. -func (g *GcsFS) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string) (io.WriteCloser, error) { - wc := g.gcsb().Object(o).NewWriter(ctx) +func (g *GcsFS) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) { + obj := g.gcsb().Object(o) + if len(opts) > 0 && opts[0].IfNotExists { + // https://cloud.google.com/storage/docs/json_api/v1/objects/insert + // See ifGenerationMatch "Setting to 0 makes the operation succeed only if there are no live versions of the object" + obj = obj.Generation(0) + } + wc := obj.NewWriter(ctx) if metadata != nil { wc.Metadata = metadata //contenttype is only used for viewing the file in a browser. (i.e. the GCS Object browser). diff --git a/localfs/store.go b/localfs/store.go index 2531c20..c4e0bf7 100644 --- a/localfs/store.go +++ b/localfs/store.go @@ -227,7 +227,7 @@ func (l *LocalStore) NewReaderWithContext(ctx context.Context, o string) (io.Rea func (l *LocalStore) NewWriter(o string, metadata map[string]string) (io.WriteCloser, error) { return l.NewWriterWithContext(context.Background(), o, metadata) } -func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string) (io.WriteCloser, error) { +func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) { fo := path.Join(l.storepath, o) @@ -245,7 +245,12 @@ func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadat return nil, err } - f, err := os.OpenFile(fo, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0665) + flag := os.O_RDWR | os.O_CREATE | os.O_TRUNC + if len(opts) > 0 && opts[0].IfNotExists { + flag = flag | os.O_EXCL + } + f, err := os.OpenFile(fo, flag, 0665) + fmt.Println(fo, err) if err != nil { return nil, err } diff --git a/sftp/store.go b/sftp/store.go index ec6fda3..809b277 100644 --- a/sftp/store.go +++ b/sftp/store.go @@ -553,7 +553,7 @@ func (m *Client) NewWriter(objectName string, metadata map[string]string) (io.Wr } // NewWriterWithContext create writer with provided context and metadata. -func (m *Client) NewWriterWithContext(ctx context.Context, name string, metadata map[string]string) (io.WriteCloser, error) { +func (m *Client) NewWriterWithContext(ctx context.Context, name string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) { name = strings.Replace(name, " ", "+", -1) diff --git a/store.go b/store.go index 02a5ca2..30d3072 100644 --- a/store.go +++ b/store.go @@ -41,6 +41,10 @@ var ( ) type ( + Opts struct { + IfNotExists bool + } + // StoreReader interface to define the Storage Interface abstracting // the GCS, S3, LocalFile, etc interfaces StoreReader interface { @@ -98,7 +102,7 @@ type ( // until Close has been called NewWriter(o string, metadata map[string]string) (io.WriteCloser, error) // NewWriter but with context. - NewWriterWithContext(ctx context.Context, o string, metadata map[string]string) (io.WriteCloser, error) + NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...Opts) (io.WriteCloser, error) // NewObject creates a new empty object backed by the cloud store // This new object isn't' synced/created in the backing store diff --git a/testutils/testutils.go b/testutils/testutils.go index e839b93..cbce130 100644 --- a/testutils/testutils.go +++ b/testutils/testutils.go @@ -30,6 +30,7 @@ var ( ) func init() { + verbose = flag.Bool("vv", false, "Verbose Logging?") Setup() } @@ -44,11 +45,6 @@ type TestingT interface { func Setup() { setupOnce.Do(func() { - if flag.CommandLine.Lookup("vv") == nil { - verbose = flag.Bool("vv", false, "Verbose Logging?") - } - - flag.Parse() logger := gou.GetLogger() if logger != nil { // don't re-setup @@ -200,6 +196,7 @@ func BasicRW(t TestingT, store cloudstorage.Store) { obj, err = store.Get(context.Background(), "prefix/test.csv") assert.Equal(t, cloudstorage.ErrObjectNotFound, err) assert.Equal(t, nil, obj) + } func createFile(t TestingT, store cloudstorage.Store, name, data string) cloudstorage.Object { @@ -757,6 +754,12 @@ func TestReadWriteCloser(t TestingT, store cloudstorage.Store) { assert.Equalf(t, nil, err, "at loop-cnt:%v", i) time.Sleep(time.Millisecond * 100) + _, err = store.NewWriterWithContext(context.Background(), fileName, nil, cloudstorage.Opts{IfNotExists: true}) + assert.Error(t, err) + + // Read the object from store, delete if it exists + deleteIfExists(store, "prefix/test.csv") + rc, err := store.NewReader(fileName) assert.Equalf(t, nil, err, "at loop-cnt:%v", i) if rc == nil { From 419198c9a2881e038772c8b41f9c0eca72b973bb Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Mon, 21 Oct 2019 11:52:09 -0700 Subject: [PATCH 2/6] Remove debugging line from localfs store --- localfs/store.go | 1 - 1 file changed, 1 deletion(-) diff --git a/localfs/store.go b/localfs/store.go index c4e0bf7..bf1daee 100644 --- a/localfs/store.go +++ b/localfs/store.go @@ -250,7 +250,6 @@ func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadat flag = flag | os.O_EXCL } f, err := os.OpenFile(fo, flag, 0665) - fmt.Println(fo, err) if err != nil { return nil, err } From 6159dec2d7c96a1bcc1d3641de452e6f988d6693 Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Wed, 23 Oct 2019 16:41:35 -0700 Subject: [PATCH 3/6] Use the DoesNotExist conditional instead of the Generation for gcs locks --- google/store.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/google/store.go b/google/store.go index dd83e07..8449965 100644 --- a/google/store.go +++ b/google/store.go @@ -235,9 +235,7 @@ func (g *GcsFS) NewWriter(o string, metadata map[string]string) (io.WriteCloser, func (g *GcsFS) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) { obj := g.gcsb().Object(o) if len(opts) > 0 && opts[0].IfNotExists { - // https://cloud.google.com/storage/docs/json_api/v1/objects/insert - // See ifGenerationMatch "Setting to 0 makes the operation succeed only if there are no live versions of the object" - obj = obj.Generation(0) + obj = obj.If(storage.Conditions{DoesNotExist: true}) } wc := obj.NewWriter(ctx) if metadata != nil { From 7027625887335cd04e4c2a8602df606fcb62e5cf Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Fri, 1 Nov 2019 10:32:38 -0700 Subject: [PATCH 4/6] Update the go versions that cloudstorage runs against --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index c1e340c..7dde487 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,8 @@ language: go go: - - 1.10.x - - 1.11.x + - 1.12.x + - 1.13.x before_install: - go get -t -v ./... From 8e0c0c0e3308ce0b9dae85578d9fb311fde22c3e Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Fri, 1 Nov 2019 10:37:08 -0700 Subject: [PATCH 5/6] Return an error for store types that do not support the IfNotExists opt --- awss3/store.go | 3 +++ azure/store.go | 3 +++ sftp/store.go | 3 +++ 3 files changed, 9 insertions(+) diff --git a/awss3/store.go b/awss3/store.go index bce5275..dee97c5 100644 --- a/awss3/store.go +++ b/awss3/store.go @@ -432,6 +432,9 @@ func (f *FS) NewWriter(objectName string, metadata map[string]string) (io.WriteC // NewWriterWithContext create writer with provided context and metadata. func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) { + if len(opts) > 0 && opts[0].IfNotExists { + return nil, fmt.Errorf("options IfNotExists not supported for store type") + } // Create an uploader with the session and default options uploader := s3manager.NewUploader(f.sess) diff --git a/azure/store.go b/azure/store.go index a08ce7a..ee4a3cb 100644 --- a/azure/store.go +++ b/azure/store.go @@ -380,6 +380,9 @@ func (f *FS) NewWriter(objectName string, metadata map[string]string) (io.WriteC // NewWriterWithContext create writer with provided context and metadata. func (f *FS) NewWriterWithContext(ctx context.Context, name string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) { + if len(opts) > 0 && opts[0].IfNotExists { + return nil, fmt.Errorf("options IfNotExists not supported for store type") + } name = strings.Replace(name, " ", "+", -1) o := &object{name: name, metadata: metadata} rwc := newAzureWriteCloser(ctx, f, o) diff --git a/sftp/store.go b/sftp/store.go index 809b277..f81e8c7 100644 --- a/sftp/store.go +++ b/sftp/store.go @@ -554,6 +554,9 @@ func (m *Client) NewWriter(objectName string, metadata map[string]string) (io.Wr // NewWriterWithContext create writer with provided context and metadata. func (m *Client) NewWriterWithContext(ctx context.Context, name string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) { + if len(opts) > 0 && opts[0].IfNotExists { + return nil, fmt.Errorf("options IfNotExists not supported for store type") + } name = strings.Replace(name, " ", "+", -1) From 36a891b6a703a5ccc0a3a1ff1fe97e9dcdd757eb Mon Sep 17 00:00:00 2001 From: AJ Roetker Date: Fri, 1 Nov 2019 11:18:53 -0700 Subject: [PATCH 6/6] Actually try writing to gcs to get error on IfNotExists in tests --- testutils/testutils.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/testutils/testutils.go b/testutils/testutils.go index cbce130..f4b50ec 100644 --- a/testutils/testutils.go +++ b/testutils/testutils.go @@ -754,7 +754,14 @@ func TestReadWriteCloser(t TestingT, store cloudstorage.Store) { assert.Equalf(t, nil, err, "at loop-cnt:%v", i) time.Sleep(time.Millisecond * 100) - _, err = store.NewWriterWithContext(context.Background(), fileName, nil, cloudstorage.Opts{IfNotExists: true}) + wc, err = store.NewWriterWithContext(context.Background(), fileName, nil, cloudstorage.Opts{IfNotExists: true}) + if err == nil { + // If err == nil then we're gcs so try writing + _, err = bytes.NewBufferString(data).WriteTo(wc) + assert.NoErrorf(t, err, "at loop-cnt:%v", i) + err = wc.Close() + time.Sleep(time.Millisecond * 100) + } assert.Error(t, err) // Read the object from store, delete if it exists