From 094f357cb742c3b7f72410f058fb511fcdb92b69 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Tue, 10 Dec 2024 09:36:16 -0500 Subject: [PATCH] First pass at serializeable storage --- Cargo.lock | 47 ++++++++++++++++++++++++++++ icechunk/Cargo.toml | 3 +- icechunk/src/refs.rs | 3 +- icechunk/src/repo.rs | 2 +- icechunk/src/storage/caching.rs | 29 +++++++++++++++-- icechunk/src/storage/logging.rs | 13 +++++++- icechunk/src/storage/mod.rs | 4 +-- icechunk/src/storage/object_store.rs | 42 +++++++++++++++---------- icechunk/src/storage/s3.rs | 3 +- icechunk/src/store.rs | 3 +- 10 files changed, 120 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6ff09beb..4326cfcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -839,6 +839,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "erased-serde" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24e2389d65ab4fab27dc2a5de7b191e1f6617d1f1c8855c0dc569c94a4cbb18d" +dependencies = [ + "serde", + "typeid", +] + [[package]] name = "errno" version = "0.3.9" @@ -1239,6 +1249,7 @@ dependencies = [ "thiserror", "tokio", "typed-path", + "typetag", "url", ] @@ -1431,6 +1442,12 @@ version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" +[[package]] +name = "inventory" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f958d3d68f4167080a18141e10381e7634563984a537f2a49a30fd8e53ac5767" + [[package]] name = "itertools" version = "0.13.0" @@ -2585,12 +2602,42 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82205ffd44a9697e34fc145491aa47310f9871540bb7909eaa9365e0a9a46607" +[[package]] +name = "typeid" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e13db2e0ccd5e14a544e8a246ba2312cd25223f616442d7f2cb0e3db614236e" + [[package]] name = "typenum" version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "typetag" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ba3b6e86ffe0054b2c44f2d86407388b933b16cb0a70eea3929420db1d9bbe" +dependencies = [ + "erased-serde", + "inventory", + "once_cell", + "serde", + "typetag-impl", +] + +[[package]] +name = "typetag-impl" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70b20a22c42c8f1cd23ce5e34f165d4d37038f5b663ad20fb6adbdf029172483" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unarray" version = "0.1.4" diff --git a/icechunk/Cargo.toml b/icechunk/Cargo.toml index c20554c4..7d135b99 100644 --- a/icechunk/Cargo.toml +++ b/icechunk/Cargo.toml @@ -22,7 +22,7 @@ object_store = { version = "0.11.1" } rand = "0.8.5" thiserror = "2.0.3" serde_json = "1.0.133" -serde = { version = "1.0.215", features = ["derive"] } +serde = { version = "1.0.215", features = ["derive", "rc"] } serde_with = { version = "3.11.0", features = ["hex"] } tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros"] } test-strategy = "0.4.0" @@ -40,6 +40,7 @@ aws-config = "1.5.8" aws-credential-types = "1.2.1" typed-path = "0.9.3" aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono", "convert-streams"] } +typetag = "0.2.18" [dev-dependencies] pretty_assertions = "1.4.1" diff --git a/icechunk/src/refs.rs b/icechunk/src/refs.rs index 761b8141..75015678 100644 --- a/icechunk/src/refs.rs +++ b/icechunk/src/refs.rs @@ -360,7 +360,8 @@ mod tests { mut f: F, ) -> ((Arc, R), (Arc, R, TempDir)) { let prefix: String = Alphanumeric.sample_string(&mut rand::thread_rng(), 10); - let mem_storage = Arc::new(ObjectStorage::new_in_memory_store(Some(prefix))); + let mem_storage = + Arc::new(ObjectStorage::new_in_memory_store(Some(prefix)).unwrap()); let res1 = f(Arc::clone(&mem_storage) as Arc).await; let dir = tempdir().expect("cannot create temp dir"); diff --git a/icechunk/src/repo.rs b/icechunk/src/repo.rs index e70724ac..41723b58 100644 --- a/icechunk/src/repo.rs +++ b/icechunk/src/repo.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use crate::{ format::{snapshot::{Snapshot, SnapshotMetadata}, SnapshotId}, refs::{ - create_tag, fetch_branch_tip, fetch_ref, fetch_tag, list_branches, list_tags, + create_tag, fetch_branch_tip, fetch_tag, list_branches, list_tags, update_branch, BranchVersion, Ref, RefError, }, repository::{raise_if_invalid_snapshot_id, RepositoryError, RepositoryResult}, diff --git a/icechunk/src/storage/caching.rs b/icechunk/src/storage/caching.rs index 49dba59a..31b579c8 100644 --- a/icechunk/src/storage/caching.rs +++ b/icechunk/src/storage/caching.rs @@ -4,6 +4,7 @@ use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; use quick_cache::sync::Cache; +use serde::{Deserialize, Serialize}; use crate::{ format::{ @@ -16,13 +17,19 @@ use crate::{ use super::{ListInfo, Storage, StorageError, StorageResult}; -#[derive(Debug)] +#[derive(Debug, Serialize)] +#[serde(transparent)] pub struct MemCachingStorage { backend: Arc, + #[serde(skip)] snapshot_cache: Cache>, + #[serde(skip)] manifest_cache: Cache>, + #[serde(skip)] transactions_cache: Cache>, + #[serde(skip)] attributes_cache: Cache>, + #[serde(skip)] chunk_cache: Cache<(ChunkId, ByteRange), Bytes>, } @@ -44,11 +51,27 @@ impl MemCachingStorage { chunk_cache: Cache::new(num_chunks as usize), } } + + pub fn new_with_defaults(backend: Arc) -> Self { + Self::new(backend, 2, 2, 0, 2, 0) + } } impl private::Sealed for MemCachingStorage {} +impl<'de> Deserialize<'de> for MemCachingStorage { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let backend: Arc = + Deserialize::deserialize(deserializer)?; + Ok(Self::new_with_defaults(backend)) + } +} + #[async_trait] +#[typetag::serde] impl Storage for MemCachingStorage { async fn fetch_snapshot( &self, @@ -225,7 +248,7 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_caching_storage_caches() -> Result<(), Box> { let backend: Arc = - Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))); + Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))?); let ci1 = ChunkInfo { node: NodeId::random(), @@ -288,7 +311,7 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_caching_storage_has_limit() -> Result<(), Box> { let backend: Arc = - Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))); + Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))?); let ci1 = ChunkInfo { node: NodeId::random(), diff --git a/icechunk/src/storage/logging.rs b/icechunk/src/storage/logging.rs index 2ef2ffd5..f1a54469 100644 --- a/icechunk/src/storage/logging.rs +++ b/icechunk/src/storage/logging.rs @@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; +use serde::{Deserialize, Serialize}; use super::{ListInfo, Storage, StorageError, StorageResult}; use crate::{ @@ -13,7 +14,7 @@ use crate::{ private, }; -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct LoggingStorage { backend: Arc, fetch_log: Mutex)>>, @@ -33,7 +34,17 @@ impl LoggingStorage { impl private::Sealed for LoggingStorage {} +impl<'de> Deserialize<'de> for LoggingStorage { + fn deserialize(_deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + Err(serde::de::Error::custom("LoggingStorage cannot be deserialized directly")) + } +} + #[async_trait] +#[typetag::serde] #[allow(clippy::expect_used)] // this implementation is intended for tests only impl Storage for LoggingStorage { async fn fetch_snapshot( diff --git a/icechunk/src/storage/mod.rs b/icechunk/src/storage/mod.rs index 59c478c0..bf8cef61 100644 --- a/icechunk/src/storage/mod.rs +++ b/icechunk/src/storage/mod.rs @@ -8,7 +8,6 @@ use aws_sdk_s3::{ primitives::ByteStreamError, }; use chrono::{DateTime, Utc}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; use core::fmt; use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt}; use std::{ffi::OsString, sync::Arc}; @@ -85,7 +84,8 @@ const TRANSACTION_PREFIX: &str = "transactions/"; /// Different implementation can cache the files differently, or not at all. /// Implementations are free to assume files are never overwritten. #[async_trait] -pub trait Storage<'de>: fmt::Debug + private::Sealed + Serialize + Deserialize<'de> { +#[typetag::serde(tag = "type")] +pub trait Storage: fmt::Debug + private::Sealed + Sync + Send { async fn fetch_snapshot(&self, id: &SnapshotId) -> StorageResult>; async fn fetch_attributes( &self, diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/object_store.rs index d5cce269..5c6dbfa0 100644 --- a/icechunk/src/storage/object_store.rs +++ b/icechunk/src/storage/object_store.rs @@ -1,6 +1,8 @@ use crate::{ format::{ - attributes::AttributesTable, format_constants, manifest::Manifest, snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange, ChunkId, FileTypeTag, ManifestId, ObjectId, Path, SnapshotId + attributes::AttributesTable, format_constants, manifest::Manifest, + snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange, + ChunkId, FileTypeTag, ManifestId, ObjectId, SnapshotId, }, private, }; @@ -11,12 +13,10 @@ use futures::{ StreamExt, TryStreamExt, }; use object_store::{ - local::LocalFileSystem, memory::InMemory, parse_url, path::Path as ObjectPath, - Attribute, AttributeValue, Attributes, GetOptions, GetRange, ObjectMeta, ObjectStore, - PutMode, PutOptions, PutPayload, + parse_url, path::Path as ObjectPath, Attribute, AttributeValue, Attributes, + GetOptions, GetRange, ObjectMeta, ObjectStore, PutMode, PutOptions, PutPayload, }; use serde::{Deserialize, Serialize}; -use url::Url; use std::{ fs::create_dir_all, future::ready, @@ -27,6 +27,7 @@ use std::{ Arc, }, }; +use url::Url; use super::{ ListInfo, Storage, StorageError, StorageResult, CHUNK_PREFIX, MANIFEST_PREFIX, @@ -81,11 +82,14 @@ impl ObjectStorage { create_dir_all(prefix).map_err(|e| e.to_string())?; let prefix = prefix.display().to_string(); let url = format!("file://{prefix}"); - Ok(Self::from_url(&url, vec![])?) + Ok(Self::from_url(&url, vec![])?) } /// Create an ObjectStore client from a URL and provided options - pub fn from_url(url: &str, options: Vec<(String, String)>) -> Result { + pub fn from_url( + url: &str, + options: Vec<(String, String)>, + ) -> Result { let url: Url = Url::parse(url).map_err(|e| e.to_string())?; let (store, path) = parse_url(&url).map_err(|e| e.to_string())?; let store: Arc = Arc::from(store); @@ -158,7 +162,12 @@ impl ObjectStorage { fn ref_key(&self, ref_key: &str) -> ObjectPath { // ObjectPath knows how to deal with empty path parts: bar//foo - ObjectPath::from(format!("{}/{}/{}", self.config.prefix.as_str(), REF_PREFIX, ref_key)) + ObjectPath::from(format!( + "{}/{}/{}", + self.config.prefix.as_str(), + REF_PREFIX, + ref_key + )) } async fn do_ref_versions(&self, ref_name: &str) -> BoxStream> { @@ -192,18 +201,20 @@ impl ObjectStorage { impl private::Sealed for ObjectStorage {} -impl <'de> serde::Deserialize<'de> for ObjectStorage { +impl<'de> serde::Deserialize<'de> for ObjectStorage { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { let config = ObjectStorageConfig::deserialize(deserializer)?; - ObjectStorage::from_url(&config.url, config.options).map_err(serde::de::Error::custom) + ObjectStorage::from_url(&config.url, config.options) + .map_err(serde::de::Error::custom) } } #[async_trait] -impl Storage<'_> for ObjectStorage { +#[typetag::serde] +impl Storage for ObjectStorage { async fn fetch_snapshot( &self, id: &SnapshotId, @@ -433,11 +444,7 @@ impl Storage<'_> for ObjectStorage { bytes: Bytes, ) -> StorageResult<()> { let key = self.ref_key(ref_key); - let mode = if overwrite_refs { - PutMode::Overwrite - } else { - PutMode::Create - }; + let mode = if overwrite_refs { PutMode::Overwrite } else { PutMode::Create }; let opts = PutOptions { mode, ..PutOptions::default() }; self.store @@ -456,7 +463,8 @@ impl Storage<'_> for ObjectStorage { &'a self, prefix: &str, ) -> StorageResult>>> { - let prefix = ObjectPath::from(format!("{}/{}", self.config.prefix.as_str(), prefix)); + let prefix = + ObjectPath::from(format!("{}/{}", self.config.prefix.as_str(), prefix)); let stream = self .store .list(Some(&prefix)) diff --git a/icechunk/src/storage/s3.rs b/icechunk/src/storage/s3.rs index 8e18d4b0..c4063a63 100644 --- a/icechunk/src/storage/s3.rs +++ b/icechunk/src/storage/s3.rs @@ -282,7 +282,8 @@ impl<'de> Deserialize<'de> for S3Storage { } #[async_trait] -impl Storage<'_> for S3Storage { +#[typetag::serde] +impl Storage for S3Storage { async fn fetch_snapshot(&self, id: &SnapshotId) -> StorageResult> { let key = self.get_snapshot_path(id)?; let bytes = self.get_object(key.as_str()).await?; diff --git a/icechunk/src/store.rs b/icechunk/src/store.rs index 42d9f577..dea901d2 100644 --- a/icechunk/src/store.rs +++ b/icechunk/src/store.rs @@ -3,7 +3,6 @@ use std::{ fmt::Display, iter, num::NonZeroU64, - ops::{Deref, DerefMut}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, Mutex, @@ -122,7 +121,7 @@ impl Store { } pub async fn clear(&self) -> StoreResult<()> { - let mut guard = self.session.write().await.clear().await?; + self.session.write().await.clear().await?; Ok(()) }