Skip to content

Commit

Permalink
Fix stream erroring out on empty journal
Browse files Browse the repository at this point in the history
  • Loading branch information
dmzmk committed Sep 21, 2023
1 parent 8299cb7 commit 1efddc1
Showing 1 changed file with 35 additions and 30 deletions.
65 changes: 35 additions & 30 deletions journal/src/async_journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,39 +251,19 @@ impl<F: AsyncRead + AsyncWrite + AsyncSeek + std::marker::Unpin> AsyncJournal<F>
pub fn stream(
&mut self,
) -> impl Stream<Item = Result<(SnapshotHeader, BlobHeader, Vec<u8>)>> + '_ {
let mut initialized = false;
let mut eoi = false;
try_stream! {
loop {
// step 1: early exit
if eoi {
break
}
// step 1: update header.
if !initialized {
self.update_header().await?;
initialized = true;
}

// step 2: read snapshot header
self.update_header().await?;
let mut eoi = self.header.snapshot_counter == 0;
while !eoi {
let snapshot_header = self.read_snapshot().await?;

loop {
// step 3: read blob header
let blob_header = self.read_blob_header().await?;

if !blob_header.is_last() {
// step 4: read the blob bytes
let blob = self.read_blob(blob_header.blob_size).await?;

// step 5: yield the results
yield (snapshot_header, blob_header, blob)
} else {
if snapshot_header.id + 1 == self.header.snapshot_counter {
eoi = true;
}
if blob_header.is_last() {
eoi = snapshot_header.id + 1 == self.header.snapshot_counter;
break
}
let blob = self.read_blob(blob_header.blob_size).await?;
yield (snapshot_header, blob_header, blob)
}
}
}
Expand All @@ -292,21 +272,37 @@ impl<F: AsyncRead + AsyncWrite + AsyncSeek + std::marker::Unpin> AsyncJournal<F>

#[cfg(test)]
mod tests {
use std::path::Path;
use tokio_stream::StreamExt;
use super::*;

pub struct DropFile<'a> {
path: &'a Path
}

impl Drop for DropFile<'_> {
fn drop(&mut self) {
std::fs::remove_file(self.path).ok();
}
}

#[tokio::test]
async fn journal_create_works() {
let future = AsyncJournal::create("/tmp/asdf.txt");
let result = future.await;
let journal_path = tempfile::NamedTempFile::new().unwrap();
let journal_path = DropFile{ path: journal_path.path() };
let result = AsyncJournal::create(journal_path.path).await;
assert!(result.is_ok());
let journal = result.unwrap();
assert_eq!(journal.blob_count, None);
assert_eq!(journal.header, Header::default());
}


#[tokio::test]
async fn journal_add_and_commit_works() {
let result = AsyncJournal::create("/tmp/asdf.txt").await;
let journal_path = tempfile::NamedTempFile::new().unwrap();
let journal_path = DropFile{ path: journal_path.path() };
let result = AsyncJournal::create(journal_path.path).await;
assert!(result.is_ok());
let mut journal = result.unwrap();
assert_eq!(journal.blob_count, None);
Expand All @@ -323,4 +319,13 @@ mod tests {
assert!(result.is_ok());
assert_ne!(journal.header, Header::default());
}

#[tokio::test]
async fn journal_empty_stream() {
let journal_path = tempfile::NamedTempFile::new().unwrap();
let journal_path = DropFile{ path: journal_path.path() };
let mut journal = AsyncJournal::create(journal_path.path).await.unwrap();
let stream = journal.stream().collect::<Vec<_>>().await;
assert!(stream.len() == 0, "{:#?}", stream);
}
}

0 comments on commit 1efddc1

Please sign in to comment.