Skip to content

Commit

Permalink
feat(core/mailbox): configure capacity and change the default value
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Jul 11, 2024
1 parent 259fe1b commit 2bb04ed
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 36 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- core: directly accept never returning functions in `ActorGroup::exec()` ([#127]).
- core/context: add `Context::unbounded_send(_to)` methods.
- errors: add `From<SendError> for TrySendError` and `SendError::{into_inner,map}` methods.
- core/config: add the `system.mailbox.capacity` parameter to set the mailbox capacity.
- core/context: add `Context::set_mailbox_capacity()`.

### Changed
- **BREAKING** core/mailbox: default capacity is `100` now.
- **BREAKING** macros: remove the `network` feature ([#127]).
- **BREAKING** core/message: remove `AnyMessage::upcast()` in favor of `AnyMessage::new()` ([#127]).
- **BREAKING** core/envelope: `Envelope::message()` returns `AnyMessageRef` ([#127]).
Expand Down
4 changes: 4 additions & 0 deletions benches/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ fn make_consumers<const FLAGS: Flags>(actor_count: u32) -> Blueprint {
})
}))
.exec(move |mut ctx| async move {
// Measure throughput without extra context switches.
// The number of switches are controlled by `yield_now()` in producers.
ctx.set_mailbox_capacity(1_000_000);

while let Some(envelope) = ctx.recv().await {
msg!(match envelope {
msg @ Sample => {
Expand Down
2 changes: 1 addition & 1 deletion elfo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ elfo-utils = { version = "0.2.5", path = "../elfo-utils" }

stability.workspace = true
metrics.workspace = true
tokio = { version = "1.16", features = ["rt", "sync", "time", "signal", "macros"] }
tokio = { version = "1.37", features = ["rt", "sync", "time", "signal", "macros"] }
idr-ebr = "0.2"
futures-intrusive = "0.5"
cordyceps = "0.3.2"
Expand Down
39 changes: 33 additions & 6 deletions elfo-core/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
envelope::Envelope,
errors::{SendError, TrySendError},
group::TerminationPolicy,
mailbox::{Mailbox, RecvResult},
mailbox::{Mailbox, MailboxConfig, RecvResult},
messages::{ActorStatusReport, Terminate},
msg,
request_table::RequestTable,
Expand Down Expand Up @@ -182,32 +182,39 @@ pub(crate) struct Actor {
termination_policy: TerminationPolicy,
mailbox: Mailbox,
request_table: RequestTable,
control: RwLock<ControlBlock>,
control: RwLock<Control>,
finished: ManualResetEvent, // TODO: remove in favor of `status_subscription`?
status_subscription: Arc<SubscriptionManager>,
}

struct ControlBlock {
struct Control {
status: ActorStatus,
/// If `None`, a group's policy will be used.
restart_policy: Option<RestartPolicy>,
/// A mailbox capacity set in the config.
mailbox_capacity_config: usize,
/// Explicitly set mailbox capacity via `Context::set_mailbox_capacity()`.
mailbox_capacity_override: Option<usize>,
}

impl Actor {
pub(crate) fn new(
meta: Arc<ActorMeta>,
addr: Addr,
mailbox_config: &MailboxConfig,
termination_policy: TerminationPolicy,
status_subscription: Arc<SubscriptionManager>,
) -> Self {
Actor {
meta,
termination_policy,
mailbox: Mailbox::new(),
mailbox: Mailbox::new(mailbox_config),
request_table: RequestTable::new(addr),
control: RwLock::new(ControlBlock {
control: RwLock::new(Control {
status: ActorStatus::INITIALIZING,
restart_policy: None,
mailbox_capacity_config: mailbox_config.capacity,
mailbox_capacity_override: None,
}),
finished: ManualResetEvent::new(false),
status_subscription,
Expand Down Expand Up @@ -271,6 +278,26 @@ impl Actor {
&self.request_table
}

pub(crate) fn set_mailbox_capacity_config(&self, capacity: usize) {
self.control.write().mailbox_capacity_config = capacity;
self.update_mailbox_capacity();
}

pub(crate) fn set_mailbox_capacity_override(&self, capacity: Option<usize>) {
self.control.write().mailbox_capacity_override = capacity;
self.update_mailbox_capacity();
}

fn update_mailbox_capacity(&self) {
let control = self.control.read();

let capacity = control
.mailbox_capacity_override
.unwrap_or(control.mailbox_capacity_config);

self.mailbox.set_capacity(capacity);
}

pub(crate) fn restart_policy(&self) -> Option<RestartPolicy> {
self.control.read().restart_policy.clone()
}
Expand Down Expand Up @@ -348,7 +375,7 @@ impl Actor {
})
}

fn send_status_to_subscribers(&self, control: &ControlBlock) {
fn send_status_to_subscribers(&self, control: &Control) {
self.status_subscription.send(ActorStatusReport {
meta: self.meta.clone(),
status: control.status.clone(),
Expand Down
1 change: 1 addition & 0 deletions elfo-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ impl<'de> Deserializer<'de> for AnyConfig {
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
pub(crate) struct SystemConfig {
pub(crate) mailbox: crate::mailbox::MailboxConfig,
pub(crate) logging: crate::logging::LoggingConfig,
pub(crate) dumping: crate::dumping::DumpingConfig,
pub(crate) telemetry: crate::telemetry::TelemetryConfig,
Expand Down
23 changes: 22 additions & 1 deletion elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,28 @@ impl<C, K> Context<C, K> {
ward!(self.actor.as_ref().and_then(|o| o.as_actor())).set_status(status);
}

/// Overrides the group's default restart policy.
/// Overrides the group's default mailbox capacity, which set in the config.
///
/// Note: after restart the actor will be created from scratch, so this
/// override will be also reset to the group's default mailbox capacity.
///
/// # Example
/// ```
/// # use elfo_core as elfo;
/// # fn exec(ctx: elfo::Context) {
/// // Override the group's default mailbox capacity.
/// ctx.set_mailbox_capacity(42);
///
/// // Set the group's default mailbox capacity.
/// ctx.set_mailbox_capacity(None);
/// # }
/// ```
pub fn set_mailbox_capacity(&self, capacity: impl Into<Option<usize>>) {
ward!(self.actor.as_ref().and_then(|o| o.as_actor()))
.set_mailbox_capacity_override(capacity.into());
}

/// Overrides the group's default restart policy, which set in the config.
///
/// Note: after restart the actor will be created from scratch, so this
/// override will be also reset to the group's default restart policy.
Expand Down
3 changes: 2 additions & 1 deletion elfo-core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ pub async fn do_start<F: Future>(
let actor = Actor::new(
meta.clone(),
addr,
Default::default(),
&<_>::default(),
<_>::default(),
Arc::new(SubscriptionManager::new(ctx.clone())),
);

Expand Down
74 changes: 63 additions & 11 deletions elfo-core/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ use crate::{
tracing::TraceId,
};

// === MailboxConfig ===

#[derive(Debug, PartialEq, serde::Deserialize)]
#[serde(default)]
pub(crate) struct MailboxConfig {
pub(crate) capacity: usize,
}

impl Default for MailboxConfig {
fn default() -> Self {
Self { capacity: 100 }
}
}

// === Mailbox ===

pub(crate) type Link = Links<EnvelopeHeader>;

assert_not_impl_any!(EnvelopeHeader: Unpin);
Expand Down Expand Up @@ -77,9 +93,6 @@ unsafe impl Linked<Link> for EnvelopeHeader {
}
}

// TODO: make configurable (a config + `ctx.set_mailbox_capacity(_)`).
const LIMIT: usize = 100_000;

pub(crate) struct Mailbox {
/// A storage for envelopes based on an intrusive linked list.
/// Note: `cordyceps` uses terms "head" and "tail" in the opposite way.
Expand All @@ -93,18 +106,52 @@ pub(crate) struct Mailbox {
// TODO: replace with `diatomic-waker` (3-5% faster).
rx_notify: CachePadded<Notify>,

/// Use `Mutex` here for synchronization on close/configure.
control: Mutex<Control>,
}

struct Control {
/// A trace ID that should be assigned once the mailbox is closed.
/// Use `Mutex` here for synchronization on close, more in `close()`.
closed_trace_id: Mutex<Option<TraceId>>,
closed_trace_id: Option<TraceId>,
/// A real capacity of the mailbox.
capacity: usize,
}

impl Mailbox {
pub(crate) fn new() -> Self {
pub(crate) fn new(config: &MailboxConfig) -> Self {
let capacity = clamp_capacity(config.capacity);

Self {
queue: MpscQueue::new_with_stub(Envelope::stub()),
tx_semaphore: Semaphore::new(LIMIT),
tx_semaphore: Semaphore::new(capacity),
rx_notify: CachePadded::new(Notify::new()),
closed_trace_id: Mutex::new(None),
control: Mutex::new(Control {
closed_trace_id: None,
capacity,
}),
}
}

pub(crate) fn set_capacity(&self, capacity: usize) {
let mut control = self.control.lock();

if capacity == control.capacity {
return;
}

if capacity < control.capacity {
let delta = control.capacity - capacity;
let real_delta = self.tx_semaphore.forget_permits(delta);

// Note that we cannot reduce the number of active permits
// (relates to messages that already stored in the queue) in tokio impl.
// Sadly, in such cases, we violate provided `capacity`.
debug_assert!(real_delta <= delta);
control.capacity -= real_delta;
} else {
let real_delta = clamp_capacity(capacity) - control.capacity;
self.tx_semaphore.add_permits(real_delta);
control.capacity += real_delta;
}
}

Expand Down Expand Up @@ -180,13 +227,13 @@ impl Mailbox {
// channel. If we take a lock after closing the channel, data race is
// possible when we try to `recv()` after the channel is closed, but
// before the `closed_trace_id` is assigned.
let mut closed_trace_id = self.closed_trace_id.lock();
let mut control = self.control.lock();

if self.tx_semaphore.is_closed() {
return false;
}

*closed_trace_id = Some(trace_id);
control.closed_trace_id = Some(trace_id);

self.tx_semaphore.close();
self.rx_notify.notify_one();
Expand All @@ -204,7 +251,8 @@ impl Mailbox {
match self.queue.dequeue() {
Some(envelope) => RecvResult::Data(envelope),
None => {
let trace_id = self.closed_trace_id.lock().expect("called before close()");
let control = self.control.lock();
let trace_id = control.closed_trace_id.expect("called before close()");
RecvResult::Closed(trace_id)
}
}
Expand All @@ -215,3 +263,7 @@ pub(crate) enum RecvResult {
Data(Envelope),
Closed(TraceId),
}

fn clamp_capacity(capacity: usize) -> usize {
capacity.min(Semaphore::MAX_PERMITS)
}
46 changes: 30 additions & 16 deletions elfo-core/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ pub(crate) struct Supervisor<R: Router<C>, C, X> {
objects: DashMap<R::Key, OwnedObject, FxBuildHasher>,
router: R,
exec: X,
control: CachePadded<RwLock<ControlBlock<C>>>,
control: CachePadded<RwLock<Control<C>>>,
scope_shared: Arc<ScopeGroupShared>,
status_subscription: Arc<SubscriptionManager>,
rt_manager: RuntimeManager,
}

struct ControlBlock<C> {
struct Control<C> {
system_config: Arc<SystemConfig>,
user_config: Option<Arc<C>>,
is_started: bool,
Expand Down Expand Up @@ -87,7 +87,7 @@ where
termination_policy: TerminationPolicy,
rt_manager: RuntimeManager,
) -> Self {
let control = ControlBlock {
let control = Control {
system_config: Default::default(),
user_config: None,
is_started: false,
Expand Down Expand Up @@ -303,6 +303,18 @@ where
.with_key(key.clone())
.with_config(user_config);

let meta = Arc::new(ActorMeta {
group: self.meta.group.clone(),
key: key_str,
});
let actor = Actor::new(
meta.clone(),
addr,
&system_config.mailbox,
self.termination_policy.clone(),
self.status_subscription.clone(),
);

drop(control);

let sv = self.clone();
Expand Down Expand Up @@ -389,19 +401,8 @@ where
sv.context.book().remove(addr);
};

let meta = Arc::new(ActorMeta {
group: self.meta.group.clone(),
key: key_str,
});

let rt = self.rt_manager.get(&meta);

let actor = Actor::new(
meta.clone(),
addr,
self.termination_policy.clone(),
self.status_subscription.clone(),
);
entry.insert(Object::new(addr, actor));

let scope = Scope::new(scope::trace_id(), addr, meta, self.scope_shared.clone())
Expand Down Expand Up @@ -436,17 +437,30 @@ where
}
}

fn update_config(&self, control: &mut ControlBlock<C>, config: &AnyConfig) {
fn update_config(&self, control: &mut Control<C>, config: &AnyConfig) {
let system = config.get_system();
self.scope_shared.configure(system);

let need_to_update_actors = control.system_config.mailbox != system.mailbox;

// Update user's config.
control.system_config = config.get_system().clone();
control.system_config = system.clone();
control.user_config = Some(config.get_user::<C>().clone());

self.router
.update(control.user_config.as_ref().expect("just saved"));

if need_to_update_actors {
for object in self.objects.iter() {
let actor = object
.value()
.as_actor()
.expect("a supervisor stores only actors");

actor.set_mailbox_capacity_config(system.mailbox.capacity);
}
}

self.in_scope(|| {
debug!(
message = "config updated",
Expand Down
1 change: 1 addition & 0 deletions elfo-test/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl Proxy {
/// Now it's implemented as multiple calls `yield_now()`,
/// but the implementation can be changed in the future.
pub async fn sync(&mut self) {
// TODO: it should probably be `request(Ping).await`.
for _ in 0..SYNC_YIELD_COUNT {
task::yield_now().await;
}
Expand Down
Loading

0 comments on commit 2bb04ed

Please sign in to comment.