Skip to content

Commit

Permalink
Snapshot repository bucket layout
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Nov 20, 2024
1 parent 4028f4b commit 56e659f
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 43 deletions.
2 changes: 1 addition & 1 deletion crates/partition-store/src/tests/snapshots_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(crate) async fn run_tests(manager: PartitionStoreManager, mut partition_stor
partition_id,
node_name: "node".to_string(),
created_at: humantime::Timestamp::from(SystemTime::from(MillisSinceEpoch::new(0))),
snapshot_id: SnapshotId::from_parts(0, 0),
snapshot_id: SnapshotId::from(0u128),
key_range: key_range.clone(),
min_applied_lsn: snapshot.min_applied_lsn,
db_comparator_name: snapshot.db_comparator_name.clone(),
Expand Down
112 changes: 70 additions & 42 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ use restate_partition_store::snapshots::PartitionSnapshotMetadata;
use restate_types::config::SnapshotsOptions;

/// Provides read and write access to the long-term partition snapshot storage destination.
///
/// The repository wraps access to an object store "bucket" that contains snapshot metadata and data
/// optimised for efficient retrieval. The bucket layout is split into two top-level prefixes for
/// snapshot metadata and data respectively. While full snapshot archives contain all relevant
/// metadata, this split layout allows for efficient retrieval of only the metadata upfront. It also
/// enables us to evolve the data storage layout independently in the future.
///
/// - `[<prefix>/]metadata/<partition_id>/<sort_key>_<snapshot_id>-lsn_<lsn>.json`
/// - `[<prefix>/]snapshot/<partition_id>/<sort_key>_<snapshot_id>-lsn_<lsn>.tar`
#[derive(Clone)]
pub struct SnapshotRepository {
object_store: Arc<dyn ObjectStore>,
Expand All @@ -34,23 +43,21 @@ pub struct SnapshotRepository {
staging_path: PathBuf,
}

// todo(pavel): finalize repository object layout
impl SnapshotRepository {
pub async fn create(
base_dir: PathBuf,
snapshots_options: &SnapshotsOptions,
) -> anyhow::Result<SnapshotRepository> {
let destination =
if let Some(ref destination) = snapshots_options.destination {
destination.clone()
} else {
base_dir
.join("pp-snapshots")
.into_os_string()
.into_string()
.map(|path| format!("file://{path}"))
.map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?
};
let destination = if let Some(ref destination) = snapshots_options.destination {
destination.clone()
} else {
base_dir
.join("pp-snapshots")
.into_os_string()
.into_string()
.map(|path| format!("file://{path}"))
.map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?
};
let destination =
Url::parse(&destination).context("Failed parsing snapshot repository URL")?;

Expand Down Expand Up @@ -84,12 +91,21 @@ impl SnapshotRepository {
.into()
};

let prefix = destination.path().into();
let staging_path = base_dir.join("snapshot-staging");
tokio::fs::create_dir_all(&staging_path).await?;

// prefix must be stripped of any leading slash and, unless zero-length, end in a single "/" character
let prefix: String = destination.path().into();
let prefix = match prefix.as_str() {
"" | "/" => "".to_string(),
prefix => format!("{}/", prefix.trim_start_matches('/').trim_end_matches('/')),
};

Ok(SnapshotRepository {
object_store,
destination,
prefix,
staging_path: base_dir.clone().join("snapshot-staging"),
staging_path,
})
}

Expand All @@ -114,53 +130,65 @@ impl SnapshotRepository {
// the latest snapshot is always first.
let inverted_sort_key = format!("{:016x}", u64::MAX - lsn.as_u64());

// The snapshot data key format is: [<base_prefix>/]<partition_id>/<sort_key>_<lsn>_<snapshot_id>.tar
let snapshot_key = match self.prefix.as_str() {
"" | "/" => format!(
"{partition_id}/{sk}_{lsn}_{snapshot_id}.tar",
sk = inverted_sort_key,
lsn = metadata.min_applied_lsn,
),
prefix => format!(
"{trimmed_prefix}/{partition_id}/{sk}_{lsn}_{snapshot_id}.tar",
trimmed_prefix = prefix.trim_start_matches('/').trim_end_matches('/'),
sk = inverted_sort_key,
),
};

let staging_path = self.staging_path.clone();
tokio::fs::create_dir_all(&staging_path).await?;
let metadata_json_path = local_snapshot_path.join("metadata.json");
let metadata_key = format!(
"{prefix}metadata/{partition_id}/{sk}_{snapshot_id}-lsn_{lsn}.json",
prefix = self.prefix,
sk = inverted_sort_key,
lsn = metadata.min_applied_lsn,
);

let tarball_path = staging_path.join(format!("{snapshot_id}.tar"));
let snapshot_tarball = tokio::fs::File::create_new(tarball_path.clone()).await?;
let snapshot_archive_path = self.staging_path.join(format!("{snapshot_id}.tar"));
let snapshot_tarball = tokio::fs::File::create_new(snapshot_archive_path.as_path()).await?;
let snapshot_key = format!(
"{prefix}snapshot/{partition_id}/{sk}_{snapshot_id}-lsn_{lsn}.tar",
prefix = self.prefix,
sk = inverted_sort_key,
lsn = metadata.min_applied_lsn,
);

let local_files_path = local_snapshot_path.clone();
let mut tar = tokio_tar::Builder::new(snapshot_tarball);
debug!(
"Creating snapshot tarball from {:?} as {:?}",
local_files_path,
local_snapshot_path.as_path(),
tar.get_ref()
);
tar.append_dir_all(".", local_files_path).await?;
tar.append_dir_all(".", local_snapshot_path.as_path())
.await?;
tar.finish().await?;

let key = object_store::path::Path::from(snapshot_key.clone());
let key = object_store::path::Path::from(snapshot_key.as_str());
let put_result =
put_snapshot_object(tarball_path.as_path(), &key, self.object_store.clone()).await?;

put_snapshot_object(snapshot_archive_path.as_path(), &key, &self.object_store).await?;
debug!(
etag = put_result.e_tag.unwrap_or_default(),
"Successfully published snapshot to: {snapshot_key}",
);

tokio::fs::remove_dir_all(local_snapshot_path.clone()).await?;
let metadata_json_payload = PutPayload::from(tokio::fs::read(metadata_json_path).await?);
let put_result = self
.object_store
.put(
&object_store::path::Path::from(metadata_key.as_str()),
metadata_json_payload,
)
.await?;
debug!(
etag = put_result.e_tag.unwrap_or_default(),
"Successfully published snapshot metadata to: {metadata_key}",
);

tokio::fs::remove_dir_all(local_snapshot_path.as_path()).await?;
trace!(
"Removed local snapshot files: {}",
local_snapshot_path.display()
);

tokio::fs::remove_file(tarball_path.clone()).await?;
trace!("Removed local snapshot tarball: {}", tarball_path.display());
tokio::fs::remove_file(snapshot_archive_path.as_path()).await?;
trace!(
"Removed local snapshot tarball: {}",
snapshot_archive_path.display()
);

Ok(())
}
Expand All @@ -173,7 +201,7 @@ const MULTIPART_UPLOAD_THRESHOLD_BYTES: usize = 5 * 1024 * 1024;
async fn put_snapshot_object(
snapshot_path: &Path,
key: &object_store::path::Path,
object_store: Arc<dyn ObjectStore>,
object_store: &Arc<dyn ObjectStore>,
) -> anyhow::Result<object_store::PutResult> {
let mut snapshot = tokio::fs::File::open(snapshot_path).await?;

Expand Down

0 comments on commit 56e659f

Please sign in to comment.