Skip to content

Commit

Permalink
Make websocket subscriptions work: fix router and event deserializati…
Browse files Browse the repository at this point in the history
…on error

Ported fix from informalsystems/tendermint-rs#1433
  • Loading branch information
ChronosXYZ committed Sep 23, 2024
1 parent 03f689b commit 2b6dbad
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 131 deletions.
2 changes: 2 additions & 0 deletions cometbft/src/abci/response/finalize_block.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::serializers::null_to_empty_vec;
use serde::{Deserialize, Serialize};

use crate::abci::{types::ExecTxResult, Event};
Expand All @@ -16,6 +17,7 @@ pub struct FinalizeBlock {
pub tx_results: Vec<ExecTxResult>,
/// A list of updates to the validator set.
/// These will reflect the validator set at current height + 2.
#[serde(default, deserialize_with = "null_to_empty_vec")]
pub validator_updates: Vec<validator::Update>,
/// Updates to the consensus params, if any.
#[serde(default)]
Expand Down
12 changes: 12 additions & 0 deletions cometbft/src/serializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,22 @@
//! CAUTION: There are no guarantees for backwards compatibility, this module should be considered
//! an internal implementation detail which can vanish without further warning. Use at your own
//! risk.
use serde::{Deserialize, Deserializer};
use std::vec::Vec;

pub use cometbft_proto::serializers::*;

pub mod apphash;
pub mod apphash_base64;
pub mod hash;
pub mod option_hash;
pub mod time;

pub fn null_to_empty_vec<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
where
D: Deserializer<'de>,
T: Deserialize<'de>,
{
let opt = Option::deserialize(deserializer)?;
Ok(opt.unwrap_or_else(Vec::new))
}
65 changes: 35 additions & 30 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
[package]
name = "cometbft-rpc"
version = "0.1.0-alpha.2"
edition = "2021"
license = "Apache-2.0"
homepage = "https://cometbft.com/"
name = "cometbft-rpc"
version = "0.1.0-alpha.2"
edition = "2021"
license = "Apache-2.0"
homepage = "https://cometbft.com/"
repository = "https://github.com/cometbft/cometbft-rs"
readme = "README.md"
keywords = ["blockchain", "cosmos", "cometbft", "tendermint"]
readme = "README.md"
keywords = ["blockchain", "cosmos", "cometbft", "tendermint"]
categories = ["cryptography::cryptocurrencies", "network-programming"]
authors = [
authors = [
"Informal Systems <[email protected]>",
"Ismail Khoffi <[email protected]>",
"Alexander Simmerl <[email protected]>",
Expand All @@ -26,23 +26,13 @@ all-features = true
[[bin]]
name = "cometbft-rpc"
path = "src/client/bin/main.rs"
required-features = [ "cli" ]
required-features = ["cli"]

[features]
default = ["flex-error/std", "flex-error/eyre_tracer"]
cli = [
"http-client",
"structopt",
"tracing-subscriber",
"websocket-client"
]
http-client = [
"futures",
"reqwest",
"tokio/macros",
"tracing"
]
secp256k1 = [ "cometbft/secp256k1" ]
cli = ["http-client", "structopt", "tracing-subscriber", "websocket-client"]
http-client = ["futures", "reqwest", "tokio/macros", "tracing"]
secp256k1 = ["cometbft/secp256k1"]
websocket-client = [
"async-tungstenite",
"futures",
Expand All @@ -51,7 +41,7 @@ websocket-client = [
"tokio/macros",
"tokio/sync",
"tokio/time",
"tracing"
"tracing",
]

[dependencies]
Expand All @@ -64,28 +54,43 @@ bytes = { version = "1.0", default-features = false }
getrandom = { version = "0.2", default-features = false, features = ["js"] }
peg = { version = "0.8", default-features = false }
pin-project = { version = "1.0.1", default-features = false }
serde = { version = "1", default-features = false, features = [ "derive" ] }
serde = { version = "1", default-features = false, features = ["derive"] }
serde_bytes = { version = "0.11", default-features = false }
serde_json = { version = "1", default-features = false, features = ["std"] }
thiserror = { version = "1", default-features = false }
time = { version = "0.3", default-features = false, features = ["macros", "parsing"] }
time = { version = "0.3", default-features = false, features = [
"macros",
"parsing",
] }
uuid = { version = "1.7", default-features = false }
rand = { version = "0.8" }
subtle-encoding = { version = "0.5", default-features = false, features = ["bech32-preview"] }
subtle-encoding = { version = "0.5", default-features = false, features = [
"bech32-preview",
] }
url = { version = "2.4.1", default-features = false }
walkdir = { version = "2.3", default-features = false }
flex-error = { version = "0.4.4", default-features = false }
subtle = { version = "2", default-features = false }
semver = { version = "1.0", default-features = false }
ordered-float = { version = "4.0", default-features = false }

# Optional dependencies
async-tungstenite = { version = "0.24", default-features = false, features = ["tokio-runtime", "tokio-rustls-native-certs"], optional = true }
async-tungstenite = { version = "0.24", default-features = false, features = [
"tokio-runtime",
"tokio-rustls-native-certs",
], optional = true }
futures = { version = "0.3", optional = true, default-features = false }
reqwest = { version = "0.11.20", optional = true, default-features = false, features = ["rustls-tls-native-roots"] }
reqwest = { version = "0.11.20", optional = true, default-features = false, features = [
"rustls-tls-native-roots",
] }
structopt = { version = "0.3", optional = true, default-features = false }
tokio = { version = "1.0", optional = true, default-features = false, features = ["rt-multi-thread"] }
tokio = { version = "1.0", optional = true, default-features = false, features = [
"rt-multi-thread",
] }
tracing = { version = "0.1", optional = true, default-features = false }
tracing-subscriber = { version = "0.3", optional = true, default-features = false, features = ["fmt"] }
tracing-subscriber = { version = "0.3", optional = true, default-features = false, features = [
"fmt",
] }

[dev-dependencies]
http = { version = "1", default-features = false, features = ["std"] }
Expand Down
4 changes: 2 additions & 2 deletions rpc/src/client/transport/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl MockClientDriver {
self.subscribe(id, query, subscription_tx, result_tx);
}
DriverCommand::Unsubscribe { query, result_tx } => {
self.unsubscribe(query, result_tx);
self.unsubscribe(&query, result_tx);
}
DriverCommand::Publish(event) => self.publish(*event),
DriverCommand::Terminate => return Ok(()),
Expand All @@ -184,7 +184,7 @@ impl MockClientDriver {
result_tx.send(Ok(())).unwrap();
}

fn unsubscribe(&mut self, query: Query, result_tx: ChannelTx<Result<(), Error>>) {
fn unsubscribe(&mut self, query: &Query, result_tx: ChannelTx<Result<(), Error>>) {
self.router.remove_by_query(query);
result_tx.send(Ok(())).unwrap();
}
Expand Down
Loading

0 comments on commit 2b6dbad

Please sign in to comment.