fix: instrument futures, not joinhandles (#223)
* fix: instrument futures, not joinhandles

* chore: Changelogs
prestwich authored Nov 15, 2022
1 parent 73e5690 · commit 9f0b429
Showing 20 changed files with 619 additions and 572 deletions.
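The entry points shown in this diff (`run`, `run_all`, `spawn`) previously returned `Instrumented<JoinHandle<Result<()>>>`: the future was spawned first and the tracing span was attached to the `JoinHandle` afterwards, so the spawned task itself ran outside the span. The fix instruments the future before handing it to `tokio::spawn` and changes the signatures to return a plain `JoinHandle<Result<()>>`. The sketch below illustrates the difference; it is not code from this repository, it assumes the `tokio`, `tracing`, and `tracing-subscriber` crates, and the span name and log messages are illustrative only.

```rust
use tracing::{info, info_span, Instrument};

#[tokio::main]
async fn main() {
    // A simple subscriber so the span context is visible in the output.
    tracing_subscriber::fmt().init();

    // Before: the span wraps the JoinHandle returned by `tokio::spawn`.
    // The task itself is already running on the runtime outside the span,
    // so its events are not recorded inside `task_span`.
    let handle = tokio::spawn(async {
        info!("polled outside the span");
    })
    .instrument(info_span!("task_span"));
    handle.await.unwrap();

    // After: the future is instrumented *before* being spawned, so every
    // poll of the task enters `task_span` and its events carry the span
    // context.
    let handle = tokio::spawn(
        async {
            info!("polled inside the span");
        }
        .instrument(info_span!("task_span")),
    );
    handle.await.unwrap();
}
```

Kathy uses `.in_current_span()` rather than an explicit `info_span!`, but the pattern is the same: instrumentation is applied to the future passed to `tokio::spawn`, not to the handle it returns.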
2 changes: 2 additions & 0 deletions agents/kathy/CHANGELOG.md
@@ -2,6 +2,8 @@

### Unreleased

- fix: instrument futures, not joinhandles

### [email protected]

- make \*Settings::new async for optionally fetching config from a remote url
87 changes: 44 additions & 43 deletions agents/kathy/src/kathy.rs
@@ -5,7 +5,6 @@ use color_eyre::Result;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use tokio::{sync::Mutex, task::JoinHandle, time::sleep};
use tracing::instrument::Instrumented;
use tracing::{info, Instrument};

use ethers::core::types::H256;
@@ -81,51 +80,53 @@ impl NomadAgent for Kathy {
}

#[tracing::instrument]
fn run(channel: Self::Channel) -> Instrumented<JoinHandle<Result<()>>> {
tokio::spawn(async move {
let home = channel.home();
let destination = channel.replica().local_domain();
let mut generator = channel.generator;
let home_lock = channel.home_lock;
let messages_dispatched = channel.messages_dispatched;
let interval = channel.interval;

loop {
let msg = generator.gen_chat();
let recipient = generator.gen_recipient();

match msg {
Some(body) => {
let message = Message {
destination,
recipient,
body,
};
info!(
target: "outgoing_messages",
"Enqueuing message of length {} to {}::{}",
length = message.body.len(),
destination = message.destination,
recipient = message.recipient
);

let guard = home_lock.lock().await;
home.dispatch(&message).await?;

messages_dispatched.inc();

drop(guard);
fn run(channel: Self::Channel) -> JoinHandle<Result<()>> {
tokio::spawn(
async move {
let home = channel.home();
let destination = channel.replica().local_domain();
let mut generator = channel.generator;
let home_lock = channel.home_lock;
let messages_dispatched = channel.messages_dispatched;
let interval = channel.interval;

loop {
let msg = generator.gen_chat();
let recipient = generator.gen_recipient();

match msg {
Some(body) => {
let message = Message {
destination,
recipient,
body,
};
info!(
target: "outgoing_messages",
"Enqueuing message of length {} to {}::{}",
length = message.body.len(),
destination = message.destination,
recipient = message.recipient
);

let guard = home_lock.lock().await;
home.dispatch(&message).await?;

messages_dispatched.inc();

drop(guard);
}
_ => {
info!("Reached the end of the static message queue. Shutting down.");
return Ok(());
}
}
_ => {
info!("Reached the end of the static message queue. Shutting down.");
return Ok(());
}
}

sleep(Duration::from_secs(interval)).await;
sleep(Duration::from_secs(interval)).await;
}
}
})
.in_current_span()
.in_current_span(),
)
}
}

1 change: 1 addition & 0 deletions agents/processor/CHANGELOG.md
@@ -3,6 +3,7 @@
### Unreleased

- use `std::fmt::Display` to log contracts
- fix: instrument futures, not joinhandles

### [email protected]

121 changes: 62 additions & 59 deletions agents/processor/src/processor.rs
@@ -9,9 +9,7 @@ use std::{
time::Duration,
};
use tokio::{sync::RwLock, task::JoinHandle, time::sleep};
use tracing::{
debug, error, info, info_span, instrument, instrument::Instrumented, warn, Instrument,
};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

use nomad_base::{
cancel_task, decl_agent, decl_channel, AgentCore, CachingHome, CachingReplica,
@@ -370,79 +368,84 @@ impl NomadAgent for Processor {
}
}

fn run(channel: Self::Channel) -> Instrumented<JoinHandle<Result<()>>> {
tokio::spawn(async move {
Replica {
interval: channel.interval,
replica: channel.replica(),
home: channel.home(),
db: channel.db(),
allowed: channel.allowed,
denied: channel.denied,
next_message_nonce: channel.next_message_nonce,
fn run(channel: Self::Channel) -> JoinHandle<Result<()>> {
tokio::spawn(
async move {
Replica {
interval: channel.interval,
replica: channel.replica(),
home: channel.home(),
db: channel.db(),
allowed: channel.allowed,
denied: channel.denied,
next_message_nonce: channel.next_message_nonce,
}
.main()
.await?
}
.main()
.await?
})
.in_current_span()
.in_current_span(),
)
}

fn run_all(self) -> Instrumented<JoinHandle<Result<()>>>
fn run_all(self) -> JoinHandle<Result<()>>
where
Self: Sized + 'static,
{
tokio::spawn(async move {
self.assert_home_not_failed().await??;
tokio::spawn(
async move {
self.assert_home_not_failed().await??;

info!("Starting Processor tasks");
info!("Starting Processor tasks");

// tree sync
info!("Starting ProverSync");
let db = NomadDB::new(self.home().name(), self.db());
let sync = ProverSync::from_disk(db.clone());
let prover_sync_task = sync.spawn();
// tree sync
info!("Starting ProverSync");
let db = NomadDB::new(self.home().name(), self.db());
let sync = ProverSync::from_disk(db.clone());
let prover_sync_task = sync.spawn();

info!("Starting indexer");
let home_sync_task = self.home().sync();
info!("Starting indexer");
let home_sync_task = self.home().sync();

let home_fail_watch_task = self.watch_home_fail(self.interval);
let home_fail_watch_task = self.watch_home_fail(self.interval);

info!("started indexer, sync and home fail watch");
info!("started indexer, sync and home fail watch");

// instantiate task array here so we can optionally push run_task
let mut tasks = vec![home_sync_task, prover_sync_task, home_fail_watch_task];
// instantiate task array here so we can optionally push run_task
let mut tasks = vec![home_sync_task, prover_sync_task, home_fail_watch_task];

if !self.subsidized_remotes.is_empty() {
// Get intersection of specified remotes (replicas in settings)
// and subsidized remotes
let specified_subsidized: Vec<&str> = self
.subsidized_remotes
.iter()
.filter(|r| self.replicas().contains_key(*r))
.map(AsRef::as_ref)
.collect();
if !self.subsidized_remotes.is_empty() {
// Get intersection of specified remotes (replicas in settings)
// and subsidized remotes
let specified_subsidized: Vec<&str> = self
.subsidized_remotes
.iter()
.filter(|r| self.replicas().contains_key(*r))
.map(AsRef::as_ref)
.collect();

if !specified_subsidized.is_empty() {
tasks.push(self.run_many(&specified_subsidized));
if !specified_subsidized.is_empty() {
tasks.push(self.run_many(&specified_subsidized));
}
}
}

// if we have a bucket, add a task to push to it
if let Some(config) = &self.config {
info!(bucket = %config.bucket, "Starting S3 push tasks");
let pusher = Pusher::new(self.core.home.name(), &config.bucket, db.clone()).await;
tasks.push(pusher.spawn())
}
// if we have a bucket, add a task to push to it
if let Some(config) = &self.config {
info!(bucket = %config.bucket, "Starting S3 push tasks");
let pusher =
Pusher::new(self.core.home.name(), &config.bucket, db.clone()).await;
tasks.push(pusher.spawn())
}

// find the first task to shut down. Then cancel all others
debug!(tasks = tasks.len(), "Selecting across Processor tasks");
let (res, _, remaining) = select_all(tasks).await;
for task in remaining.into_iter() {
cancel_task!(task);
}
// find the first task to shut down. Then cancel all others
debug!(tasks = tasks.len(), "Selecting across Processor tasks");
let (res, _, remaining) = select_all(tasks).await;
for task in remaining.into_iter() {
cancel_task!(task);
}

res?
})
.instrument(info_span!("Processor::run_all"))
res?
}
.instrument(info_span!("Processor::run_all")),
)
}
}
99 changes: 51 additions & 48 deletions agents/processor/src/prover_sync.rs
@@ -10,7 +10,7 @@ use tokio::{
task::JoinHandle,
time::{sleep, timeout},
};
use tracing::{debug, error, info, info_span, instrument, instrument::Instrumented, Instrument};
use tracing::{debug, error, info, info_span, instrument, Instrument};

/// Struct to sync prover.
#[derive(Debug)]
@@ -181,59 +181,62 @@ impl ProverSync {
/// local merkle tree with all leaves between local root and
/// new root. Use short interval for bootup syncing and longer
/// interval for regular polling.
pub fn spawn(mut self) -> Instrumented<JoinHandle<Result<()>>> {
pub fn spawn(mut self) -> JoinHandle<Result<()>> {
let span = info_span!("ProverSync", self = %self);
tokio::spawn(async move {
loop {
// Try to retrieve new signed update
let local_root = self.local_root();
let signed_update_opt = self.db.update_by_previous_root(local_root)?;

if let Some(signed_update) = signed_update_opt {
let previous_root = signed_update.update.previous_root;
let new_root = signed_update.update.new_root;

info!(
previous_root = ?previous_root,
new_root = ?new_root,
"Have signed update from {} to {}",
previous_root,
new_root
);

// Update in-memory prover tree until local tree root
// matches newly found new_root
let pre_update_size = self.prover.count();
self.update_prover_tree(new_root).await?;

// Double check that update new root now equals current prover root
let current_root = self.prover.root();
if current_root != new_root {
bail!(ProverSyncError::MismatchedRoots {
local_root: current_root,
new_root,
});
}
tokio::spawn(
async move {
loop {
// Try to retrieve new signed update
let local_root = self.local_root();
let signed_update_opt = self.db.update_by_previous_root(local_root)?;

if let Some(signed_update) = signed_update_opt {
let previous_root = signed_update.update.previous_root;
let new_root = signed_update.update.new_root;

info!(
previous_root = ?previous_root,
new_root = ?new_root,
"Have signed update from {} to {}",
previous_root,
new_root
);

// Update in-memory prover tree until local tree root
// matches newly found new_root
let pre_update_size = self.prover.count();
self.update_prover_tree(new_root).await?;

// Double check that update new root now equals current prover root
let current_root = self.prover.root();
if current_root != new_root {
bail!(ProverSyncError::MismatchedRoots {
local_root: current_root,
new_root,
});
}

// Ensure there is a proof in the db for all leaves
for idx in pre_update_size..self.prover.count() {
if self.db.proof_by_leaf_index(idx as u32)?.is_none() {
self.store_proof(idx as u32)?;
// Ensure there is a proof in the db for all leaves
for idx in pre_update_size..self.prover.count() {
if self.db.proof_by_leaf_index(idx as u32)?.is_none() {
self.store_proof(idx as u32)?;
}
}

// Store latest root for which we know we have all leaves/
// proofs for
self.db.store_prover_latest_committed(new_root)?;
} else if !local_root.is_zero()
&& self.db.update_by_new_root(local_root)?.is_none()
{
bail!(ProverSyncError::InvalidLocalRoot { local_root });
}

// Store latest root for which we know we have all leaves/
// proofs for
self.db.store_prover_latest_committed(new_root)?;
} else if !local_root.is_zero() && self.db.update_by_new_root(local_root)?.is_none()
{
bail!(ProverSyncError::InvalidLocalRoot { local_root });
// kludge
sleep(Duration::from_millis(100)).await;
}

// kludge
sleep(Duration::from_millis(100)).await;
}
})
.instrument(span)
.instrument(span),
)
}
}