From cd11e1e8789adbbbd328f2705d73e1a43b5a6d60 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Mon, 22 Jan 2024 14:31:00 +0000 Subject: [PATCH] Fix syncing of P2P swap status updates between user canisters (#5230) --- backend/canisters/escrow/CHANGELOG.md | 4 + backend/canisters/escrow/api/src/lib.rs | 1 + .../impl/src/jobs/notify_status_change.rs | 1 + backend/canisters/user/CHANGELOG.md | 1 + .../src/jobs/push_user_canister_events.rs | 43 +++----- .../c2c_notify_p2p_swap_status_change.rs | 3 + .../integration_tests/src/p2p_swap_tests.rs | 102 +++++++++--------- .../libraries/utils/src/canister_timers.rs | 5 +- 8 files changed, 76 insertions(+), 84 deletions(-) diff --git a/backend/canisters/escrow/CHANGELOG.md b/backend/canisters/escrow/CHANGELOG.md index 5f5accc953..515c769b7a 100644 --- a/backend/canisters/escrow/CHANGELOG.md +++ b/backend/canisters/escrow/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [unreleased] +### Changed + +- Include `created_by` on `SwapStatusChange` messages ([#5230](https://github.com/open-chat-labs/open-chat/pull/5230)) + ## [[2.0.1014](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1014-escrow)] - 2024-01-19 ### Added diff --git a/backend/canisters/escrow/api/src/lib.rs b/backend/canisters/escrow/api/src/lib.rs index 13051ee926..d58d5b1d6d 100644 --- a/backend/canisters/escrow/api/src/lib.rs +++ b/backend/canisters/escrow/api/src/lib.rs @@ -49,6 +49,7 @@ pub struct SwapStatusCompleted { #[derive(Serialize, Deserialize, Debug)] pub struct SwapStatusChange { pub swap_id: u32, + pub created_by: UserId, pub location: P2PSwapLocation, pub status: SwapStatus, } diff --git a/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs b/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs index 59b4efc48c..218b317573 100644 --- a/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs +++ b/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs @@ -39,6 +39,7 @@ fn get_next(state: &mut RuntimeState) -> Option<(CanisterId, SwapStatusChange)> canister_id, SwapStatusChange { swap_id: swap.id, + created_by: swap.created_by, location: swap.location.clone(), status: swap.status(state.env.now()), }, diff --git a/backend/canisters/user/CHANGELOG.md b/backend/canisters/user/CHANGELOG.md index 4e54a140d4..7bb164c22a 100644 --- a/backend/canisters/user/CHANGELOG.md +++ b/backend/canisters/user/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Fixed - Fix input amount display in p2p swaps ([#5223](https://github.com/open-chat-labs/open-chat/pull/5223)) +- Fix syncing of P2P swap status updates between user canisters ([#5230](https://github.com/open-chat-labs/open-chat/pull/5230)) ## [[2.0.1013](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1013-user)] - 2024-01-18 diff --git a/backend/canisters/user/impl/src/jobs/push_user_canister_events.rs b/backend/canisters/user/impl/src/jobs/push_user_canister_events.rs index c695b1f317..0944b531c7 100644 --- a/backend/canisters/user/impl/src/jobs/push_user_canister_events.rs +++ b/backend/canisters/user/impl/src/jobs/push_user_canister_events.rs @@ -11,12 +11,9 @@ thread_local! { } pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { - if TIMER_ID.get().is_none() - && (!state.data.user_canister_events_queue.is_empty() || state.data.user_canister_events_queue.sync_in_progress()) - { - let timer_id = ic_cdk_timers::set_timer_interval(Duration::from_secs(10), run); + if TIMER_ID.get().is_none() && !state.data.user_canister_events_queue.is_empty() { + let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run); TIMER_ID.set(Some(timer_id)); - trace!("'push_user_canister_events' job started"); true } else { false @@ -24,32 +21,15 @@ pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { } fn run() { - match mutate_state(next_batch) { - NextBatchResult::Success(batch) => ic_cdk::spawn(process_batch(batch)), - NextBatchResult::Continue => {} - NextBatchResult::QueueEmpty => { - if let Some(timer_id) = TIMER_ID.take() { - ic_cdk_timers::clear_timer(timer_id); - trace!("'push_user_canister_events' job stopped"); - } - } + trace!("'push_user_canister_events' running"); + TIMER_ID.set(None); + if let Some(batch) = mutate_state(next_batch) { + ic_cdk::spawn(process_batch(batch)); } } -enum NextBatchResult { - Success(Vec<(CanisterId, Vec)>), - Continue, - QueueEmpty, -} - -fn next_batch(state: &mut RuntimeState) -> NextBatchResult { - if let Some(batch) = state.data.user_canister_events_queue.try_start_batch() { - NextBatchResult::Success(batch) - } else if !state.data.user_canister_events_queue.is_empty() || state.data.user_canister_events_queue.sync_in_progress() { - NextBatchResult::Continue - } else { - NextBatchResult::QueueEmpty - } +fn next_batch(state: &mut RuntimeState) -> Option)>> { + state.data.user_canister_events_queue.try_start_batch() } async fn process_batch(batch: Vec<(CanisterId, Vec)>) { @@ -60,7 +40,10 @@ async fn process_batch(batch: Vec<(CanisterId, Vec)>) { futures::future::join_all(futures).await; - mutate_state(|state| state.data.user_canister_events_queue.mark_batch_completed()); + mutate_state(|state| { + state.data.user_canister_events_queue.mark_batch_completed(); + start_job_if_required(state); + }); } async fn push_events(canister_id: CanisterId, events: Vec) { @@ -74,8 +57,6 @@ async fn push_events(canister_id: CanisterId, events: Vec) { .data .user_canister_events_queue .mark_sync_failed_for_canister(canister_id, events); - - start_job_if_required(state); }); } } diff --git a/backend/canisters/user/impl/src/updates/c2c_notify_p2p_swap_status_change.rs b/backend/canisters/user/impl/src/updates/c2c_notify_p2p_swap_status_change.rs index 65ec5d36d6..18acc922c2 100644 --- a/backend/canisters/user/impl/src/updates/c2c_notify_p2p_swap_status_change.rs +++ b/backend/canisters/user/impl/src/updates/c2c_notify_p2p_swap_status_change.rs @@ -18,6 +18,9 @@ fn c2c_notify_p2p_swap_status_change_impl(args: Args, state: &mut RuntimeState) let P2PSwapLocation::Message(m) = args.location; if let Chat::Direct(chat_id) = m.chat { + let my_user_id = state.env.canister_id().into(); + let chat_id = if args.created_by == my_user_id { chat_id } else { args.created_by.into() }; + if let Some(chat) = state.data.direct_chats.get_mut(&chat_id) { let mut status_to_push_c2c = None; diff --git a/backend/integration_tests/src/p2p_swap_tests.rs b/backend/integration_tests/src/p2p_swap_tests.rs index 081a25efc4..1b0fda9e8d 100644 --- a/backend/integration_tests/src/p2p_swap_tests.rs +++ b/backend/integration_tests/src/p2p_swap_tests.rs @@ -96,29 +96,27 @@ fn p2p_swap_in_direct_chat_succeeds() { 1_000_000_000 ); - let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![0.into()]) + let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![1.into()]) .events .pop() .unwrap() .event; - if let ChatEvent::Message(m) = user1_event { - if let MessageContent::P2PSwap(p) = m.content { - assert!(matches!(p.status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id)); - } - } + verify_swap_status( + user1_event, + |status| matches!(status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id), + ); - let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![0.into()]) + let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![1.into()]) .events .pop() .unwrap() .event; - if let ChatEvent::Message(m) = user2_event { - if let MessageContent::P2PSwap(p) = m.content { - assert!(matches!(p.status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id)); - } - } + verify_swap_status( + user2_event, + |status| matches!(status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id), + ); } #[test] @@ -218,15 +216,10 @@ fn p2p_swap_in_group_succeeds() { .unwrap() .event; - if let ChatEvent::Message(m) = event { - if let MessageContent::P2PSwap(p) = m.content { - assert!(matches!(p.status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id)); - } else { - panic!(); - } - } else { - panic!() - } + verify_swap_status( + event, + |status| matches!(status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id), + ); } #[test_case(true)] @@ -327,29 +320,27 @@ fn cancel_p2p_swap_in_direct_chat_succeeds(delete_message: bool) { ); if !delete_message { - let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![0.into()]) + let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![1.into()]) .events .pop() .unwrap() .event; - if let ChatEvent::Message(m) = user1_event { - if let MessageContent::P2PSwap(p) = m.content { - assert!(matches!(p.status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some())); - } - } + verify_swap_status( + user1_event, + |status| matches!(status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some()), + ); - let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![0.into()]) + let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![1.into()]) .events .pop() .unwrap() .event; - if let ChatEvent::Message(m) = user2_event { - if let MessageContent::P2PSwap(p) = m.content { - assert!(matches!(p.status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some())); - } - } + verify_swap_status( + user2_event, + |status| matches!(status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some()), + ); } } @@ -463,11 +454,10 @@ fn cancel_p2p_swap_in_group_chat_succeeds(delete_message: bool) { .unwrap() .event; - if let ChatEvent::Message(m) = event { - if let MessageContent::P2PSwap(p) = m.content { - assert!(matches!(p.status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some())); - } - } + verify_swap_status( + event, + |status| matches!(status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some()), + ); } } @@ -525,34 +515,44 @@ fn deposit_refunded_if_swap_expires() { )); env.advance_time(Duration::from_millis(DAY_IN_MS)); - tick_many(env, 5); + tick_many(env, 10); assert_eq!( client::icrc1::happy_path::balance_of(env, canister_ids.chat_ledger, Principal::from(user1.user_id)), original_chat_balance - (2 * Cryptocurrency::CHAT.fee().unwrap()) ); - let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![0.into()]) + let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![1.into()]) .events .pop() .unwrap() .event; - if let ChatEvent::Message(m) = user1_event { - if let MessageContent::P2PSwap(p) = m.content { - assert!(matches!(p.status, P2PSwapStatus::Expired(e) if e.token0_txn_out.is_some())); - } - } + verify_swap_status( + user1_event, + |status| matches!(status, P2PSwapStatus::Expired(e) if e.token0_txn_out.is_some()), + ); - let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![0.into()]) + let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![1.into()]) .events .pop() .unwrap() .event; - if let ChatEvent::Message(m) = user2_event { - if let MessageContent::P2PSwap(p) = m.content { - assert!(matches!(p.status, P2PSwapStatus::Expired(e) if e.token0_txn_out.is_some())); - } - } + verify_swap_status( + user2_event, + |status| matches!(status, P2PSwapStatus::Expired(e) if e.token0_txn_out.is_some()), + ); +} + +fn verify_swap_status bool>(event: ChatEvent, predicate: F) { + let ChatEvent::Message(m) = event else { + panic!("Event is not a message. Event: {event:?}") + }; + + let MessageContent::P2PSwap(p) = m.content else { + panic!("Message is not a P2PSwap. Message: {:?}", m.content) + }; + + assert!(predicate(&p.status), "{:?}", p.status); } diff --git a/backend/libraries/utils/src/canister_timers.rs b/backend/libraries/utils/src/canister_timers.rs index 6517ea1b5d..faf2dc26df 100644 --- a/backend/libraries/utils/src/canister_timers.rs +++ b/backend/libraries/utils/src/canister_timers.rs @@ -1,6 +1,7 @@ +use ic_cdk_timers::TimerId; use std::time::Duration; -pub fn run_now_then_interval(interval: Duration, func: fn()) { - ic_cdk_timers::set_timer_interval(interval, func); +pub fn run_now_then_interval(interval: Duration, func: fn()) -> TimerId { ic_cdk_timers::set_timer(Duration::ZERO, func); + ic_cdk_timers::set_timer_interval(interval, func) }