Skip to content

Commit

Permalink
Merge pull request #78 from lytics/add_atomic_writes
Browse files Browse the repository at this point in the history
Add the ability to only write if a file doesn't exist to gcs and localfs
  • Loading branch information
ajroetker authored Nov 4, 2019
2 parents 2045e02 + 36a891b commit e5030c5
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 15 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
language: go

go:
- 1.10.x
- 1.11.x
- 1.12.x
- 1.13.x

before_install:
- go get -t -v ./...
Expand Down
6 changes: 5 additions & 1 deletion awss3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (f *FS) Move(ctx context.Context, src, des cloudstorage.Object) error {
return oh.Delete(ctx)
}
*/

// NewReader create file reader.
func (f *FS) NewReader(o string) (io.ReadCloser, error) {
return f.NewReaderWithContext(context.Background(), o)
Expand Down Expand Up @@ -430,7 +431,10 @@ func (f *FS) NewWriter(objectName string, metadata map[string]string) (io.WriteC
}

// NewWriterWithContext create writer with provided context and metadata.
func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metadata map[string]string) (io.WriteCloser, error) {
func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) {
if len(opts) > 0 && opts[0].IfNotExists {
return nil, fmt.Errorf("options IfNotExists not supported for store type")
}

// Create an uploader with the session and default options
uploader := s3manager.NewUploader(f.sess)
Expand Down
5 changes: 4 additions & 1 deletion azure/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,10 @@ func (f *FS) NewWriter(objectName string, metadata map[string]string) (io.WriteC
}

// NewWriterWithContext create writer with provided context and metadata.
func (f *FS) NewWriterWithContext(ctx context.Context, name string, metadata map[string]string) (io.WriteCloser, error) {
func (f *FS) NewWriterWithContext(ctx context.Context, name string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) {
if len(opts) > 0 && opts[0].IfNotExists {
return nil, fmt.Errorf("options IfNotExists not supported for store type")
}
name = strings.Replace(name, " ", "+", -1)
o := &object{name: name, metadata: metadata}
rwc := newAzureWriteCloser(ctx, f, o)
Expand Down
8 changes: 6 additions & 2 deletions google/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,12 @@ func (g *GcsFS) NewWriter(o string, metadata map[string]string) (io.WriteCloser,
}

// NewWriterWithContext create writer with provided context and metadata.
func (g *GcsFS) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string) (io.WriteCloser, error) {
wc := g.gcsb().Object(o).NewWriter(ctx)
func (g *GcsFS) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) {
obj := g.gcsb().Object(o)
if len(opts) > 0 && opts[0].IfNotExists {
obj = obj.If(storage.Conditions{DoesNotExist: true})
}
wc := obj.NewWriter(ctx)
if metadata != nil {
wc.Metadata = metadata
//contenttype is only used for viewing the file in a browser. (i.e. the GCS Object browser).
Expand Down
8 changes: 6 additions & 2 deletions localfs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (l *LocalStore) NewReaderWithContext(ctx context.Context, o string) (io.Rea
func (l *LocalStore) NewWriter(o string, metadata map[string]string) (io.WriteCloser, error) {
return l.NewWriterWithContext(context.Background(), o, metadata)
}
func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string) (io.WriteCloser, error) {
func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) {

fo := path.Join(l.storepath, o)

Expand All @@ -245,7 +245,11 @@ func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadat
return nil, err
}

f, err := os.OpenFile(fo, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0665)
flag := os.O_RDWR | os.O_CREATE | os.O_TRUNC
if len(opts) > 0 && opts[0].IfNotExists {
flag = flag | os.O_EXCL
}
f, err := os.OpenFile(fo, flag, 0665)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion sftp/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,10 @@ func (m *Client) NewWriter(objectName string, metadata map[string]string) (io.Wr
}

// NewWriterWithContext create writer with provided context and metadata.
func (m *Client) NewWriterWithContext(ctx context.Context, name string, metadata map[string]string) (io.WriteCloser, error) {
func (m *Client) NewWriterWithContext(ctx context.Context, name string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) {
if len(opts) > 0 && opts[0].IfNotExists {
return nil, fmt.Errorf("options IfNotExists not supported for store type")
}

name = strings.Replace(name, " ", "+", -1)

Expand Down
6 changes: 5 additions & 1 deletion store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ var (
)

type (
Opts struct {
IfNotExists bool
}

// StoreReader interface to define the Storage Interface abstracting
// the GCS, S3, LocalFile, etc interfaces
StoreReader interface {
Expand Down Expand Up @@ -98,7 +102,7 @@ type (
// until Close has been called
NewWriter(o string, metadata map[string]string) (io.WriteCloser, error)
// NewWriter but with context.
NewWriterWithContext(ctx context.Context, o string, metadata map[string]string) (io.WriteCloser, error)
NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...Opts) (io.WriteCloser, error)

// NewObject creates a new empty object backed by the cloud store
// This new object isn't' synced/created in the backing store
Expand Down
20 changes: 15 additions & 5 deletions testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
)

func init() {
verbose = flag.Bool("vv", false, "Verbose Logging?")
Setup()
}

Expand All @@ -44,11 +45,6 @@ type TestingT interface {
func Setup() {
setupOnce.Do(func() {

if flag.CommandLine.Lookup("vv") == nil {
verbose = flag.Bool("vv", false, "Verbose Logging?")
}

flag.Parse()
logger := gou.GetLogger()
if logger != nil {
// don't re-setup
Expand Down Expand Up @@ -200,6 +196,7 @@ func BasicRW(t TestingT, store cloudstorage.Store) {
obj, err = store.Get(context.Background(), "prefix/test.csv")
assert.Equal(t, cloudstorage.ErrObjectNotFound, err)
assert.Equal(t, nil, obj)

}

func createFile(t TestingT, store cloudstorage.Store, name, data string) cloudstorage.Object {
Expand Down Expand Up @@ -757,6 +754,19 @@ func TestReadWriteCloser(t TestingT, store cloudstorage.Store) {
assert.Equalf(t, nil, err, "at loop-cnt:%v", i)
time.Sleep(time.Millisecond * 100)

wc, err = store.NewWriterWithContext(context.Background(), fileName, nil, cloudstorage.Opts{IfNotExists: true})
if err == nil {
// If err == nil then we're gcs so try writing
_, err = bytes.NewBufferString(data).WriteTo(wc)
assert.NoErrorf(t, err, "at loop-cnt:%v", i)
err = wc.Close()
time.Sleep(time.Millisecond * 100)
}
assert.Error(t, err)

// Read the object from store, delete if it exists
deleteIfExists(store, "prefix/test.csv")

rc, err := store.NewReader(fileName)
assert.Equalf(t, nil, err, "at loop-cnt:%v", i)
if rc == nil {
Expand Down

0 comments on commit e5030c5

Please sign in to comment.