From 9bf3c9574a624e2965afc11d848a0214ba2d7966 Mon Sep 17 00:00:00 2001 From: Hamish Peebles Date: Thu, 8 Feb 2024 18:17:25 +0000 Subject: [PATCH] Push anonymised `user_online` events to the `event_sink` canister (#5337) --- .gitignore | 2 +- Cargo.lock | 126 +++++++++++++----- Cargo.toml | 6 +- backend/canister_installer/src/lib.rs | 2 + backend/canisters/event_relay/CHANGELOG.md | 10 ++ backend/canisters/event_relay/api/Cargo.toml | 1 + .../event_relay/api/src/lifecycle/init.rs | 1 + .../event_relay/api/src/updates/mod.rs | 2 +- .../api/src/updates/push_events.rs | 2 + backend/canisters/event_relay/impl/Cargo.toml | 5 + .../canisters/event_relay/impl/src/guards.rs | 9 ++ backend/canisters/event_relay/impl/src/lib.rs | 46 ++++++- .../event_relay/impl/src/lifecycle/init.rs | 1 + .../event_relay/impl/src/lifecycle/mod.rs | 7 +- .../event_relay/impl/src/updates/mod.rs | 1 + .../impl/src/updates/push_events.rs | 30 +++++ backend/canisters/online_users/CHANGELOG.md | 4 + .../online_users/api/src/lifecycle/init.rs | 1 + .../canisters/online_users/impl/Cargo.toml | 2 + .../canisters/online_users/impl/src/lib.rs | 29 +++- .../online_users/impl/src/lifecycle/init.rs | 7 +- .../impl/src/updates/mark_as_online.rs | 11 +- backend/integration_tests/Cargo.toml | 1 + backend/integration_tests/src/lib.rs | 1 + backend/integration_tests/src/setup.rs | 21 ++- backend/integration_tests/src/wasms.rs | 1 + 26 files changed, 283 insertions(+), 46 deletions(-) create mode 100644 backend/canisters/event_relay/CHANGELOG.md create mode 100644 backend/canisters/event_relay/api/src/updates/push_events.rs create mode 100644 backend/canisters/event_relay/impl/src/guards.rs create mode 100644 backend/canisters/event_relay/impl/src/updates/push_events.rs diff --git a/.gitignore b/.gitignore index 3b41fbc6df..b779e7dfad 100644 --- a/.gitignore +++ b/.gitignore @@ -11,9 +11,9 @@ wasms/ **/*.pem **/*.rs.bk pocket-ic* -changelog.md msg.json notes.md notification.did +release_changelog.md summary.md temp.did \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 6c93b60d75..6705a05cb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -275,7 +275,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -889,9 +889,9 @@ dependencies = [ [[package]] name = "candid" -version = "0.10.2" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39be580be172631a35cac2fc6c765f365709de459edb88121b3e7a80cce6c1ec" +checksum = "182543fbc03b4ad0bfc384e6b68346e0b0aad0b11d075b71b4fcaa5d07f8862c" dependencies = [ "anyhow", "binread", @@ -919,7 +919,7 @@ dependencies = [ "lazy_static", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -927,7 +927,7 @@ name = "candid_gen" version = "0.1.0" dependencies = [ "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -951,7 +951,7 @@ dependencies = [ "quote", "serde", "serde_tokenstream 0.2.0", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -1055,7 +1055,7 @@ version = "0.1.0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -1196,7 +1196,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -1767,7 +1767,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -2067,6 +2067,7 @@ version = "0.1.0" dependencies = [ "candid", "candid_gen", + "event_sink_canister", "serde", "types", ] @@ -2080,6 +2081,10 @@ dependencies = [ "canister_state_macros", "canister_tracing_macros", "event_relay_canister", + "event_sink_canister", + "event_sink_client", + "event_sink_client_cdk_runtime", + "event_sink_utils", "http_request", "ic-cdk", "ic-cdk-macros", @@ -2088,12 +2093,56 @@ dependencies = [ "rand", "serde", "serializer", + "sha256", "stable_memory", "tracing", "types", "utils", ] +[[package]] +name = "event_sink_canister" +version = "0.1.0" +source = "git+https://github.com/open-chat-labs/event-sink?rev=84f5ffb8aa638e4ed43d0392ad4e119963fad3df#84f5ffb8aa638e4ed43d0392ad4e119963fad3df" +dependencies = [ + "candid", + "serde", + "serde_bytes", +] + +[[package]] +name = "event_sink_client" +version = "0.1.0" +source = "git+https://github.com/open-chat-labs/event-sink?rev=84f5ffb8aa638e4ed43d0392ad4e119963fad3df#84f5ffb8aa638e4ed43d0392ad4e119963fad3df" +dependencies = [ + "event_sink_canister", + "ic_principal", + "serde", +] + +[[package]] +name = "event_sink_client_cdk_runtime" +version = "0.1.0" +source = "git+https://github.com/open-chat-labs/event-sink?rev=84f5ffb8aa638e4ed43d0392ad4e119963fad3df#84f5ffb8aa638e4ed43d0392ad4e119963fad3df" +dependencies = [ + "event_sink_canister", + "event_sink_client", + "ic-cdk", + "ic-cdk-timers", + "ic_principal", + "rand", + "serde", + "tracing", +] + +[[package]] +name = "event_sink_utils" +version = "0.1.0" +source = "git+https://github.com/open-chat-labs/event-sink?rev=84f5ffb8aa638e4ed43d0392ad4e119963fad3df#84f5ffb8aa638e4ed43d0392ad4e119963fad3df" +dependencies = [ + "serde", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -2268,7 +2317,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -2803,7 +2852,7 @@ name = "human_readable_derive" version = "0.1.0" dependencies = [ "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -3388,6 +3437,7 @@ dependencies = [ "community_canister", "cycles_dispenser_canister", "escrow_canister", + "event_relay_canister", "group_canister", "group_index_canister", "ic-cdk", @@ -4394,6 +4444,8 @@ dependencies = [ "canister_logger", "canister_state_macros", "canister_tracing_macros", + "event_sink_client", + "event_sink_client_cdk_runtime", "http_request", "ic-cdk", "ic-cdk-macros", @@ -4440,7 +4492,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -4810,9 +4862,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -4931,9 +4983,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -5609,18 +5661,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.192" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" +checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" dependencies = [ "serde_derive", ] [[package]] name = "serde_bytes" -version = "0.11.12" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab33ec92f677585af6d88c65593ae2375adde54efdbf16d597f2cbc7a6d368ff" +checksum = "8b8497c313fd43ab992087548117643f6fcd935cbf36f176ffda0aacf9591734" dependencies = [ "serde", ] @@ -5637,13 +5689,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.192" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" +checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -5677,7 +5729,7 @@ checksum = "3081f5ffbb02284dda55132aa26daecedd7372a42417bbbab6f14ab7d6bb9145" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -5700,7 +5752,7 @@ dependencies = [ "proc-macro2", "quote", "serde", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -5737,7 +5789,7 @@ checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -6293,9 +6345,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.39" +version = "2.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" dependencies = [ "proc-macro2", "quote", @@ -6378,7 +6430,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -6390,7 +6442,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", "test-case-core", ] @@ -6411,7 +6463,7 @@ checksum = "49922ecae66cc8a249b77e68d1d0623c1b2c514f0060c27cdc68bd62a1219d35" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -6514,7 +6566,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -6614,7 +6666,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -7159,7 +7211,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", "wasm-bindgen-shared", ] @@ -7193,7 +7245,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -7521,7 +7573,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -7541,7 +7593,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0004f9a81c..234382bdc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -154,13 +154,17 @@ aws-sdk-dynamodb = "0.35.0" aws-types = "0.57.1" base64 = "0.21.5" bitflags = { version = "2.4.1", features = ["serde"] } -candid = "0.10.2" +candid = "0.10.3" canister_sig_util = { git = "https://github.com/dfinity/internet-identity", rev = "f46da3bfefff638a844117606a4dea70b2dd4405" } ciborium = "0.2.1" clap = "4.4.8" dfx-core = { git = "https://github.com/hpeebles/dfinity-sdk", rev = "d39dde0611fda8f30a6796b40b1dbb13a0541597" } dirs = "5.0.1" dotenv = "0.15.0" +event_sink_canister = { git = "https://github.com/open-chat-labs/event-sink", rev = "84f5ffb8aa638e4ed43d0392ad4e119963fad3df" } +event_sink_client = { git = "https://github.com/open-chat-labs/event-sink", rev = "84f5ffb8aa638e4ed43d0392ad4e119963fad3df" } +event_sink_client_cdk_runtime = { git = "https://github.com/open-chat-labs/event-sink", rev = "84f5ffb8aa638e4ed43d0392ad4e119963fad3df" } +event_sink_utils = { git = "https://github.com/open-chat-labs/event-sink", rev = "84f5ffb8aa638e4ed43d0392ad4e119963fad3df" } futures = "0.3.29" getrandom = { version = "0.2.11", features = ["custom"] } hex = "0.4.3" diff --git a/backend/canister_installer/src/lib.rs b/backend/canister_installer/src/lib.rs index 322c8c7d93..d813e9b24d 100644 --- a/backend/canister_installer/src/lib.rs +++ b/backend/canister_installer/src/lib.rs @@ -123,6 +123,7 @@ async fn install_service_canisters_impl( let online_users_canister_wasm = get_canister_wasm(CanisterName::OnlineUsers, version); let online_users_init_args = online_users_canister::init::Args { user_index_canister_id: canister_ids.user_index, + event_relay_canister_id: canister_ids.event_relay, cycles_dispenser_canister_id: canister_ids.cycles_dispenser, wasm_version: version, test_mode, @@ -224,6 +225,7 @@ async fn install_service_canisters_impl( let event_relay_canister_wasm = get_canister_wasm(CanisterName::EventRelay, version); let event_relay_init_args = event_relay_canister::init::Args { push_events_whitelist: vec![canister_ids.user_index, canister_ids.online_users], + event_sink_canister_id: Principal::anonymous(), cycles_dispenser_canister_id: canister_ids.cycles_dispenser, wasm_version: version, test_mode, diff --git a/backend/canisters/event_relay/CHANGELOG.md b/backend/canisters/event_relay/CHANGELOG.md new file mode 100644 index 0000000000..2ac563f6a9 --- /dev/null +++ b/backend/canisters/event_relay/CHANGELOG.md @@ -0,0 +1,10 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). + +## [unreleased] + +### Added + +- Forward anonymised events on to the `event_sink` canister ([#5337](https://github.com/open-chat-labs/open-chat/pull/5337)) diff --git a/backend/canisters/event_relay/api/Cargo.toml b/backend/canisters/event_relay/api/Cargo.toml index edb2e1234c..4da88adab2 100644 --- a/backend/canisters/event_relay/api/Cargo.toml +++ b/backend/canisters/event_relay/api/Cargo.toml @@ -8,5 +8,6 @@ edition = "2021" [dependencies] candid = { workspace = true } candid_gen = { path = "../../../libraries/candid_gen" } +event_sink_canister = { workspace = true } serde = { workspace = true } types = { path = "../../../libraries/types" } \ No newline at end of file diff --git a/backend/canisters/event_relay/api/src/lifecycle/init.rs b/backend/canisters/event_relay/api/src/lifecycle/init.rs index ae7e98964f..f3a3759b9b 100644 --- a/backend/canisters/event_relay/api/src/lifecycle/init.rs +++ b/backend/canisters/event_relay/api/src/lifecycle/init.rs @@ -5,6 +5,7 @@ use types::{BuildVersion, CanisterId}; #[derive(CandidType, Serialize, Deserialize, Debug)] pub struct Args { pub push_events_whitelist: Vec, + pub event_sink_canister_id: CanisterId, pub cycles_dispenser_canister_id: CanisterId, pub wasm_version: BuildVersion, pub test_mode: bool, diff --git a/backend/canisters/event_relay/api/src/updates/mod.rs b/backend/canisters/event_relay/api/src/updates/mod.rs index 8b13789179..38403c1f3a 100644 --- a/backend/canisters/event_relay/api/src/updates/mod.rs +++ b/backend/canisters/event_relay/api/src/updates/mod.rs @@ -1 +1 @@ - +pub mod push_events; diff --git a/backend/canisters/event_relay/api/src/updates/push_events.rs b/backend/canisters/event_relay/api/src/updates/push_events.rs new file mode 100644 index 0000000000..341da47054 --- /dev/null +++ b/backend/canisters/event_relay/api/src/updates/push_events.rs @@ -0,0 +1,2 @@ +pub type Args = event_sink_canister::PushEventsArgs; +pub type Response = (); diff --git a/backend/canisters/event_relay/impl/Cargo.toml b/backend/canisters/event_relay/impl/Cargo.toml index 870b80875d..a61f2e64e9 100644 --- a/backend/canisters/event_relay/impl/Cargo.toml +++ b/backend/canisters/event_relay/impl/Cargo.toml @@ -15,6 +15,10 @@ canister_logger = { path = "../../../libraries/canister_logger" } canister_state_macros = { path = "../../../libraries/canister_state_macros" } canister_tracing_macros = { path = "../../../libraries/canister_tracing_macros" } event_relay_canister = { path = "../api" } +event_sink_canister = { workspace = true } +event_sink_client = { workspace = true } +event_sink_client_cdk_runtime = { workspace = true } +event_sink_utils = { workspace = true } http_request = { path = "../../../libraries/http_request" } ic-cdk = { workspace = true } ic-cdk-macros = { workspace = true } @@ -23,6 +27,7 @@ ic-stable-structures = { workspace = true } rand = { workspace = true } serde = { workspace = true } serializer = { path = "../../../libraries/serializer" } +sha256 = { path = "../../../libraries/sha256" } stable_memory = { path = "../../../libraries/stable_memory" } tracing = { workspace = true } types = { path = "../../../libraries/types" } diff --git a/backend/canisters/event_relay/impl/src/guards.rs b/backend/canisters/event_relay/impl/src/guards.rs new file mode 100644 index 0000000000..4da62a02a7 --- /dev/null +++ b/backend/canisters/event_relay/impl/src/guards.rs @@ -0,0 +1,9 @@ +use crate::read_state; + +pub fn caller_can_push_events() -> Result<(), String> { + if read_state(|state| state.can_caller_push_events()) { + Ok(()) + } else { + Err("Caller is not whitelisted to push events".to_string()) + } +} diff --git a/backend/canisters/event_relay/impl/src/lib.rs b/backend/canisters/event_relay/impl/src/lib.rs index 6461e8be4e..87d6cf641c 100644 --- a/backend/canisters/event_relay/impl/src/lib.rs +++ b/backend/canisters/event_relay/impl/src/lib.rs @@ -1,11 +1,17 @@ use candid::Principal; use canister_state_macros::canister_state; +use event_sink_client::{EventSinkClient, EventSinkClientBuilder, EventSinkClientInfo}; +use event_sink_client_cdk_runtime::CdkRuntime; +use event_sink_utils::EventDeduper; use serde::{Deserialize, Serialize}; +use sha256::sha256_string; use std::cell::RefCell; use std::collections::HashSet; +use std::time::Duration; use types::{BuildVersion, CanisterId, Cycles, TimestampMillis, Timestamped}; use utils::env::Environment; +mod guards; mod lifecycle; mod memory; mod model; @@ -28,7 +34,15 @@ impl RuntimeState { RuntimeState { env, data } } + pub fn can_caller_push_events(&self) -> bool { + let caller = self.env.caller(); + self.data.push_events_whitelist.contains(&caller) + } + pub fn metrics(&self) -> Metrics { + let event_sink_client_info = self.data.events_sink_client.info(); + let event_sink_canister_id = event_sink_client_info.event_sink_canister_id; + Metrics { memory_used: utils::memory::used(), now: self.env.now(), @@ -36,7 +50,9 @@ impl RuntimeState { wasm_version: WASM_VERSION.with_borrow(|v| **v), git_commit_id: utils::git::git_commit_id().to_string(), push_events_whitelist: self.data.push_events_whitelist.iter().copied().collect(), + event_sink_client_info, canister_ids: CanisterIds { + event_sink: event_sink_canister_id, cycles_dispenser: self.data.cycles_dispenser_canister_id, }, } @@ -46,20 +62,46 @@ impl RuntimeState { #[derive(Serialize, Deserialize)] struct Data { pub push_events_whitelist: HashSet, + pub events_sink_client: EventSinkClient, + pub event_deduper: EventDeduper, pub cycles_dispenser_canister_id: CanisterId, + pub salt: [u8; 32], pub rng_seed: [u8; 32], pub test_mode: bool, } impl Data { - pub fn new(push_events_whitelist: HashSet, cycles_dispenser_canister_id: CanisterId, test_mode: bool) -> Data { + pub fn new( + push_events_whitelist: HashSet, + events_sink_canister_id: CanisterId, + cycles_dispenser_canister_id: CanisterId, + test_mode: bool, + ) -> Data { Data { push_events_whitelist, + events_sink_client: EventSinkClientBuilder::new(events_sink_canister_id, CdkRuntime::default()) + .with_flush_delay(Duration::from_secs(60)) + .build(), + event_deduper: EventDeduper::default(), cycles_dispenser_canister_id, + salt: [0; 32], rng_seed: [0; 32], test_mode, } } + + pub fn obfuscate_user(&self, user: String) -> String { + // We only want to obfuscate userId principals, so if the string is not a principal we return it as is + if Principal::from_text(&user).is_err() { + return user; + } + + // Generates a 32 character string from the input value + the salt + let mut bytes = Vec::new(); + bytes.extend_from_slice(user.as_bytes()); + bytes.extend_from_slice(&self.salt); + sha256_string(&bytes).split_off(32) + } } #[derive(Serialize, Debug)] @@ -70,10 +112,12 @@ pub struct Metrics { pub wasm_version: BuildVersion, pub git_commit_id: String, pub push_events_whitelist: Vec, + pub event_sink_client_info: EventSinkClientInfo, pub canister_ids: CanisterIds, } #[derive(Serialize, Debug)] pub struct CanisterIds { + pub event_sink: CanisterId, pub cycles_dispenser: CanisterId, } diff --git a/backend/canisters/event_relay/impl/src/lifecycle/init.rs b/backend/canisters/event_relay/impl/src/lifecycle/init.rs index be22a36e23..156393c360 100644 --- a/backend/canisters/event_relay/impl/src/lifecycle/init.rs +++ b/backend/canisters/event_relay/impl/src/lifecycle/init.rs @@ -15,6 +15,7 @@ fn init(args: Args) { let env = init_env([0; 32]); let data = Data::new( args.push_events_whitelist.into_iter().collect(), + args.event_sink_canister_id, args.cycles_dispenser_canister_id, args.test_mode, ); diff --git a/backend/canisters/event_relay/impl/src/lifecycle/mod.rs b/backend/canisters/event_relay/impl/src/lifecycle/mod.rs index aaf9b1d99f..d8eca93275 100644 --- a/backend/canisters/event_relay/impl/src/lifecycle/mod.rs +++ b/backend/canisters/event_relay/impl/src/lifecycle/mod.rs @@ -32,7 +32,12 @@ fn reseed_rng() { let seed = get_random_seed().await; mutate_state(|state| { state.data.rng_seed = seed; - state.env = Box::new(CanisterEnv::new(seed)) + state.env = Box::new(CanisterEnv::new(seed)); + + // We only want to set the salt once + if state.data.salt == [0; 32] { + state.data.salt = seed; + } }); trace!("Successfully reseeded rng"); } diff --git a/backend/canisters/event_relay/impl/src/updates/mod.rs b/backend/canisters/event_relay/impl/src/updates/mod.rs index f77132607f..1d191f224d 100644 --- a/backend/canisters/event_relay/impl/src/updates/mod.rs +++ b/backend/canisters/event_relay/impl/src/updates/mod.rs @@ -1 +1,2 @@ +mod push_events; mod wallet_receive; diff --git a/backend/canisters/event_relay/impl/src/updates/push_events.rs b/backend/canisters/event_relay/impl/src/updates/push_events.rs new file mode 100644 index 0000000000..9f08313e95 --- /dev/null +++ b/backend/canisters/event_relay/impl/src/updates/push_events.rs @@ -0,0 +1,30 @@ +use crate::guards::caller_can_push_events; +use crate::{mutate_state, RuntimeState}; +use canister_tracing_macros::trace; +use event_relay_canister::push_events::*; +use event_sink_canister::Event; +use ic_cdk_macros::update; + +#[update(guard = "caller_can_push_events")] +#[trace] +fn push_events(args: Args) { + mutate_state(|state| push_events_impl(args, state)) +} + +fn push_events_impl(args: Args, state: &mut RuntimeState) { + let now = state.env.now(); + + for event in args.events { + if state.data.event_deduper.try_push(event.idempotency_key, now) { + let user = event.user.map(|u| state.data.obfuscate_user(u)); + + state.data.events_sink_client.push_event(Event { + name: event.name, + timestamp: event.timestamp, + user, + source: event.source, + payload: event.payload, + }); + } + } +} diff --git a/backend/canisters/online_users/CHANGELOG.md b/backend/canisters/online_users/CHANGELOG.md index 84569b126e..10fdc5c53a 100644 --- a/backend/canisters/online_users/CHANGELOG.md +++ b/backend/canisters/online_users/CHANGELOG.md @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [unreleased] +### Added + +- Push `user_online` events to the `event_relay` canister ([#5337](https://github.com/open-chat-labs/open-chat/pull/5337)) + ### Changed - Avoid usages of `make_c2c_call` and use macro instead ([#5252](https://github.com/open-chat-labs/open-chat/pull/5252)) diff --git a/backend/canisters/online_users/api/src/lifecycle/init.rs b/backend/canisters/online_users/api/src/lifecycle/init.rs index d203057a57..391def5844 100644 --- a/backend/canisters/online_users/api/src/lifecycle/init.rs +++ b/backend/canisters/online_users/api/src/lifecycle/init.rs @@ -6,6 +6,7 @@ use types::{BuildVersion, CanisterId}; pub struct Args { pub user_index_canister_id: CanisterId, pub cycles_dispenser_canister_id: CanisterId, + pub event_relay_canister_id: CanisterId, pub wasm_version: BuildVersion, pub test_mode: bool, } diff --git a/backend/canisters/online_users/impl/Cargo.toml b/backend/canisters/online_users/impl/Cargo.toml index b4039124c7..916401bf94 100644 --- a/backend/canisters/online_users/impl/Cargo.toml +++ b/backend/canisters/online_users/impl/Cargo.toml @@ -14,6 +14,8 @@ candid = { workspace = true } canister_logger = { path = "../../../libraries/canister_logger" } canister_state_macros = { path = "../../../libraries/canister_state_macros" } canister_tracing_macros = { path = "../../../libraries/canister_tracing_macros" } +event_sink_client = { workspace = true } +event_sink_client_cdk_runtime = { workspace = true } http_request = { path = "../../../libraries/http_request" } ic-cdk = { workspace = true } ic-cdk-macros = { workspace = true } diff --git a/backend/canisters/online_users/impl/src/lib.rs b/backend/canisters/online_users/impl/src/lib.rs index fcb992fc0f..3514133e68 100644 --- a/backend/canisters/online_users/impl/src/lib.rs +++ b/backend/canisters/online_users/impl/src/lib.rs @@ -1,8 +1,11 @@ use crate::model::last_online_dates::LastOnlineDates; use crate::model::principal_to_user_id_map::PrincipalToUserIdMap; use canister_state_macros::canister_state; +use event_sink_client::{EventSinkClient, EventSinkClientBuilder, EventSinkClientInfo}; +use event_sink_client_cdk_runtime::CdkRuntime; use serde::{Deserialize, Serialize}; use std::cell::RefCell; +use std::time::Duration; use types::{BuildVersion, CanisterId, Cycles, TimestampMillis, Timestamped}; use utils::env::Environment; @@ -30,6 +33,9 @@ impl RuntimeState { } pub fn metrics(&self) -> Metrics { + let event_sink_client_info = self.data.event_sink_client.info(); + let event_sink_canister_id = event_sink_client_info.event_sink_canister_id; + Metrics { memory_used: utils::memory::used(), now: self.env.now(), @@ -38,8 +44,10 @@ impl RuntimeState { git_commit_id: utils::git::git_commit_id().to_string(), mark_as_online_count: self.data.mark_as_online_count, active_users: self.data.cached_active_users.clone(), + event_sink_client_info, canister_ids: CanisterIds { user_index: self.data.user_index_canister_id, + event_relay: event_sink_canister_id, cycles_dispenser: self.data.cycles_dispenser_canister_id, }, } @@ -52,19 +60,36 @@ struct Data { pub principal_to_user_id_map: PrincipalToUserIdMap, pub user_index_canister_id: CanisterId, pub cycles_dispenser_canister_id: CanisterId, + #[serde(default = "event_sink_client")] + pub event_sink_client: EventSinkClient, pub mark_as_online_count: u64, pub cached_active_users: ActiveUsers, pub rng_seed: [u8; 32], pub test_mode: bool, } +fn event_sink_client() -> EventSinkClient { + let event_relay_canister_id = CanisterId::from_text("6ofpc-2aaaa-aaaaf-biibq-cai").unwrap(); + EventSinkClientBuilder::new(event_relay_canister_id, CdkRuntime::default()) + .with_flush_delay(Duration::from_secs(60)) + .build() +} + impl Data { - pub fn new(user_index_canister_id: CanisterId, cycles_dispenser_canister_id: CanisterId, test_mode: bool) -> Data { + pub fn new( + user_index_canister_id: CanisterId, + event_relay_canister_id: CanisterId, + cycles_dispenser_canister_id: CanisterId, + test_mode: bool, + ) -> Data { Data { last_online_dates: LastOnlineDates::default(), principal_to_user_id_map: PrincipalToUserIdMap::default(), user_index_canister_id, cycles_dispenser_canister_id, + event_sink_client: EventSinkClientBuilder::new(event_relay_canister_id, CdkRuntime::default()) + .with_flush_delay(Duration::from_secs(60)) + .build(), mark_as_online_count: 0, cached_active_users: ActiveUsers::default(), rng_seed: [0; 32], @@ -82,6 +107,7 @@ pub struct Metrics { pub git_commit_id: String, pub mark_as_online_count: u64, pub active_users: ActiveUsers, + pub event_sink_client_info: EventSinkClientInfo, pub canister_ids: CanisterIds, } @@ -98,5 +124,6 @@ pub struct ActiveUsers { #[derive(Serialize, Debug)] pub struct CanisterIds { pub user_index: CanisterId, + pub event_relay: CanisterId, pub cycles_dispenser: CanisterId, } diff --git a/backend/canisters/online_users/impl/src/lifecycle/init.rs b/backend/canisters/online_users/impl/src/lifecycle/init.rs index 9c1238df9c..16533ce28b 100644 --- a/backend/canisters/online_users/impl/src/lifecycle/init.rs +++ b/backend/canisters/online_users/impl/src/lifecycle/init.rs @@ -13,7 +13,12 @@ fn init(args: Args) { init_cycles_dispenser_client(args.cycles_dispenser_canister_id, args.test_mode); let env = init_env([0; 32]); - let data = Data::new(args.user_index_canister_id, args.cycles_dispenser_canister_id, args.test_mode); + let data = Data::new( + args.user_index_canister_id, + args.event_relay_canister_id, + args.cycles_dispenser_canister_id, + args.test_mode, + ); init_state(env, data, args.wasm_version); diff --git a/backend/canisters/online_users/impl/src/updates/mark_as_online.rs b/backend/canisters/online_users/impl/src/updates/mark_as_online.rs index 3d57d9c6de..ed78aaeea3 100644 --- a/backend/canisters/online_users/impl/src/updates/mark_as_online.rs +++ b/backend/canisters/online_users/impl/src/updates/mark_as_online.rs @@ -1,6 +1,7 @@ use crate::{mutate_state, read_state, RuntimeState}; use candid::Principal; use canister_tracing_macros::trace; +use event_sink_client::Event; use ic_cdk_macros::update; use online_users_canister::mark_as_online::{Response::*, *}; use types::{CanisterId, UserId}; @@ -36,7 +37,15 @@ fn try_get_user_id_locally(state: &RuntimeState) -> Result Response { - state.data.last_online_dates.mark_online(user_id, state.env.now()); + let now = state.env.now(); + state.data.last_online_dates.mark_online(user_id, now); state.data.mark_as_online_count += 1; + state.data.event_sink_client.push_event(Event { + name: "user_online".to_string(), + timestamp: now, + user: Some(user_id.to_string()), + source: Some(state.env.canister_id().to_string()), + payload: Vec::new(), + }); Success } diff --git a/backend/integration_tests/Cargo.toml b/backend/integration_tests/Cargo.toml index 4ace1f96e0..ba756e55a2 100644 --- a/backend/integration_tests/Cargo.toml +++ b/backend/integration_tests/Cargo.toml @@ -10,6 +10,7 @@ candid = { workspace = true } community_canister = { path = "../canisters/community/api" } cycles_dispenser_canister = { path = "../canisters/cycles_dispenser/api" } escrow_canister = { path = "../canisters/escrow/api" } +event_relay_canister = { path = "../canisters/event_relay/api" } group_canister = { path = "../canisters/group/api" } group_index_canister = { path = "../canisters/group_index/api" } ic-cdk = { workspace = true } diff --git a/backend/integration_tests/src/lib.rs b/backend/integration_tests/src/lib.rs index 63ee965a67..6645ba5556 100644 --- a/backend/integration_tests/src/lib.rs +++ b/backend/integration_tests/src/lib.rs @@ -94,6 +94,7 @@ pub struct CanisterIds { pub registry: CanisterId, pub escrow: CanisterId, pub translations: CanisterId, + pub event_relay: CanisterId, pub icp_ledger: CanisterId, pub chat_ledger: CanisterId, pub cycles_minting_canister: CanisterId, diff --git a/backend/integration_tests/src/setup.rs b/backend/integration_tests/src/setup.rs index d8834f07bb..0a842b6bc0 100644 --- a/backend/integration_tests/src/setup.rs +++ b/backend/integration_tests/src/setup.rs @@ -81,6 +81,7 @@ fn install_canisters(env: &mut PocketIc, controller: Principal) -> CanisterIds { let registry_canister_id = create_canister(env, controller); let escrow_canister_id = create_canister(env, controller); let translations_canister_id = create_canister(env, controller); + let event_relay_canister_id = create_canister(env, controller); let local_user_index_canister_id = create_canister(env, user_index_canister_id); let local_group_index_canister_id = create_canister(env, group_index_canister_id); @@ -90,6 +91,7 @@ fn install_canisters(env: &mut PocketIc, controller: Principal) -> CanisterIds { let cycles_dispenser_canister_wasm = wasms::CYCLES_DISPENSER.clone(); let cycles_minting_canister_wasm = wasms::CYCLES_MINTING_CANISTER.clone(); let escrow_canister_wasm = wasms::ESCROW.clone(); + let event_relay_canister_wasm = wasms::EVENT_RELAY.clone(); let group_canister_wasm = wasms::GROUP.clone(); let group_index_canister_wasm = wasms::GROUP_INDEX.clone(); let icp_ledger_canister_wasm = wasms::ICP_LEDGER.clone(); @@ -203,6 +205,7 @@ fn install_canisters(env: &mut PocketIc, controller: Principal) -> CanisterIds { let online_users_init_args = online_users_canister::init::Args { user_index_canister_id, + event_relay_canister_id, cycles_dispenser_canister_id, wasm_version: BuildVersion::min(), test_mode: true, @@ -311,6 +314,21 @@ fn install_canisters(env: &mut PocketIc, controller: Principal) -> CanisterIds { }; install_canister(env, controller, escrow_canister_id, escrow_canister_wasm, escrow_init_args); + let event_relay_init_args = event_relay_canister::init::Args { + push_events_whitelist: vec![], + event_sink_canister_id: Principal::anonymous(), + cycles_dispenser_canister_id, + wasm_version: BuildVersion::min(), + test_mode: true, + }; + install_canister( + env, + controller, + event_relay_canister_id, + event_relay_canister_wasm, + event_relay_init_args, + ); + client::user_index::happy_path::upgrade_user_canister_wasm(env, controller, user_index_canister_id, user_canister_wasm); client::user_index::happy_path::upgrade_local_user_index_canister_wasm( env, @@ -426,10 +444,11 @@ fn install_canisters(env: &mut PocketIc, controller: Principal) -> CanisterIds { cycles_dispenser: cycles_dispenser_canister_id, registry: registry_canister_id, escrow: escrow_canister_id, + translations: translations_canister_id, + event_relay: event_relay_canister_id, icp_ledger: nns_ledger_canister_id, chat_ledger: chat_ledger_canister_id, cycles_minting_canister: cycles_minting_canister_id, - translations: translations_canister_id, } } diff --git a/backend/integration_tests/src/wasms.rs b/backend/integration_tests/src/wasms.rs index a97397c4da..329337bd7b 100644 --- a/backend/integration_tests/src/wasms.rs +++ b/backend/integration_tests/src/wasms.rs @@ -9,6 +9,7 @@ lazy_static! { pub static ref CYCLES_DISPENSER: CanisterWasm = get_canister_wasm("cycles_dispenser"); pub static ref CYCLES_MINTING_CANISTER: CanisterWasm = get_canister_wasm("cycles_minting_canister"); pub static ref ESCROW: CanisterWasm = get_canister_wasm("escrow"); + pub static ref EVENT_RELAY: CanisterWasm = get_canister_wasm("event_relay"); pub static ref GROUP: CanisterWasm = get_canister_wasm("group"); pub static ref GROUP_INDEX: CanisterWasm = get_canister_wasm("group_index"); pub static ref ICP_LEDGER: CanisterWasm = get_canister_wasm("icp_ledger");