Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ckbtc): add timestamps to suspended UTXOs #2939

Merged
merged 13 commits into from
Dec 5, 2024
10 changes: 10 additions & 0 deletions rs/bitcoin/ckbtc/minter/ckbtc_minter.did
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,22 @@ type PendingUtxo = record {
confirmations: nat32;
};

// Number of nanoseconds since the Unix Epoch
type Timestamp = nat64;

type SuspendedUtxo = record {
utxo : Utxo;
reason : SuspendedReason;
earliest_retry: Timestamp;
};

type UpdateBalanceError = variant {
// There are no new UTXOs to process.
NoNewUtxos : record {
current_confirmations: opt nat32;
required_confirmations: nat32;
pending_utxos: opt vec PendingUtxo;
suspended_utxos: opt vec SuspendedUtxo;
THLO marked this conversation as resolved.
Show resolved Hide resolved
};
// The minter is already processing another update balance request for the caller.
AlreadyProcessing;
Expand Down
45 changes: 45 additions & 0 deletions rs/bitcoin/ckbtc/minter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1349,3 +1349,48 @@ impl CanisterRuntime for IcCanisterRuntime {
updates::update_balance::mint(amount, to, memo).await
}
}

/// Time in nanoseconds since the epoch (1970-01-01).
#[derive(Eq, Clone, Copy, PartialEq, Debug, Default)]
pub struct Timestamp(u64);

impl Timestamp {
pub const fn new(ns_since_epoch: u64) -> Self {
Self(ns_since_epoch)
}

/// Number of nanoseconds since `UNIX EPOCH`.
pub fn as_nanos_since_unix_epoch(self) -> u64 {
self.0
}

pub fn checked_sub(self, rhs: Duration) -> Option<Timestamp> {
if let Ok(rhs_nanos) = u64::try_from(rhs.as_nanos()) {
Some(Timestamp(self.0.checked_sub(rhs_nanos)?))
} else {
None
}
}

pub fn checked_duration_since(self, rhs: Timestamp) -> Option<Duration> {
self.0.checked_sub(rhs.0).map(Duration::from_nanos)
}

pub fn checked_add(self, rhs: Duration) -> Option<Timestamp> {
if let Ok(rhs_nanos) = u64::try_from(rhs.as_nanos()) {
Some(Self(self.0.checked_add(rhs_nanos)?))
} else {
None
}
}

pub fn saturating_add(self, rhs: Duration) -> Self {
self.checked_add(rhs).unwrap_or(Timestamp(u64::MAX))
}
}

impl From<u64> for Timestamp {
fn from(timestamp: u64) -> Self {
Self(timestamp)
}
}
151 changes: 108 additions & 43 deletions rs/bitcoin/ckbtc/minter/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::lifecycle::init::InitArgs;
use crate::lifecycle::upgrade::UpgradeArgs;
use crate::logs::P0;
use crate::state::invariants::{CheckInvariants, CheckInvariantsImpl};
use crate::{address::BitcoinAddress, ECDSAPublicKey};
use crate::updates::update_balance::SuspendedUtxo;
use crate::{address::BitcoinAddress, ECDSAPublicKey, Timestamp};
use candid::{CandidType, Deserialize, Principal};
use ic_base_types::CanisterId;
pub use ic_btc_interface::Network;
Expand Down Expand Up @@ -953,7 +954,10 @@ impl CkBtcMinterState {
&self,
all_utxos_for_account: I,
account: &Account,
) -> ProcessableUtxos {
now: &Timestamp,
) -> (ProcessableUtxos, Vec<SuspendedUtxo>) {
const DAY: std::time::Duration = std::time::Duration::from_secs(24 * 60 * 60);

let is_known = |utxo: &Utxo| {
self.utxos_state_addresses
.get(account)
Expand All @@ -965,53 +969,50 @@ impl CkBtcMinterState {
.map(|utxos| utxos.contains(utxo))
.unwrap_or(false)
};
let mut new_utxos = BTreeSet::new();
//TODO XC-230: only re-evaluate the ignored and quarantined utxos at most once a day
let mut previously_ignored_utxos = BTreeSet::new();
let mut previously_quarantined_utxos = BTreeSet::new();
let mut processable_utxos = ProcessableUtxos::default();
let mut suspended_utxos = vec![];

for utxo in all_utxos_for_account.into_iter() {
match self.suspended_utxos.contains_utxo(&utxo, account) {
Some(SuspendedReason::ValueTooSmall) => {
previously_ignored_utxos.insert(utxo);
(Some(last_time_checked), Some(reason)) => {
match now.checked_duration_since(*last_time_checked) {
Some(elapsed) if elapsed >= DAY => {
processable_utxos.insert_once_suspended_utxo(utxo, reason);
}
_ => suspended_utxos.push(SuspendedUtxo {
THLO marked this conversation as resolved.
Show resolved Hide resolved
utxo,
reason: *reason,
earliest_retry: last_time_checked
.saturating_add(DAY)
.as_nanos_since_unix_epoch(),
}),
}
}
Some(SuspendedReason::Quarantined) => {
previously_quarantined_utxos.insert(utxo);
(None, Some(reason)) => {
processable_utxos.insert_once_suspended_utxo(utxo, reason);
}
None => {
(_, None) => {
if !is_known(&utxo) {
new_utxos.insert(utxo);
processable_utxos.insert_once_new_utxo(utxo);
}
}
}
}

debug_assert_eq!(
new_utxos.intersection(&previously_ignored_utxos).next(),
None
);
debug_assert_eq!(
new_utxos.intersection(&previously_quarantined_utxos).next(),
None
);
debug_assert_eq!(
previously_ignored_utxos
.intersection(&previously_quarantined_utxos)
.next(),
None
);

ProcessableUtxos {
new_utxos,
previously_ignored_utxos,
previously_quarantined_utxos,
}
(processable_utxos, suspended_utxos)
}

/// Adds given UTXO to the set of suspended UTXOs.
pub fn suspend_utxo(&mut self, utxo: Utxo, account: Account, reason: SuspendedReason) -> bool {
pub fn suspend_utxo(
&mut self,
utxo: Utxo,
account: Account,
reason: SuspendedReason,
now: Timestamp,
) -> bool {
self.ensure_reason_consistent_with_state(&utxo, reason);
self.suspended_utxos.insert(account, utxo, reason)
self.suspended_utxos
.insert(account, utxo, reason, Some(now))
}

#[deprecated(note = "Use discard_utxo() instead")]
Expand Down Expand Up @@ -1168,11 +1169,26 @@ impl CkBtcMinterState {
other.utxos_state_addresses,
"utxos_state_addresses do not match"
);
ensure_eq!(
self.suspended_utxos,
other.suspended_utxos,
"suspended_utxos do not match"
);
{
let SuspendedUtxos {
utxos_without_account,
utxos,
last_time_checked_cache: _,
} = &self.suspended_utxos;
let SuspendedUtxos {
utxos_without_account: other_utxos_without_account,
utxos: other_utxos,
last_time_checked_cache: _,
} = &other.suspended_utxos;
// last_time_checked_cache are not preserved on upgrades
// to avoid adding an event every time a suspended UTXO is re-evaluated with the same outcome.
ensure_eq!(
utxos_without_account,
other_utxos_without_account,
"suspended_utxos::utxos_without_account does not match"
);
ensure_eq!(utxos, other_utxos, "suspended_utxos::utxos does not match");
}

ensure_eq!(
self.checked_utxos,
Expand Down Expand Up @@ -1295,6 +1311,36 @@ impl IntoIterator for ProcessableUtxos {
}
}

impl ProcessableUtxos {
pub fn insert_once_suspended_utxo(&mut self, utxo: Utxo, reason: &SuspendedReason) {
self.assert_utxo_is_fresh(&utxo);
match reason {
SuspendedReason::ValueTooSmall => self.previously_ignored_utxos.insert(utxo),
SuspendedReason::Quarantined => self.previously_quarantined_utxos.insert(utxo),
};
}

pub fn insert_once_new_utxo(&mut self, utxo: Utxo) {
self.assert_utxo_is_fresh(&utxo);
self.new_utxos.insert(utxo);
}

fn assert_utxo_is_fresh(&self, utxo: &Utxo) {
assert!(
!self.new_utxos.contains(utxo),
"BUG: UTXO is already known in new_utxos"
);
assert!(
!self.previously_quarantined_utxos.contains(utxo),
"BUG: UTXO is already known in previously_quarantined_utxos"
);
assert!(
!self.previously_ignored_utxos.contains(utxo),
"BUG: UTXO is already known in previously_ignored_utxos"
);
}
}

#[derive(Eq, Clone, PartialEq, Debug, Default)]
pub struct SuspendedUtxos {
/// Suspended UTXOS were initially stored without account information.
Expand All @@ -1303,6 +1349,7 @@ pub struct SuspendedUtxos {
/// or move it to the other field containing this time the `Account` information.
utxos_without_account: BTreeMap<Utxo, SuspendedReason>,
utxos: BTreeMap<Account, BTreeMap<Utxo, SuspendedReason>>,
last_time_checked_cache: BTreeMap<Utxo, Timestamp>,
}

#[derive(Clone, Copy, Eq, PartialEq, Debug, CandidType, Serialize, Deserialize)]
Expand All @@ -1314,7 +1361,16 @@ pub enum SuspendedReason {
}

impl SuspendedUtxos {
pub fn insert(&mut self, account: Account, utxo: Utxo, reason: SuspendedReason) -> bool {
pub fn insert(
&mut self,
account: Account,
utxo: Utxo,
reason: SuspendedReason,
now: Option<Timestamp>,
) -> bool {
if let Some(timestamp) = now {
self.last_time_checked_cache.insert(utxo.clone(), timestamp);
}
if self.utxos.get(&account).and_then(|u| u.get(&utxo)) == Some(&reason) {
return false;
}
Expand All @@ -1336,14 +1392,22 @@ impl SuspendedUtxos {
.chain(self.utxos.values().flat_map(|v| v.iter()))
}

pub fn contains_utxo(&self, utxo: &Utxo, account: &Account) -> Option<&SuspendedReason> {
self.utxos
pub fn contains_utxo(
&self,
utxo: &Utxo,
account: &Account,
) -> (Option<&Timestamp>, Option<&SuspendedReason>) {
let last_time_checked = self.last_time_checked_cache.get(utxo);
let suspended_reason = self
.utxos
.get(account)
.and_then(|u| u.get(utxo))
.or_else(|| self.utxos_without_account.get(utxo))
.or_else(|| self.utxos_without_account.get(utxo));
(last_time_checked, suspended_reason)
}

pub fn remove(&mut self, account: &Account, utxo: &Utxo) {
self.last_time_checked_cache.remove(utxo);
self.utxos_without_account.remove(utxo);
if let Some(utxos) = self.utxos.get_mut(account) {
utxos.remove(utxo);
Expand All @@ -1352,6 +1416,7 @@ impl SuspendedUtxos {

#[deprecated(note = "Use remove() instead")]
pub fn remove_without_account(&mut self, utxo: &Utxo) {
self.last_time_checked_cache.remove(utxo);
self.utxos_without_account.remove(utxo);
for utxos in self.utxos.values_mut() {
if utxos.remove(utxo).is_some() {
Expand Down
13 changes: 7 additions & 6 deletions rs/bitcoin/ckbtc/minter/src/state/audit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::{
use crate::state::invariants::CheckInvariantsImpl;
use crate::state::{ReimburseDepositTask, ReimbursedDeposit};
use crate::storage::record_event;
use crate::ReimbursementReason;
use crate::{ReimbursementReason, Timestamp};
use candid::Principal;
use ic_btc_interface::{Txid, Utxo};
use icrc_ledger_types::icrc1::account::Account;
Expand Down Expand Up @@ -79,23 +79,24 @@ pub fn mark_utxo_checked(state: &mut CkBtcMinterState, utxo: Utxo, account: Acco
state.mark_utxo_checked_v2(utxo, &account);
}

pub fn quarantine_utxo(state: &mut CkBtcMinterState, utxo: Utxo, account: Account) {
discard_utxo(state, utxo, account, SuspendedReason::Quarantined);
pub fn quarantine_utxo(state: &mut CkBtcMinterState, utxo: Utxo, account: Account, now: Timestamp) {
discard_utxo(state, utxo, account, SuspendedReason::Quarantined, now);
}

pub fn ignore_utxo(state: &mut CkBtcMinterState, utxo: Utxo, account: Account) {
discard_utxo(state, utxo, account, SuspendedReason::ValueTooSmall);
pub fn ignore_utxo(state: &mut CkBtcMinterState, utxo: Utxo, account: Account, now: Timestamp) {
discard_utxo(state, utxo, account, SuspendedReason::ValueTooSmall, now);
}

fn discard_utxo(
state: &mut CkBtcMinterState,
utxo: Utxo,
account: Account,
reason: SuspendedReason,
now: Timestamp,
) {
// ignored UTXOs are periodically re-evaluated and should not trigger
// an event if they are still ignored.
if state.suspend_utxo(utxo.clone(), account, reason) {
if state.suspend_utxo(utxo.clone(), account, reason, now) {
record_event(&Event::SuspendedUtxo {
utxo,
account,
Expand Down
2 changes: 1 addition & 1 deletion rs/bitcoin/ckbtc/minter/src/state/eventlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ pub fn replay<I: CheckInvariants>(
account,
reason,
} => {
state.suspended_utxos.insert(account, utxo, reason);
state.suspended_utxos.insert(account, utxo, reason, None);
ninegua marked this conversation as resolved.
Show resolved Hide resolved
}
Event::DistributedKytFee {
kyt_provider,
Expand Down
Loading
Loading