Skip to content

Commit

Permalink
Fix syncing of P2P swap status updates between user canisters (#5230)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Jan 22, 2024
1 parent 58146fc commit cd11e1e
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 84 deletions.
4 changes: 4 additions & 0 deletions backend/canisters/escrow/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [unreleased]

### Changed

- Include `created_by` on `SwapStatusChange` messages ([#5230](https://github.com/open-chat-labs/open-chat/pull/5230))

## [[2.0.1014](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1014-escrow)] - 2024-01-19

### Added
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/escrow/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct SwapStatusCompleted {
#[derive(Serialize, Deserialize, Debug)]
pub struct SwapStatusChange {
pub swap_id: u32,
pub created_by: UserId,
pub location: P2PSwapLocation,
pub status: SwapStatus,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ fn get_next(state: &mut RuntimeState) -> Option<(CanisterId, SwapStatusChange)>
canister_id,
SwapStatusChange {
swap_id: swap.id,
created_by: swap.created_by,
location: swap.location.clone(),
status: swap.status(state.env.now()),
},
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/user/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Fixed

- Fix input amount display in p2p swaps ([#5223](https://github.com/open-chat-labs/open-chat/pull/5223))
- Fix syncing of P2P swap status updates between user canisters ([#5230](https://github.com/open-chat-labs/open-chat/pull/5230))

## [[2.0.1013](https://github.com/open-chat-labs/open-chat/releases/tag/v2.0.1013-user)] - 2024-01-18

Expand Down
43 changes: 12 additions & 31 deletions backend/canisters/user/impl/src/jobs/push_user_canister_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,25 @@ thread_local! {
}

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
if TIMER_ID.get().is_none()
&& (!state.data.user_canister_events_queue.is_empty() || state.data.user_canister_events_queue.sync_in_progress())
{
let timer_id = ic_cdk_timers::set_timer_interval(Duration::from_secs(10), run);
if TIMER_ID.get().is_none() && !state.data.user_canister_events_queue.is_empty() {
let timer_id = ic_cdk_timers::set_timer(Duration::ZERO, run);
TIMER_ID.set(Some(timer_id));
trace!("'push_user_canister_events' job started");
true
} else {
false
}
}

fn run() {
match mutate_state(next_batch) {
NextBatchResult::Success(batch) => ic_cdk::spawn(process_batch(batch)),
NextBatchResult::Continue => {}
NextBatchResult::QueueEmpty => {
if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'push_user_canister_events' job stopped");
}
}
trace!("'push_user_canister_events' running");
TIMER_ID.set(None);
if let Some(batch) = mutate_state(next_batch) {
ic_cdk::spawn(process_batch(batch));
}
}

enum NextBatchResult {
Success(Vec<(CanisterId, Vec<UserCanisterEvent>)>),
Continue,
QueueEmpty,
}

fn next_batch(state: &mut RuntimeState) -> NextBatchResult {
if let Some(batch) = state.data.user_canister_events_queue.try_start_batch() {
NextBatchResult::Success(batch)
} else if !state.data.user_canister_events_queue.is_empty() || state.data.user_canister_events_queue.sync_in_progress() {
NextBatchResult::Continue
} else {
NextBatchResult::QueueEmpty
}
fn next_batch(state: &mut RuntimeState) -> Option<Vec<(CanisterId, Vec<UserCanisterEvent>)>> {
state.data.user_canister_events_queue.try_start_batch()
}

async fn process_batch(batch: Vec<(CanisterId, Vec<UserCanisterEvent>)>) {
Expand All @@ -60,7 +40,10 @@ async fn process_batch(batch: Vec<(CanisterId, Vec<UserCanisterEvent>)>) {

futures::future::join_all(futures).await;

mutate_state(|state| state.data.user_canister_events_queue.mark_batch_completed());
mutate_state(|state| {
state.data.user_canister_events_queue.mark_batch_completed();
start_job_if_required(state);
});
}

async fn push_events(canister_id: CanisterId, events: Vec<UserCanisterEvent>) {
Expand All @@ -74,8 +57,6 @@ async fn push_events(canister_id: CanisterId, events: Vec<UserCanisterEvent>) {
.data
.user_canister_events_queue
.mark_sync_failed_for_canister(canister_id, events);

start_job_if_required(state);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ fn c2c_notify_p2p_swap_status_change_impl(args: Args, state: &mut RuntimeState)
let P2PSwapLocation::Message(m) = args.location;

if let Chat::Direct(chat_id) = m.chat {
let my_user_id = state.env.canister_id().into();
let chat_id = if args.created_by == my_user_id { chat_id } else { args.created_by.into() };

if let Some(chat) = state.data.direct_chats.get_mut(&chat_id) {
let mut status_to_push_c2c = None;

Expand Down
102 changes: 51 additions & 51 deletions backend/integration_tests/src/p2p_swap_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,27 @@ fn p2p_swap_in_direct_chat_succeeds() {
1_000_000_000
);

let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![0.into()])
let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![1.into()])
.events
.pop()
.unwrap()
.event;

if let ChatEvent::Message(m) = user1_event {
if let MessageContent::P2PSwap(p) = m.content {
assert!(matches!(p.status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id));
}
}
verify_swap_status(
user1_event,
|status| matches!(status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id),
);

let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![0.into()])
let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![1.into()])
.events
.pop()
.unwrap()
.event;

if let ChatEvent::Message(m) = user2_event {
if let MessageContent::P2PSwap(p) = m.content {
assert!(matches!(p.status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id));
}
}
verify_swap_status(
user2_event,
|status| matches!(status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id),
);
}

#[test]
Expand Down Expand Up @@ -218,15 +216,10 @@ fn p2p_swap_in_group_succeeds() {
.unwrap()
.event;

if let ChatEvent::Message(m) = event {
if let MessageContent::P2PSwap(p) = m.content {
assert!(matches!(p.status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id));
} else {
panic!();
}
} else {
panic!()
}
verify_swap_status(
event,
|status| matches!(status, P2PSwapStatus::Completed(c) if c.accepted_by == user2.user_id),
);
}

#[test_case(true)]
Expand Down Expand Up @@ -327,29 +320,27 @@ fn cancel_p2p_swap_in_direct_chat_succeeds(delete_message: bool) {
);

if !delete_message {
let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![0.into()])
let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![1.into()])
.events
.pop()
.unwrap()
.event;

if let ChatEvent::Message(m) = user1_event {
if let MessageContent::P2PSwap(p) = m.content {
assert!(matches!(p.status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some()));
}
}
verify_swap_status(
user1_event,
|status| matches!(status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some()),
);

let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![0.into()])
let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![1.into()])
.events
.pop()
.unwrap()
.event;

if let ChatEvent::Message(m) = user2_event {
if let MessageContent::P2PSwap(p) = m.content {
assert!(matches!(p.status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some()));
}
}
verify_swap_status(
user2_event,
|status| matches!(status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some()),
);
}
}

Expand Down Expand Up @@ -463,11 +454,10 @@ fn cancel_p2p_swap_in_group_chat_succeeds(delete_message: bool) {
.unwrap()
.event;

if let ChatEvent::Message(m) = event {
if let MessageContent::P2PSwap(p) = m.content {
assert!(matches!(p.status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some()));
}
}
verify_swap_status(
event,
|status| matches!(status, P2PSwapStatus::Cancelled(c) if c.token0_txn_out.is_some()),
);
}
}

Expand Down Expand Up @@ -525,34 +515,44 @@ fn deposit_refunded_if_swap_expires() {
));

env.advance_time(Duration::from_millis(DAY_IN_MS));
tick_many(env, 5);
tick_many(env, 10);

assert_eq!(
client::icrc1::happy_path::balance_of(env, canister_ids.chat_ledger, Principal::from(user1.user_id)),
original_chat_balance - (2 * Cryptocurrency::CHAT.fee().unwrap())
);

let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![0.into()])
let user1_event = client::user::happy_path::events_by_index(env, &user1, user2.user_id, vec![1.into()])
.events
.pop()
.unwrap()
.event;

if let ChatEvent::Message(m) = user1_event {
if let MessageContent::P2PSwap(p) = m.content {
assert!(matches!(p.status, P2PSwapStatus::Expired(e) if e.token0_txn_out.is_some()));
}
}
verify_swap_status(
user1_event,
|status| matches!(status, P2PSwapStatus::Expired(e) if e.token0_txn_out.is_some()),
);

let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![0.into()])
let user2_event = client::user::happy_path::events_by_index(env, &user2, user1.user_id, vec![1.into()])
.events
.pop()
.unwrap()
.event;

if let ChatEvent::Message(m) = user2_event {
if let MessageContent::P2PSwap(p) = m.content {
assert!(matches!(p.status, P2PSwapStatus::Expired(e) if e.token0_txn_out.is_some()));
}
}
verify_swap_status(
user2_event,
|status| matches!(status, P2PSwapStatus::Expired(e) if e.token0_txn_out.is_some()),
);
}

fn verify_swap_status<F: FnOnce(&P2PSwapStatus) -> bool>(event: ChatEvent, predicate: F) {
let ChatEvent::Message(m) = event else {
panic!("Event is not a message. Event: {event:?}")
};

let MessageContent::P2PSwap(p) = m.content else {
panic!("Message is not a P2PSwap. Message: {:?}", m.content)
};

assert!(predicate(&p.status), "{:?}", p.status);
}
5 changes: 3 additions & 2 deletions backend/libraries/utils/src/canister_timers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use ic_cdk_timers::TimerId;
use std::time::Duration;

pub fn run_now_then_interval(interval: Duration, func: fn()) {
ic_cdk_timers::set_timer_interval(interval, func);
pub fn run_now_then_interval(interval: Duration, func: fn()) -> TimerId {
ic_cdk_timers::set_timer(Duration::ZERO, func);
ic_cdk_timers::set_timer_interval(interval, func)
}

0 comments on commit cd11e1e

Please sign in to comment.