diff --git a/zstor/src/actors/backends.rs b/zstor/src/actors/backends.rs
index d9d2929..0fe0ec0 100644
--- a/zstor/src/actors/backends.rs
+++ b/zstor/src/actors/backends.rs
@@ -1,7 +1,7 @@
 use crate::actors::{
     config::{ConfigActor, GetConfig, ReloadConfig, ReplaceMetaBackend},
     explorer::{ExpandStorage, SizeRequest},
-    meta::{MarkWriteable, MetaStoreActor, ReplaceMetaStore},
+    meta::{MarkWriteable, MetaStoreActor, RebuildAllMeta, ReplaceMetaStore},
     metrics::{MetricsActor, SetDataBackendInfo, SetMetaBackendInfo},
 };
 use crate::{
@@ -9,14 +9,14 @@ use crate::{
     encryption::AesGcm,
     zdb::{NsInfo, SequentialZdb, UserKeyZdb, ZdbConnectionInfo, ZdbRunMode},
     zdb_meta::ZdbMetaStore,
-    ZstorError,
+    ZstorError, ZstorErrorKind,
 };
 use actix::prelude::*;
 use futures::{
     future::{join, join_all},
     {stream, StreamExt},
 };
-use log::{debug, error, warn};
+use log::{debug, error, info, warn};
 use std::{
     collections::HashMap,
     time::{Duration, Instant},
@@ -79,6 +79,8 @@ impl BackendManagerActor {
         )>,
     ) -> Option<Vec<UserKeyZdb>> {
         let mut need_refresh = false;
+        // check if the state of the managed meta backends has changed,
+        // i.e. whether the writeable state of any backend is different now
         for (ci, _, new_state, _) in &meta_info {
             if let Some((_, old_state)) = self.managed_meta_dbs.get(ci) {
                 if old_state.is_writeable() != new_state.is_writeable() {
@@ -120,6 +122,17 @@ struct CheckBackends;
 #[rtype(result = "()")]
 struct ReplaceBackends;
 
+/// Message to refresh the metastore from a given set of metadata backends.
+#[derive(Debug, Message)]
+#[rtype(result = "Result<(), ZstorError>")]
+struct RefreshMeta {
+    /// The metadata backends the refreshed metastore should use
+    backends: Vec<UserKeyZdb>,
+
+    /// Whether all metadata should be rebuilt after the metastore is replaced
+    rebuild_meta: bool,
+}
+
 /// Message to request connections to backends with a given capabitlity. If a healthy connection
 /// is being managed to his backend, this is returned. If a connection to this backend is not
 /// managed, or the connection is in an unhealthy state, a new connection is attempted.
@@ -160,11 +173,62 @@ impl Actor for BackendManagerActor {
     }
 }
 
+impl Handler<RefreshMeta> for BackendManagerActor {
+    type Result = ResponseFuture<Result<(), ZstorError>>;
+
+    fn handle(&mut self, msg: RefreshMeta, _: &mut Self::Context) -> Self::Result {
+        let config_addr = self.config_addr.clone();
+        let metastore = self.metastore.clone();
+
+        Box::pin(async move {
+            let config = match config_addr.send(GetConfig).await {
+                Ok(cfg) => cfg,
+                Err(e) => {
+                    error!("Failed to get running config: {}", e);
+                    return Err(ZstorError::new(ZstorErrorKind::Config, Box::new(e)));
+                }
+            };
+
+            let Meta::Zdb(meta_config) = config.meta();
+            let encoder = meta_config.encoder();
+            let encryptor = match config.encryption() {
+                Encryption::Aes(key) => AesGcm::new(key.clone()),
+            };
+            let backends = msg.backends;
+            let new_cluster = ZdbMetaStore::new(
+                backends,
+                encoder.clone(),
+                encryptor.clone(),
+                meta_config.prefix().to_owned(),
+                config.virtual_root().clone(),
+            );
+            let writeable = new_cluster.writable();
+            if let Err(e) = metastore
+                .send(ReplaceMetaStore {
+                    new_store: Box::new(new_cluster),
+                    writeable,
+                })
+                .await
+            {
+                error!("Failed to send ReplaceMetaStore message: {}", e);
+            }
+            if msg.rebuild_meta && writeable {
+                if let Err(err) = metastore.try_send(RebuildAllMeta) {
+                    error!("Failed to send RebuildAllMeta message: {}", err);
+                }
+            }
+            Ok(())
+        })
+    }
+}
+
 impl Handler<ReloadConfig> for BackendManagerActor {
     type Result = Result<(), ZstorError>;
 
     fn handle(&mut self, _: ReloadConfig, ctx: &mut Self::Context) -> Self::Result {
         let cfg_addr = self.config_addr.clone();
+        let addr = ctx.address();
+
         let fut = Box::pin(
             async move {
                 let (managed_seq_dbs, managed_meta_dbs) =
@@ -172,7 +236,7 @@ impl Handler<ReloadConfig> for BackendManagerActor {
                 (managed_seq_dbs, managed_meta_dbs)
             }
             .into_actor(self)
-            .map(|(seq_dbs, meta_dbs), actor, _| {
+            .map(move |(seq_dbs, meta_dbs), actor, _| {
                 // remove the data backends that are no longer managed from the metrics
                 for (ci, _) in actor.managed_seq_dbs.iter() {
                     if !seq_dbs.contains_key(ci) {
@@ -183,20 +247,38 @@ impl Handler<ReloadConfig> for BackendManagerActor {
                     }
                 }
 
+                let mut refresh_meta = false;
                 // remove the meta backends that are no longer managed from the metrics
                 for (ci, _) in actor.managed_meta_dbs.iter() {
                     if !meta_dbs.contains_key(ci) {
+                        refresh_meta = true;
                         actor.metrics.do_send(SetMetaBackendInfo {
                             ci: ci.clone(),
                             info: None,
                         });
                     }
                 }
+
                 actor.managed_seq_dbs = seq_dbs;
                 actor.managed_meta_dbs = meta_dbs;
+
+                if refresh_meta {
+                    let meta_backends: Vec<UserKeyZdb> = actor
+                        .managed_meta_dbs
+                        .values()
+                        .filter_map(|(zdb, _)| zdb.clone())
+                        .collect();
+                    if let Err(err) = addr.try_send(RefreshMeta {
+                        backends: meta_backends,
+                        rebuild_meta: true,
+                    }) {
+                        error!("Failed to send RefreshMeta message: {}", err);
+                    }
+                }
             }),
         );
         ctx.spawn(fut);
+
         Ok(())
     }
 }
@@ -287,7 +369,7 @@ async fn get_zdbs_from_config(
 impl Handler<CheckBackends> for BackendManagerActor {
     type Result = ResponseActFuture<Self, ()>;
 
-    fn handle(&mut self, _: CheckBackends, _: &mut Self::Context) -> Self::Result {
+    fn handle(&mut self, _: CheckBackends, ctx: &mut Self::Context) -> Self::Result {
         let data_backend_info = self
             .managed_seq_dbs
             .iter()
@@ -303,6 +385,7 @@ impl Handler<CheckBackends> for BackendManagerActor {
             .map(|(ci, (db, state))| (ci.clone(), (db.clone(), state.clone())))
             .collect::<HashMap<_, _>>();
 
+        let actor_addr = ctx.address();
         Box::pin(
             async move {
                 let futs = data_backend_info
@@ -385,40 +468,17 @@ impl Handler<CheckBackends> for BackendManagerActor {
             }
             .into_actor(self)
             .then(|(data_info, meta_info), actor, _| {
+                // check whether the metadata backends need to be refreshed,
+                // and trigger a refresh if they do
                 let new_meta_backends = actor.check_new_metastore(meta_info.clone());
-
-                let config_addr = actor.config_addr.clone();
-                let metastore = actor.metastore.clone();
                 async move {
                     if let Some(backends) = new_meta_backends {
-                        let config = match config_addr.send(GetConfig).await {
-                            Ok(cfg) => cfg,
-                            Err(e) => {
-                                error!("Failed to get running config: {}", e);
-                                return (data_info, meta_info);
-                            }
-                        };
-                        let Meta::Zdb(meta_config) = config.meta();
-                        let encoder = meta_config.encoder();
-                        let encryptor = match config.encryption() {
-                            Encryption::Aes(key) => AesGcm::new(key.clone()),
-                        };
-                        let new_cluster = ZdbMetaStore::new(
+                        info!("Refreshing metadata cluster");
+                        if let Err(err) = actor_addr.try_send(RefreshMeta {
                             backends,
-                            encoder.clone(),
-                            encryptor.clone(),
-                            meta_config.prefix().to_owned(),
-                            config.virtual_root().clone(),
-                        );
-                        let writeable = new_cluster.writable();
-                        if let Err(e) = metastore
-                            .send(ReplaceMetaStore {
-                                new_store: Box::new(new_cluster),
-                                writeable,
-                            })
-                            .await
-                        {
-                            error!("Failed to send ReplaceMetaStore message: {}", e);
+                            rebuild_meta: false,
+                        }) {
+                            error!("Failed to send RefreshMeta message: {}", err);
                         }
                     }
                     (data_info, meta_info)
diff --git a/zstor/src/actors/meta.rs b/zstor/src/actors/meta.rs
index fbd6d5b..ba48610 100644
--- a/zstor/src/actors/meta.rs
+++ b/zstor/src/actors/meta.rs
@@ -100,6 +100,10 @@ pub struct MarkWriteable {
     pub writeable: bool,
 }
 
+#[derive(Message)]
+#[rtype(result = "()")]
+/// Message to rebuild all metadata in the metastore.
+pub struct RebuildAllMeta;
 #[derive(Message)]
 #[rtype(result = "()")]
 /// Message to replace the metastore in use.
@@ -263,6 +267,83 @@ impl Handler for MetaStoreActor {
     }
 }
 
+/// Rebuild all metadata in the metastore:
+/// - scan all keys in the metastore that were created before the current timestamp
+/// - load the metadata for each key
+/// - save the metadata again under the same key
+impl Handler<RebuildAllMeta> for MetaStoreActor {
+    type Result = ResponseFuture<()>;
+
+    fn handle(&mut self, _: RebuildAllMeta, ctx: &mut Self::Context) -> Self::Result {
+        let metastore = self.meta_store.clone();
+        let addr = ctx.address();
+        log::info!("Rebuilding all metadata");
+        Box::pin(async move {
+            let mut cursor = None;
+            let mut backend_idx = None;
+
+            // get the current timestamp
+            // we only rebuild meta that was created before this timestamp,
+            // otherwise we would end up in an endless rebuild loop
+            use std::time::{SystemTime, UNIX_EPOCH};
+
+            let timestamp = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_secs();
+
+            log::info!("Starting rebuild at timestamp: {}", timestamp);
+            loop {
+                let (idx, new_cursor, keys) = match metastore
+                    .scan_meta_keys(cursor.clone(), backend_idx, Some(timestamp))
+                    .await
+                {
+                    Ok((idx, c, keys)) => (idx, c, keys),
+                    Err(e) => {
+                        log::error!("Error scanning keys: {}", e);
+                        break;
+                    }
+                };
+
+                for key in keys {
+                    log::info!("Rebuilding meta key: {}", key);
+                    let meta: MetaData = match addr.send(LoadMetaByKey { key: key.clone() }).await {
+                        Ok(Ok(Some(m))) => m,
+                        Ok(Ok(None)) => continue, // key disappeared between scan and load, skip it
+                        Ok(Err(e)) => {
+                            log::error!("Error loading meta by key:{} - {}", key, e);
+                            continue;
+                        }
+                        Err(e) => {
+                            log::error!("Error loading meta by key:{} - {}", key, e);
+                            continue;
+                        }
+                    };
+
+                    // save meta by key
+                    match addr
+                        .send(SaveMetaByKey {
+                            key: key.clone(),
+                            meta,
+                        })
+                        .await
+                    {
+                        Ok(Ok(_)) => {}
+                        Ok(Err(e)) => {
+                            log::error!("Error saving meta by key:{} - {}", key, e);
+                        }
+                        Err(e) => {
+                            log::error!("Error saving meta by key:{} - {}", key, e);
+                        }
+                    }
+                }
+                cursor = Some(new_cursor);
+                backend_idx = Some(idx);
+            }
+        })
+    }
+}
+
 impl Handler for MetaStoreActor {
     type Result = ();
 
diff --git a/zstor/src/encryption.rs b/zstor/src/encryption.rs
index 5403f9e..8c52364 100644
--- a/zstor/src/encryption.rs
+++ b/zstor/src/encryption.rs
@@ -124,7 +124,7 @@ impl<'de> Deserialize<'de> for SymmetricKey {
 
 struct SymKeyVisitor;
 
-impl<'de> Visitor<'de> for SymKeyVisitor {
+impl Visitor<'_> for SymKeyVisitor {
     type Value = SymmetricKey;
 
     fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
diff --git a/zstor/src/main.rs b/zstor/src/main.rs
index 94f03b1..22fb428 100644
--- a/zstor/src/main.rs
+++ b/zstor/src/main.rs
@@ -708,7 +708,7 @@ impl<'a> DropFile<'a> {
     }
 }
 
-impl<'a> Drop for DropFile<'a> {
+impl Drop for DropFile<'_> {
     fn drop(&mut self) {
         // try to remove the file
         if let Err(e) = fs::remove_file(self.path) {
diff --git a/zstor/src/meta.rs b/zstor/src/meta.rs
index e99e469..d219ad7 100644
--- a/zstor/src/meta.rs
+++ b/zstor/src/meta.rs
@@ -8,6 +8,7 @@ use path_clean::PathClean;
 use serde::{Deserialize, Serialize};
 use std::path::{Path, PathBuf};
 use std::{fmt, io};
+
 /// The length of file and shard checksums
 pub const CHECKSUM_LENGTH: usize = 16;
 /// A checksum of a data object
@@ -207,6 +208,19 @@ pub trait MetaStore {
     /// Check to see if a Zdb backend has been marked as replaced based on its connection info
     async fn is_replaced(&self, ci: &ZdbConnectionInfo) -> Result<bool, MetaStoreError>;
 
+    /// Scan the metadata keys
+    ///
+    /// If `cursor` is `None`, the scan will start from the beginning.
+    /// If `backend_idx` is `None`, the scan will use the backend which has the most keys.
+    ///
+    /// Returns the backend index, the cursor to use for the next scan, and the keys themselves
+    async fn scan_meta_keys(
+        &self,
+        cursor: Option<Vec<u8>>,
+        backend_idx: Option<usize>,
+        max_timestamp: Option<u64>,
+    ) -> Result<(usize, Vec<u8>, Vec<String>), MetaStoreError>;
+
     /// Get the (key, metadata) for all stored objects
     async fn object_metas(&self) -> Result<Vec<(String, MetaData)>, MetaStoreError>;
 
diff --git a/zstor/src/zdb.rs b/zstor/src/zdb.rs
index 9717e20..b960e8a 100644
--- a/zstor/src/zdb.rs
+++ b/zstor/src/zdb.rs
@@ -93,7 +93,7 @@ impl fmt::Debug for InternalZdb {
 
 /// The type returned by the SCAN command in 0-db. In practice, the outer vec only contains a
 /// single element.
-type ScanEntry = Vec<(Vec<u8>, u32, u32)>;
+type ScanEntry = Vec<(Vec<u8>, u32, u64)>; // payload, size of payload in bytes, creation timestamp
 /// The outpout of a `SCAN` Cmd (future).
 type ScanResult = redis::RedisResult<(Vec<u8>, Vec<ScanEntry>)>;
 
@@ -244,6 +244,7 @@ impl Stream for CollectionKeys {
 
                     // Set new cursor
                     self.cursor = Some(res.0);
+
                     // New buffer - Converting the Vec into a VecDeque will, sadly, realloc the vec (under the
                     // assumption that the vec has cap == len).
                     self.buffer = res.1.into();
@@ -540,6 +541,31 @@ impl InternalZdb {
             internal: ErrorCause::Redis(e),
         })
     }
+    async fn scan(&self, cursor: Option<Vec<u8>>) -> ZdbResult<(Vec<u8>, Vec<ScanEntry>)> {
+        trace!(
+            "scanning namespace {} ",
+            self.ci.namespace.as_deref().unwrap_or("default"),
+        );
+        self.select_ns().await?;
+
+        let mut scan_cmd = redis::cmd("SCAN");
+        if let Some(ref cur) = cursor {
+            scan_cmd.arg(cur as &[u8]);
+        }
+
+        let mut conn = self.conn.clone();
+        let res: (Vec<u8>, Vec<ScanEntry>) = match scan_cmd.query_async(&mut conn).await {
+            Ok(r) => r,
+            Err(e) => {
+                return Err(ZdbError {
+                    kind: ZdbErrorKind::Read,
+                    remote: self.ci.clone(),
+                    internal: ErrorCause::Redis(e),
+                })
+            }
+        };
+        Ok(res)
+    }
 
     /// Get a stream of all the keys in the namespace
     fn keys(&self) -> CollectionKeys {
@@ -786,6 +812,38 @@ impl UserKeyZdb {
         self.internal.delete(key.as_ref()).await
     }
 
+    /// scan the namespace for keys. The cursor can be used to continue a previous scan.
+    pub async fn scan(
+        &self,
+        cursor: Option<Vec<u8>>,
+        prefix: Option<&str>,
+        max_timestamp: Option<u64>,
+    ) -> ZdbResult<(Vec<u8>, Vec<String>)> {
+        let (cursor, entries): (Vec<u8>, Vec<ScanEntry>) = self.internal.scan(cursor).await?;
+
+        let mut keys = Vec::new();
+        for entry in &entries {
+            // check timestamp
+            if let Some(ts) = max_timestamp {
+                if entry[0].2 > ts {
+                    continue;
+                }
+            }
+            // check prefix
+            let raw_key = entry[0].0.clone();
+            if let Some(p) = prefix {
+                if !raw_key.starts_with(p.as_bytes()) {
+                    continue;
+                }
+            }
+            if let Ok(s) = String::from_utf8(raw_key) {
+                keys.push(s)
+            }
+        }
+
+        Ok((cursor, keys))
+    }
+
     /// Get a stream which yields all the keys in the namespace.
     pub fn keys(&self) -> impl Stream> + '_ {
         trace!("key iteration on {}", self.connection_info().address());
diff --git a/zstor/src/zdb_meta.rs b/zstor/src/zdb_meta.rs
index 4b1c065..1db380d 100644
--- a/zstor/src/zdb_meta.rs
+++ b/zstor/src/zdb_meta.rs
@@ -237,14 +237,8 @@ where
         Ok(())
     }
 
-    /// Return a stream of all function with a given prefix
-    async fn keys<'a>(
-        &'a self,
-        prefix: &'a str,
-    ) -> ZdbMetaStoreResult<impl Stream<Item = String> + 'a> {
-        debug!("Starting metastore key iteration with prefix {}", prefix);
-
-        // First get the lengh of all the backends
+    /// Helper function to get the backend with the most keys
+    async fn get_most_keys_backend(&self) -> ZdbMetaStoreResult<usize> {
         let mut ns_requests = Vec::with_capacity(self.backends.len());
         for backend in self.backends.iter() {
             ns_requests.push(backend.ns_info());
@@ -262,8 +256,6 @@
                 }
             }
         }
-
-        // If there is no reachable backend, we can't list the keys.
         if !healthy_backend {
             return Err(ZdbMetaStoreError {
                 kind: ErrorKind::InsufficientHealthBackends,
@@ -271,6 +263,39 @@
             });
         }
 
+        Ok(most_keys_idx)
+    }
+
+    async fn scan_keys(
+        &self,
+        cursor: Option<Vec<u8>>,
+        prefix: Option<&str>,
+        backend_idx: Option<usize>,
+        max_timestamp: Option<u64>,
+    ) -> ZdbMetaStoreResult<(usize, Vec<u8>, Vec<String>)> {
+        let most_keys_idx = match backend_idx {
+            Some(idx) => idx,
+            None => self.get_most_keys_backend().await?,
+        };
+
+        let (new_cursor, keys) = self.backends[most_keys_idx]
+            .scan(cursor, prefix, max_timestamp)
+            .await?;
+        Ok((most_keys_idx, new_cursor, keys))
+    }
+
+    /// Return a stream of all keys with a given prefix
+    async fn keys<'a>(
+        &'a self,
+        prefix: &'a str,
+    ) -> ZdbMetaStoreResult<impl Stream<Item = String> + 'a> {
+        debug!("Starting metastore key iteration with prefix {}", prefix);
+
+        let most_keys_idx = match self.get_most_keys_backend().await {
+            Ok(idx) => idx,
+            Err(e) => return Err(e),
+        };
+
         // Now iterate over the keys of the longest backend
         Ok(self.backends[most_keys_idx]
             .keys()
@@ -583,6 +608,23 @@
         Ok(self.save_meta_by_key(&self.build_key(path)?, meta).await?)
     }
 
+    async fn scan_meta_keys(
+        &self,
+        cursor: Option<Vec<u8>>,
+        backend_idx: Option<usize>,
+        max_timestamp: Option<u64>,
+    ) -> Result<(usize, Vec<u8>, Vec<String>), MetaStoreError> {
+        let prefix = format!("/{}/meta/", self.prefix);
+
+        match self
+            .scan_keys(cursor, Some(&prefix), backend_idx, max_timestamp)
+            .await
+        {
+            Ok((backend_idx, cursor, keys)) => Ok((backend_idx, cursor, keys)),
+            Err(e) => Err(MetaStoreError::from(e)),
+        }
+    }
+
     async fn object_metas(&self) -> Result<Vec<(String, MetaData)>, MetaStoreError> {
         // pin the stream on the heap for now
         let prefix = format!("/{}/meta/", self.prefix);