Skip to content

Commit

Permalink
First pass at serializable storage
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiannucci committed Dec 10, 2024
1 parent a5c2717 commit 094f357
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 29 deletions.
47 changes: 47 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion icechunk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object_store = { version = "0.11.1" }
rand = "0.8.5"
thiserror = "2.0.3"
serde_json = "1.0.133"
serde = { version = "1.0.215", features = ["derive"] }
serde = { version = "1.0.215", features = ["derive", "rc"] }
serde_with = { version = "3.11.0", features = ["hex"] }
tokio = { version = "1.41.1", features = ["rt-multi-thread", "macros"] }
test-strategy = "0.4.0"
Expand All @@ -40,6 +40,7 @@ aws-config = "1.5.8"
aws-credential-types = "1.2.1"
typed-path = "0.9.3"
aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono", "convert-streams"] }
typetag = "0.2.18"

[dev-dependencies]
pretty_assertions = "1.4.1"
Expand Down
3 changes: 2 additions & 1 deletion icechunk/src/refs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ mod tests {
mut f: F,
) -> ((Arc<ObjectStorage>, R), (Arc<ObjectStorage>, R, TempDir)) {
let prefix: String = Alphanumeric.sample_string(&mut rand::thread_rng(), 10);
let mem_storage = Arc::new(ObjectStorage::new_in_memory_store(Some(prefix)));
let mem_storage =
Arc::new(ObjectStorage::new_in_memory_store(Some(prefix)).unwrap());
let res1 = f(Arc::clone(&mem_storage) as Arc<dyn Storage + Send + Sync>).await;

let dir = tempdir().expect("cannot create temp dir");
Expand Down
2 changes: 1 addition & 1 deletion icechunk/src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use crate::{
format::{snapshot::{Snapshot, SnapshotMetadata}, SnapshotId},
refs::{
create_tag, fetch_branch_tip, fetch_ref, fetch_tag, list_branches, list_tags,
create_tag, fetch_branch_tip, fetch_tag, list_branches, list_tags,
update_branch, BranchVersion, Ref, RefError,
},
repository::{raise_if_invalid_snapshot_id, RepositoryError, RepositoryResult},
Expand Down
29 changes: 26 additions & 3 deletions icechunk/src/storage/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use quick_cache::sync::Cache;
use serde::{Deserialize, Serialize};

use crate::{
format::{
Expand All @@ -16,13 +17,19 @@ use crate::{

use super::{ListInfo, Storage, StorageError, StorageResult};

#[derive(Debug)]
#[derive(Debug, Serialize)]
#[serde(transparent)]
/// A `Storage` wrapper that holds a backend plus several `quick_cache` caches.
///
/// For serialization the struct is `#[serde(transparent)]` and every cache
/// field is `#[serde(skip)]`, so a serialized `MemCachingStorage` is exactly
/// the serialized form of `backend`; cache contents are never written out.
pub struct MemCachingStorage {
/// The wrapped storage. This is the only field that is serialized.
backend: Arc<dyn Storage + Send + Sync>,
/// Cache of `Arc<Snapshot>` values keyed by `SnapshotId`; not serialized.
#[serde(skip)]
snapshot_cache: Cache<SnapshotId, Arc<Snapshot>>,
/// Cache of `Arc<Manifest>` values keyed by `ManifestId`; not serialized.
#[serde(skip)]
manifest_cache: Cache<ManifestId, Arc<Manifest>>,
/// Cache of `Arc<TransactionLog>` values keyed by `SnapshotId`; not serialized.
#[serde(skip)]
transactions_cache: Cache<SnapshotId, Arc<TransactionLog>>,
/// Cache of `Arc<AttributesTable>` values keyed by `AttributesId`; not serialized.
#[serde(skip)]
attributes_cache: Cache<AttributesId, Arc<AttributesTable>>,
/// Cache of chunk `Bytes` keyed by `(ChunkId, ByteRange)`; not serialized.
#[serde(skip)]
chunk_cache: Cache<(ChunkId, ByteRange), Bytes>,
}

Expand All @@ -44,11 +51,27 @@ impl MemCachingStorage {
chunk_cache: Cache::new(num_chunks as usize),
}
}

pub fn new_with_defaults(backend: Arc<dyn Storage + Send + Sync>) -> Self {
Self::new(backend, 2, 2, 0, 2, 0)
}
}

impl private::Sealed for MemCachingStorage {}

impl<'de> Deserialize<'de> for MemCachingStorage {
    /// Reads a backend storage from `deserializer` and wraps it with
    /// [`MemCachingStorage::new_with_defaults`].
    ///
    /// Only the backend is part of the serialized form, so the returned value
    /// always gets the default cache sizes, whatever sizes the serialized
    /// instance was created with.
    ///
    /// # Errors
    ///
    /// Returns the deserializer's error unchanged if the backend cannot be read.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        <Arc<dyn Storage + Sync + Send>>::deserialize(deserializer)
            .map(Self::new_with_defaults)
    }
}

#[async_trait]
#[typetag::serde]
impl Storage for MemCachingStorage {
async fn fetch_snapshot(
&self,
Expand Down Expand Up @@ -225,7 +248,7 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn test_caching_storage_caches() -> Result<(), Box<dyn std::error::Error>> {
let backend: Arc<dyn Storage + Send + Sync> =
Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into())));
Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))?);

let ci1 = ChunkInfo {
node: NodeId::random(),
Expand Down Expand Up @@ -288,7 +311,7 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn test_caching_storage_has_limit() -> Result<(), Box<dyn std::error::Error>> {
let backend: Arc<dyn Storage + Send + Sync> =
Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into())));
Arc::new(ObjectStorage::new_in_memory_store(Some("prefix".into()))?);

let ci1 = ChunkInfo {
node: NodeId::random(),
Expand Down
13 changes: 12 additions & 1 deletion icechunk/src/storage/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};

use super::{ListInfo, Storage, StorageError, StorageResult};
use crate::{
Expand All @@ -13,7 +14,7 @@ use crate::{
private,
};

#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct LoggingStorage {
backend: Arc<dyn Storage + Send + Sync>,
fetch_log: Mutex<Vec<(String, Vec<u8>)>>,
Expand All @@ -33,7 +34,17 @@ impl LoggingStorage {

impl private::Sealed for LoggingStorage {}

impl<'de> Deserialize<'de> for LoggingStorage {
fn deserialize<D>(_deserializer: D) -> Result<LoggingStorage, D::Error>
where
D: serde::Deserializer<'de>,
{
Err(serde::de::Error::custom("LoggingStorage cannot be deserialized directly"))
}
}

#[async_trait]
#[typetag::serde]
#[allow(clippy::expect_used)] // this implementation is intended for tests only
impl Storage for LoggingStorage {
async fn fetch_snapshot(
Expand Down
4 changes: 2 additions & 2 deletions icechunk/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use aws_sdk_s3::{
primitives::ByteStreamError,
};
use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use core::fmt;
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
use std::{ffi::OsString, sync::Arc};
Expand Down Expand Up @@ -85,7 +84,8 @@ const TRANSACTION_PREFIX: &str = "transactions/";
/// Different implementations can cache the files differently, or not at all.
/// Implementations are free to assume files are never overwritten.
#[async_trait]
pub trait Storage<'de>: fmt::Debug + private::Sealed + Serialize + Deserialize<'de> {
#[typetag::serde(tag = "type")]
pub trait Storage: fmt::Debug + private::Sealed + Sync + Send {
async fn fetch_snapshot(&self, id: &SnapshotId) -> StorageResult<Arc<Snapshot>>;
async fn fetch_attributes(
&self,
Expand Down
42 changes: 25 additions & 17 deletions icechunk/src/storage/object_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{
format::{
attributes::AttributesTable, format_constants, manifest::Manifest, snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange, ChunkId, FileTypeTag, ManifestId, ObjectId, Path, SnapshotId
attributes::AttributesTable, format_constants, manifest::Manifest,
snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange,
ChunkId, FileTypeTag, ManifestId, ObjectId, SnapshotId,
},
private,
};
Expand All @@ -11,12 +13,10 @@ use futures::{
StreamExt, TryStreamExt,
};
use object_store::{
local::LocalFileSystem, memory::InMemory, parse_url, path::Path as ObjectPath,
Attribute, AttributeValue, Attributes, GetOptions, GetRange, ObjectMeta, ObjectStore,
PutMode, PutOptions, PutPayload,
parse_url, path::Path as ObjectPath, Attribute, AttributeValue, Attributes,
GetOptions, GetRange, ObjectMeta, ObjectStore, PutMode, PutOptions, PutPayload,
};
use serde::{Deserialize, Serialize};
use url::Url;
use std::{
fs::create_dir_all,
future::ready,
Expand All @@ -27,6 +27,7 @@ use std::{
Arc,
},
};
use url::Url;

use super::{
ListInfo, Storage, StorageError, StorageResult, CHUNK_PREFIX, MANIFEST_PREFIX,
Expand Down Expand Up @@ -81,11 +82,14 @@ impl ObjectStorage {
create_dir_all(prefix).map_err(|e| e.to_string())?;
let prefix = prefix.display().to_string();
let url = format!("file://{prefix}");
Ok(Self::from_url(&url, vec![])?)
Ok(Self::from_url(&url, vec![])?)
}

/// Create an ObjectStore client from a URL and provided options
pub fn from_url(url: &str, options: Vec<(String, String)>) -> Result<ObjectStorage, String> {
pub fn from_url(
url: &str,
options: Vec<(String, String)>,
) -> Result<ObjectStorage, String> {
let url: Url = Url::parse(url).map_err(|e| e.to_string())?;
let (store, path) = parse_url(&url).map_err(|e| e.to_string())?;
let store: Arc<dyn ObjectStore> = Arc::from(store);
Expand Down Expand Up @@ -158,7 +162,12 @@ impl ObjectStorage {

fn ref_key(&self, ref_key: &str) -> ObjectPath {
// ObjectPath knows how to deal with empty path parts: bar//foo
ObjectPath::from(format!("{}/{}/{}", self.config.prefix.as_str(), REF_PREFIX, ref_key))
ObjectPath::from(format!(
"{}/{}/{}",
self.config.prefix.as_str(),
REF_PREFIX,
ref_key
))
}

async fn do_ref_versions(&self, ref_name: &str) -> BoxStream<StorageResult<String>> {
Expand Down Expand Up @@ -192,18 +201,20 @@ impl ObjectStorage {

impl private::Sealed for ObjectStorage {}

impl <'de> serde::Deserialize<'de> for ObjectStorage {
impl<'de> serde::Deserialize<'de> for ObjectStorage {
fn deserialize<D>(deserializer: D) -> Result<ObjectStorage, D::Error>
where
D: serde::Deserializer<'de>,
{
let config = ObjectStorageConfig::deserialize(deserializer)?;
ObjectStorage::from_url(&config.url, config.options).map_err(serde::de::Error::custom)
ObjectStorage::from_url(&config.url, config.options)
.map_err(serde::de::Error::custom)
}
}

#[async_trait]
impl Storage<'_> for ObjectStorage {
#[typetag::serde]
impl Storage for ObjectStorage {
async fn fetch_snapshot(
&self,
id: &SnapshotId,
Expand Down Expand Up @@ -433,11 +444,7 @@ impl Storage<'_> for ObjectStorage {
bytes: Bytes,
) -> StorageResult<()> {
let key = self.ref_key(ref_key);
let mode = if overwrite_refs {
PutMode::Overwrite
} else {
PutMode::Create
};
let mode = if overwrite_refs { PutMode::Overwrite } else { PutMode::Create };
let opts = PutOptions { mode, ..PutOptions::default() };

self.store
Expand All @@ -456,7 +463,8 @@ impl Storage<'_> for ObjectStorage {
&'a self,
prefix: &str,
) -> StorageResult<BoxStream<'a, StorageResult<ListInfo<String>>>> {
let prefix = ObjectPath::from(format!("{}/{}", self.config.prefix.as_str(), prefix));
let prefix =
ObjectPath::from(format!("{}/{}", self.config.prefix.as_str(), prefix));
let stream = self
.store
.list(Some(&prefix))
Expand Down
3 changes: 2 additions & 1 deletion icechunk/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ impl<'de> Deserialize<'de> for S3Storage {
}

#[async_trait]
impl Storage<'_> for S3Storage {
#[typetag::serde]
impl Storage for S3Storage {
async fn fetch_snapshot(&self, id: &SnapshotId) -> StorageResult<Arc<Snapshot>> {
let key = self.get_snapshot_path(id)?;
let bytes = self.get_object(key.as_str()).await?;
Expand Down
3 changes: 1 addition & 2 deletions icechunk/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
fmt::Display,
iter,
num::NonZeroU64,
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
Expand Down Expand Up @@ -122,7 +121,7 @@ impl Store {
}

pub async fn clear(&self) -> StoreResult<()> {
let mut guard = self.session.write().await.clear().await?;
self.session.write().await.clear().await?;
Ok(())
}

Expand Down

0 comments on commit 094f357

Please sign in to comment.