diff --git a/Cargo.lock b/Cargo.lock index 5c6ad6f0..e698f8a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1133,7 +1133,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.12.1", + "itertools 0.11.0", "lazy_static", "lazycell", "log", @@ -1840,11 +1840,11 @@ dependencies = [ [[package]] name = "cranelift-bforest" -version = "0.114.0" +version = "0.113.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ba4f80548f22dc9c43911907b5e322c5555544ee85f785115701e6a28c9abe1" +checksum = "540b193ff98b825a1f250a75b3118911af918a734154c69d80bcfcf91e7e9522" dependencies = [ - "cranelift-entity 0.114.0", + "cranelift-entity 0.113.1", ] [[package]] @@ -1859,9 +1859,9 @@ dependencies = [ [[package]] name = "cranelift-bitset" -version = "0.114.0" +version = "0.113.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "005884e3649c3e5ff2dc79e8a94b138f11569cc08a91244a292714d2a86e9156" +checksum = "c7cb269598b9557ab942d687d3c1086d77c4b50dcf35813f3a65ba306fd42279" dependencies = [ "serde", "serde_derive", @@ -1892,24 +1892,23 @@ dependencies = [ [[package]] name = "cranelift-codegen" -version = "0.114.0" +version = "0.113.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe4036255ec33ce9a37495dfbcfc4e1118fd34e693eff9a1e106336b7cd16a9b" +checksum = "46566d7c83a8bff4150748d66020f4c7224091952aa4b4df1ec4959c39d937a1" dependencies = [ "bumpalo", - "cranelift-bforest 0.114.0", - "cranelift-bitset 0.114.0", - "cranelift-codegen-meta 0.114.0", - "cranelift-codegen-shared 0.114.0", - "cranelift-control 0.114.0", - "cranelift-entity 0.114.0", - "cranelift-isle 0.114.0", + "cranelift-bforest 0.113.1", + "cranelift-bitset 0.113.1", + "cranelift-codegen-meta 0.113.1", + "cranelift-codegen-shared 0.113.1", + "cranelift-control 0.113.1", + "cranelift-entity 0.113.1", + "cranelift-isle 0.113.1", "gimli 0.31.1", "hashbrown 0.14.5", "log", "regalloc2", "rustc-hash 2.0.0", - "serde", "smallvec", "target-lexicon", ] @@ -1925,11 +1924,11 @@ dependencies = [ [[package]] name = "cranelift-codegen-meta" -version = "0.114.0" +version = "0.113.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7ca74f4b68319da11d39e894437cb6e20ec7c2e11fbbda823c3bf207beedff7" +checksum = "2df8a86a34236cc75a8a6a271973da779c2aeb36c43b6e14da474cf931317082" dependencies = [ - "cranelift-codegen-shared 0.114.0", + "cranelift-codegen-shared 0.113.1", ] [[package]] @@ -1940,9 +1939,9 @@ checksum = "18f81aefad1f80ed4132ae33f40b92779eeb57edeb1e28bb24424a4098c963a2" [[package]] name = "cranelift-codegen-shared" -version = "0.114.0" +version = "0.113.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "897e54f433a0269c4187871aa06d452214d5515d228d5bdc22219585e9eef895" +checksum = "cf75340b6a57b7c7c1b74f10d3d90883ee6d43a554be8131a4046c2ebcf5eb65" [[package]] name = "cranelift-control" @@ -1955,9 +1954,9 @@ dependencies = [ [[package]] name = "cranelift-control" -version = "0.114.0" +version = "0.113.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29cb4018f5bf59fb53f515fa9d80e6f8c5ce19f198dc538984ebd23ecf8965ec" +checksum = "2e84495bc5d23d86aad8c86f8ade4af765b94882af60d60e271d3153942f1978" dependencies = [ "arbitrary", ] @@ -1975,11 +1974,11 @@ dependencies = [ [[package]] name = "cranelift-entity" -version = "0.114.0" +version = "0.113.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"305399fd781a2953ac78c1396f02ff53144f39c33eb7fc7789cf4e8936d13a96" +checksum = "963c17147b80df351965e57c04d20dbedc85bcaf44c3436780a59a3f1ff1b1c2" dependencies = [ - "cranelift-bitset 0.114.0", + "cranelift-bitset 0.113.1", "serde", "serde_derive", ] @@ -1998,11 +1997,11 @@ dependencies = [ [[package]] name = "cranelift-frontend" -version = "0.114.0" +version = "0.113.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9230b460a128d53653456137751d27baf567947a3ab8c0c4d6e31fd08036d81e" +checksum = "727f02acbc4b4cb2ba38a6637101d579db50190df1dd05168c68e762851a3dd5" dependencies = [ - "cranelift-codegen 0.114.0", + "cranelift-codegen 0.113.1", "log", "smallvec", "target-lexicon", @@ -2016,9 +2015,9 @@ checksum = "464a6b958ce05e0c237c8b25508012b6c644e8c37348213a8c786ba29e28cfdb" [[package]] name = "cranelift-isle" -version = "0.114.0" +version = "0.113.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b961e24ae3ec9813a24a15ae64bbd2a42e4de4d79a7f3225a412e3b94e78d1c8" +checksum = "32b00cc2e03c748f2531eea01c871f502b909d30295fdcad43aec7bf5c5b4667" [[package]] name = "cranelift-native" @@ -2033,11 +2032,11 @@ dependencies = [ [[package]] name = "cranelift-native" -version = "0.114.0" +version = "0.113.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d5bd76df6c9151188dfa428c863b33da5b34561b67f43c0cf3f24a794f9fa1f" +checksum = "bbeaf978dc7c1a2de8bbb9162510ed218eb156697bc45590b8fbdd69bb08e8de" dependencies = [ - "cranelift-codegen 0.114.0", + "cranelift-codegen 0.113.1", "libc", "target-lexicon", ] @@ -3148,9 +3147,9 @@ checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" [[package]] name = "fastrand" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "fd-lock" @@ -3809,7 +3808,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.7", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -3947,7 +3946,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.3.0" -source = "git+https://github.com/splitgraph/iceberg-rust?rev=dbe5858039e676b45329a5cfc46cc1e7d9b98402#dbe5858039e676b45329a5cfc46cc1e7d9b98402" +source = "git+https://github.com/splitgraph/iceberg-rust?rev=6e87893e733379558df597aa7d66f026549e3b0c#6e87893e733379558df597aa7d66f026549e3b0c" dependencies = [ "anyhow", "apache-avro", @@ -3993,7 +3992,7 @@ dependencies = [ [[package]] name = "iceberg-datafusion" version = "0.3.0" -source = "git+https://github.com/splitgraph/iceberg-rust?rev=dbe5858039e676b45329a5cfc46cc1e7d9b98402#dbe5858039e676b45329a5cfc46cc1e7d9b98402" +source = "git+https://github.com/splitgraph/iceberg-rust?rev=6e87893e733379558df597aa7d66f026549e3b0c#6e87893e733379558df597aa7d66f026549e3b0c" dependencies = [ "anyhow", "async-trait", @@ -4508,7 +4507,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -5728,8 +5727,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", - "heck 0.5.0", - "itertools 0.13.0", + "heck 0.4.1", + "itertools 0.11.0", 
"log", "multimap", "once_cell", @@ -5749,7 +5748,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.89", @@ -5795,11 +5794,11 @@ dependencies = [ [[package]] name = "pulley-interpreter" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3b8d81cf799e20564931e9867ca32de545188c6ee4c2e0f6e41d32f0c7dc6fb" +checksum = "df33e7f8a43ccc7f93b330fef4baf271764674926f3f4d40f4a196d54de8af26" dependencies = [ - "cranelift-bitset 0.114.0", + "cranelift-bitset 0.113.1", "log", "sptr", ] @@ -6699,6 +6698,7 @@ dependencies = [ "datafusion-functions-nested", "datafusion-remote-tables", "deltalake", + "fastrand", "futures", "hex", "iceberg", @@ -6711,6 +6711,8 @@ dependencies = [ "moka", "object_store", "object_store_factory", + "opendal", + "parquet", "percent-encoding", "prost", "rand", @@ -7056,7 +7058,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03c3c6b7927ffe7ecaa769ee0e3994da3b8cafc8f444578982c83ecb161af917" dependencies = [ - "heck 0.5.0", + "heck 0.4.1", "proc-macro2", "quote", "syn 2.0.89", @@ -7979,9 +7981,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", "pin-project-lite", @@ -8528,12 +8530,11 @@ dependencies = [ [[package]] name = "wasm-encoder" -version = "0.219.1" +version = "0.218.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29cbbd772edcb8e7d524a82ee8cef8dd046fc14033796a754c3ad246d019fa54" +checksum = "22b896fa8ceb71091ace9bcb81e853f54043183a1c9667cf93422c40252ffa0a" dependencies = [ "leb128", - "wasmparser 0.219.1", ] [[package]] @@ -8575,9 +8576,9 @@ dependencies = [ [[package]] name = "wasmparser" -version = "0.219.1" +version = "0.218.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c771866898879073c53b565a6c7b49953795159836714ac56a5befb581227c5" +checksum = "b09e46c7fceceaa72b2dd1a8a137ea7fd8f93dfaa69806010a709918e496c5dc" dependencies = [ "ahash 0.8.11", "bitflags 2.6.0", @@ -8610,13 +8611,13 @@ dependencies = [ [[package]] name = "wasmprinter" -version = "0.219.1" +version = "0.218.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "228cdc1f30c27816da225d239ce4231f28941147d34713dee8f1fff7cb330e54" +checksum = "0ace089155491837b75f474bf47c99073246d1b737393fe722d6dee311595ddc" dependencies = [ "anyhow", "termcolor", - "wasmparser 0.219.1", + "wasmparser 0.218.0", ] [[package]] @@ -8677,9 +8678,9 @@ dependencies = [ [[package]] name = "wasmtime" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b79302e3e084713249cc5622e8608e7410afdeeea8c8026d04f491d1fab0b4b" +checksum = "51e762e163fd305770c6c341df3290f0cabb3c264e7952943018e9a1ced8d917" dependencies = [ "anyhow", "async-trait", @@ -8708,17 +8709,17 @@ dependencies = [ "smallvec", "sptr", "target-lexicon", - "wasmparser 0.219.1", - "wasmtime-asm-macros 27.0.0", - "wasmtime-component-macro 27.0.0", - 
"wasmtime-component-util 27.0.0", - "wasmtime-cranelift 27.0.0", - "wasmtime-environ 27.0.0", - "wasmtime-fiber 27.0.0", - "wasmtime-jit-icache-coherence 27.0.0", - "wasmtime-slab 27.0.0", - "wasmtime-versioned-export-macros 27.0.0", - "wasmtime-winch 27.0.0", + "wasmparser 0.218.0", + "wasmtime-asm-macros 26.0.1", + "wasmtime-component-macro 26.0.1", + "wasmtime-component-util 26.0.1", + "wasmtime-cranelift 26.0.1", + "wasmtime-environ 26.0.1", + "wasmtime-fiber 26.0.1", + "wasmtime-jit-icache-coherence 26.0.1", + "wasmtime-slab 26.0.1", + "wasmtime-versioned-export-macros 26.0.1", + "wasmtime-winch 26.0.1", "windows-sys 0.59.0", ] @@ -8733,9 +8734,9 @@ dependencies = [ [[package]] name = "wasmtime-asm-macros" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe53a24e7016a5222875d8ca3ad6024b464465985693c42098cd0bb710002c28" +checksum = "63caa7aebb546374e26257a1900fb93579171e7c02514cde26805b9ece3ef812" dependencies = [ "cfg-if", ] @@ -8777,17 +8778,17 @@ dependencies = [ [[package]] name = "wasmtime-component-macro" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e118acbd2bc09b32ad8606bc7cef793bf5019c1b107772e64dc6c76b5055d40b" +checksum = "d61a4b5ce2ad9c15655e830f0eac0c38b8def30c74ecac71f452d3901e491b68" dependencies = [ "anyhow", "proc-macro2", "quote", "syn 2.0.89", - "wasmtime-component-util 27.0.0", - "wasmtime-wit-bindgen 27.0.0", - "wit-parser 0.219.1", + "wasmtime-component-util 26.0.1", + "wasmtime-wit-bindgen 26.0.1", + "wit-parser 0.218.0", ] [[package]] @@ -8798,9 +8799,9 @@ checksum = "fdc29d2b56629d66d2fd791d1b46471d0016e0d684ed2dc299e870d127082268" [[package]] name = "wasmtime-component-util" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a6db4f3ee18c699629eabb9c64e77efe5a93a5137f098db7cab295037ba41c2" +checksum = "35e87a1212270dbb84a49af13d82594e00a92769d6952b0ea7fc4366c949f6ad" [[package]] name = "wasmtime-cranelift" @@ -8829,17 +8830,17 @@ dependencies = [ [[package]] name = "wasmtime-cranelift" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b87e6c78f562b50aff1afd87ff32a57e241424c846c1c8f3c5fd352d2d62906" +checksum = "7cb40dddf38c6a5eefd5ce7c1baf43b00fe44eada11a319fab22e993a960262f" dependencies = [ "anyhow", "cfg-if", - "cranelift-codegen 0.114.0", - "cranelift-control 0.114.0", - "cranelift-entity 0.114.0", - "cranelift-frontend 0.114.0", - "cranelift-native 0.114.0", + "cranelift-codegen 0.113.1", + "cranelift-control 0.113.1", + "cranelift-entity 0.113.1", + "cranelift-frontend 0.113.1", + "cranelift-native 0.113.1", "gimli 0.31.1", "itertools 0.12.1", "log", @@ -8847,9 +8848,9 @@ dependencies = [ "smallvec", "target-lexicon", "thiserror 1.0.69", - "wasmparser 0.219.1", - "wasmtime-environ 27.0.0", - "wasmtime-versioned-export-macros 27.0.0", + "wasmparser 0.218.0", + "wasmtime-environ 26.0.1", + "wasmtime-versioned-export-macros 26.0.1", ] [[package]] @@ -8881,13 +8882,13 @@ dependencies = [ [[package]] name = "wasmtime-environ" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c25bfeaa16432d59a0706e2463d315ef4c9ebcfaf5605670b99d46373bdf9f27" +checksum = "8613075e89e94a48c05862243c2b718eef1b9c337f51493ebf951e149a10fa19" dependencies = [ "anyhow", - "cranelift-bitset 0.114.0", - "cranelift-entity 0.114.0", + 
"cranelift-bitset 0.113.1", + "cranelift-entity 0.113.1", "gimli 0.31.1", "indexmap 2.6.0", "log", @@ -8898,10 +8899,10 @@ dependencies = [ "serde_derive", "smallvec", "target-lexicon", - "wasm-encoder 0.219.1", - "wasmparser 0.219.1", - "wasmprinter 0.219.1", - "wasmtime-component-util 27.0.0", + "wasm-encoder 0.218.0", + "wasmparser 0.218.0", + "wasmprinter 0.218.0", + "wasmtime-component-util 26.0.1", ] [[package]] @@ -8921,16 +8922,16 @@ dependencies = [ [[package]] name = "wasmtime-fiber" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759ab0caa3821a6211743fe1eed448ab9df439e3af6c60dea15486c055611806" +checksum = "77acabfbcd89a4d47ad117fb31e340c824e2f49597105402c3127457b6230995" dependencies = [ "anyhow", "cc", "cfg-if", "rustix", - "wasmtime-asm-macros 27.0.0", - "wasmtime-versioned-export-macros 27.0.0", + "wasmtime-asm-macros 26.0.1", + "wasmtime-versioned-export-macros 26.0.1", "windows-sys 0.59.0", ] @@ -8960,9 +8961,9 @@ dependencies = [ [[package]] name = "wasmtime-jit-icache-coherence" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91b218a92866f74f35162f5d03a4e0f62cd0e1cc624285b1014275e5d4575fad" +checksum = "da47fba49af72581bc0dc67c8faaf5ee550e6f106e285122a184a675193701a5" dependencies = [ "anyhow", "cfg-if", @@ -8978,9 +8979,9 @@ checksum = "f75cba1a8cc327839f493cfc3036c9de3d077d59ab76296bc710ee5f95be5391" [[package]] name = "wasmtime-slab" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d5f8acf677ee6b3b8ba400dd9753ea4769e56a95c4b30b045ac6d2d54b2f8ea" +checksum = "770e10cdefb15f2b6304152978e115bd062753c1ebe7221c0b6b104fa0419ff6" [[package]] name = "wasmtime-types" @@ -9009,9 +9010,9 @@ dependencies = [ [[package]] name = "wasmtime-versioned-export-macros" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df09be00c38f49172ca9936998938476e3f2df782673a39ae2ef9fb0838341b6" +checksum = "db8efb877c9e5e67239d4553bb44dd2a34ae5cfb728f3cf2c5e64439c6ca6ee7" dependencies = [ "proc-macro2", "quote", @@ -9020,9 +9021,9 @@ dependencies = [ [[package]] name = "wasmtime-wasi" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad5cf227161565057fc994edf14180341817372a218f1597db48a43946e5f875" +checksum = "f16c8d87a45168131be6815045e6d46d7f6ddf65897c49444ab210488bce10dc" dependencies = [ "anyhow", "async-trait", @@ -9037,14 +9038,15 @@ dependencies = [ "futures", "io-extras", "io-lifetimes", + "once_cell", "rustix", "system-interface", "thiserror 1.0.69", "tokio", "tracing", "url", - "wasmtime 27.0.0", - "wiggle 27.0.0", + "wasmtime 26.0.1", + "wiggle 26.0.1", "windows-sys 0.59.0", ] @@ -9067,19 +9069,19 @@ dependencies = [ [[package]] name = "wasmtime-winch" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d6b5297bea14d8387c3974b2b011de628cc9b188f135cec752b74fd368964b" +checksum = "4f7a267367382ceec3e7f7ace63a63b83d86f4a680846743dead644e10f08150" dependencies = [ "anyhow", - "cranelift-codegen 0.114.0", + "cranelift-codegen 0.113.1", "gimli 0.31.1", "object", "target-lexicon", - "wasmparser 0.219.1", - "wasmtime-cranelift 27.0.0", - "wasmtime-environ 27.0.0", - "winch-codegen 27.0.0", + "wasmparser 0.218.0", + "wasmtime-cranelift 26.0.1", + "wasmtime-environ 26.0.1", + "winch-codegen 
26.0.1", ] [[package]] @@ -9096,14 +9098,14 @@ dependencies = [ [[package]] name = "wasmtime-wit-bindgen" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf3963c9c29df91564d8bd181eb00d0dbaeafa1b2a01e15952bb7391166b704e" +checksum = "4bef2a726fd8d1ee9b0144655e16c492dc32eb4c7c9f7e3309fcffe637870933" dependencies = [ "anyhow", "heck 0.5.0", "indexmap 2.6.0", - "wit-parser 0.219.1", + "wit-parser 0.218.0", ] [[package]] @@ -9212,17 +9214,17 @@ dependencies = [ [[package]] name = "wiggle" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80e0f6ef83a263c0fa11957c363aeaa76dc84832484d0e119f22810d4d0e09a7" +checksum = "b0f25588cf5ea16f56c1af13244486d50c5a2cf67cc0c4e990c665944d741546" dependencies = [ "anyhow", "async-trait", "bitflags 2.6.0", "thiserror 1.0.69", "tracing", - "wasmtime 27.0.0", - "wiggle-macro 27.0.0", + "wasmtime 26.0.1", + "wiggle-macro 26.0.1", ] [[package]] @@ -9242,9 +9244,9 @@ dependencies = [ [[package]] name = "wiggle-generate" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd266b290a0fdace3af6a05c6ebbcc54de303a774448ecf5a98cd0bc12d89c52" +checksum = "28ff23bed568b335dac6a324b8b167318a0c60555199445fcc89745a5eb42452" dependencies = [ "anyhow", "heck 0.5.0", @@ -9269,14 +9271,14 @@ dependencies = [ [[package]] name = "wiggle-macro" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b8eb1a5783540696c59cefbfc9e52570c2d5e62bd47bdf0bdcef29231879db2" +checksum = "7f13be83541aa0b033ac5ec8a8b59c9a8d8b32305845b8466dd066e722cb0004" dependencies = [ "proc-macro2", "quote", "syn 2.0.89", - "wiggle-generate 27.0.0", + "wiggle-generate 26.0.1", ] [[package]] @@ -9301,7 +9303,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -9329,19 +9331,19 @@ dependencies = [ [[package]] name = "winch-codegen" -version = "27.0.0" +version = "26.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b42b678c8651ec4900d7600037d235429fc985c31cbc33515885ec0d2a9e158" +checksum = "07ab957fc71a36c63834b9b51cc2e087c4260d5ff810a5309ab99f7fbeb19567" dependencies = [ "anyhow", - "cranelift-codegen 0.114.0", + "cranelift-codegen 0.113.1", "gimli 0.31.1", "regalloc2", "smallvec", "target-lexicon", - "wasmparser 0.219.1", - "wasmtime-cranelift 27.0.0", - "wasmtime-environ 27.0.0", + "wasmparser 0.218.0", + "wasmtime-cranelift 26.0.1", + "wasmtime-environ 26.0.1", ] [[package]] @@ -9594,9 +9596,9 @@ dependencies = [ [[package]] name = "wit-parser" -version = "0.219.1" +version = "0.218.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a86f669283257e8e424b9a4fc3518e3ade0b95deb9fbc0f93a1876be3eda598" +checksum = "0d3d1066ab761b115f97fef2b191090faabcb0f37b555b758d3caf42d4ed9e55" dependencies = [ "anyhow", "id-arena", @@ -9607,7 +9609,7 @@ dependencies = [ "serde_derive", "serde_json", "unicode-xid", - "wasmparser 0.219.1", + "wasmparser 0.218.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 5c40cad8..950b8834 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,8 +21,8 @@ datafusion-functions-nested = "43.0.0" futures = "0.3" -iceberg = { git = "https://github.com/splitgraph/iceberg-rust", 
rev = "dbe5858039e676b45329a5cfc46cc1e7d9b98402" } -iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "dbe5858039e676b45329a5cfc46cc1e7d9b98402" } +iceberg = { git = "https://github.com/splitgraph/iceberg-rust", rev = "6e87893e733379558df597aa7d66f026549e3b0c" } +iceberg-datafusion = { git = "https://github.com/splitgraph/iceberg-rust", rev = "6e87893e733379558df597aa7d66f026549e3b0c" } itertools = ">=0.10.0" object_store = { version = "0.11", features = ["aws", "azure", "gcp"] } @@ -100,6 +100,7 @@ datafusion-functions-nested = { workspace = true } datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true } deltalake = { git = "https://github.com/splitgraph/delta-rs", rev = "eff5735698279c12ae4a3aac2afa268d168242b2", features = ["datafusion", "s3"] } +fastrand = "2.2.0" futures = "0.3" hex = ">=0.4.0" @@ -115,6 +116,8 @@ metrics-exporter-prometheus = { version = "0.15.3" } moka = { version = "0.12.5", default-features = false, features = ["future", "atomic64", "quanta"] } object_store = { workspace = true } object_store_factory = { path = "object_store_factory" } +opendal = { version = "0.50" } +parquet = "53.3.0" percent-encoding = "2.2.0" prost = { workspace = true } @@ -150,7 +153,7 @@ warp = "0.3.6" # For WASM user-defined functions wasi-common = "25.0.0" wasmtime = "25.0.2" -wasmtime-wasi = "27.0.0" +wasmtime-wasi = "26.0.1" [dev-dependencies] assert_cmd = "2" diff --git a/src/context/iceberg.rs b/src/context/iceberg.rs new file mode 100644 index 00000000..ae03f763 --- /dev/null +++ b/src/context/iceberg.rs @@ -0,0 +1,379 @@ +use core::str; +use std::collections::HashMap; +use std::error::Error; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use arrow::array::RecordBatch; +use arrow_schema::{Field, Schema, SchemaRef}; +use datafusion::error::Result; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::ExecutionPlanProperties; +use datafusion_common::{DataFusionError, TableReference}; +use futures::{pin_mut, StreamExt, TryStream, TryStreamExt}; +use iceberg::io::FileIO; +use iceberg::spec::{ + BoundPartitionSpec, DataContentType, DataFileFormat, FormatVersion, Manifest, + ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, + ManifestMetadata, ManifestStatus, ManifestWriter, Operation, Snapshot, + SnapshotReference, SnapshotRetention, Struct, Summary, TableMetadata, + TableMetadataBuilder, +}; +use iceberg::table::Table; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder}; +use iceberg::TableCreation; +use opendal; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use parquet::file::properties::WriterProperties; +use tracing::info; +use url::Url; +use uuid::Uuid; + +use super::{LakehouseTableProvider, SeafowlContext}; + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum DataLoadingError { + #[error("I/O error")] + IoError(#[from] std::io::Error), + #[error("Iceberg error")] + IcebergError(#[from] iceberg::Error), + #[error("optimistic concurrency error")] + OptimisticConcurrencyError(), + #[error("bad input error")] + BadInputError(String), +} + +// Create an empty table metadata object that contains no snapshots +fn create_empty_metadata( + iceberg_schema: &iceberg::spec::Schema, + target_url: String, +) -> Result { + let table_creation = TableCreation::builder() + 
.name("dummy_name".to_string()) // Required by TableCreationBuilder. Doesn't affect output + .schema(iceberg_schema.clone()) + .location(target_url.to_string()) + .build(); + + let table_metadata = + TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + Ok(table_metadata.into()) +} + +// Clone an arrow schema, assigning sequential field IDs starting from 1 +fn assign_field_ids(arrow_schema: Arc) -> Schema { + let mut field_id_counter = 1; + let new_fields: Vec = arrow_schema + .fields + .iter() + .map(|field_ref| { + let mut field: Field = (**field_ref).clone(); + let mut metadata = field_ref.metadata().clone(); + metadata.insert( + PARQUET_FIELD_ID_META_KEY.to_owned(), + field_id_counter.to_string(), + ); + field_id_counter += 1; + field.set_metadata(metadata); + field + }) + .collect(); + Schema::new_with_metadata(new_fields, arrow_schema.metadata.clone()) +} + +// Create a new TableMetadata object by updating the current snapshot of an existing TableMetadata +fn update_metadata_snapshot( + previous_metadata: &TableMetadata, + previous_metadata_location: Option, + snapshot: Snapshot, +) -> Result { + let snapshot_id = snapshot.snapshot_id(); + let new_metadata: TableMetadata = TableMetadataBuilder::new_from_metadata( + previous_metadata.clone(), + previous_metadata_location, + ) + .add_snapshot(snapshot)? + .set_ref( + "main", + SnapshotReference::new(snapshot_id, SnapshotRetention::branch(None, None, None)), + )? + .build()? + .into(); + Ok(new_metadata) +} + +async fn get_manifest_files( + file_io: &FileIO, + table_metadata: &TableMetadata, +) -> Result>, DataLoadingError> { + let snapshot = match table_metadata.current_snapshot() { + None => return Ok(None), + Some(s) => s, + }; + let manifest_list = snapshot.load_manifest_list(file_io, table_metadata).await?; + Ok(Some(manifest_list.consume_entries().into_iter().collect())) +} + +const DEFAULT_SCHEMA_ID: i32 = 0; + +pub async fn record_batches_to_iceberg( + record_batch_stream: impl TryStream>, + arrow_schema: SchemaRef, + table: &Table, +) -> Result<(), DataLoadingError> { + pin_mut!(record_batch_stream); + + let table_location = table.metadata().location(); + let table_base_url = Url::parse(table_location).unwrap(); + + let file_io = table.file_io(); + let arrow_schema_with_ids = assign_field_ids(arrow_schema.clone()); + let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema( + &arrow_schema_with_ids, + )?); + + let version_hint_location = format!("{}/metadata/version-hint.text", table_base_url); + let version_hint_input = file_io.new_input(&version_hint_location)?; + let old_version_hint: Option = if version_hint_input.exists().await? 
+        let version_hint_bytes = version_hint_input.read().await?;
+        let version_hint_string: String = String::from_utf8(version_hint_bytes.to_vec())
+            .map_err(|_| {
+                DataLoadingError::IcebergError(iceberg::Error::new(
+                    iceberg::ErrorKind::DataInvalid,
+                    "Could not parse UTF-8 in version-hint.text",
+                ))
+            })?;
+        let version_hint_u64 =
+            version_hint_string.trim().parse::<u64>().map_err(|_| {
+                DataLoadingError::IcebergError(iceberg::Error::new(
+                    iceberg::ErrorKind::DataInvalid,
+                    "Could not parse integer version in version-hint.text",
+                ))
+            })?;
+        Some(version_hint_u64)
+    } else {
+        None
+    };
+    let (previous_metadata, previous_metadata_location) = match old_version_hint {
+        Some(version_hint) => {
+            let old_metadata_location = format!(
+                "{}/metadata/v{}.metadata.json",
+                table_base_url, version_hint
+            );
+            let old_metadata_bytes =
+                file_io.new_input(&old_metadata_location)?.read().await?;
+            let old_metadata_string =
+                str::from_utf8(&old_metadata_bytes).map_err(|_| {
+                    DataLoadingError::IcebergError(iceberg::Error::new(
+                        iceberg::ErrorKind::DataInvalid,
+                        "Could not parse UTF-8 in old metadata file",
+                    ))
+                })?;
+            let old_metadata = serde_json::from_str::<TableMetadata>(old_metadata_string)
+                .map_err(|_| {
+                    DataLoadingError::IcebergError(iceberg::Error::new(
+                        iceberg::ErrorKind::DataInvalid,
+                        "Could not parse old metadata file",
+                    ))
+                })?;
+            if old_metadata.current_schema() != &iceberg_schema {
+                return Err(DataLoadingError::IcebergError(iceberg::Error::new(
+                    iceberg::ErrorKind::FeatureUnsupported,
+                    "Schema changes not supported",
+                )));
+            }
+            (old_metadata, Some(old_metadata_location))
+        }
+        None => {
+            let empty_metadata =
+                create_empty_metadata(&iceberg_schema, table_base_url.to_string())?;
+            (empty_metadata, None)
+        }
+    };
+
+    let file_writer_builder = ParquetWriterBuilder::new(
+        WriterProperties::builder().build(),
+        iceberg_schema.clone(),
+        file_io.clone(),
+        DefaultLocationGenerator::new(previous_metadata.clone()).unwrap(),
+        DefaultFileNameGenerator::new(
+            "part".to_string(),
+            Some(Uuid::new_v4().to_string()),
+            DataFileFormat::Parquet,
+        ),
+    );
+    let mut file_writer = file_writer_builder.build().await.unwrap();
+
+    while let Some(maybe_batch) = record_batch_stream.next().await {
+        let batch = maybe_batch?;
+        file_writer.write(&batch).await?;
+    }
+    let data_files: Vec<_> = file_writer
+        .close()
+        .await?
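+        // close() returns one DataFileBuilder per Parquet file written; finalize each as unpartitioned data content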
+        .iter_mut()
+        .map(|data_file_builder| {
+            let data_file = data_file_builder
+                .content(DataContentType::Data)
+                .partition(Struct::empty())
+                .build()
+                .unwrap();
+            info!("Wrote data file: {:?}", data_file.file_path());
+            data_file
+        })
+        .collect();
+
+    let snapshot_id = fastrand::i64(..);
+    let sequence_number = previous_metadata.last_sequence_number() + 1;
+
+    let manifest_file_path = format!(
+        "{}/metadata/manifest-{}.avro",
+        table_base_url,
+        Uuid::new_v4()
+    );
+    let manifest_file_output = file_io.new_output(manifest_file_path)?;
+    let manifest_writer: ManifestWriter =
+        ManifestWriter::new(manifest_file_output, snapshot_id, vec![]);
+    let manifest_metadata = ManifestMetadata::builder()
+        .schema_id(DEFAULT_SCHEMA_ID)
+        .schema(iceberg_schema.clone())
+        .partition_spec(
+            BoundPartitionSpec::builder(iceberg_schema.clone())
+                .with_spec_id(0)
+                .build()?,
+        )
+        .content(ManifestContentType::Data)
+        .format_version(FormatVersion::V2)
+        .build();
+    let manifest = Manifest::new(
+        manifest_metadata,
+        data_files
+            .iter()
+            .map(|data_file| {
+                ManifestEntry::builder()
+                    .status(ManifestStatus::Added)
+                    .snapshot_id(snapshot_id)
+                    .data_file(data_file.clone())
+                    .build()
+            })
+            .collect(),
+    );
+    let new_manifest_file: ManifestFile = manifest_writer.write(manifest).await?;
+    info!("Wrote manifest file: {:?}", new_manifest_file.manifest_path);
+
+    let new_manifest_files_vec: Vec<ManifestFile> =
+        match get_manifest_files(file_io, &previous_metadata).await? {
+            Some(mut manifest_files) => {
+                // Include new manifest and all manifests from previous snapshot
+                manifest_files.push(new_manifest_file);
+                manifest_files
+            }
+            None => vec![new_manifest_file], // Only include new manifest
+        };
+
+    let manifest_list_path = format!(
+        "{}/metadata/manifest-list-{}.avro",
+        table_base_url,
+        Uuid::new_v4()
+    );
+    let manifest_file_output = file_io.new_output(manifest_list_path.clone())?;
+    let mut manifest_list_writer: ManifestListWriter =
+        ManifestListWriter::v2(manifest_file_output, snapshot_id, None, sequence_number);
+    manifest_list_writer.add_manifests(new_manifest_files_vec.into_iter())?;
+    manifest_list_writer.close().await?;
+    info!("Wrote manifest list: {:?}", manifest_list_path);
+
+    let snapshot = Snapshot::builder()
+        .with_snapshot_id(snapshot_id)
+        .with_schema_id(DEFAULT_SCHEMA_ID)
+        .with_manifest_list(manifest_list_path.clone())
+        .with_sequence_number(sequence_number)
+        .with_timestamp_ms(
+            SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis() as i64,
+        )
+        .with_summary(Summary {
+            operation: Operation::Append,
+            additional_properties: HashMap::new(),
+        })
+        .build();
+
+    let new_metadata = update_metadata_snapshot(
+        &previous_metadata,
+        previous_metadata_location,
+        snapshot,
+    )?;
+    let new_version_hint = match old_version_hint {
+        Some(x) => x + 1,
+        None => 0,
+    };
+    let new_metadata_location = format!(
+        "{}/metadata/v{}.metadata.json",
+        table_base_url, new_version_hint
+    );
+
+    if let Err(iceberg_error) = file_io
+        .new_output(&new_metadata_location)?
+        .write_exclusive(serde_json::to_vec(&new_metadata).unwrap().into())
+        .await
+    {
+        if let Some(iceberg_error_source) = iceberg_error.source() {
+            if let Some(opendal_error) =
+                iceberg_error_source.downcast_ref::<opendal::Error>()
+            {
+                if opendal_error.kind() == opendal::ErrorKind::ConditionNotMatch {
+                    return Err(DataLoadingError::OptimisticConcurrencyError());
+                }
+            }
+        }
+        return Err(iceberg_error.into());
+    };
+    info!("Wrote new metadata: {:?}", new_metadata_location);
+
+    file_io
        .new_output(&version_hint_location)?
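+        // Unlike the metadata file above (write_exclusive), the hint is a plain overwrite; it only points readers at the latest metadata version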
+        .write(new_version_hint.to_string().into())
+        .await?;
+    info!("Wrote version hint: {:?}", version_hint_location);
+
+    Ok(())
+}
+
+impl SeafowlContext {
+    pub async fn plan_to_iceberg_table(
+        &self,
+        name: impl Into<TableReference>,
+        plan: &Arc<dyn ExecutionPlan>,
+    ) -> Result<()> {
+        let provider = match self.get_lakehouse_table_provider(name).await? {
+            LakehouseTableProvider::Iceberg(p) => p,
+            _ => panic!("Expected iceberg provider"),
+        };
+        let table = provider.table();
+
+        for i in 0..plan.output_partitioning().partition_count() {
+            let task_ctx = Arc::new(TaskContext::from(&self.inner.state()));
+            let stream = plan.execute(i, task_ctx)?;
+
+            let schema = plan.schema();
+
+            record_batches_to_iceberg(
+                stream.map_err(|e| {
+                    DataLoadingError::BadInputError(format!("Datafusion error: {}", e))
+                }),
+                schema,
+                &table,
+            )
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        }
+        Ok(())
+    }
+}
diff --git a/src/context/mod.rs b/src/context/mod.rs
index 46b706ab..91b6f1fe 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -1,4 +1,5 @@
 pub mod delta;
+pub mod iceberg;
 pub mod logical;
 pub mod physical;
diff --git a/src/context/physical.rs b/src/context/physical.rs
index 5ef47d9c..ddbd5ab6 100644
--- a/src/context/physical.rs
+++ b/src/context/physical.rs
@@ -1,4 +1,5 @@
 use super::delta::CreateDeltaTableDetails;
+use super::LakehouseTableProvider;
 use crate::catalog::{DEFAULT_SCHEMA, STAGING_SCHEMA};
 use crate::context::delta::plan_to_delta_adds;
 use crate::context::SeafowlContext;
@@ -196,11 +197,21 @@
                 ..
             }) => {
                 let physical = self.inner.state().create_physical_plan(input).await?;
-
-                self.plan_to_delta_table(table_name.clone(), &physical)
-                    .await?;
-
-                Ok(make_dummy_exec())
+                match self
+                    .get_lakehouse_table_provider(table_name.clone())
+                    .await?
+ { + LakehouseTableProvider::Delta(_) => { + self.plan_to_delta_table(table_name.clone(), &physical) + .await?; + Ok(make_dummy_exec()) + } + LakehouseTableProvider::Iceberg(_) => { + self.plan_to_iceberg_table(table_name.clone(), &physical) + .await?; + Ok(make_dummy_exec()) + } + } } LogicalPlan::Dml(DmlStatement { table_name, diff --git a/tests/data/iceberg/default.db/iceberg_table_2/data/00000-0-e53e6d40-ff10-4bb1-984c-df4223889a07.parquet b/tests/data/iceberg/default.db/iceberg_table_2/data/00000-0-e53e6d40-ff10-4bb1-984c-df4223889a07.parquet new file mode 100644 index 00000000..d8a827b1 Binary files /dev/null and b/tests/data/iceberg/default.db/iceberg_table_2/data/00000-0-e53e6d40-ff10-4bb1-984c-df4223889a07.parquet differ diff --git a/tests/data/iceberg/default.db/iceberg_table_2/metadata/e53e6d40-ff10-4bb1-984c-df4223889a07-m0.avro b/tests/data/iceberg/default.db/iceberg_table_2/metadata/e53e6d40-ff10-4bb1-984c-df4223889a07-m0.avro new file mode 100644 index 00000000..84ee1d26 Binary files /dev/null and b/tests/data/iceberg/default.db/iceberg_table_2/metadata/e53e6d40-ff10-4bb1-984c-df4223889a07-m0.avro differ diff --git a/tests/data/iceberg/default.db/iceberg_table_2/metadata/snap-7031737507927837441-0-e53e6d40-ff10-4bb1-984c-df4223889a07.avro b/tests/data/iceberg/default.db/iceberg_table_2/metadata/snap-7031737507927837441-0-e53e6d40-ff10-4bb1-984c-df4223889a07.avro new file mode 100644 index 00000000..4dcaeba1 Binary files /dev/null and b/tests/data/iceberg/default.db/iceberg_table_2/metadata/snap-7031737507927837441-0-e53e6d40-ff10-4bb1-984c-df4223889a07.avro differ diff --git a/tests/data/iceberg/default.db/iceberg_table_2/metadata/v0.metadata.json b/tests/data/iceberg/default.db/iceberg_table_2/metadata/v0.metadata.json new file mode 100644 index 00000000..eff5c97f --- /dev/null +++ b/tests/data/iceberg/default.db/iceberg_table_2/metadata/v0.metadata.json @@ -0,0 +1 @@ +{"location":"s3://seafowl-test-bucket/test-data/iceberg/default.db/iceberg_table_2","table-uuid":"8fb14c2b-4a31-475e-9c17-bc9c500a39f5","last-updated-ms":1733931512993,"last-column-id":2,"schemas":[{"type":"struct","fields":[{"id":1,"name":"key","type":"int","required":false},{"id":2,"name":"value","type":"string","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":999,"properties":{},"snapshots":[],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":2,"last-sequence-number":0} diff --git a/tests/data/iceberg/default.db/iceberg_table_2/metadata/v1.metadata.json b/tests/data/iceberg/default.db/iceberg_table_2/metadata/v1.metadata.json new file mode 100644 index 00000000..a4dfacbb --- /dev/null +++ b/tests/data/iceberg/default.db/iceberg_table_2/metadata/v1.metadata.json @@ -0,0 +1 @@ 
+{"location":"s3://seafowl-test-bucket/test-data/iceberg/default.db/iceberg_table_2","table-uuid":"8fb14c2b-4a31-475e-9c17-bc9c500a39f5","last-updated-ms":1733931513445,"last-column-id":2,"schemas":[{"type":"struct","fields":[{"id":1,"name":"key","type":"int","required":false},{"id":2,"name":"value","type":"string","required":false}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":999,"properties":{},"current-snapshot-id":7031737507927837441,"snapshots":[{"snapshot-id":7031737507927837441,"sequence-number":1,"timestamp-ms":1733931513445,"manifest-list":"s3://seafowl-test-bucket/test-data/iceberg/default.db/iceberg_table_2/metadata/snap-7031737507927837441-0-e53e6d40-ff10-4bb1-984c-df4223889a07.avro","summary":{"operation":"append","added-files-size":"1049","added-data-files":"1","added-records":"4","total-data-files":"1","total-delete-files":"0","total-records":"4","total-files-size":"1049","total-position-deletes":"0","total-equality-deletes":"0"},"schema-id":0}],"snapshot-log":[{"snapshot-id":7031737507927837441,"timestamp-ms":1733931513445}],"metadata-log":[{"metadata-file":"s3://seafowl-test-bucket/test-data/iceberg/default.db/iceberg_table_2/metadata/v0.metadata.json","timestamp-ms":1733931512993}],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{"main":{"snapshot-id":7031737507927837441,"type":"branch"}},"format-version":2,"last-sequence-number":1} diff --git a/tests/data/iceberg/default.db/iceberg_table_2/metadata/version-hint.text b/tests/data/iceberg/default.db/iceberg_table_2/metadata/version-hint.text new file mode 100644 index 00000000..56a6051c --- /dev/null +++ b/tests/data/iceberg/default.db/iceberg_table_2/metadata/version-hint.text @@ -0,0 +1 @@ +1 \ No newline at end of file diff --git a/tests/data/iceberg/iceberg_catalog.db b/tests/data/iceberg/iceberg_catalog.db index 698e7e35..608da4ee 100644 Binary files a/tests/data/iceberg/iceberg_catalog.db and b/tests/data/iceberg/iceberg_catalog.db differ diff --git a/tests/fixtures.rs b/tests/fixtures.rs index b83c245a..7c6e1a46 100644 --- a/tests/fixtures.rs +++ b/tests/fixtures.rs @@ -68,6 +68,12 @@ pub fn schemas(include_file_without_store: bool) -> ListSchemaResponse { store: Some("minio".to_string()), format: TableFormat::Iceberg.into(), }, + TableObject { + name: "iceberg_hdfs_v1".to_string(), + path: "test-data/iceberg/default.db/iceberg_table_2/metadata/v1.metadata.json".to_string(), + store: Some("minio".to_string()), + format: TableFormat::Iceberg.into(), + }, TableObject { name: "iceberg_public".to_string(), path: "iceberg/default.db/iceberg_table/metadata/00001-f394d7ec-944b-432d-a44f-78b5ec95aae2.metadata.json".to_string(), diff --git a/tests/flight/inline_metastore.rs b/tests/flight/inline_metastore.rs index 37f0b961..eaa35d86 100644 --- a/tests/flight/inline_metastore.rs +++ b/tests/flight/inline_metastore.rs @@ -1,3 +1,5 @@ +use clade::schema::{TableFormat, TableObject}; + use crate::flight::*; #[rstest] @@ -53,3 +55,91 @@ async fn test_inline_query( ]; assert_batches_eq!(expected, &batches); } + +#[tokio::test] +async fn test_inline_iceberg_write() { + let (_context, mut client) = flight_server(TestServerType::InlineOnly).await; + + // Verify the v1 dataset is as expected + let batches = get_flight_batches_inlined( + &mut client, + "SELECT * FROM s3.iceberg_hdfs_v1 ORDER BY key".to_string(), + schemas(false), + ) + .await + .unwrap(); + + let expected = [ + "+-----+-------+", + "| key 
| value |", + "+-----+-------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "+-----+-------+", + ]; + assert_batches_eq!(expected, &batches); + + // WHEN data is inserted into the Iceberg table at v1 + get_flight_batches_inlined( + &mut client, + "INSERT INTO s3.iceberg_hdfs_v1 (key, value) VALUES (5, 'five'), (6, 'six')" + .to_string(), + schemas(false), + ) + .await + .unwrap(); + + // THEN the v1 dataset is not affected + let batches = get_flight_batches_inlined( + &mut client, + "SELECT * FROM s3.iceberg_hdfs_v1 ORDER BY key".to_string(), + schemas(false), + ) + .await + .unwrap(); + + let expected = [ + "+-----+-------+", + "| key | value |", + "+-----+-------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "+-----+-------+", + ]; + assert_batches_eq!(expected, &batches); + + // THEN the v2 dataset contains the v1 data and the inserted data + let mut s = schemas(false); + s.schemas[1].tables.push(TableObject { + name: "iceberg_hdfs_v2".to_string(), + path: "test-data/iceberg/default.db/iceberg_table_2/metadata/v2.metadata.json" + .to_string(), + store: Some("minio".to_string()), + format: TableFormat::Iceberg.into(), + }); + let batches = get_flight_batches_inlined( + &mut client, + "SELECT * FROM s3.iceberg_hdfs_v2 ORDER BY key".to_string(), + s, + ) + .await + .unwrap(); + + let expected = [ + "+-----+-------+", + "| key | value |", + "+-----+-------+", + "| 1 | one |", + "| 2 | two |", + "| 3 | three |", + "| 4 | four |", + "| 5 | five |", + "| 6 | six |", + "+-----+-------+", + ]; + assert_batches_eq!(expected, &batches); +} diff --git a/tests/http/mod.rs b/tests/http/mod.rs index 6c60bb09..8ce820cc 100644 --- a/tests/http/mod.rs +++ b/tests/http/mod.rs @@ -1,45 +1,10 @@ -mod query; -mod upload; - -use std::collections::HashMap; -use std::pin::Pin; -use std::sync::Arc; -use tokio::process::Command; - -use futures::Future; -use futures::FutureExt; -use seafowl::config::context::{build_context, setup_metrics, HTTP_REQUESTS}; -use seafowl::config::schema::{load_config_from_string, Metrics}; -use seafowl::frontend::http::filters; - +use itertools::Itertools; use warp::hyper::body::to_bytes; -use warp::hyper::client::HttpConnector; use warp::hyper::Body; use warp::hyper::Client; -use warp::hyper::Method; -use warp::hyper::Request; use warp::hyper::Response; use warp::hyper::StatusCode; -use arrow::array::{ - BooleanArray, Float64Array, Int32Array, StringArray, TimestampMicrosecondArray, - TimestampNanosecondArray, -}; -use arrow::csv::WriterBuilder; -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::record_batch::RecordBatch; -use arrow_schema::TimeUnit; -use datafusion::assert_batches_eq; -use datafusion::parquet::arrow::ArrowWriter; -use itertools::Itertools; -use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; -use rstest::rstest; -use seafowl::context::SeafowlContext; -use std::net::SocketAddr; -use tempfile::Builder; -use tokio::sync::oneshot; -use tokio::sync::oneshot::Sender; - // Hack because integration tests do not set cfg(test) // https://users.rust-lang.org/t/sharing-helper-function-between-unit-and-integration-tests/9941/2 #[allow(clippy::duplicate_mod)] @@ -47,97 +12,11 @@ use tokio::sync::oneshot::Sender; #[path = "../../src/testutils.rs"] mod testutils; -/// Make an HTTP server that listens on a random free port, -/// uses an in-memory SQLite and requires a password ("write_password") for writes -/// Returns the server's address, the actual server Future and a channel to stop the server 
-async fn make_read_only_http_server() -> (
-    SocketAddr,
-    Pin<Box<dyn Future<Output = ()> + Send>>,
-    Sender<()>,
-    Arc<SeafowlContext>,
-) {
-    let config_text = r#"
-[object_store]
-type = "memory"
-
-[catalog]
-type = "sqlite"
-dsn = ":memory:"
-
-[frontend.http]
-# sha hash of "write_password"
-write_access = "b786e07f52fc72d32b2163b6f63aa16344fd8d2d84df87b6c231ab33cd5aa125""#;
-
-    let config = load_config_from_string(config_text, false, None).unwrap();
-    let context = Arc::from(build_context(config).await.unwrap());
-
-    let filters = filters(
-        context.clone(),
-        context.config.frontend.http.as_ref().unwrap().clone(),
-    );
-    let (tx, rx) = oneshot::channel();
-    let (addr, server) = warp::serve(filters).bind_with_graceful_shutdown(
-        // Pass port :0 to pick a random free port
-        "127.0.0.1:0".parse::<SocketAddr>().unwrap(),
-        async {
-            rx.await.ok();
-        },
-    );
-
-    dbg!(format!("Starting the server on {addr:?}"));
-    (addr, server.boxed(), tx, context)
-}
-
 pub async fn response_text(response: Response<Body>) -> String {
     let body_bytes = to_bytes(response.into_body()).await.unwrap();
     String::from_utf8(body_bytes.to_vec()).unwrap()
 }
-fn query_body(query: &str) -> Body {
-    Body::from(serde_json::to_string(&HashMap::from([("query", query)])).unwrap())
-}
-
-async fn post_query(
-    client: &Client<HttpConnector>,
-    uri: &str,
-    query: &str,
-    token: Option<&str>,
-) -> Response<Body> {
-    q(client, Method::POST, uri, query, token).await
-}
-
-async fn q(
-    client: &Client<HttpConnector>,
-    method: Method,
-    uri: &str,
-    query: &str,
-    token: Option<&str>,
-) -> Response<Body> {
-    let uri = if method == Method::GET {
-        format!("{uri}/{}", utf8_percent_encode(query, NON_ALPHANUMERIC))
-    } else {
-        uri.to_string()
-    };
-
-    let mut builder = Request::builder()
-        .method(method.clone())
-        .uri(uri)
-        .header("content-type", "application/json");
-
-    if let Some(t) = token {
-        builder = builder.header("Authorization", format!("Bearer {t}"));
-    }
-
-    let req = builder
-        .body(if method == Method::POST {
-            query_body(query)
-        } else {
-            Body::empty()
-        })
-        .unwrap();
-    client.request(req).await.unwrap()
-}
-
 pub async fn get_metrics(metrics_type: &str, port: u16) -> Vec<String> {
     let resp = Client::new()
         .get(
diff --git a/tests/http/query.rs b/tests/http/query.rs
deleted file mode 100644
index b6affcd8..00000000
--- a/tests/http/query.rs
+++ /dev/null
@@ -1,155 +0,0 @@
-use crate::http::*;
-
-#[tokio::test]
-async fn test_http_server_reader_writer() {
-    // It's questionable how much value this testing adds on top of the tests in http.rs, but we do:
-    // - test the code consumes the config correctly, which we don't do in HTTP tests
-    // - hit the server that's actually listening on a port instead of calling warp routines directly.
-    // Still, this test is mostly to make sure the whole thing roughly does what is intended by the config.
- - let (addr, server, terminate, _) = make_read_only_http_server().await; - - tokio::task::spawn(server); - let client = Client::new(); - let uri = format!("http://{addr}/q"); - - // Configure metrics - setup_metrics(&Metrics::default()); - - // Test health endpoint - let resp = client - .get(format!("http://{addr}/readyz").try_into().unwrap()) - .await - .expect("Can query health endpoint"); - assert_eq!(resp.status(), StatusCode::OK); - assert_eq!(response_text(resp).await, "ready"); - - // GET & POST SELECT 1 as a read-only user - for method in [Method::GET, Method::POST] { - let resp = q(&client, method, &uri, "SELECT 1", None).await; - assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - testutils::schema_from_header(resp.headers()), - Schema::new(vec![Field::new("Int64(1)", DataType::Int64, false),]) - ); - assert_eq!(response_text(resp).await, "{\"Int64(1)\":1}\n"); - } - - // POST CREATE TABLE as a read-only user - let resp = post_query(&client, &uri, "CREATE TABLE test_table (col INT)", None).await; - dbg!(&resp); - assert_eq!(resp.status(), StatusCode::FORBIDDEN); - assert_eq!(response_text(resp).await, "WRITE_FORBIDDEN"); - - // Same, wrong token - let resp = post_query( - &client, - &uri, - "CREATE TABLE test_table (col INT)", - Some("wrongpw"), - ) - .await; - dbg!(&resp); - assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); - assert_eq!(response_text(resp).await, "INVALID_ACCESS_TOKEN"); - - // Perform a write correctly - let resp = post_query( - &client, - &uri, - "CREATE TABLE test_table (col INT); INSERT INTO test_table VALUES(1); SELECT * FROM test_table", - Some("write_password"), - ) - .await; - dbg!(&resp); - assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - testutils::schema_from_header(resp.headers()), - Schema::new(vec![Field::new("col", DataType::Int32, true),]) - ); - assert_eq!(response_text(resp).await, "{\"col\":1}\n"); - - // Test the DB-scoped endpoint variant - // First create the new database - let resp = post_query( - &client, - &uri, - "CREATE DATABASE new_db", - Some("write_password"), - ) - .await; - dbg!(&resp); - assert_eq!(resp.status(), StatusCode::OK); - - let scoped_uri = format!("http://{addr}/new_db/q"); - - // Also test serialization of schemas with special characters - let resp = post_query( - &client, - &scoped_uri, - r#"CREATE TABLE new_table ("col_with_ :;.,\/""'?!(){}[]@<>=-+*#$&`|~^%" INT); INSERT INTO new_table VALUES(2); SELECT * FROM new_table"#, - Some("write_password") - ).await; - dbg!(&resp); - assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - testutils::schema_from_header(resp.headers()), - Schema::new(vec![Field::new( - r#"col_with_ :;.,\/"'?!(){}[]@<>=-+*#$&`|~^%"#, - DataType::Int32, - true - ),]) - ); - assert_eq!( - response_text(resp).await, - "{\"col_with_ :;.,\\\\/\\\"'?!(){}[]@<>=-+*#$&`|~^%\":2}\n" - ); - - // Test multi-statement query starting with external table creation; it gets changed with almost - // every DataFusion upgrade, so it can sometimes introduce regressions. 
-    let resp = post_query(
-        &client,
-        &uri,
-        "CREATE EXTERNAL TABLE test_external \
-        STORED AS PARQUET \
-        LOCATION './tests/data/table_with_ns_column.parquet'; \
-        CREATE TABLE test_external AS SELECT * FROM staging.test_external; \
-        SELECT * FROM test_external LIMIT 1",
-        Some("write_password"),
-    )
-    .await;
-    assert_eq!(resp.status(), StatusCode::OK);
-    assert_eq!(
-        testutils::schema_from_header(resp.headers()),
-        Schema::new(vec![
-            Field::new("some_int_value", DataType::Int64, true),
-            Field::new(
-                "some_time",
-                DataType::Timestamp(TimeUnit::Microsecond, None),
-                true
-            ),
-            Field::new("some_value", DataType::Float32, true),
-        ])
-    );
-    assert_eq!(response_text(resp).await, "{\"some_int_value\":1111,\"some_time\":\"2022-01-01T20:01:01\",\"some_value\":42.0}\n");
-
-    // Finally test HTTP-related metrics
-    assert_eq!(
-        get_metrics(HTTP_REQUESTS, 9090).await,
-        vec![
-            "# HELP http_requests Counter tracking HTTP request statistics",
-            "# TYPE http_requests counter",
-            "http_requests{method=\"GET\",route=\"/q\",status=\"200\"} 1",
-            "http_requests{method=\"GET\",route=\"/readyz\",status=\"200\"} 1",
-            "http_requests{method=\"POST\",route=\"/q\",status=\"200\"} 5",
-            "http_requests{method=\"POST\",route=\"/q\",status=\"403\"} 1",
-            "http_requests{method=\"POST\",route=\"/q\",status=\"500\"} 1",
-        ]
-    );
-
-    // Stop the server
-    // NB this won't run if the test fails, but at that point we're terminating the process
-    // anyway. Maybe it'll become a problem if we have a bunch of tests running that all
-    // start servers and don't stop them.
-    terminate.send(()).unwrap();
-}
diff --git a/tests/http/upload.rs b/tests/http/upload.rs
deleted file mode 100644
index 172d2c61..00000000
--- a/tests/http/upload.rs
+++ /dev/null
@@ -1,370 +0,0 @@
-use crate::http::*;
-
-#[rstest]
-#[case::csv_schema_supplied_with_headers("csv", true, Some(true))]
-#[case::csv_schema_supplied_no_headers("csv", true, Some(false))]
-#[case::csv_schema_inferred_with_headers("csv", false, Some(true))]
-#[case::csv_schema_inferred_no_headers("csv", false, Some(false))]
-#[case::parquet("parquet", false, None)]
-#[tokio::test]
-async fn test_upload_base(
-    #[case] file_format: &str,
-    #[case] include_schema: bool,
-    #[case] add_headers: Option<bool>,
-    #[values(None, Some("test_db"))] db_prefix: Option<&str>,
-) {
-    let (addr, server, terminate, mut context) = make_read_only_http_server().await;
-
-    tokio::task::spawn(server);
-
-    let table_name = format!("{file_format}_table");
-
-    // Prepare the schema + data (a single record batch) which we'll save to a temp file via
-    // a corresponding writer
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("number", DataType::Int32, false),
-        Field::new("parity", DataType::Utf8, false),
-    ]));
-
-    // For CSV uploads we can supply the schema as another part of the multipart request, to
-    // remove the ambiguity resulting from automatic schema inference
-    let schema_json = serde_json::to_string(&schema).unwrap();
-
-    let range = 0..2000;
-    let mut input_batch = RecordBatch::try_new(
-        schema.clone(),
-        vec![
-            Arc::new(Int32Array::from(range.clone().collect_vec())),
-            Arc::new(StringArray::from(
-                range
-                    .map(|number| if number % 2 == 0 { "even" } else { "odd" })
-                    .collect_vec(),
-            )),
-        ],
-    )
-    .unwrap();
-
-    // Open a temp file to write the data into
-    let mut named_tempfile = Builder::new()
-        .suffix(format!(".{file_format}").as_str())
-        .tempfile()
-        .unwrap();
-
-    // Write out the CSV/Parquet format data to a temp file
-    // drop the writer early to release the borrow.
-    if file_format == "csv" {
-        let mut writer = WriterBuilder::new()
-            .with_header(add_headers.unwrap_or(true))
-            .build(&mut named_tempfile);
-        writer.write(&input_batch).unwrap();
-    } else if file_format == "parquet" {
-        let mut writer = ArrowWriter::try_new(&mut named_tempfile, schema, None).unwrap();
-        writer.write(&input_batch).unwrap();
-        writer.close().unwrap();
-    }
-
-    // Generate curl arguments
-    let mut curl_args: Vec<String> = vec![
-        "-H".to_string(),
-        "Authorization: Bearer write_password".to_string(),
-    ];
-    if include_schema {
-        curl_args.append(&mut vec![
-            "-F".to_string(),
-            format!("schema={schema_json};type=application/json"),
-        ]);
-    }
-    if let Some(has_headers) = add_headers {
-        curl_args.append(&mut vec![
-            "-F".to_string(),
-            format!("has_header={has_headers}"),
-        ])
-    }
-
-    let db_path = if let Some(db_name) = db_prefix {
-        // Create the target database first
-        context
-            .collect(
-                context
-                    .plan_query(format!("CREATE DATABASE {db_name}").as_str())
-                    .await
-                    .unwrap(),
-            )
-            .await
-            .unwrap();
-        format!("/{db_name}")
-    } else {
-        String::from("")
-    };
-
-    curl_args.append(&mut vec![
-        "-F".to_string(),
-        format!("data=@{}", named_tempfile.path().to_str().unwrap()),
-        format!("http://{addr}{db_path}/upload/test_upload/{table_name}"),
-    ]);
-
-    let mut child = Command::new("curl").args(curl_args).spawn().unwrap();
-    let status = child.wait().await.unwrap();
-    dbg!("Upload status is {}", status);
-
-    // Verify the newly created table contents
-    if let Some(db_name) = db_prefix {
-        context = context.scope_to_catalog(db_name.to_string());
-    }
-    let plan = context
-        .plan_query(format!("SELECT * FROM test_upload.{table_name}").as_str())
-        .await
-        .unwrap();
-    let results = context.collect(plan).await.unwrap();
-
-    // Generate expected output from the input batch that was used in the multipart request
-    if !include_schema && add_headers == Some(false) {
-        // Rename the columns, as they will be inferred without a schema
-        input_batch = RecordBatch::try_from_iter_with_nullable(vec![
-            ("column_1", input_batch.column(0).clone(), false),
-            ("column_2", input_batch.column(1).clone(), false),
-        ])
-        .unwrap();
-    }
-    let formatted = arrow::util::pretty::pretty_format_batches(&[input_batch])
-        .unwrap()
-        .to_string();
-
-    let expected: Vec<&str> = formatted.trim().lines().collect();
-
-    assert_batches_eq!(expected, &results);
-
-    terminate.send(()).unwrap();
-}
-
-#[tokio::test]
-async fn test_upload_to_existing_table() {
-    let (addr, server, terminate, context) = make_read_only_http_server().await;
-
-    tokio::task::spawn(server);
-
-    // Create a pre-existing table to upload into
-    context
-        .collect(
-            context
-                .plan_query("CREATE TABLE test_table(col_1 INT, col_2 TEXT, col_3 DOUBLE, col_4 TIMESTAMP) \
-                AS VALUES (1, 'one', 1.0, '2001-01-01 01:01:01'), (2, 'two', 2.0, '2002-02-02 02:02:02')")
-                .await
-                .unwrap()
-        )
-        .await
-        .unwrap();
-
-    // Prepare the schema that matches the existing table + some data
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("col_1", DataType::Int32, true),
-        Field::new("col_2", DataType::Utf8, true),
-        Field::new("col_3", DataType::Float64, true),
-        Field::new(
-            "col_4",
-            DataType::Timestamp(TimeUnit::Microsecond, None),
-            true,
-        ),
-    ]));
-
-    let input_batch = RecordBatch::try_new(
-        schema.clone(),
-        vec![
-            Arc::new(Int32Array::from(vec![3, 4])),
-            Arc::new(StringArray::from(vec![Some("three"), Some("four")])),
-            Arc::new(Float64Array::from(vec![3.0, 4.0])),
-            Arc::new(TimestampMicrosecondArray::from(vec![
-                1046660583000000,
1081051444000000, - ])), - ], - ) - .unwrap(); - - let mut named_tempfile = Builder::new().suffix(".parquet").tempfile().unwrap(); - // drop the writer early to release the borrow. - { - let mut writer = ArrowWriter::try_new(&mut named_tempfile, schema, None).unwrap(); - writer.write(&input_batch).unwrap(); - writer.close().unwrap(); - } - - let mut child = Command::new("curl") - .args([ - "-H", - "Authorization: Bearer write_password", - "-F", - format!("data=@{}", named_tempfile.path().to_str().unwrap()).as_str(), - format!("http://{addr}/upload/public/test_table").as_str(), - ]) - .spawn() - .unwrap(); - let status = child.wait().await.unwrap(); - dbg!("Upload status is {}", status); - - // Verify the newly created table contents - let plan = context - .plan_query("SELECT * FROM test_table ORDER BY col_1") - .await - .unwrap(); - let results = context.collect(plan).await.unwrap(); - - let expected = [ - "+-------+-------+-------+---------------------+", - "| col_1 | col_2 | col_3 | col_4 |", - "+-------+-------+-------+---------------------+", - "| 1 | one | 1.0 | 2001-01-01T01:01:01 |", - "| 2 | two | 2.0 | 2002-02-02T02:02:02 |", - "| 3 | three | 3.0 | 2003-03-03T03:03:03 |", - "| 4 | four | 4.0 | 2004-04-04T04:04:04 |", - "+-------+-------+-------+---------------------+", - ]; - - assert_batches_eq!(expected, &results); - - // Now try with schema that doesn't match the existing table, but can be coerced to it; also - // has one missing column supposed to be replaced with NULL's, and one extra column that should - // be ignored. - let schema = Arc::new(Schema::new(vec![ - Field::new("col_1", DataType::Float64, true), - Field::new("col_3", DataType::Int32, true), - Field::new("col_5", DataType::Boolean, true), - Field::new( - "col_4", - DataType::Timestamp(TimeUnit::Nanosecond, None), - true, - ), - ])); - - let input_batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Float64Array::from(vec![5.0, 6.0])), - Arc::new(Int32Array::from(vec![5, 6])), - Arc::new(BooleanArray::from(vec![Some(false), Some(true)])), - Arc::new(TimestampNanosecondArray::from(vec![ - 1115269505000000000, - 1149573966000000000, - ])), - ], - ) - .unwrap(); - - // Open a temp file to write the data into - let mut named_tempfile = Builder::new().suffix(".parquet").tempfile().unwrap(); - // drop the writer early to release the borrow. 
- { - let mut writer = ArrowWriter::try_new(&mut named_tempfile, schema, None).unwrap(); - writer.write(&input_batch).unwrap(); - writer.close().unwrap(); - } - - Command::new("curl") - .args([ - "-H", - "Authorization: Bearer write_password", - "-F", - format!("data=@{}", named_tempfile.path().to_str().unwrap()).as_str(), - format!("http://{addr}/upload/public/test_table").as_str(), - ]) - .output() - .await - .unwrap(); - - dbg!("Upload status is {}", status); - - // Verify that the rows have been appended - let plan = context - .plan_query("SELECT * FROM test_table ORDER BY col_1") - .await - .unwrap(); - let results = context.collect(plan).await.unwrap(); - - let expected = [ - "+-------+-------+-------+---------------------+", - "| col_1 | col_2 | col_3 | col_4 |", - "+-------+-------+-------+---------------------+", - "| 1 | one | 1.0 | 2001-01-01T01:01:01 |", - "| 2 | two | 2.0 | 2002-02-02T02:02:02 |", - "| 3 | three | 3.0 | 2003-03-03T03:03:03 |", - "| 4 | four | 4.0 | 2004-04-04T04:04:04 |", - "| 5 | | 5.0 | 2005-05-05T05:05:05 |", - "| 6 | | 6.0 | 2006-06-06T06:06:06 |", - "+-------+-------+-------+---------------------+", - ]; - - assert_batches_eq!(expected, &results); - - // Finally try with schema that can't be coerced to the existing table. - let schema = Arc::new(Schema::new(vec![ - Field::new("col_1", DataType::Boolean, true), - Field::new("col_2", DataType::Boolean, true), - Field::new("col_3", DataType::Boolean, true), - Field::new("col_4", DataType::Boolean, true), - ])); - - let input_batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(BooleanArray::from(vec![Some(false), Some(true)])), - Arc::new(BooleanArray::from(vec![Some(false), Some(true)])), - Arc::new(BooleanArray::from(vec![Some(false), Some(true)])), - Arc::new(BooleanArray::from(vec![Some(false), Some(true)])), - ], - ) - .unwrap(); - - // Open a temp file to write the data into - let mut named_tempfile = Builder::new().suffix(".parquet").tempfile().unwrap(); - // drop the writer early to release the borrow. - { - let mut writer = ArrowWriter::try_new(&mut named_tempfile, schema, None).unwrap(); - writer.write(&input_batch).unwrap(); - writer.close().unwrap(); - } - - let output = Command::new("curl") - .args([ - "-H", - "Authorization: Bearer write_password", - "-F", - format!("data=@{}", named_tempfile.path().to_str().unwrap()).as_str(), - format!("http://{addr}/upload/public/test_table").as_str(), - ]) - .output() - .await - .unwrap(); - - dbg!("Upload status is {}", status); - - assert!( - String::from_utf8(output.stdout).unwrap().contains("Error during planning: Cannot cast file schema field col_4 of type Boolean to table schema field of type Timestamp(Microsecond, None)") - ); - - terminate.send(()).unwrap(); -} - -#[tokio::test] -async fn test_upload_not_writer_authz() { - let (addr, server, terminate, _context) = make_read_only_http_server().await; - - tokio::task::spawn(server); - - let output = Command::new("curl") - .args([ - "-H", - "Authorization: Bearer wrong_password", - "-F", - "data='doesntmatter'", - format!("http://{addr}/upload/public/test_table").as_str(), - ]) - .output() - .await - .unwrap(); - - assert_eq!( - "INVALID_ACCESS_TOKEN".to_string(), - String::from_utf8(output.stdout).unwrap() - ); - terminate.send(()).unwrap(); -}