Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #113 from n0-computer/async-slice-decoder-newtype
Browse files Browse the repository at this point in the history
refactor: make AsyncSliceDecoder pub(crate)
  • Loading branch information
rklaehn authored Feb 3, 2023
2 parents 8c254f3 + be48294 commit 694d117
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/bao_slice_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ impl<R: Read> Read for SliceDecoder<R> {
}

#[derive(Debug)]
pub struct AsyncSliceDecoder<R: tokio::io::AsyncRead + Unpin> {
pub(crate) struct AsyncSliceDecoder<R: tokio::io::AsyncRead + Unpin> {
inner: SliceValidator<R>,
current_item: Option<StreamItem>,
}
Expand Down
48 changes: 39 additions & 9 deletions src/get.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::Debug;
use std::io;
use std::net::SocketAddr;
use std::time::{Duration, Instant};

Expand All @@ -9,7 +10,7 @@ use postcard::experimental::max_size::MaxSize;
use s2n_quic::stream::ReceiveStream;
use s2n_quic::Connection;
use s2n_quic::{client::Connect, Client};
use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, ReadBuf};
use tracing::debug;

use crate::bao_slice_decoder::AsyncSliceDecoder;
Expand Down Expand Up @@ -66,6 +67,37 @@ pub struct Stats {
pub mbits: f64,
}

/// A verified stream of data coming from the provider
///
/// We guarantee that the data is correct by incrementally verifying a hash
#[repr(transparent)]
#[derive(Debug)]
pub struct DataStream(AsyncSliceDecoder<ReceiveStream>);

impl DataStream {
fn new(inner: ReceiveStream, hash: bao::Hash) -> Self {
DataStream(AsyncSliceDecoder::new(inner, hash, 0, u64::MAX))
}

async fn read_size(&mut self) -> io::Result<u64> {
self.0.read_size().await
}

fn into_inner(self) -> ReceiveStream {
self.0.into_inner()
}
}

impl AsyncRead for DataStream {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut ReadBuf,
) -> std::task::Poll<std::io::Result<()>> {
std::pin::Pin::new(&mut self.0).poll_read(cx, buf)
}
}

pub async fn run<A, B, C, FutA, FutB, FutC>(
hash: bao::Hash,
token: AuthToken,
Expand All @@ -79,8 +111,8 @@ where
FutA: Future<Output = Result<()>>,
B: FnMut(Collection) -> FutB,
FutB: Future<Output = Result<()>>,
C: FnMut(bao::Hash, AsyncSliceDecoder<ReceiveStream>, Option<String>) -> FutC,
FutC: Future<Output = Result<AsyncSliceDecoder<ReceiveStream>>>,
C: FnMut(bao::Hash, DataStream, Option<String>) -> FutC,
FutC: Future<Output = Result<DataStream>>,
{
let now = Instant::now();
let (_client, mut connection) = setup(opts).await?;
Expand Down Expand Up @@ -208,13 +240,11 @@ where
///
/// Returns an `AsyncReader`
/// The `AsyncReader` can be used to read the content.
async fn handle_blob_response<
R: AsyncRead + futures::io::AsyncRead + Send + Sync + Unpin + 'static,
>(
async fn handle_blob_response(
hash: bao::Hash,
mut reader: R,
mut reader: ReceiveStream,
buffer: &mut BytesMut,
) -> Result<AsyncSliceDecoder<R>> {
) -> Result<DataStream> {
match read_lp_data(&mut reader, buffer).await? {
Some(response_buffer) => {
let response: Response = postcard::from_bytes(&response_buffer)?;
Expand All @@ -231,7 +261,7 @@ async fn handle_blob_response<
// next blob in collection will be sent over
Res::Found => {
assert!(buffer.is_empty());
let decoder = AsyncSliceDecoder::new(reader, hash, 0, u64::MAX);
let decoder = DataStream::new(reader, hash);
Ok(decoder)
}
}
Expand Down

0 comments on commit 694d117

Please sign in to comment.