diff --git a/Cargo.lock b/Cargo.lock index 1993c306cc..1474eae484 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2069,6 +2069,7 @@ version = "0.1.0" dependencies = [ "candid", "canister_api_macros", + "canister_client", "canister_logger", "canister_state_macros", "canister_tracing_macros", diff --git a/backend/canisters/escrow/CHANGELOG.md b/backend/canisters/escrow/CHANGELOG.md index 9b325cc3ac..37bdac1cd0 100644 --- a/backend/canisters/escrow/CHANGELOG.md +++ b/backend/canisters/escrow/CHANGELOG.md @@ -11,3 +11,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Implement `create_offer` and `notify_deposit` ([#4904](https://github.com/open-chat-labs/open-chat/pull/4904)) - Transfer out funds once trade is complete ([#4906](https://github.com/open-chat-labs/open-chat/pull/4906)) - Implement `cancel_offer` ([#4907](https://github.com/open-chat-labs/open-chat/pull/4907)) +- Support notifying a chosen canister when trade is completed ([#5167](https://github.com/open-chat-labs/open-chat/pull/5167)) diff --git a/backend/canisters/escrow/api/src/updates/create_offer.rs b/backend/canisters/escrow/api/src/updates/create_offer.rs index e093033aea..430f452641 100644 --- a/backend/canisters/escrow/api/src/updates/create_offer.rs +++ b/backend/canisters/escrow/api/src/updates/create_offer.rs @@ -1,6 +1,6 @@ use candid::CandidType; use serde::{Deserialize, Serialize}; -use types::{TimestampMillis, TokenInfo}; +use types::{CanisterId, TimestampMillis, TokenInfo}; #[derive(CandidType, Serialize, Deserialize, Debug)] pub struct Args { @@ -9,6 +9,7 @@ pub struct Args { pub output_token: TokenInfo, pub output_amount: u128, pub expires_at: TimestampMillis, + pub canister_to_notify: Option, } #[derive(CandidType, Serialize, Deserialize, Debug)] diff --git a/backend/canisters/escrow/impl/Cargo.toml b/backend/canisters/escrow/impl/Cargo.toml index e3b9e7ed85..f0d1ca74f8 100644 --- a/backend/canisters/escrow/impl/Cargo.toml +++ b/backend/canisters/escrow/impl/Cargo.toml @@ -12,6 +12,7 @@ crate-type = ["cdylib"] [dependencies] candid = { workspace = true } canister_api_macros = { path = "../../../libraries/canister_api_macros" } +canister_client = { path = "../../../libraries/canister_client" } canister_logger = { path = "../../../libraries/canister_logger" } canister_state_macros = { path = "../../../libraries/canister_state_macros" } canister_tracing_macros = { path = "../../../libraries/canister_tracing_macros" } diff --git a/backend/canisters/escrow/impl/src/jobs/make_pending_payments.rs b/backend/canisters/escrow/impl/src/jobs/make_pending_payments.rs index 1a587fa69a..c6ce490424 100644 --- a/backend/canisters/escrow/impl/src/jobs/make_pending_payments.rs +++ b/backend/canisters/escrow/impl/src/jobs/make_pending_payments.rs @@ -71,7 +71,20 @@ async fn process_payment(pending_payment: PendingPayment) { created: created_at_time, block_index, }; - offer.transfers_out.push(transfer); + match pending_payment.reason { + PendingPaymentReason::Trade(_) => { + if pending_payment.token_info.ledger == offer.token0.ledger { + offer.token0_transfer_out = Some(transfer); + } else { + offer.token1_transfer_out = Some(transfer); + } + if offer.is_complete() { + state.data.notify_status_change_queue.push(offer.id); + crate::jobs::notify_status_change::start_job_if_required(state); + } + } + PendingPaymentReason::Refund => offer.refunds.push(transfer), + }; } }); } diff --git a/backend/canisters/escrow/impl/src/jobs/mod.rs b/backend/canisters/escrow/impl/src/jobs/mod.rs index daeac2a616..b117c1d75a 100644 --- a/backend/canisters/escrow/impl/src/jobs/mod.rs +++ b/backend/canisters/escrow/impl/src/jobs/mod.rs @@ -1,7 +1,9 @@ use crate::RuntimeState; pub mod make_pending_payments; +pub mod notify_status_change; pub(crate) fn start(state: &RuntimeState) { make_pending_payments::start_job_if_required(state); + notify_status_change::start_job_if_required(state); } diff --git a/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs b/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs new file mode 100644 index 0000000000..adb819da23 --- /dev/null +++ b/backend/canisters/escrow/impl/src/jobs/notify_status_change.rs @@ -0,0 +1,63 @@ +use crate::{mutate_state, RuntimeState}; +use canister_client::make_c2c_call; +use ic_cdk_timers::TimerId; +use std::cell::Cell; +use std::time::Duration; +use tracing::trace; +use types::{CanisterId, OfferStatus, OfferStatusChange}; + +thread_local! { + static TIMER_ID: Cell> = Cell::default(); +} + +pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool { + if TIMER_ID.get().is_none() && !state.data.notify_status_change_queue.is_empty() { + let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run); + TIMER_ID.set(Some(timer_id)); + trace!("'notify_status_change' job started"); + true + } else { + false + } +} + +pub fn run() { + if let Some((canister_id, offer_id, status)) = mutate_state(get_next) { + ic_cdk::spawn(notify_offer_status(canister_id, offer_id, status)); + } else if let Some(timer_id) = TIMER_ID.take() { + ic_cdk_timers::clear_timer(timer_id); + trace!("'notify_status_change' job stopped"); + } +} + +fn get_next(state: &mut RuntimeState) -> Option<(CanisterId, u32, OfferStatus)> { + while let Some(id) = state.data.notify_status_change_queue.pop() { + if let Some((canister_id, offer_id, status)) = state + .data + .offers + .get(id) + .and_then(|o| o.canister_to_notify.map(|c| (c, o.id, o.status(state.env.now())))) + { + return Some((canister_id, offer_id, status)); + } + } + None +} + +async fn notify_offer_status(canister_id: CanisterId, offer_id: u32, status: OfferStatus) { + if make_c2c_call( + canister_id, + "c2c_notify_p2p_offer_status_change_msgpack", + OfferStatusChange { offer_id, status }, + msgpack::serialize, + |r| msgpack::deserialize::<()>(r), + ) + .await + .is_err() + { + mutate_state(|state| { + state.data.notify_status_change_queue.push(offer_id); + start_job_if_required(state); + }); + } +} diff --git a/backend/canisters/escrow/impl/src/lib.rs b/backend/canisters/escrow/impl/src/lib.rs index 220648cf78..ebef9cde64 100644 --- a/backend/canisters/escrow/impl/src/lib.rs +++ b/backend/canisters/escrow/impl/src/lib.rs @@ -1,3 +1,4 @@ +use crate::model::notify_status_change_queue::NotifyStatusChangeQueue; use crate::model::offers::Offers; use crate::model::pending_payments_queue::PendingPaymentsQueue; use canister_state_macros::canister_state; @@ -47,6 +48,8 @@ impl RuntimeState { struct Data { pub offers: Offers, pub pending_payments_queue: PendingPaymentsQueue, + #[serde(default)] + pub notify_status_change_queue: NotifyStatusChangeQueue, pub cycles_dispenser_canister_id: CanisterId, pub rng_seed: [u8; 32], pub test_mode: bool, @@ -57,6 +60,7 @@ impl Data { Data { offers: Offers::default(), pending_payments_queue: PendingPaymentsQueue::default(), + notify_status_change_queue: NotifyStatusChangeQueue::default(), cycles_dispenser_canister_id, rng_seed: [0; 32], test_mode, diff --git a/backend/canisters/escrow/impl/src/model/mod.rs b/backend/canisters/escrow/impl/src/model/mod.rs index e9ce076028..a411e3e04a 100644 --- a/backend/canisters/escrow/impl/src/model/mod.rs +++ b/backend/canisters/escrow/impl/src/model/mod.rs @@ -1,2 +1,3 @@ +pub mod notify_status_change_queue; pub mod offers; pub mod pending_payments_queue; diff --git a/backend/canisters/escrow/impl/src/model/notify_status_change_queue.rs b/backend/canisters/escrow/impl/src/model/notify_status_change_queue.rs new file mode 100644 index 0000000000..154045c1a2 --- /dev/null +++ b/backend/canisters/escrow/impl/src/model/notify_status_change_queue.rs @@ -0,0 +1,21 @@ +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; + +#[derive(Serialize, Deserialize, Default)] +pub struct NotifyStatusChangeQueue { + offers: VecDeque, +} + +impl NotifyStatusChangeQueue { + pub fn push(&mut self, offer_id: u32) { + self.offers.push_back(offer_id); + } + + pub fn pop(&mut self) -> Option { + self.offers.pop_front() + } + + pub fn is_empty(&self) -> bool { + self.offers.is_empty() + } +} diff --git a/backend/canisters/escrow/impl/src/model/offers.rs b/backend/canisters/escrow/impl/src/model/offers.rs index 78d8f95f19..07cd082f67 100644 --- a/backend/canisters/escrow/impl/src/model/offers.rs +++ b/backend/canisters/escrow/impl/src/model/offers.rs @@ -1,6 +1,9 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; -use types::{icrc1::CompletedCryptoTransaction, TimestampMillis, TokenInfo, UserId}; +use types::{ + icrc1::CompletedCryptoTransaction, CanisterId, OfferStatus, OfferStatusAccepted, OfferStatusCancelled, + OfferStatusCompleted, TimestampMillis, TokenInfo, UserId, +}; #[derive(Serialize, Deserialize, Default)] pub struct Offers { @@ -14,6 +17,10 @@ impl Offers { id } + pub fn get(&self, id: u32) -> Option<&Offer> { + self.map.get(&id) + } + pub fn get_mut(&mut self, id: u32) -> Option<&mut Offer> { self.map.get_mut(&id) } @@ -33,7 +40,10 @@ pub struct Offer { pub accepted_by: Option<(UserId, TimestampMillis)>, pub token0_received: bool, pub token1_received: bool, - pub transfers_out: Vec, + pub token0_transfer_out: Option, + pub token1_transfer_out: Option, + pub refunds: Vec, + pub canister_to_notify: Option, } impl Offer { @@ -51,7 +61,40 @@ impl Offer { accepted_by: None, token0_received: false, token1_received: false, - transfers_out: Vec::new(), + token0_transfer_out: None, + token1_transfer_out: None, + refunds: Vec::new(), + canister_to_notify: args.canister_to_notify, + } + } + + pub fn is_complete(&self) -> bool { + self.token0_transfer_out.is_some() && self.token1_transfer_out.is_some() + } + + pub fn status(&self, now: TimestampMillis) -> OfferStatus { + if let Some((accepted_by, accepted_at)) = self.accepted_by { + if let (Some(token0_transfer_out), Some(token1_transfer_out)) = + (self.token0_transfer_out.clone(), self.token1_transfer_out.clone()) + { + OfferStatus::Completed(Box::new(OfferStatusCompleted { + accepted_by, + accepted_at, + token0_transfer_out, + token1_transfer_out, + })) + } else { + OfferStatus::Accepted(Box::new(OfferStatusAccepted { + accepted_by, + accepted_at, + })) + } + } else if let Some(cancelled_at) = self.cancelled_at { + OfferStatus::Cancelled(Box::new(OfferStatusCancelled { cancelled_at })) + } else if self.expires_at < now { + OfferStatus::Expired + } else { + OfferStatus::Open } } } diff --git a/backend/canisters/escrow/impl/src/model/pending_payments_queue.rs b/backend/canisters/escrow/impl/src/model/pending_payments_queue.rs index c9fba60c28..d5e4a84448 100644 --- a/backend/canisters/escrow/impl/src/model/pending_payments_queue.rs +++ b/backend/canisters/escrow/impl/src/model/pending_payments_queue.rs @@ -33,6 +33,6 @@ pub struct PendingPayment { #[derive(Serialize, Deserialize, Clone, Copy)] pub enum PendingPaymentReason { - Trade(UserId), + Trade(UserId), // The other user in the trade Refund, } diff --git a/backend/canisters/user/impl/src/updates/send_message_with_transfer.rs b/backend/canisters/user/impl/src/updates/send_message_with_transfer.rs index 4a7184f67c..a0b2f6b98c 100644 --- a/backend/canisters/user/impl/src/updates/send_message_with_transfer.rs +++ b/backend/canisters/user/impl/src/updates/send_message_with_transfer.rs @@ -8,7 +8,7 @@ use escrow_canister::deposit_subaccount; use ic_cdk_macros::update; use icrc_ledger_types::icrc1::account::Account; use types::{ - icrc1, CanisterId, CompletedCryptoTransaction, CryptoTransaction, MessageContentInitial, PendingCryptoTransaction, + icrc1, CanisterId, Chat, CompletedCryptoTransaction, CryptoTransaction, MessageContentInitial, PendingCryptoTransaction, TimestampMillis, UserId, MAX_TEXT_LENGTH, MAX_TEXT_LENGTH_USIZE, }; use user_canister::send_message_with_transfer_to_channel; @@ -32,19 +32,20 @@ async fn send_message_with_transfer_to_channel( } // Validate the content and extract the PendingCryptoTransaction - let (pending_transaction, p2p_offer_id) = match mutate_state(|state| prepare(&args.content, now, state)) { - PrepareResult::Success(t) => (t, None), - PrepareResult::P2PTrade(escrow_canister_id, args) => match set_up_p2p_trade(escrow_canister_id, args).await { - Ok((id, t)) => (t, Some(id)), - Err(error) => return error.into(), - }, - PrepareResult::UserSuspended => return UserSuspended, - PrepareResult::TextTooLong(v) => return TextTooLong(v), - PrepareResult::RecipientBlocked => return RecipientBlocked, - PrepareResult::InvalidRequest(t) => return InvalidRequest(t), - PrepareResult::TransferCannotBeZero => return TransferCannotBeZero, - PrepareResult::TransferCannotBeToSelf => return TransferCannotBeToSelf, - }; + let (pending_transaction, p2p_offer_id) = + match mutate_state(|state| prepare(Chat::Channel(args.community_id, args.channel_id), &args.content, now, state)) { + PrepareResult::Success(t) => (t, None), + PrepareResult::P2PTrade(escrow_canister_id, args) => match set_up_p2p_trade(escrow_canister_id, args).await { + Ok((id, t)) => (t, Some(id)), + Err(error) => return error.into(), + }, + PrepareResult::UserSuspended => return UserSuspended, + PrepareResult::TextTooLong(v) => return TextTooLong(v), + PrepareResult::RecipientBlocked => return RecipientBlocked, + PrepareResult::InvalidRequest(t) => return InvalidRequest(t), + PrepareResult::TransferCannotBeZero => return TransferCannotBeZero, + PrepareResult::TransferCannotBeToSelf => return TransferCannotBeToSelf, + }; // Make the crypto transfer let (content, completed_transaction) = match process_transaction(args.content, pending_transaction, p2p_offer_id, now).await @@ -134,19 +135,20 @@ async fn send_message_with_transfer_to_group( } // Validate the content and extract the PendingCryptoTransaction - let (pending_transaction, p2p_offer_id) = match mutate_state(|state| prepare(&args.content, now, state)) { - PrepareResult::Success(t) => (t, None), - PrepareResult::P2PTrade(escrow_canister_id, args) => match set_up_p2p_trade(escrow_canister_id, args).await { - Ok((id, t)) => (t, Some(id)), - Err(error) => return error.into(), - }, - PrepareResult::UserSuspended => return UserSuspended, - PrepareResult::TextTooLong(v) => return TextTooLong(v), - PrepareResult::RecipientBlocked => return RecipientBlocked, - PrepareResult::InvalidRequest(t) => return InvalidRequest(t), - PrepareResult::TransferCannotBeZero => return TransferCannotBeZero, - PrepareResult::TransferCannotBeToSelf => return TransferCannotBeToSelf, - }; + let (pending_transaction, p2p_offer_id) = + match mutate_state(|state| prepare(Chat::Group(args.group_id), &args.content, now, state)) { + PrepareResult::Success(t) => (t, None), + PrepareResult::P2PTrade(escrow_canister_id, args) => match set_up_p2p_trade(escrow_canister_id, args).await { + Ok((id, t)) => (t, Some(id)), + Err(error) => return error.into(), + }, + PrepareResult::UserSuspended => return UserSuspended, + PrepareResult::TextTooLong(v) => return TextTooLong(v), + PrepareResult::RecipientBlocked => return RecipientBlocked, + PrepareResult::InvalidRequest(t) => return InvalidRequest(t), + PrepareResult::TransferCannotBeZero => return TransferCannotBeZero, + PrepareResult::TransferCannotBeToSelf => return TransferCannotBeToSelf, + }; // Make the crypto transfer let (content, completed_transaction) = match process_transaction(args.content, pending_transaction, p2p_offer_id, now).await @@ -227,7 +229,7 @@ enum PrepareResult { TransferCannotBeToSelf, } -fn prepare(content: &MessageContentInitial, now: TimestampMillis, state: &mut RuntimeState) -> PrepareResult { +fn prepare(chat: Chat, content: &MessageContentInitial, now: TimestampMillis, state: &mut RuntimeState) -> PrepareResult { use PrepareResult::*; if state.data.suspended.value { @@ -276,6 +278,7 @@ fn prepare(content: &MessageContentInitial, now: TimestampMillis, state: &mut Ru output_token: p.output_token.clone(), output_amount: p.output_amount, expires_at: now + p.expires_in, + canister_to_notify: Some(chat.canister_id()), }; return P2PTrade(state.data.escrow_canister_id, create_offer_args); } diff --git a/backend/integration_tests/src/client/escrow.rs b/backend/integration_tests/src/client/escrow.rs index 161415ec9b..0b90194ef6 100644 --- a/backend/integration_tests/src/client/escrow.rs +++ b/backend/integration_tests/src/client/escrow.rs @@ -33,6 +33,7 @@ pub mod happy_path { output_token: output_token.try_into().unwrap(), output_amount, expires_at, + canister_to_notify: None, }, ); diff --git a/backend/libraries/types/src/chat.rs b/backend/libraries/types/src/chat.rs index 06bfcf6222..20a90bf56a 100644 --- a/backend/libraries/types/src/chat.rs +++ b/backend/libraries/types/src/chat.rs @@ -2,7 +2,7 @@ use candid::CandidType; use serde::{Deserialize, Serialize}; use std::fmt::Debug; -use crate::{ChannelId, ChatId, CommunityId}; +use crate::{CanisterId, ChannelId, ChatId, CommunityId}; #[derive(CandidType, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, Clone, Copy)] pub enum Chat { @@ -11,6 +11,16 @@ pub enum Chat { Channel(CommunityId, ChannelId), } +impl Chat { + pub fn canister_id(&self) -> CanisterId { + match *self { + Chat::Direct(c) => c.into(), + Chat::Group(g) => g.into(), + Chat::Channel(c, _) => c.into(), + } + } +} + #[derive(CandidType, Serialize, Deserialize, Debug, Eq, PartialEq, Hash, Clone, Copy)] pub enum MultiUserChat { Group(ChatId), @@ -18,6 +28,13 @@ pub enum MultiUserChat { } impl MultiUserChat { + pub fn canister_id(&self) -> CanisterId { + match *self { + MultiUserChat::Group(g) => g.into(), + MultiUserChat::Channel(c, _) => c.into(), + } + } + pub fn group_id(&self) -> Option { if let MultiUserChat::Group(group_id) = self { Some(*group_id) @@ -35,3 +52,15 @@ impl From for Chat { } } } + +impl TryFrom for MultiUserChat { + type Error = (); + + fn try_from(value: Chat) -> Result { + match value { + Chat::Group(c) => Ok(MultiUserChat::Group(c)), + Chat::Channel(cm, ch) => Ok(MultiUserChat::Channel(cm, ch)), + Chat::Direct(_) => Err(()), + } + } +} diff --git a/backend/libraries/types/src/lib.rs b/backend/libraries/types/src/lib.rs index 0ae5365e24..5d75a8d316 100644 --- a/backend/libraries/types/src/lib.rs +++ b/backend/libraries/types/src/lib.rs @@ -46,6 +46,7 @@ mod message_index; mod message_match; mod notifications; mod option; +mod p2p_trades; mod phone_number; mod polls; mod proposals; @@ -110,6 +111,7 @@ pub use message_index::*; pub use message_match::*; pub use notifications::*; pub use option::*; +pub use p2p_trades::*; pub use phone_number::*; pub use polls::*; pub use proposals::*; diff --git a/backend/libraries/types/src/p2p_trades.rs b/backend/libraries/types/src/p2p_trades.rs new file mode 100644 index 0000000000..7c58fd9aa9 --- /dev/null +++ b/backend/libraries/types/src/p2p_trades.rs @@ -0,0 +1,37 @@ +use crate::icrc1::CompletedCryptoTransaction; +use crate::{TimestampMillis, UserId}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +pub enum OfferStatus { + Open, + Cancelled(Box), + Expired, + Accepted(Box), + Completed(Box), +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct OfferStatusCancelled { + pub cancelled_at: TimestampMillis, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct OfferStatusAccepted { + pub accepted_by: UserId, + pub accepted_at: TimestampMillis, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct OfferStatusCompleted { + pub accepted_by: UserId, + pub accepted_at: TimestampMillis, + pub token0_transfer_out: CompletedCryptoTransaction, + pub token1_transfer_out: CompletedCryptoTransaction, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct OfferStatusChange { + pub offer_id: u32, + pub status: OfferStatus, +}