Skip to content

Commit

Permalink
Fix airdrop_bot actions processing + more logging (#6147)
Browse files Browse the repository at this point in the history
  • Loading branch information
megrogan authored Jul 30, 2024
1 parent ff1e581 commit 131f9d1
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ fn execute_airdrop(particpants: Vec<(UserId, i32)>, state: &mut RuntimeState) {
state
.data
.pending_actions_queue
.push(Action::Transfer(Box::new(AirdropTransfer {
.push_back(Action::Transfer(Box::new(AirdropTransfer {
recipient: user_id,
amount: prize.chat_won,
airdrop_type: AirdropType::Lottery(LotteryAirdrop {
Expand All @@ -177,7 +177,7 @@ fn execute_airdrop(particpants: Vec<(UserId, i32)>, state: &mut RuntimeState) {
state
.data
.pending_actions_queue
.push(Action::Transfer(Box::new(AirdropTransfer {
.push_back(Action::Transfer(Box::new(AirdropTransfer {
recipient: *user_id,
amount: prize.chat_won,
airdrop_type: AirdropType::Main(MainAidrop {
Expand All @@ -192,7 +192,7 @@ fn execute_airdrop(particpants: Vec<(UserId, i32)>, state: &mut RuntimeState) {
state
.data
.pending_actions_queue
.push(Action::Transfer(Box::new(AirdropTransfer {
.push_back(Action::Transfer(Box::new(AirdropTransfer {
recipient: user_id,
amount: prize.chat_won,
airdrop_type: AirdropType::Lottery(LotteryAirdrop {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use icrc_ledger_types::icrc1::transfer::{TransferArg, TransferError};
use rand::Rng;
use std::cell::Cell;
use std::time::Duration;
use tracing::{error, trace};
use tracing::{error, info, trace};
use types::icrc1::{self};
use types::{
BotMessage, CanisterId, ChannelId, CommunityId, CompletedCryptoTransaction, CryptoContent, CryptoTransaction,
Expand All @@ -18,15 +18,13 @@ use utils::time::{MonthKey, MONTHS};

use super::execute_airdrop::start_airdrop_timer;

const MAX_BATCH_SIZE: usize = 5;

thread_local! {
static TIMER_ID: Cell<Option<TimerId>> = Cell::default();
}

pub(crate) fn start_job_if_required(state: &RuntimeState, after: Option<Duration>) -> bool {
if TIMER_ID.get().is_none() && !state.data.pending_actions_queue.is_empty() {
let timer_id = ic_cdk_timers::set_timer_interval(after.unwrap_or_default(), run);
let timer_id = ic_cdk_timers::set_timer(after.unwrap_or_default(), run);
TIMER_ID.set(Some(timer_id));
trace!("'process_pending_actions' job started");
true
Expand All @@ -36,25 +34,12 @@ pub(crate) fn start_job_if_required(state: &RuntimeState, after: Option<Duration
}

fn run() {
let batch = mutate_state(next_batch);
if !batch.is_empty() {
ic_cdk::spawn(process_actions(batch));
} else if let Some(timer_id) = TIMER_ID.take() {
ic_cdk_timers::clear_timer(timer_id);
trace!("'process_pending_actions' job stopped");
}
}
TIMER_ID.set(None);

fn next_batch(state: &mut RuntimeState) -> Vec<Action> {
(0..MAX_BATCH_SIZE)
.map_while(|_| state.data.pending_actions_queue.pop())
.collect()
}

async fn process_actions(actions: Vec<Action>) {
let futures: Vec<_> = actions.into_iter().map(process_action).collect();

futures::future::join_all(futures).await;
if let Some(action) = mutate_state(|state| state.data.pending_actions_queue.pop()) {
ic_cdk::spawn(process_action(action));
read_state(|state| start_job_if_required(state, None));
}
}

async fn process_action(action: Action) {
Expand All @@ -69,6 +54,8 @@ async fn process_action(action: Action) {
}

async fn join_channel(community_id: CommunityId, channel_id: ChannelId) {
info!(?community_id, ?channel_id, "Join channel");

let local_user_index_canister_id = match community_canister_c2c_client::local_user_index(
community_id.into(),
&community_canister::local_user_index::Args {},
Expand Down Expand Up @@ -106,10 +93,16 @@ async fn join_channel(community_id: CommunityId, channel_id: ChannelId) {
}
}

mutate_state(|state| state.data.channels_joined.insert((community_id, channel_id)));

read_state(start_airdrop_timer);
}

async fn handle_transfer_action(action: AirdropTransfer) {
let amount = action.amount.into();

info!(?amount, "CHAT Transfer");

let (this_canister_id, ledger_canister_id, now_nanos) = read_state(|state| {
(
state.env.canister_id(),
Expand All @@ -131,7 +124,7 @@ async fn handle_transfer_action(action: AirdropTransfer) {
fee: token.fee().map(|f| f.into()),
created_at_time: Some(now_nanos),
memo: Some(memo.to_vec().into()),
amount: action.amount.into(),
amount,
};

match icrc_ledger_canister_c2c_client::icrc1_transfer(ledger_canister_id, &args).await {
Expand All @@ -140,24 +133,24 @@ async fn handle_transfer_action(action: AirdropTransfer) {
let fee = token.fee().unwrap();
let block_index = block_index.0.try_into().unwrap();

state.enqueue_pending_action(
Action::SendMessage(Box::new(AirdropMessage {
recipient: action.recipient,
transaction: CompletedCryptoTransaction::ICRC1(icrc1::CompletedCryptoTransaction {
ledger: ledger_canister_id,
token,
amount: action.amount,
fee,
from: Account::from(this_canister_id).into(),
to: to.into(),
memo: Some(memo.to_vec().into()),
created: now_nanos,
block_index,
}),
airdrop_type: action.airdrop_type.clone(),
})),
None,
);
let message_action = Action::SendMessage(Box::new(AirdropMessage {
recipient: action.recipient,
transaction: CompletedCryptoTransaction::ICRC1(icrc1::CompletedCryptoTransaction {
ledger: ledger_canister_id,
token,
amount: action.amount,
fee,
from: Account::from(this_canister_id).into(),
to: to.into(),
memo: Some(memo.to_vec().into()),
created: now_nanos,
block_index,
}),
airdrop_type: action.airdrop_type.clone(),
}));

state.data.pending_actions_queue.push_front(message_action);
start_job_if_required(state, None);

match action.airdrop_type {
AirdropType::Lottery(LotteryAirdrop { position }) => {
Expand All @@ -181,6 +174,8 @@ async fn handle_transfer_action(action: AirdropTransfer) {
}

async fn handle_main_message_action(action: AirdropMessage) {
info!("Send DM");

let AirdropType::Main(MainAidrop { chit, shares }) = action.airdrop_type else {
return;
};
Expand Down Expand Up @@ -220,6 +215,8 @@ async fn handle_main_message_action(action: AirdropMessage) {
}

async fn handle_lottery_message_action(action: AirdropMessage) {
info!("Send lottery winners message");

let AirdropType::Lottery(LotteryAirdrop { position }): AirdropType = action.airdrop_type else {
return;
};
Expand Down
6 changes: 5 additions & 1 deletion backend/canisters/airdrop_bot/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl RuntimeState {
}

pub fn enqueue_pending_action(&mut self, action: Action, after: Option<Duration>) {
self.data.pending_actions_queue.push(action);
self.data.pending_actions_queue.push_back(action);
jobs::process_pending_actions::start_job_if_required(self, after);
}

Expand All @@ -58,6 +58,8 @@ impl RuntimeState {
chat_ledger: self.data.chat_ledger_canister_id,
},
airdrops: self.data.airdrops.metrics(),
pending_actions: self.data.pending_actions_queue.len(),
channels_joined: self.data.channels_joined.iter().cloned().collect(),
}
}
}
Expand Down Expand Up @@ -109,6 +111,8 @@ pub struct Metrics {
pub git_commit_id: String,
pub canister_ids: CanisterIds,
pub airdrops: AirdropsMetrics,
pub pending_actions: usize,
pub channels_joined: Vec<(CommunityId, ChannelId)>,
}

#[derive(Serialize, Debug)]
Expand Down
4 changes: 3 additions & 1 deletion backend/canisters/airdrop_bot/impl/src/model/airdrops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ impl Airdrops {

let fund = config.main_chat_fund;
let prizes = config.lottery_prizes.len();
let mut share = fund / total_shares as u128;
share -= share % 1_000_000;

let participants = user_shares
.into_iter()
Expand All @@ -138,7 +140,7 @@ impl Airdrops {
shares,
prize: if shares > 0 {
Some(Prize {
chat_won: (fund * shares as u128) / total_shares as u128,
chat_won: shares as u128 * share,
block_index: None,
})
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ pub struct PendingActionsQueue {
}

impl PendingActionsQueue {
pub fn push(&mut self, action: Action) {
pub fn push_back(&mut self, action: Action) {
self.queue.push_back(action);
}

pub fn push_front(&mut self, action: Action) {
self.queue.push_front(action);
}

pub fn pop(&mut self) -> Option<Action> {
self.queue.pop_front()
}
Expand All @@ -25,40 +29,40 @@ impl PendingActionsQueue {
}
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Action {
JoinChannel(CommunityId, ChannelId),
Transfer(Box<AirdropTransfer>),
SendMessage(Box<AirdropMessage>),
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct AirdropTransfer {
pub recipient: UserId,
pub amount: u128,
pub airdrop_type: AirdropType,
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct AirdropMessage {
pub recipient: UserId,
pub transaction: CompletedCryptoTransaction,
pub airdrop_type: AirdropType,
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum AirdropType {
Main(MainAidrop),
Lottery(LotteryAirdrop),
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MainAidrop {
pub chit: u32,
pub shares: u32,
}

#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct LotteryAirdrop {
pub position: usize,
}
2 changes: 1 addition & 1 deletion backend/integration_tests/src/airdrop_bot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fn airdrop_end_to_end() {
// Advance time to just after the airdrop is due
env.advance_time(Duration::from_millis(1000 + start_airdrop - now_millis(env)));

tick_many(env, 10);
tick_many(env, 30);

// Assert the channel is now locked
//
Expand Down

0 comments on commit 131f9d1

Please sign in to comment.