Skip to content

Commit

Permalink
casper-deploy-notifier: support casper-types 2.0.0-rc4
Browse files Browse the repository at this point in the history
  • Loading branch information
marijanp committed Sep 17, 2024
1 parent bc4ef9b commit 2c5f862
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 33 deletions.
8 changes: 5 additions & 3 deletions casper-deploy-notifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ license.workspace = true
name = "echo_demo"

[dependencies]
futures = "0.3.30"
futures = "0.3"
reqwest = { version = "0.12", features = ["stream"] }
tokio = { version = "1", features = ["full"] }
serde = "1.0"
serde_json = "1.0"
casper-types.workspace = true
eventsource-stream = "0.2.3"
eventsource-stream = "0.2"
thiserror = "1.0"
base16 = "0.2.1"
base16 = "0.2"
tracing = "0.1"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
mockito = "1"
tracing-subscriber = "0.3"
16 changes: 8 additions & 8 deletions casper-deploy-notifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,28 @@ impl DeployNotifier {
// Before running this function again, make sure you established new connection.
pub async fn run(&mut self, tx: mpsc::Sender<Notification>) -> Result<(), SseError> {
// Take stream out of state.
let mut event_stream = match self.event_stream.take() {
Some(s) => Ok(s),
None => Err(SseError::NotConnected),
}?;
let mut event_stream = self.event_stream.take().ok_or(SseError::NotConnected)?;

while let Some(event) = event_stream.try_next().await? {
let data: SseData = serde_json::from_str(&event.data)?;
match data {
// FIXME This is never going to return a UndexpectedHandshake error
SseData::ApiVersion(_) => Err(SseError::UnexpectedHandshake)?,
SseData::Other(_) => {}
SseData::DeployProcessed(event_details) => {
SseData::Other(other) => tracing::debug!("Received SSE event other than TransactionProcessed: {other}"),
SseData::TransactionProcessed(event_details) => {
let notification = event_details.into();
if let Err(_e) = tx.send(notification).await {
if let Err(err) = tx.send(notification).await {
tracing::error!("Failed to a send notification on TransactionProcessed event: {err}");
// Receiver probably dropeed.
break;
}
}
// FIXME This is never going to return a NodeShutdown error
SseData::Shutdown => Err(SseError::NodeShutdown)?,
}
}

// Stream was exhausted.
Err(SseError::StreamExhausted)?
Err(SseError::StreamExhausted)
}
}
32 changes: 22 additions & 10 deletions casper-deploy-notifier/src/sse_types.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
use casper_types::execution::ExecutionResult;
use casper_types::{
execution::ExecutionResult,
contract_messages::Messages,
TransactionHash,
ProtocolVersion,
InitiatorAddr,
BlockHash,
};
use serde::{Deserialize, Serialize};

/// Casper does not expose SSE types directly, so we have to reimplement them.
///
/// Source: https://github.com/casper-network/casper-node/blob/9f3995853204a18f17de9c022233d22aa14b9c37/node/src/components/event_stream_server/sse_server.rs#L75.
/// Source: https://github.com/casper-network/casper-node/blob/release-2.0.0-rc4/node/src/components/event_stream_server/sse_server.rs
///
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)]
pub enum SseData {
/// The version of node's API. First event to receive, used for handshake.
ApiVersion(casper_types::ProtocolVersion),
/// The given deploy has been executed, committed and forms part of the given block.
DeployProcessed(DeployProcessed),
/// The version of this node's API server. This event will always be the first sent to a new
/// client, and will have no associated event ID provided.
ApiVersion(ProtocolVersion),
/// The given transaction has been executed, committed and forms part of the given block.
TransactionProcessed(TransactionProcessed),
/// The node is about to shut down.
Shutdown,
/// Other events, that we are not interested in.
Expand All @@ -19,8 +27,12 @@ pub enum SseData {
}

#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)]
pub struct DeployProcessed {
pub deploy_hash: Box<casper_types::DeployHash>,
pub account: Box<casper_types::PublicKey>,
pub execution_result: Box<ExecutionResult>,
pub struct TransactionProcessed {
pub transaction_hash: Box<TransactionHash>,
pub initiator_addr: Box<InitiatorAddr>,
pub timestamp: String,
pub ttl: String,
pub block_hash: Box<BlockHash>,
pub execution_result: Box<ExecutionResult>,
pub messages: Messages,
}
11 changes: 5 additions & 6 deletions casper-deploy-notifier/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use casper_types::{
execution::{ExecutionResult, ExecutionResultV1},
AsymmetricType,
};

use crate::sse_types::DeployProcessed;
use crate::sse_types::TransactionProcessed;

#[derive(Debug)]
pub struct Notification {
Expand All @@ -12,17 +11,17 @@ pub struct Notification {
pub success: bool,
}

impl From<DeployProcessed> for Notification {
fn from(event_details: DeployProcessed) -> Self {
impl From<TransactionProcessed> for Notification {
fn from(event_details: TransactionProcessed) -> Self {
let success = match *event_details.execution_result {
ExecutionResult::V1(execution_result_v1) => match execution_result_v1 {
ExecutionResultV1::Failure { .. } => false,
ExecutionResultV1::Success { .. } => true,
},
ExecutionResult::V2(execution_result_v2) => execution_result_v2.error_message.is_none(),
};
let deploy_hash = base16::encode_lower(event_details.deploy_hash.inner());
let public_key = event_details.account.to_hex();
let deploy_hash = base16::encode_lower(&event_details.transaction_hash.digest());
let public_key = event_details.initiator_addr.account_hash().to_string();

Notification {
deploy_hash,
Expand Down
Loading

0 comments on commit 2c5f862

Please sign in to comment.