Skip to content

Commit

Permalink
fix(meta): refresh metastore if there is zdb up/down
Browse files Browse the repository at this point in the history
  • Loading branch information
iwanbk authored and LeeSmet committed Nov 19, 2024
1 parent c5f1fba commit 78e926d
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 3 deletions.
88 changes: 86 additions & 2 deletions zstor/src/actors/backends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::actors::{
use crate::{
config::{Encryption, Meta},
encryption::AesGcm,
zdb::{SequentialZdb, UserKeyZdb, ZdbConnectionInfo, ZdbRunMode},
zdb::{NsInfo, SequentialZdb, UserKeyZdb, ZdbConnectionInfo, ZdbRunMode},
zdb_meta::ZdbMetaStore,
ZstorError,
};
Expand Down Expand Up @@ -67,6 +67,45 @@ impl BackendManagerActor {
fn check_backends(&mut self, ctx: &mut <Self as Actor>::Context) {
ctx.notify(CheckBackends);
}

/// check & returns new metadata backends if the cluster needs to be refreshed
fn check_new_metastore(
&self,
meta_info: Vec<(
ZdbConnectionInfo,
Option<UserKeyZdb>,
BackendState,
Option<NsInfo>,
)>,
) -> Option<Vec<UserKeyZdb>> {
let mut need_refresh = false;
for (ci, _, new_state, _) in &meta_info {
if let Some((_, old_state)) = self.managed_meta_dbs.get(ci) {
if old_state.is_writeable() != new_state.is_writeable() {
need_refresh = true;
break;
}
}
}
if !need_refresh {
return None;
}
// get new backends:
// - new_db
// - old_db with state == writable
let mut backends = Vec::new();
let managed_db = self.managed_meta_dbs.clone();
for (ci, new_db, new_state, _) in &meta_info {
if let Some(new_db) = new_db {
backends.push(new_db.clone());
} else if new_state.is_writeable() {
if let Some((Some(db), _)) = managed_db.get(ci) {
backends.push(db.clone());
}
}
}
Some(backends)
}
}

/// Message requesting the actor checks the backends, and updates the state of all managed
Expand Down Expand Up @@ -120,6 +159,7 @@ impl Actor for BackendManagerActor {
);
}
}

impl Handler<ReloadConfig> for BackendManagerActor {
type Result = Result<(), ZstorError>;

Expand Down Expand Up @@ -340,9 +380,51 @@ impl Handler<CheckBackends> for BackendManagerActor {
}
}
});
(data_info, join_all(futs).await)
let meta_info = join_all(futs).await;
(data_info, meta_info)
}
.into_actor(self)
.then(|(data_info, meta_info), actor, _| {
let new_meta_backends = actor.check_new_metastore(meta_info.clone());

let config_addr = actor.config_addr.clone();
let metastore = actor.metastore.clone();
async move {
if let Some(backends) = new_meta_backends {
let config = match config_addr.send(GetConfig).await {
Ok(cfg) => cfg,
Err(e) => {
error!("Failed to get running config: {}", e);
return (data_info, meta_info);
}
};
let Meta::Zdb(meta_config) = config.meta();
let encoder = meta_config.encoder();
let encryptor = match config.encryption() {
Encryption::Aes(key) => AesGcm::new(key.clone()),
};
let new_cluster = ZdbMetaStore::new(
backends,
encoder.clone(),
encryptor.clone(),
meta_config.prefix().to_owned(),
config.virtual_root().clone(),
);
let writeable = new_cluster.writable();
if let Err(e) = metastore
.send(ReplaceMetaStore {
new_store: Box::new(new_cluster),
writeable,
})
.await
{
error!("Failed to send ReplaceMetaStore message: {}", e);
}
}
(data_info, meta_info)
}
.into_actor(actor)
})
.map(|res, actor, _ctx| {
for (ci, new_db, new_state, info) in res.0.into_iter() {
// update metrics
Expand Down Expand Up @@ -565,9 +647,11 @@ impl Handler<ReplaceBackends> for BackendManagerActor {
};

// TODO: send new nodes to config and save it
let metastore_writeable = new_cluster.writable();
if let Err(e) = metastore
.send(ReplaceMetaStore {
new_store: Box::new(new_cluster),
writeable: metastore_writeable,
})
.await
{
Expand Down
5 changes: 5 additions & 0 deletions zstor/src/actors/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ pub struct MarkWriteable {
pub struct ReplaceMetaStore {
/// The new metastore to set
pub new_store: Box<dyn MetaStore + Send>,

/// writeable flag
pub writeable: bool,
}

/// Actor for a metastore
Expand Down Expand Up @@ -272,6 +275,8 @@ impl Handler<ReplaceMetaStore> for MetaStoreActor {
type Result = ();

fn handle(&mut self, msg: ReplaceMetaStore, _: &mut Self::Context) -> Self::Result {
log::info!("ReplaceMetaStore writeable: {}", msg.writeable);
self.meta_store = Arc::from(msg.new_store as Box<dyn MetaStore>);
self.writeable = msg.writeable;
}
}
2 changes: 1 addition & 1 deletion zstor/src/zdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ impl UserKeyZdb {
}

/// Information about a 0-db namespace, as reported by the db itself.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NsInfo {
/// The name of the namespace.
pub name: String,
Expand Down

0 comments on commit 78e926d

Please sign in to comment.