From af468fd760f4d668ca2b5d3d3faf2e98108cd305 Mon Sep 17 00:00:00 2001 From: zhouzilong <529620861@qq.com> Date: Tue, 27 Feb 2024 09:33:24 -0500 Subject: [PATCH] server struct --- Cargo.toml | 59 +- Makefile | 4 + crates/rest/Cargo.toml | 48 - crates/rest/DEPENDENCIES.rust.tsv | 295 --- crates/rest/src/catalog.rs | 1604 ----------------- crates/rest/src/lib.rs | 23 - .../rest/testdata/create_table_response.json | 53 - crates/rest/testdata/load_table_response.json | 68 - .../testdata/rest_catalog/docker-compose.yaml | 65 - .../rest/testdata/update_table_response.json | 40 - crates/rest/tests/rest_catalog_test.rs | 376 ---- crates/test_utils/Cargo.toml | 31 - crates/test_utils/src/cmd.rs | 41 - crates/test_utils/src/docker.rs | 102 -- crates/test_utils/src/lib.rs | 41 - rustfmt.toml | 4 + scripts/parse_dependencies.py | 42 + src/main.rs | 31 +- src/server/mod.rs | 1 + src/server/routes/config.rs | 4 + src/server/routes/metric.rs | 5 + src/server/routes/mod.rs | 5 + src/server/routes/namespace.rs | 35 + src/server/routes/table.rs | 47 + 24 files changed, 185 insertions(+), 2839 deletions(-) delete mode 100644 crates/rest/Cargo.toml delete mode 100644 crates/rest/DEPENDENCIES.rust.tsv delete mode 100644 crates/rest/src/catalog.rs delete mode 100644 crates/rest/src/lib.rs delete mode 100644 crates/rest/testdata/create_table_response.json delete mode 100644 crates/rest/testdata/load_table_response.json delete mode 100644 crates/rest/testdata/rest_catalog/docker-compose.yaml delete mode 100644 crates/rest/testdata/update_table_response.json delete mode 100644 crates/rest/tests/rest_catalog_test.rs delete mode 100644 crates/test_utils/Cargo.toml delete mode 100644 crates/test_utils/src/cmd.rs delete mode 100644 crates/test_utils/src/docker.rs delete mode 100644 crates/test_utils/src/lib.rs create mode 100644 rustfmt.toml create mode 100644 scripts/parse_dependencies.py create mode 100644 src/server/mod.rs create mode 100644 src/server/routes/config.rs create mode 100644 
src/server/routes/metric.rs create mode 100644 src/server/routes/mod.rs create mode 100644 src/server/routes/namespace.rs create mode 100644 src/server/routes/table.rs diff --git a/Cargo.toml b/Cargo.toml index 282d0a2..67d32d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,58 +15,17 @@ # specific language governing permissions and limitations # under the License. -[workspace] -resolver = "2" -members = ["crates/rest", "crates/examples", "crates/iceberg", "crates/test_utils"] - -[workspace.package] -version = "0.2.0" +[package] +name = "catalog2" +version = "0.1.0" edition = "2021" - license = "Apache-2.0" repository = "https://github.com/cmu-db/15721-s24-catalog2" rust-version = "1.75.0" -[workspace.dependencies] -anyhow = "1.0.72" -apache-avro = "0.16" -arrow-arith = { version = ">=46" } -arrow-array = { version = ">=46" } -arrow-schema = { version = ">=46" } -async-trait = "0.1" -bimap = "0.6" -bitvec = "1.0.1" -chrono = "0.4" -derive_builder = "0.13.0" -either = "1" -env_logger = "0.11.0" -futures = "0.3" -iceberg = { path = "./crates/iceberg" } -iceberg-catalog-rest = { path = "./crates/rest" } -itertools = "0.12" -lazy_static = "1" -log = "^0.4" -mockito = "^1" -murmur3 = "0.5.2" -once_cell = "1" -opendal = "0.45" -ordered-float = "4.0.0" -pretty_assertions = "1.4.0" -port_scanner = "0.1.5" -reqwest = { version = "^0.11", features = ["json"] } -rust_decimal = "1.31.0" -serde = { version = "^1.0", features = ["rc"] } -serde_bytes = "0.11.8" -serde_derive = "^1.0" -serde_json = "^1.0" -serde_repr = "0.1.16" -serde_with = "3.4.0" -tempfile = "3.8" -tokio = { version = "1", features = ["macros"] } -typed-builder = "^0.18" -url = "2" -urlencoding = "2" -uuid = "1.6.1" -volo-thrift = "0.9.2" -hive_metastore = "0.0.2" -tera = "1" + + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[dependencies] +rocket = { version = "0.5.0", features = ["json", "http2"] } +dotenv = "0.15.0" \ No newline at end of file diff --git 
a/Makefile b/Makefile index d411cce..efc7fd7 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,10 @@ RUST_LOG = debug build: cargo build +run: + cargo build + target/debug/catalog2 + check-fmt: cargo fmt --all -- --check diff --git a/crates/rest/Cargo.toml b/crates/rest/Cargo.toml deleted file mode 100644 index 6db3060..0000000 --- a/crates/rest/Cargo.toml +++ /dev/null @@ -1,48 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -[package] -name = "iceberg-catalog-rest" -version = { workspace = true } -edition = { workspace = true } -rust-version = { workspace = true } - -categories = ["database"] -description = "Apache Iceberg Rust REST API" -repository = { workspace = true } -license = { workspace = true } -keywords = ["iceberg", "rest", "catalog"] - -[dependencies] -# async-trait = { workspace = true } -async-trait = { workspace = true } -chrono = { workspace = true } -iceberg = { workspace = true } -log = "0.4.20" -reqwest = { workspace = true } -serde = { workspace = true } -serde_derive = { workspace = true } -serde_json = { workspace = true } -typed-builder = { workspace = true } -urlencoding = { workspace = true } -uuid = { workspace = true, features = ["v4"] } - -[dev-dependencies] -iceberg_test_utils = { path = "../test_utils", features = ["tests"] } -mockito = { workspace = true } -port_scanner = { workspace = true } -tokio = { workspace = true } diff --git a/crates/rest/DEPENDENCIES.rust.tsv b/crates/rest/DEPENDENCIES.rust.tsv deleted file mode 100644 index 6061ce8..0000000 --- a/crates/rest/DEPENDENCIES.rust.tsv +++ /dev/null @@ -1,295 +0,0 @@ -crate 0BSD Apache-2.0 Apache-2.0 WITH LLVM-exception BSD-2-Clause BSD-3-Clause BSL-1.0 CC0-1.0 ISC MIT OpenSSL Unicode-DFS-2016 Unlicense Zlib -addr2line@0.21.0 X X -adler@1.0.2 X X X -adler32@1.2.0 X -ahash@0.8.6 X X -aho-corasick@1.1.2 X X -android-tzdata@0.1.1 X X -android_system_properties@0.1.5 X X -anstream@0.6.11 X X -anstyle@1.0.4 X X -anstyle-parse@0.2.3 X X -anstyle-query@1.0.2 X X -anstyle-wincon@3.0.2 X X -anyhow@1.0.77 X X -apache-avro@0.16.0 X -arrayvec@0.7.4 X X -arrow-arith@49.0.0 X -arrow-array@49.0.0 X -arrow-buffer@49.0.0 X -arrow-data@49.0.0 X -arrow-schema@49.0.0 X -async-compat@0.2.3 X X -async-trait@0.1.75 X X -autocfg@1.1.0 X X -backon@0.4.1 X -backtrace@0.3.69 X X -base64@0.21.5 X X -base64ct@1.6.0 X X -bimap@0.6.3 X X -bitflags@1.3.2 X X -bitflags@2.4.1 X X -bitvec@1.0.1 X -block-buffer@0.10.4 X X 
-bumpalo@3.14.0 X X -byteorder@1.5.0 X X -bytes@1.5.0 X -cc@1.0.83 X X -cfg-if@1.0.0 X X -chrono@0.4.31 X X -colorchoice@1.0.0 X X -const-oid@0.9.6 X X -const-random@0.1.17 X X -const-random-macro@0.1.16 X X -core-foundation@0.9.4 X X -core-foundation-sys@0.8.6 X X -core2@0.4.0 X X -cpufeatures@0.2.11 X X -crc32fast@1.3.2 X X -crunchy@0.2.2 X -crypto-common@0.1.6 X X -darling@0.14.4 X -darling@0.20.3 X -darling_core@0.14.4 X -darling_core@0.20.3 X -darling_macro@0.14.4 X -darling_macro@0.20.3 X -dary_heap@0.3.6 X X -der@0.7.8 X X -deranged@0.3.10 X X -derive_builder@0.13.0 X X -derive_builder_core@0.13.0 X X -derive_builder_macro@0.13.0 X X -digest@0.10.7 X X -dlv-list@0.5.2 X X -either@1.9.0 X X -encoding_rs@0.8.33 X X X -env_filter@0.1.0 X X -env_logger@0.11.0 X X -equivalent@1.0.1 X X -fastrand@1.9.0 X X -fastrand@2.0.1 X X -flagset@0.4.4 X -fnv@1.0.7 X X -foreign-types@0.3.2 X X -foreign-types-shared@0.1.1 X X -form_urlencoded@1.2.1 X X -funty@2.0.0 X -futures@0.3.30 X X -futures-channel@0.3.30 X X -futures-core@0.3.30 X X -futures-executor@0.3.30 X X -futures-io@0.3.30 X X -futures-macro@0.3.30 X X -futures-sink@0.3.30 X X -futures-task@0.3.30 X X -futures-util@0.3.30 X X -generic-array@0.14.7 X -getrandom@0.2.11 X X -gimli@0.28.1 X X -h2@0.3.22 X -half@2.3.1 X X -hashbrown@0.13.2 X X -hashbrown@0.14.3 X X -heck@0.4.1 X X -hermit-abi@0.3.3 X X -hex@0.4.3 X X -hmac@0.12.1 X X -home@0.5.9 X X -http@0.2.11 X X -http-body@0.4.6 X -httparse@1.8.0 X X -httpdate@1.0.3 X X -humantime@2.1.0 X X -hyper@0.14.28 X -hyper-rustls@0.24.2 X X X -hyper-tls@0.5.0 X X -iana-time-zone@0.1.58 X X -iana-time-zone-haiku@0.1.2 X X -iceberg@0.2.0 X -iceberg-catalog-rest@0.2.0 X -iceberg_test_utils@0.2.0 X -ident_case@1.0.1 X X -idna@0.5.0 X X -indexmap@2.1.0 X X -instant@0.1.12 X -ipnet@2.9.0 X X -itertools@0.12.0 X X -itoa@1.0.10 X X -js-sys@0.3.66 X X -jsonwebtoken@9.2.0 X -lazy_static@1.4.0 X X -libc@0.2.151 X X -libflate@2.0.0 X -libflate_lz77@2.0.0 X -libm@0.2.8 X X 
-linux-raw-sys@0.4.12 X X X -lock_api@0.4.11 X X -log@0.4.20 X X -md-5@0.10.6 X X -memchr@2.6.4 X X -mime@0.3.17 X X -miniz_oxide@0.7.1 X X X -mio@0.8.10 X -murmur3@0.5.2 X X -native-tls@0.2.11 X X -num@0.4.1 X X -num-bigint@0.4.4 X X -num-bigint-dig@0.8.4 X X -num-complex@0.4.4 X X -num-integer@0.1.45 X X -num-iter@0.1.43 X X -num-rational@0.4.1 X X -num-traits@0.2.17 X X -num_cpus@1.16.0 X X -object@0.32.2 X X -once_cell@1.19.0 X X -opendal@0.44.0 X -openssl@0.10.62 X -openssl-macros@0.1.1 X X -openssl-probe@0.1.5 X X -openssl-sys@0.9.98 X -ordered-float@4.2.0 X -ordered-multimap@0.7.1 X -parking_lot@0.12.1 X X -parking_lot_core@0.9.9 X X -pem@3.0.3 X -pem-rfc7468@0.7.0 X X -percent-encoding@2.3.1 X X -pin-project@1.1.3 X X -pin-project-internal@1.1.3 X X -pin-project-lite@0.2.13 X X -pin-utils@0.1.0 X X -pkcs1@0.7.5 X X -pkcs8@0.10.2 X X -pkg-config@0.3.28 X X -powerfmt@0.2.0 X X -ppv-lite86@0.2.17 X X -proc-macro2@1.0.71 X X -quad-rand@0.2.1 X -quick-xml@0.30.0 X -quick-xml@0.31.0 X -quote@1.0.33 X X -radium@0.7.0 X -rand@0.8.5 X X -rand_chacha@0.3.1 X X -rand_core@0.6.4 X X -redox_syscall@0.4.1 X -regex@1.10.2 X X -regex-automata@0.4.3 X X -regex-lite@0.1.5 X X -regex-syntax@0.8.2 X X -reqsign@0.14.6 X -reqwest@0.11.23 X X -ring@0.17.7 X -rle-decode-fast@1.0.3 X X -rsa@0.9.6 X X -rust-ini@0.20.0 X -rust_decimal@1.33.1 X -rustc-demangle@0.1.23 X X -rustix@0.38.28 X X X -rustls@0.21.10 X X X -rustls-native-certs@0.6.3 X X X -rustls-pemfile@1.0.4 X X X -rustls-webpki@0.101.7 X -rustversion@1.0.14 X X -ryu@1.0.16 X X -schannel@0.1.22 X -scopeguard@1.2.0 X X -sct@0.7.1 X X X -security-framework@2.9.2 X X -security-framework-sys@2.9.1 X X -serde@1.0.193 X X -serde_bytes@0.11.13 X X -serde_derive@1.0.193 X X -serde_json@1.0.108 X X -serde_repr@0.1.17 X X -serde_urlencoded@0.7.1 X X -serde_with@3.4.0 X X -serde_with_macros@3.4.0 X X -sha1@0.10.6 X X -sha2@0.10.8 X X -signal-hook-registry@1.4.1 X X -signature@2.2.0 X X -simple_asn1@0.6.2 X -slab@0.4.9 X 
-smallvec@1.11.2 X X -socket2@0.5.5 X X -spin@0.5.2 X -spin@0.9.8 X -spki@0.7.3 X X -strsim@0.10.0 X -strum@0.25.0 X -strum_macros@0.25.3 X -subtle@2.5.0 X -syn@1.0.109 X X -syn@2.0.43 X X -system-configuration@0.5.1 X X -system-configuration-sys@0.5.0 X X -tap@1.0.1 X -tempfile@3.8.1 X X -thiserror@1.0.52 X X -thiserror-impl@1.0.52 X X -time@0.3.31 X X -time-core@0.1.2 X X -time-macros@0.2.16 X X -tiny-keccak@2.0.2 X -tinyvec@1.6.0 X X X -tinyvec_macros@0.1.1 X X X -tokio@1.35.1 X -tokio-macros@2.2.0 X -tokio-native-tls@0.3.1 X -tokio-rustls@0.24.1 X X -tokio-util@0.7.10 X -tower-service@0.3.2 X -tracing@0.1.40 X -tracing-core@0.1.32 X -try-lock@0.2.5 X -typed-builder@0.16.2 X X -typed-builder@0.18.0 X X -typed-builder-macro@0.16.2 X X -typed-builder-macro@0.18.0 X X -typenum@1.17.0 X X -unicode-bidi@0.3.14 X X -unicode-ident@1.0.12 X X X -unicode-normalization@0.1.22 X X -untrusted@0.9.0 X -url@2.5.0 X X -urlencoding@2.1.3 X -utf8parse@0.2.1 X X -uuid@1.6.1 X X -vcpkg@0.2.15 X X -version_check@0.9.4 X X -want@0.3.1 X -wasi@0.11.0+wasi-snapshot-preview1 X X X -wasm-bindgen@0.2.89 X X -wasm-bindgen-backend@0.2.89 X X -wasm-bindgen-futures@0.4.39 X X -wasm-bindgen-macro@0.2.89 X X -wasm-bindgen-macro-support@0.2.89 X X -wasm-bindgen-shared@0.2.89 X X -wasm-streams@0.3.0 X X -web-sys@0.3.66 X X -windows-core@0.51.1 X X -windows-sys@0.48.0 X X -windows-sys@0.52.0 X X -windows-targets@0.48.5 X X -windows-targets@0.52.0 X X -windows_aarch64_gnullvm@0.48.5 X X -windows_aarch64_gnullvm@0.52.0 X X -windows_aarch64_msvc@0.48.5 X X -windows_aarch64_msvc@0.52.0 X X -windows_i686_gnu@0.48.5 X X -windows_i686_gnu@0.52.0 X X -windows_i686_msvc@0.48.5 X X -windows_i686_msvc@0.52.0 X X -windows_x86_64_gnu@0.48.5 X X -windows_x86_64_gnu@0.52.0 X X -windows_x86_64_gnullvm@0.48.5 X X -windows_x86_64_gnullvm@0.52.0 X X -windows_x86_64_msvc@0.48.5 X X -windows_x86_64_msvc@0.52.0 X X -winreg@0.50.0 X -wyz@0.5.1 X -zerocopy@0.7.32 X X X -zeroize@1.7.0 X X diff --git 
a/crates/rest/src/catalog.rs b/crates/rest/src/catalog.rs deleted file mode 100644 index c10d904..0000000 --- a/crates/rest/src/catalog.rs +++ /dev/null @@ -1,1604 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This module contains rest catalog implementation. - -use std::collections::HashMap; - -use async_trait::async_trait; -use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; -use reqwest::{Client, Request, Response, StatusCode}; -use serde::de::DeserializeOwned; -use typed_builder::TypedBuilder; -use urlencoding::encode; - -use crate::catalog::_serde::{ - CommitTableRequest, CommitTableResponse, CreateTableRequest, LoadTableResponse, -}; -use iceberg::io::FileIO; -use iceberg::table::Table; -use iceberg::Result; -use iceberg::{ - Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, -}; - -use self::_serde::{ - CatalogConfig, ErrorResponse, ListNamespaceResponse, ListTableResponse, NamespaceSerde, - RenameTableRequest, NO_CONTENT, OK, -}; - -const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; -const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); -const PATH_V1: &str = "v1"; - -/// Rest catalog configuration. 
-#[derive(Debug, TypedBuilder)] -pub struct RestCatalogConfig { - uri: String, - #[builder(default, setter(strip_option))] - warehouse: Option, - - #[builder(default)] - props: HashMap, -} - -impl RestCatalogConfig { - fn config_endpoint(&self) -> String { - [&self.uri, PATH_V1, "config"].join("/") - } - - fn namespaces_endpoint(&self) -> String { - [&self.uri, PATH_V1, "namespaces"].join("/") - } - - fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String { - [&self.uri, PATH_V1, "namespaces", &ns.encode_in_url()].join("/") - } - - fn tables_endpoint(&self, ns: &NamespaceIdent) -> String { - [ - &self.uri, - PATH_V1, - "namespaces", - &ns.encode_in_url(), - "tables", - ] - .join("/") - } - - fn rename_table_endpoint(&self) -> String { - [&self.uri, PATH_V1, "tables", "rename"].join("/") - } - - fn table_endpoint(&self, table: &TableIdent) -> String { - [ - &self.uri, - PATH_V1, - "namespaces", - &table.namespace.encode_in_url(), - "tables", - encode(&table.name).as_ref(), - ] - .join("/") - } - - fn try_create_rest_client(&self) -> Result { - //TODO: We will add oauth, ssl config, sigv4 later - let headers = HeaderMap::from_iter([ - ( - header::CONTENT_TYPE, - HeaderValue::from_static("application/json"), - ), - ( - HeaderName::from_static("x-client-version"), - HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION), - ), - ( - header::USER_AGENT, - HeaderValue::from_str(&format!("iceberg-rs/{}", CARGO_PKG_VERSION)).unwrap(), - ), - ]); - - Ok(HttpClient( - Client::builder().default_headers(headers).build()?, - )) - } -} - -#[derive(Debug)] -struct HttpClient(Client); - -impl HttpClient { - async fn query< - R: DeserializeOwned, - E: DeserializeOwned + Into, - const SUCCESS_CODE: u16, - >( - &self, - request: Request, - ) -> Result { - let resp = self.0.execute(request).await?; - - if resp.status().as_u16() == SUCCESS_CODE { - let text = resp.bytes().await?; - Ok(serde_json::from_slice::(&text).map_err(|e| { - Error::new( - ErrorKind::Unexpected, - "Failed to 
parse response from rest catalog server!", - ) - .with_context("json", String::from_utf8_lossy(&text)) - .with_source(e) - })?) - } else { - let text = resp.bytes().await?; - let e = serde_json::from_slice::(&text).map_err(|e| { - Error::new( - ErrorKind::Unexpected, - "Failed to parse response from rest catalog server!", - ) - .with_context("json", String::from_utf8_lossy(&text)) - .with_source(e) - })?; - Err(e.into()) - } - } - - async fn execute, const SUCCESS_CODE: u16>( - &self, - request: Request, - ) -> Result<()> { - let resp = self.0.execute(request).await?; - - if resp.status().as_u16() == SUCCESS_CODE { - Ok(()) - } else { - let code = resp.status(); - let text = resp.bytes().await?; - let e = serde_json::from_slice::(&text).map_err(|e| { - Error::new( - ErrorKind::Unexpected, - "Failed to parse response from rest catalog server!", - ) - .with_context("json", String::from_utf8_lossy(&text)) - .with_context("code", code.to_string()) - .with_source(e) - })?; - Err(e.into()) - } - } - - /// More generic logic handling for special cases like head. - async fn do_execute>( - &self, - request: Request, - handler: impl FnOnce(&Response) -> Option, - ) -> Result { - let resp = self.0.execute(request).await?; - - if let Some(ret) = handler(&resp) { - Ok(ret) - } else { - let code = resp.status(); - let text = resp.bytes().await?; - let e = serde_json::from_slice::(&text).map_err(|e| { - Error::new( - ErrorKind::Unexpected, - "Failed to parse response from rest catalog server!", - ) - .with_context("code", code.to_string()) - .with_context("json", String::from_utf8_lossy(&text)) - .with_source(e) - })?; - Err(e.into()) - } - } -} - -/// Rest catalog implementation. -#[derive(Debug)] -pub struct RestCatalog { - config: RestCatalogConfig, - client: HttpClient, -} - -#[async_trait] -impl Catalog for RestCatalog { - /// List namespaces from table. 
- async fn list_namespaces( - &self, - parent: Option<&NamespaceIdent>, - ) -> Result> { - let mut request = self.client.0.get(self.config.namespaces_endpoint()); - if let Some(ns) = parent { - request = request.query(&[("parent", ns.encode_in_url())]); - } - - let resp = self - .client - .query::(request.build()?) - .await?; - - resp.namespaces - .into_iter() - .map(NamespaceIdent::from_vec) - .collect::>>() - } - - /// Create a new namespace inside the catalog. - async fn create_namespace( - &self, - namespace: &NamespaceIdent, - properties: HashMap, - ) -> Result { - let request = self - .client - .0 - .post(self.config.namespaces_endpoint()) - .json(&NamespaceSerde { - namespace: namespace.as_ref().clone(), - properties: Some(properties), - }) - .build()?; - - let resp = self - .client - .query::(request) - .await?; - - Namespace::try_from(resp) - } - - /// Get a namespace information from the catalog. - async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { - let request = self - .client - .0 - .get(self.config.namespace_endpoint(namespace)) - .build()?; - - let resp = self - .client - .query::(request) - .await?; - Namespace::try_from(resp) - } - - /// Update a namespace inside the catalog. - /// - /// # Behavior - /// - /// The properties must be the full set of namespace. - async fn update_namespace( - &self, - _namespace: &NamespaceIdent, - _properties: HashMap, - ) -> Result<()> { - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Updating namespace not supported yet!", - )) - } - - async fn namespace_exists(&self, ns: &NamespaceIdent) -> Result { - let request = self - .client - .0 - .head(self.config.namespace_endpoint(ns)) - .build()?; - - self.client - .do_execute::(request, |resp| match resp.status() { - StatusCode::NO_CONTENT => Some(true), - StatusCode::NOT_FOUND => Some(false), - _ => None, - }) - .await - } - - /// Drop a namespace from the catalog. 
- async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { - let request = self - .client - .0 - .delete(self.config.namespace_endpoint(namespace)) - .build()?; - - self.client - .execute::(request) - .await - } - - /// List tables from namespace. - async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { - let request = self - .client - .0 - .get(self.config.tables_endpoint(namespace)) - .build()?; - - let resp = self - .client - .query::(request) - .await?; - - Ok(resp.identifiers) - } - - /// Create a new table inside the namespace. - async fn create_table( - &self, - namespace: &NamespaceIdent, - creation: TableCreation, - ) -> Result { - let table_ident = TableIdent::new(namespace.clone(), creation.name.clone()); - - let request = self - .client - .0 - .post(self.config.tables_endpoint(namespace)) - .json(&CreateTableRequest { - name: creation.name, - location: creation.location, - schema: creation.schema, - partition_spec: creation.partition_spec, - write_order: creation.sort_order, - // We don't support stage create yet. - stage_create: Some(false), - properties: if creation.properties.is_empty() { - None - } else { - Some(creation.properties) - }, - }) - .build()?; - - let resp = self - .client - .query::(request) - .await?; - - let file_io = self.load_file_io(resp.metadata_location.as_deref(), resp.config)?; - - let table = Table::builder() - .identifier(table_ident) - .file_io(file_io) - .metadata(resp.metadata) - .metadata_location(resp.metadata_location.ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "Metadata location missing in create table response!", - ) - })?) - .build(); - - Ok(table) - } - - /// Load table from the catalog. - async fn load_table(&self, table: &TableIdent) -> Result
{ - let request = self - .client - .0 - .get(self.config.table_endpoint(table)) - .build()?; - - let resp = self - .client - .query::(request) - .await?; - - let file_io = self.load_file_io(resp.metadata_location.as_deref(), resp.config)?; - - let table_builder = Table::builder() - .identifier(table.clone()) - .file_io(file_io) - .metadata(resp.metadata); - - if let Some(metadata_location) = resp.metadata_location { - Ok(table_builder.metadata_location(metadata_location).build()) - } else { - Ok(table_builder.build()) - } - } - - /// Drop a table from the catalog. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { - let request = self - .client - .0 - .delete(self.config.table_endpoint(table)) - .build()?; - - self.client - .execute::(request) - .await - } - - /// Check if a table exists in the catalog. - async fn stat_table(&self, table: &TableIdent) -> Result { - let request = self - .client - .0 - .head(self.config.table_endpoint(table)) - .build()?; - - self.client - .do_execute::(request, |resp| match resp.status() { - StatusCode::NO_CONTENT => Some(true), - StatusCode::NOT_FOUND => Some(false), - _ => None, - }) - .await - } - - /// Rename a table in the catalog. - async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { - let request = self - .client - .0 - .post(self.config.rename_table_endpoint()) - .json(&RenameTableRequest { - source: src.clone(), - destination: dest.clone(), - }) - .build()?; - - self.client - .execute::(request) - .await - } - - /// Update table. - async fn update_table(&self, mut commit: TableCommit) -> Result
{ - let request = self - .client - .0 - .post(self.config.table_endpoint(commit.identifier())) - .json(&CommitTableRequest { - identifier: commit.identifier().clone(), - requirements: commit.take_requirements(), - updates: commit.take_updates(), - }) - .build()?; - - let resp = self - .client - .query::(request) - .await?; - - let file_io = self.load_file_io(Some(&resp.metadata_location), None)?; - Ok(Table::builder() - .identifier(commit.identifier().clone()) - .file_io(file_io) - .metadata(resp.metadata) - .metadata_location(resp.metadata_location) - .build()) - } -} - -impl RestCatalog { - /// Creates a rest catalog from config. - pub async fn new(config: RestCatalogConfig) -> Result { - let mut catalog = Self { - client: config.try_create_rest_client()?, - config, - }; - - catalog.update_config().await?; - catalog.client = catalog.config.try_create_rest_client()?; - - Ok(catalog) - } - - async fn update_config(&mut self) -> Result<()> { - let mut request = self.client.0.get(self.config.config_endpoint()); - - if let Some(warehouse_location) = &self.config.warehouse { - request = request.query(&[("warehouse", warehouse_location)]); - } - - let config = self - .client - .query::(request.build()?) - .await?; - - let mut props = config.defaults; - props.extend(self.config.props.clone()); - props.extend(config.overrides); - - self.config.props = props; - - Ok(()) - } - - fn load_file_io( - &self, - metadata_location: Option<&str>, - extra_config: Option>, - ) -> Result { - let mut props = self.config.props.clone(); - if let Some(config) = extra_config { - props.extend(config); - } - - let file_io = match self.config.warehouse.as_deref().or(metadata_location) { - Some(url) => FileIO::from_path(url)?.with_props(props).build()?, - None => { - return Err(Error::new( - ErrorKind::Unexpected, - "Unable to load file io, neither warehouse nor metadata location is set!", - ))? - } - }; - - Ok(file_io) - } -} - -/// Requests and responses for rest api. 
-mod _serde { - use std::collections::HashMap; - - use serde_derive::{Deserialize, Serialize}; - - use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec}; - use iceberg::{Error, ErrorKind, Namespace, TableIdent, TableRequirement, TableUpdate}; - - pub(super) const OK: u16 = 200u16; - pub(super) const NO_CONTENT: u16 = 204u16; - - #[derive(Clone, Debug, Serialize, Deserialize)] - pub(super) struct CatalogConfig { - pub(super) overrides: HashMap, - pub(super) defaults: HashMap, - } - - #[derive(Debug, Serialize, Deserialize)] - pub(super) struct ErrorResponse { - error: ErrorModel, - } - - impl From for Error { - fn from(resp: ErrorResponse) -> Error { - resp.error.into() - } - } - - #[derive(Debug, Serialize, Deserialize)] - pub(super) struct ErrorModel { - pub(super) message: String, - pub(super) r#type: String, - pub(super) code: u16, - pub(super) stack: Option>, - } - - impl From for Error { - fn from(value: ErrorModel) -> Self { - let mut error = Error::new(ErrorKind::DataInvalid, value.message) - .with_context("type", value.r#type) - .with_context("code", format!("{}", value.code)); - - if let Some(stack) = value.stack { - error = error.with_context("stack", stack.join("\n")); - } - - error - } - } - - #[derive(Debug, Serialize, Deserialize)] - pub(super) struct OAuthError { - pub(super) error: String, - pub(super) error_description: Option, - pub(super) error_uri: Option, - } - - impl From for Error { - fn from(value: OAuthError) -> Self { - let mut error = Error::new( - ErrorKind::DataInvalid, - format!("OAuthError: {}", value.error), - ); - - if let Some(desc) = value.error_description { - error = error.with_context("description", desc); - } - - if let Some(uri) = value.error_uri { - error = error.with_context("uri", uri); - } - - error - } - } - - #[derive(Debug, Serialize, Deserialize)] - pub(super) struct NamespaceSerde { - pub(super) namespace: Vec, - pub(super) properties: Option>, - } - - impl TryFrom for super::Namespace { - type 
Error = Error; - fn try_from(value: NamespaceSerde) -> std::result::Result { - Ok(super::Namespace::with_properties( - super::NamespaceIdent::from_vec(value.namespace)?, - value.properties.unwrap_or_default(), - )) - } - } - - impl From<&Namespace> for NamespaceSerde { - fn from(value: &Namespace) -> Self { - Self { - namespace: value.name().as_ref().clone(), - properties: Some(value.properties().clone()), - } - } - } - - #[derive(Debug, Serialize, Deserialize)] - pub(super) struct ListNamespaceResponse { - pub(super) namespaces: Vec>, - } - - #[derive(Debug, Serialize, Deserialize)] - pub(super) struct UpdateNamespacePropsRequest { - removals: Option>, - updates: Option>, - } - - #[derive(Debug, Serialize, Deserialize)] - pub(super) struct UpdateNamespacePropsResponse { - updated: Vec, - removed: Vec, - missing: Option>, - } - - #[derive(Debug, Serialize, Deserialize)] - pub(super) struct ListTableResponse { - pub(super) identifiers: Vec, - } - - #[derive(Debug, Serialize, Deserialize)] - pub(super) struct RenameTableRequest { - pub(super) source: TableIdent, - pub(super) destination: TableIdent, - } - - #[derive(Debug, Deserialize)] - #[serde(rename_all = "kebab-case")] - pub(super) struct LoadTableResponse { - pub(super) metadata_location: Option, - pub(super) metadata: TableMetadata, - pub(super) config: Option>, - } - - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - pub(super) struct CreateTableRequest { - pub(super) name: String, - pub(super) location: Option, - pub(super) schema: Schema, - pub(super) partition_spec: Option, - pub(super) write_order: Option, - pub(super) stage_create: Option, - pub(super) properties: Option>, - } - - #[derive(Debug, Serialize, Deserialize)] - pub(super) struct CommitTableRequest { - pub(super) identifier: TableIdent, - pub(super) requirements: Vec, - pub(super) updates: Vec, - } - - #[derive(Debug, Serialize, Deserialize)] - #[serde(rename_all = "kebab-case")] - pub(super) struct 
CommitTableResponse { - pub(super) metadata_location: String, - pub(super) metadata: TableMetadata, - } -} - -#[cfg(test)] -mod tests { - use chrono::{TimeZone, Utc}; - use iceberg::spec::{ - FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, - SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type, - UnboundPartitionField, UnboundPartitionSpec, - }; - use iceberg::transaction::Transaction; - use mockito::{Mock, Server, ServerGuard}; - use std::fs::File; - use std::io::BufReader; - use std::sync::Arc; - use uuid::uuid; - - use super::*; - - #[tokio::test] - async fn test_update_config() { - let mut server = Server::new_async().await; - - let config_mock = server - .mock("GET", "/v1/config") - .with_status(200) - .with_body( - r#"{ - "overrides": { - "warehouse": "s3://iceberg-catalog" - }, - "defaults": {} - }"#, - ) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - assert_eq!( - catalog.config.props.get("warehouse"), - Some(&"s3://iceberg-catalog".to_string()) - ); - - config_mock.assert_async().await; - } - - async fn create_config_mock(server: &mut ServerGuard) -> Mock { - server - .mock("GET", "/v1/config") - .with_status(200) - .with_body( - r#"{ - "overrides": { - "warehouse": "s3://iceberg-catalog" - }, - "defaults": {} - }"#, - ) - .create_async() - .await - } - - #[tokio::test] - async fn test_list_namespace() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let list_ns_mock = server - .mock("GET", "/v1/namespaces") - .with_body( - r#"{ - "namespaces": [ - ["ns1", "ns11"], - ["ns2"] - ] - }"#, - ) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - let namespaces = catalog.list_namespaces(None).await.unwrap(); - - let expected_ns = vec![ - 
NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(), - NamespaceIdent::from_vec(vec!["ns2".to_string()]).unwrap(), - ]; - - assert_eq!(expected_ns, namespaces); - - config_mock.assert_async().await; - list_ns_mock.assert_async().await; - } - - #[tokio::test] - async fn test_create_namespace() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let create_ns_mock = server - .mock("POST", "/v1/namespaces") - .with_body( - r#"{ - "namespace": [ "ns1", "ns11"], - "properties" : { - "key1": "value1" - } - }"#, - ) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - let namespaces = catalog - .create_namespace( - &NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(), - HashMap::from([("key1".to_string(), "value1".to_string())]), - ) - .await - .unwrap(); - - let expected_ns = Namespace::with_properties( - NamespaceIdent::from_vec(vec!["ns1".to_string(), "ns11".to_string()]).unwrap(), - HashMap::from([("key1".to_string(), "value1".to_string())]), - ); - - assert_eq!(expected_ns, namespaces); - - config_mock.assert_async().await; - create_ns_mock.assert_async().await; - } - - #[tokio::test] - async fn test_get_namespace() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let get_ns_mock = server - .mock("GET", "/v1/namespaces/ns1") - .with_body( - r#"{ - "namespace": [ "ns1"], - "properties" : { - "key1": "value1" - } - }"#, - ) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - let namespaces = catalog - .get_namespace(&NamespaceIdent::new("ns1".to_string())) - .await - .unwrap(); - - let expected_ns = Namespace::with_properties( - NamespaceIdent::new("ns1".to_string()), - HashMap::from([("key1".to_string(), 
"value1".to_string())]), - ); - - assert_eq!(expected_ns, namespaces); - - config_mock.assert_async().await; - get_ns_mock.assert_async().await; - } - - #[tokio::test] - async fn check_namespace_exists() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let get_ns_mock = server - .mock("HEAD", "/v1/namespaces/ns1") - .with_status(204) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - assert!(catalog - .namespace_exists(&NamespaceIdent::new("ns1".to_string())) - .await - .unwrap()); - - config_mock.assert_async().await; - get_ns_mock.assert_async().await; - } - - #[tokio::test] - async fn test_drop_namespace() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let drop_ns_mock = server - .mock("DELETE", "/v1/namespaces/ns1") - .with_status(204) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - catalog - .drop_namespace(&NamespaceIdent::new("ns1".to_string())) - .await - .unwrap(); - - config_mock.assert_async().await; - drop_ns_mock.assert_async().await; - } - - #[tokio::test] - async fn test_list_tables() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let list_tables_mock = server - .mock("GET", "/v1/namespaces/ns1/tables") - .with_status(200) - .with_body( - r#"{ - "identifiers": [ - { - "namespace": ["ns1"], - "name": "table1" - }, - { - "namespace": ["ns1"], - "name": "table2" - } - ] - }"#, - ) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - let tables = catalog - .list_tables(&NamespaceIdent::new("ns1".to_string())) - .await - .unwrap(); - - let expected_tables = vec![ - 
TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()), - TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()), - ]; - - assert_eq!(tables, expected_tables); - - config_mock.assert_async().await; - list_tables_mock.assert_async().await; - } - - #[tokio::test] - async fn test_drop_tables() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let delete_table_mock = server - .mock("DELETE", "/v1/namespaces/ns1/tables/table1") - .with_status(204) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - catalog - .drop_table(&TableIdent::new( - NamespaceIdent::new("ns1".to_string()), - "table1".to_string(), - )) - .await - .unwrap(); - - config_mock.assert_async().await; - delete_table_mock.assert_async().await; - } - - #[tokio::test] - async fn test_check_table_exists() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let check_table_exists_mock = server - .mock("HEAD", "/v1/namespaces/ns1/tables/table1") - .with_status(204) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - assert!(catalog - .stat_table(&TableIdent::new( - NamespaceIdent::new("ns1".to_string()), - "table1".to_string(), - )) - .await - .unwrap()); - - config_mock.assert_async().await; - check_table_exists_mock.assert_async().await; - } - - #[tokio::test] - async fn test_rename_table() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let rename_table_mock = server - .mock("POST", "/v1/tables/rename") - .with_status(204) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - catalog - .rename_table( - 
&TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table1".to_string()), - &TableIdent::new(NamespaceIdent::new("ns1".to_string()), "table2".to_string()), - ) - .await - .unwrap(); - - config_mock.assert_async().await; - rename_table_mock.assert_async().await; - } - - #[tokio::test] - async fn test_load_table() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let rename_table_mock = server - .mock("GET", "/v1/namespaces/ns1/tables/test1") - .with_status(200) - .with_body_from_file(format!( - "{}/testdata/{}", - env!("CARGO_MANIFEST_DIR"), - "load_table_response.json" - )) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - let table = catalog - .load_table(&TableIdent::new( - NamespaceIdent::new("ns1".to_string()), - "test1".to_string(), - )) - .await - .unwrap(); - - assert_eq!( - &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(), - table.identifier() - ); - assert_eq!("s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", table.metadata_location().unwrap()); - assert_eq!(FormatVersion::V1, table.metadata().format_version()); - assert_eq!("s3://warehouse/database/table", table.metadata().location()); - assert_eq!( - uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"), - table.metadata().uuid() - ); - assert_eq!( - Utc.timestamp_millis_opt(1646787054459).unwrap(), - table.metadata().last_updated_ms() - ); - assert_eq!( - vec![&Arc::new( - Schema::builder() - .with_fields(vec![ - NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String)) - .into(), - ]) - .build() - .unwrap() - )], - table.metadata().schemas_iter().collect::>() - ); - assert_eq!( - &HashMap::from([ - ("owner".to_string(), "bryan".to_string()), - ( - "write.metadata.compression-codec".to_string(), - 
"gzip".to_string() - ) - ]), - table.metadata().properties() - ); - assert_eq!(vec![&Arc::new(Snapshot::builder() - .with_snapshot_id(3497810964824022504) - .with_timestamp_ms(1646787054459) - .with_manifest_list("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro") - .with_sequence_number(0) - .with_schema_id(0) - .with_summary(Summary { - operation: Operation::Append, - other: HashMap::from_iter([ - ("spark.app.id", "local-1646787004168"), - ("added-data-files", "1"), - ("added-records", "1"), - ("added-files-size", "697"), - ("changed-partition-count", "1"), - ("total-records", "1"), - ("total-files-size", "697"), - ("total-data-files", "1"), - ("total-delete-files", "0"), - ("total-position-deletes", "0"), - ("total-equality-deletes", "0") - ].iter().map(|p| (p.0.to_string(), p.1.to_string()))), - }).build() - )], table.metadata().snapshots().collect::>()); - assert_eq!( - &[SnapshotLog { - timestamp_ms: 1646787054459, - snapshot_id: 3497810964824022504, - }], - table.metadata().history() - ); - assert_eq!( - vec![&Arc::new(SortOrder { - order_id: 0, - fields: vec![], - })], - table.metadata().sort_orders_iter().collect::>() - ); - - config_mock.assert_async().await; - rename_table_mock.assert_async().await; - } - - #[tokio::test] - async fn test_load_table_404() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let rename_table_mock = server - .mock("GET", "/v1/namespaces/ns1/tables/test1") - .with_status(404) - .with_body(r#" -{ - "error": { - "message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", - "type": "NoSuchNamespaceErrorException", - "code": 404 - } -} - "#) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - let table = catalog - .load_table(&TableIdent::new( - NamespaceIdent::new("ns1".to_string()), 
- "test1".to_string(), - )) - .await; - - assert!(table.is_err()); - assert!(table - .err() - .unwrap() - .message() - .contains("Table does not exist")); - - config_mock.assert_async().await; - rename_table_mock.assert_async().await; - } - - #[tokio::test] - async fn test_create_table() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let create_table_mock = server - .mock("POST", "/v1/namespaces/ns1/tables") - .with_status(200) - .with_body_from_file(format!( - "{}/testdata/{}", - env!("CARGO_MANIFEST_DIR"), - "create_table_response.json" - )) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - let table_creation = TableCreation::builder() - .name("test1".to_string()) - .schema( - Schema::builder() - .with_fields(vec![ - NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)) - .into(), - NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)) - .into(), - ]) - .with_schema_id(1) - .with_identifier_field_ids(vec![2]) - .build() - .unwrap(), - ) - .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) - .partition_spec( - UnboundPartitionSpec::builder() - .with_fields(vec![UnboundPartitionField::builder() - .source_id(1) - .transform(Transform::Truncate(3)) - .name("id".to_string()) - .build()]) - .build() - .unwrap(), - ) - .sort_order( - SortOrder::builder() - .with_sort_field( - SortField::builder() - .source_id(2) - .transform(Transform::Identity) - .direction(SortDirection::Ascending) - .null_order(NullOrder::First) - .build(), - ) - .build_unbound() - .unwrap(), - ) - .build(); - - let table = catalog - .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation) - .await - .unwrap(); - - assert_eq!( - &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(), - 
table.identifier() - ); - assert_eq!( - "s3://warehouse/database/table/metadata.json", - table.metadata_location().unwrap() - ); - assert_eq!(FormatVersion::V1, table.metadata().format_version()); - assert_eq!("s3://warehouse/database/table", table.metadata().location()); - assert_eq!( - uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"), - table.metadata().uuid() - ); - assert_eq!( - 1657810967051, - table.metadata().last_updated_ms().timestamp_millis() - ); - assert_eq!( - vec![&Arc::new( - Schema::builder() - .with_fields(vec![ - NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)) - .into(), - NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)) - .into(), - ]) - .with_schema_id(0) - .with_identifier_field_ids(vec![2]) - .build() - .unwrap() - )], - table.metadata().schemas_iter().collect::>() - ); - assert_eq!( - &HashMap::from([ - ( - "write.delete.parquet.compression-codec".to_string(), - "zstd".to_string() - ), - ( - "write.metadata.compression-codec".to_string(), - "gzip".to_string() - ), - ( - "write.summary.partition-limit".to_string(), - "100".to_string() - ), - ( - "write.parquet.compression-codec".to_string(), - "zstd".to_string() - ), - ]), - table.metadata().properties() - ); - assert!(table.metadata().current_snapshot().is_none()); - assert!(table.metadata().history().is_empty()); - assert_eq!( - vec![&Arc::new(SortOrder { - order_id: 0, - fields: vec![], - })], - table.metadata().sort_orders_iter().collect::>() - ); - - config_mock.assert_async().await; - create_table_mock.assert_async().await; - } - - #[tokio::test] - async fn test_create_table_409() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let create_table_mock = server - .mock("POST", "/v1/namespaces/ns1/tables") - .with_status(409) - .with_body(r#" -{ - "error": { - "message": "Table already exists: ns1.test1 in 
warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", - "type": "AlreadyExistsException", - "code": 409 - } -} - "#) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - let table_creation = TableCreation::builder() - .name("test1".to_string()) - .schema( - Schema::builder() - .with_fields(vec![ - NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)) - .into(), - NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)) - .into(), - ]) - .with_schema_id(1) - .with_identifier_field_ids(vec![2]) - .build() - .unwrap(), - ) - .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) - .build(); - - let table_result = catalog - .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation) - .await; - - assert!(table_result.is_err()); - assert!(table_result - .err() - .unwrap() - .message() - .contains("Table already exists")); - - config_mock.assert_async().await; - create_table_mock.assert_async().await; - } - - #[tokio::test] - async fn test_update_table() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let update_table_mock = server - .mock("POST", "/v1/namespaces/ns1/tables/test1") - .with_status(200) - .with_body_from_file(format!( - "{}/testdata/{}", - env!("CARGO_MANIFEST_DIR"), - "update_table_response.json" - )) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - let table1 = { - let file = File::open(format!( - "{}/testdata/{}", - env!("CARGO_MANIFEST_DIR"), - "create_table_response.json" - )) - .unwrap(); - let reader = BufReader::new(file); - let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap(); - - Table::builder() - .metadata(resp.metadata) - 
.metadata_location(resp.metadata_location.unwrap()) - .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) - .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) - .build() - }; - - let table = Transaction::new(&table1) - .upgrade_table_version(FormatVersion::V2) - .unwrap() - .commit(&catalog) - .await - .unwrap(); - - assert_eq!( - &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(), - table.identifier() - ); - assert_eq!( - "s3://warehouse/database/table/metadata.json", - table.metadata_location().unwrap() - ); - assert_eq!(FormatVersion::V2, table.metadata().format_version()); - assert_eq!("s3://warehouse/database/table", table.metadata().location()); - assert_eq!( - uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"), - table.metadata().uuid() - ); - assert_eq!( - 1657810967051, - table.metadata().last_updated_ms().timestamp_millis() - ); - assert_eq!( - vec![&Arc::new( - Schema::builder() - .with_fields(vec![ - NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)) - .into(), - NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)) - .into(), - ]) - .with_schema_id(0) - .with_identifier_field_ids(vec![2]) - .build() - .unwrap() - )], - table.metadata().schemas_iter().collect::>() - ); - assert_eq!( - &HashMap::from([ - ( - "write.delete.parquet.compression-codec".to_string(), - "zstd".to_string() - ), - ( - "write.metadata.compression-codec".to_string(), - "gzip".to_string() - ), - ( - "write.summary.partition-limit".to_string(), - "100".to_string() - ), - ( - "write.parquet.compression-codec".to_string(), - "zstd".to_string() - ), - ]), - table.metadata().properties() - ); - assert!(table.metadata().current_snapshot().is_none()); - assert!(table.metadata().history().is_empty()); - assert_eq!( - vec![&Arc::new(SortOrder { - order_id: 0, - fields: vec![], - })], - table.metadata().sort_orders_iter().collect::>() - ); - - 
config_mock.assert_async().await; - update_table_mock.assert_async().await; - } - - #[tokio::test] - async fn test_update_table_404() { - let mut server = Server::new_async().await; - - let config_mock = create_config_mock(&mut server).await; - - let update_table_mock = server - .mock("POST", "/v1/namespaces/ns1/tables/test1") - .with_status(404) - .with_body( - r#" -{ - "error": { - "message": "The given table does not exist", - "type": "NoSuchTableException", - "code": 404 - } -} - "#, - ) - .create_async() - .await; - - let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) - .await - .unwrap(); - - let table1 = { - let file = File::open(format!( - "{}/testdata/{}", - env!("CARGO_MANIFEST_DIR"), - "create_table_response.json" - )) - .unwrap(); - let reader = BufReader::new(file); - let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap(); - - Table::builder() - .metadata(resp.metadata) - .metadata_location(resp.metadata_location.unwrap()) - .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) - .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) - .build() - }; - - let table_result = Transaction::new(&table1) - .upgrade_table_version(FormatVersion::V2) - .unwrap() - .commit(&catalog) - .await; - - assert!(table_result.is_err()); - assert!(table_result - .err() - .unwrap() - .message() - .contains("The given table does not exist")); - - config_mock.assert_async().await; - update_table_mock.assert_async().await; - } -} diff --git a/crates/rest/src/lib.rs b/crates/rest/src/lib.rs deleted file mode 100644 index 023fe7a..0000000 --- a/crates/rest/src/lib.rs +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Iceberg REST API implementation. - -#![deny(missing_docs)] - -mod catalog; -pub use catalog::*; diff --git a/crates/rest/testdata/create_table_response.json b/crates/rest/testdata/create_table_response.json deleted file mode 100644 index e01a52f..0000000 --- a/crates/rest/testdata/create_table_response.json +++ /dev/null @@ -1,53 +0,0 @@ -{ - "metadata-location": "s3://warehouse/database/table/metadata.json", - "metadata": { - "format-version": 1, - "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29", - "location": "s3://warehouse/database/table", - "last-updated-ms": 1657810967051, - "last-column-id": 3, - "schema": { - "type": "struct", - "schema-id": 0, - "identifier-field-ids": [2], - "fields": [ - {"id": 1, "name": "foo", "required": false, "type": "string"}, - {"id": 2, "name": "bar", "required": true, "type": "int"}, - {"id": 3, "name": "baz", "required": false, "type": "boolean"} - ] - }, - "current-schema-id": 0, - "schemas": [ - { - "type": "struct", - "schema-id": 0, - "identifier-field-ids": [2], - "fields": [ - {"id": 1, "name": "foo", "required": false, "type": "string"}, - {"id": 2, "name": "bar", "required": true, "type": "int"}, - {"id": 3, "name": "baz", "required": false, "type": "boolean"} - ] - } - ], - "partition-spec": [], - "default-spec-id": 0, - "last-partition-id": 999, - "default-sort-order-id": 0, - "sort-orders": [{"order-id": 0, "fields": []}], - 
"properties": { - "write.delete.parquet.compression-codec": "zstd", - "write.metadata.compression-codec": "gzip", - "write.summary.partition-limit": "100", - "write.parquet.compression-codec": "zstd" - }, - "current-snapshot-id": -1, - "refs": {}, - "snapshots": [], - "snapshot-log": [], - "metadata-log": [] - }, - "config": { - "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", - "region": "us-west-2" - } -} \ No newline at end of file diff --git a/crates/rest/testdata/load_table_response.json b/crates/rest/testdata/load_table_response.json deleted file mode 100644 index 012f0e9..0000000 --- a/crates/rest/testdata/load_table_response.json +++ /dev/null @@ -1,68 +0,0 @@ -{ - "metadata-location": "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", - "metadata": { - "format-version": 1, - "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f", - "location": "s3://warehouse/database/table", - "last-updated-ms": 1646787054459, - "last-column-id": 2, - "schema": { - "type": "struct", - "schema-id": 0, - "fields": [ - {"id": 1, "name": "id", "required": false, "type": "int"}, - {"id": 2, "name": "data", "required": false, "type": "string"} - ] - }, - "current-schema-id": 0, - "schemas": [ - { - "type": "struct", - "schema-id": 0, - "fields": [ - {"id": 1, "name": "id", "required": false, "type": "int"}, - {"id": 2, "name": "data", "required": false, "type": "string"} - ] - } - ], - "partition-spec": [], - "default-spec-id": 0, - "partition-specs": [{"spec-id": 0, "fields": []}], - "last-partition-id": 999, - "default-sort-order-id": 0, - "sort-orders": [{"order-id": 0, "fields": []}], - "properties": {"owner": "bryan", "write.metadata.compression-codec": "gzip"}, - "current-snapshot-id": 3497810964824022504, - "refs": {"main": {"snapshot-id": 3497810964824022504, "type": "branch"}}, - "snapshots": [ - { - "snapshot-id": 3497810964824022504, - "timestamp-ms": 1646787054459, - "summary": { - "operation": 
"append", - "spark.app.id": "local-1646787004168", - "added-data-files": "1", - "added-records": "1", - "added-files-size": "697", - "changed-partition-count": "1", - "total-records": "1", - "total-files-size": "697", - "total-data-files": "1", - "total-delete-files": "0", - "total-position-deletes": "0", - "total-equality-deletes": "0" - }, - "manifest-list": "s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro", - "schema-id": 0 - } - ], - "snapshot-log": [{"timestamp-ms": 1646787054459, "snapshot-id": 3497810964824022504}], - "metadata-log": [ - { - "timestamp-ms": 1646787031514, - "metadata-file": "s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json" - } - ] - }, - "config": {"client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", "region": "us-west-2"} -} \ No newline at end of file diff --git a/crates/rest/testdata/rest_catalog/docker-compose.yaml b/crates/rest/testdata/rest_catalog/docker-compose.yaml deleted file mode 100644 index 5c10146..0000000 --- a/crates/rest/testdata/rest_catalog/docker-compose.yaml +++ /dev/null @@ -1,65 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -version: '3.8' - -services: - rest: - image: tabulario/iceberg-rest:0.10.0 - environment: - - AWS_ACCESS_KEY_ID=admin - - AWS_SECRET_ACCESS_KEY=password - - AWS_REGION=us-east-1 - - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog - - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory - - CATALOG_WAREHOUSE=s3://icebergdata/demo - - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO - - CATALOG_S3_ENDPOINT=http://minio:9000 - depends_on: - - minio - links: - - minio:icebergdata.minio - expose: - - 8181 - - minio: - image: minio/minio - environment: - - MINIO_ROOT_USER=admin - - MINIO_ROOT_PASSWORD=password - - MINIO_DOMAIN=minio - expose: - - 9001 - - 9000 - command: [ "server", "/data", "--console-address", ":9001" ] - - mc: - depends_on: - - minio - image: minio/mc - environment: - - AWS_ACCESS_KEY_ID=admin - - AWS_SECRET_ACCESS_KEY=password - - AWS_REGION=us-east-1 - entrypoint: > - /bin/sh -c " - until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' 
&& sleep 1; done; - /usr/bin/mc rm -r --force minio/icebergdata; - /usr/bin/mc mb minio/icebergdata; - /usr/bin/mc policy set public minio/icebergdata; - tail -f /dev/null - " \ No newline at end of file diff --git a/crates/rest/testdata/update_table_response.json b/crates/rest/testdata/update_table_response.json deleted file mode 100644 index 80ec269..0000000 --- a/crates/rest/testdata/update_table_response.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "metadata-location": "s3://warehouse/database/table/metadata.json", - "metadata": { - "format-version": 2, - "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29", - "location": "s3://warehouse/database/table", - "last-sequence-number" : 1, - "last-updated-ms": 1657810967051, - "last-column-id": 3, - "current-schema-id": 0, - "schemas": [ - { - "type": "struct", - "schema-id": 0, - "identifier-field-ids": [2], - "fields": [ - {"id": 1, "name": "foo", "required": false, "type": "string"}, - {"id": 2, "name": "bar", "required": true, "type": "int"}, - {"id": 3, "name": "baz", "required": false, "type": "boolean"} - ] - } - ], - "partition-specs": [], - "default-spec-id": 0, - "last-partition-id": 999, - "default-sort-order-id": 0, - "sort-orders": [{"order-id": 0, "fields": []}], - "properties": { - "write.delete.parquet.compression-codec": "zstd", - "write.metadata.compression-codec": "gzip", - "write.summary.partition-limit": "100", - "write.parquet.compression-codec": "zstd" - }, - "current-snapshot-id": -1, - "refs": {}, - "snapshots": [], - "snapshot-log": [], - "metadata-log": [] - } -} \ No newline at end of file diff --git a/crates/rest/tests/rest_catalog_test.rs b/crates/rest/tests/rest_catalog_test.rs deleted file mode 100644 index a4d0795..0000000 --- a/crates/rest/tests/rest_catalog_test.rs +++ /dev/null @@ -1,376 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Integration tests for rest catalog. - -use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type}; -use iceberg::transaction::Transaction; -use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; -use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; -use iceberg_test_utils::docker::DockerCompose; -use iceberg_test_utils::{normalize_test_name, set_up}; -use port_scanner::scan_port_addr; -use std::collections::HashMap; -use tokio::time::sleep; - -const REST_CATALOG_PORT: u16 = 8181; - -struct TestFixture { - _docker_compose: DockerCompose, - rest_catalog: RestCatalog, -} - -async fn set_test_fixture(func: &str) -> TestFixture { - set_up(); - let docker_compose = DockerCompose::new( - normalize_test_name(format!("{}_{func}", module_path!())), - format!("{}/testdata/rest_catalog", env!("CARGO_MANIFEST_DIR")), - ); - - // Start docker compose - docker_compose.run(); - - let rest_catalog_ip = docker_compose.get_container_ip("rest"); - - let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT); - loop { - if !scan_port_addr(&read_port) { - log::info!("Waiting for 1s rest catalog to ready..."); - sleep(std::time::Duration::from_millis(1000)).await; - } else { - break; - } - } - - let config = 
RestCatalogConfig::builder() - .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT)) - .build(); - let rest_catalog = RestCatalog::new(config).await.unwrap(); - - TestFixture { - _docker_compose: docker_compose, - rest_catalog, - } -} -#[tokio::test] -async fn test_get_non_exist_namespace() { - let fixture = set_test_fixture("test_get_non_exist_namespace").await; - - let result = fixture - .rest_catalog - .get_namespace(&NamespaceIdent::from_strs(["demo"]).unwrap()) - .await; - - assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Namespace does not exist")); -} - -#[tokio::test] -async fn test_get_namespace() { - let fixture = set_test_fixture("test_get_namespace").await; - - let ns = Namespace::with_properties( - NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), - HashMap::from([ - ("owner".to_string(), "ray".to_string()), - ("community".to_string(), "apache".to_string()), - ]), - ); - - // Verify that namespace doesn't exist - assert!(fixture.rest_catalog.get_namespace(ns.name()).await.is_err()); - - // Create this namespace - let created_ns = fixture - .rest_catalog - .create_namespace(ns.name(), ns.properties().clone()) - .await - .unwrap(); - - assert_eq!(ns.name(), created_ns.name()); - assert_map_contains(ns.properties(), created_ns.properties()); - - // Check that this namespace already exists - let get_ns = fixture.rest_catalog.get_namespace(ns.name()).await.unwrap(); - assert_eq!(ns.name(), get_ns.name()); - assert_map_contains(ns.properties(), created_ns.properties()); -} - -#[tokio::test] -async fn test_list_namespace() { - let fixture = set_test_fixture("test_list_namespace").await; - - let ns1 = Namespace::with_properties( - NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), - HashMap::from([ - ("owner".to_string(), "ray".to_string()), - ("community".to_string(), "apache".to_string()), - ]), - ); - - let ns2 = Namespace::with_properties( - NamespaceIdent::from_strs(["apple", "macos"]).unwrap(), - 
HashMap::from([ - ("owner".to_string(), "xuanwo".to_string()), - ("community".to_string(), "apache".to_string()), - ]), - ); - - // Currently this namespace doesn't exist, so it should return error. - assert!(fixture - .rest_catalog - .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap())) - .await - .is_err()); - - // Create namespaces - fixture - .rest_catalog - .create_namespace(ns1.name(), ns1.properties().clone()) - .await - .unwrap(); - fixture - .rest_catalog - .create_namespace(ns2.name(), ns1.properties().clone()) - .await - .unwrap(); - - // List namespace - let mut nss = fixture - .rest_catalog - .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap())) - .await - .unwrap(); - nss.sort(); - - assert_eq!(&nss[0], ns1.name()); - assert_eq!(&nss[1], ns2.name()); -} - -#[tokio::test] -async fn test_list_empty_namespace() { - let fixture = set_test_fixture("test_list_empty_namespace").await; - - let ns_apple = Namespace::with_properties( - NamespaceIdent::from_strs(["apple"]).unwrap(), - HashMap::from([ - ("owner".to_string(), "ray".to_string()), - ("community".to_string(), "apache".to_string()), - ]), - ); - - // Currently this namespace doesn't exist, so it should return error. 
- assert!(fixture - .rest_catalog - .list_namespaces(Some(ns_apple.name())) - .await - .is_err()); - - // Create namespaces - fixture - .rest_catalog - .create_namespace(ns_apple.name(), ns_apple.properties().clone()) - .await - .unwrap(); - - // List namespace - let nss = fixture - .rest_catalog - .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap())) - .await - .unwrap(); - assert!(nss.is_empty()); -} - -#[tokio::test] -async fn test_list_root_namespace() { - let fixture = set_test_fixture("test_list_root_namespace").await; - - let ns1 = Namespace::with_properties( - NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), - HashMap::from([ - ("owner".to_string(), "ray".to_string()), - ("community".to_string(), "apache".to_string()), - ]), - ); - - let ns2 = Namespace::with_properties( - NamespaceIdent::from_strs(["google", "android"]).unwrap(), - HashMap::from([ - ("owner".to_string(), "xuanwo".to_string()), - ("community".to_string(), "apache".to_string()), - ]), - ); - - // Currently this namespace doesn't exist, so it should return error. 
- assert!(fixture - .rest_catalog - .list_namespaces(Some(&NamespaceIdent::from_strs(["apple"]).unwrap())) - .await - .is_err()); - - // Create namespaces - fixture - .rest_catalog - .create_namespace(ns1.name(), ns1.properties().clone()) - .await - .unwrap(); - fixture - .rest_catalog - .create_namespace(ns2.name(), ns1.properties().clone()) - .await - .unwrap(); - - // List namespace - let mut nss = fixture.rest_catalog.list_namespaces(None).await.unwrap(); - nss.sort(); - - assert_eq!(&nss[0], &NamespaceIdent::from_strs(["apple"]).unwrap()); - assert_eq!(&nss[1], &NamespaceIdent::from_strs(["google"]).unwrap()); -} - -#[tokio::test] -async fn test_create_table() { - let fixture = set_test_fixture("test_create_table").await; - - let ns = Namespace::with_properties( - NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), - HashMap::from([ - ("owner".to_string(), "ray".to_string()), - ("community".to_string(), "apache".to_string()), - ]), - ); - - // Create namespaces - fixture - .rest_catalog - .create_namespace(ns.name(), ns.properties().clone()) - .await - .unwrap(); - - let schema = Schema::builder() - .with_schema_id(1) - .with_identifier_field_ids(vec![2]) - .with_fields(vec![ - NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), - NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), - ]) - .build() - .unwrap(); - - let table_creation = TableCreation::builder() - .name("t1".to_string()) - .schema(schema.clone()) - .build(); - - let table = fixture - .rest_catalog - .create_table(ns.name(), table_creation) - .await - .unwrap(); - - assert_eq!( - table.identifier(), - &TableIdent::new(ns.name().clone(), "t1".to_string()) - ); - - assert_eq!( - table.metadata().current_schema().as_struct(), - schema.as_struct() - ); - assert_eq!(table.metadata().format_version(), FormatVersion::V2); - 
assert!(table.metadata().current_snapshot().is_none()); - assert!(table.metadata().history().is_empty()); - assert!(table.metadata().default_sort_order().unwrap().is_unsorted()); - assert!(table - .metadata() - .default_partition_spec() - .unwrap() - .is_unpartitioned()); -} - -#[tokio::test] -async fn test_update_table() { - let fixture = set_test_fixture("test_update_table").await; - - let ns = Namespace::with_properties( - NamespaceIdent::from_strs(["apple", "ios"]).unwrap(), - HashMap::from([ - ("owner".to_string(), "ray".to_string()), - ("community".to_string(), "apache".to_string()), - ]), - ); - - // Create namespaces - fixture - .rest_catalog - .create_namespace(ns.name(), ns.properties().clone()) - .await - .unwrap(); - - let schema = Schema::builder() - .with_schema_id(1) - .with_identifier_field_ids(vec![2]) - .with_fields(vec![ - NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), - NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), - ]) - .build() - .unwrap(); - - // Now we create a table - let table_creation = TableCreation::builder() - .name("t1".to_string()) - .schema(schema.clone()) - .build(); - - let table = fixture - .rest_catalog - .create_table(ns.name(), table_creation) - .await - .unwrap(); - - assert_eq!( - table.identifier(), - &TableIdent::new(ns.name().clone(), "t1".to_string()) - ); - - // Update table by committing transaction - let table2 = Transaction::new(&table) - .set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())])) - .unwrap() - .commit(&fixture.rest_catalog) - .await - .unwrap(); - - assert_map_contains( - &HashMap::from([("prop1".to_string(), "v1".to_string())]), - table2.metadata().properties(), - ); -} - -fn assert_map_contains(map1: &HashMap, map2: &HashMap) { - for (k, v) in map1 { - assert!(map2.contains_key(k)); - assert_eq!(map2.get(k).unwrap(), v); - } -} diff --git 
a/crates/test_utils/Cargo.toml b/crates/test_utils/Cargo.toml deleted file mode 100644 index 2ad7ef9..0000000 --- a/crates/test_utils/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "iceberg_test_utils" -version = { workspace = true } -edition = { workspace = true } -rust-version = { workspace = true } -repository = { workspace = true } -license = { workspace = true } - -[dependencies] -env_logger = { workspace = true } -log = "0.4.20" - -[features] -tests = [] diff --git a/crates/test_utils/src/cmd.rs b/crates/test_utils/src/cmd.rs deleted file mode 100644 index 604d4a1..0000000 --- a/crates/test_utils/src/cmd.rs +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::process::Command; - -pub fn run_command(mut cmd: Command, desc: impl ToString) { - let desc = desc.to_string(); - log::info!("Starting to {}, command: {:?}", &desc, cmd); - let exit = cmd.status().unwrap(); - if exit.success() { - log::info!("{} succeed!", desc) - } else { - panic!("{} failed: {:?}", desc, exit); - } -} - -pub fn get_cmd_output(mut cmd: Command, desc: impl ToString) -> String { - let desc = desc.to_string(); - log::info!("Starting to {}, command: {:?}", &desc, cmd); - let output = cmd.output().unwrap(); - if output.status.success() { - log::info!("{} succeed!", desc); - String::from_utf8(output.stdout).unwrap() - } else { - panic!("{} failed: {:?}", desc, output.status); - } -} diff --git a/crates/test_utils/src/docker.rs b/crates/test_utils/src/docker.rs deleted file mode 100644 index 6c5fbef..0000000 --- a/crates/test_utils/src/docker.rs +++ /dev/null @@ -1,102 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::cmd::{get_cmd_output, run_command}; -use std::process::Command; - -/// A utility to manage lifecycle of docker compose. -/// -/// It's will start docker compose when calling `run` method, and will be stopped when dropped. -#[derive(Debug)] -pub struct DockerCompose { - project_name: String, - docker_compose_dir: String, -} - -impl DockerCompose { - pub fn new(project_name: impl ToString, docker_compose_dir: impl ToString) -> Self { - Self { - project_name: project_name.to_string(), - docker_compose_dir: docker_compose_dir.to_string(), - } - } - - pub fn project_name(&self) -> &str { - self.project_name.as_str() - } - - pub fn run(&self) { - let mut cmd = Command::new("docker"); - cmd.current_dir(&self.docker_compose_dir); - - cmd.args(vec![ - "compose", - "-p", - self.project_name.as_str(), - "up", - "-d", - "--wait", - "--timeout", - "1200000", - ]); - - run_command( - cmd, - format!( - "Starting docker compose in {}, project name: {}", - self.docker_compose_dir, self.project_name - ), - ) - } - - pub fn get_container_ip(&self, service_name: impl AsRef) -> String { - let container_name = format!("{}-{}-1", self.project_name, service_name.as_ref()); - let mut cmd = Command::new("docker"); - cmd.arg("inspect") - .arg("-f") - .arg("{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}") - .arg(&container_name); - - get_cmd_output(cmd, format!("Get container ip of {container_name}")) - .trim() - .to_string() - } -} - -impl Drop for DockerCompose { - fn drop(&mut self) { - let mut cmd = Command::new("docker"); - 
cmd.current_dir(&self.docker_compose_dir); - - cmd.args(vec![ - "compose", - "-p", - self.project_name.as_str(), - "down", - "-v", - "--remove-orphans", - ]); - - run_command( - cmd, - format!( - "Stopping docker compose in {}, project name: {}", - self.docker_compose_dir, self.project_name - ), - ) - } -} diff --git a/crates/test_utils/src/lib.rs b/crates/test_utils/src/lib.rs deleted file mode 100644 index 4f63b8d..0000000 --- a/crates/test_utils/src/lib.rs +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This crate contains common utilities for testing. -//! -//! It's not intended for use outside of `iceberg-rust`. 
- -#[cfg(feature = "tests")] -mod cmd; -#[cfg(feature = "tests")] -pub mod docker; - -#[cfg(feature = "tests")] -pub use common::*; - -#[cfg(feature = "tests")] -mod common { - use std::sync::Once; - - static INIT: Once = Once::new(); - pub fn set_up() { - INIT.call_once(env_logger::init); - } - pub fn normalize_test_name(s: impl ToString) -> String { - s.to_string().replace("::", "__").replace('.', "_") - } -} diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..54aa0cc --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,4 @@ +edition = "2018" +max_width = 100 +tab_spaces = 2 +hard_tabs = false diff --git a/scripts/parse_dependencies.py b/scripts/parse_dependencies.py new file mode 100644 index 0000000..551f3a7 --- /dev/null +++ b/scripts/parse_dependencies.py @@ -0,0 +1,42 @@ +import os +import sys + +begin = False +package_version = {} +with open('./Cargo.toml') as f: + for line in f: + if '[' == line[0]: + begin = False + if 'dependencies' in line: + begin = True + continue + + if begin: + sep = line.find('=') + package_version[line[:sep-1].strip()] = line[sep+2:].strip() + +for dir_path in ["./libs/iceberg/", "./libs/rest/", "./libs/test_utils/"]: + r = open(dir_path + "Cargo.toml") + w = open(dir_path + "Cargo_n.toml", 'w') + begin = False + for r_line in r: + if '[' == r_line[0]: + begin = False + if 'dependencies' in r_line: + begin = True + w.write(r_line) + continue + + if begin: + sep = r_line.find('=') + package = r_line[:sep-1].strip() + if package in package_version: + w.writelines([f"{package} = {package_version[package]}", "\n"]) + else: + w.write(r_line) + else: + w.write(r_line) + r.close() + w.close() + os.remove(dir_path + "Cargo.toml") + os.rename(dir_path + "Cargo_n.toml", dir_path + "Cargo.toml") diff --git a/src/main.rs b/src/main.rs index e7a11a9..7e105ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,30 @@ -fn main() { - println!("Hello, world!"); +#[macro_use] +extern crate rocket; + +mod server; +use 
server::routes::*; + +#[launch] +fn rocket() -> _ { + rocket::build().mount( + "/v1", + routes![ + namespace::get_namespace, + namespace::post_namespace, + namespace::head_namespace_by_name, + namespace::get_namespace_by_name, + namespace::delete_namespace_by_name, + namespace::post_namespace_properties, + table::get_table_by_namespace, + table::post_table_by_namespace, + table::register_table, + table::get_table, + table::post_table, + table::delete_table, + table::head_table, + table::rename_table, + metric::post_metrics, + config::get_config, + ], + ) } diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000..6a664ab --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1 @@ +pub mod routes; diff --git a/src/server/routes/config.rs b/src/server/routes/config.rs new file mode 100644 index 0000000..22e875d --- /dev/null +++ b/src/server/routes/config.rs @@ -0,0 +1,4 @@ +#[get("/config")] +pub fn get_config() { + todo!("get_config") +} diff --git a/src/server/routes/metric.rs b/src/server/routes/metric.rs new file mode 100644 index 0000000..1c50f22 --- /dev/null +++ b/src/server/routes/metric.rs @@ -0,0 +1,5 @@ +/// Send a metrics report to this endpoint to be processed by the backend +#[post("/namespaces/<namespace>/tables/<table>/metrics")] +pub fn post_metrics(namespace: &str, table: &str) { + todo!("post_metrics") +} diff --git a/src/server/routes/mod.rs b/src/server/routes/mod.rs new file mode 100644 index 0000000..d93f96d --- /dev/null +++ b/src/server/routes/mod.rs @@ -0,0 +1,5 @@ +#[allow(dead_code)] +pub mod config; +pub mod metric; +pub mod namespace; +pub mod table; diff --git a/src/server/routes/namespace.rs b/src/server/routes/namespace.rs new file mode 100644 index 0000000..6c08ff0 --- /dev/null +++ b/src/server/routes/namespace.rs @@ -0,0 +1,35 @@ +/// List namespaces, optionally providing a parent namespace to list underneath +#[get("/namespaces")] +pub fn get_namespace() { + todo!("get_namespace") +} + +/// Create a namespace +#[post("/namespaces")] +pub fn post_namespace() { + todo!("post_namespace") +} + +/// Check if a namespace exists +#[head("/namespaces/<namespace>")] +pub fn head_namespace_by_name(namespace: &str) { + todo!("head_namespace_by_name") +} + +/// Load the metadata properties for a namespace +#[get("/namespaces/<namespace>")] +pub fn get_namespace_by_name(namespace: &str) { + todo!("get_namespace_by_name") +} + +/// Drop a namespace from the catalog. Namespace must be empty.
+#[delete("/namespaces/<namespace>")] +pub fn delete_namespace_by_name(namespace: &str) { + todo!("delete_namespace_by_name") +} + +/// Set or remove properties on a namespace +#[post("/namespaces/<namespace>/properties")] +pub fn post_namespace_properties(namespace: &str) { + todo!("post_namespace_properties") +} diff --git a/src/server/routes/table.rs b/src/server/routes/table.rs new file mode 100644 index 0000000..a68e9c3 --- /dev/null +++ b/src/server/routes/table.rs @@ -0,0 +1,47 @@ +/// List all table identifiers underneath a given namespace +#[get("/namespaces/<namespace>/tables")] +pub fn get_table_by_namespace(namespace: &str) { + todo!("get_table_by_namespace") +} + +/// Create a table in the given namespace +#[post("/namespaces/<namespace>/tables")] +pub fn post_table_by_namespace(namespace: &str) { + todo!("post_table_by_namespace") +} + +/// Register a table in the given namespace using given metadata file location +#[post("/namespaces/<namespace>/register")] +pub fn register_table(namespace: &str) { + todo!("register_table") +} + +/// Load a table from the catalog +#[get("/namespaces/<namespace>/tables/<table>")] +pub fn get_table(namespace: &str, table: &str) { + todo!("post_namespace_table") +} + +/// Commit updates to a table +#[post("/namespaces/<namespace>/tables/<table>")] +pub fn post_table(namespace: &str, table: &str) { + todo!("post_namespace_table") +} + +/// Drop a table from the catalog +#[delete("/namespaces/<namespace>/tables/<table>")] +pub fn delete_table(namespace: &str, table: &str) { + todo!("post_namespace_table") +} + +/// Check if a table exists +#[head("/namespaces/<namespace>/tables/<table>")] +pub fn head_table(namespace: &str, table: &str) { + todo!("post_namespace_table") +} + +/// Rename a table from its current name to a new name +#[post("/tables/rename")] +pub fn rename_table() { + todo!("rename_table") +}