Runtime: Azure storage connector #3036

Merged 35 commits on Sep 26, 2023
Changes from 8 commits
Commits
cbaf361
azure storage changes
rakeshsharma14317 Sep 4, 2023
b9639bc
testing azure
rakeshsharma14317 Sep 5, 2023
5e47c0e
interim commit
k-anshul Sep 5, 2023
2e3650d
changes with SharedKeyCredential
rakeshsharma14317 Sep 7, 2023
d2c05c9
nit changes
rakeshsharma14317 Sep 7, 2023
6733396
nit
rakeshsharma14317 Sep 7, 2023
1a6fe58
Merge remote-tracking branch 'origin/main' into 2992-runtime-azure-st…
rakeshsharma14317 Sep 7, 2023
29b6a44
linter fix
rakeshsharma14317 Sep 7, 2023
e75d21e
review changes 1
rakeshsharma14317 Sep 8, 2023
b4d64b0
review changes
rakeshsharma14317 Sep 11, 2023
5a98e5e
added access for public objects
rakeshsharma14317 Sep 11, 2023
0d1d099
ci fix
rakeshsharma14317 Sep 11, 2023
b7c6228
Merge remote-tracking branch 'origin/main' into 2992-runtime-azure-st…
rakeshsharma14317 Sep 11, 2023
b9883a3
minor changes
rakeshsharma14317 Sep 13, 2023
53ad4ff
changes
rakeshsharma14317 Sep 14, 2023
f9c37d9
cleanup
rakeshsharma14317 Sep 14, 2023
1249f21
more cleanup and added az login auth
rakeshsharma14317 Sep 14, 2023
41c15d6
merge with main branch
rakeshsharma14317 Sep 14, 2023
b0d93e1
reverted package-lock.json
rakeshsharma14317 Sep 14, 2023
96324dd
nit
rakeshsharma14317 Sep 15, 2023
f8a82a1
nit
rakeshsharma14317 Sep 15, 2023
eab2b25
linter fix
rakeshsharma14317 Sep 18, 2023
c00f42a
more changes
rakeshsharma14317 Sep 18, 2023
705c7a8
review changes
rakeshsharma14317 Sep 19, 2023
a27577c
added public bucket access
rakeshsharma14317 Sep 20, 2023
00b99a4
merge with main branch
rakeshsharma14317 Sep 20, 2023
c6b3543
added public blob access
rakeshsharma14317 Sep 21, 2023
7951f4c
review changes
rakeshsharma14317 Sep 22, 2023
fe46fc2
review changes
rakeshsharma14317 Sep 25, 2023
558503d
merge with main branch
rakeshsharma14317 Sep 25, 2023
4c85059
Return err in all cases
begelundmuller Sep 25, 2023
5ac5cdd
git pull and added log.info
rakeshsharma14317 Sep 25, 2023
4af6c48
default error fallback issue
rakeshsharma14317 Sep 26, 2023
040a8ae
conflicts resolved
rakeshsharma14317 Sep 26, 2023
523afec
Fix CI
begelundmuller Sep 26, 2023
1 change: 1 addition & 0 deletions cli/cmd/runtime/start.go
@@ -23,6 +23,7 @@ import (
"golang.org/x/sync/errgroup"

// Load connectors and reconcilers for runtime
_ "github.com/rilldata/rill/runtime/drivers/azure"
_ "github.com/rilldata/rill/runtime/drivers/bigquery"
_ "github.com/rilldata/rill/runtime/drivers/druid"
_ "github.com/rilldata/rill/runtime/drivers/duckdb"
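The blank import added above exists only for its side effect: the driver package's `init` function registers the connector by name. The following is a minimal, self-contained sketch of that pattern. The `Driver` interface, the `registry` map, and the panic on duplicate names are assumptions made for illustration and are not taken from the `drivers` package in this PR.

```go
package main

import (
	"fmt"
	"sort"
)

// Driver is a stand-in interface for this sketch only.
type Driver interface {
	Name() string
}

// registry maps a connector name to its driver (hypothetical, for illustration).
var registry = map[string]Driver{}

// Register adds a driver under a unique name and panics on duplicates.
func Register(name string, d Driver) {
	if _, ok := registry[name]; ok {
		panic(fmt.Sprintf("driver %q already registered", name))
	}
	registry[name] = d
}

type azureDriver struct{}

func (azureDriver) Name() string { return "Azure Blob Storage" }

// In a real driver package this init would live in that package and
// run when the package is pulled in through a blank import.
func init() {
	Register("azure", azureDriver{})
}

// registered returns the registered connector names in sorted order.
func registered() []string {
	names := make([]string, 0, len(registry))
	for n := range registry {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	fmt.Println(registered())
}
```

Without the blank import the driver package would never be linked in, so its `init` would not run and a lookup for `"azure"` would find nothing.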
13 changes: 12 additions & 1 deletion go.mod
@@ -6,6 +6,7 @@ require (
cloud.google.com/go/storage v1.30.1
github.com/AlecAivazis/survey/v2 v2.3.6
github.com/Andrew-M-C/go.jsonvalue v1.3.4
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
github.com/Masterminds/sprig/v3 v3.2.3
github.com/MicahParks/keyfunc v1.9.0
github.com/NYTimes/gziphandler v1.1.1
@@ -86,7 +87,17 @@ require (
moul.io/zapfilter v1.7.0
)

require google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.8.1 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect
)

require (
cloud.google.com/go v0.110.2 // indirect
10 changes: 10 additions & 0 deletions go.sum
@@ -433,17 +433,21 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+Q
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.1/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.2/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0/go.mod h1:tZoQYdDZNOiIjdSn0dVWVfl0NEPGOJqVLzSrcFk4Is0=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.1 h1:gVXuXcWd1i4C2Ruxe321aU+IKGaStvGB/S90PUPB/W8=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.1/go.mod h1:DffdKW9RFqa5VgmsjUOsS7UE7eiA5iAvYUs63bhKQ0M=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.0.0/go.mod h1:+6sju8gk8FRmSajX3Oz4G5Gm7P+mbqE9FVaXXFYTkCM=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.1 h1:T8quHYlUGyb/oqtSTwqlCr1ilJHrDv+ZtpSfo+hm1BU=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.1/go.mod h1:gLa1CL2RNE4s7M3yopJ/p0iq5DdY6Yv5ZUt9MTRZOQM=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2 h1:+5VZ72z0Qan5Bog5C+ZkgSqUbeVUd9wgtHOrIKuc5b8=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azkeys v0.9.0/go.mod h1:EAyXOW1F6BTJPiK2pDvmnvxOHPxoTYWoqBeIlql+QhI=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.0/go.mod h1:9V2j0jn9jDEkCkv8w/bKTNppX/d0FVA1ud77xCIP4KA=
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.7.1/go.mod h1:9V2j0jn9jDEkCkv8w/bKTNppX/d0FVA1ud77xCIP4KA=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.2.0/go.mod h1:R6+0udeRV8iYSTVuT5RT7If4sc46K5Bz3ZKrmvZQF7U=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag=
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v0.18.1/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc=
@@ -453,6 +457,7 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg6
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Azure/go-autorest v10.8.1+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.1/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw=
github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA=
@@ -467,13 +472,15 @@ github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSY
github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/mocks v0.4.2/go.mod h1:Vy7OitM9Kei0i1Oj+LvyAWMXJHeKH1MVlzFugfVrmyU=
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/AzureAD/microsoft-authentication-library-for-go v0.4.0/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4=
github.com/AzureAD/microsoft-authentication-library-for-go v0.8.1 h1:oPdPEZFSbl7oSPEAIPMPBMUmiL+mqgzBJwM/9qYcwNg=
github.com/AzureAD/microsoft-authentication-library-for-go v0.8.1/go.mod h1:4qFor3D/HDsvBME35Xy9rwW9DecL+M2sNw1ybjPtwA0=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
@@ -902,6 +909,7 @@ github.com/digitalocean/godo v1.78.0/go.mod h1:GBmu8MkjZmNARE7IXRPmkbbnocNN8+uBm
github.com/digitalocean/godo v1.95.0/go.mod h1:NRpFznZFvhHjBoqZAaOD3khVzsJ3EibzKqFL4R60dmA=
github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E=
github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ=
github.com/docker/cli v0.0.0-20191017083524-a8ff7f821017/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/distribution v0.0.0-20190905152932-14b96e55d84c/go.mod h1:0+TTO4EOBfRPhZXAeF1Vu+W3hHZ8eLp8PgKVZlcvtFY=
@@ -1606,6 +1614,7 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lensesio/tableprinter v0.0.0-20201125135848-89e81fc956e7 h1:k/1ku0yehLCPqERCHkIHMDqDg1R02AcCScRuHbamU3s=
github.com/lensesio/tableprinter v0.0.0-20201125135848-89e81fc956e7/go.mod h1:YR/zYthNdWfO8+0IOyHDcIDBBBS2JMnYUIwSsnwmRqU=
@@ -1864,6 +1873,7 @@ github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFu
github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=
github.com/pjbgf/sha1cd v0.3.0/go.mod h1:nZ1rrWOcGJ5uZgEEVL1VUM9iRQiZvWdbZjkKyFzPPsI=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/diff v0.0.0-20200914180035-5b29258ca4f7/go.mod h1:zO8QMzTeZd5cpnIkz/Gn6iK0jDfGicM1nynOkkPIl28=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
257 changes: 257 additions & 0 deletions runtime/drivers/azure/azure.go
@@ -0,0 +1,257 @@
package azure

import (
"context"
"fmt"
"os"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/bmatcuk/doublestar/v4"
"github.com/mitchellh/mapstructure"
"github.com/rilldata/rill/runtime/drivers"
rillblob "github.com/rilldata/rill/runtime/drivers/blob"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/pkg/fileutil"
"github.com/rilldata/rill/runtime/pkg/globutil"
"go.uber.org/zap"
"gocloud.dev/blob/azureblob"
)

func init() {
drivers.Register("azure", driver{})
drivers.RegisterAsConnector("azure", driver{})
}

var spec = drivers.Spec{
DisplayName: "Azure Blob Storage",
Description: "Connect to Azure Blob Storage.",
ServiceAccountDocs: "https://docs.rilldata.com/deploy/credentials/azure",
SourceProperties: []drivers.PropertySchema{
{
Key: "path",
DisplayName: "Blob URI",
Description: "URI of the blob (or glob pattern) to load from Azure Blob Storage.",
Placeholder: "azblob://container-name/path/to/file.csv",
Type: drivers.StringPropertyType,
Required: true,
Hint: "Glob patterns are supported",
},
{
Key: "azure.storage.account",
DisplayName: "Azure Storage Account",
Description: "Azure Storage Account inferred from your local environment.",
Type: drivers.InformationalPropertyType,
Hint: "Set your local credentials: <code>az login</code> Click to learn more.",
Href: "https://docs.rilldata.com/develop/import-data#configure-credentials-for-azure",
},
},
ConfigProperties: []drivers.PropertySchema{
{
Key: "azure.storage.account",
Hint: "Enter the name of the Azure storage account.",
ValidateFunc: func(any interface{}) error {
val := any.(string)
if val == "" {
// user can choose to leave empty for public sources
return nil
}

path, err := fileutil.ExpandHome(strings.TrimSpace(val))
if err != nil {
return err
}

_, err = os.Stat(path)
return err
},
},
},
}

type driver struct{}

type configProperties struct {
Account string `mapstructure:"azure.storage.account"`
}

func (d driver) Open(config map[string]any, shared bool, client activity.Client, logger *zap.Logger) (drivers.Handle, error) {
if shared {
return nil, fmt.Errorf("azure driver does not support shared connections")
}
conf := &configProperties{}
err := mapstructure.Decode(config, conf)
if err != nil {
return nil, err
}

conn := &Connection{
config: conf,
logger: logger,
}
return conn, nil
}

func (d driver) Drop(config map[string]any, logger *zap.Logger) error {
return drivers.ErrDropNotSupported
}

func (d driver) Spec() drivers.Spec {
return spec
}

func (d driver) HasAnonymousSourceAccess(ctx context.Context, src drivers.Source, logger *zap.Logger) (bool, error) {
return false, nil
}

type Connection struct {
config *configProperties
logger *zap.Logger
}

var _ drivers.Handle = &Connection{}

// Driver implements drivers.Handle.
func (c *Connection) Driver() string {
return "azure"
}

// Config implements drivers.Handle.
func (c *Connection) Config() map[string]any {
m := make(map[string]any, 0)
// mapstructure.Decode requires a pointer to the output value
_ = mapstructure.Decode(c.config, &m)
return m
}

// Close implements drivers.Handle.
func (c *Connection) Close() error {
return nil
}

// AsRegistry implements drivers.Handle.
func (c *Connection) AsRegistry() (drivers.RegistryStore, bool) {
return nil, false
}

// AsCatalogStore implements drivers.Handle.
func (c *Connection) AsCatalogStore(instanceID string) (drivers.CatalogStore, bool) {
return nil, false
}

// AsRepoStore implements drivers.Handle.
func (c *Connection) AsRepoStore(instanceID string) (drivers.RepoStore, bool) {
return nil, false
}

// AsOLAP implements drivers.Handle.
func (c *Connection) AsOLAP(instanceID string) (drivers.OLAPStore, bool) {
return nil, false
}

// Migrate implements drivers.Handle.
func (c *Connection) Migrate(ctx context.Context) (err error) {
return nil
}

// MigrationStatus implements drivers.Handle.
func (c *Connection) MigrationStatus(ctx context.Context) (current, desired int, err error) {
return 0, 0, nil
}

// AsObjectStore implements drivers.Handle.
func (c *Connection) AsObjectStore() (drivers.ObjectStore, bool) {
return c, true
}

// AsTransporter implements drivers.Handle.
func (c *Connection) AsTransporter(from, to drivers.Handle) (drivers.Transporter, bool) {
return nil, false
}

// AsFileStore implements drivers.Handle.
func (c *Connection) AsFileStore() (drivers.FileStore, bool) {
return nil, false
}

// AsSQLStore implements drivers.Handle.
func (c *Connection) AsSQLStore() (drivers.SQLStore, bool) {
return nil, false
}

// DownloadFiles returns a file iterator over objects stored in azure blob storage.
func (c *Connection) DownloadFiles(ctx context.Context, source *drivers.BucketSource) (drivers.FileIterator, error) {
conf, err := parseSourceProperties(source.Properties)
if err != nil {
return nil, fmt.Errorf("failed to parse config: %w", err)
}

name := os.Getenv("AZURE_STORAGE_ACCOUNT")
key := os.Getenv("AZURE_STORAGE_KEY")
credential, err := azblob.NewSharedKeyCredential(name, key)
if err != nil {
return nil, err
}

containerURL := fmt.Sprintf("https://%s.blob.core.windows.net/%s", name, conf.url.Host)

client, err := container.NewClientWithSharedKeyCredential(containerURL, credential, nil)
if err != nil {
return nil, err
}

// Create a *blob.Bucket.
bucketObj, err := azureblob.OpenBucket(ctx, client, nil)
if err != nil {
return nil, err
}
defer bucketObj.Close()

// prepare fetch configs
opts := rillblob.Options{
GlobMaxTotalSize: conf.GlobMaxTotalSize,
GlobMaxObjectsMatched: conf.GlobMaxObjectsMatched,
GlobMaxObjectsListed: conf.GlobMaxObjectsListed,
GlobPageSize: conf.GlobPageSize,
GlobPattern: conf.url.Path,
ExtractPolicy: source.ExtractPolicy,
}

iter, err := rillblob.NewIterator(ctx, bucketObj, opts, c.logger)
if err != nil {
return nil, err
}

return iter, nil
}
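`DownloadFiles` reads `AZURE_STORAGE_ACCOUNT` and `AZURE_STORAGE_KEY` directly from the environment. A missing variable may only surface later as an authentication or URL error, so one option is to check both up front. The sketch below shows that check in isolation. The helper name `sharedKeyFromEnv` and its injected `getenv` parameter are hypothetical and not part of this PR, and the sketch deliberately makes no Azure SDK calls.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// sharedKeyFromEnv reads the storage account name and key through getenv
// (os.Getenv in production) and fails early when either one is missing.
// Hypothetical helper for illustration; not part of the PR.
func sharedKeyFromEnv(getenv func(string) string) (account, key string, err error) {
	account = getenv("AZURE_STORAGE_ACCOUNT")
	key = getenv("AZURE_STORAGE_KEY")
	if account == "" {
		return "", "", errors.New("AZURE_STORAGE_ACCOUNT is not set")
	}
	if key == "" {
		return "", "", fmt.Errorf("AZURE_STORAGE_KEY is not set for account %q", account)
	}
	return account, key, nil
}

func main() {
	account, _, err := sharedKeyFromEnv(os.Getenv)
	if err != nil {
		fmt.Println("cannot use shared key auth:", err)
		return
	}
	fmt.Println("using shared key auth for account", account)
}
```

Passing the lookup function as a parameter keeps the check testable without mutating the process environment.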

type sourceProperties struct {
Path string `mapstructure:"path"`
GlobMaxTotalSize int64 `mapstructure:"glob.max_total_size"`
GlobMaxObjectsMatched int `mapstructure:"glob.max_objects_matched"`
GlobMaxObjectsListed int64 `mapstructure:"glob.max_objects_listed"`
GlobPageSize int `mapstructure:"glob.page_size"`
url *globutil.URL
}

func parseSourceProperties(props map[string]any) (*sourceProperties, error) {
conf := &sourceProperties{}
err := mapstructure.Decode(props, conf)
if err != nil {
return nil, err
}
if !doublestar.ValidatePattern(conf.Path) {
// ideally this should be validated at a much earlier stage
// keeping it here to have azure specific validations
return nil, fmt.Errorf("glob pattern %s is invalid", conf.Path)
}
url, err := globutil.ParseBucketURL(conf.Path)
if err != nil {
return nil, fmt.Errorf("failed to parse path %q: %w", conf.Path, err)
}
if url.Scheme != "azblob" {
return nil, fmt.Errorf("invalid scheme %q in path %q", url.Scheme, conf.Path)
}

conf.url = url
return conf, nil
}
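To make the path handling above concrete, here is a minimal sketch of how an `azblob://` path splits into a container name and a glob pattern, and how the container endpoint is then formed. It uses the standard library's `net/url` as a stand-in for `globutil.ParseBucketURL`, whose exact behaviour is not shown in this diff. The `blobLocation` type and both helper names are hypothetical, and trimming the leading slash from the glob is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// blobLocation is a hypothetical type for this sketch; it is not part of the PR.
type blobLocation struct {
	Container string // URL host, e.g. "container-name"
	Glob      string // object path or glob pattern without the leading slash
}

// parseAzblobPath splits an "azblob://container/path" URI into its parts and
// rejects any other scheme, similar to the scheme check in parseSourceProperties.
func parseAzblobPath(path string) (blobLocation, error) {
	u, err := url.Parse(path)
	if err != nil {
		return blobLocation{}, fmt.Errorf("failed to parse path %q: %w", path, err)
	}
	if u.Scheme != "azblob" {
		return blobLocation{}, fmt.Errorf("invalid scheme %q in path %q", u.Scheme, path)
	}
	return blobLocation{Container: u.Host, Glob: strings.TrimPrefix(u.Path, "/")}, nil
}

// containerURL builds the container endpoint with the same format string
// that DownloadFiles uses.
func containerURL(account, container string) string {
	return fmt.Sprintf("https://%s.blob.core.windows.net/%s", account, container)
}

func main() {
	loc, err := parseAzblobPath("azblob://container-name/path/to/*.csv")
	if err != nil {
		panic(err)
	}
	fmt.Println(loc.Container, loc.Glob)
	fmt.Println(containerURL("myaccount", loc.Container))
}
```

Note that `net/url` treats `?` as the start of a query string, so glob patterns containing `?` would need the dedicated parser rather than this stand-in.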
2 changes: 2 additions & 0 deletions runtime/services/catalog/migrator/sources/embedded_sources.go
@@ -55,6 +55,8 @@ func parseEmbeddedSourceConnector(path string) (string, string, bool) {
connector = "s3"
case "gs":
connector = "gcs"
case "azblob":
connector = "azure"
default:
return "", "", false
}
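The switch in this hunk maps a URL scheme to a connector name. As a rough, self-contained illustration of that mapping, the sketch below covers only the three cases visible in the hunk; the real function has a different signature and may handle more schemes. `connectorForPath` is a hypothetical name introduced here.

```go
package main

import (
	"fmt"
	"net/url"
)

// connectorForPath returns the connector name for a source path based on its
// URL scheme. Only the cases visible in the diff hunk are included.
// Hypothetical helper for illustration; not part of the PR.
func connectorForPath(path string) (string, bool) {
	u, err := url.Parse(path)
	if err != nil {
		return "", false
	}
	switch u.Scheme {
	case "s3":
		return "s3", true
	case "gs":
		return "gcs", true
	case "azblob":
		return "azure", true
	default:
		return "", false
	}
}

func main() {
	for _, p := range []string{
		"azblob://container-name/data.csv",
		"gs://bucket/data.parquet",
		"ftp://host/data.csv",
	} {
		connector, ok := connectorForPath(p)
		fmt.Println(p, connector, ok)
	}
}
```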
7 changes: 6 additions & 1 deletion runtime/services/catalog/migrator/sources/sources.go
@@ -290,7 +290,7 @@ func mergeFromParsedQuery(apiSource *runtimev1.Source, env map[string]string, re
if err != nil {
return err
}
case "s3", "gcs":
case "s3", "gcs", "azure":
apiSource.Connector = c
props["path"] = p
default:
@@ -364,6 +364,11 @@ func source(connector string, src *runtimev1.Source) (drivers.Source, error) {
ExtractPolicy: src.Policy,
Properties: props,
}, nil
case "azure":
return &drivers.BucketSource{
ExtractPolicy: src.Policy,
Properties: props,
}, nil
case "https":
return &drivers.FileSource{
Properties: props,