Skip to content

Commit

Permalink
feat(catalog): add initial rest catalog impl
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade committed Jan 31, 2024
1 parent abd4fb0 commit ebce777
Show file tree
Hide file tree
Showing 6 changed files with 1,692 additions and 6 deletions.
130 changes: 126 additions & 4 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package catalog

import (
"context"
"crypto/tls"
"errors"
"net/url"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
)
Expand All @@ -37,7 +40,9 @@ const (

// Sentinel errors reported by catalog operations. Compare against them with
// errors.Is, since callers may receive them wrapped with extra context.
var (
	// ErrNoSuchTable is returned when a table does not exist in the catalog.
	ErrNoSuchTable = errors.New("table does not exist")
	// ErrNoSuchNamespace indicates that a namespace does not exist.
	ErrNoSuchNamespace = errors.New("namespace does not exist")
	// ErrNamespaceAlreadyExists indicates that a namespace already exists.
	ErrNamespaceAlreadyExists = errors.New("namespace already exists")
)

// WithAwsConfig sets the AWS configuration for the catalog.
Expand All @@ -47,19 +52,136 @@ func WithAwsConfig(cfg aws.Config) Option {
}
}

// WithCredential returns an Option that stores cred in the catalog options'
// credential field.
func WithCredential(cred string) Option {
	return func(opts *options) { opts.credential = cred }
}

// WithOAuthToken returns an Option that stores token in the catalog options'
// oauthToken field.
func WithOAuthToken(token string) Option {
	return func(opts *options) { opts.oauthToken = token }
}

// WithTLSConfig returns an Option that records config as the TLS
// configuration in the catalog options. The pointer is stored as given; it is
// not copied.
func WithTLSConfig(config *tls.Config) Option {
	return func(opts *options) { opts.tlsConfig = config }
}

// WithWarehouseLocation returns an Option that stores loc in the catalog
// options' warehouseLocation field.
func WithWarehouseLocation(loc string) Option {
	return func(opts *options) { opts.warehouseLocation = loc }
}

// WithMetadataLocation returns an Option that stores loc in the catalog
// options' metadataLocation field.
func WithMetadataLocation(loc string) Option {
	return func(opts *options) { opts.metadataLocation = loc }
}

// WithSigV4 returns an Option that turns on SigV4 and sets the signing
// service name to "execute-api". The signing region is left untouched.
func WithSigV4() Option {
	return func(opts *options) {
		opts.sigv4Service = "execute-api"
		opts.enableSigv4 = true
	}
}

// WithSigV4RegionSvc returns an Option that turns on SigV4 with the given
// signing region and service name. An empty service falls back to
// "execute-api".
func WithSigV4RegionSvc(region, service string) Option {
	return func(opts *options) {
		// Resolve the service name first so the assignments below are unconditional.
		svc := service
		if svc == "" {
			svc = "execute-api"
		}

		opts.enableSigv4 = true
		opts.sigv4Region = region
		opts.sigv4Service = svc
	}
}

// WithAuthURI returns an Option that records uri in the catalog options'
// authUri field. The pointer is stored as given; it is not copied.
func WithAuthURI(uri *url.URL) Option {
	return func(opts *options) { opts.authUri = uri }
}

// WithPrefix returns an Option that stores prefix in the catalog options'
// prefix field.
func WithPrefix(prefix string) Option {
	return func(opts *options) { opts.prefix = prefix }
}

// Option is a function that mutates the catalog options it is given. Values
// are produced by the With* constructors in this file (WithCredential,
// WithTLSConfig, WithSigV4, ...).
type Option func(*options)

// options collects the settings that the With* Option constructors write to.
// Each field is assigned by the constructor named in its comment.
type options struct {
// awsConfig is the AWS configuration (see WithAwsConfig).
awsConfig aws.Config

// tlsConfig is set by WithTLSConfig; nil unless that option is applied.
tlsConfig *tls.Config
// credential is set by WithCredential.
credential string
// oauthToken is set by WithOAuthToken.
oauthToken string
// warehouseLocation is set by WithWarehouseLocation.
warehouseLocation string
// metadataLocation is set by WithMetadataLocation.
metadataLocation string
// enableSigv4 is switched on by WithSigV4 and WithSigV4RegionSvc.
enableSigv4 bool
// sigv4Region is set by WithSigV4RegionSvc only.
sigv4Region string
// sigv4Service is set by WithSigV4 and WithSigV4RegionSvc; both use
// "execute-api" when no explicit service name is supplied.
sigv4Service string
// prefix is set by WithPrefix.
prefix string
// authUri is set by WithAuthURI; nil unless that option is applied.
authUri *url.URL
}

// PropertiesUpdateSummary is the result type returned by
// Catalog.UpdateNamespaceProperties. It is (de)serialized as JSON with the
// keys "removed", "updated" and "missing".
//
// NOTE(review): presumably each slice holds property keys (those removed,
// those updated, and those requested for removal but not present) — confirm
// against the catalog implementation that fills it.
type PropertiesUpdateSummary struct {
Removed []string `json:"removed"`
Updated []string `json:"updated"`
Missing []string `json:"missing"`
}

// Catalog for iceberg table operations like create, drop, load, list and others.
//
// Every method name is declared exactly once: Go rejects an interface that
// repeats a method name, so the stale map[string]string variant of LoadTable
// and the second CatalogType declaration are not carried over.
type Catalog interface {
	// CatalogType returns the type of the catalog.
	CatalogType() CatalogType

	// ListTables returns a list of table identifiers in the catalog, with the returned
	// identifiers containing the information required to load the table via that catalog.
	ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)
	// LoadTable loads a table from the catalog and returns a Table with the metadata.
	LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)
	// DropTable tells the catalog to drop the table entirely
	DropTable(ctx context.Context, identifier table.Identifier) error
	// RenameTable tells the catalog to rename a given table by the identifiers
	// provided, and then loads and returns the destination table
	RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)
	// ListNamespaces returns the list of available namespaces, optionally filtering by a
	// parent namespace
	ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error)
	// CreateNamespace tells the catalog to create a new namespace with the given properties
	CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error
	// DropNamespace tells the catalog to drop the namespace and all tables in that namespace
	DropNamespace(ctx context.Context, namespace table.Identifier) error
	// LoadNamespaceProperties returns the current properties in the catalog for
	// a given namespace
	LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error)
	// UpdateNamespaceProperties allows removing, adding, and/or updating properties of a namespace
	UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
		removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error)
}

// Unexported string keys.
// NOTE(review): presumably these are the property/config map keys under which
// the token, warehouse location, metadata location and credential are looked
// up — their users are not visible here; confirm before relying on this.
const (
keyToken = "token"
keyWarehouseLocation = "warehouse"
keyMetadataLocation = "metadata_location"
keyCredential = "credential"
)

// TableNameFromIdent returns the final element of ident, or the empty string
// when ident has no elements.
func TableNameFromIdent(ident table.Identifier) string {
	if n := len(ident); n > 0 {
		return ident[n-1]
	}

	return ""
}

// NamespaceFromIdent returns ident without its final element, i.e. everything
// that precedes the part TableNameFromIdent returns. The result shares ident's
// backing storage.
//
// An empty ident is returned unchanged: there is nothing to strip, and slicing
// with len(ident)-1 would panic with a negative upper bound.
func NamespaceFromIdent(ident table.Identifier) table.Identifier {
	if len(ident) == 0 {
		return ident
	}

	return ident[:len(ident)-1]
}
32 changes: 31 additions & 1 deletion catalog/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
"github.com/apache/iceberg-go/table"
"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier
// LoadTable loads a table from the catalog table details.
//
// The identifier should contain the Glue database name, then glue table name.
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props map[string]string) (*table.Table, error) {
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) {
database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,6 +125,35 @@ func (c *GlueCatalog) CatalogType() CatalogType {
return Glue
}

// DropTable is not implemented for the Glue catalog: it performs no work and
// always returns an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error {
	notImpl := fmt.Errorf("%w: [Glue Catalog] drop table", iceberg.ErrNotImplemented)

	return notImpl
}

// RenameTable is not implemented for the Glue catalog: it always returns a nil
// table and an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
	notImpl := fmt.Errorf("%w: [Glue Catalog] rename table", iceberg.ErrNotImplemented)

	return nil, notImpl
}

// CreateNamespace is not implemented for the Glue catalog: it performs no work
// and always returns an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error {
	notImpl := fmt.Errorf("%w: [Glue Catalog] create namespace", iceberg.ErrNotImplemented)

	return notImpl
}

// DropNamespace is not implemented for the Glue catalog: it performs no work
// and always returns an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) DropNamespace(ctx context.Context, namespace table.Identifier) error {
	notImpl := fmt.Errorf("%w: [Glue Catalog] drop namespace", iceberg.ErrNotImplemented)

	return notImpl
}

// LoadNamespaceProperties is not implemented for the Glue catalog: it always
// returns nil properties and an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
	notImpl := fmt.Errorf("%w: [Glue Catalog] load namespace properties", iceberg.ErrNotImplemented)

	return nil, notImpl
}

// UpdateNamespaceProperties is not implemented for the Glue catalog: it always
// returns a zero-valued PropertiesUpdateSummary and an error wrapping
// iceberg.ErrNotImplemented.
func (c *GlueCatalog) UpdateNamespaceProperties(ctx context.Context, namespace table.Identifier,
	removals []string, updates iceberg.Properties) (PropertiesUpdateSummary, error) {
	var summary PropertiesUpdateSummary
	notImpl := fmt.Errorf("%w: [Glue Catalog] update namespace properties", iceberg.ErrNotImplemented)

	return summary, notImpl
}

// ListNamespaces is not implemented for the Glue catalog: it always returns a
// nil slice and an error wrapping iceberg.ErrNotImplemented.
func (c *GlueCatalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) {
	notImpl := fmt.Errorf("%w: [Glue Catalog] list namespaces", iceberg.ErrNotImplemented)

	return nil, notImpl
}

// GetTable loads a table from the Glue Catalog using the given database and table name.
func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) (string, error) {
tblRes, err := c.glueSvc.GetTable(ctx,
Expand Down
Loading

0 comments on commit ebce777

Please sign in to comment.