Introduce SnapshotRepository and object store integration #2310
Conversation
Thank you @pcholakov for creating this PR. It looks good to me! I left two very minor comments.
.into_string()
.map(|path| format!("file://{path}"))
})
.map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
Suggested change:
-    .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
+    .context("Unable to convert path to string")?;
This will still include the 'inner' error in the output string when printed.
That approach doesn't work here because OsString::into_string() returns Result<String, OsString>, which doesn't meet Anyhow's trait bounds :-)
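To make the type mismatch concrete: anyhow's .context() is only implemented for Results whose error type implements std::error::Error, and OsString does not, so map_err + anyhow! is the workaround. A minimal sketch (the file_url helper name is made up for illustration):

use anyhow::anyhow;
use std::ffi::OsString;

// Hypothetical helper: convert an OS path string into a file:// URL.
fn file_url(path: OsString) -> anyhow::Result<String> {
    // OsString::into_string() returns Result<String, OsString>; OsString does not
    // implement std::error::Error, so .context() is unavailable here and we format
    // the error with map_err + anyhow! instead.
    let path = path
        .into_string()
        .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
    Ok(format!("file://{path}"))
}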
Thanks for creating this PR @pcholakov. The changes look really good. The one question I have is whether there is a way to avoid materializing the tarball and re-reading it into memory. It would be awesome if we could stream the tarballing into the object-store upload.
/// Write a partition snapshot to the snapshot repository.
pub(crate) async fn put(
    &self,
    partition_id: PartitionId,
Isn't partition_id already part of PartitionSnapshotMetadata?
Removed :-)
// todo(pavel): don't buffer the entire snapshot in memory!
let payload = PutPayload::from(tokio::fs::read(tarball.path()).await?);
That would indeed be great. Especially once we have larger snapshots.
ObjectStore already supports multipart upload; you can use that to upload the tar in chunks instead.
Implemented in the latest revision! 🎉
// the latest snapshot is always first.
let inverted_sort_key = format!("{:016x}", u64::MAX - lsn.as_u64());

// The snapshot data / metadata key format is: [<base_prefix>/]<partition_id>/<sort_key>_<lsn>_<snapshot_id>.tar
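For clarity, a small self-contained sketch of the property the sort key relies on (using a bare u64 in place of the Lsn type): the keys are fixed-width hex, so lexicographic order matches numeric order, and subtracting from u64::MAX inverts it so that the newest snapshot lists first.

// Stand-in for Lsn::as_u64(); demonstrates that a higher LSN produces a
// lexicographically smaller key under the fixed-width hex encoding.
fn inverted_sort_key(lsn: u64) -> String {
    format!("{:016x}", u64::MAX - lsn)
}

#[test]
fn newest_snapshot_sorts_first() {
    // LSN 101 is newer than LSN 100, and its key sorts strictly earlier.
    assert!(inverted_sort_key(101) < inverted_sort_key(100));
}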
What's the idea for distinguishing full from incremental snapshots in the future? Would the latter have a completely different path or contain a marker file that denotes them as incremental?
I'm about to introduce this to the PR shortly - the key idea is to upload the tar archives and metadata JSON files separately, so that interested nodes can easily query just the metadata. We can gradually introduce additional attributes to the metadata JSON schema to support referencing the constituent parts of an incremental snapshot. The snapshot format version field within the metadata blob will allow nodes to know how to interpret it - or fail loudly if the Restate server is an older version that doesn't understand it.
The paths will be something like:
[<prefix>/]metadata/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.json
[<prefix>/]snapshot/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.tar
I imagine that at some point we'll add incremental snapshots and the repository format will then look something along the lines of:
[<prefix>/]metadata/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.json
(V2) [<prefix>/]files/<partition_id>/<snapshot_id>-{filename}.sst
In this world, there will no longer be 1:1 metadata-to-snapshot correspondence but rather a 1:n relationship. Additionally, we may want to write some sort of index metadata to make it cheaper to garbage collect disused SSTs - but I haven't thought too much about that yet.
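A rough sketch of what such a versioned metadata document could look like as a serde struct - the field names and types here are illustrative assumptions, not the PR's actual schema:

use serde::{Deserialize, Serialize};

// Illustrative only: shape of a versioned snapshot metadata blob.
#[derive(Serialize, Deserialize)]
struct SnapshotMetadataDocument {
    /// Snapshot format version; bumped when the repository layout changes
    /// (e.g. incremental snapshots). Older servers that don't recognise the
    /// version should fail loudly rather than guess.
    version: u32,
    cluster_name: String,
    partition_id: u16,
    snapshot_id: String,
    min_applied_lsn: u64,
    /// Constituent files of this snapshot. An incremental format could later
    /// reference SSTs shared with earlier snapshots here (1:n metadata-to-files).
    files: Vec<String>,
}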
Force-pushed 9f6d162 to d686e7e
Thanks @tillrohrmann and @muhamadazmy for your early input, it was really valuable! I've pushed a new revision but I still want to remove tar archiving before I mark it ready for review.
let mut tarball = tar::Builder::new(NamedTempFile::new_in(&staging_path)?);
debug!(
    "Creating snapshot tarball of {:?} in: {:?}...",
    &staging_path,
I've renamed staging_path to local_snapshot_path for clarity - that's the raw RocksDB column family export directory with the SSTs plus our own metadata JSON blob. We then tar that directory up into an archive at the path snapshot_archive_path.
Force-pushed 768bddf to 56e659f
Force-pushed 56e659f to cee99e6
Force-pushed 76f4843 to 38268d6
Substantial changes since initial revision
Thanks for creating this PR @pcholakov. The changes look really nice. I left a few minor comments. The one question I had was whether concurrent modifications of a snapshot metadata.json or the latest.json can be a problem (e.g. if an old and new leader upload a snapshot at the same time)?
SnapshotExportError(PartitionId, #[source] anyhow::Error),
#[error("Snapshot failed for partition {0}: {1}")]
SnapshotMetadataHeaderError(PartitionId, #[source] io::Error),
#[error("Internal error creating snapshot for partition {0}: {1}")]
#[error("Snapshot IO error: {1}")]
SnapshotIoError(PartitionId, #[source] io::Error),
#[error("Snapshot repository IO error: {1}")]
RepositoryIoError(PartitionId, #[source] anyhow::Error),
You probably don't have to add the "Error" suffix. It's a variant of SnapshotError so it should be clear that it is an error.
Done ✅
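For reference, a sketch of what the de-suffixed variants might look like - PartitionId is stubbed with a type alias here, and the messages are loosely based on the quoted diff rather than the final code:

use thiserror::Error;

type PartitionId = u16; // stand-in for the real PartitionId type

#[derive(Debug, Error)]
pub enum SnapshotError {
    #[error("Snapshot export failed for partition {0}: {1}")]
    SnapshotExport(PartitionId, #[source] anyhow::Error),
    #[error("Snapshot IO error for partition {0}: {1}")]
    SnapshotIo(PartitionId, #[source] std::io::Error),
    #[error("Snapshot repository IO error for partition {0}: {1}")]
    RepositoryIo(PartitionId, #[source] anyhow::Error),
}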
/// Restate cluster name which produced the snapshot.
pub lsn: Lsn,
Comment seems to be a bit off.
The entire field is redundant! (We have min_applied_lsn below.)
let relative_snapshot_path = format!("lsn_{lsn}", lsn = snapshot.min_applied_lsn);
let snapshot_prefix = format!(
    "{prefix}{partition_id}/{relative_snapshot_path}",
    prefix = self.prefix,
Didn't know that this is possible. Interesting.
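The feature in question is format!'s named arguments, combined with implicit capture of identifiers from the surrounding scope (Rust 2021); a tiny sketch:

fn example_prefix() -> String {
    let partition_id = 7;
    format!(
        "{prefix}{partition_id}/lsn_{lsn}",
        // explicitly supplied named arguments...
        prefix = "snapshots/",
        lsn = 42,
        // ...while partition_id is captured implicitly from the enclosing scope
    )
}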
debug!(
    %lsn,
    "Publishing partition snapshot to: {}",
    self.destination,
);
You can instrument put via #[instrument()] and include the lsn, snapshot id, etc.
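A sketch of that suggestion, with stand-in types so it compiles on its own; the span records the identifiers once, and the log lines inside no longer need to repeat them:

use tracing::instrument;

// Minimal stand-ins for the real types, just to make the attribute compile here.
struct PartitionSnapshotMetadata {
    partition_id: u16,
    snapshot_id: String,
    min_applied_lsn: u64,
}
struct SnapshotRepository;

impl SnapshotRepository {
    #[instrument(level = "debug", skip_all, fields(
        partition_id = %snapshot.partition_id,
        snapshot_id = %snapshot.snapshot_id,
        lsn = %snapshot.min_applied_lsn
    ))]
    async fn put(&self, snapshot: &PartitionSnapshotMetadata) -> anyhow::Result<()> {
        // The enclosing span already carries partition_id, snapshot_id and lsn.
        tracing::debug!("Publishing partition snapshot");
        Ok(())
    }
}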
let put_result = self
    .object_store
    .put(&metadata_key, metadata_json_payload)
    .await?;
Is there a possibility of two processes taking a snapshot for the same lsn (e.g. an old leader and a new one) which aren't exactly the same because the effective lsn is different? If so, is this a problem?
Definitely! This is partly why I'm still on the fence about the exact snapshot naming scheme. One simple solution is to use snapshot IDs to disambiguate snapshots for the same LSN as they must be (modulo ULID collision) unique across nodes. I'd combine that with conditional put (only succeed if file does not exist) and complain loudly if it ever fails.
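A sketch of that "conditional put, complain loudly" idea using object_store's PutMode::Create - the helper name is made up, and exact error variants depend on the object_store version in use:

use object_store::{path::Path, Error as ObjectStoreError, ObjectStore, PutMode, PutPayload};

// Hypothetical helper: only succeeds if the key does not already exist, so a
// concurrent snapshot upload for the same key fails instead of overwriting it.
async fn put_if_absent(
    store: &dyn ObjectStore,
    key: &Path,
    payload: PutPayload,
) -> anyhow::Result<()> {
    match store.put_opts(key, payload, PutMode::Create.into()).await {
        Ok(result) => {
            tracing::debug!(etag = ?result.e_tag, ?key, "Put snapshot object completed");
            Ok(())
        }
        Err(ObjectStoreError::AlreadyExists { .. }) => {
            anyhow::bail!("{key} already exists in the snapshot repository - refusing to overwrite")
        }
        Err(e) => Err(e.into()),
    }
}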
let put_result = self
    .object_store
    .put(&latest_path, latest_json_payload)
    .await?;
Same question here but for different lsns. How are we gonna use the latest.json? I could imagine how a slow old leader completes a snapshot after a new snapshot has been completed.
I have an idea here that hadn't made it into the PR just yet: just download the previous pointer and check that we aren't moving backwards. This should be enough to prevent the worst case of some node going to sleep mid-snapshot and wreaking havoc.
Since I wrote that comment, we have been blessed with proper S3 conditional put, so I rewrote the update logic to perform a CAS 🎉 I'm not doing this preemptively since this path should be uncontended, but the check is there as a defensive measure against going backwards and overwriting something we didn't mean to.
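Roughly the shape of that CAS, as a sketch against the object_store API - the LatestSnapshot struct and helper are assumptions, and real code would also need a create path for the very first snapshot:

use object_store::{
    path::Path, Error as ObjectStoreError, ObjectStore, PutMode, PutPayload, UpdateVersion,
};
use serde::Deserialize;

// Assumed shape of the pointer file; only the LSN matters for the check.
#[derive(Deserialize)]
struct LatestSnapshot {
    min_applied_lsn: u64,
}

async fn update_latest_pointer(
    store: &dyn ObjectStore,
    latest_path: &Path,
    new_lsn: u64,
    new_payload: PutPayload,
) -> anyhow::Result<()> {
    // Read the current pointer and remember its version for the conditional put.
    // (A real implementation would treat NotFound as "first snapshot" and fall
    // back to PutMode::Create.)
    let current = store.get(latest_path).await?;
    let cas_version = UpdateVersion {
        e_tag: current.meta.e_tag.clone(),
        version: current.meta.version.clone(),
    };
    let latest: LatestSnapshot = serde_json::from_slice(&current.bytes().await?)?;

    // Defensive check: never move the pointer backwards.
    anyhow::ensure!(
        new_lsn >= latest.min_applied_lsn,
        "Refusing to move latest.json backwards"
    );

    match store
        .put_opts(latest_path, new_payload, PutMode::Update(cas_version).into())
        .await
    {
        Ok(_) => Ok(()),
        Err(ObjectStoreError::Precondition { .. }) => {
            anyhow::bail!("latest.json was updated concurrently; aborting")
        }
        Err(e) => Err(e.into()),
    }
}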
for file in &snapshot.files {
    let filename = file.name.trim_start_matches("/");
    let key = object_store::path::Path::from(format!(
        "{}/{}",
        snapshot_prefix.as_str(),
        filename
    ));
    let put_result = put_snapshot_object(
        local_snapshot_path.join(filename).as_path(),
        &key,
        &self.object_store,
    )
    .await?;
    debug!(
        etag = put_result.e_tag.unwrap_or_default(),
        ?key,
        "Put snapshot data file completed",
    );
}
Uploading multiple files concurrently will probably only cause higher and less predictable resource utilization. And we aren't in a rush, I guess.
Agreed! It was easier to be more predictable with a single upload stream. The impact I'm most concerned about is the memory overhead. S3 advises using fairly large chunks - on the order of 100 MB - for maximum throughput, so maybe it's worth looking into memory-mapped IO down the line.
} else {
    let mut upload = object_store.put_multipart(key).await?;
    loop {
        let mut buf = vec![0; MULTIPART_UPLOAD_THRESHOLD_BYTES];
Could we reuse this buffer across iterations and ideally also across different file uploads?
On the list, will be fixed in the next iteration :-)
loop {
    let mut buf = vec![0; MULTIPART_UPLOAD_THRESHOLD_BYTES];
    let n = snapshot.read(&mut buf).await?;
    if n == 0 {
        break;
    }
    let part = PutPayload::from(buf);
    upload
        .put_part(part)
        .await
        .context("Failed to put snapshot part in repository")?;
    trace!("Uploaded chunk of {} bytes", n);
}
upload
    .complete()
    .await
    .context("Failed to put snapshot in repository")
Should we call upload.abort in case an error occurs?
Yes, we must! Thanks for flagging this - I wanted to see how the multipart upload API works first, hardening coming up in the next revision.
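Sketched here against object_store's MultipartUpload trait (the helper name and overall shape are placeholders, not the PR's final code): on any part failure, make a best-effort abort before surfacing the original error so the store can release the already-uploaded parts.

use object_store::{MultipartUpload, PutPayload};

async fn upload_parts(
    upload: &mut dyn MultipartUpload,
    parts: Vec<PutPayload>,
) -> anyhow::Result<()> {
    for part in parts {
        if let Err(e) = upload.put_part(part).await {
            // Best-effort cleanup; the abort error is secondary to the original failure.
            if let Err(abort_err) = upload.abort().await {
                tracing::warn!(%abort_err, "Failed to abort multipart upload");
            }
            return Err(e.into());
        }
    }
    upload.complete().await?;
    Ok(())
}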
/// - `[<prefix>/]<partition_id>/YYYY-MM-DD/{lsn}/metadata.json` - snapshot descriptor
/// - `[<prefix>/]<partition_id>/YYYY-MM-DD/{lsn}/*.sst` - data files (explicitly named in `metadata.json`)
Is the YYYY-MM-DD still up to date? I couldn't find it in the path when we upload snapshots.
Oops, my bad! I kept flip-flopping on this but we can't have it half and half 😆 Taken out. We can make this configurable later if customers request it - either using dates or inverted padded sort-key for easier list navigation.
I've now included ${snapshot_id} here to disambiguate any potential same-LSN conflicts.
Force-pushed defc6ee to 7291ede
if !buf.is_empty() {
    upload
        .put_part(PutPayload::from_bytes(buf.split().freeze()))
Getting the split() + freeze() / reserve() just right took me an embarrassingly long time! I have a reasonable grasp of Bytes/BytesMut now though, so that's a bonus. However, please do review this carefully.
Something I wasn't 100% sure about is whether it's possible for AsyncReadExt::read_buf to return less than a full buffer's worth. From what I can tell, it is possible - and S3 will throw an error if we try to write a part smaller than the minimum chunk size.
@tillrohrmann if you could take another look please, that would be great! I think I've covered all the comments:
The partition snapshot prefix looks like this in S3 with the latest changes:
Force-pushed 7291ede to ff6d9ce
let store = AmazonS3Builder::new()
    .with_url(destination.clone())
    .with_region(aws_region.to_string())
    .with_credentials(Arc::new(AwsSdkCredentialsProvider {
        credentials_provider: DefaultCredentialsChain::builder().build().await,
    }))
    .build()?;
@igalshilman I'd love to also use this approach in #2309 or a follow-up - this enables using AWS_PROFILE and SSO session credentials, which is great for dev and safer for production deployments than static keys.
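For reference, the bridge is roughly shaped like the following sketch - the struct and field names match the snippet above, while the trait impl body is an approximation and exact signatures vary across object_store versions:

use std::sync::Arc;

use async_trait::async_trait;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_credential_types::provider::ProvideCredentials;
use object_store::aws::AwsCredential;
use object_store::CredentialProvider;

struct AwsSdkCredentialsProvider {
    credentials_provider: DefaultCredentialsChain,
}

impl std::fmt::Debug for AwsSdkCredentialsProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("AwsSdkCredentialsProvider")
    }
}

#[async_trait]
impl CredentialProvider for AwsSdkCredentialsProvider {
    type Credential = AwsCredential;

    async fn get_credential(&self) -> object_store::Result<Arc<AwsCredential>> {
        // Resolve credentials via the AWS SDK chain (env vars, AWS_PROFILE,
        // SSO sessions, IMDS, ...) and map them into object_store's type.
        let creds = self
            .credentials_provider
            .provide_credentials()
            .await
            .map_err(|e| object_store::Error::Generic {
                store: "S3",
                source: Box::new(e),
            })?;
        Ok(Arc::new(AwsCredential {
            key_id: creds.access_key_id().to_string(),
            secret_key: creds.secret_access_key().to_string(),
            token: creds.session_token().map(|t| t.to_string()),
        }))
    }
}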
This change introduces a SnapshotRepository responsible for uploading snapshots to a remote object store.
Sample usage
Configuration:
Currently only s3:// and file:// URLs are supported and work just as expected.
Snapshot creation:
Future work:
Closes: #2197