Skip to content

Commit

Permalink
Expose aggregated data for DappRadar to consume (#74)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored May 3, 2024
1 parent c53b0cf commit b1cc881
Show file tree
Hide file tree
Showing 15 changed files with 405 additions and 25 deletions.
53 changes: 47 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ candid = "0.10.3"
ic-agent = "0.33.0"
ic-cdk = "0.12.0"
ic-cdk-timers = "0.6.0"
ic-http-certification = "2.5.0"
ic_principal = "0.1.1"
ic-stable-structures = "0.6.3"
pocket-ic = "2.1.0"
querystring = "1.1.0"
rand = "0.8.5"
rmp-serde = "1.1.2"
serde = "1.0.196"
serde_bytes = "0.11.14"
serde_json = "1.0.114"
sha2 = "0.10.8"
test-case = "3.3.1"
time = "0.3.36"
tokio = "1.36.0"
tokio-util = "0.7.10"
tracing = "0.1.40"
13 changes: 13 additions & 0 deletions rs/canister/api/can.did
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ type EventsResponse = record {
events : vec IndexedEvent;
latest_event_index : opt nat64;
};
type HttpRequest = record {
url : text;
method : text;
body : vec nat8;
headers : vec record { text; text };
};
type HttpResponse = record {
body : vec nat8;
headers : vec record { text; text };
upgrade : opt bool;
status_code : nat16;
};
type IdempotentEvent = record {
source : opt Anonymizable;
name : text;
Expand Down Expand Up @@ -32,6 +44,7 @@ type WhitelistedPrincipals = record {
};
service : (InitArgs) -> {
events : (EventsArgs) -> (EventsResponse) query;
http_request : (HttpRequest) -> (HttpResponse) query;
push_events : (PushEventsArgs) -> ();
whitelisted_principals : () -> (WhitelistedPrincipals) query;
}
8 changes: 8 additions & 0 deletions rs/canister/impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,16 @@ event_store_canister.path = "../api"
event_store_utils.path = "../../utils"
ic-cdk.workspace = true
ic-cdk-timers.workspace = true
ic-http-certification.workspace = true
ic-stable-structures.workspace = true
querystring = { workspace = true, optional = true }
rmp-serde.workspace = true
serde.workspace = true
serde_bytes.workspace = true
serde_json = { workspace = true, optional = true }
sha2.workspace = true
time = { workspace = true, optional = true }

[features]
default = ["dapp-radar"]
dapp-radar = ["querystring", "serde_json", "time"]
127 changes: 127 additions & 0 deletions rs/canister/impl/src/integrations/dapp_radar.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use event_store_canister::TimestampMillis;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

const HOURLY_MAX_ENTRIES: usize = 24 * 70;
const PAGE_SIZE: usize = 1000;

#[derive(Serialize, Deserialize, Default)]
pub struct DappRadarData {
daily: BTreeMap<(u32, u8, u8), EventsPerUser>,
hourly: BTreeMap<(u32, u8, u8, u8), EventsPerUser>,
next_event_index: u64,
}

impl DappRadarData {
pub fn push_event(&mut self, index: u64, user: String, timestamp: TimestampMillis) -> bool {
if index != self.next_event_index {
return false;
}

let datetime =
time::OffsetDateTime::from_unix_timestamp((timestamp / 1000) as i64).unwrap();

let year = datetime.year() as u32;
let month = datetime.month() as u8;
let day = datetime.day();
let hour = datetime.hour();

let day_key = (year, month, day);
let hour_key = (year, month, day, hour);

self.daily.entry(day_key).or_default().push(user.clone());
self.hourly.entry(hour_key).or_default().push(user);

while self.hourly.len() > HOURLY_MAX_ENTRIES {
self.hourly.pop_first();
}

self.next_event_index = index + 1;
true
}

pub fn next_event_index(&self) -> u64 {
self.next_event_index
}

pub fn hourly(&self, year: u32, month: u8, day: u8, page: usize) -> DappRadarResponse {
let all_results: Vec<_> = self
.hourly
.range(&(year, month, day, 0)..&(year, month, day + 1, 0))
.flat_map(|((_, _, _, hour), events)| {
events
.per_user
.iter()
.map(move |(user, count)| DappRadarResponseEntry {
date_time: Some(format!("{year}-{month:02}-{day:02} {hour:02}:00:00")),
user: user.clone(),
transactions: *count,
})
})
.collect();

Self::extract_page(all_results, page)
}

pub fn daily(&self, year: u32, month: u8, day: u8, page: usize) -> DappRadarResponse {
let all_results: Vec<_> = self
.daily
.range(&(year, month, day)..&(year, month, day + 1))
.flat_map(|((_, _, _), events)| {
events
.per_user
.iter()
.map(|(user, count)| DappRadarResponseEntry {
date_time: None,
user: user.clone(),
transactions: *count,
})
})
.collect();

Self::extract_page(all_results, page)
}

fn extract_page(all_results: Vec<DappRadarResponseEntry>, page: usize) -> DappRadarResponse {
let page_count = (((all_results.len() - 1) / PAGE_SIZE) + 1) as u32;

DappRadarResponse {
results: if page == 0 {
Vec::new()
} else {
all_results
.into_iter()
.skip(page.saturating_sub(1) * PAGE_SIZE)
.take(PAGE_SIZE)
.collect()
},
page_count,
}
}
}

#[derive(Serialize, Deserialize, Default)]
pub struct EventsPerUser {
per_user: BTreeMap<String, u32>,
}

#[derive(Serialize)]
pub struct DappRadarResponse {
results: Vec<DappRadarResponseEntry>,
#[serde(rename = "pageCount")]
page_count: u32,
}

#[derive(Serialize)]
struct DappRadarResponseEntry {
#[serde(rename = "dateTime", skip_serializing_if = "Option::is_none")]
date_time: Option<String>,
user: String,
transactions: u32,
}

impl EventsPerUser {
fn push(&mut self, user: String) {
*self.per_user.entry(user).or_default() += 1;
}
}
2 changes: 2 additions & 0 deletions rs/canister/impl/src/integrations/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#[cfg(feature = "dapp-radar")]
pub mod dapp_radar;
2 changes: 2 additions & 0 deletions rs/canister/impl/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod env;
mod guards;
mod integrations;
mod lifecycle;
mod memory;
mod model;
Expand All @@ -11,6 +12,7 @@ mod updates;
mod generate_candid_file {
use event_store_canister::*;
use ic_cdk::export_candid;
use ic_http_certification::*;
use std::env;
use std::fs::write;
use std::path::PathBuf;
Expand Down
25 changes: 25 additions & 0 deletions rs/canister/impl/src/lifecycle/post_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::state::State;
use ic_cdk::post_upgrade;
use ic_stable_structures::reader::{BufferedReader, Reader};
use serde::Deserialize;
use std::time::Duration;

#[post_upgrade]
fn post_upgrade() {
Expand All @@ -13,4 +14,28 @@ fn post_upgrade() {
let mut deserializer = rmp_serde::Deserializer::new(reader);

state::init(State::deserialize(&mut deserializer).unwrap());

run_job_to_populate_integrations_data_if_required()
}

fn run_job_to_populate_integrations_data_if_required() {
state::read(|s| {
if let Some(next) = s.integrations_data().next_event_index() {
if s.events().stats().latest_event_index > Some(next) {
ic_cdk_timers::set_timer(Duration::ZERO, populate_integrations_data);
}
}
});
}

fn populate_integrations_data() {
state::mutate(|s| {
if let Some(next) = s.integrations_data().next_event_index() {
let events = s.events().get(next, 10_000);
for event in events {
s.integrations_data_mut().push_event(event);
}
}
});
run_job_to_populate_integrations_data_if_required();
}
Loading

0 comments on commit b1cc881

Please sign in to comment.