Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove canister_id from state of Runtime implementations #10

Merged
merged 1 commit into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ candid = "0.10.3"
ic-agent = "0.32.0"
ic-cdk = "=0.12.0"
ic-cdk-timers = "0.6.0"
ic_principal = "0.1.1"
ic-stable-structures = "0.6.2"
pocket-ic = "2.0.1"
rand = "0.8.5"
Expand Down
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ic_principal.workspace = true
event_sink_canister.path = "../canister/api"
2 changes: 1 addition & 1 deletion client/agent_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

candid.workspace = true
event_sink_canister.path = "../../canister/api"
event_sink_client.path = ".."
ic-agent.workspace = true
ic_principal.workspace = true
rand.workspace = true
tokio.workspace = true
tokio.features = ["full"]
Expand Down
8 changes: 3 additions & 5 deletions client/agent_runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use candid::Principal;
use event_sink_canister::{IdempotentEvent, PushEventsArgs, TimestampMillis};
use event_sink_client::Runtime;
use ic_agent::Agent;
use ic_principal::Principal;
use rand::random;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio_util::sync::CancellationToken;

pub struct AgentRuntime {
canister_id: Principal,
agent: Agent,
scheduler_task_cancellation_token: Option<CancellationToken>,
}

impl AgentRuntime {
pub fn new(canister_id: Principal, agent: Agent) -> AgentRuntime {
pub fn new(agent: Agent) -> AgentRuntime {
AgentRuntime {
canister_id,
agent,
scheduler_task_cancellation_token: None,
}
Expand Down Expand Up @@ -44,11 +42,11 @@ impl Runtime for AgentRuntime {

fn flush<F: FnOnce() + Send + 'static>(
&mut self,
canister_id: Principal,
events: Vec<IdempotentEvent>,
trigger_retry: F,
) {
self.cancel_scheduler_task();
let canister_id = self.canister_id;
let agent = self.agent.clone();

tokio::spawn(async move { flush_async(canister_id, agent, events, trigger_retry).await });
Expand Down
2 changes: 1 addition & 1 deletion client/cdk_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
candid.workspace = true
event_sink_canister.path = "../../canister/api"
event_sink_client.path = ".."
ic-cdk.workspace = true
ic-cdk-timers.workspace = true
ic_principal.workspace = true
rand.workspace = true
29 changes: 15 additions & 14 deletions client/cdk_runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,17 @@
use candid::Principal;
use event_sink_canister::{IdempotentEvent, PushEventsArgs, TimestampMillis};
use event_sink_client::Runtime;
use ic_cdk_timers::TimerId;
use ic_principal::Principal;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::time::Duration;

pub struct CdkRuntime {
canister_id: Principal,
rng: StdRng,
scheduled_flush_timer: Option<TimerId>,
}

impl CdkRuntime {
pub fn new(canister_id: Principal) -> CdkRuntime {
let mut seed = [0; 32];
seed[..8].copy_from_slice(&ic_cdk::api::time().to_ne_bytes());

CdkRuntime {
canister_id,
rng: StdRng::from_seed(seed),
scheduled_flush_timer: None,
}
}

fn clear_timer(&mut self) {
if let Some(timer_id) = self.scheduled_flush_timer.take() {
ic_cdk_timers::clear_timer(timer_id);
Expand All @@ -39,11 +27,12 @@ impl Runtime for CdkRuntime {

fn flush<F: FnOnce() + Send + 'static>(
&mut self,
canister_id: Principal,
events: Vec<IdempotentEvent>,
trigger_retry: F,
) {
self.clear_timer();
ic_cdk::spawn(flush_async(self.canister_id, events, trigger_retry))
ic_cdk::spawn(flush_async(canister_id, events, trigger_retry))
}

fn rng(&mut self) -> u128 {
Expand All @@ -67,3 +56,15 @@ async fn flush_async<F: FnOnce()>(
trigger_retry();
}
}

impl Default for CdkRuntime {
fn default() -> Self {
let mut seed = [0; 32];
seed[..8].copy_from_slice(&ic_cdk::api::time().to_ne_bytes());

CdkRuntime {
rng: StdRng::from_seed(seed),
scheduled_flush_timer: None,
}
}
}
21 changes: 18 additions & 3 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use event_sink_canister::{IdempotentEvent, TimestampMillis};
use ic_principal::Principal;
use std::mem;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::Duration;
Expand All @@ -11,6 +12,7 @@ pub struct Client<R> {
}

struct ClientInner<R> {
event_sink_canister_id: Principal,
runtime: R,
flush_delay: Duration,
max_batch_size: usize,
Expand All @@ -24,6 +26,7 @@ pub trait Runtime {
fn schedule_flush<F: FnOnce() + Send + 'static>(&mut self, delay: Duration, callback: F);
fn flush<F: FnOnce() + Send + 'static>(
&mut self,
event_sync_canister_id: Principal,
events: Vec<IdempotentEvent>,
trigger_retry: F,
);
Expand All @@ -38,15 +41,17 @@ impl<R> Client<R> {
}

pub struct ClientBuilder<R> {
event_sink_canister_id: Principal,
runtime: R,
flush_delay: Option<Duration>,
max_batch_size: Option<u32>,
events: Vec<IdempotentEvent>,
}

impl<R: Runtime + Send + 'static> ClientBuilder<R> {
pub fn new(runtime: R) -> ClientBuilder<R> {
pub fn new(event_sink_canister_id: Principal, runtime: R) -> ClientBuilder<R> {
ClientBuilder {
event_sink_canister_id,
runtime,
flush_delay: None,
max_batch_size: None,
Expand Down Expand Up @@ -74,6 +79,7 @@ impl<R: Runtime + Send + 'static> ClientBuilder<R> {
let max_batch_size = self.max_batch_size.unwrap_or(DEFAULT_MAX_BATCH_SIZE) as usize;
let client = Client {
inner: Arc::new(Mutex::new(ClientInner::new(
self.event_sink_canister_id,
self.runtime,
flush_delay,
max_batch_size,
Expand Down Expand Up @@ -115,9 +121,12 @@ impl<R: Runtime + Send + 'static> Client<R> {
};

let clone = self.clone();
let event_sink_canister_id = guard.event_sink_canister_id;
guard
.runtime
.flush(events.clone(), move || clone.requeue_events(events));
.flush(event_sink_canister_id, events.clone(), move || {
clone.requeue_events(events)
});
}
}

Expand Down Expand Up @@ -164,8 +173,14 @@ impl<R> Clone for Client<R> {
}

impl<R> ClientInner<R> {
pub fn new(runtime: R, flush_delay: Duration, max_batch_size: usize) -> ClientInner<R> {
pub fn new(
event_sink_canister_id: Principal,
runtime: R,
flush_delay: Duration,
max_batch_size: usize,
) -> ClientInner<R> {
ClientInner {
event_sink_canister_id,
runtime,
flush_delay,
max_batch_size,
Expand Down
Loading