Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Alexis Asseman <[email protected]>
  • Loading branch information
aasseman committed Oct 24, 2023
1 parent 1d8f73e commit 02695c9
Showing 1 changed file with 198 additions and 7 deletions.
205 changes: 198 additions & 7 deletions tap_agent/src/tap/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use tap_core::{
receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_manager::RAVRequest,
tap_receipt::ReceiptCheck,
};
use tokio::sync::Mutex;
use tokio::{
sync::{Mutex, MutexGuard},
task::JoinHandle,
};

use super::managers::NewReceiptNotification;
use crate::{
Expand Down Expand Up @@ -60,7 +63,7 @@ struct Inner {

pub struct Manager {
inner: Arc<Inner>,
rav_requester_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
rav_requester_task: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}

impl Manager {
Expand Down Expand Up @@ -120,7 +123,7 @@ impl Manager {
state: Arc::new(Mutex::new(State::Running)),
config,
}),
rav_requester_task: Mutex::new(None),
rav_requester_task: Arc::new(Mutex::new(None)),
}
}

Expand Down Expand Up @@ -164,7 +167,7 @@ impl Manager {
let mut rav_requester_task = self.rav_requester_task.lock().await;
// TODO: consider making the trigger per sender, instead of per (sender, allocation).
if unaggregated_fees.value >= self.inner.config.tap.rav_request_trigger_value.into()
&& rav_requester_task.is_none()
&& !Self::rav_requester_task_is_running(&rav_requester_task)
{
*rav_requester_task = Some(tokio::spawn(Self::rav_requester(self.inner.clone())));
}
Expand All @@ -173,7 +176,7 @@ impl Manager {

pub async fn start_last_rav_request(&self) {
let mut rav_requester_task = self.rav_requester_task.lock().await;
if rav_requester_task.is_none() {
if !Self::rav_requester_task_is_running(&rav_requester_task) {
*rav_requester_task = Some(tokio::spawn(Self::rav_requester(self.inner.clone())));
}
}
Expand Down Expand Up @@ -379,13 +382,38 @@ impl Manager {
};
anyhow::Ok(())
}

fn rav_requester_task_is_running(
rav_requester_task_lock: &MutexGuard<'_, Option<JoinHandle<()>>>,
) -> bool {
if let Some(handle) = rav_requester_task_lock.as_ref() {
!handle.is_finished()
} else {
false
}
}
}

impl Drop for Manager {
/// Trying to make sure the RAV requester task is dropped when the manager is dropped.
fn drop(&mut self) {
let rav_requester_task = self.rav_requester_task.clone();

tokio::spawn(async move {
let mut rav_requester_task = rav_requester_task.lock().await;
if let Some(rav_requester_task) = rav_requester_task.take() {
rav_requester_task.abort();
}
});
}
}

#[cfg(test)]
mod tests {

use serde_json::json;
use tap_aggregator::server::run_server;
use tap_core::tap_manager::SignedRAV;
use wiremock::{
matchers::{body_string_contains, method, path},
Mock, MockServer, ResponseTemplate,
Expand Down Expand Up @@ -575,9 +603,8 @@ mod tests {
);
}

/// Test the RAV requester
#[sqlx::test(migrations = "../migrations")]
async fn test_rav_requester(pgpool: PgPool) {
async fn test_rav_requester_manual(pgpool: PgPool) {
// Start a TAP aggregator server.
let (handle, aggregator_endpoint) =
run_server(0, SENDER.0.clone(), domain(), 100 * 1024, 100 * 1024, 1)
Expand Down Expand Up @@ -627,4 +654,168 @@ mod tests {
handle.stop().unwrap();
handle.stopped().await;
}

#[sqlx::test(migrations = "../migrations")]
async fn test_rav_requester_auto(pgpool: PgPool) {
// Start a TAP aggregator server.
let (handle, aggregator_endpoint) =
run_server(0, SENDER.0.clone(), domain(), 100 * 1024, 100 * 1024, 1)
.await
.unwrap();

// Start a mock graphql server using wiremock
let mock_server = MockServer::start().await;

// Mock result for TAP redeem txs for (allocation, sender) pair.
mock_server
.register(
Mock::given(method("POST"))
.and(path("/escrow-subgraph"))
.and(body_string_contains("transactions"))
.respond_with(
ResponseTemplate::new(200)
.set_body_json(json!({ "data": { "transactions": []}})),
),
)
.await;

// Create a manager.
let manager = create_manager(
pgpool.clone(),
"http://".to_owned() + &aggregator_endpoint.to_string(),
(mock_server.uri() + "/escrow-subgraph").as_str(),
)
.await;

// Add receipts to the database and call the `handle_new_receipt_notification` method
// correspondingly.
let mut total_value = 0;
let mut trigger_value = 0;
for i in 0..10 {
// These values should be enough to trigger a RAV request at i == 7 since we set the
// `rav_request_trigger_value` to 100.
let value = (i + 10) as u128;

let receipt =
create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, value, i).await;
store_receipt(&pgpool, receipt.signed_receipt())
.await
.unwrap();
manager
.handle_new_receipt_notification(NewReceiptNotification {
allocation_id: *ALLOCATION_ID,
sender_address: SENDER.1,
id: i,
timestamp_ns: i + 1,
value,
})
.await;

total_value += value;
if total_value >= 100 && trigger_value == 0 {
trigger_value = total_value;
}
}

// Wait for the RAV requester to finish.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let _ = manager.rav_requester_task.lock().await;

// Get the latest RAV from the database.
let latest_rav = sqlx::query!(
r#"
SELECT rav
FROM scalar_tap_latest_ravs
WHERE allocation_id = $1 AND sender_address = $2
"#,
ALLOCATION_ID
.to_string()
.strip_prefix("0x")
.unwrap()
.to_owned(),
SENDER.1.to_string().strip_prefix("0x").unwrap().to_owned()
)
.fetch_optional(&pgpool)
.await
.map(|r| r.map(|r| r.rav))
.unwrap();

let latest_rav = latest_rav
.map(|r| serde_json::from_value::<SignedRAV>(r).unwrap())
.unwrap();

// Check that the latest RAV value is correct.
assert!(latest_rav.message.value_aggregate >= trigger_value);

// Check that the unaggregated fees value is reduced.
assert!(manager.inner.unaggregated_fees.lock().await.value <= trigger_value);

// Reset the total value and trigger value.
total_value = manager.inner.unaggregated_fees.lock().await.value;
trigger_value = 0;

// Add more receipts
for i in 10..20 {
let value = (i + 10) as u128;

let receipt =
create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, i.into(), i).await;
store_receipt(&pgpool, receipt.signed_receipt())
.await
.unwrap();

manager
.handle_new_receipt_notification(NewReceiptNotification {
allocation_id: *ALLOCATION_ID,
sender_address: SENDER.1,
id: i,
timestamp_ns: i + 1,
value,
})
.await;

total_value += value;
if total_value >= 100 && trigger_value == 0 {
trigger_value = total_value;
}
}

// Wait for the RAV requester to finish.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let _ = manager.rav_requester_task.lock().await;

// Get the latest RAV from the database.
let latest_rav = sqlx::query!(
r#"
SELECT rav
FROM scalar_tap_latest_ravs
WHERE allocation_id = $1 AND sender_address = $2
"#,
ALLOCATION_ID
.to_string()
.strip_prefix("0x")
.unwrap()
.to_owned(),
SENDER.1.to_string().strip_prefix("0x").unwrap().to_owned()
)
.fetch_optional(&pgpool)
.await
.map(|r| r.map(|r| r.rav))
.unwrap();

let latest_rav = latest_rav
.map(|r| serde_json::from_value::<SignedRAV>(r).unwrap())
.unwrap();

// Check that the latest RAV value is correct.

assert!(latest_rav.message.value_aggregate >= trigger_value);

// Check that the unaggregated fees value is reduced.
assert!(manager.inner.unaggregated_fees.lock().await.value <= trigger_value);

// Stop the TAP aggregator server.
handle.stop().unwrap();
handle.stopped().await;
}
}

0 comments on commit 02695c9

Please sign in to comment.