Commit

add small CLI and catalog integration example
zeroshade committed Feb 9, 2024
1 parent fb2df75 commit e530020
Showing 15 changed files with 1,449 additions and 9 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -46,3 +46,7 @@ lib/
 *.iml

 .envrc*
+
+# local catalog environment via docker
+dev/notebooks
+dev/warehouse
106 changes: 106 additions & 0 deletions catalog/README.md
@@ -0,0 +1,106 @@
# Catalog Implementations

## Integration Testing

The Catalog implementations can be manually tested using the CLI implemented
in the `cmd/iceberg` folder.

### REST Catalog

To test the REST catalog implementation, we have a docker configuration
that runs a MinIO container and a tabulario/iceberg-rest container.

You can spin up the local catalog by going to the `dev/` folder and running
`docker-compose up`. You can then follow the steps of the Iceberg [Quickstart](https://iceberg.apache.org/spark-quickstart/#creating-a-table)
tutorial, which we've summarized below.

#### Set up your Iceberg catalog

First launch a pyspark console by running:

```bash
docker exec -it spark-iceberg pyspark
```

Once in the pyspark shell, we create a simple table called "taxis" in the
"demo.nyc" namespace:

```python
from pyspark.sql.types import DoubleType, FloatType, LongType, StructType, StructField, StringType

schema = StructType([
    StructField("vendor_id", LongType(), True),
    StructField("trip_id", LongType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("store_and_fwd_flag", StringType(), True)
])

df = spark.createDataFrame([], schema)
df.writeTo("demo.nyc.taxis").create()
```

Finally, we write another DataFrame to the table to add new data files:

```python
schema = spark.table("demo.nyc.taxis").schema
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y")
]
df = spark.createDataFrame(data, schema)
df.writeTo("demo.nyc.taxis").append()
```

#### Testing with the CLI

Now that the running catalog contains a table, you can test against it with
the CLI implemented in the `cmd/iceberg` folder. You will need to set the
following environment variables (the values can also be found in the
docker-compose.yml):

```
AWS_S3_ENDPOINT=http://localhost:9000
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=admin
AWS_SECRET_ACCESS_KEY=password
```

With those environment variables set, you can now run the CLI to list the
namespaces:

```bash
$ go run ./cmd/iceberg list --catalog rest --uri http://localhost:8181
┌──────┐
| IDs |
| ---- |
| demo |
└──────┘
```

You can retrieve the schema of the table:

```bash
$ go run ./cmd/iceberg schema --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Current Schema, id=0
├──1: vendor_id: optional long
├──2: trip_id: optional long
├──3: trip_distance: optional float
├──4: fare_amount: optional double
└──5: store_and_fwd_flag: optional string
```

You can get the file list:

```bash
$ go run ./cmd/iceberg files --catalog rest --uri http://localhost:8181 demo.nyc.taxis
Snapshots: rest.demo.nyc.taxis
└─┬Snapshot 7004656639550124164, schema 0: s3://warehouse/demo/nyc/taxis/metadata/snap-7004656639550124164-1-0d533cd4-f0c1-45a6-a691-f2be3abe5491.avro
└─┬Manifest: s3://warehouse/demo/nyc/taxis/metadata/0d533cd4-f0c1-45a6-a691-f2be3abe5491-m0.avro
├──Datafile: s3://warehouse/demo/nyc/taxis/data/00004-24-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
├──Datafile: s3://warehouse/demo/nyc/taxis/data/00009-29-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
├──Datafile: s3://warehouse/demo/nyc/taxis/data/00014-34-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
└──Datafile: s3://warehouse/demo/nyc/taxis/data/00019-39-244255d4-8bf6-41bd-8885-bf7d2136fddf-00001.parquet
```

and so on, for the various options available in the CLI.
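
The same operations are also available programmatically. Below is a minimal
Go sketch mirroring the integration test added in this commit
(`catalog/rest_test.go`); it assumes the docker environment is up, the
`demo.nyc.taxis` table exists, and the AWS environment variables above are
set. Error handling is reduced to panics for brevity:

```go
package main

import (
	"context"
	"fmt"

	"github.com/apache/iceberg-go/catalog"
)

func main() {
	// connect to the locally running REST catalog
	cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181")
	if err != nil {
		panic(err)
	}

	// list the tables under the "demo.nyc" namespace
	tbls, err := cat.ListTables(context.Background(), catalog.ToRestIdentifier("demo", "nyc"))
	if err != nil {
		panic(err)
	}

	// load the first table and print its metadata
	// (assumes at least one table exists in the namespace)
	tbl, err := cat.LoadTable(context.Background(), tbls[0], nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(tbl.Metadata())
}
```

As in the test, `LoadTable` is passed `nil` for its properties argument, so
S3 access relies on the environment variables set earlier.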
6 changes: 1 addition & 5 deletions catalog/rest.go
@@ -671,11 +671,7 @@ func (r *RestCatalog) ListNamespaces(ctx context.Context, parent table.Identifier
 		return nil, err
 	}

-	out := make([]table.Identifier, len(rsp.Namespaces))
-	for i, ns := range rsp.Namespaces {
-		out[i] = append(parent, ns...)
-	}
-	return out, nil
+	return rsp.Namespaces, nil
 }

 func (r *RestCatalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
17 changes: 17 additions & 0 deletions catalog/rest_test.go
@@ -22,6 +22,7 @@ import (
 	"crypto/tls"
 	"crypto/x509"
 	"encoding/json"
+	"fmt"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -30,6 +31,7 @@ import (
 	"github.com/apache/iceberg-go"
 	"github.com/apache/iceberg-go/catalog"
 	"github.com/apache/iceberg-go/table"
+	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 )
@@ -816,3 +818,18 @@ func TestRestCatalog(t *testing.T) {
 	suite.Run(t, new(RestCatalogSuite))
 	suite.Run(t, new(RestTLSCatalogSuite))
 }
+
+func TestRestIntegration(t *testing.T) {
+	cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181")
+	require.NoError(t, err)
+
+	require.NotNil(t, cat)
+
+	tbls, err := cat.ListTables(context.Background(), catalog.ToRestIdentifier("demo", "nyc"))
+	require.NoError(t, err)
+
+	tbl, err := cat.LoadTable(context.Background(), tbls[0], nil)
+	require.NoError(t, err)
+
+	fmt.Println(tbl.Metadata())
+}
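
With the docker environment running and the `demo.nyc.taxis` table created
as described in `catalog/README.md`, this integration test can be run on its
own via `go test -run TestRestIntegration ./catalog`.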