Introduce SnapshotRepository and object store integration #2310

Open
wants to merge 9 commits into base: main

Conversation

Contributor

@pcholakov commented Nov 18, 2024

This change introduces a SnapshotRepository responsible for uploading snapshots to a remote object store.

Sample usage

Configuration:

[worker.snapshots]
destination = "file:///Users/pavel/test-cluster-snapshots"

Currently, only s3:// and file:// destination URLs are supported; both work as expected.

Snapshot creation:

> tree ~/test-cluster-snapshots/
/Users/pavel/test-cluster-snapshots/
├── 0
│   ├── latest.json
│   ├── lsn_85
│   │   ├── 000499.sst
│   │   ├── 000559.sst
│   │   ├── 000578.sst
│   │   └── metadata.json
│   └── lsn_88
│       ├── 000579.sst
│       ├── 000602.sst
│       └── metadata.json
├── 1
│   ├── latest.json
│   ├── lsn_94
│   │   ├── 000491.sst
│   │   ├── 000555.sst
│   │   ├── 000586.sst
│   │   └── metadata.json
│   └── lsn_96
│       ├── 000587.sst
│       ├── 000603.sst
│       └── metadata.json
└── 2
    ├── latest.json
    ├── lsn_71
    │   ├── 000475.sst
    │   ├── 000547.sst
    │   ├── 000594.sst
    │   └── metadata.json
    └── lsn_72
        ├── 000595.sst
        ├── 000604.sst
        └── metadata.json

10 directories, 24 files

> cat ~/test-cluster-snapshots/1/latest.json
{
  "version": "V1",
  "lsn": 96,
  "partition_id": 1,
  "node_name": "Pavels-MacBook-Pro.local",
  "created_at": "2024-11-21T19:17:15.755049000Z",
  "snapshot_id": "snap_17IyaexCTaiY7ZvMJcsPdg5",
  "min_applied_lsn": 96,
  "path": "lsn_96"
}

> cat ~/test-cluster-snapshots/1/lsn_96/metadata.json
{
  "version": "V1",
  "cluster_name": "localcluster",
  "partition_id": 1,
  "node_name": "Pavels-MacBook-Pro.local",
  "created_at": "2024-11-21T19:17:15.755049000Z",
  "snapshot_id": "snap_17IyaexCTaiY7ZvMJcsPdg5",
  "key_range": {
    "start": 768614336404564651,
    "end": 1537228672809129301
  },
  "min_applied_lsn": 96,
  "db_comparator_name": "leveldb.BytewiseComparator",
  "files": [
    {
      "column_family_name": "",
      "name": "/000603.sst",
      "directory": "/Users/pavel/restate/restate/restate-data/Pavels-MacBook-Pro.local/db-snapshots/1/snap_17IyaexCTaiY7ZvMJcsPdg5",
      "size": 1268,
      "level": 0,
      "start_key": "64650000000000000001010453454c46",
      "end_key": "667300000000000000010000000000000002",
      "smallest_seqno": 6063,
      "largest_seqno": 6064,
      "num_entries": 0,
      "num_deletions": 0
    },
    {
      "column_family_name": "",
      "name": "/000587.sst",
      "directory": "/Users/pavel/restate/restate/restate-data/Pavels-MacBook-Pro.local/db-snapshots/1/snap_17IyaexCTaiY7ZvMJcsPdg5",
      "size": 1143,
      "level": 6,
      "start_key": "64650000000000000001010453454c46",
      "end_key": "667300000000000000010000000000000002",
      "smallest_seqno": 0,
      "largest_seqno": 0,
      "num_entries": 0,
      "num_deletions": 0
    }
  ]
}

Future work:

  • Implement fetching and bootstrapping from snapshot
  • Implement parallel multi-part upload
  • Implement trim-gap handling using snapshots

Closes: #2197

github-actions bot commented Nov 18, 2024

Test Results

  7 files  ±0    7 suites  ±0   4m 20s ⏱️ -4s
 47 tests ±0   46 ✅ ±0  1 💤 ±0  0 ❌ ±0 
182 runs  ±0  179 ✅ ±0  3 💤 ±0  0 ❌ ±0 

Results for commit ff6d9ce. ± Comparison against base commit 49dec91.

♻️ This comment has been updated with latest results.

muhamadazmy previously approved these changes Nov 18, 2024

Contributor

@muhamadazmy left a comment

Thank you @pcholakov for creating this PR. It looks good to me! I left two very minor comments.

crates/worker/src/lib.rs (outdated, resolved)
.into_string()
.map(|path| format!("file://{path}"))
})
.map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
Contributor
Suggested change:
-.map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
+.context("Unable to convert path to string")?;

This will still include the 'inner' error in the output string when printed.

Contributor Author
That approach doesn't work here because OsString::into_string() returns Result<String, OsString>, which doesn't meet Anyhow's trait bounds :-)
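For context, a minimal sketch of the constraint being discussed (the helper name is illustrative): anyhow::Context is only implemented for Result<T, E> where E implements std::error::Error + Send + Sync + 'static, and OsString does not implement std::error::Error, so the error side has to be formatted manually:

use anyhow::anyhow;
use std::ffi::OsString;

// Hypothetical helper illustrating the point above: `.context(...)` does not
// compile on the Result<String, OsString> returned by `into_string()`, so the
// OsString error is rendered via `map_err` + `anyhow!` instead.
fn to_file_url(path: OsString) -> anyhow::Result<String> {
    path.into_string()
        .map(|path| format!("file://{path}"))
        .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))
}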

crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
Contributor

@tillrohrmann left a comment
Thanks for creating this PR @pcholakov. The changes look really good. The one question I have is whether there is a way to avoid materializing the tarball and re-reading it into memory. It would be awesome if we could stream the tarballing into the object-store upload.

crates/types/Cargo.toml (outdated, resolved)
crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
/// Write a partition snapshot to the snapshot repository.
pub(crate) async fn put(
&self,
partition_id: PartitionId,
Contributor
Isn't partition_id already part of PartitionSnapshotMetadata?

Contributor Author
Removed :-)

crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
Comment on lines 156 to 157
// todo(pavel): don't buffer the entire snapshot in memory!
let payload = PutPayload::from(tokio::fs::read(tarball.path()).await?);
Contributor
That would indeed be great. Especially once we have larger snapshots.

Contributor

@muhamadazmy Nov 20, 2024
ObjectStore already supports multipart upload; you can use that to upload the tar in chunks instead.

Contributor Author
Implemented in the latest revision! 🎉

// the latest snapshot is always first.
let inverted_sort_key = format!("{:016x}", u64::MAX - lsn.as_u64());

// The snapshot data / metadata key format is: [<base_prefix>/]<partition_id>/<sort_key>_<lsn>_<snapshot_id>.tar
Contributor
What's the idea for distinguishing full from incremental snapshots in the future? Would the latter have a completely different path or contain a marker file that denotes them as incremental?

Contributor Author
I'm about to introduce this shortly to this PR - the key idea is to upload the tar archives and metadata JSON files separately, so that interested nodes can easily query just the metadata. We can gradually introduce additional attributes to the metadata JSON schema to support referencing the constituent parts of an incremental snapshot. The snapshot format version field within the metadata blob will allow nodes to know how to interpret it - or fail loudly if the Restate server is an older version that doesn't understand it.

The paths will be something like:

  • [<prefix>/]metadata/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.json
  • [<prefix>/]snapshot/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.tar

I imagine that at some point we'll add incremental snapshots and the repository format will then look something along the lines of:

  • [<prefix>/]metadata/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.json (V2)
  • [<prefix>/]files/<partition_id>/<snapshot_id>-{filename}.sst

In this world, there will no longer be 1:1 metadata-to-snapshot correspondence but rather a 1:n relationship. Additionally, we may want to write some sort of index metadata to make it cheaper to garbage collect disused SSTs - but I haven't thought too much about that yet.
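A rough sketch of how such keys could be assembled (a hypothetical helper, not the PR's API), combining the inverted sort key from the snippet above with the proposed metadata/snapshot prefixes:

// Illustrative only; names and types are assumptions, not the PR's final API.
fn snapshot_keys(prefix: &str, partition_id: u64, lsn: u64, snapshot_id: &str) -> (String, String) {
    // Zero-padded hex of (u64::MAX - lsn) makes the newest snapshot sort first
    // in a lexicographic object listing.
    let sort_key = format!("{:016x}", u64::MAX - lsn);
    let metadata_key =
        format!("{prefix}metadata/{partition_id}/{sort_key}-{snapshot_id}-{lsn}.json");
    let snapshot_key =
        format!("{prefix}snapshot/{partition_id}/{sort_key}-{snapshot_id}-{lsn}.tar");
    (metadata_key, snapshot_key)
}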

crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
Contributor Author

@pcholakov left a comment
Thanks @tillrohrmann and @muhamadazmy for your early input; it was really valuable! I've pushed a new revision, but I still want to remove tar archiving before I mark it ready for review.

crates/worker/src/lib.rs (outdated, resolved)
crates/worker/Cargo.toml (outdated, resolved)
crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
let mut tarball = tar::Builder::new(NamedTempFile::new_in(&staging_path)?);
debug!(
"Creating snapshot tarball of {:?} in: {:?}...",
&staging_path,
Contributor Author
I've renamed staging_path to local_snapshot_path for clarity - that's the raw RocksDB column family export directory with the SSTs plus our own metadata JSON blob. We then tar that directory up into an archive at the path snapshot_archive_path.

crates/worker/src/partition/snapshots/repository.rs (outdated, resolved)
Base automatically changed from refactor/snapshots-to-ppm to main November 21, 2024 10:54
@pcholakov changed the title from "Introduce SnapshotsRepository backed by object_store" to "Introduce SnapshotRepository and object store integration" on Nov 21, 2024
@pcholakov marked this pull request as ready for review November 21, 2024 19:22
@pcholakov dismissed muhamadazmy’s stale review November 22, 2024 13:33

Substantial changes since initial revision

Contributor

@tillrohrmann left a comment
Thanks for creating this PR @pcholakov. The changes look really nice. I left a few minor comments. The one question I had was whether concurrent modifications of a snapshot metadata.json or the latest.json can be a problem (e.g. if an old and new leader upload a snapshot at the same time)?

Comment on lines 85 to 89
SnapshotExportError(PartitionId, #[source] anyhow::Error),
#[error("Snapshot failed for partition {0}: {1}")]
SnapshotMetadataHeaderError(PartitionId, #[source] io::Error),
#[error("Internal error creating snapshot for partition {0}: {1}")]
#[error("Snapshot IO error: {1}")]
SnapshotIoError(PartitionId, #[source] io::Error),
#[error("Snapshot repository IO error: {1}")]
RepositoryIoError(PartitionId, #[source] anyhow::Error),
Contributor
You probably don't have to add the "Error" suffix. It's a variant of SnapshotError so it should be clear that it is an error.

Contributor Author
Done ✅

Comment on lines 58 to 59
/// Restate cluster name which produced the snapshot.
pub lsn: Lsn,
Contributor
Comment seems to be a bit off.

Contributor Author

@pcholakov Nov 25, 2024
The entire field is redundant! (We have min_applied_lsn below.)

let relative_snapshot_path = format!("lsn_{lsn}", lsn = snapshot.min_applied_lsn);
let snapshot_prefix = format!(
"{prefix}{partition_id}/{relative_snapshot_path}",
prefix = self.prefix,
Contributor
Didn't know that this is possible. Interesting.

Comment on lines 153 to 167
debug!(
%lsn,
"Publishing partition snapshot to: {}",
self.destination,
);
Contributor
You can instrument put via #[instrument()] and include the lsn, snapshot id, etc.
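A rough sketch of that suggestion (the types and field names are simplified stand-ins, not the PR's actual signatures):

use tracing::instrument;

// Simplified stand-in types for illustration.
struct SnapshotRepository;
struct SnapshotMeta {
    min_applied_lsn: u64,
    snapshot_id: String,
}

impl SnapshotRepository {
    // The span opened by #[instrument] carries the lsn and snapshot id, so every
    // event logged inside put() is annotated with them automatically.
    #[instrument(level = "debug", skip_all, fields(lsn = meta.min_applied_lsn, snapshot_id = %meta.snapshot_id))]
    async fn put(&self, meta: &SnapshotMeta) {
        tracing::debug!("publishing partition snapshot");
    }
}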

Comment on lines +196 to +210
let put_result = self
    .object_store
    .put(&metadata_key, metadata_json_payload)
    .await?;
Contributor
Is there a possibility for two processes taking a snapshot for the same lsn (e.g. an old leader and a new one) which aren't exactly the same because the effective lsn is different? If this is possible, is this a problem?

Contributor Author
Definitely! This is partly why I'm still on the fence about the exact snapshot naming scheme. One simple solution is to use snapshot IDs to disambiguate snapshots for the same LSN as they must be (modulo ULID collision) unique across nodes. I'd combine that with conditional put (only succeed if file does not exist) and complain loudly if it ever fails.
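A sketch of the conditional-put idea using the object_store crate's PutMode::Create (the helper name and error handling are illustrative):

use object_store::{path::Path, ObjectStore, PutMode, PutOptions, PutPayload};

// Illustrative helper: PutMode::Create makes the write succeed only if no object
// already exists at `key`, so a concurrent upload of a same-LSN snapshot from
// another node surfaces as an AlreadyExists error instead of a silent overwrite.
async fn put_if_absent(
    store: &dyn ObjectStore,
    key: &Path,
    payload: PutPayload,
) -> anyhow::Result<()> {
    match store
        .put_opts(key, payload, PutOptions::from(PutMode::Create))
        .await
    {
        Ok(result) => {
            tracing::debug!(etag = ?result.e_tag, %key, "Snapshot object created");
            Ok(())
        }
        Err(object_store::Error::AlreadyExists { .. }) => {
            anyhow::bail!("another node already uploaded a snapshot object at {key}")
        }
        Err(e) => Err(e.into()),
    }
}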

Comment on lines 222 to 294
let put_result = self
    .object_store
    .put(&latest_path, latest_json_payload)
    .await?;
Contributor
Same question here but for different LSNs. How are we gonna use the latest.json? I could imagine how a slow old leader completes a snapshot after a new snapshot has been completed.

Contributor Author

@pcholakov Nov 25, 2024
I have an idea here that hadn't made it into the PR just yet: just download the previous pointer and check that we aren't moving backwards. This should be enough to prevent the worst case of some node going to sleep mid-snapshot and wreaking havoc.

Contributor Author
Since I wrote that comment, we have been blessed with proper S3 conditional put, so I rewrote the update logic to perform a CAS 🎉 I'm not doing this preemptively since this path should be uncontended, but the check is there as a defensive measure against going backwards and overwriting something we didn't mean to.
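Roughly, the compare-and-swap described above could look like this (a sketch assuming object_store's PutMode::Update and the lsn field from the latest.json shown earlier; the very first write, when no latest.json exists yet, is omitted):

use object_store::{path::Path, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVersion};

// Illustrative sketch: read the current pointer, refuse to move it backwards, and
// condition the overwrite on the etag/version we read so a concurrent writer fails.
async fn update_latest(
    store: &dyn ObjectStore,
    latest_path: &Path,
    new_lsn: u64,
    new_latest_json: PutPayload,
) -> anyhow::Result<()> {
    let existing = store.get(latest_path).await?;
    let version = UpdateVersion {
        e_tag: existing.meta.e_tag.clone(),
        version: existing.meta.version.clone(),
    };
    let previous: serde_json::Value = serde_json::from_slice(&existing.bytes().await?)?;
    let previous_lsn = previous["lsn"].as_u64().unwrap_or(0);
    anyhow::ensure!(
        new_lsn > previous_lsn,
        "refusing to move latest.json backwards from {previous_lsn} to {new_lsn}"
    );
    // Conditional put: fails with a precondition error if another writer updated
    // the pointer since we read it.
    store
        .put_opts(
            latest_path,
            new_latest_json,
            PutOptions::from(PutMode::Update(version)),
        )
        .await?;
    Ok(())
}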

Comment on lines 171 to 198
for file in &snapshot.files {
    let filename = file.name.trim_start_matches("/");
    let key = object_store::path::Path::from(format!(
        "{}/{}",
        snapshot_prefix.as_str(),
        filename
    ));
    let put_result = put_snapshot_object(
        local_snapshot_path.join(filename).as_path(),
        &key,
        &self.object_store,
    )
    .await?;
    debug!(
        etag = put_result.e_tag.unwrap_or_default(),
        ?key,
        "Put snapshot data file completed",
    );
}
Contributor
Uploading multiple files concurrently will probably only cause higher and less predictable resource utilization. And we aren't in a rush, I guess.

Contributor Author
Agreed! It was easier to be more predictable with a single upload stream. The impact I'm most concerned about is the memory overhead. S3 advises using fairly large chunks (on the order of 100 MB) for maximum throughput, so maybe it's worth looking into memory-mapped IO down the line.

} else {
    let mut upload = object_store.put_multipart(key).await?;
    loop {
        let mut buf = vec![0; MULTIPART_UPLOAD_THRESHOLD_BYTES];
Contributor
Could we reuse this buffer across iterations and ideally also across different file uploads?

Contributor Author
On the list, will be fixed in the next iteration :-)

Comment on lines 258 to 276
loop {
let mut buf = vec![0; MULTIPART_UPLOAD_THRESHOLD_BYTES];
let n = snapshot.read(&mut buf).await?;
if n == 0 {
break;
}
let part = PutPayload::from(buf);
upload
.put_part(part)
.await
.context("Failed to put snapshot part in repository")?;
trace!("Uploaded chunk of {} bytes", n);
}
upload
.complete()
.await
.context("Failed to put snapshot in repository")
Contributor
Should we call upload.abort in case an error occurs?

Contributor Author
Yes, we must! Thanks for flagging this - I wanted to see how the multipart upload API works first; hardening coming up in the next revision.
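For reference, the abort-on-error shape could look roughly like this (a sketch, not the PR's code; the helper names and chunk_size parameter are made up, and it glosses over S3's minimum part size for non-final parts):

use anyhow::Context;
use object_store::{path::Path, MultipartUpload, ObjectStore, PutPayload};
use tokio::io::AsyncReadExt;

// Illustrative: if streaming the parts fails, abort the multipart upload so the
// already-uploaded parts are released instead of lingering in the bucket.
async fn upload_with_abort(
    object_store: &dyn ObjectStore,
    key: &Path,
    mut snapshot: tokio::fs::File,
    chunk_size: usize,
) -> anyhow::Result<()> {
    let mut upload = object_store.put_multipart(key).await?;
    let result = stream_parts(upload.as_mut(), &mut snapshot, chunk_size).await;
    if result.is_err() {
        // Best-effort cleanup; the original error is what gets reported.
        let _ = upload.abort().await;
    }
    result
}

async fn stream_parts(
    upload: &mut dyn MultipartUpload,
    snapshot: &mut tokio::fs::File,
    chunk_size: usize,
) -> anyhow::Result<()> {
    loop {
        let mut buf = vec![0u8; chunk_size];
        let n = snapshot.read(&mut buf).await?;
        if n == 0 {
            break;
        }
        buf.truncate(n);
        upload
            .put_part(PutPayload::from(buf))
            .await
            .context("Failed to put snapshot part in repository")?;
    }
    upload
        .complete()
        .await
        .context("Failed to put snapshot in repository")?;
    Ok(())
}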

Comment on lines 44 to 45
/// - `[<prefix>/]<partition_id>/YYYY-MM-DD/{lsn}/metadata.json` - snapshot descriptor
/// - `[<prefix>/]<partition_id>/YYYY-MM-DD/{lsn}/*.sst` - data files (explicitly named in `metadata.json`)
Contributor
Is the YYYY-MM-DD still up to date? I couldn't find it in the path when we upload snapshots.

Contributor Author

@pcholakov Nov 25, 2024
Oops, my bad! I kept flip-flopping on this but we can't have it half and half 😆 Taken out. We can make this configurable later if customers request it - either using dates or inverted padded sort-key for easier list navigation.

I've now included ${snapshot_id} here to disambiguate any potential same-LSN conflicts.

@pcholakov force-pushed the feat/snapshot-upload branch 3 times, most recently from defc6ee to 7291ede on November 26, 2024 21:35

if !buf.is_empty() {
upload
.put_part(PutPayload::from_bytes(buf.split().freeze()))
Contributor Author
Getting the split() + freeze() / reserve() just right took me an embarrassingly long time! I have a reasonable grasp of Bytes/BytesMut now though, so that's a bonus. However, please do review this carefully.

Something I wasn't 100% sure about is whether it's possible for AsyncReadExt::read_buf to return less than a full buffer's worth. From what I can tell, it is possible - and S3 will throw an error if we try to write a part smaller than the minimum chunk size.
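A sketch of one way to guard against short reads (names are illustrative, not the PR's code): keep calling read_buf until the buffer holds a full part or the reader is exhausted, so only the final part can be undersized, which S3 permits for the last part of a multipart upload:

use bytes::BytesMut;
use tokio::io::{AsyncRead, AsyncReadExt};

// Illustrative helper: read_buf may return fewer bytes than requested, so loop
// until `buf` holds a full part or we hit EOF. Returns true once EOF is reached;
// the caller then uploads whatever is buffered as the (possibly short) final part.
async fn fill_part<R: AsyncRead + Unpin>(
    reader: &mut R,
    buf: &mut BytesMut,
    part_size: usize,
) -> std::io::Result<bool> {
    buf.reserve(part_size.saturating_sub(buf.len()));
    while buf.len() < part_size {
        if reader.read_buf(buf).await? == 0 {
            return Ok(true);
        }
    }
    Ok(false)
}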

@pcholakov
Contributor Author

@tillrohrmann if you could take another look please, that would be great! I think I've covered all the comments:

  • added a unique snapshot ID to the snapshot path to guarantee uniqueness
  • eliminated Error suffix from various error variants
  • reusing a BytesMut buffer to minimize allocation (still sequential multipart upload though; ran out of time - let's park that as a future improvement if desired)
  • now performing an S3 CAS on pointer bump 🎉
  • a partial multipart upload is cleaned up on error

The partition snapshot prefix looks like this in S3 with the latest changes:

[screenshot: S3 console listing of a partition's snapshot prefix]

Comment on lines +114 to +120
let store = AmazonS3Builder::new()
.with_url(destination.clone())
.with_region(aws_region.to_string())
.with_credentials(Arc::new(AwsSdkCredentialsProvider {
credentials_provider: DefaultCredentialsChain::builder().build().await,
}))
.build()?;
Contributor Author
@igalshilman I'd love to also use this approach in #2309 or a follow-up - this enables using AWS_PROFILE and SSO session credentials, which is great for dev and safer for production deployments than static keys.
