Skip to content

Commit

Permalink
Support notifying a chosen canister when trade is completed (#5167)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Jan 11, 2024
1 parent 71361b5 commit 0497a75
Show file tree
Hide file tree
Showing 17 changed files with 258 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/canisters/escrow/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Implement `create_offer` and `notify_deposit` ([#4904](https://github.com/open-chat-labs/open-chat/pull/4904))
- Transfer out funds once trade is complete ([#4906](https://github.com/open-chat-labs/open-chat/pull/4906))
- Implement `cancel_offer` ([#4907](https://github.com/open-chat-labs/open-chat/pull/4907))
- Support notifying a chosen canister when trade is completed ([#5167](https://github.com/open-chat-labs/open-chat/pull/5167))
3 changes: 2 additions & 1 deletion backend/canisters/escrow/api/src/updates/create_offer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use candid::CandidType;
use serde::{Deserialize, Serialize};
use types::{TimestampMillis, TokenInfo};
use types::{CanisterId, TimestampMillis, TokenInfo};

#[derive(CandidType, Serialize, Deserialize, Debug)]
pub struct Args {
Expand All @@ -9,6 +9,7 @@ pub struct Args {
pub output_token: TokenInfo,
pub output_amount: u128,
pub expires_at: TimestampMillis,
pub canister_to_notify: Option<CanisterId>,
}

#[derive(CandidType, Serialize, Deserialize, Debug)]
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/escrow/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ crate-type = ["cdylib"]
[dependencies]
candid = { workspace = true }
canister_api_macros = { path = "../../../libraries/canister_api_macros" }
canister_client = { path = "../../../libraries/canister_client" }
canister_logger = { path = "../../../libraries/canister_logger" }
canister_state_macros = { path = "../../../libraries/canister_state_macros" }
canister_tracing_macros = { path = "../../../libraries/canister_tracing_macros" }
Expand Down
15 changes: 14 additions & 1 deletion backend/canisters/escrow/impl/src/jobs/make_pending_payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,20 @@ async fn process_payment(pending_payment: PendingPayment) {
created: created_at_time,
block_index,
};
offer.transfers_out.push(transfer);
match pending_payment.reason {
PendingPaymentReason::Trade(_) => {
if pending_payment.token_info.ledger == offer.token0.ledger {
offer.token0_transfer_out = Some(transfer);
} else {
offer.token1_transfer_out = Some(transfer);
}
if offer.is_complete() {
state.data.notify_status_change_queue.push(offer.id);
crate::jobs::notify_status_change::start_job_if_required(state);
}
}
PendingPaymentReason::Refund => offer.refunds.push(transfer),
};
}
});
}
Expand Down
2 changes: 2 additions & 0 deletions backend/canisters/escrow/impl/src/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::RuntimeState;

pub mod make_pending_payments;
pub mod notify_status_change;

pub(crate) fn start(state: &RuntimeState) {
make_pending_payments::start_job_if_required(state);
notify_status_change::start_job_if_required(state);
}
63 changes: 63 additions & 0 deletions backend/canisters/escrow/impl/src/jobs/notify_status_change.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use crate::{mutate_state, RuntimeState};
use canister_client::make_c2c_call;
use ic_cdk_timers::TimerId;
use std::cell::Cell;
use std::time::Duration;
use tracing::trace;
use types::{CanisterId, OfferStatus, OfferStatusChange};

thread_local! {
static TIMER_ID: Cell<Option<TimerId>> = Cell::default();
}

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
if TIMER_ID.get().is_none() && !state.data.notify_status_change_queue.is_empty() {
let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run);
TIMER_ID.set(Some(timer_id));
trace!("'notify_status_change' job started");
true
} else {
false
}
}

pub fn run() {
if let Some((canister_id, offer_id, status)) = mutate_state(get_next) {
ic_cdk::spawn(notify_offer_status(canister_id, offer_id, status));
} else if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'notify_status_change' job stopped");
}
}

fn get_next(state: &mut RuntimeState) -> Option<(CanisterId, u32, OfferStatus)> {
while let Some(id) = state.data.notify_status_change_queue.pop() {
if let Some((canister_id, offer_id, status)) = state
.data
.offers
.get(id)
.and_then(|o| o.canister_to_notify.map(|c| (c, o.id, o.status(state.env.now()))))
{
return Some((canister_id, offer_id, status));
}
}
None
}

async fn notify_offer_status(canister_id: CanisterId, offer_id: u32, status: OfferStatus) {
if make_c2c_call(
canister_id,
"c2c_notify_p2p_offer_status_change_msgpack",
OfferStatusChange { offer_id, status },
msgpack::serialize,
|r| msgpack::deserialize::<()>(r),
)
.await
.is_err()
{
mutate_state(|state| {
state.data.notify_status_change_queue.push(offer_id);
start_job_if_required(state);
});
}
}
4 changes: 4 additions & 0 deletions backend/canisters/escrow/impl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::model::notify_status_change_queue::NotifyStatusChangeQueue;
use crate::model::offers::Offers;
use crate::model::pending_payments_queue::PendingPaymentsQueue;
use canister_state_macros::canister_state;
Expand Down Expand Up @@ -47,6 +48,8 @@ impl RuntimeState {
struct Data {
pub offers: Offers,
pub pending_payments_queue: PendingPaymentsQueue,
#[serde(default)]
pub notify_status_change_queue: NotifyStatusChangeQueue,
pub cycles_dispenser_canister_id: CanisterId,
pub rng_seed: [u8; 32],
pub test_mode: bool,
Expand All @@ -57,6 +60,7 @@ impl Data {
Data {
offers: Offers::default(),
pending_payments_queue: PendingPaymentsQueue::default(),
notify_status_change_queue: NotifyStatusChangeQueue::default(),
cycles_dispenser_canister_id,
rng_seed: [0; 32],
test_mode,
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/escrow/impl/src/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod notify_status_change_queue;
pub mod offers;
pub mod pending_payments_queue;
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;

#[derive(Serialize, Deserialize, Default)]
pub struct NotifyStatusChangeQueue {
offers: VecDeque<u32>,
}

impl NotifyStatusChangeQueue {
pub fn push(&mut self, offer_id: u32) {
self.offers.push_back(offer_id);
}

pub fn pop(&mut self) -> Option<u32> {
self.offers.pop_front()
}

pub fn is_empty(&self) -> bool {
self.offers.is_empty()
}
}
49 changes: 46 additions & 3 deletions backend/canisters/escrow/impl/src/model/offers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use types::{icrc1::CompletedCryptoTransaction, TimestampMillis, TokenInfo, UserId};
use types::{
icrc1::CompletedCryptoTransaction, CanisterId, OfferStatus, OfferStatusAccepted, OfferStatusCancelled,
OfferStatusCompleted, TimestampMillis, TokenInfo, UserId,
};

#[derive(Serialize, Deserialize, Default)]
pub struct Offers {
Expand All @@ -14,6 +17,10 @@ impl Offers {
id
}

pub fn get(&self, id: u32) -> Option<&Offer> {
self.map.get(&id)
}

pub fn get_mut(&mut self, id: u32) -> Option<&mut Offer> {
self.map.get_mut(&id)
}
Expand All @@ -33,7 +40,10 @@ pub struct Offer {
pub accepted_by: Option<(UserId, TimestampMillis)>,
pub token0_received: bool,
pub token1_received: bool,
pub transfers_out: Vec<CompletedCryptoTransaction>,
pub token0_transfer_out: Option<CompletedCryptoTransaction>,
pub token1_transfer_out: Option<CompletedCryptoTransaction>,
pub refunds: Vec<CompletedCryptoTransaction>,
pub canister_to_notify: Option<CanisterId>,
}

impl Offer {
Expand All @@ -51,7 +61,40 @@ impl Offer {
accepted_by: None,
token0_received: false,
token1_received: false,
transfers_out: Vec::new(),
token0_transfer_out: None,
token1_transfer_out: None,
refunds: Vec::new(),
canister_to_notify: args.canister_to_notify,
}
}

pub fn is_complete(&self) -> bool {
self.token0_transfer_out.is_some() && self.token1_transfer_out.is_some()
}

pub fn status(&self, now: TimestampMillis) -> OfferStatus {
if let Some((accepted_by, accepted_at)) = self.accepted_by {
if let (Some(token0_transfer_out), Some(token1_transfer_out)) =
(self.token0_transfer_out.clone(), self.token1_transfer_out.clone())
{
OfferStatus::Completed(Box::new(OfferStatusCompleted {
accepted_by,
accepted_at,
token0_transfer_out,
token1_transfer_out,
}))
} else {
OfferStatus::Accepted(Box::new(OfferStatusAccepted {
accepted_by,
accepted_at,
}))
}
} else if let Some(cancelled_at) = self.cancelled_at {
OfferStatus::Cancelled(Box::new(OfferStatusCancelled { cancelled_at }))
} else if self.expires_at < now {
OfferStatus::Expired
} else {
OfferStatus::Open
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ pub struct PendingPayment {

#[derive(Serialize, Deserialize, Clone, Copy)]
pub enum PendingPaymentReason {
Trade(UserId),
Trade(UserId), // The other user in the trade
Refund,
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use escrow_canister::deposit_subaccount;
use ic_cdk_macros::update;
use icrc_ledger_types::icrc1::account::Account;
use types::{
icrc1, CanisterId, CompletedCryptoTransaction, CryptoTransaction, MessageContentInitial, PendingCryptoTransaction,
icrc1, CanisterId, Chat, CompletedCryptoTransaction, CryptoTransaction, MessageContentInitial, PendingCryptoTransaction,
TimestampMillis, UserId, MAX_TEXT_LENGTH, MAX_TEXT_LENGTH_USIZE,
};
use user_canister::send_message_with_transfer_to_channel;
Expand All @@ -32,19 +32,20 @@ async fn send_message_with_transfer_to_channel(
}

// Validate the content and extract the PendingCryptoTransaction
let (pending_transaction, p2p_offer_id) = match mutate_state(|state| prepare(&args.content, now, state)) {
PrepareResult::Success(t) => (t, None),
PrepareResult::P2PTrade(escrow_canister_id, args) => match set_up_p2p_trade(escrow_canister_id, args).await {
Ok((id, t)) => (t, Some(id)),
Err(error) => return error.into(),
},
PrepareResult::UserSuspended => return UserSuspended,
PrepareResult::TextTooLong(v) => return TextTooLong(v),
PrepareResult::RecipientBlocked => return RecipientBlocked,
PrepareResult::InvalidRequest(t) => return InvalidRequest(t),
PrepareResult::TransferCannotBeZero => return TransferCannotBeZero,
PrepareResult::TransferCannotBeToSelf => return TransferCannotBeToSelf,
};
let (pending_transaction, p2p_offer_id) =
match mutate_state(|state| prepare(Chat::Channel(args.community_id, args.channel_id), &args.content, now, state)) {
PrepareResult::Success(t) => (t, None),
PrepareResult::P2PTrade(escrow_canister_id, args) => match set_up_p2p_trade(escrow_canister_id, args).await {
Ok((id, t)) => (t, Some(id)),
Err(error) => return error.into(),
},
PrepareResult::UserSuspended => return UserSuspended,
PrepareResult::TextTooLong(v) => return TextTooLong(v),
PrepareResult::RecipientBlocked => return RecipientBlocked,
PrepareResult::InvalidRequest(t) => return InvalidRequest(t),
PrepareResult::TransferCannotBeZero => return TransferCannotBeZero,
PrepareResult::TransferCannotBeToSelf => return TransferCannotBeToSelf,
};

// Make the crypto transfer
let (content, completed_transaction) = match process_transaction(args.content, pending_transaction, p2p_offer_id, now).await
Expand Down Expand Up @@ -134,19 +135,20 @@ async fn send_message_with_transfer_to_group(
}

// Validate the content and extract the PendingCryptoTransaction
let (pending_transaction, p2p_offer_id) = match mutate_state(|state| prepare(&args.content, now, state)) {
PrepareResult::Success(t) => (t, None),
PrepareResult::P2PTrade(escrow_canister_id, args) => match set_up_p2p_trade(escrow_canister_id, args).await {
Ok((id, t)) => (t, Some(id)),
Err(error) => return error.into(),
},
PrepareResult::UserSuspended => return UserSuspended,
PrepareResult::TextTooLong(v) => return TextTooLong(v),
PrepareResult::RecipientBlocked => return RecipientBlocked,
PrepareResult::InvalidRequest(t) => return InvalidRequest(t),
PrepareResult::TransferCannotBeZero => return TransferCannotBeZero,
PrepareResult::TransferCannotBeToSelf => return TransferCannotBeToSelf,
};
let (pending_transaction, p2p_offer_id) =
match mutate_state(|state| prepare(Chat::Group(args.group_id), &args.content, now, state)) {
PrepareResult::Success(t) => (t, None),
PrepareResult::P2PTrade(escrow_canister_id, args) => match set_up_p2p_trade(escrow_canister_id, args).await {
Ok((id, t)) => (t, Some(id)),
Err(error) => return error.into(),
},
PrepareResult::UserSuspended => return UserSuspended,
PrepareResult::TextTooLong(v) => return TextTooLong(v),
PrepareResult::RecipientBlocked => return RecipientBlocked,
PrepareResult::InvalidRequest(t) => return InvalidRequest(t),
PrepareResult::TransferCannotBeZero => return TransferCannotBeZero,
PrepareResult::TransferCannotBeToSelf => return TransferCannotBeToSelf,
};

// Make the crypto transfer
let (content, completed_transaction) = match process_transaction(args.content, pending_transaction, p2p_offer_id, now).await
Expand Down Expand Up @@ -227,7 +229,7 @@ enum PrepareResult {
TransferCannotBeToSelf,
}

fn prepare(content: &MessageContentInitial, now: TimestampMillis, state: &mut RuntimeState) -> PrepareResult {
fn prepare(chat: Chat, content: &MessageContentInitial, now: TimestampMillis, state: &mut RuntimeState) -> PrepareResult {
use PrepareResult::*;

if state.data.suspended.value {
Expand Down Expand Up @@ -276,6 +278,7 @@ fn prepare(content: &MessageContentInitial, now: TimestampMillis, state: &mut Ru
output_token: p.output_token.clone(),
output_amount: p.output_amount,
expires_at: now + p.expires_in,
canister_to_notify: Some(chat.canister_id()),
};
return P2PTrade(state.data.escrow_canister_id, create_offer_args);
}
Expand Down
1 change: 1 addition & 0 deletions backend/integration_tests/src/client/escrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod happy_path {
output_token: output_token.try_into().unwrap(),
output_amount,
expires_at,
canister_to_notify: None,
},
);

Expand Down
Loading

0 comments on commit 0497a75

Please sign in to comment.