Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
megrogan committed Jul 24, 2024
1 parent fc38934 commit d8558f9
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 78 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion backend/canisters/airdrop_bot/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ path = "src/lib.rs"
crate-type = ["cdylib"]

[dependencies]
airdrop_bot_canister = { path = "../api" }
candid = { workspace = true }
canister_api_macros = { path = "../../../libraries/canister_api_macros" }
canister_logger = { path = "../../../libraries/canister_logger" }
Expand All @@ -25,9 +26,10 @@ ic-ledger-types = { workspace = true }
ic-stable-structures = { workspace = true }
icrc_ledger_canister_c2c_client = { path = "../../../external_canisters/icrc_ledger/c2c_client" }
icrc-ledger-types = { workspace = true }
local_user_index_canister_c2c_client = { path = "../../local_user_index/c2c_client" }
local_user_index_canister = { path = "../../local_user_index/api" }
msgpack = { path = "../../../libraries/msgpack" }
rand = { workspace = true }
airdrop_bot_canister = { path = "../api" }
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
Expand Down
41 changes: 32 additions & 9 deletions backend/canisters/airdrop_bot/impl/src/jobs/execute_airdrop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use ic_cdk_timers::TimerId;
use std::cell::Cell;
use std::time::Duration;
use tracing::{error, trace};
use types::{AccessGate, OptionUpdate, UserId};
use types::{AccessGate, CanisterId, OptionUpdate, UserId};
use utils::time::MonthKey;

use super::process_pending_actions;

Expand Down Expand Up @@ -45,14 +46,17 @@ fn run() {
trace!("'execute_airdrop' running");
TIMER_ID.set(None);

if let Some(config) = read_state(|state| state.data.airdrops.next().cloned()) {
ic_cdk::spawn(run_airdrop(config));
let (config, user_index_canister_id) =
read_state(|state| (state.data.airdrops.next().cloned(), state.data.user_index_canister_id));

if let Some(config) = config {
ic_cdk::spawn(prepare_airdrop(config, user_index_canister_id));
} else {
trace!("No airdrop configured");
};
}

async fn run_airdrop(config: AirdropConfig) {
async fn prepare_airdrop(config: AirdropConfig, user_index_canister_id: CanisterId) {
// Call the configured community canister to set the `locked` gate on the configured channel
match community_canister_c2c_client::update_channel(
config.community_id.into(),
Expand Down Expand Up @@ -99,20 +103,39 @@ async fn run_airdrop(config: AirdropConfig) {
}
Err(err) => {
error!("{err:?}");
let timer_id = ic_cdk_timers::set_timer(Duration::from_millis(60_000), run);
let timer_id = ic_cdk_timers::set_timer(Duration::from_secs(60), run);
TIMER_ID.set(Some(timer_id));
return;
}
};

// Call the user_index to get the particpants' CHIT balances for the given month
let particpants = members.into_iter().map(|m| (m.user_id, 10000)).collect();
let mk = MonthKey::from_timestamp(config.start).previous();

let particpants = match user_index_canister_c2c_client::chit_balances(
user_index_canister_id,
&user_index_canister::chit_balances::Args {
users: members.into_iter().map(|m| m.user_id).collect(),
year: mk.year() as u16,
month: mk.month(),
},
)
.await
{
Ok(user_index_canister::chit_balances::Response::Success(result)) => result.balances,
Err(err) => {
error!("{err:?}");
let timer_id = ic_cdk_timers::set_timer(Duration::from_secs(60), run);
TIMER_ID.set(Some(timer_id));
return;
}
};

// Execute the airdrop
mutate_state(|state| execute_airdrop(particpants, state));
mutate_state(|state| execute_airdrop(particpants.into_iter().collect(), state));
}

fn execute_airdrop(particpants: Vec<(UserId, u32)>, state: &mut RuntimeState) {
fn execute_airdrop(particpants: Vec<(UserId, i32)>, state: &mut RuntimeState) {
let rng = state.env.rng();

if let Some(airdrop) = state.data.airdrops.execute(particpants, rng) {
Expand Down Expand Up @@ -168,6 +191,6 @@ fn execute_airdrop(particpants: Vec<(UserId, u32)>, state: &mut RuntimeState) {
})))
}

process_pending_actions::start_job_if_required(state);
process_pending_actions::start_job_if_required(state, None);
}
}
2 changes: 1 addition & 1 deletion backend/canisters/airdrop_bot/impl/src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ pub mod process_pending_actions;

pub(crate) fn start(state: &RuntimeState) {
execute_airdrop::start_job_if_required(state);
process_pending_actions::start_job_if_required(state);
process_pending_actions::start_job_if_required(state, None);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,22 @@ use time::macros::format_description;
use tracing::{error, trace};
use types::icrc1::{self};
use types::{
BotMessage, CanisterId, CommunityId, CompletedCryptoTransaction, CryptoContent, CryptoTransaction, Cryptocurrency,
MessageContentInitial,
BotMessage, CanisterId, ChannelId, CommunityId, CompletedCryptoTransaction, CryptoContent, CryptoTransaction,
Cryptocurrency, MessageContentInitial,
};
use utils::consts::{MEMO_CHIT_FOR_CHAT_AIRDROP, MEMO_CHIT_FOR_CHAT_LOTTERY};

use super::execute_airdrop::start_airdrop_timer;

const MAX_BATCH_SIZE: usize = 5;

thread_local! {
static TIMER_ID: Cell<Option<TimerId>> = Cell::default();
}

pub(crate) fn start_job_if_required(state: &RuntimeState) -> bool {
pub(crate) fn start_job_if_required(state: &RuntimeState, after: Option<Duration>) -> bool {
if TIMER_ID.get().is_none() && !state.data.pending_actions_queue.is_empty() {
let timer_id = ic_cdk_timers::set_timer_interval(Duration::ZERO, run);
let timer_id = ic_cdk_timers::set_timer_interval(after.unwrap_or_default(), run);
TIMER_ID.set(Some(timer_id));
trace!("'process_pending_actions' job started");
true
Expand Down Expand Up @@ -57,7 +59,7 @@ async fn process_actions(actions: Vec<Action>) {

async fn process_action(action: Action) {
match action.clone() {
Action::JoinCommunity(community_id) => join_community(community_id).await,
Action::JoinChannel(community_id, channel_id) => join_channel(community_id, channel_id).await,
Action::SendMessage(action) if matches!(action.airdrop_type, AirdropType::Lottery(_)) => {
handle_lottery_message_action(*action).await
}
Expand All @@ -66,8 +68,45 @@ async fn process_action(action: Action) {
}
}

async fn join_community(community_id: CommunityId) {
// TODO
async fn join_channel(community_id: CommunityId, channel_id: ChannelId) {
let local_user_index_canister_id = match community_canister_c2c_client::local_user_index(
community_id.into(),
&community_canister::local_user_index::Args {},
)
.await
{
Ok(community_canister::local_user_index::Response::Success(canister_id)) => canister_id,
Err(err) => {
error!("Failed to get local_user_index {err:?}");
mutate_state(|state| {
state.enqueue_pending_action(Action::JoinChannel(community_id, channel_id), Some(Duration::from_secs(60)))
});
return;
}
};

match local_user_index_canister_c2c_client::join_channel(
local_user_index_canister_id,
&local_user_index_canister::join_channel::Args {
community_id,
channel_id,
invite_code: None,
verified_credential_args: None,
},
)
.await
{
Ok(_) => (),
Err(err) => {
error!("Failed to get join_channel {err:?}");
mutate_state(|state| {
state.enqueue_pending_action(Action::JoinChannel(community_id, channel_id), Some(Duration::from_secs(60)))
});
return;
}
}

read_state(start_airdrop_timer);
}

async fn handle_transfer_action(action: AirdropTransfer) {
Expand Down Expand Up @@ -101,21 +140,24 @@ async fn handle_transfer_action(action: AirdropTransfer) {
let fee = token.fee().unwrap();
let block_index = block_index.0.try_into().unwrap();

state.enqueue_pending_action(Action::SendMessage(Box::new(AirdropMessage {
recipient: action.recipient,
transaction: CompletedCryptoTransaction::ICRC1(icrc1::CompletedCryptoTransaction {
ledger: ledger_canister_id,
token,
amount: action.amount,
fee,
from: Account::from(this_canister_id).into(),
to: to.into(),
memo: Some(memo.to_vec().into()),
created: now_nanos,
block_index,
}),
airdrop_type: action.airdrop_type.clone(),
})));
state.enqueue_pending_action(
Action::SendMessage(Box::new(AirdropMessage {
recipient: action.recipient,
transaction: CompletedCryptoTransaction::ICRC1(icrc1::CompletedCryptoTransaction {
ledger: ledger_canister_id,
token,
amount: action.amount,
fee,
from: Account::from(this_canister_id).into(),
to: to.into(),
memo: Some(memo.to_vec().into()),
created: now_nanos,
block_index,
}),
airdrop_type: action.airdrop_type.clone(),
})),
None,
);

match action.airdrop_type {
AirdropType::Lottery(LotteryAirdrop { position }) => {
Expand All @@ -133,7 +175,7 @@ async fn handle_transfer_action(action: AirdropTransfer) {
}
Err(error) => {
error!(?args, ?error, "Failed to transfer CHAT, retrying");
mutate_state(|state| state.enqueue_pending_action(Action::Transfer(Box::new(action))))
mutate_state(|state| state.enqueue_pending_action(Action::Transfer(Box::new(action)), None))
}
}
}
Expand Down Expand Up @@ -179,7 +221,7 @@ async fn handle_main_message_action(action: AirdropMessage) {
.await
.is_err()
{
mutate_state(|state| state.enqueue_pending_action(Action::SendMessage(Box::new(action))));
mutate_state(|state| state.enqueue_pending_action(Action::SendMessage(Box::new(action)), None));
}
}

Expand Down Expand Up @@ -238,6 +280,6 @@ async fn handle_lottery_message_action(action: AirdropMessage) {
.await
.is_err()
{
mutate_state(|state| state.enqueue_pending_action(Action::SendMessage(Box::new(action))));
mutate_state(|state| state.enqueue_pending_action(Action::SendMessage(Box::new(action)), None));
}
}
12 changes: 6 additions & 6 deletions backend/canisters/airdrop_bot/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use candid::Principal;
use canister_state_macros::canister_state;
use model::airdrops::{Airdrops, AirdropsMetrics};
use serde::{Deserialize, Serialize};
use std::cell::RefCell;
use std::collections::HashSet;
use types::{BuildVersion, CanisterId, CommunityId, Cycles, Document, TimestampMillis, Timestamped};
use std::{cell::RefCell, time::Duration};
use types::{BuildVersion, CanisterId, ChannelId, CommunityId, Cycles, Document, TimestampMillis, Timestamped};
use utils::env::Environment;

mod guards;
Expand Down Expand Up @@ -37,9 +37,9 @@ impl RuntimeState {
self.data.admins.contains(&caller)
}

pub fn enqueue_pending_action(&mut self, action: Action) {
pub fn enqueue_pending_action(&mut self, action: Action, after: Option<Duration>) {
self.data.pending_actions_queue.push(action);
jobs::process_pending_actions::start_job_if_required(self);
jobs::process_pending_actions::start_job_if_required(self, after);
}

pub fn metrics(&self) -> Metrics {
Expand Down Expand Up @@ -73,7 +73,7 @@ struct Data {
pub username: String,
pub display_name: Option<String>,
pub airdrops: Airdrops,
pub communities_joined: HashSet<CommunityId>,
pub channels_joined: HashSet<(CommunityId, ChannelId)>,
pub pending_actions_queue: PendingActionsQueue,
pub initialized: bool,
pub rng_seed: [u8; 32],
Expand All @@ -97,7 +97,7 @@ impl Data {
username: "".to_string(),
display_name: None,
airdrops: Airdrops::default(),
communities_joined: HashSet::default(),
channels_joined: HashSet::default(),
pending_actions_queue: PendingActionsQueue::default(),
initialized: false,
rng_seed: [0; 32],
Expand Down
7 changes: 4 additions & 3 deletions backend/canisters/airdrop_bot/impl/src/model/airdrops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl Airdrops {
self.next.take()
}

pub fn execute<R: RngCore>(&mut self, users: Vec<(UserId, u32)>, rng: &mut R) -> Option<&Airdrop> {
pub fn execute<R: RngCore>(&mut self, users: Vec<(UserId, i32)>, rng: &mut R) -> Option<&Airdrop> {
let config = self.next.take()?;

let mut total_shares: u32 = 0;
Expand All @@ -109,6 +109,7 @@ impl Airdrops {
let mut ticket_holders: Vec<UserId> = Vec::new();

for (user_id, chit) in users {
let chit = chit as u32;
let shares = chit / config.main_chit_band;
let tickets = chit / config.lottery_chit_band;

Expand Down Expand Up @@ -275,9 +276,9 @@ mod tests {
airdrops
}

fn generate_random_users() -> Vec<(UserId, u32)> {
fn generate_random_users() -> Vec<(UserId, i32)> {
(0..1000)
.map(|_| (random_principal().into(), rand::thread_rng().next_u32() % 110_000))
.map(|_| (random_principal().into(), (rand::thread_rng().next_u32() % 110_000) as i32))
.collect()
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use std::collections::vec_deque::VecDeque;
use types::{CommunityId, CompletedCryptoTransaction, UserId};
use types::{ChannelId, CommunityId, CompletedCryptoTransaction, UserId};

#[derive(Serialize, Deserialize, Default)]
pub struct PendingActionsQueue {
Expand All @@ -27,7 +27,7 @@ impl PendingActionsQueue {

#[derive(Serialize, Deserialize, Clone)]
pub enum Action {
JoinCommunity(CommunityId),
JoinChannel(CommunityId, ChannelId),
Transfer(Box<AirdropTransfer>),
SendMessage(Box<AirdropMessage>),
}
Expand Down
4 changes: 2 additions & 2 deletions backend/canisters/airdrop_bot/impl/src/updates/set_airdrop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ fn set_airdrop_impl(args: Args, state: &mut RuntimeState) -> Response {

match state.data.airdrops.set_next(config, state.env.now()) {
SetNextResult::Success => {
if state.data.communities_joined.contains(&args.community_id) {
if state.data.channels_joined.contains(&(args.community_id, args.channel_id)) {
start_airdrop_timer(state);
} else {
state.enqueue_pending_action(Action::JoinCommunity(args.community_id));
state.enqueue_pending_action(Action::JoinChannel(args.community_id, args.channel_id), None);
}
Response::Success
}
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/community/c2c_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ generate_c2c_call!(c2c_events_by_index);
generate_c2c_call!(c2c_events_window);
generate_c2c_call!(c2c_summary);
generate_c2c_call!(c2c_summary_updates);
generate_c2c_call!(local_user_index);
generate_candid_c2c_call!(selected_channel_initial);

// Updates
Expand Down
1 change: 1 addition & 0 deletions backend/canisters/local_user_index/c2c_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ generate_c2c_call!(chat_events);
generate_c2c_call!(c2c_notify_low_balance);
generate_c2c_call!(c2c_notify_user_index_events);
generate_c2c_call!(c2c_upgrade_user_canister_wasm);
generate_c2c_call!(join_channel);

generate_candid_c2c_call!(join_group);

Expand Down
1 change: 1 addition & 0 deletions backend/canisters/user_index/c2c_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use user_index_canister::*;

// Queries
generate_c2c_call!(c2c_lookup_user);
generate_c2c_call!(chit_balances);
generate_candid_c2c_call!(platform_moderators_group);
generate_c2c_call!(user);

Expand Down
Loading

0 comments on commit d8558f9

Please sign in to comment.