Skip to content

Commit

Permalink
feat(configurer): send UpdateConfig unboundedly
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Nov 22, 2024
1 parent 9c647f3 commit 8380ed9
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- core/addr: expose the `addr` module.
- core: generate `NodeNo` randomly if not provided.
- network: don't force `safe-*` features of the `lz4_flex` crate ([#136]).
- configurer: send `UpdateConfig` unboundedly.

### Fixed
- core: update the `idr-ebr` crate to v0.3 to fix possible crash in `Context::finished()`.
Expand Down
59 changes: 12 additions & 47 deletions elfo-configurer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ use tracing::{error, info, warn};

use elfo_core::{
config::AnyConfig,
errors::RequestError,
messages::{
EntrypointError, Ping, StartEntrypoint, StartEntrypointRejected, UpdateConfig,
ValidateConfig,
EntrypointError, StartEntrypoint, StartEntrypointRejected, UpdateConfig, ValidateConfig,
},
msg, scope,
signal::{Signal, SignalKind},
Expand Down Expand Up @@ -344,27 +342,16 @@ impl Configurer {
}

async fn update_all(&self, configs: &[ConfigWithMeta]) {
let futures = configs
.iter()
.cloned()
.map(|item| {
let group = item.group_name;
// While `UpdateConfig` is defined as a request to cover more use cases, default
// configurer simply sends out new configs instead of waiting for all groups to
// process the message and respond. This is done for performance reasons. If an
// actor has a congested mailbox or spends a lot of time processing each
// message, updating configs using requests can take a lot of time.
let fut = self.ctx.send_to(item.addr, UpdateConfig::new(item.config));

wrap_long_running_future(
fut,
group,
"some actors in the group have congested mailboxes, config update is stalled",
)
})
.collect::<Vec<_>>();

future::join_all(futures).await;
for item in configs {
let message = UpdateConfig::new(item.config.clone()); // cheap due to `Arc`s.

// While `UpdateConfig` is defined as a request to cover more use cases, default
// configurer simply sends out new configs instead of waiting for all groups to
// process the message and respond. This is done for performance reasons. If an
// actor has a congested mailbox or spends a lot of time processing each
// message, updating configs using requests can take a lot of time.
let _ = self.ctx.unbounded_send_to(item.addr, message);
}
}
}

Expand All @@ -384,29 +371,6 @@ async fn wrap_long_running_future<F: Future>(
}
}

#[allow(dead_code)]
async fn ping(ctx: &Context, config_list: &[ConfigWithMeta]) -> bool {
let futures = config_list
.iter()
.cloned()
.map(|item| ctx.request_to(item.addr, Ping::default()).all().resolve())
.collect::<Vec<_>>();

// TODO: use `try_join_all`.
let errors = future::join_all(futures)
.await
.into_iter()
.flatten()
.filter_map(|result| match result {
Ok(()) | Err(RequestError::Ignored) => None,
Err(RequestError::Failed) => Some(String::from("some group is closed")),
})
// TODO: include actor keys in the error message.
.inspect(|reason| error!(%reason, "ping failed"));

errors.count() == 0
}

async fn load_raw_config(path: impl AsRef<Path>) -> Result<Value, String> {
let content = fs::read_to_string(path)
.await
Expand All @@ -433,6 +397,7 @@ fn match_configs(topology: &Topology, config: &Value) -> Vec<ConfigWithMeta> {
}
})
.collect();

// Config parsing happens in the supervisor, which is executed in this actor
// when it performs `send_to(addr, UpdateConfig)`. User actor groups can
// have arbitrary large configs, taking a considerable time to deserialize.
Expand Down

0 comments on commit 8380ed9

Please sign in to comment.