Skip to content

Commit

Permalink
Remove all code relating to the old events format (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
hpeebles authored Mar 20, 2024
1 parent 3fcc99a commit 781bb93
Show file tree
Hide file tree
Showing 8 changed files with 14 additions and 206 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions rs/canister/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,3 @@ edition = "2021"
candid.workspace = true
serde.workspace = true
serde_bytes.workspace = true

[dev-dependencies]
rmp-serde = "1.1.2"
test-case.workspace = true
11 changes: 1 addition & 10 deletions rs/canister/api/can.did
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ type IdempotentEvent = record {
payload : vec nat8;
idempotency_key : nat;
};
type IdempotentEventPrevious = record {
source : opt text;
name : text;
user : opt text;
timestamp : nat64;
payload : vec nat8;
idempotency_key : nat;
};
type IndexedEvent = record {
source : opt text;
name : text;
Expand All @@ -34,14 +26,13 @@ type InitArgs = record {
time_granularity : opt nat64;
};
type PushEventsArgs = record { events : vec IdempotentEvent };
type PushEventsArgsPrevious = record { events : vec IdempotentEventPrevious };
type WhitelistedPrincipals = record {
push : vec principal;
read : vec principal;
};
service : (InitArgs) -> {
events : (EventsArgs) -> (EventsResponse) query;
push_events : (PushEventsArgsPrevious) -> ();
push_events : (PushEventsArgs) -> ();
push_events_v2 : (PushEventsArgs) -> ();
whitelisted_principals : () -> (WhitelistedPrincipals) query;
}
169 changes: 2 additions & 167 deletions rs/canister/api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
use candid::{CandidType, Deserialize};
use serde::__private::from_utf8_lossy;
use serde::de::{EnumAccess, Error, Unexpected, VariantAccess};
use serde::{Deserializer, Serialize};
use std::fmt;
use std::fmt::Formatter;
use std::marker::PhantomData;
use serde::Serialize;

mod lifecycle;
mod queries;
Expand All @@ -28,17 +23,6 @@ pub struct IdempotentEvent {
pub payload: Vec<u8>,
}

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub struct IdempotentEventPrevious {
pub idempotency_key: u128,
pub name: String,
pub timestamp: TimestampMillis,
pub user: Option<String>,
pub source: Option<String>,
#[serde(with = "serde_bytes")]
pub payload: Vec<u8>,
}

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub struct IndexedEvent {
pub index: u64,
Expand All @@ -50,7 +34,7 @@ pub struct IndexedEvent {
pub payload: Vec<u8>,
}

#[derive(CandidType, Serialize, Clone, Debug)]
#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub enum Anonymizable {
Public(String),
Anonymize(String),
Expand All @@ -76,152 +60,3 @@ impl Anonymizable {
matches!(self, Anonymizable::Public(_))
}
}

impl<'de> Deserialize<'de> for Anonymizable {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
enum Field {
Field0,
Field1,
Value(String),
}
#[doc(hidden)]
struct FieldVisitor;
impl<'de> serde::de::Visitor<'de> for FieldVisitor {
type Value = Field;
fn expecting(&self, formatter: &mut Formatter) -> fmt::Result {
write!(formatter, "variant identifier")
}
fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
where
E: Error,
{
match value {
0u64 => Ok(Field::Field0),
1u64 => Ok(Field::Field1),
_ => Err(Error::invalid_value(
Unexpected::Unsigned(value),
&"variant index 0 <= i < 2",
)),
}
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: Error,
{
match value {
"Public" => Ok(Field::Field0),
"Anonymize" => Ok(Field::Field1),
value => Ok(Field::Value(value.to_string())),
}
}
fn visit_bytes<E>(self, value: &[u8]) -> Result<Self::Value, E>
where
E: Error,
{
match value {
b"Public" => Ok(Field::Field0),
b"Anonymize" => Ok(Field::Field1),
_ => {
let value = &from_utf8_lossy(value);
Err(Error::unknown_variant(value, VARIANTS))
}
}
}
}
impl<'de> Deserialize<'de> for Field {
#[inline]
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Deserializer::deserialize_identifier(deserializer, FieldVisitor)
}
}
#[doc(hidden)]
struct Visitor<'de> {
marker: PhantomData<Anonymizable>,
lifetime: PhantomData<&'de ()>,
}
impl<'de> serde::de::Visitor<'de> for Visitor<'de> {
type Value = Anonymizable;
fn expecting(&self, formatter: &mut Formatter) -> fmt::Result {
Formatter::write_str(formatter, "enum Anonymizable")
}
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
where
A: EnumAccess<'de>,
{
match EnumAccess::variant(data)? {
(Field::Field0, variant) => Result::map(
VariantAccess::newtype_variant::<String>(variant),
Anonymizable::Public,
),
(Field::Field1, variant) => Result::map(
VariantAccess::newtype_variant::<String>(variant),
Anonymizable::Anonymize,
),
(Field::Value(value), _) => Ok(Self::Value::Public(value)),
}
}
}
#[doc(hidden)]
const VARIANTS: &[&str] = &["Public", "Anonymize"];
Deserializer::deserialize_enum(
deserializer,
"Anonymizable",
VARIANTS,
Visitor {
marker: PhantomData::<Anonymizable>,
lifetime: PhantomData,
},
)
}
}

#[cfg(test)]
mod tests {
use super::*;
use test_case::test_case;

#[test_case(true)]
#[test_case(false)]
fn deserialization_succeeds(current_version: bool) {
let bytes = if current_version {
let value = IdempotentEvent {
idempotency_key: 1,
name: "name".to_string(),
timestamp: 2,
user: Some(Anonymizable::Public("user".to_string())),
source: Some(Anonymizable::Public("source".to_string())),
payload: vec![1, 2, 3],
};

rmp_serde::to_vec_named(&value).unwrap()
} else {
let value = IdempotentEventPrevious {
idempotency_key: 1,
name: "name".to_string(),
timestamp: 2,
user: Some("user".to_string()),
source: Some("source".to_string()),
payload: vec![1, 2, 3],
};

rmp_serde::to_vec_named(&value).unwrap()
};

let deserialized: IdempotentEvent = rmp_serde::from_slice(&bytes).unwrap();

assert_eq!(deserialized.idempotency_key, 1);
assert_eq!(deserialized.name, "name");
assert_eq!(deserialized.timestamp, 2);
assert_eq!(deserialized.user.clone().unwrap().as_str(), "user");
assert!(deserialized.user.clone().unwrap().is_public());
assert_eq!(deserialized.source.clone().unwrap().as_str(), "source");
assert!(deserialized.source.clone().unwrap().is_public());
assert_eq!(deserialized.payload, vec![1, 2, 3]);
}
}
4 changes: 2 additions & 2 deletions rs/canister/api/src/updates/push_events.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::IdempotentEventPrevious;
use crate::IdempotentEvent;
use candid::{CandidType, Deserialize};
use serde::Serialize;

#[derive(CandidType, Serialize, Deserialize, Clone, Debug)]
pub struct PushEventsArgsPrevious {
pub events: Vec<IdempotentEventPrevious>,
pub events: Vec<IdempotentEvent>,
}
26 changes: 7 additions & 19 deletions rs/canister/impl/src/updates/push_events.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,19 @@
use crate::guards::caller_can_push_events;
use crate::{env, state};
use event_store_canister::{Anonymizable, IdempotentEvent, PushEventsArgs, PushEventsArgsPrevious};
use event_store_canister::PushEventsArgs;
use ic_cdk::update;

#[update(guard = "caller_can_push_events")]
fn push_events(args: PushEventsArgsPrevious) {
let now = env::time();

state::mutate(|s| {
for event in args.events {
s.push_event(
IdempotentEvent {
idempotency_key: event.idempotency_key,
name: event.name,
timestamp: event.timestamp,
user: event.user.map(Anonymizable::Public),
source: event.source.map(Anonymizable::Public),
payload: event.payload,
},
now,
);
}
});
fn push_events(args: PushEventsArgs) {
push_events_inner(args)
}

#[update(guard = "caller_can_push_events")]
fn push_events_v2(args: PushEventsArgs) {
push_events_inner(args)
}

fn push_events_inner(args: PushEventsArgs) {
let now = env::time();

state::mutate(|s| {
Expand Down
2 changes: 1 addition & 1 deletion rs/producer/agent_runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn flush_async<F: FnOnce(FlushOutcome) + Send + 'static>(
on_complete: F,
) {
if agent
.update(&canister_id, "push_events_v2".to_string())
.update(&canister_id, "push_events".to_string())
.with_arg(candid::encode_one(PushEventsArgs { events }).unwrap())
.call_and_wait()
.await
Expand Down
2 changes: 1 addition & 1 deletion rs/producer/cdk_runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn flush_async<F: FnOnce(FlushOutcome)>(
) {
let events_len = events.len();
if let Err(error) =
ic_cdk::call::<_, ()>(canister_id, "push_events_v2", (PushEventsArgs { events },)).await
ic_cdk::call::<_, ()>(canister_id, "push_events", (PushEventsArgs { events },)).await
{
on_complete(FLUSH_OUTCOME_FAILED_SHOULD_RETRY);
error!(%canister_id, events = events_len, ?error, "Failed to call 'push_events'");
Expand Down

0 comments on commit 781bb93

Please sign in to comment.