diff --git a/zstor/src/actors/backends.rs b/zstor/src/actors/backends.rs
index d9d2929..fc95422 100644
--- a/zstor/src/actors/backends.rs
+++ b/zstor/src/actors/backends.rs
@@ -1,7 +1,7 @@
 use crate::actors::{
     config::{ConfigActor, GetConfig, ReloadConfig, ReplaceMetaBackend},
     explorer::{ExpandStorage, SizeRequest},
-    meta::{MarkWriteable, MetaStoreActor, ReplaceMetaStore},
+    meta::{MarkWriteable, MetaStoreActor, RebuildAllMeta, ReplaceMetaStore},
     metrics::{MetricsActor, SetDataBackendInfo, SetMetaBackendInfo},
 };
 use crate::{
@@ -165,6 +165,8 @@ impl Handler for BackendManagerActor {
     fn handle(&mut self, _: ReloadConfig, ctx: &mut Self::Context) -> Self::Result {
         let cfg_addr = self.config_addr.clone();
 
+        //let metastore = self.metastore.clone();
+
         let fut = Box::pin(
             async move {
                 let (managed_seq_dbs, managed_meta_dbs) =
@@ -182,16 +184,26 @@ impl Handler for BackendManagerActor {
                         });
                     }
                 }
-
+                let mut meta_backend_removed = false;
                 // remove the meta backends that are no longer managed from the metrics
                 for (ci, _) in actor.managed_meta_dbs.iter() {
                     if !meta_dbs.contains_key(ci) {
+                        meta_backend_removed = true;
                         actor.metrics.do_send(SetMetaBackendInfo {
                             ci: ci.clone(),
                             info: None,
                         });
                     }
                 }
+
+                // A managed meta backend is missing from the reloaded set, so the stored
+                // metadata is rebuilt.
+                if meta_backend_removed {
+                    log::info!("New metadata backends, rebuilding metadata cluster");
+                    if let Err(err) = actor.metastore.try_send(RebuildAllMeta) {
+                        error!("Failed to send RebuildAllMeta message: {}", err);
+                    }
+                }
                 actor.managed_seq_dbs = seq_dbs;
                 actor.managed_meta_dbs = meta_dbs;
             }),
diff --git a/zstor/src/actors/meta.rs b/zstor/src/actors/meta.rs
index fbd6d5b..9791e2f 100644
--- a/zstor/src/actors/meta.rs
+++ b/zstor/src/actors/meta.rs
@@ -100,6 +100,10 @@ pub struct MarkWriteable {
     pub writeable: bool,
 }
 
+#[derive(Message)]
+#[rtype(result = "()")]
+/// Message to rebuild all metadata: every key found in the metastore is loaded and re-saved.
+pub struct RebuildAllMeta;
 #[derive(Message)]
 #[rtype(result = "()")]
 /// Message to replace the metastore in use.
@@ -263,6 +267,87 @@ impl Handler for MetaStoreActor {
     }
 }
 
+impl Handler<RebuildAllMeta> for MetaStoreActor {
+    type Result = ResponseFuture<()>;
+
+    /// Load and re-save every metadata entry found by scanning the metastore.
+    ///
+    /// Keys are fetched page by page with `scan_meta_keys`, restricted to keys created no
+    /// later than the moment the rebuild started. Failures for a single key are logged and
+    /// the rebuild moves on to the next key.
+    fn handle(&mut self, _: RebuildAllMeta, ctx: &mut Self::Context) -> Self::Result {
+        use std::time::{SystemTime, UNIX_EPOCH};
+
+        let metastore = self.meta_store.clone();
+        let addr = ctx.address();
+        log::info!("Rebuilding all meta handler");
+        Box::pin(async move {
+            let mut cursor = None;
+            let mut backend_idx = None;
+
+            let timestamp = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .expect("system clock is set before the UNIX epoch")
+                .as_secs();
+
+            log::info!("Starting rebuild at timestamp: {}", timestamp);
+            // NOTE(review): the only way out of this loop is a failing scan, so the end of
+            // the key space is presumably reported as an error by the backend - confirm.
+            loop {
+                let (idx, new_cursor, keys) = match metastore
+                    .scan_meta_keys(cursor.take(), backend_idx, Some(timestamp))
+                    .await
+                {
+                    Ok(page) => page,
+                    Err(e) => {
+                        log::error!("Error scanning keys: {}", e);
+                        break;
+                    }
+                };
+
+                for key in keys {
+                    log::info!("Rebuilding meta key: {}", key);
+                    let meta: MetaData = match addr.send(LoadMetaByKey { key: key.clone() }).await {
+                        Ok(Ok(Some(m))) => m,
+                        // A key without metadata used to panic on `unwrap()`; skip it instead.
+                        Ok(Ok(None)) => {
+                            log::warn!("No metadata found for key:{}, skipping", key);
+                            continue;
+                        }
+                        Ok(Err(e)) => {
+                            log::error!("Error loading meta by key:{} - {}", key, e);
+                            continue;
+                        }
+                        Err(e) => {
+                            log::error!("Error loading meta by key:{} - {}", key, e);
+                            continue;
+                        }
+                    };
+
+                    // save meta by key
+                    match addr
+                        .send(SaveMetaByKey {
+                            key: key.clone(),
+                            meta,
+                        })
+                        .await
+                    {
+                        Ok(Ok(_)) => {}
+                        Ok(Err(e)) => {
+                            log::error!("Error saving meta by key:{} - {}", key, e);
+                        }
+                        Err(e) => {
+                            log::error!("Error saving meta by key:{} - {}", key, e);
+                        }
+                    }
+                }
+                cursor = Some(new_cursor);
+                backend_idx = Some(idx);
+            }
+        })
+    }
+}
+
 impl Handler for MetaStoreActor {
     type Result = ();
 
diff --git a/zstor/src/meta.rs b/zstor/src/meta.rs
index e99e469..7524f02 100644
--- a/zstor/src/meta.rs
+++ b/zstor/src/meta.rs
@@ -8,6 +8,7 @@ use path_clean::PathClean;
 use serde::{Deserialize, Serialize};
 use std::path::{Path, PathBuf};
 use std::{fmt, io};
+
 /// The length of file and shard checksums
 pub const CHECKSUM_LENGTH: usize = 16;
/// A checksum of a data object
@@ -207,6 +208,20 @@ pub trait MetaStore {
     /// Check to see if a Zdb backend has been marked as replaced based on its connection info
     async fn is_replaced(&self, ci: &ZdbConnectionInfo) -> Result;
 
+    /// Scan one page of metadata keys.
+    ///
+    /// `cursor` continues a previous scan (`None` starts a new one), `backend_idx` selects
+    /// the backend to scan (`None` lets the implementation pick one) and `max_timestamp`
+    /// is an optional upper bound on the creation time of the returned keys.
+    ///
+    /// Returns the index of the scanned backend, the cursor for the next page and the keys.
+    async fn scan_meta_keys(
+        &self,
+        cursor: Option<Vec<u8>>,
+        backend_idx: Option<usize>,
+        max_timestamp: Option<u64>,
+    ) -> Result<(usize, Vec<u8>, Vec<String>), MetaStoreError>;
+
     /// Get the (key, metadata) for all stored objects
     async fn object_metas(&self) -> Result, MetaStoreError>;
 
diff --git a/zstor/src/zdb.rs b/zstor/src/zdb.rs
index 9717e20..b960e8a 100644
--- a/zstor/src/zdb.rs
+++ b/zstor/src/zdb.rs
@@ -93,7 +93,7 @@ impl fmt::Debug for InternalZdb {
 
 /// The type returned by the SCAN command in 0-db. In practice, the outer vec only contains a
 /// single element.
-type ScanEntry = Vec<(Vec<u8>, u32, u32)>;
+type ScanEntry = Vec<(Vec<u8>, u32, u64)>; // payload, size of payload in bytes, creation timestamp
 /// The outpout of a `SCAN` Cmd (future).
 type ScanResult = redis::RedisResult<(Vec, Vec)>;
 
@@ -244,6 +244,7 @@ impl Stream for CollectionKeys {
                 // Set new cursor
                 self.cursor = Some(res.0);
 
+
                 // New buffer - Converting the Vec into a VecDeque will, sadly, realloc the vec (under the
                 // assumption that the vec has cap == len).
self.buffer = res.1.into();
@@ -540,6 +541,31 @@ impl InternalZdb {
                 internal: ErrorCause::Redis(e),
             })
     }
+
+    /// Run a single `SCAN` command, continuing from `cursor` when one is given, and return
+    /// the new cursor together with the raw entries of this page.
+    async fn scan(&self, cursor: Option<Vec<u8>>) -> ZdbResult<(Vec<u8>, Vec<ScanEntry>)> {
+        trace!(
+            "scanning namespace {} ",
+            self.ci.namespace.as_deref().unwrap_or("default"),
+        );
+        self.select_ns().await?;
+
+        let mut scan_cmd = redis::cmd("SCAN");
+        if let Some(cur) = cursor.as_deref() {
+            scan_cmd.arg(cur);
+        }
+
+        let mut conn = self.conn.clone();
+        scan_cmd
+            .query_async(&mut conn)
+            .await
+            .map_err(|e| ZdbError {
+                kind: ZdbErrorKind::Read,
+                remote: self.ci.clone(),
+                internal: ErrorCause::Redis(e),
+            })
+    }
 
     /// Get a stream of all the keys in the namespace
     fn keys(&self) -> CollectionKeys {
@@ -786,6 +812,41 @@ impl UserKeyZdb {
         self.internal.delete(key.as_ref()).await
     }
 
+    /// Scan the namespace for keys. The cursor can be used to continue a previous scan.
+    ///
+    /// Only keys that start with `prefix` and were created no later than `max_timestamp`
+    /// are returned (a filter that is `None` accepts every key). Keys that are not valid
+    /// UTF-8 are skipped.
+    pub async fn scan(
+        &self,
+        cursor: Option<Vec<u8>>,
+        prefix: Option<&str>,
+        max_timestamp: Option<u64>,
+    ) -> ZdbResult<(Vec<u8>, Vec<String>)> {
+        let (cursor, entries) = self.internal.scan(cursor).await?;
+
+        let mut keys = Vec::with_capacity(entries.len());
+        for entry in entries {
+            // Every entry is expected to hold a single tuple; an empty one is skipped
+            // rather than indexed, which would panic.
+            let (raw_key, _size, created) = match entry.into_iter().next() {
+                Some(first) => first,
+                None => continue,
+            };
+            if max_timestamp.map_or(false, |ts| created > ts) {
+                continue;
+            }
+            if prefix.map_or(false, |p| !raw_key.starts_with(p.as_bytes())) {
+                continue;
+            }
+            if let Ok(key) = String::from_utf8(raw_key) {
+                keys.push(key);
+            }
+        }
+
+        Ok((cursor, keys))
+    }
+
     /// Get a stream which yields all the keys in the namespace.
pub fn keys(&self) -> impl Stream> + '_ {
         trace!("key iteration on {}", self.connection_info().address());
diff --git a/zstor/src/zdb_meta.rs b/zstor/src/zdb_meta.rs
index 4b1c065..1db380d 100644
--- a/zstor/src/zdb_meta.rs
+++ b/zstor/src/zdb_meta.rs
@@ -237,14 +237,8 @@ where
         Ok(())
     }
 
-    /// Return a stream of all function with a given prefix
-    async fn keys<'a>(
-        &'a self,
-        prefix: &'a str,
-    ) -> ZdbMetaStoreResult + 'a> {
-        debug!("Starting metastore key iteration with prefix {}", prefix);
-
-        // First get the lengh of all the backends
+    /// Get the index of the backend with the most keys; fails if no backend is healthy.
+    async fn get_most_keys_backend(&self) -> ZdbMetaStoreResult<usize> {
         let mut ns_requests = Vec::with_capacity(self.backends.len());
         for backend in self.backends.iter() {
             ns_requests.push(backend.ns_info());
@@ -262,8 +256,6 @@ where
                 }
             }
         }
-
-        // If there is no reachable backend, we can't list the keys.
         if !healthy_backend {
             return Err(ZdbMetaStoreError {
                 kind: ErrorKind::InsufficientHealthBackends,
@@ -271,6 +263,42 @@ where
             });
         }
 
+        Ok(most_keys_idx)
+    }
+
+    /// Scan one page of keys from a single backend.
+    ///
+    /// The backend at `backend_idx` is scanned, or the one with the most keys when no index
+    /// is given. Returns the index of the scanned backend, the new cursor and the keys.
+    // NOTE(review): a `backend_idx` that is out of range for `self.backends` panics on the
+    // index below - confirm callers only pass back an index returned by this function.
+    async fn scan_keys(
+        &self,
+        cursor: Option<Vec<u8>>,
+        prefix: Option<&str>,
+        backend_idx: Option<usize>,
+        max_timestamp: Option<u64>,
+    ) -> ZdbMetaStoreResult<(usize, Vec<u8>, Vec<String>)> {
+        let idx = match backend_idx {
+            Some(idx) => idx,
+            None => self.get_most_keys_backend().await?,
+        };
+
+        let (new_cursor, keys) = self.backends[idx]
+            .scan(cursor, prefix, max_timestamp)
+            .await?;
+        Ok((idx, new_cursor, keys))
+    }
+
+    /// Return a stream of all keys with a given prefix
+    async fn keys<'a>(
+        &'a self,
+        prefix: &'a str,
+    ) -> ZdbMetaStoreResult + 'a> {
+        debug!("Starting metastore key iteration with prefix {}", prefix);
+
+        let most_keys_idx = self.get_most_keys_backend().await?;
+
         // Now iterate over the keys of the longest backend
         Ok(self.backends[most_keys_idx]
             .keys()
@@ -583,6 +611,23 @@ where
Ok(self.save_meta_by_key(&self.build_key(path)?, meta).await?)
     }
 
+    /// Scan one page of metadata keys, i.e. the keys starting with `/<prefix>/meta/`.
+    ///
+    /// Arguments are forwarded to `scan_keys`; the result is the index of the scanned
+    /// backend, the cursor to continue from and the matching keys.
+    async fn scan_meta_keys(
+        &self,
+        cursor: Option<Vec<u8>>,
+        backend_idx: Option<usize>,
+        max_timestamp: Option<u64>,
+    ) -> Result<(usize, Vec<u8>, Vec<String>), MetaStoreError> {
+        let prefix = format!("/{}/meta/", self.prefix);
+
+        Ok(self
+            .scan_keys(cursor, Some(&prefix), backend_idx, max_timestamp)
+            .await?)
+    }
+
     async fn object_metas(&self) -> Result, MetaStoreError> {
         // pin the stream on the heap for now
         let prefix = format!("/{}/meta/", self.prefix);