Skip to content

Commit

Permalink
Add a retry mechanism for data loading when rebuilding the data.
Browse files Browse the repository at this point in the history
We only retry on temporary errors like timeout.
Retry is also only enabled on data rebuild, not on retrieve,
because latency is not crucial during data rebuild.
  • Loading branch information
iwanbk committed Dec 19, 2024
1 parent 7101627 commit 5a3115c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
9 changes: 5 additions & 4 deletions zstor/src/actors/zstor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl Handler<Retrieve> for ZstorActor {
)
})?;

let shards = load_data(&metadata).await?;
let shards = load_data(&metadata, 1).await?;

pipeline
.send(RecoverFile {
Expand Down Expand Up @@ -335,7 +335,7 @@ impl Handler<Rebuild> for ZstorActor {
};

// load the data from the storage backends
let input = load_data(&old_metadata).await?;
let input = load_data(&old_metadata, 3).await?;
let existing_data = input.clone();

// rebuild the data (in memory only)
Expand Down Expand Up @@ -454,7 +454,8 @@ impl Handler<ReloadConfig> for ZstorActor {
}
}

async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
/// load data from the storage backends
async fn load_data(metadata: &MetaData, max_attempts: u64) -> ZstorResult<Vec<Option<Vec<u8>>>> {
// attempt to retrieve all shards
let mut shard_loads: Vec<JoinHandle<(usize, Result<(_, _), ZstorError>)>> =
Vec::with_capacity(metadata.shards().len());
Expand All @@ -468,7 +469,7 @@ async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
Ok(ok) => ok,
Err(e) => return (idx, Err(e.into())),
};
match db.get(&key).await {
match db.get_with_retry(&key, max_attempts).await {
Ok(potential_shard) => match potential_shard {
Some(shard) => (idx, Ok((shard, chksum))),
None => (
Expand Down
32 changes: 31 additions & 1 deletion zstor/src/zdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,36 @@ impl SequentialZdb {
Ok(Some(data))
}

/// Fetch data from the zdb, re-issuing the request when it times out.
///
/// [`get`](Self::get) is called at least once and at most `max_attempts`
/// times (a `max_attempts` of 0 or 1 results in a single plain call).
/// Only an error whose cause is `ErrorCause::Timeout` triggers another
/// attempt; any other error, or any successful result, is returned
/// immediately. Attempts are made back to back, without a delay in between.
///
/// # Errors
///
/// Returns the first non-timeout error encountered, or the error of the
/// final attempt once all attempts have been used up.
pub async fn get_with_retry(
    &self,
    keys: &[Key],
    max_attempts: u64,
) -> ZdbResult<Option<Vec<u8>>> {
    // Every attempt except the last one may swallow a timeout and go again.
    // The range is empty when `max_attempts` is 0 or 1.
    for _ in 1..max_attempts {
        match self.get(keys).await {
            Err(err) if matches!(err.internal, ErrorCause::Timeout) => continue,
            outcome => return outcome,
        }
    }

    // Final (or only) attempt: whatever it yields is the answer.
    self.get(keys).await
}

/// Returns the [`ZdbConnectionInfo`] object used to connect to this db.
#[inline]
pub fn connection_info(&self) -> &ZdbConnectionInfo {
Expand Down Expand Up @@ -1037,7 +1067,7 @@ impl ZdbError {
}

/// The cause of a zero db error.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
enum ErrorCause {
Redis(redis::RedisError),
Other(String),
Expand Down

0 comments on commit 5a3115c

Please sign in to comment.