diff --git a/awss3/store.go b/awss3/store.go index 8dccf01..f71a8d5 100644 --- a/awss3/store.go +++ b/awss3/store.go @@ -450,7 +450,7 @@ func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metada uploader := s3manager.NewUploader(f.sess) pr, pw := io.Pipe() - bw := csbufio.NewWriter(pw) + bw := csbufio.NewWriter(ctx, pw) go func() { // TODO: this needs to be managed, ie shutdown signals, close, handler err etc. diff --git a/csbufio/reader.go b/csbufio/reader.go index 9cb659e..4225364 100644 --- a/csbufio/reader.go +++ b/csbufio/reader.go @@ -2,25 +2,39 @@ package csbufio import ( "bufio" + "context" "io" "os" ) -func OpenReader(name string) (io.ReadCloser, error) { +func OpenReader(ctx context.Context, name string) (io.ReadCloser, error) { f, err := os.Open(name) if err != nil { return nil, err } - return NewReader(f), nil + return NewReader(ctx, f), nil } -func NewReader(rc io.ReadCloser) io.ReadCloser { - return bufReadCloser{bufio.NewReader(rc), rc} +func NewReader(ctx context.Context, rc io.ReadCloser) io.ReadCloser { + return &bufReadCloser{ctx, bufio.NewReader(rc), rc} } type bufReadCloser struct { - io.Reader - c io.Closer + ctx context.Context + r io.Reader + c io.Closer } -func (bc bufReadCloser) Close() error { return bc.c.Close() } +func (b *bufReadCloser) Read(p []byte) (int, error) { + if err := b.ctx.Err(); err != nil { + return 0, err + } + return b.r.Read(p) +} + +func (b *bufReadCloser) Close() error { + if err := b.ctx.Err(); err != nil { + return err + } + return b.c.Close() +} diff --git a/csbufio/reader_test.go b/csbufio/reader_test.go new file mode 100644 index 0000000..079a53c --- /dev/null +++ b/csbufio/reader_test.go @@ -0,0 +1,47 @@ +package csbufio + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReaderContextDone(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + m := memRWC([]byte("some-data")) + rc := NewReader(ctx, &m) + + var p []byte + n, err := rc.Read(p) + require.ErrorIs(t, err, context.Canceled) + require.Equal(t, 0, n) + require.Len(t, p, 0) + + err = rc.Close() + require.ErrorIs(t, err, context.Canceled) +} + +type memRWC []byte + +func (m memRWC) Read(p []byte) (int, error) { + n := len(p) + if n > len(m) { + n = len(m) + } + copy(p, m) + return n, nil +} + +func (m *memRWC) Write(p []byte) (int, error) { + *m = append(*m, p...) + return len(p), nil +} + +func (m memRWC) Close() error { + return nil +} diff --git a/csbufio/writer.go b/csbufio/writer.go index 7d1dc9d..5c128e7 100644 --- a/csbufio/writer.go +++ b/csbufio/writer.go @@ -2,33 +2,43 @@ package csbufio import ( "bufio" + "context" "io" "os" ) -type ( - bufWriteCloser struct { - *bufio.Writer - c io.Closer - } -) +type bufWriteCloser struct { + ctx context.Context + w *bufio.Writer + c io.Closer +} -func OpenWriter(name string) (io.WriteCloser, error) { +func OpenWriter(ctx context.Context, name string) (io.WriteCloser, error) { f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE, 0665) if err != nil { return nil, err } - return NewWriter(f), nil + return NewWriter(ctx, f), nil } // NewWriter is a io.WriteCloser. -func NewWriter(rc io.WriteCloser) io.WriteCloser { - return bufWriteCloser{bufio.NewWriter(rc), rc} +func NewWriter(ctx context.Context, rc io.WriteCloser) io.WriteCloser { + return &bufWriteCloser{ctx, bufio.NewWriter(rc), rc} +} + +func (b *bufWriteCloser) Write(p []byte) (int, error) { + if err := b.ctx.Err(); err != nil { + return 0, err + } + return b.w.Write(p) } -func (bc bufWriteCloser) Close() error { - if err := bc.Flush(); err != nil { +func (b *bufWriteCloser) Close() error { + if err := b.ctx.Err(); err != nil { + return err + } + if err := b.w.Flush(); err != nil { return err } - return bc.c.Close() + return b.c.Close() } diff --git a/csbufio/writer_test.go b/csbufio/writer_test.go new file mode 100644 index 0000000..b00a02d --- /dev/null +++ b/csbufio/writer_test.go @@ -0,0 +1,26 @@ +package csbufio + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestWriterContextDone(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + var m memRWC + wc := NewWriter(ctx, &m) + + n, err := wc.Write([]byte("some-data")) + require.ErrorIs(t, err, context.Canceled) + require.Equal(t, 0, n) + require.Len(t, m, 0) + + err = wc.Close() + require.ErrorIs(t, err, context.Canceled) +} diff --git a/go.mod b/go.mod index a215e46..c1fb78d 100644 --- a/go.mod +++ b/go.mod @@ -1,22 +1,54 @@ module github.com/lytics/cloudstorage -go 1.15 +go 1.18 require ( cloud.google.com/go/storage v1.15.0 github.com/Azure/azure-sdk-for-go v40.5.0+incompatible - github.com/Azure/go-autorest/autorest v0.11.10 // indirect - github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect github.com/araddon/gou v0.0.0-20190110011759-c797efecbb61 github.com/aws/aws-sdk-go v1.29.34 - github.com/dnaeon/go-vcr v1.1.0 // indirect github.com/pborman/uuid v1.2.1 github.com/pkg/sftp v1.11.0 - github.com/satori/go.uuid v1.2.0 // indirect - github.com/stretchr/testify v1.6.1 + github.com/stretchr/testify v1.8.0 golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4 golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/api v0.45.0 ) + +require ( + cloud.google.com/go v0.81.0 // indirect + github.com/Azure/go-autorest v14.2.0+incompatible // indirect + github.com/Azure/go-autorest/autorest v0.11.10 // indirect + github.com/Azure/go-autorest/autorest/adal v0.9.5 // indirect + github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect + github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect + github.com/Azure/go-autorest/logger v0.2.0 // indirect + github.com/Azure/go-autorest/tracing v0.6.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dnaeon/go-vcr v1.1.0 // indirect + github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect + github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/uuid v1.1.2 // indirect + github.com/googleapis/gax-go/v2 v2.0.5 // indirect + github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect + github.com/jstemmer/go-junit-report v0.9.1 // indirect + github.com/kr/fs v0.1.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/satori/go.uuid v1.2.0 // indirect + go.opencensus.io v0.23.0 // indirect + golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect + golang.org/x/mod v0.4.1 // indirect + golang.org/x/sys v0.0.0-20210412220455-f1c623a9e750 // indirect + golang.org/x/text v0.3.5 // indirect + golang.org/x/tools v0.1.0 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + google.golang.org/appengine v1.6.7 // indirect + google.golang.org/genproto v0.0.0-20210420162539-3c870d7478d2 // indirect + google.golang.org/grpc v1.37.0 // indirect + google.golang.org/protobuf v1.26.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 22d7928..383e72f 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,9 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c= github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -188,10 +189,13 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -521,8 +525,9 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/localfs/store.go b/localfs/store.go index a2ce337..292ef21 100644 --- a/localfs/store.go +++ b/localfs/store.go @@ -237,14 +237,13 @@ func (l *LocalStore) NewReaderWithContext(ctx context.Context, o string) (io.Rea if err != nil { return nil, err } - return csbufio.OpenReader(fo) + return csbufio.OpenReader(ctx, fo) } func (l *LocalStore) NewWriter(o string, metadata map[string]string) (io.WriteCloser, error) { return l.NewWriterWithContext(context.Background(), o, metadata) } func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) { - fo := path.Join(l.storepath, o) err := cloudstorage.EnsureDir(fo) @@ -269,7 +268,8 @@ func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadat if err != nil { return nil, err } - return csbufio.NewWriter(f), nil + + return csbufio.NewWriter(ctx, f), nil } func (l *LocalStore) Get(ctx context.Context, o string) (cloudstorage.Object, error) {