Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl rest catalog + table updates & requirements #146

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchNamespace = errors.New("namespace does not exist")
ErrNamespaceAlreadyExists = errors.New("namespace already exists")
ErrTableAlreadyExists = errors.New("table already exists")
)

// WithAwsConfig sets the AWS configuration for the catalog.
Expand Down Expand Up @@ -146,9 +147,9 @@ type Catalog interface {
// identifiers containing the information required to load the table via that catalog.
ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error)
// LoadTable loads a table from the catalog and returns a Table with the metadata.
LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error)
LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error)
jwtryg marked this conversation as resolved.
Show resolved Hide resolved
// DropTable tells the catalog to drop the table entirely
DropTable(ctx context.Context, identifier table.Identifier) error
DropTable(ctx context.Context, identifier table.Identifier, purge bool) error
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
// RenameTable tells the catalog to rename a given table by the identifiers
// provided, and then loads and returns the destination table
RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error)
Expand Down
10 changes: 3 additions & 7 deletions catalog/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,23 +92,19 @@ func (c *GlueCatalog) ListTables(ctx context.Context, namespace table.Identifier
// LoadTable loads a table from the catalog table details.
//
// The identifier should contain the Glue database name, then glue table name.
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) {
func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) {
database, tableName, err := identifierToGlueTable(identifier)
if err != nil {
return nil, err
}

if props == nil {
props = map[string]string{}
}

location, err := c.getTable(ctx, database, tableName)
if err != nil {
return nil, err
}

// TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog.
iofs, err := io.LoadFS(props, location)
iofs, err := io.LoadFS(nil, location)
if err != nil {
return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err)
}
Expand All @@ -125,7 +121,7 @@ func (c *GlueCatalog) CatalogType() CatalogType {
return Glue
}

func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error {
func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier, purge bool) error {
return fmt.Errorf("%w: [Glue Catalog] drop table", iceberg.ErrNotImplemented)
}

Expand Down
2 changes: 1 addition & 1 deletion catalog/glue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestGlueLoadTableIntegration(t *testing.T) {

catalog := NewGlueCatalog(WithAwsConfig(awscfg))

table, err := catalog.LoadTable(context.TODO(), []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")}, nil)
table, err := catalog.LoadTable(context.TODO(), []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_TABLE_NAME")})
assert.NoError(err)
assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier())
}
247 changes: 212 additions & 35 deletions catalog/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,86 @@ func (e errorResponse) Error() string {
return e.Type + ": " + e.Message
}

type Identifier struct {
Namespace []string `json:"namespace"`
Name string `json:"name"`
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're going to export this type, we should probably name it RestIdentifier or something equivalent to separate it from other catalog identifier types.


type commitTableResponse struct {
MetadataLoc string `json:"metadata-location"`
RawMetadata json.RawMessage `json:"metadata"`
Metadata table.Metadata `json:"-"`
}

func (t *commitTableResponse) UnmarshalJSON(b []byte) (err error) {
type Alias commitTableResponse
if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
return err
}

t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
return
}

type loadTableResponse struct {
MetadataLoc string `json:"metadata-location"`
RawMetadata json.RawMessage `json:"metadata"`
Config iceberg.Properties `json:"config"`
Metadata table.Metadata `json:"-"`
}

func (t *loadTableResponse) UnmarshalJSON(b []byte) (err error) {
type Alias loadTableResponse
if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
return err
}

t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
return
}

type createTableOption func(*createTableRequest)

func WithTableLocation(loc string) createTableOption {
return func(req *createTableRequest) {
req.Location = strings.TrimRight(loc, "/")
}
}

func WithTablePartitionSpec(spec iceberg.PartitionSpec) createTableOption {
return func(req *createTableRequest) {
req.PartitionSpec = spec
}
}

func WithTableWriteOrder(order table.SortOrder) createTableOption {
return func(req *createTableRequest) {
req.WriteOrder = order
}
}

func WithTableStagingCreate() createTableOption {
return func(req *createTableRequest) {
req.StageCreate = true
}
}

func WithTableProperties(props iceberg.Properties) createTableOption {
return func(req *createTableRequest) {
req.Props = props
}
}

type createTableRequest struct {
Name string `json:"name"`
Location string `json:"location"`
Schema *iceberg.Schema `json:"schema"`
PartitionSpec iceberg.PartitionSpec `json:"partition-spec"`
WriteOrder table.SortOrder `json:"write-order"`
StageCreate bool `json:"stage-create"`
Props iceberg.Properties `json:"properties"`
}

type oauthTokenResponse struct {
AccessToken string `json:"access_token"`
TokenType string `json:"token_type"`
Expand Down Expand Up @@ -537,6 +617,25 @@ func checkValidNamespace(ident table.Identifier) error {
return nil
}

func (r *RestCatalog) tableFromResponse(identifier []string, metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
id := identifier
if r.name != "" {
id = append([]string{r.name}, identifier...)
}

tblProps := maps.Clone(r.props)
maps.Copy(tblProps, metadata.Properties())
for k, v := range config {
tblProps[k] = v
}

iofs, err := iceio.LoadFS(tblProps, loc)
if err != nil {
return nil, err
}
return table.New(id, metadata, loc, iofs), nil
}

func (r *RestCatalog) ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) {
if err := checkValidNamespace(namespace); err != nil {
return nil, err
Expand All @@ -546,12 +645,8 @@ func (r *RestCatalog) ListTables(ctx context.Context, namespace table.Identifier
path := []string{"namespaces", ns, "tables"}

type resp struct {
Identifiers []struct {
Namespace []string `json:"namespace"`
Name string `json:"name"`
} `json:"identifiers"`
Identifiers []Identifier `json:"identifiers"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the identifier type used anywhere other than here? The reason I had done it inline here was because it was only used in this one spot and i didn't want it to get confused with table.Identifier

}

rsp, err := doGet[resp](ctx, r.baseURI, path, r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
if err != nil {
return nil, err
Expand All @@ -573,64 +668,129 @@ func splitIdentForPath(ident table.Identifier) (string, string, error) {
return strings.Join(NamespaceFromIdent(ident), namespaceSeparator), TableNameFromIdent(ident), nil
}

type tblResponse struct {
MetadataLoc string `json:"metadata-location"`
RawMetadata json.RawMessage `json:"metadata"`
Config iceberg.Properties `json:"config"`
Metadata table.Metadata `json:"-"`
}
func (r *RestCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...createTableOption) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
}

func (t *tblResponse) UnmarshalJSON(b []byte) (err error) {
type Alias tblResponse
if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
return err
payload := createTableRequest{
Name: tbl,
Schema: schema,
}
for _, o := range opts {
o(&payload)
}

t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
return
ret, err := doPost[createTableRequest, loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables"}, payload,
r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace, http.StatusConflict: ErrTableAlreadyExists})
if err != nil {
return nil, err
}

return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, ret.Config)
}

func (r *RestCatalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) {
func (r *RestCatalog) RegisterTable(ctx context.Context, identifier table.Identifier, metadataLoc string) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
}

if props == nil {
props = iceberg.Properties{}
type payload struct {
Name string `json:"name"`
MetadataLoc string `json:"metadata-location"`
}

ret, err := doGet[tblResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl},
r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable})
ret, err := doPost[payload, loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl},
payload{Name: tbl, MetadataLoc: metadataLoc}, r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace, http.StatusConflict: ErrTableAlreadyExists})
if err != nil {
return nil, err
}

id := identifier
if r.name != "" {
id = append([]string{r.name}, identifier...)
return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, ret.Config)
}

func (r *RestCatalog) LoadTable(ctx context.Context, identifier table.Identifier) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
}

tblProps := maps.Clone(r.props)
maps.Copy(tblProps, props)
maps.Copy(tblProps, ret.Metadata.Properties())
for k, v := range ret.Config {
tblProps[k] = v
ret, err := doGet[loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl},
r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable})
if err != nil {
return nil, err
}

iofs, err := iceio.LoadFS(tblProps, ret.MetadataLoc)
return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, ret.Config)
}

func (r *RestCatalog) UpdateTable(ctx context.Context, identifier table.Identifier, requirements []table.Requirement, updates []table.Update) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
}
return table.New(id, ret.Metadata, ret.MetadataLoc, iofs), nil

ident := Identifier{
Namespace: NamespaceFromIdent(identifier),
Name: tbl,
}
type payload struct {
Identifier Identifier `json:"identifier"`
Requirements []table.Requirement `json:"requirements"`
Updates []table.Update `json:"updates"`
}
ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables", tbl},
payload{Identifier: ident, Requirements: requirements, Updates: updates}, r.cl,
map[int]error{http.StatusNotFound: ErrNoSuchTable, http.StatusConflict: ErrCommitFailed})
if err != nil {
return nil, err
}

return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, nil)
}

func (r *RestCatalog) DropTable(ctx context.Context, identifier table.Identifier) error {
return fmt.Errorf("%w: [Rest Catalog] drop table", iceberg.ErrNotImplemented)
func (r *RestCatalog) DropTable(ctx context.Context, identifier table.Identifier, purge bool) error {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return err
}

uri := r.baseURI.JoinPath("namespaces", ns, "tables", tbl)
if purge {
v := url.Values{}
v.Set("purgeRequested", "true")
uri.RawQuery = v.Encode()
}

_, err = doDelete[struct{}](ctx, uri, []string{}, r.cl,
map[int]error{http.StatusNotFound: ErrNoSuchTable})

return err
}

func (r *RestCatalog) RenameTable(ctx context.Context, from, to table.Identifier) (*table.Table, error) {
return nil, fmt.Errorf("%w: [Rest Catalog] rename table", iceberg.ErrNotImplemented)
type payload struct {
From Identifier `json:"from"`
To Identifier `json:"to"`
}
f := Identifier{
Namespace: NamespaceFromIdent(from),
Name: TableNameFromIdent(from),
}
t := Identifier{
Namespace: NamespaceFromIdent(to),
Name: TableNameFromIdent(to),
}

_, err := doPost[payload, any](ctx, r.baseURI, []string{"tables", "rename"}, payload{From: f, To: t}, r.cl,
map[int]error{http.StatusNotFound: ErrNoSuchTable})
if err != nil {
return nil, err
}

return r.LoadTable(ctx, to)
}

func (r *RestCatalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error {
Expand Down Expand Up @@ -710,3 +870,20 @@ func (r *RestCatalog) UpdateNamespaceProperties(ctx context.Context, namespace t
return doPost[payload, PropertiesUpdateSummary](ctx, r.baseURI, []string{"namespaces", ns, "properties"},
payload{Remove: removals, Updates: updates}, r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
}

func (r *RestCatalog) CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error) {
if err := checkValidNamespace(namespace); err != nil {
return false, err
}

_, err := doGet[struct{}](ctx, r.baseURI, []string{"namespaces", strings.Join(namespace, namespaceSeparator)},
r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
if err != nil {
if errors.Is(err, ErrNoSuchNamespace) {
return false, nil
}
return false, err
}

return true, nil
}
Loading
Loading