Skip to content

Commit

Permalink
Add object store serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
mpiannucci committed Dec 9, 2024
1 parent 1427488 commit a5c2717
Showing 1 changed file with 64 additions and 38 deletions.
102 changes: 64 additions & 38 deletions icechunk/src/storage/object_store.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::{
format::{
attributes::AttributesTable, format_constants, manifest::Manifest,
snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange,
ChunkId, FileTypeTag, ManifestId, ObjectId, SnapshotId,
attributes::AttributesTable, format_constants, manifest::Manifest, snapshot::Snapshot, transaction_log::TransactionLog, AttributesId, ByteRange, ChunkId, FileTypeTag, ManifestId, ObjectId, Path, SnapshotId
},
private,
};
Expand All @@ -13,11 +11,12 @@ use futures::{
StreamExt, TryStreamExt,
};
use object_store::{
local::LocalFileSystem, memory::InMemory, path::Path as ObjectPath, Attribute,
AttributeValue, Attributes, GetOptions, GetRange, ObjectMeta, ObjectStore, PutMode,
PutOptions, PutPayload,
local::LocalFileSystem, memory::InMemory, parse_url, path::Path as ObjectPath,
Attribute, AttributeValue, Attributes, GetOptions, GetRange, ObjectMeta, ObjectStore,
PutMode, PutOptions, PutPayload,
};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use url::Url;
use std::{
fs::create_dir_all,
future::ready,
Expand Down Expand Up @@ -48,52 +47,69 @@ impl From<&ByteRange> for Option<GetRange> {
}
}

/// Serializable description of how an `ObjectStorage` was created.
///
/// `ObjectStorage::from_url` fills this in, and the `Deserialize` impl for
/// `ObjectStorage` reads `url` and `options` back out of it to rebuild the store.
#[derive(Debug, Serialize, Deserialize)]
pub struct ObjectStorageConfig {
// String form of the parsed URL the store was created from.
url: String,
// Path component extracted from the URL by `parse_url`; used as the prefix when
// building object keys.
prefix: String,
// Caller-supplied key/value options, stored exactly as given.
options: Vec<(String, String)>,
}

/// `Storage` implementation backed by an `object_store::ObjectStore`.
#[derive(Debug, Serialize)]
pub struct ObjectStorage {
// The live store handle is excluded from serialization.
#[serde(skip)]
store: Arc<dyn ObjectStore>,
prefix: String,
// We need this because object_store's local file implementation doesn't sort refs. Since this
// implementation is used only for tests, it's OK to sort in memory.
artificially_sort_refs_in_mem: bool,

supports_create_if_not_exists: bool,
supports_metadata: bool,
// URL, prefix and options the store was created from (populated by `from_url`).
config: ObjectStorageConfig,
}

impl ObjectStorage {
/// Create an in memory Storage implementation
///
/// This implementation should not be used in production code.
pub fn new_in_memory_store(prefix: Option<String>) -> ObjectStorage {
pub fn new_in_memory_store(prefix: Option<String>) -> Result<ObjectStorage, String> {
#[allow(clippy::expect_used)]
let prefix =
prefix.or(Some("".to_string())).expect("bad prefix but this should not fail");
ObjectStorage {
store: Arc::new(InMemory::new()),
prefix,
artificially_sort_refs_in_mem: false,
supports_create_if_not_exists: true,
supports_metadata: true,
}

let url = format!("memory://{prefix}");
Ok(Self::from_url(&url, vec![])?)
}

/// Create a local filesystem Storage implementation
///
/// This implementation should not be used in production code.
pub fn new_local_store(prefix: &StdPath) -> Result<ObjectStorage, std::io::Error> {
create_dir_all(prefix)?;
pub fn new_local_store(prefix: &StdPath) -> Result<ObjectStorage, String> {
create_dir_all(prefix).map_err(|e| e.to_string())?;
let prefix = prefix.display().to_string();
let store = Arc::new(LocalFileSystem::new_with_prefix(prefix.clone())?);
let url = format!("file://{prefix}");
Ok(Self::from_url(&url, vec![])?)
}

/// Create an ObjectStore client from a URL and provided options
///
/// `url` is parsed with `Url::parse` and handed to `object_store::parse_url`, which yields
/// the store together with the path component of the URL. That path is recorded as the
/// config prefix.
///
/// # Errors
///
/// Returns the error's string form if the URL fails to parse or `parse_url` rejects it.
pub fn from_url(url: &str, options: Vec<(String, String)>) -> Result<ObjectStorage, String> {
// Shadows the input string: from here on `url` is the parsed `Url`, so the
// `url.to_string()` stored in the config below is the parsed form, not the raw input.
let url: Url = Url::parse(url).map_err(|e| e.to_string())?;
// NOTE(review): `options` is only recorded in the config below; it is not passed to
// `parse_url` here, so it does not appear to influence the constructed store. Confirm
// whether `object_store::parse_url_opts` was intended.
let (store, path) = parse_url(&url).map_err(|e| e.to_string())?;
let store: Arc<dyn ObjectStore> = Arc::from(store);
Ok(ObjectStorage {
store,
prefix: "".to_string(),
artificially_sort_refs_in_mem: true,
supports_create_if_not_exists: true,
supports_metadata: false,
config: ObjectStorageConfig {
url: url.to_string(),
prefix: path.to_string(),
options,
},
})
}

/// Whether ref listings have to be sorted in memory after they are fetched.
///
/// object_store's local file implementation doesn't sort refs, so stores created from a
/// `file://` URL need it. That implementation is used only for tests, so sorting in memory
/// is OK.
pub fn artificially_sort_refs_in_mem(&self) -> bool {
    const LOCAL_FILE_SCHEME: &str = "file://";
    let store_url: &str = self.config.url.as_str();
    store_url.starts_with(LOCAL_FILE_SCHEME)
}

/// Whether the underlying store can persist object attributes (metadata).
///
/// object_store's local file implementation doesn't support metadata, so this is `false`
/// for stores created from a `file://` URL and `true` for every other scheme.
pub fn supports_metadata(&self) -> bool {
    // Local-file stores are the ones that lack metadata support, so the check is negated.
    !self.config.url.starts_with("file://")
}

/// Return all keys in the store
///
/// Intended for testing and debugging purposes only.
Expand All @@ -107,7 +123,7 @@ impl ObjectStorage {
}

fn get_path_str(&self, file_prefix: &str, id: &str) -> ObjectPath {
let path = format!("{}/{}/{}", self.prefix, file_prefix, id);
let path = format!("{}/{}/{}", self.config.prefix, file_prefix, id);
ObjectPath::from(path)
}

Expand Down Expand Up @@ -142,7 +158,7 @@ impl ObjectStorage {

fn ref_key(&self, ref_key: &str) -> ObjectPath {
// ObjectPath knows how to deal with empty path parts: bar//foo
ObjectPath::from(format!("{}/{}/{}", self.prefix.as_str(), REF_PREFIX, ref_key))
ObjectPath::from(format!("{}/{}/{}", self.config.prefix.as_str(), REF_PREFIX, ref_key))
}

async fn do_ref_versions(&self, ref_name: &str) -> BoxStream<StorageResult<String>> {
Expand Down Expand Up @@ -176,8 +192,18 @@ impl ObjectStorage {

impl private::Sealed for ObjectStorage {}

impl <'de> serde::Deserialize<'de> for ObjectStorage {
fn deserialize<D>(deserializer: D) -> Result<ObjectStorage, D::Error>
where
D: serde::Deserializer<'de>,
{
let config = ObjectStorageConfig::deserialize(deserializer)?;
ObjectStorage::from_url(&config.url, config.options).map_err(serde::de::Error::custom)
}
}

#[async_trait]
impl Storage for ObjectStorage {
impl Storage<'_> for ObjectStorage {
async fn fetch_snapshot(
&self,
id: &SnapshotId,
Expand Down Expand Up @@ -225,7 +251,7 @@ impl Storage for ObjectStorage {
) -> Result<(), StorageError> {
let path = self.get_snapshot_path(&id);
let bytes = rmp_serde::to_vec(snapshot.as_ref())?;
let attributes = if self.supports_metadata {
let attributes = if self.supports_metadata() {
Attributes::from_iter(vec![
(
Attribute::ContentType,
Expand Down Expand Up @@ -266,7 +292,7 @@ impl Storage for ObjectStorage {
) -> Result<(), StorageError> {
let path = self.get_manifest_path(&id);
let bytes = rmp_serde::to_vec(manifest.as_ref())?;
let attributes = if self.supports_metadata {
let attributes = if self.supports_metadata() {
Attributes::from_iter(vec![
(
Attribute::ContentType,
Expand Down Expand Up @@ -299,7 +325,7 @@ impl Storage for ObjectStorage {
) -> StorageResult<()> {
let path = self.get_transaction_path(&id);
let bytes = rmp_serde::to_vec(log.as_ref())?;
let attributes = if self.supports_metadata {
let attributes = if self.supports_metadata() {
Attributes::from_iter(vec![
(
Attribute::ContentType,
Expand Down Expand Up @@ -386,7 +412,7 @@ impl Storage for ObjectStorage {
ref_name: &str,
) -> StorageResult<BoxStream<StorageResult<String>>> {
let res = self.do_ref_versions(ref_name).await;
if self.artificially_sort_refs_in_mem {
if self.artificially_sort_refs_in_mem() {
#[allow(clippy::expect_used)]
// This branch is used for local tests, not in production. We don't expect the size of
// these streams to be large, so we can collect in memory and fail early if there is an
Expand All @@ -407,7 +433,7 @@ impl Storage for ObjectStorage {
bytes: Bytes,
) -> StorageResult<()> {
let key = self.ref_key(ref_key);
let mode = if overwrite_refs || !self.supports_create_if_not_exists {
let mode = if overwrite_refs {
PutMode::Overwrite
} else {
PutMode::Create
Expand All @@ -430,7 +456,7 @@ impl Storage for ObjectStorage {
&'a self,
prefix: &str,
) -> StorageResult<BoxStream<'a, StorageResult<ListInfo<String>>>> {
let prefix = ObjectPath::from(format!("{}/{}", self.prefix.as_str(), prefix));
let prefix = ObjectPath::from(format!("{}/{}", self.config.prefix.as_str(), prefix));
let stream = self
.store
.list(Some(&prefix))
Expand Down

0 comments on commit a5c2717

Please sign in to comment.