From 7a3f212f82608a4560e43258b8b9d41fda92efe4 Mon Sep 17 00:00:00 2001 From: Zilong Zhou <60960532+zhouzilong2020@users.noreply.github.com> Date: Wed, 1 May 2024 23:39:04 -0400 Subject: [PATCH] Benchmark (#13) * import iceberg * server struct * rename crates to libs * init db * add table response * add first four table responses, and response.rs for struct * init db * init error * init api framework * error handling * error handling * namespace w/o delete and add new properties * return JsonResultGeneric * fix function signature for table.rs * fix conflict * general not found message * general error handler * create namespace * fix return type in table.rs * remove/update namespace && empty result * modify table implementation * fix conflict * fix ok_empty() * fix ok_empty() * unit test * add structs for Schema * fmt * reorg * db config && move catches to server/catches.rs * complete all table functions * fix import * added namespace init * 1 test * fix Namespace Param * fmt * add ? for Result<> * add structs for Schema * Update design doc with benchmarking * Update design doc for benchmarking new * fix get table by namespace * fix delete table implementation * add Schema/TableMetadata to Table struct * fix create and get Table uuid * add atomic increase table uuid * add get_table_by_namespace unit test * fix unit test json; but with possible conflict error * create tempdir for a new db in unittest, having bugs * change hardcoded root_dir in new DBconnection * modularized mock_client_creation * add some table unit tests * add unit tests for namespace * Add table unit tests * clean up (#12) * clean dead import, dead result * mod test * modify benchmark.py * benchmark with vegeta * plot * add schema in createTableRequest * Modify benchmark request rate * comment out schema in createTableRequest * add 3 endpoints to benchmark * minor name fix * add random endpoints * minor bug fixed * change duration to 60 secs and # of random tables * rm lib iceberg * merge main --------- Co-authored-by: Angela-CMU Co-authored-by: Yen-Ju Wu --- .gitignore | 6 +- Cargo.toml | 10 +- doc/design_doc.md | 19 +- plot.html | 0 scripts/bench | 164 +++++++++ src/catalog/mod.rs | 2 + src/catalog/namespace.rs | 192 +++++++++++ src/catalog/table.rs | 181 ++++++++++ src/cli.rs | 15 + src/common/mod.rs | 2 + src/common/response.rs | 77 +++++ src/common/result.rs | 86 +++++ src/db/mod.rs | 115 +++++- src/main.rs | 76 ++-- src/server/catches.rs | 52 +++ src/server/mod.rs | 1 + src/server/routes/common.rs | 127 +++++++ src/server/routes/metric.rs | 2 +- src/server/routes/mod.rs | 4 +- src/server/routes/namespace.rs | 395 ++++++++++++++++++++- src/server/routes/request.rs | 92 +++++ src/server/routes/response.rs | 50 +++ src/server/routes/table.rs | 614 +++++++++++++++++++++++++++++++-- src/util/mod.rs | 1 + src/util/time.rs | 9 + 25 files changed, 2212 insertions(+), 80 deletions(-) create mode 100644 plot.html create mode 100755 scripts/bench create mode 100644 src/catalog/mod.rs create mode 100644 src/catalog/namespace.rs create mode 100644 src/catalog/table.rs create mode 100644 src/cli.rs create mode 100644 src/common/mod.rs create mode 100644 src/common/response.rs create mode 100644 src/common/result.rs create mode 100644 src/server/catches.rs create mode 100644 src/server/routes/common.rs create mode 100644 src/server/routes/request.rs create mode 100644 src/server/routes/response.rs create mode 100644 src/util/mod.rs create mode 100644 src/util/time.rs diff --git a/.gitignore b/.gitignore index 6d586ef..4e1ede4 100644 
--- a/.gitignore +++ b/.gitignore @@ -16,4 +16,8 @@ Cargo.lock # macOS resource forks and .DS_Store files .DS_Store -.vscode \ No newline at end of file +.vscode + +database + +test/ diff --git a/Cargo.toml b/Cargo.toml index 6c16e15..e0168bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,11 +23,11 @@ license = "Apache-2.0" repository = "https://github.com/cmu-db/15721-s24-catalog2" rust-version = "1.75.0" - - # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] rocket = { version = "0.5.0", features = ["json", "http2"] } -iceberg = { src = "./libs/iceberg" } -dotenv = "0.15.0" -pickledb = "^0.5.0" \ No newline at end of file +pickledb = "^0.5.0" +derive_builder = "0.20.0" +serde_json = "1.0.79" +clap = { version = "4.5.4", features = ["derive"] } +tempfile = "3.10.1" \ No newline at end of file diff --git a/doc/design_doc.md b/doc/design_doc.md index 407b7a0..f65f45b 100644 --- a/doc/design_doc.md +++ b/doc/design_doc.md @@ -10,6 +10,7 @@ The goal of this project is to design and implement a **Catalog Service** for an ## Architectural Design We follow the logic model described below. The input of our service comes from execution engine and I/O service. And we will provide metadata to planner and scheduler. We will use [pickleDB](https://docs.rs/pickledb/latest/pickledb/) as the key-value store to store (namespace, tables) and (table_name, metadata) as two (key, value) pairs as local db files. We will use [Rocket](https://rocket.rs) as the web framework handling incoming API traffic. + ![system architecture](./assets/system-architecture.png) ### Data Model We adhere to the Iceberg data model, arranging tables based on namespaces, with each table uniquely identified by its name. @@ -19,9 +20,10 @@ The parameters for request and response can be referenced from [REST API](https: ### Use Cases #### Namespace -create/delete/rename namespace +create/delete/rename/list namespace #### Table -create/delete/rename table +create/delete/rename/list table + #### Query Table’s Metadata (including statistics, version, table-uuid, location, last-column-id, schema, and partition-spec) get metadeta by {namespace}/{table} @@ -35,6 +37,7 @@ get metadeta by {namespace}/{table} * Centralized metadata management achieved by separating data and metadata, reducing complexity and facilitating consistent metadata handling. * Code modularity and clear interfaces facilitate easier updates and improvements. * We adopt the existing kvstore ([pickleDB](https://docs.rs/pickledb/latest/pickledb/)) and server ([Rocket](https://github.com/rwf2/Rocket)) to mitigate the engineering complexity. + * Testing: * Comprehensive testing plans cover correctness through unit tests and performance through long-running regression tests. Unit tests focus on individual components of the catalog service, while regression tests evaluate system-wide performance and stability. * Other Implementations: @@ -46,14 +49,16 @@ To ensure the quality and the performance of the catalog implemented, a comprehe * Functional testing * API tests: For functional testing, we can achieve the goal through unit tests. We will test each API endpoint implemented in our project to ensure correct behavior. We will test various input parameters and validate the response format and the status code are as expected. Also, we will try to mimic possible edge cases and errors to ensure the implementation is robust and can perform suitable error handling. 
By doing so, we can ensure the API works as expected and provides correct results to clients. * Metadata tests: We will focus on verifying the correct storage and retrieval of metadata. Tests will include different scenarios, including some edge cases. [Quickcheck](https://github.com/BurntSushi/quickcheck) is an example for performing the testing. - * [Documentation tests](https://doc.rust-lang.org/rustdoc/write-documentation/documentation-tests.html#documentation-tests): Execute document examples -* Non-functional testing - * Microbenchmarking for performance evaluation: We can use [Criterion.rs](https://github.com/bheisler/criterion.rs?tab=readme-ov-file#features) and [bencher](https://github.com/bluss/bencher) to collect statistics to enable statistics-driven optimizations. In addition, we can set up a performance baseline to compare the performance with our implementation. We can measure different metrics, for example, response time, throughput, etc. - * Scalability test: We will try to test our implementation under increased load and ensure the correctness and efficiency at the same time. +* Benchmark testing + * Key performance metrics: Latency and Request Per Second (RPS) would be used as key metrics. + * Workload: Since we are working on an OLAP database, the workload expected should be read-heavy. We thus expect read-heavy and write-occasional workloads that include complex joins and predicates, analytical queries, periodic updates on catalog data, and some metadata updates. Based on this assumption, we plan to evaluate 3 different read-to-write ratios: 1000:1, 100:1, and 10:1. + * Performance evaluation: We can use [ali](https://github.com/nakabonne/ali) to create HTTP traffic and visualize the outcomes in real-time for performance evaluation. + * Performance optimization: We can use [Criterion.rs](https://github.com/bheisler/criterion.rs?tab=readme-ov-file#features) and [bencher](https://github.com/bluss/bencher) to collect statistics to enable statistics-driven optimizations. In addition, we can set up a performance baseline to compare the performance with our implementation. We can measure different metrics, for example, response time, throughput, etc. + ## Trade-offs and Potential Problems * Balancing between metadata retrieval speed and storage efficiency. * Balancing between query performance and engineering complexity/maintainability (such as adding bloom filters). ## Glossary (Optional) ->If you are introducing new concepts or giving unintuitive names to components, write them down here. +>If you are introducing new concepts or giving unintuitive names to components, write them down here. \ No newline at end of file diff --git a/plot.html b/plot.html new file mode 100644 index 0000000..e69de29 diff --git a/scripts/bench b/scripts/bench new file mode 100755 index 0000000..1f3d13a --- /dev/null +++ b/scripts/bench @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +# This script is used to benchmark the catalog server. +# It will start the catalog server, seed the catalog with some namespaces and tables, and use vegeta to stress test the server. 
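+#
+# Example invocation (run from the repository root; the flag values here are
+# arbitrary, and every flag falls back to the defaults defined in the argparse
+# section below):
+#   ./scripts/bench --namespace_num 4 --table_num 64 --rate 100 --plot
+# Each stress test is piped through `vegeta attack ... | vegeta report`, and
+# --plot additionally renders an HTML latency plot via `vegeta plot`.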
+# vegeta: https://github.com/tsenart/vegeta +# Install on mac: brew install vegeta + +import subprocess as sp +import time +import signal +import sys +import requests +import argparse +import string +import random + + +def get_random_str(length=8): + letters = string.ascii_lowercase + return ''.join(random.choice(letters) for _ in range(length)) + + +def run(cmd, note, bg=False, out=None): + print(f"{note.ljust(48)}...", end=" ", flush=True) + try: + res = None + if out: + with open(out, "a") as f: + if bg: + res = sp.Popen(cmd, shell=True, stdout=f, stderr=f) + else: + sp.run(cmd, shell=True, check=True, + stdout=f, stderr=f) + else: + if bg: + res = sp.Popen(cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL) + else: + sp.run(cmd, shell=True, check=True, + stdout=sp.DEVNULL, stderr=sp.DEVNULL) + print("DONE!") + return res + except sp.CalledProcessError as e: + print("FAIL!") + print("Error:", e) + + +TEST_ROOT_DIR = "test" +DEFAULT_BINARY_NAME = "catalog2" +DEFAULT_DB_ROOT_DIR = f"{TEST_ROOT_DIR}/db" +DEFAULT_BASE_URL = "http://127.0.0.1:8000/v1/" +DEFAULT_NAMESPACE_NUM = 1 +DEFAULT_TABLE_NUM = 1 +DEFAULT_RATE = 8 + +parser = argparse.ArgumentParser(description="Benchmark.") +parser.add_argument("-b", "--binary_name", type=str, + default=DEFAULT_BINARY_NAME, help="Name of the catalog binary.") +parser.add_argument("-d", "--db_root", type=str, + default=DEFAULT_DB_ROOT_DIR, help="Root directory for the database.") +parser.add_argument("-u", "--base_url", type=str, + default=DEFAULT_BASE_URL, help="Base URL for catalog server.") +parser.add_argument("-n", "--namespace_num", type=int, + default=DEFAULT_NAMESPACE_NUM, help="The number of namespace to seed in catalog.") +parser.add_argument("-t", "--table_num", type=int, + default=DEFAULT_TABLE_NUM, help="The number of table to seed in catalog.") +parser.add_argument("-r", "--rate", type=int, + default=DEFAULT_RATE, help="Request rate.") +parser.add_argument("-p", "--plot", action="store_true", + default=False, help="Generate a plot of this benchmark.") +args = parser.parse_args() + + +CATALOG_LOG = f"{TEST_ROOT_DIR}/catalog.log" + +# build catalog in release mode +run(f"rm -rf {TEST_ROOT_DIR} && mkdir {TEST_ROOT_DIR}", + note="initializing test dir") +run(f"cargo build --release && cp target/release/{args.binary_name} {TEST_ROOT_DIR}/{args.binary_name}", + note="building catalog in release mode") +catalog_server = run(f"{TEST_ROOT_DIR}/{args.binary_name} --db-root {args.db_root}", + note="starting catalog server", bg=True, out=CATALOG_LOG) +print("Waiting for catalog server to start...") +time.sleep(1) + +# seeding the catalog, uniformly distribute tables to namespaces +print(f"Seeding namespaces and tables...") +NAMESPACE_ENDPOINT = "namespaces" +TABLE_ENDPOINT = "tables" +namespaces = [] +table_per_namespace = args.table_num // args.namespace_num +for i in range(args.namespace_num): + namespace = get_random_str(32) + tables = [] + for j in range(table_per_namespace): + tables.append(get_random_str(32)) + namespaces.append({'name': namespace, 'tables': tables}) + # create namespace + response = requests.post(f"{args.base_url}/{NAMESPACE_ENDPOINT}", + json={'namespace': [namespace]}) + assert response.status_code == 200, f"Failed to create namespace {namespace}" + + # crate tables + for table in tables: + response = requests.post( + f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace}/{TABLE_ENDPOINT}", + json={'name': table} + ) + assert response.status_code == 200, f"Failed to create namespace {namespace}" + +print(f"Seeded {len(namespaces)} 
namespaces and {len(namespaces) * table_per_namespace} tables.") + +# test begins +# 1. single endpoint stress test +namespace = namespaces[0] +table = namespace['tables'][0] +targets = { + "get_table": f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace['name']}/{TABLE_ENDPOINT}/{table}", + "list_table": f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace['name']}/{TABLE_ENDPOINT}", + "get_namespace": f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace['name']}", + "list_namespace": f"{args.base_url}/{NAMESPACE_ENDPOINT}" +} + +for name, target in targets.items(): + STATISTIC_FILE = f"{TEST_ROOT_DIR}/results_{name}.bin" + attack = f"echo 'GET {target}' | vegeta attack -rate={args.rate} -duration=60s | tee {STATISTIC_FILE} | vegeta report" + run(attack, note=f"single endpoint stress test for {name}", + out=f"{TEST_ROOT_DIR}/veneta_{name}.log") + if args.plot: + PLOT_FILE = f"{TEST_ROOT_DIR}/plot_{name}.html" + run(f"cat {STATISTIC_FILE} | vegeta plot > {PLOT_FILE}", + note="generating plot") +# ... more? + + +# 2. random endpoint stress test +# Define the file path +PATH_TARGET_FILE = f"{TEST_ROOT_DIR}/requests_get_table.txt" + +# Write the URLs to the file +with open(PATH_TARGET_FILE, "w") as file: + for i in range(len(namespaces)): + random_namespace = random.choice(namespaces) + random_table = random.choice(random_namespace['tables']) + + # Generate request URL + target = f"{args.base_url}/{NAMESPACE_ENDPOINT}/{random_namespace['name']}/{TABLE_ENDPOINT}/{random_table}" + request_url = f"GET {target}" + + file.write(request_url + "\n") + +print("URLs have been written to", PATH_TARGET_FILE) + + +STATISTIC_FILE = f"{TEST_ROOT_DIR}/results_random.bin" +attack = f"vegeta attack -targets={PATH_TARGET_FILE} -rate={args.rate} -duration=60s | tee {STATISTIC_FILE} | vegeta report" +run(attack, note="random endpoints stress test", + out=f"{TEST_ROOT_DIR}/veneta_random.log") +if args.plot: + PLOT_FILE = f"{TEST_ROOT_DIR}/plot_random.html" + run(f"cat {STATISTIC_FILE} | vegeta plot > {PLOT_FILE}", + note="generating plot") + + +# clean up +catalog_server.send_signal(signal.SIGINT) diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs new file mode 100644 index 0000000..a65bd7d --- /dev/null +++ b/src/catalog/mod.rs @@ -0,0 +1,2 @@ +pub mod namespace; +pub mod table; diff --git a/src/catalog/namespace.rs b/src/catalog/namespace.rs new file mode 100644 index 0000000..b8bb398 --- /dev/null +++ b/src/catalog/namespace.rs @@ -0,0 +1,192 @@ +use crate::{ + common::result::{ErrorType, Location, Result}, + err, + util::time, +}; +use rocket::serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; + +use crate::db::DBConnection; + +pub type NamespaceIdent = String; + +// we store the namespace as a string, the value should contains all the parent namespaces +// e.g. 
all the direct child to namespace A.B will be stored in the child field, +// and the ident field will be A::B +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct Namespace { + pub child: Vec, + pub properties: Value, + pub tables: Vec, +} + +fn hash<'a>(level: &Vec) -> String { + if level.is_empty() { + "root".to_string() + } else { + format!("root::{}", level.join("::")) + } +} + +impl Namespace { + pub fn init(conn: &mut DBConnection) -> Result<()> { + let key = "root"; + match conn.exists(key) { + true => Ok(()), + false => { + let properties = json!({ + "created_at": time::now().to_string(), + }); + let namespace = Namespace { + child: vec![], + properties: properties, + tables: vec![], + }; + conn.put(key, &namespace)?; + Ok(()) + } + } + } + + // exist will not return an error + pub fn exists(conn: &DBConnection, level: &Vec) -> bool { + let key = hash(level); + conn.exists(&key) + } + + // List all the child namespaces of the given parent namespace. + pub fn list( + conn: &DBConnection, + parent: &Vec, + ) -> Option>> { + let key = hash(&parent); + let res: Option = conn.get(&key); + if res.is_none() { + return None; + } + let val = res.unwrap(); + let parent: Vec<_> = parent.into_iter().map(|x| x.to_string()).collect(); + Some( + val + .child + .into_iter() + .map(|x| { + let mut r = parent.clone(); + r.push(x); + r + }) + .collect(), + ) + } + + pub fn create( + conn: &mut DBConnection, + level: &Vec, + properties: Option, + ) -> Result { + let key = hash(level); + if Namespace::exists(conn, level) { + return err!( + ErrorType::AlreadyExists, + Location::Namespace, + format!("Namespace {} already exists", key) + ); + } + + let mut old_properties = properties.unwrap_or_else(|| json!({})); + let new_properties = old_properties.as_object_mut().unwrap(); + new_properties.insert( + "created_at".to_string(), + Value::from(time::now().to_string()), + ); + let namespace = Namespace { + child: vec![], + properties: Value::Object(new_properties.to_owned()), + tables: vec![], + }; + conn.put(key.as_str(), &namespace)?; + Ok(namespace) + } + + // get will return an error if the namespace does not exist + pub fn get_properties(conn: &DBConnection, level: &Vec) -> Result> { + let key = hash(level); + let namespace: Option = conn.get(key.as_str()); + if namespace.is_none() { + return err!( + ErrorType::NotFound, + Location::Namespace, + format!("Namespace {} not found", key) + ); + } + Ok(Some(namespace.unwrap().properties)) + } + + // get will return an error if the namespace does not exist + pub fn delete(conn: &mut DBConnection, level: &Vec) -> Result<()> { + let key = hash(level); + let namespace: Option = conn.get(&key); + if namespace.is_none() { + return err!( + ErrorType::NotFound, + Location::Namespace, + format!("Namespace {} not found", key) + ); + } + let namespace = namespace.unwrap(); + if !namespace.child.is_empty() { + return err!( + ErrorType::BadRequest, + Location::Namespace, + format!("Namespace {} has children", key) + ); + } + conn.delete(&key) + } + + pub fn update( + conn: &mut DBConnection, + level: &Vec, + removals: Option>, + updates: Option, + ) -> Result { + let key = hash(level); + let namespace: Option = conn.get(&key); + if namespace.is_none() { + return err!( + ErrorType::NotFound, + Location::Namespace, + format!("Namespace {} not found", key) + ); + } + let mut namespace = namespace.unwrap(); + let properties = namespace.properties.as_object_mut().unwrap(); + + let mut removed_keys = vec![]; + let mut missing_keys = vec![]; + if 
let Some(removals) = removals { + for key in removals { + if let Some(_) = properties.remove(&key) { + removed_keys.push(key); + } else { + missing_keys.push(key); + } + } + } + + let mut updated_keys: Vec = vec![]; + if let Some(updates) = updates { + for (key, value) in updates.as_object().unwrap() { + properties.insert(key.to_string(), value.to_owned()); + updated_keys.push(key.to_string()); + } + } + + Ok(json!({ + "removed_keys": removed_keys, + "missing_keys": missing_keys, + "updated_keys": updated_keys, + })) + } +} diff --git a/src/catalog/table.rs b/src/catalog/table.rs new file mode 100644 index 0000000..fa497c0 --- /dev/null +++ b/src/catalog/table.rs @@ -0,0 +1,181 @@ +use crate::{ + common::result::{ErrorType, Location, Result}, + err, + server::routes::common::*, +}; +use rocket::{ + serde::{Deserialize, Serialize}, + State, +}; + +// use crate::Location::Namespace; // TODO: update +use crate::catalog::namespace::Namespace; + +use crate::db::DBConnection; + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct Table { + pub name: String, + // pub properties: Value, + pub schema: Schema, + pub metadata: TableMetadata, +} + +impl Table { + // exist will not return an error + pub fn exists(conn: &DBConnection, namespace_name: String, table_name: String) -> bool { + let table_key = format!("{}_{}", namespace_name, table_name); + conn.exists(&table_key) + + // TODO: probably want to know whether it is namespace not found or table not found + } + + pub fn create( + conn: &mut DBConnection, + namespace: String, + table: String, + table_metedata_generator: &State, + ) -> Result { + let namespace_key = namespace.clone(); + let table_key = format!("{}_{}", namespace, table); + let table_name = table.clone(); + let table_clone = table.clone(); + + // add checking for whether namespace exists + if !conn.exists(&namespace) { + return err!( + ErrorType::NotFound, + Location::Namespace, // ?? 
+ format!("Namespace {} not found", namespace) + ); + } + + if Table::exists(conn, namespace, table) { + return err!( + ErrorType::AlreadyExists, + Location::Table, + format!("Table {} already exists", table_key) + ); + } + let new_table = Table { + name: table_name, + schema: Schema { + // struct_type: StructType { + // type_: "".to_string(), + // fields: vec![], + // // Populate StructType fields as needed + // }, + schema_id: 0, // Set schema_id to a valid value + identifier_field_ids: vec![], // Provide identifier_field_ids if needed + }, + metadata: table_metedata_generator.generate_table_metadata(1), + }; + conn.put(&table_key, &new_table)?; + + // add the table to the namespace tables + if let Some(mut namespace_instance) = conn.get::(&namespace_key) { + namespace_instance.tables.push(table_clone.clone()); + conn.put(&namespace_key, &namespace_instance)?; + } + + Ok(new_table) + } + + pub fn delete(conn: &mut DBConnection, namespace: String, table: String) -> Result<()> { + let table_key = format!("{}_{}", namespace, table); + let table_name = table.clone(); + if !Table::exists(conn, namespace.clone(), table) { + return err!( + ErrorType::NotFound, + Location::Table, + format!("Table {} not found", table_key) + ); + } + + let namespace_key = namespace.clone(); + if let Some(mut namespace_instance) = conn.get::(&namespace_key) { + // Remove the table from the namespace's tables vector + if let Some(index) = namespace_instance + .tables + .iter() + .position(|t| t == &table_name) + { + namespace_instance.tables.remove(index); + conn.put(&namespace_key, &namespace_instance)?; + } + } + + conn.delete(&table_key) + } + + pub fn list(conn: &DBConnection, namespace: String) -> Option> { + let key = namespace; + // let namespace_instance = conn.get(key); + // let tables = namespace_instance.tables; + + // add checking for whether namespace exists + // let namespace_clone = namespace.clone(); + // if !conn.exists(&namespace_clone){ + // return err!( + // ErrorType::NotFound, + // Location::Namespace, + // format!("Namespace {} not found", namespace_clone) + // ); + // } + + if let Some(namespace_instance) = conn.get::(&key) { + Some(namespace_instance.tables.clone()) + } else { + None + } + // Some(tables) + } + + pub fn get(conn: &DBConnection, namespace_name: String, table_name: String) -> Option
<Table> { + let table_key = format!("{}_{}", namespace_name, table_name); + if let Some(table_instance) = conn.get::<Table>
(&table_key) { + Some(table_instance) + } else { + None + } + + // TODO: probably want to know whether it is namespace not found or table not found + } + + pub fn rename( + conn: &mut DBConnection, + namespace_name: String, + old_table_name: String, + new_table_name: String, + ) -> Result<bool> { + let old_table_key = format!("{}_{}", namespace_name.clone(), old_table_name.clone()); + let new_table_key = format!("{}_{}", namespace_name.clone(), new_table_name.clone()); + + if let Some(mut old_table) = conn.get::<Table>
(&old_table_key) { + old_table.name = new_table_name.clone(); + conn.put(&new_table_key, &old_table)?; + conn.delete(&old_table_key)?; // Remove the old key + } else { + return Ok(false); // If the old table does not exist, return false or handle it accordingly + } + + // true + let namespace_key = namespace_name.clone(); + if let Some(mut namespace) = conn.get::(&namespace_key) { + if let Some(index) = namespace + .tables + .iter() + .position(|name| name == &old_table_name) + { + namespace.tables[index] = new_table_name.clone(); + conn.put(&namespace_key, &namespace)?; + Ok(true) + } else { + return Ok(false); // If the old table name is not found in the tables vector, return false or handle it accordingly + } + } else { + return Ok(false); // If the namespace does not exist, return false or handle it accordingly + } + } +} diff --git a/src/cli.rs b/src/cli.rs new file mode 100644 index 0000000..feef79b --- /dev/null +++ b/src/cli.rs @@ -0,0 +1,15 @@ +use std::path::PathBuf; + +use clap::Parser; + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +pub struct Cli { + /// Sets the root of database + #[arg(short, long, value_name = "db_root", default_value = "./database")] + pub db_root: Option, +} + +pub fn parse() -> Cli { + Cli::parse() +} diff --git a/src/common/mod.rs b/src/common/mod.rs new file mode 100644 index 0000000..bef0538 --- /dev/null +++ b/src/common/mod.rs @@ -0,0 +1,2 @@ +pub mod response; +pub mod result; diff --git a/src/common/response.rs b/src/common/response.rs new file mode 100644 index 0000000..6adbed2 --- /dev/null +++ b/src/common/response.rs @@ -0,0 +1,77 @@ +use std::io::Cursor; + +use crate::common::result::{Empty, Error, ErrorType}; +use rocket::{ + http::{ContentType, Status}, + response::{self, Responder}, + serde::json::json, + Request, Response, +}; + +impl ErrorType { + fn to_status(&self) -> Status { + match self { + ErrorType::BadRequest => Status::BadRequest, + ErrorType::NotFound => Status::NotFound, + ErrorType::ServiceUnavailable => Status::ServiceUnavailable, + ErrorType::AlreadyExists => Status::Conflict, + ErrorType::Unprocessable => Status::UnprocessableEntity, + ErrorType::InternalError => Status::InternalServerError, + } + } +} + +// HTTP response builder for Error enum +impl<'r> Responder<'r, 'static> for Error { + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { + let status = self.error_type.to_status(); + let body: String = json!({ "error": { + "code": status.code, + "type": self.error_type, + "message": self.message, + } }) + .to_string(); + + // Build and send the request. + Response::build() + .sized_body(body.len(), Cursor::new(body)) + .header(ContentType::new("application", "json")) + .status(status) + .ok() + } +} + +// HTTP response builder for Empty response 204 +impl<'r> Responder<'r, 'static> for Empty { + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { + // Build and send the request. + Response::build().status(Status::NoContent).ok() + } +} + +#[macro_export] +macro_rules! err { + ($error_type:expr, $location:expr, $message:expr) => { + Err(crate::common::result::Error { + error_type: $error_type, + location: $location, + message: $message, + }) + }; +} + +#[macro_export] +macro_rules! ok_json { + ($($json:tt)+) => {Ok( + rocket::serde::json::Json( + rocket::serde::json::serde_json::json!($($json)*) + ) + )}; +} + +#[macro_export] +macro_rules! 
ok_empty { + () => { + Ok(crate::common::result::Empty {}) + }; +} diff --git a/src/common/result.rs b/src/common/result.rs new file mode 100644 index 0000000..e01abd2 --- /dev/null +++ b/src/common/result.rs @@ -0,0 +1,86 @@ +use std::{fmt, result}; + +use derive_builder::Builder; +use rocket::serde::{json::Json, Serialize}; +use serde_json::Value; + +/// An enum that represents all types of errors that can occur when using calling catalog service. +#[derive(Clone, Serialize)] +#[serde(crate = "rocket::serde")] +#[allow(dead_code)] +pub enum ErrorType { + BadRequest, + NotFound, + ServiceUnavailable, + AlreadyExists, + Unprocessable, + InternalError, +} + +#[derive(Clone, Serialize)] +#[serde(crate = "rocket::serde")] +pub enum Location { + DB, + Namespace, + Request, + Table, +} + +#[derive(Builder, Serialize)] +#[builder(setter(into))] +#[serde(crate = "rocket::serde")] +pub struct Error { + // error type. + pub error_type: ErrorType, + // place where the error occurred. + pub location: Location, + // error message. + pub message: String, +} + +pub struct Empty {} + +// alias for Result with the error type. +pub type Result = result::Result; +pub type JsonResult = Result>; +pub type EmptyResult = Result; + +impl From<()> for Empty { + fn from(_: ()) -> Empty { + Empty {} + } +} + +impl fmt::Display for Location { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Location::Namespace => write!(f, "namespace"), + Location::Table => write!(f, "table"), + Location::DB => write!(f, "DB"), + Location::Request => write!(f, "request"), + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.error_type { + ErrorType::BadRequest => write!(f, "[{}] Bad Request: {}", self.location, self.message), + ErrorType::NotFound => write!(f, "[{}] Not Found: {}", self.location, self.message), + ErrorType::ServiceUnavailable => { + write!(f, "[{}] Unavailable: {}", self.location, self.message) + } + ErrorType::AlreadyExists => write!(f, "[{}] Already Exists: {}", self.location, self.message), + ErrorType::Unprocessable => write!(f, "[{}] Unprocessable: {}", self.location, self.message), + ErrorType::InternalError => write!(f, "[{}] Internal Error: {}", self.location, self.message), + } + } +} + +impl fmt::Debug for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.write_str(&format!("Error {{ msg: {} }}", self.message)) + } +} + +impl std::error::Error for Error {} diff --git a/src/db/mod.rs b/src/db/mod.rs index 2842023..f1ace8a 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,5 +1,116 @@ +use crate::{ + catalog::namespace::Namespace, + common::result::{Error, ErrorType, Location, Result}, + err, +}; use pickledb::PickleDb; -struct db { - client: PickleDb, +use rocket::serde::Serialize; +use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::{fs, path::PathBuf}; + +pub struct DB { + conn: RwLock, // simple rw lock +} + +impl DB { + pub fn get_read_conn(&self) -> Result> { + let read_guard = self.conn.read(); + if read_guard.is_err() { + return err!( + ErrorType::InternalError, + Location::DB, + "Failed to get read connection".to_owned() + ); + } + + Ok(read_guard.unwrap()) + } + + pub fn get_write_conn(&self) -> Result> { + let write_guard = self.conn.write(); + if write_guard.is_err() { + return err!( + ErrorType::InternalError, + Location::DB, + "Failed to get write connection".to_owned() + ); + } + + Ok(write_guard.unwrap()) + } + + pub fn new(root_dir: PathBuf) -> Result { + 
println!("starting db in {:?}", root_dir); + if !std::path::Path::new(&root_dir).exists() { + let res = fs::create_dir(&root_dir); + if res.is_err() { + return err!( + ErrorType::InternalError, + Location::DB, + "Failed to create root directory".to_owned() + ); + } + } + + let mut conn = DBConnection::new(&root_dir)?; + Namespace::init(&mut conn)?; + Ok(DB { + conn: RwLock::new(conn), + }) + } +} + +pub struct DBConnection(PickleDb); + +impl DBConnection { + pub fn exists(&self, key: &str) -> bool { + self.0.exists(key) + } + + pub fn get rocket::serde::Deserialize<'de>>(&self, key: &str) -> Option { + self.0.get(key) + } + + pub fn put(&mut self, key: &str, value: &T) -> Result<()> { + match self.0.set(key, &value) { + Ok(_) => Ok(()), + Err(e) => Err(Error { + error_type: ErrorType::InternalError, + location: Location::DB, + message: format!("Failed to put key: {}, error: {}", key, e), + }), + } + } + + pub fn delete(&mut self, key: &str) -> Result<()> { + match self.0.rem(key) { + Ok(_) => Ok(()), + Err(e) => Err(Error { + error_type: ErrorType::InternalError, + location: Location::DB, + message: format!("Failed to delete key: {}, error: {}", key, e), + }), + } + } + + fn new(root_dir: &PathBuf) -> Result { + // Load the database from disk, if no database exists, create a new one. + let db_path = root_dir.join("catalog.namespace"); + match PickleDb::load( + &db_path, + pickledb::PickleDbDumpPolicy::AutoDump, + pickledb::SerializationMethod::Json, + ) { + Ok(conn) => Ok(DBConnection(conn)), + Err(_) => { + let conn = PickleDb::new( + &db_path, + pickledb::PickleDbDumpPolicy::AutoDump, + pickledb::SerializationMethod::Json, + ); + Ok(DBConnection(conn)) + } + } + } } diff --git a/src/main.rs b/src/main.rs index 7e105ac..0d9291f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,30 +1,58 @@ +mod catalog; +mod cli; +mod common; +mod db; +mod server; +mod util; + #[macro_use] extern crate rocket; -mod server; -use server::routes::*; +use db::DB; + +use server::{ + catches, + routes::{common::TableMetadataAtomicIncr, *}, +}; #[launch] -fn rocket() -> _ { - rocket::build().mount( - "/v1", - routes![ - namespace::get_namespace, - namespace::post_namespace, - namespace::head_namespace_by_name, - namespace::get_namespace_by_name, - namespace::delete_namespace_by_name, - namespace::post_namespace_properties, - table::get_table_by_namespace, - table::post_table_by_namespace, - table::register_table, - table::get_table, - table::post_table, - table::delete_table, - table::head_table, - table::rename_table, - metric::post_metrics, - config::get_config, - ], - ) +pub fn rocket() -> _ { + let cli = cli::parse(); + let db = DB::new(cli.db_root.unwrap()); + let table_metedata_generator = TableMetadataAtomicIncr::new(); + if db.is_err() { + panic!("Failed to initialize database: {:?}", db.err()); + } + + rocket::build() + .manage(db.unwrap()) + .manage(table_metedata_generator) + .attach(namespace::stage()) + .attach(catches::stage()) + .mount( + "/v1", + routes![ + table::get_table_by_namespace, + table::post_table_by_namespace, + table::register_table, + table::get_table, + table::post_table, + table::delete_table, + table::head_table, + table::rename_table, + metric::post_metrics, + config::get_config, + ], + ) +} + +#[cfg(test)] +mod test { + use rocket::local::asynchronous::Client; + + #[rocket::async_test] + async fn test_create_server() { + let client = Client::tracked(crate::rocket()).await; + assert_eq!(client.is_ok(), true); + } } diff --git a/src/server/catches.rs b/src/server/catches.rs new file 
mode 100644 index 0000000..44e6bb0 --- /dev/null +++ b/src/server/catches.rs @@ -0,0 +1,52 @@ +use crate::common::result::{EmptyResult, ErrorType, Location}; +use crate::err; + +#[catch(404)] +fn general_not_found() -> EmptyResult { + err!( + ErrorType::NotFound, + Location::Request, + "Resource not found, please check the URL".to_string() + ) +} + +#[catch(400)] +fn general_bad_request() -> EmptyResult { + err!( + ErrorType::BadRequest, + Location::Request, + "Bad Request".to_string() + ) +} + +#[catch(422)] +fn general_unprocessable_request() -> EmptyResult { + err!( + ErrorType::Unprocessable, + Location::Request, + "Unprocessable request".to_string() + ) +} + +#[catch(500)] +fn general_internal_error() -> EmptyResult { + err!( + ErrorType::InternalError, + Location::Request, + "Internal server error".to_string() + ) +} + +pub fn stage() -> rocket::fairing::AdHoc { + rocket::fairing::AdHoc::on_ignite("error response guard", |rocket| async { + rocket.register( + "/", + catchers![ + general_not_found, + general_bad_request, + general_internal_error, + general_unprocessable_request + ], + ) + }) +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 6a664ab..2d072fe 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1 +1,2 @@ +pub mod catches; pub mod routes; diff --git a/src/server/routes/common.rs b/src/server/routes/common.rs new file mode 100644 index 0000000..aaff0a1 --- /dev/null +++ b/src/server/routes/common.rs @@ -0,0 +1,127 @@ +// use crate::catalog::namespace::Namespace; +use rocket::serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct TableIdentifier { + pub namespace: NamespaceResponse, // TODO: Should update this Namespace + pub name: String, +} + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct NamespaceResponse(pub Vec); + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +#[serde(untagged)] +pub enum Type { + Primitive(PrimitiveType), + Struct(StructType), + List(Box), + Map(Box), +} + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct PrimitiveType(String); + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct StructField { + id: i32, + name: String, + #[serde(flatten)] + type_: Type, + required: bool, + // doc: Option, +} + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct StructType { + #[serde(rename = "type")] + pub type_: String, + pub fields: Vec, +} + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct ListType { + #[serde(rename = "type")] + type_: String, + element_id: i32, + #[serde(flatten)] + element: Type, + element_required: bool, +} + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct MapType { + #[serde(rename = "type")] + type_: String, + key_id: i32, + #[serde(flatten)] + key: Type, + value_id: i32, + #[serde(flatten)] + value: Type, + value_required: bool, +} + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct Schema { + // #[serde(flatten)] + // pub struct_type: StructType, // TODO: update! 
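+  // NOTE: pared down for now: only the schema id and identifier field ids are
+  // stored; the full Iceberg struct type above remains commented out.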
+ pub schema_id: i32, + pub identifier_field_ids: Vec, +} + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct TableMetadata { + pub format_version: i32, + pub table_uuid: String, + // pub location: Option, + // pub last_updated_ms: Option, + // pub properties: HashMap, + // pub schemas: Vec, + // pub current_schema_id: Option, + // pub last_column_id: Option, + // pub partition_specs: Vec, + // pub default_spec_id: Option, + // pub last_partition_id: Option, + // pub sort_orders: Vec, + // pub default_sort_order_id: Option, + // pub snapshots: Vec, + // pub refs: SnapshotReferences, + // pub current_snapshot_id: Option, + // pub last_sequence_number: Option, + // pub snapshot_log: SnapshotLog, + // pub metadata_log: MetadataLog, +} + +use std::sync::atomic::{AtomicUsize, Ordering}; + +pub struct TableMetadataAtomicIncr { + table_uuid_counter: AtomicUsize, +} + +impl TableMetadataAtomicIncr { + pub fn new() -> Self { + TableMetadataAtomicIncr { + table_uuid_counter: AtomicUsize::new(0), + } + } + + pub fn generate_table_metadata(&self, format_version: i32) -> TableMetadata { + let uuid = self.table_uuid_counter.fetch_add(1, Ordering::SeqCst); + let table_uuid = format!("uuid{}", uuid); // Generate UUID based on the counter value + TableMetadata { + format_version, + table_uuid, + } + } +} diff --git a/src/server/routes/metric.rs b/src/server/routes/metric.rs index 1c50f22..239f009 100644 --- a/src/server/routes/metric.rs +++ b/src/server/routes/metric.rs @@ -1,5 +1,5 @@ /// Send a metrics report to this endpoint to be processed by the backend #[post("/namespaces//tables/
/metrics")] pub fn post_metrics(namespace: &str, table: &str) { - todo!("post_metrics") + todo!("post_metrics {} {}", namespace, table) } diff --git a/src/server/routes/mod.rs b/src/server/routes/mod.rs index d93f96d..8700beb 100644 --- a/src/server/routes/mod.rs +++ b/src/server/routes/mod.rs @@ -1,5 +1,7 @@ -#[allow(dead_code)] +pub mod common; pub mod config; pub mod metric; pub mod namespace; +pub mod request; +pub mod response; pub mod table; diff --git a/src/server/routes/namespace.rs b/src/server/routes/namespace.rs index 6c08ff0..ca87d57 100644 --- a/src/server/routes/namespace.rs +++ b/src/server/routes/namespace.rs @@ -1,35 +1,394 @@ +use crate::catalog::namespace::{Namespace, NamespaceIdent}; +use crate::common::result::{self, EmptyResult, ErrorType, JsonResult, Location, Result}; +use crate::{err, ok_empty, ok_json}; +use std::collections::HashSet; + +use rocket::request::FromParam; + +use rocket::{ + serde::{ + json::{Json, Value}, + Deserialize, Serialize, + }, + State, +}; + +use crate::db::DB; + +pub struct NamespaceParam(pub Vec); + +/// Returns an instance of `PasteId` if the path segment is a valid ID. +/// Otherwise returns the invalid ID as the `Err` value. +impl<'r> FromParam<'r> for NamespaceParam { + type Error = result::Error; + fn from_param(param: &'r str) -> Result { + NamespaceParam::try_from(param) + } +} + +impl TryFrom<&str> for NamespaceParam { + type Error = result::Error; + fn try_from(param: &str) -> Result { + let parts: Vec<_> = param.split('\u{001F}').collect(); + // check if all parts are valid + if !parts + .iter() + .all(|p| p.chars().all(|c| c.is_ascii_alphanumeric())) + { + return err!( + ErrorType::BadRequest, + Location::Namespace, + "Invalid parameter".to_owned() + ); + } + Ok(NamespaceParam( + parts.into_iter().map(|x| x.to_string()).collect(), + )) + } +} + +#[derive(Deserialize, Serialize)] +#[serde(crate = "rocket::serde")] +// Create Namespace Request +pub struct CreateNamespaceRequest { + pub namespace: Vec, + // Configured string to string map of properties for the namespace + pub properties: Option, +} + +#[derive(Deserialize)] +#[serde(crate = "rocket::serde")] +// Update Namespace Request +pub struct UpdateNamespaceRequest { + pub removals: Option>, + pub updates: Option, +} + /// List namespaces, optionally providing a parent namespace to list underneath -#[get("/namespaces")] -pub fn get_namespace() { - todo!("get_namespace") +#[get("/namespaces?")] +pub async fn get(parent: Option<&str>, db: &State) -> JsonResult { + let conn = db.get_read_conn()?; + let parent = if let Some(p_str) = parent { + NamespaceParam::try_from(p_str)?.0 + } else { + vec![] + }; + let res = Namespace::list(&conn, &parent); + match res { + None => err!( + ErrorType::NotFound, + Location::Namespace, + format!("Namespace {} not found", parent.join(".")) + ), + Some(v) => ok_json!( { "namespaces": v }), + } } /// Create a namespace -#[post("/namespaces")] -pub fn post_namespace() { - todo!("post_namespace") +#[post("/", data = "")] +pub async fn post(create_request: Json, db: &State) -> JsonResult { + let mut conn = db.get_write_conn()?; + let created_namespace = Namespace::create( + &mut conn, + &create_request.namespace, + create_request.properties.clone(), // FIXME: this is a clone, can it be avoided? 
+ )?; + ok_json!({ + "namespace": create_request.namespace.clone(), + "properties": created_namespace.properties, + }) } /// Check if a namespace exists -#[head("/namespaces/")] -pub fn head_namespace_by_name(namespace: &str) { - todo!("head_namespace_by_name") +#[head("/")] +pub async fn head_by_name(namespace: NamespaceParam, db: &State) -> EmptyResult { + let conn = db.get_read_conn()?; + let exists = Namespace::exists(&conn, &namespace.0); + match exists { + true => ok_empty!(), + false => err!( + ErrorType::NotFound, + Location::Namespace, + format!("Namespace {} not found", namespace.0.join(".")) + ), + } } /// Load the metadata properties for a namespace -#[get("/namespaces/")] -pub fn get_namespace_by_name(namespace: &str) { - todo!("get_namespace_by_name") +#[get("/")] +pub async fn get_by_name(namespace: NamespaceParam, db: &State) -> JsonResult { + let conn = db.get_read_conn()?; + let properties = Namespace::get_properties(&conn, &namespace.0)?; + ok_json!({ "properties": properties }) } /// Drop a namespace from the catalog. Namespace must be empty. -#[delete("/namespaces/")] -pub fn delete_namespace_by_name(namespace: &str) { - todo!("delete_namespace_by_name") +#[delete("/")] +pub async fn delete_by_name(namespace: NamespaceParam, db: &State) -> EmptyResult { + let mut conn = db.get_write_conn()?; + Namespace::delete(&mut conn, &namespace.0)?; + ok_empty!() } /// Set or remove properties on a namespace -#[post("/namespaces//properties")] -pub fn post_namespace_properties(namespace: &str) { - todo!("post_namespace_properties") +#[post("//properties", data = "")] +pub fn post_properties( + namespace: NamespaceParam, + mut update_request: Json, + db: &State, +) -> JsonResult { + // we don't test the uniqueness of the keys in removals, it will be treated as a no-op. + // we only test if a key is presented both in the removals and update. 
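+  // For example (hypothetical property name), a body like
+  //   { "removals": ["owner"], "updates": { "owner": "alice" } }
+  // is rejected with 400 Bad Request because "owner" appears in both lists.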
+ if update_request.updates.is_none() && update_request.removals.is_none() { + return err!( + ErrorType::BadRequest, + Location::Request, + "No updates or removals provided".to_owned() + ); + } + if let (Some(removals), Some(updates)) = (&update_request.removals, &update_request.updates) { + let mut removed_key: HashSet<&str> = HashSet::new(); + for key in removals { + removed_key.insert(key); + } + if let Some(updates) = updates.as_object() { + for key in updates.keys() { + if removed_key.contains(key.as_str()) { + return err!( + ErrorType::BadRequest, + Location::Namespace, + format!("Key {} is present in both removals and updates", key) + ); + } + } + } + } + + let mut conn = db.get_write_conn()?; + let res = Namespace::update( + &mut conn, + &namespace.0, + update_request.removals.take(), + update_request.updates.take(), + )?; + ok_json!(res) +} + +pub fn stage() -> rocket::fairing::AdHoc { + rocket::fairing::AdHoc::on_ignite("namespace routes", |rocket| async { + rocket + .mount( + "/v1/namespaces", + routes![ + post, + head_by_name, + get_by_name, + delete_by_name, + post_properties + ], + ) + .mount("/v1", routes![get]) // for a query parameter + }) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::table::test::create_mock_client; + use rocket::http::ContentType; + use rocket::http::Status; + + #[rocket::async_test] + async fn test_list_non_exist_namespace() { + let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory"); + let client = create_mock_client(temp_dir.path().to_path_buf()).await; + + let namespace_name = "parentnonexist"; + let endpoint = format!("/v1/namespaces?parent={}", namespace_name); + + let response = client + .get(&endpoint) + .header(ContentType::JSON) + .dispatch() + .await; + + assert_eq!(response.status(), Status::NotFound); + } + + #[rocket::async_test] + async fn test_list_namespace() { + let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory"); + let client = create_mock_client(temp_dir.path().to_path_buf()).await; + + let namespace_name = ""; + let endpoint = format!("/v1/namespaces?parent={}", namespace_name); + + let response = client + .get(&endpoint) + .header(ContentType::JSON) + .dispatch() + .await; + + assert_eq!(response.status(), Status::NotFound); // TODO: FIXME: NotFound Or Ok? 
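+    // An empty `parent` currently parses into a single empty segment, which
+    // hashes to the key "root::"; that key is never created, so the request
+    // returns 404 instead of listing the children of the root namespace.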
+ } + + #[rocket::async_test] + async fn test_create_namespace() { + let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory"); + let client = create_mock_client(temp_dir.path().to_path_buf()).await; + + let namespace_name = "create_namespace"; + let endpoint = format!("/v1/namespaces/"); + + let create_namespace_request = CreateNamespaceRequest { + namespace: vec![namespace_name.to_string()], // Use String directly + properties: None, // Adjust as needed + }; + let create_namespace_request_json = Json(create_namespace_request); + let create_namespace_request_json_bytes = + serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap(); + + let response = client + .post(endpoint) + .header(ContentType::JSON) + .body(create_namespace_request_json_bytes) + .dispatch() + .await; + + assert_eq!(response.status(), Status::Ok); + } + + #[rocket::async_test] + async fn test_get_non_exist_namespace() { + let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory"); + let client = create_mock_client(temp_dir.path().to_path_buf()).await; + + let namespace_name = "nonexist"; + let endpoint = format!("/v1/namespaces/{}", namespace_name); + + let response = client.get(&endpoint).dispatch().await; + + assert_eq!(response.status(), Status::NotFound); + } + + #[rocket::async_test] + async fn test_get_namespace() { + let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory"); + let client = create_mock_client(temp_dir.path().to_path_buf()).await; + + let namespace_name = "exist"; + let endpoint = format!("/v1/namespaces/"); + + let create_namespace_request = CreateNamespaceRequest { + namespace: vec![namespace_name.to_string()], // Use String directly + properties: None, // Adjust as needed + }; + let create_namespace_request_json = Json(create_namespace_request); + let create_namespace_request_json_bytes = + serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap(); + + let response = client + .post(endpoint) + .header(ContentType::JSON) + .body(create_namespace_request_json_bytes) + .dispatch() + .await; + + assert_eq!(response.status(), Status::Ok); + + let endpoint = format!("/v1/namespaces/{}", namespace_name); + + let response = client.get(&endpoint).dispatch().await; + + assert_eq!(response.status(), Status::Ok); + } + + #[rocket::async_test] + async fn test_check_namespace_exist() { + let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory"); + let client = create_mock_client(temp_dir.path().to_path_buf()).await; + + let namespace_name = "exist"; + let endpoint = format!("/v1/namespaces/"); + + let create_namespace_request = CreateNamespaceRequest { + namespace: vec![namespace_name.to_string()], // Use String directly + properties: None, // Adjust as needed + }; + let create_namespace_request_json = Json(create_namespace_request); + let create_namespace_request_json_bytes = + serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap(); + + let response = client + .post(endpoint) + .header(ContentType::JSON) + .body(create_namespace_request_json_bytes) + .dispatch() + .await; + + assert_eq!(response.status(), Status::Ok); + + let endpoint = format!("/v1/namespaces/{}", namespace_name); + + let response = client.head(&endpoint).dispatch().await; + + assert_eq!(response.status(), Status::NoContent); + } + + #[rocket::async_test] + async fn test_check_namespace_non_exist() { + let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory"); + let 
client = create_mock_client(temp_dir.path().to_path_buf()).await; + + let namespace_name = "nonexist"; + let endpoint = format!("/v1/namespaces/{}", namespace_name); + + let response = client.head(&endpoint).dispatch().await; + + assert_eq!(response.status(), Status::NotFound); + } + + #[rocket::async_test] + async fn test_delete_exist_namespace() { + let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory"); + let client = create_mock_client(temp_dir.path().to_path_buf()).await; + + let namespace_name = "exist"; + let endpoint = format!("/v1/namespaces/"); + + let create_namespace_request = CreateNamespaceRequest { + namespace: vec![namespace_name.to_string()], // Use String directly + properties: None, // Adjust as needed + }; + let create_namespace_request_json = Json(create_namespace_request); + let create_namespace_request_json_bytes = + serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap(); + + let response = client + .post(endpoint) + .header(ContentType::JSON) + .body(create_namespace_request_json_bytes) + .dispatch() + .await; + + assert_eq!(response.status(), Status::Ok); + + // delete + let endpoint_delete = format!("/v1/namespaces/{}", namespace_name); + let response = client.delete(endpoint_delete).dispatch().await; + + assert_eq!(response.status(), Status::NoContent); + } + + #[rocket::async_test] + async fn test_delete_non_exist_namespace() { + let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory"); + let client = create_mock_client(temp_dir.path().to_path_buf()).await; + + let namespace_name = "nonexist"; + let endpoint_delete = format!("/v1/namespaces/{}", namespace_name); + let response = client.delete(endpoint_delete).dispatch().await; + + assert_eq!(response.status(), Status::NotFound); + } } diff --git a/src/server/routes/request.rs b/src/server/routes/request.rs new file mode 100644 index 0000000..a101a45 --- /dev/null +++ b/src/server/routes/request.rs @@ -0,0 +1,92 @@ +use crate::server::routes::common::*; +use rocket::serde::Deserialize; +use rocket::serde::Serialize; + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +// #[derive(serde::Deserialize)] +pub struct CreateTableRequest { + pub name: String, + // location: Option, + // schema: Schema, + // #[serde(rename = "partition-spec")] + // partition_spec: PartitionSpec, + // #[serde(rename = "write-order")] + // write_order: SortOrder, + // #[serde(rename = "stage-create")] + // stage_create: bool, + // properties: Option>, +} + +#[derive(Deserialize)] +#[serde(crate = "rocket::serde")] +#[allow(dead_code)] +pub struct RegisterTableRequest { + name: String, + #[serde(rename = "metadata-location")] + metadata_location: String, // TODO: need to modify to table schema +} + +#[derive(Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct CommitTableRequest { + // pub identifier: TableIdentifier, + pub requirements: Vec, + pub updates: Vec, +} + +#[derive(Deserialize)] +#[serde(crate = "rocket::serde")] +#[serde(rename_all = "camelCase")] +pub struct TableRequirement { + #[serde(flatten)] + pub requirement: RequirementType, +} + +#[derive(Deserialize)] +#[serde(crate = "rocket::serde")] +#[serde(untagged)] +pub enum RequirementType { + // AssertCreate(AssertCreate), + // AssertTableUUID(AssertTableUUID), + // AssertRefSnapshotId(AssertRefSnapshotId), + // AssertLastAssignedFieldId(AssertLastAssignedFieldId), + // AssertCurrentSchemaId(AssertCurrentSchemaId), + // 
AssertLastAssignedPartitionId(AssertLastAssignedPartitionId), + // AssertDefaultSpecId(AssertDefaultSpecId), + // AssertDefaultSortOrderId(AssertDefaultSortOrderId), +} + +#[derive(Deserialize)] +#[serde(crate = "rocket::serde")] +#[serde(untagged)] +pub enum TableUpdate { + // AssignUUIDUpdate(AssignUUIDUpdate), + // UpgradeFormatVersionUpdate(UpgradeFormatVersionUpdate), + // AddSchemaUpdate(AddSchemaUpdate), + // SetCurrentSchemaUpdate(SetCurrentSchemaUpdate), + // AddPartitionSpecUpdate(AddPartitionSpecUpdate), + // SetDefaultSpecUpdate(SetDefaultSpecUpdate), + // AddSortOrderUpdate(AddSortOrderUpdate), + // SetDefaultSortOrderUpdate(SetDefaultSortOrderUpdate), + // AddSnapshotUpdate(AddSnapshotUpdate), + // SetSnapshotRefUpdate(SetSnapshotRefUpdate), + // RemoveSnapshotsUpdate(RemoveSnapshotsUpdate), + // RemoveSnapshotRefUpdate(RemoveSnapshotRefUpdate), + // SetLocationUpdate(SetLocationUpdate), + // SetPropertiesUpdate(SetPropertiesUpdate), + // RemovePropertiesUpdate(RemovePropertiesUpdate), +} + +#[derive(FromForm)] +#[allow(dead_code)] +pub struct PurgeRequested { + purge_requested: Option, +} + +#[derive(Serialize, Deserialize)] +#[serde(crate = "rocket::serde")] +pub struct RenameTableRequest { + pub source: TableIdentifier, + pub destination: TableIdentifier, +} diff --git a/src/server/routes/response.rs b/src/server/routes/response.rs new file mode 100644 index 0000000..ef9d2c5 --- /dev/null +++ b/src/server/routes/response.rs @@ -0,0 +1,50 @@ +use crate::server::routes::common::*; +use rocket::serde::Serialize; + +// #[get("/namespaces//tables")] --> 200: ListTablesResponse +#[derive(Serialize)] +#[serde(crate = "rocket::serde")] +pub struct ListTablesResponse { + pub identifiers: Vec, +} + +// #[post("/namespaces//tables")] --> 200: CreateTableResponse +pub type CreateTableResponse = LoadTableResult; +// #[post("/namespaces//register")] --> 200: LoadTableResponse +// #[get("/namespaces//tables/
")] --> 200: LoadTableResponse +pub type LoadTableResponse = LoadTableResult; +// LoadTableResult +#[derive(Serialize)] +#[serde(crate = "rocket::serde")] +pub struct LoadTableResult { + // pub metadata_location: Option, + pub metadata: TableMetadata, + // pub config: HashMap, +} + +#[derive(Serialize)] +#[serde(crate = "rocket::serde")] +pub struct CommitTableResponse { + pub metadata_location: String, + pub metadata: TableMetadata, +} + +// 400: BadRequestErrorResponse +// 404: IcebergErrorResponse +// 409: TableAlreadyExistsError +// 503: ServiceUnavailableResponse +// 5XX: ServerErrorResponse +#[derive(Serialize)] +#[serde(crate = "rocket::serde")] +pub struct IcebergErrorResponse { + pub error: ErrorModel, +} + +#[derive(Serialize)] +#[serde(crate = "rocket::serde")] +pub struct ErrorModel { + pub message: String, + pub r#type: String, // Using r#type to avoid conflict with the type keyword + pub code: i32, + // pub stack: Option>, +} diff --git a/src/server/routes/table.rs b/src/server/routes/table.rs index a68e9c3..2516026 100644 --- a/src/server/routes/table.rs +++ b/src/server/routes/table.rs @@ -1,47 +1,619 @@ +use crate::request::*; +use crate::server::routes::common::*; +use crate::{err, ok_empty, response::*}; +use rocket::serde::json::Json; + +use crate::catalog::table::Table; +use crate::common::result::{EmptyResult, ErrorType, Location, Result}; +use crate::server::routes::namespace::NamespaceParam; + +use crate::DB; +use rocket::State; + +pub type JsonResultGeneric = Result>; + +fn hash<'a>(level: &Vec) -> String { + if level.is_empty() { + "root".to_string() + } else { + format!("root::{}", level.join("::")) + } +} + /// List all table identifiers underneath a given namespace #[get("/namespaces//tables")] -pub fn get_table_by_namespace(namespace: &str) { - todo!("get_table_by_namespace") +pub fn get_table_by_namespace( + namespace: NamespaceParam, + db: &State, +) -> JsonResultGeneric { + let mut conn = db.get_read_conn()?; + let copy = namespace.0.clone(); + let hash_key = hash(&namespace.0); + let table_names = Table::list(&mut conn, hash_key.to_string()); + let all_table_names = table_names.clone(); + + let mut identifiers = Vec::new(); + for table_name in all_table_names.into_iter().flatten() { + let identifier = TableIdentifier { + namespace: NamespaceResponse(copy.clone()), // Assuming namespace is a Vec + name: table_name.clone(), + }; + identifiers.push(identifier); + } + + if identifiers.is_empty() { + return err!( + ErrorType::NotFound, + Location::Table, + format!("No tables found for the specified namespace") + ); + } + + // Create and return ListTablesResponse + let response = ListTablesResponse { identifiers }; + + Ok(Json(response)) } /// Create a table in the given namespace -#[post("/namespaces//tables")] -pub fn post_table_by_namespace(namespace: &str) { - todo!("post_table_by_namespace") +// TODO: check whether namespace exists first +#[post("/namespaces//tables", data = "")] +pub fn post_table_by_namespace( + namespace: NamespaceParam, + create_table_request: Json, + db: &State, + table_metedata_generator: &State, +) -> JsonResultGeneric { + let mut conn = db.get_write_conn()?; + let hash_key = hash(&namespace.0); + let new_table = Table::create( + &mut conn, + hash_key.to_string(), + create_table_request.name.clone().to_string(), + table_metedata_generator, + )?; + + // Generate metadata for the newly created table + let metadata = TableMetadata { + format_version: new_table.metadata.format_version, + table_uuid: new_table.metadata.table_uuid, + // 
+        // Fill in other fields as needed
+    };
+
+    // Construct the response
+    let response = CreateTableResponse { metadata };
+
+    // Return the response as JSON
+    Ok(Json(response))
 }
 
 /// Register a table in the given namespace using given metadata file location
-#[post("/namespaces/<namespace>/register")]
-pub fn register_table(namespace: &str) {
-    todo!("register_table")
+#[post("/namespaces/<namespace>/register", data = "<register_table_request>")]
+pub fn register_table(
+    namespace: &str,
+    register_table_request: Json<RegisterTableRequest>,
+) -> JsonResultGeneric<LoadTableResponse> {
+    // TODO: placeholder metadata; registration is not implemented yet
+    let metadata = TableMetadata {
+        format_version: 1,
+        table_uuid: "generated_uuid".to_string(),
+        // Fill in other fields as needed
+    };
+
+    // Construct the response
+    let response = LoadTableResponse { metadata };
+
+    // Return the response as JSON
+    Ok(Json(response))
 }
 
 /// Load a table from the catalog
 #[get("/namespaces/<namespace>/tables/<table>")]
-pub fn get_table(namespace: &str, table: &str) {
-    todo!("post_namespace_table")
+pub fn get_table(
+    namespace: NamespaceParam,
+    table: &str,
+    db: &State<DB>,
+) -> JsonResultGeneric<LoadTableResponse> {
+    let conn = db.get_read_conn()?;
+    let hash_key = hash(&namespace.0);
+    let table_data_option = Table::get(
+        &conn,
+        hash_key.to_string(),
+        table.to_string(), // FIXME: this is a clone, can it be avoided?
+    );
+
+    // TODO: return a NotFound error instead of panicking when the table is missing,
+    // and fill in the real metadata
+    let table_data = table_data_option.unwrap();
+    let metadata = TableMetadata {
+        format_version: table_data.metadata.format_version,
+        table_uuid: table_data.metadata.table_uuid,
+        // Fill in other fields as needed
+    };
+
+    // Construct the response
+    let response = LoadTableResponse { metadata };
+
+    // Return the response as JSON
+    Ok(Json(response))
 }
 
 /// Commit updates to a table
-#[post("/namespaces/<namespace>/tables/<table>")]
-pub fn post_table(namespace: &str, table: &str) {
-    todo!("post_namespace_table")
+#[post(
+    "/namespaces/<namespace>/tables/<table>",
+    data = "<commit_table_request>"
+)]
+pub fn post_table(
+    namespace: &str,
+    table: &str,
+    commit_table_request: Json<CommitTableRequest>,
+) -> JsonResultGeneric<CommitTableResponse> {
+    // TODO: apply the requested updates to the stored metadata
+    let metadata = TableMetadata {
+        format_version: 1,
+        table_uuid: "generated_uuid".to_string(),
+        // Fill in other fields as needed
+    };
+
+    // Construct the response
+    let response = CommitTableResponse {
+        metadata,
+        metadata_location: "".to_string(),
+    };
+
+    // Return the response as JSON
+    Ok(Json(response))
 }
 
 /// Drop a table from the catalog
-#[delete("/namespaces/<namespace>/tables/<table>")]
-pub fn delete_table(namespace: &str, table: &str) {
-    todo!("post_namespace_table")
+#[delete("/namespaces/<namespace>/tables/<table>?<purge_requested..>")]
+pub fn delete_table(
+    namespace: NamespaceParam,
+    table: &str,
+    purge_requested: PurgeRequested,
+    db: &State<DB>,
+) -> EmptyResult {
+    // NOTE: purge_requested is accepted but not honored yet
+    let mut conn = db.get_write_conn()?;
+    let hash_key = hash(&namespace.0);
+    Table::delete(&mut conn, hash_key.to_string(), table.to_string())?;
+    ok_empty!()
 }
 
 /// Check if a table exists
 #[head("/namespaces/<namespace>/tables/<table>")]
-pub fn head_table(namespace: &str, table: &str) {
-    todo!("post_namespace_table")
+pub fn head_table(namespace: NamespaceParam, table: &str, db: &State<DB>) -> EmptyResult {
+    let conn = db.get_read_conn()?;
+    let hash_key = hash(&namespace.0);
+    let exists = Table::exists(&conn, hash_key.to_string(), table.to_string());
+
+    match exists {
+        true => ok_empty!(),
+        false => err!(
+            ErrorType::NotFound,
+            Location::Table,
+            format!("Table not found")
+        ),
+    }
 }
 
 /// Rename a table from its current name to a new name
-#[post("/tables/rename")]
-pub fn rename_table() {
-    todo!("rename_table")
+#[post("/tables/rename", data = "<rename_table_request>")]
+pub fn rename_table(rename_table_request: Json<RenameTableRequest>, db: &State<DB>) -> EmptyResult {
+    let mut conn = db.get_write_conn()?;
+    let namespace_hash = hash(&rename_table_request.source.namespace.0);
+
+    Table::rename(
+        &mut conn,
+        namespace_hash,
+        rename_table_request.source.name.clone(),
+        rename_table_request.destination.name.clone(),
+    )?;
+    ok_empty!()
+}
+
+#[cfg(test)]
+pub(crate) mod test {
+    use super::*;
+    use crate::config;
+    use crate::metric;
+    use crate::namespace;
+    use crate::server::catches;
+    use crate::server::routes::namespace::CreateNamespaceRequest;
+    use crate::table;
+    use rocket::http::ContentType;
+    use rocket::http::Status;
+    use rocket::local::asynchronous::Client;
+
+    use std::path::PathBuf;
+
+    #[rocket::async_test]
+    async fn test_get_table_by_namespace_empty_result() {
+        let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory");
+        let client = create_mock_client(temp_dir.path().to_path_buf()).await;
+
+        let namespace_name = "namespacenametest";
+        let url = format!("/v1/namespaces/{}/tables", namespace_name);
+        let response = client.get(&url).dispatch().await;
+
+        assert_eq!(response.status(), Status::NotFound);
+    }
+
+    #[rocket::async_test]
+    async fn test_get_table_by_namespace_result_found() {
+        let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory");
+        let client = create_mock_client(temp_dir.path().to_path_buf()).await;
+
+        let namespace_name = "namespacenametest";
+        let create_namespace_request = CreateNamespaceRequest {
+            namespace: vec![namespace_name.to_string()],
+            properties: None,
+        };
+        let create_namespace_request_json = Json(create_namespace_request);
+        let create_namespace_request_json_bytes =
+            serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap();
+
+        let response = client
+            .post("/v1/namespaces")
+            .header(ContentType::JSON)
+            .body(create_namespace_request_json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(response.status(), Status::Ok);
+
+        let url = format!("/v1/namespaces/{}/tables", namespace_name);
+        let get_response = client.get(&url).dispatch().await;
+        assert_eq!(get_response.status(), Status::NotFound);
+
+        let create_table_request = CreateTableRequest {
+            name: "tablenametest".to_string(),
+        };
+        let create_table_request_json = Json(create_table_request);
+        let json_bytes = serde_json::to_vec(&create_table_request_json.into_inner()).unwrap();
+
+        let post_response = client
+            .post(format!("/v1/namespaces/{}/tables", namespace_name))
+            .header(ContentType::JSON)
+            .body(json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(post_response.status(), Status::Ok);
+
+        let get_response_2 = client.get(&url).dispatch().await;
+        assert_eq!(get_response_2.status(), Status::Ok);
+    }
+
+    #[rocket::async_test]
+    async fn test_post_table_by_namespace_new_table() {
+        let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory");
+        let client = create_mock_client(temp_dir.path().to_path_buf()).await;
+
+        // TODO: this test currently only creates the namespace; it does not yet create a table
+        let namespace_name = "testnamespacename";
+        let create_namespace_request = CreateNamespaceRequest {
+            namespace: vec![namespace_name.to_string()],
+            properties: None,
+        };
+        let create_namespace_request_json = Json(create_namespace_request);
+        let create_namespace_request_json_bytes =
+            serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap();
+
+        let response = client
+            .post("/v1/namespaces")
+            .header(ContentType::JSON)
+            .body(create_namespace_request_json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(response.status(), Status::Ok);
+    }
+
+    #[rocket::async_test]
+    async fn test_post_table_by_namespace_conflict() {
+        let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory");
+        let client = create_mock_client(temp_dir.path().to_path_buf()).await;
+
+        let namespace_name = "testnamespacename";
+        let create_namespace_request = CreateNamespaceRequest {
+            namespace: vec![namespace_name.to_string()],
+            properties: None,
+        };
+        let create_namespace_request_json = Json(create_namespace_request);
+        let create_namespace_request_json_bytes =
+            serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap();
+
+        let response = client
+            .post("/v1/namespaces")
+            .header(ContentType::JSON)
+            .body(create_namespace_request_json_bytes.clone())
+            .dispatch()
+            .await;
+
+        assert_eq!(response.status(), Status::Ok);
+
+        let second_response = client
+            .post("/v1/namespaces")
+            .header(ContentType::JSON)
+            .body(create_namespace_request_json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(second_response.status(), Status::Conflict);
+    }
+
+    #[rocket::async_test]
+    async fn test_delete_table_that_exists() {
+        let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory");
+        let client = create_mock_client(temp_dir.path().to_path_buf()).await;
+
+        let namespace_name = "namespacenametest";
+        let create_namespace_request = CreateNamespaceRequest {
+            namespace: vec![namespace_name.to_string()],
+            properties: None,
+        };
+        let create_namespace_request_json = Json(create_namespace_request);
+        let create_namespace_request_json_bytes =
+            serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap();
+
+        let response = client
+            .post("/v1/namespaces")
+            .header(ContentType::JSON)
+            .body(create_namespace_request_json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(response.status(), Status::Ok);
+
+        let url = format!("/v1/namespaces/{}/tables", namespace_name);
+        let get_response = client.get(&url).dispatch().await;
+        assert_eq!(get_response.status(), Status::NotFound);
+
+        let table_name = "tablenametest";
+        let create_table_request = CreateTableRequest {
+            name: table_name.to_string(),
+        };
+        let create_table_request_json = Json(create_table_request);
+        let json_bytes = serde_json::to_vec(&create_table_request_json.into_inner()).unwrap();
+
+        let post_response = client
+            .post(format!("/v1/namespaces/{}/tables", namespace_name))
+            .header(ContentType::JSON)
+            .body(json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(post_response.status(), Status::Ok);
+
+        let get_response_2 = client.get(&url).dispatch().await;
+        assert_eq!(get_response_2.status(), Status::Ok);
+
+        let delete_url = format!("/v1/namespaces/{}/tables/{}", namespace_name, table_name);
+        let delete_response = client.delete(&delete_url).dispatch().await;
+        assert_eq!(delete_response.status(), Status::NoContent);
+    }
+
+    #[rocket::async_test]
+    async fn test_delete_table_that_not_exists() {
+        let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory");
+        let client = create_mock_client(temp_dir.path().to_path_buf()).await;
+
+        let namespace_name = "namespacenametest";
+        let create_namespace_request = CreateNamespaceRequest {
+            namespace: vec![namespace_name.to_string()],
+            properties: None,
+        };
+        let create_namespace_request_json = Json(create_namespace_request);
+        let create_namespace_request_json_bytes =
+            serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap();
+
+        let response = client
+            .post("/v1/namespaces")
+            .header(ContentType::JSON)
+            .body(create_namespace_request_json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(response.status(), Status::Ok);
+
+        let url = format!("/v1/namespaces/{}/tables", namespace_name);
+        let get_response = client.get(&url).dispatch().await;
+        assert_eq!(get_response.status(), Status::NotFound);
+
+        let table_name = "tablenametest";
+        let create_table_request = CreateTableRequest {
+            name: table_name.to_string(),
+        };
+        let create_table_request_json = Json(create_table_request);
+        let json_bytes = serde_json::to_vec(&create_table_request_json.into_inner()).unwrap();
+
+        let post_response = client
+            .post(format!("/v1/namespaces/{}/tables", namespace_name))
+            .header(ContentType::JSON)
+            .body(json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(post_response.status(), Status::Ok);
+
+        let get_response_2 = client.get(&url).dispatch().await;
+        assert_eq!(get_response_2.status(), Status::Ok);
+
+        let table_name_not_exists = "tablenamenotexist";
+        let delete_url = format!(
+            "/v1/namespaces/{}/tables/{}",
+            namespace_name, table_name_not_exists
+        );
+        let delete_response = client.delete(&delete_url).dispatch().await;
+        assert_eq!(delete_response.status(), Status::NotFound);
+    }
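+
+    // Illustrative sketch (added alongside the original tests): documents the JSON shape
+    // of the error envelope defined in response.rs; the `r#type` field serializes as a
+    // plain "type" key. Assumes serde_json, which the tests above already use.
+    #[test]
+    fn test_error_response_shape() {
+        let body = serde_json::to_string(&IcebergErrorResponse {
+            error: ErrorModel {
+                message: "Table not found".to_string(),
+                r#type: "NoSuchTableException".to_string(),
+                code: 404,
+            },
+        })
+        .unwrap();
+        assert!(body.contains("\"type\":\"NoSuchTableException\""));
+        assert!(body.contains("\"code\":404"));
+    }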
+
+    #[rocket::async_test]
+    async fn test_head_table() {
+        let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory");
+        let client = create_mock_client(temp_dir.path().to_path_buf()).await;
+
+        let namespace_name = "namespacenametest";
+        let create_namespace_request = CreateNamespaceRequest {
+            namespace: vec![namespace_name.to_string()],
+            properties: None,
+        };
+        let create_namespace_request_json = Json(create_namespace_request);
+        let create_namespace_request_json_bytes =
+            serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap();
+
+        let response = client
+            .post("/v1/namespaces")
+            .header(ContentType::JSON)
+            .body(create_namespace_request_json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(response.status(), Status::Ok);
+
+        let table_name = "tablenametest";
+
+        let url = format!("/v1/namespaces/{}/tables/{}", namespace_name, table_name);
+        let head_response = client.head(&url).dispatch().await;
+        assert_eq!(head_response.status(), Status::NotFound);
+
+        let create_table_request = CreateTableRequest {
+            name: table_name.to_string(),
+        };
+        let create_table_request_json = Json(create_table_request);
+        let json_bytes = serde_json::to_vec(&create_table_request_json.into_inner()).unwrap();
+
+        let post_response = client
+            .post(format!("/v1/namespaces/{}/tables", namespace_name))
+            .header(ContentType::JSON)
+            .body(json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(post_response.status(), Status::Ok);
+
+        let head_response_2 = client.head(&url).dispatch().await;
+        assert_eq!(head_response_2.status(), Status::NoContent);
+    }
+
+    #[rocket::async_test]
+    async fn test_rename_table() {
+        let temp_dir = tempfile::tempdir().expect("failed to create a temporary directory");
+        let client = create_mock_client(temp_dir.path().to_path_buf()).await;
+
+        let namespace_name = "namespacenametest";
+        let create_namespace_request = CreateNamespaceRequest {
+            namespace: vec![namespace_name.to_string()],
+            properties: None,
+        };
+        let create_namespace_request_json = Json(create_namespace_request);
+        let create_namespace_request_json_bytes =
+            serde_json::to_vec(&create_namespace_request_json.into_inner()).unwrap();
+
+        let response = client
+            .post("/v1/namespaces")
+            .header(ContentType::JSON)
+            .body(create_namespace_request_json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(response.status(), Status::Ok);
+
+        let table_name = "tablenametest";
+        let new_table_name = "renamedtable";
+
+        let url = format!("/v1/namespaces/{}/tables/{}", namespace_name, table_name);
+        let rename_url = format!(
+            "/v1/namespaces/{}/tables/{}",
+            namespace_name, new_table_name
+        );
+        let head_response = client.head(&url).dispatch().await;
+        assert_eq!(head_response.status(), Status::NotFound);
+
+        let create_table_request = CreateTableRequest {
+            name: table_name.to_string(),
+        };
+        let create_table_request_json = Json(create_table_request);
+        let json_bytes = serde_json::to_vec(&create_table_request_json.into_inner()).unwrap();
+
+        let post_response = client
+            .post(format!("/v1/namespaces/{}/tables", namespace_name))
+            .header(ContentType::JSON)
+            .body(json_bytes)
+            .dispatch()
+            .await;
+
+        assert_eq!(post_response.status(), Status::Ok);
+
+        let head_response_2 = client.head(&url).dispatch().await;
+        assert_eq!(head_response_2.status(), Status::NoContent);
+
+        // rename the table
+        let rename_table_request = RenameTableRequest {
+            source: TableIdentifier {
+                namespace: NamespaceResponse(vec![namespace_name.to_string()]),
+                name: table_name.to_string(),
+            },
+            destination: TableIdentifier {
+                namespace: NamespaceResponse(vec![namespace_name.to_string()]),
+                name: new_table_name.to_string(),
+            },
+        };
+        let rename_table_request_json = Json(rename_table_request);
+        let rename_json_bytes =
+            serde_json::to_vec(&rename_table_request_json.into_inner()).unwrap();
+
+        let rename_response = client
+            .post("/v1/tables/rename")
+            .header(ContentType::JSON)
+            .body(rename_json_bytes)
+            .dispatch()
+            .await;
+        assert_eq!(rename_response.status(), Status::NoContent);
+
+        let head_response_3 = client.head(&url).dispatch().await;
+        assert_eq!(head_response_3.status(), Status::NotFound);
+
+        let head_response_4 = client.head(&rename_url).dispatch().await;
+        assert_eq!(head_response_4.status(), Status::NoContent);
+    }
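+
+    // Minimal illustrative check documenting the namespace-to-key scheme used by `hash`
+    // above: the empty path maps to "root" and nested levels are joined with "::".
+    #[test]
+    fn test_hash_key_scheme() {
+        assert_eq!(hash(&vec![]), "root");
+        assert_eq!(hash(&vec!["a".to_string(), "b".to_string()]), "root::a::b");
+    }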
+
+    pub async fn create_mock_client(temp_dir: PathBuf) -> Client {
+        let db_test = DB::new(temp_dir).expect("failed to create a db");
+
+        let table_metadata_generator = TableMetadataAtomicIncr::new();
+        let mut rocket = rocket::build();
+        rocket = rocket
+            .manage(db_test)
+            .manage(table_metadata_generator)
+            .attach(namespace::stage())
+            .attach(catches::stage())
+            .mount(
+                "/v1",
+                routes![
+                    table::get_table_by_namespace,
+                    table::post_table_by_namespace,
+                    table::register_table,
+                    table::get_table,
+                    table::post_table,
+                    table::delete_table,
+                    table::head_table,
+                    table::rename_table,
+                    metric::post_metrics,
+                    config::get_config,
+                ],
+            );
+
+        Client::tracked(rocket)
+            .await
+            .expect("valid rocket instance")
+    }
 }
diff --git a/src/util/mod.rs b/src/util/mod.rs
new file mode 100644
index 0000000..077885d
--- /dev/null
+++ b/src/util/mod.rs
@@ -0,0 +1 @@
+pub mod time;
diff --git a/src/util/time.rs b/src/util/time.rs
new file mode 100644
index 0000000..0db0f77
--- /dev/null
+++ b/src/util/time.rs
@@ -0,0 +1,9 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+
+pub fn now() -> u64 {
+    let current_time = SystemTime::now();
+    current_time
+        .duration_since(UNIX_EPOCH)
+        .expect("Time went backwards")
+        .as_secs()
+}
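+
+#[cfg(test)]
+mod test {
+    use super::now;
+
+    // Illustrative check: `now` reports whole seconds since the Unix epoch, so any
+    // reading taken after 2024-01-01 UTC (1_704_067_200) should satisfy this bound.
+    #[test]
+    fn test_now_returns_unix_seconds() {
+        assert!(now() >= 1_704_067_200);
+    }
+}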