diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/object_store.rs index 88e1b637..d5cce269 100644 --- a/icechunk/src/storage/object_store.rs +++ b/icechunk/src/storage/object_store.rs @@ -1,8 +1,6 @@ use crate::{ format::{ - attributes::AttributesTable, format_constants, manifest::Manifest, - snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange, - ChunkId, FileTypeTag, ManifestId, ObjectId, SnapshotId, + attributes::AttributesTable, format_constants, manifest::Manifest, snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange, ChunkId, FileTypeTag, ManifestId, ObjectId, Path, SnapshotId }, private, }; @@ -13,11 +11,12 @@ use futures::{ StreamExt, TryStreamExt, }; use object_store::{ - local::LocalFileSystem, memory::InMemory, path::Path as ObjectPath, Attribute, - AttributeValue, Attributes, GetOptions, GetRange, ObjectMeta, ObjectStore, PutMode, - PutOptions, PutPayload, + local::LocalFileSystem, memory::InMemory, parse_url, path::Path as ObjectPath, + Attribute, AttributeValue, Attributes, GetOptions, GetRange, ObjectMeta, ObjectStore, + PutMode, PutOptions, PutPayload, }; -use serde::Serialize; +use serde::{Deserialize, Serialize}; +use url::Url; use std::{ fs::create_dir_all, future::ready, @@ -48,52 +47,69 @@ impl From<&ByteRange> for Option { } } +#[derive(Debug, Serialize, Deserialize)] +pub struct ObjectStorageConfig { + url: String, + prefix: String, + options: Vec<(String, String)>, +} + #[derive(Debug, Serialize)] pub struct ObjectStorage { #[serde(skip)] store: Arc, - prefix: String, - // We need this because object_store's local file implementation doesn't sort refs. Since this - // implementation is used only for tests, it's OK to sort in memory. - artificially_sort_refs_in_mem: bool, - - supports_create_if_not_exists: bool, - supports_metadata: bool, + config: ObjectStorageConfig, } impl ObjectStorage { /// Create an in memory Storage implementation /// /// This implementation should not be used in production code. - pub fn new_in_memory_store(prefix: Option) -> ObjectStorage { + pub fn new_in_memory_store(prefix: Option) -> Result { #[allow(clippy::expect_used)] let prefix = prefix.or(Some("".to_string())).expect("bad prefix but this should not fail"); - ObjectStorage { - store: Arc::new(InMemory::new()), - prefix, - artificially_sort_refs_in_mem: false, - supports_create_if_not_exists: true, - supports_metadata: true, - } + + let url = format!("memory://{prefix}"); + Ok(Self::from_url(&url, vec![])?) } /// Create an local filesystem Storage implementation /// /// This implementation should not be used in production code. - pub fn new_local_store(prefix: &StdPath) -> Result { - create_dir_all(prefix)?; + pub fn new_local_store(prefix: &StdPath) -> Result { + create_dir_all(prefix).map_err(|e| e.to_string())?; let prefix = prefix.display().to_string(); - let store = Arc::new(LocalFileSystem::new_with_prefix(prefix.clone())?); + let url = format!("file://{prefix}"); + Ok(Self::from_url(&url, vec![])?) + } + + /// Create an ObjectStore client from a URL and provided options + pub fn from_url(url: &str, options: Vec<(String, String)>) -> Result { + let url: Url = Url::parse(url).map_err(|e| e.to_string())?; + let (store, path) = parse_url(&url).map_err(|e| e.to_string())?; + let store: Arc = Arc::from(store); Ok(ObjectStorage { store, - prefix: "".to_string(), - artificially_sort_refs_in_mem: true, - supports_create_if_not_exists: true, - supports_metadata: false, + config: ObjectStorageConfig { + url: url.to_string(), + prefix: path.to_string(), + options, + }, }) } + /// We need this because object_store's local file implementation doesn't sort refs. Since this + /// implementation is used only for tests, it's OK to sort in memory. + pub fn artificially_sort_refs_in_mem(&self) -> bool { + self.config.url.starts_with("file://") + } + + /// We need this because object_store's local file implementation doesn't support metadata. + pub fn supports_metadata(&self) -> bool { + self.config.url.starts_with("file://") + } + /// Return all keys in the store /// /// Intended for testing and debugging purposes only. @@ -107,7 +123,7 @@ impl ObjectStorage { } fn get_path_str(&self, file_prefix: &str, id: &str) -> ObjectPath { - let path = format!("{}/{}/{}", self.prefix, file_prefix, id); + let path = format!("{}/{}/{}", self.config.prefix, file_prefix, id); ObjectPath::from(path) } @@ -142,7 +158,7 @@ impl ObjectStorage { fn ref_key(&self, ref_key: &str) -> ObjectPath { // ObjectPath knows how to deal with empty path parts: bar//foo - ObjectPath::from(format!("{}/{}/{}", self.prefix.as_str(), REF_PREFIX, ref_key)) + ObjectPath::from(format!("{}/{}/{}", self.config.prefix.as_str(), REF_PREFIX, ref_key)) } async fn do_ref_versions(&self, ref_name: &str) -> BoxStream> { @@ -176,8 +192,18 @@ impl ObjectStorage { impl private::Sealed for ObjectStorage {} +impl <'de> serde::Deserialize<'de> for ObjectStorage { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let config = ObjectStorageConfig::deserialize(deserializer)?; + ObjectStorage::from_url(&config.url, config.options).map_err(serde::de::Error::custom) + } +} + #[async_trait] -impl Storage for ObjectStorage { +impl Storage<'_> for ObjectStorage { async fn fetch_snapshot( &self, id: &SnapshotId, @@ -225,7 +251,7 @@ impl Storage for ObjectStorage { ) -> Result<(), StorageError> { let path = self.get_snapshot_path(&id); let bytes = rmp_serde::to_vec(snapshot.as_ref())?; - let attributes = if self.supports_metadata { + let attributes = if self.supports_metadata() { Attributes::from_iter(vec![ ( Attribute::ContentType, @@ -266,7 +292,7 @@ impl Storage for ObjectStorage { ) -> Result<(), StorageError> { let path = self.get_manifest_path(&id); let bytes = rmp_serde::to_vec(manifest.as_ref())?; - let attributes = if self.supports_metadata { + let attributes = if self.supports_metadata() { Attributes::from_iter(vec![ ( Attribute::ContentType, @@ -299,7 +325,7 @@ impl Storage for ObjectStorage { ) -> StorageResult<()> { let path = self.get_transaction_path(&id); let bytes = rmp_serde::to_vec(log.as_ref())?; - let attributes = if self.supports_metadata { + let attributes = if self.supports_metadata() { Attributes::from_iter(vec![ ( Attribute::ContentType, @@ -386,7 +412,7 @@ impl Storage for ObjectStorage { ref_name: &str, ) -> StorageResult>> { let res = self.do_ref_versions(ref_name).await; - if self.artificially_sort_refs_in_mem { + if self.artificially_sort_refs_in_mem() { #[allow(clippy::expect_used)] // This branch is used for local tests, not in production. We don't expect the size of // these streams to be large, so we can collect in memory and fail early if there is an @@ -407,7 +433,7 @@ impl Storage for ObjectStorage { bytes: Bytes, ) -> StorageResult<()> { let key = self.ref_key(ref_key); - let mode = if overwrite_refs || !self.supports_create_if_not_exists { + let mode = if overwrite_refs { PutMode::Overwrite } else { PutMode::Create @@ -430,7 +456,7 @@ impl Storage for ObjectStorage { &'a self, prefix: &str, ) -> StorageResult>>> { - let prefix = ObjectPath::from(format!("{}/{}", self.prefix.as_str(), prefix)); + let prefix = ObjectPath::from(format!("{}/{}", self.config.prefix.as_str(), prefix)); let stream = self .store .list(Some(&prefix))