Skip to content

Commit

Permalink
fix: implement metadata rebuild.
Browse files Browse the repository at this point in the history
- implement scan keys to get all of the keys
- implement rebuild meta by key
  • Loading branch information
iwanbk committed Nov 29, 2024
1 parent cd24f42 commit bab88aa
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 13 deletions.
14 changes: 12 additions & 2 deletions zstor/src/actors/backends.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::actors::{
config::{ConfigActor, GetConfig, ReloadConfig, ReplaceMetaBackend},
explorer::{ExpandStorage, SizeRequest},
meta::{MarkWriteable, MetaStoreActor, ReplaceMetaStore},
meta::{MarkWriteable, MetaStoreActor, RebuildAllMeta, ReplaceMetaStore},
metrics::{MetricsActor, SetDataBackendInfo, SetMetaBackendInfo},
};
use crate::{
Expand Down Expand Up @@ -165,6 +165,8 @@ impl Handler<ReloadConfig> for BackendManagerActor {

fn handle(&mut self, _: ReloadConfig, ctx: &mut Self::Context) -> Self::Result {
let cfg_addr = self.config_addr.clone();
//let metastore = self.metastore.clone();

let fut = Box::pin(
async move {
let (managed_seq_dbs, managed_meta_dbs) =
Expand All @@ -182,16 +184,24 @@ impl Handler<ReloadConfig> for BackendManagerActor {
});
}
}

let mut any_new_meta = false;
// remove the meta backends that are no longer managed from the metrics
for (ci, _) in actor.managed_meta_dbs.iter() {
if !meta_dbs.contains_key(ci) {
any_new_meta = true;
actor.metrics.do_send(SetMetaBackendInfo {
ci: ci.clone(),
info: None,
});
}
}

if any_new_meta {
log::info!("New metadata backends, rebuilding metadata cluster");
if let Err(err) = actor.metastore.try_send(RebuildAllMeta) {
error!("Failed to send RebuildAllMeta message: {}", err);
}
}
actor.managed_seq_dbs = seq_dbs;
actor.managed_meta_dbs = meta_dbs;
}),
Expand Down
72 changes: 72 additions & 0 deletions zstor/src/actors/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ pub struct MarkWriteable {
pub writeable: bool,
}

#[derive(Message)]
#[rtype(result = "()")]
/// Message to replace the metastore in use.
pub struct RebuildAllMeta;
#[derive(Message)]
#[rtype(result = "()")]
/// Message to replace the metastore in use.
Expand Down Expand Up @@ -263,6 +267,74 @@ impl Handler<GetFailures> for MetaStoreActor {
}
}

impl Handler<RebuildAllMeta> for MetaStoreActor {
type Result = ResponseFuture<()>;

fn handle(&mut self, _: RebuildAllMeta, ctx: &mut Self::Context) -> Self::Result {
let metastore = self.meta_store.clone();
let addr = ctx.address();
log::info!("Rebuilding all meta handler");
Box::pin(async move {
let mut cursor = None;
let mut backend_idx = None;
use std::time::{SystemTime, UNIX_EPOCH};

let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();

log::info!("Starting rebuild at timestamp: {}", timestamp);
loop {
let (idx, new_cursor, keys) = match metastore
.scan_meta_keys(cursor.clone(), backend_idx, Some(timestamp))
.await
{
Ok((idx, c, keys)) => (idx, c, keys),
Err(e) => {
log::error!("Error scanning keys: {}", e);
break;
}
};

for key in keys {
log::info!("Rebuilding meta key: {}", key);
let meta: MetaData = match addr.send(LoadMetaByKey { key: key.clone() }).await {
Ok(Ok(m)) => m.unwrap(),
Ok(Err(e)) => {
log::error!("Error loading meta by key:{} - {}", key, e);
continue;
}
Err(e) => {
log::error!("Error loading meta by key:{} - {}", key, e);
continue;
}
};

// save meta by key
match addr
.send(SaveMetaByKey {
key: key.clone(),
meta,
})
.await
{
Ok(Ok(_)) => {}
Ok(Err(e)) => {
log::error!("Error saving meta by key:{} - {}", key, e);
}
Err(e) => {
log::error!("Error saving meta by key:{} - {}", key, e);
}
}
}
cursor = Some(new_cursor);
backend_idx = Some(idx);
}
})
}
}

impl Handler<MarkWriteable> for MetaStoreActor {
type Result = ();

Expand Down
9 changes: 9 additions & 0 deletions zstor/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use path_clean::PathClean;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::{fmt, io};

/// The length of file and shard checksums
pub const CHECKSUM_LENGTH: usize = 16;
/// A checksum of a data object
Expand Down Expand Up @@ -207,6 +208,14 @@ pub trait MetaStore {
/// Check to see if a Zdb backend has been marked as replaced based on its connection info
async fn is_replaced(&self, ci: &ZdbConnectionInfo) -> Result<bool, MetaStoreError>;

/// scan the metadata keys
async fn scan_meta_keys(
&self,
cursor: Option<Vec<u8>>,
backend_idx: Option<usize>,
max_timestamp: Option<u64>,
) -> Result<(usize, Vec<u8>, Vec<String>), MetaStoreError>;

/// Get the (key, metadata) for all stored objects
async fn object_metas(&self) -> Result<Vec<(String, MetaData)>, MetaStoreError>;

Expand Down
60 changes: 59 additions & 1 deletion zstor/src/zdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl fmt::Debug for InternalZdb {

/// The type returned by the SCAN command in 0-db. In practice, the outer vec only contains a
/// single element.
type ScanEntry = Vec<(Vec<u8>, u32, u32)>;
type ScanEntry = Vec<(Vec<u8>, u32, u64)>; // payload, size of payload in byte, creation timestamp
/// The outpout of a `SCAN` Cmd (future).
type ScanResult = redis::RedisResult<(Vec<u8>, Vec<ScanEntry>)>;

Expand Down Expand Up @@ -244,6 +244,7 @@ impl Stream for CollectionKeys {

// Set new cursor
self.cursor = Some(res.0);

// New buffer - Converting the Vec into a VecDeque will, sadly, realloc the vec (under the
// assumption that the vec has cap == len).
self.buffer = res.1.into();
Expand Down Expand Up @@ -540,6 +541,31 @@ impl InternalZdb {
internal: ErrorCause::Redis(e),
})
}
async fn scan(&self, cursor: Option<Vec<u8>>) -> ZdbResult<(Vec<u8>, Vec<ScanEntry>)> {
trace!(
"scanning namespace {} ",
self.ci.namespace.as_deref().unwrap_or("default"),
);
self.select_ns().await?;

let mut scan_cmd = redis::cmd("SCAN");
if let Some(ref cur) = cursor {
scan_cmd.arg(cur as &[u8]);
}

let mut conn = self.conn.clone();
let res: (Vec<u8>, Vec<ScanEntry>) = match scan_cmd.query_async(&mut conn).await {
Ok(r) => r,
Err(e) => {
return Err(ZdbError {
kind: ZdbErrorKind::Read,
remote: self.ci.clone(),
internal: ErrorCause::Redis(e),
})
}
};
Ok(res)
}

/// Get a stream of all the keys in the namespace
fn keys(&self) -> CollectionKeys {
Expand Down Expand Up @@ -786,6 +812,38 @@ impl UserKeyZdb {
self.internal.delete(key.as_ref()).await
}

/// scan the namespace for keys. The cursor can be used to continue a previous scan.
pub async fn scan(
&self,
cursor: Option<Vec<u8>>,
prefix: Option<&str>,
max_timestamp: Option<u64>,
) -> ZdbResult<(Vec<u8>, Vec<String>)> {
let (cursor, entries): (Vec<u8>, Vec<ScanEntry>) = self.internal.scan(cursor).await?;

let mut keys = Vec::new();
for entry in &entries {
// check timestamp
if let Some(ts) = max_timestamp {
if entry[0].2 > ts {
continue;
}
}
// check prefix
let raw_key = entry[0].0.clone();
if let Some(p) = prefix {
if !raw_key.starts_with(p.as_bytes()) {
continue;
}
}
if let Ok(s) = String::from_utf8(raw_key) {
keys.push(s)
}
}

Ok((cursor, keys))
}

/// Get a stream which yields all the keys in the namespace.
pub fn keys(&self) -> impl Stream<Item = Vec<u8>> + '_ {
trace!("key iteration on {}", self.connection_info().address());
Expand Down
62 changes: 52 additions & 10 deletions zstor/src/zdb_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,8 @@ where
Ok(())
}

/// Return a stream of all function with a given prefix
async fn keys<'a>(
&'a self,
prefix: &'a str,
) -> ZdbMetaStoreResult<impl Stream<Item = String> + 'a> {
debug!("Starting metastore key iteration with prefix {}", prefix);

// First get the lengh of all the backends
/// Helper function to get the backend with the most keys
async fn get_most_keys_backend(&self) -> ZdbMetaStoreResult<usize> {
let mut ns_requests = Vec::with_capacity(self.backends.len());
for backend in self.backends.iter() {
ns_requests.push(backend.ns_info());
Expand All @@ -262,15 +256,46 @@ where
}
}
}

// If there is no reachable backend, we can't list the keys.
if !healthy_backend {
return Err(ZdbMetaStoreError {
kind: ErrorKind::InsufficientHealthBackends,
internal: InternalError::Other("no healthy backend found to list keys".into()),
});
}

Ok(most_keys_idx)
}

async fn scan_keys(
&self,
cursor: Option<Vec<u8>>,
prefix: Option<&str>,
backend_idx: Option<usize>,
max_timestamp: Option<u64>,
) -> ZdbMetaStoreResult<(usize, Vec<u8>, Vec<String>)> {
let most_keys_idx = match backend_idx {
Some(idx) => idx,
None => self.get_most_keys_backend().await?,
};

let (new_cursor, keys) = self.backends[most_keys_idx]
.scan(cursor, prefix, max_timestamp)
.await?;
Ok((most_keys_idx, new_cursor, keys))
}

/// Return a stream of all function with a given prefix
async fn keys<'a>(
&'a self,
prefix: &'a str,
) -> ZdbMetaStoreResult<impl Stream<Item = String> + 'a> {
debug!("Starting metastore key iteration with prefix {}", prefix);

let most_keys_idx = match self.get_most_keys_backend().await {
Ok(idx) => idx,
Err(e) => return Err(e),
};

// Now iterate over the keys of the longest backend
Ok(self.backends[most_keys_idx]
.keys()
Expand Down Expand Up @@ -583,6 +608,23 @@ where
Ok(self.save_meta_by_key(&self.build_key(path)?, meta).await?)
}

async fn scan_meta_keys(
&self,
cursor: Option<Vec<u8>>,
backend_idx: Option<usize>,
max_timestamp: Option<u64>,
) -> Result<(usize, Vec<u8>, Vec<String>), MetaStoreError> {
let prefix = format!("/{}/meta/", self.prefix);

match self
.scan_keys(cursor, Some(&prefix), backend_idx, max_timestamp)
.await
{
Ok((backend_idx, cursor, keys)) => Ok((backend_idx, cursor, keys)),
Err(e) => Err(MetaStoreError::from(e)),
}
}

async fn object_metas(&self) -> Result<Vec<(String, MetaData)>, MetaStoreError> {
// pin the stream on the heap for now
let prefix = format!("/{}/meta/", self.prefix);
Expand Down

0 comments on commit bab88aa

Please sign in to comment.