Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix segment reconstruction #1850

Merged
merged 3 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 36 additions & 64 deletions crates/subspace-archiving/src/piece_reconstructor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,73 +77,45 @@ impl PiecesReconstructor {
) -> Result<(ArchivedHistorySegment, Polynomial), ReconstructorError> {
let mut reconstructed_pieces = ArchivedHistorySegment::default();

if !input_pieces
.iter()
// Take each source shards here
.step_by(2)
.zip(
reconstructed_pieces
.iter_mut()
.map(|piece| piece.record_mut().iter_mut()),
)
.all(|(maybe_piece, raw_record)| {
if let Some(piece) = maybe_piece {
piece
.record()
.iter()
.zip(raw_record)
.for_each(|(source, target)| {
*target = *source;
});
true
} else {
false
}
})
{
// If not all data pieces are available, need to reconstruct data shards using erasure
// coding.

// Scratch buffer to avoid re-allocation
let mut tmp_shards_scalars =
Vec::<Option<Scalar>>::with_capacity(ArchivedHistorySegment::NUM_PIECES);
// Iterate over the chunks of `Scalar::SAFE_BYTES` bytes of all records
for record_offset in 0..RawRecord::SIZE / Scalar::SAFE_BYTES {
// Collect chunks of each record at the same offset
for maybe_piece in input_pieces.iter() {
let maybe_scalar = maybe_piece
.as_ref()
.map(|piece| {
piece
.record()
.iter()
.nth(record_offset)
.expect("Statically guaranteed to exist in a piece; qed")
})
.map(Scalar::try_from)
.transpose()
.map_err(ReconstructorError::DataShardsReconstruction)?;

tmp_shards_scalars.push(maybe_scalar);
}

self.erasure_coding
.recover(&tmp_shards_scalars)
.map_err(ReconstructorError::DataShardsReconstruction)?
.into_iter()
.zip(reconstructed_pieces.iter_mut().map(|piece| {
// Scratch buffer to avoid re-allocation
let mut tmp_shards_scalars =
Vec::<Option<Scalar>>::with_capacity(ArchivedHistorySegment::NUM_PIECES);
// Iterate over the chunks of `Scalar::SAFE_BYTES` bytes of all records
for record_offset in 0..RawRecord::SIZE / Scalar::SAFE_BYTES {
// Collect chunks of each record at the same offset
for maybe_piece in input_pieces.iter() {
let maybe_scalar = maybe_piece
.as_ref()
.map(|piece| {
piece
.record_mut()
.iter_mut()
.record()
.iter()
.nth(record_offset)
.expect("Statically guaranteed to exist in a piece; qed")
}))
.for_each(|(source_scalar, segment_data)| {
segment_data.copy_from_slice(&source_scalar.to_bytes());
});
})
.map(Scalar::try_from)
.transpose()
.map_err(ReconstructorError::DataShardsReconstruction)?;

tmp_shards_scalars.clear();
tmp_shards_scalars.push(maybe_scalar);
}

self.erasure_coding
.recover(&tmp_shards_scalars)
.map_err(ReconstructorError::DataShardsReconstruction)?
.into_iter()
.zip(reconstructed_pieces.iter_mut().map(|piece| {
piece
.record_mut()
.iter_mut()
.nth(record_offset)
.expect("Statically guaranteed to exist in a piece; qed")
}))
.for_each(|(source_scalar, segment_data)| {
segment_data.copy_from_slice(&source_scalar.to_bytes());
});

tmp_shards_scalars.clear();
}

let source_record_commitments = {
Expand Down Expand Up @@ -257,12 +229,12 @@ impl PiecesReconstructor {
segment_pieces: &[Option<Piece>],
piece_position: usize,
) -> Result<Piece, ReconstructorError> {
let (reconstructed_records, polynomial) = self.reconstruct_shards(segment_pieces)?;

if piece_position >= ArchivedHistorySegment::NUM_PIECES {
return Err(ReconstructorError::IncorrectPiecePosition);
}

let (reconstructed_records, polynomial) = self.reconstruct_shards(segment_pieces)?;

let mut piece = Piece::from(&reconstructed_records[piece_position]);

piece.witness_mut().copy_from_slice(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ fn piece_reconstruction_works() {
let missing_pieces = maybe_pieces
.iter_mut()
.enumerate()
.skip(100)
.take(40)
.skip(120)
.take(10)
.map(|(piece_position, piece)| (piece_position, piece.take().unwrap()))
.collect::<Vec<_>>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub(crate) async fn recover_missing_piece<PG: PieceGetter>(
let required_pieces_number = RecordedHistorySegment::NUM_RAW_RECORDS;

let mut received_segment_pieces = segment_index
.segment_piece_indexes_source_first()
.segment_piece_indexes()
.map(|piece_index| async move {
let _permit = match semaphore.acquire().await {
Ok(permit) => permit,
Expand Down