Skip to content

Commit

Permalink
Google Cloud Transparent GZIP Compression Support (#104)
Browse files Browse the repository at this point in the history
* Initial commit of PoC of transparent snappy compression

* Fix tests

* fix up require usage

* Fix up temp dir usage

* Temp dirs but right

* Delete custom buffered test writer thing

* make csbufio reads peek to id snappy

* Remove junk

* Prep for using lzma

* More

* Convert to gzip

* Fix google tests, handle errors, read compressed

* Start removing local compression support

* Remove enablecompression during testing

* Remove more

* Remove pointless redundancy

* Make Content-Encoding available externally

* Re-add accidentally removed test

* Destandardize Content-Encoding to content_encoding

* Add missing reference to enableCompression

* Add missed write path

* Set mime correctly

* Handle additional codepath

* Work around an interesting google library behavior

* Do the other codepath too

* Fix comment
  • Loading branch information
vitaminmoo authored Sep 19, 2023
1 parent a437124 commit 26225d7
Show file tree
Hide file tree
Showing 21 changed files with 593 additions and 463 deletions.
40 changes: 22 additions & 18 deletions awss3/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package awss3_test

import (
"os"
"path/filepath"
"testing"

"github.com/araddon/gou"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lytics/cloudstorage"
"github.com/lytics/cloudstorage/awss3"
Expand All @@ -28,6 +29,9 @@ func TestS3(t *testing.T) {
t.Skip()
return
}

tmpDir := t.TempDir()

conf := &cloudstorage.Config{
Type: awss3.StoreType,
Settings: gou.JsonHelper{
Expand All @@ -36,53 +40,55 @@ func TestS3(t *testing.T) {
}
// Should error with empty config
_, err := cloudstorage.NewStore(conf)
assert.NotEqual(t, nil, err)
require.Error(t, err)

conf.AuthMethod = awss3.AuthAccessKey
conf.Settings[awss3.ConfKeyAccessKey] = ""
conf.Settings[awss3.ConfKeyAccessSecret] = os.Getenv("AWS_SECRET_KEY")
conf.Bucket = os.Getenv("AWS_BUCKET")
conf.TmpDir = "/tmp/localcache/aws"
conf.TmpDir = filepath.Join(tmpDir, "localcache", "aws")
_, err = cloudstorage.NewStore(conf)
assert.NotEqual(t, nil, err)
require.Error(t, err)

conf.Settings[awss3.ConfKeyAccessSecret] = ""
_, err = cloudstorage.NewStore(conf)
assert.NotEqual(t, nil, err)
require.Error(t, err)

// conf.Settings[awss3.ConfKeyAccessKey] = "bad"
// conf.Settings[awss3.ConfKeyAccessSecret] = "bad"
// _, err = cloudstorage.NewStore(conf)
// assert.NotEqual(t, nil, err)
// require. NotEqual(t, nil, err)

conf.BaseUrl = "s3.custom.endpoint.com"
conf.Settings[awss3.ConfKeyAccessKey] = os.Getenv("AWS_ACCESS_KEY")
conf.Settings[awss3.ConfKeyAccessSecret] = os.Getenv("AWS_SECRET_KEY")
client, sess, err := awss3.NewClient(conf)
assert.Equal(t, nil, err)
assert.NotEqual(t, nil, client)
client, _, err := awss3.NewClient(conf)
require.NoError(t, err)
require.NotNil(t, client)

conf.Settings[awss3.ConfKeyDisableSSL] = true
client, sess, err = awss3.NewClient(conf)
assert.Equal(t, nil, err)
assert.NotEqual(t, nil, client)
client, sess, err := awss3.NewClient(conf)
require.NoError(t, err)
require.NotNil(t, client)

conf.TmpDir = ""
_, err = awss3.NewStore(client, sess, conf)
assert.NotEqual(t, nil, err)
require.Error(t, err)

// Trying to find dir they don't have access to?
conf.TmpDir = "/home/fake"
_, err = cloudstorage.NewStore(conf)
assert.NotEqual(t, nil, err)
require.Error(t, err)
}

func TestAll(t *testing.T) {
tmpDir := t.TempDir()

config := &cloudstorage.Config{
Type: awss3.StoreType,
AuthMethod: awss3.AuthAccessKey,
Bucket: os.Getenv("AWS_BUCKET"),
TmpDir: "/tmp/localcache/aws",
TmpDir: filepath.Join(tmpDir, "localcache", "aws"),
Settings: make(gou.JsonHelper),
Region: "us-east-1",
}
Expand All @@ -100,8 +106,6 @@ func TestAll(t *testing.T) {
t.Skip()
return
}
if store == nil {
t.Fatalf("No store???")
}
require.NotNil(t, store, "no store?")
testutils.RunTests(t, store, config)
}
9 changes: 8 additions & 1 deletion azure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,19 @@ export AZURE_BUCKET="cloudstorageunittests"
*/

func main() {
tmpDir, err := os.MkdirTemp("/tmp", "azure_example")
if err != nil {
fmt.Println("Could not create temp dir", err)
os.Exit(1)
}
defer os.RemoveAll(tmpDir)

conf := &cloudstorage.Config{
Type: azure.StoreType,
AuthMethod: azure.AuthKey,
Bucket: os.Getenv("AZURE_BUCKET"),
Project: os.Getenv("AZURE_PROJECT"),
TmpDir: "/tmp/localcache/azure",
TmpDir: filepath.Join(tempDir, "localcache", "azure"),
Settings: make(gou.JsonHelper),
}

Expand Down
10 changes: 9 additions & 1 deletion azure/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/araddon/gou"
"google.golang.org/api/iterator"
Expand All @@ -23,12 +24,19 @@ export AZURE_BUCKET="cloudstorageunittests"
*/

func main() {
tmpDir, err := os.MkdirTemp("/tmp", "azure_example")
if err != nil {
fmt.Println("Could not create temp dir", err)
os.Exit(1)
}
defer os.RemoveAll(tmpDir)

conf := &cloudstorage.Config{
Type: azure.StoreType,
AuthMethod: azure.AuthKey,
Bucket: os.Getenv("AZURE_BUCKET"),
Project: os.Getenv("AZURE_PROJECT"),
TmpDir: "/tmp/localcache/azure",
TmpDir: filepath.Join(tmpDir, "localcache", "azure"),
Settings: make(gou.JsonHelper),
}

Expand Down
26 changes: 13 additions & 13 deletions azure/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,20 +547,20 @@ func newObject(f *FS, o *az.Blob) *object {
}

/*
func newObjectFromHead(f *FS, name string, o *s3.HeadObjectOutput) *object {
obj := &object{
fs: f,
name: name,
bucket: f.bucket,
cachepath: cloudstorage.CachePathObj(f.cachepath, name, f.ID),
}
if o.LastModified != nil {
obj.updated = *o.LastModified
func newObjectFromHead(f *FS, name string, o *s3.HeadObjectOutput) *object {
obj := &object{
fs: f,
name: name,
bucket: f.bucket,
cachepath: cloudstorage.CachePathObj(f.cachepath, name, f.ID),
}
if o.LastModified != nil {
obj.updated = *o.LastModified
}
// metadata?
obj.metadata, _ = convertMetaData(o.Metadata)
return obj
}
// metadata?
obj.metadata, _ = convertMetaData(o.Metadata)
return obj
}
*/
func (o *object) StorageSource() string {
return StoreType
Expand Down
25 changes: 13 additions & 12 deletions azure/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,24 @@ import (
"testing"

"github.com/araddon/gou"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lytics/cloudstorage"
"github.com/lytics/cloudstorage/azure"
"github.com/lytics/cloudstorage/testutils"
)

/*
# to use azure tests ensure you have exported
export AZURE_KEY="aaa"
export AZURE_PROJECT="bbb"
export AZURE_BUCKET="cloudstorageunittests"
*/
var config = &cloudstorage.Config{
Type: azure.StoreType,
AuthMethod: azure.AuthKey,
Bucket: os.Getenv("AZURE_BUCKET"),
TmpDir: "/tmp/localcache/azure",
Settings: make(gou.JsonHelper),
}

Expand All @@ -35,36 +32,37 @@ func TestConfig(t *testing.T) {
t.Skip()
return
}

conf := &cloudstorage.Config{
Type: azure.StoreType,
Project: os.Getenv("AZURE_PROJECT"),
Settings: make(gou.JsonHelper),
}
// Should error with empty config
_, err := cloudstorage.NewStore(conf)
assert.NotEqual(t, nil, err)
require.Error(t, err)

conf.AuthMethod = azure.AuthKey
conf.Settings[azure.ConfKeyAuthKey] = ""
_, err = cloudstorage.NewStore(conf)
assert.NotEqual(t, nil, err)
require.Error(t, err)

conf.Settings[azure.ConfKeyAuthKey] = "bad"
_, err = cloudstorage.NewStore(conf)
assert.NotEqual(t, nil, err)
require.Error(t, err)

conf.Settings[azure.ConfKeyAuthKey] = os.Getenv("AZURE_KEY")
client, sess, err := azure.NewClient(conf)
assert.Equal(t, nil, err)
assert.NotEqual(t, nil, client)
require.NoError(t, err)
require.NotNil(t, client)
conf.TmpDir = ""
_, err = azure.NewStore(client, sess, conf)
assert.NotEqual(t, nil, err)
require.Error(t, err)

// Trying to find dir they don't have access to?
conf.TmpDir = "/home/fake"
_, err = cloudstorage.NewStore(conf)
assert.NotEqual(t, nil, err)
require.Error(t, err)
}

func TestAll(t *testing.T) {
Expand All @@ -74,6 +72,9 @@ func TestAll(t *testing.T) {
t.Skip()
return
}

config.TmpDir = t.TempDir()

config.Settings[azure.ConfKeyAuthKey] = os.Getenv("AZURE_KEY")
store, err := cloudstorage.NewStore(config)
if err != nil {
Expand All @@ -82,7 +83,7 @@ func TestAll(t *testing.T) {
return
}
client := store.Client()
assert.NotEqual(t, nil, client)
require.NotNil(t, client)

testutils.RunTests(t, store, config)
}
26 changes: 4 additions & 22 deletions csbufio/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/acomagu/bufpipe"
"github.com/stretchr/testify/require"
)

Expand All @@ -13,8 +14,9 @@ func TestReaderContextDone(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

m := memRWC([]byte("some-data"))
rc := NewReader(ctx, &m)
pr, pw := bufpipe.New([]byte("some-data"))
pw.Close()
rc := NewReader(ctx, pr)

var p []byte
n, err := rc.Read(p)
Expand All @@ -25,23 +27,3 @@ func TestReaderContextDone(t *testing.T) {
err = rc.Close()
require.ErrorIs(t, err, context.Canceled)
}

type memRWC []byte

func (m memRWC) Read(p []byte) (int, error) {
n := len(p)
if n > len(m) {
n = len(m)
}
copy(p, m)
return n, nil
}

func (m *memRWC) Write(p []byte) (int, error) {
*m = append(*m, p...)
return len(p), nil
}

func (m memRWC) Close() error {
return nil
}
13 changes: 10 additions & 3 deletions csbufio/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package csbufio

import (
"context"
"io"
"testing"

"github.com/acomagu/bufpipe"
"github.com/stretchr/testify/require"
)

Expand All @@ -13,13 +15,18 @@ func TestWriterContextDone(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

var m memRWC
wc := NewWriter(ctx, &m)
pr, pw := bufpipe.New(nil)
wc := NewWriter(ctx, pw)

n, err := wc.Write([]byte("some-data"))
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, 0, n)
require.Len(t, m, 0)
err = pw.Close()
require.NoError(t, err)

b, err := io.ReadAll(pr)
require.NoError(t, err, "error reading")
require.Equal(t, 0, len(b), "")

err = wc.Close()
require.ErrorIs(t, err, context.Canceled)
Expand Down
16 changes: 8 additions & 8 deletions file_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package cloudstorage
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestETAG(t *testing.T) {
assert.Equal(t, "hello", CleanETag("hello"))
assert.Equal(t, "hello", CleanETag(`"hello"`))
assert.Equal(t, "hello", CleanETag(`\"hello\"`))
assert.Equal(t, "hello", CleanETag("\"hello\""))
require.Equal(t, "hello", CleanETag("hello"))
require.Equal(t, "hello", CleanETag(`"hello"`))
require.Equal(t, "hello", CleanETag(`\"hello\"`))
require.Equal(t, "hello", CleanETag("\"hello\""))
}
func TestContentType(t *testing.T) {
assert.Equal(t, "text/csv; charset=utf-8", ContentType("data.csv"))
assert.Equal(t, "application/json", ContentType("data.json"))
assert.Equal(t, "application/octet-stream", ContentType("data.unknown"))
require.Equal(t, "text/csv; charset=utf-8", ContentType("data.csv"))
require.Equal(t, "application/json", ContentType("data.json"))
require.Equal(t, "application/octet-stream", ContentType("data.unknown"))
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ require (
google.golang.org/api v0.103.0
)

require github.com/acomagu/bufpipe v1.0.4

require (
cloud.google.com/go v0.105.0 // indirect
cloud.google.com/go/compute v1.12.1 // indirect
Expand Down
Loading

0 comments on commit 26225d7

Please sign in to comment.