Skip to content

Commit

Permalink
refactor(P2P): types and modules (#2256)
Browse files Browse the repository at this point in the history
Organizes scattered P2P types and modules into a more suitable crate and makes them easier to maintain and accessible directly from the P2P layer
  • Loading branch information
onur-ozkan authored Oct 30, 2024
1 parent 8de861d commit a538a02
Show file tree
Hide file tree
Showing 32 changed files with 155 additions and 107 deletions.
12 changes: 8 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion mm2src/coins/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ mm2_event_stream = { path = "../mm2_event_stream" }
mm2_git = { path = "../mm2_git" }
mm2_io = { path = "../mm2_io" }
mm2_metrics = { path = "../mm2_metrics" }
mm2_net = { path = "../mm2_net", features = ["p2p"] }
mm2_net = { path = "../mm2_net" }
mm2_number = { path = "../mm2_number"}
mm2_p2p = { path = "../mm2_p2p" }
mm2_rpc = { path = "../mm2_rpc" }
mm2_state_machine = { path = "../mm2_state_machine" }
mocktopus = "0.8.0"
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/eth/v2_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use instant::Instant;
use mm2_err_handle::common_errors::WithInternal;
#[cfg(target_arch = "wasm32")]
use mm2_metamask::{from_metamask_error, MetamaskError, MetamaskRpcError, WithMetamaskRpcError};
use mm2_net::p2p::P2PContext;
use mm2_p2p::p2p_ctx::P2PContext;
use proxy_signature::RawMessage;
use rpc_task::RpcTaskError;
use std::sync::atomic::Ordering;
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/eth/web3_transport/http_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use common::APPLICATION_JSON;
use common::X_AUTH_PAYLOAD;
use http::header::CONTENT_TYPE;
use jsonrpc_core::{Call, Response};
use mm2_net::p2p::Keypair;
use mm2_p2p::Keypair;
use proxy_signature::RawMessage;
use serde_json::Value as Json;
use std::ops::Deref;
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/eth/web3_transport/websocket_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures_ticker::Ticker;
use futures_util::{FutureExt, SinkExt, StreamExt};
use instant::{Duration, Instant};
use jsonrpc_core::Call;
use mm2_net::p2p::Keypair;
use mm2_p2p::Keypair;
use proxy_signature::{ProxySign, RawMessage};
use std::sync::atomic::AtomicBool;
use std::sync::{atomic::{AtomicUsize, Ordering},
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/nft.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use http::Uri;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::{MmError, MmResult};
use mm2_net::p2p::P2PContext;
use mm2_p2p::p2p_ctx::P2PContext;
use proxy_signature::{ProxySign, RawMessage};
use url::Url;

Expand Down
4 changes: 2 additions & 2 deletions mm2src/coins/tendermint/rpc/tendermint_native_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use cosmrs::tendermint::evidence::Evidence;
use cosmrs::tendermint::Genesis;
use cosmrs::tendermint::Hash;
use http::Uri;
use mm2_net::p2p::Keypair;
use mm2_p2p::Keypair;
use serde::{de::DeserializeOwned, Serialize};
use std::fmt;
use std::time::Duration;
Expand Down Expand Up @@ -382,7 +382,7 @@ mod sealed {
use hyper::client::HttpConnector;
use hyper::{header, Uri};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use mm2_net::p2p::Keypair;
use mm2_p2p::Keypair;
use proxy_signature::RawMessage;
use std::io::Read;
use tendermint_rpc::{Error, Response, SimpleRequest};
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/tendermint/rpc/tendermint_wasm_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use derive_more::Display;
use http::header::{ACCEPT, CONTENT_TYPE};
use http::uri::InvalidUri;
use http::{StatusCode, Uri};
use mm2_net::p2p::Keypair;
use mm2_net::transport::SlurpError;
use mm2_net::wasm::http::FetchRequest;
use mm2_p2p::Keypair;
use proxy_signature::RawMessage;
use std::str::FromStr;
use tendermint_rpc::endpoint::{abci_info, broadcast};
Expand Down
3 changes: 1 addition & 2 deletions mm2src/coins/tendermint/tendermint_balance_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use jsonrpc_core::{Id as RpcId, Params as RpcParams, Value as RpcValue, Version
use mm2_core::mm_ctx::MmArc;
use mm2_event_stream::{behaviour::{EventBehaviour, EventInitStatus},
ErrorEventName, Event, EventName, EventStreamConfiguration};
use mm2_net::p2p::Keypair;
use mm2_number::BigDecimal;
use proxy_signature::RawMessage;
use std::collections::{HashMap, HashSet};
Expand All @@ -24,7 +23,7 @@ impl EventBehaviour for TendermintCoin {
async fn handle(self, _interval: f64, tx: oneshot::Sender<EventInitStatus>) {
fn generate_subscription_query(
query_filter: String,
proxy_sign_keypair: &Option<Keypair>,
proxy_sign_keypair: &Option<mm2_p2p::Keypair>,
uri: &http::Uri,
) -> String {
let mut params = serde_json::Map::with_capacity(1);
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ use keys::{KeyPair, Public};
use mm2_core::mm_ctx::{MmArc, MmWeak};
use mm2_err_handle::prelude::*;
use mm2_git::{FileMetadata, GitController, GithubClient, RepositoryOperations, GITHUB_API_URI};
use mm2_net::p2p::P2PContext;
use mm2_number::MmNumber;
use mm2_p2p::p2p_ctx::P2PContext;
use parking_lot::Mutex as PaMutex;
use primitives::hash::H256;
use regex::Regex;
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ derive_more = "0.99"
futures = { version = "0.3", package = "futures", features = ["compat", "async-await", "thread-pool"] }
hex = "0.4.2"
lazy_static = "1.4"
libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.4", default-features = false, features = ["identify"] }
mm2_err_handle = { path = "../mm2_err_handle" }
mm2_event_stream = { path = "../mm2_event_stream" }
mm2_metrics = { path = "../mm2_metrics" }
mm2_libp2p = { path = "../mm2_p2p", package = "mm2_p2p" }
primitives = { path = "../mm2_bitcoin/primitives" }
rand = { version = "0.7", features = ["std", "small_rng", "wasm-bindgen"] }
serde = "1"
Expand Down
4 changes: 2 additions & 2 deletions mm2src/mm2_core/src/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use futures::channel::oneshot;
use futures::lock::Mutex as AsyncMutex;
use gstuff::{try_s, Constructible, ERR, ERRL};
use lazy_static::lazy_static;
use libp2p::PeerId;
use mm2_event_stream::{controller::Controller, Event, EventStreamConfiguration};
use mm2_libp2p::PeerAddress;
use mm2_metrics::{MetricsArc, MetricsOps};
use primitives::hash::H160;
use rand::Rng;
Expand Down Expand Up @@ -146,7 +146,7 @@ pub struct MmCtx {
#[cfg(not(target_arch = "wasm32"))]
pub async_sqlite_connection: Constructible<Arc<AsyncMutex<AsyncConnection>>>,
/// Links the RPC context to the P2P context to handle health check responses.
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<PeerAddress, oneshot::Sender<()>>>,
pub healthcheck_response_handler: AsyncMutex<ExpirableMap<PeerId, oneshot::Sender<()>>>,
}

impl MmCtx {
Expand Down
4 changes: 2 additions & 2 deletions mm2src/mm2_main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ mm2_err_handle = { path = "../mm2_err_handle" }
mm2_event_stream = { path = "../mm2_event_stream" }
mm2_gui_storage = { path = "../mm2_gui_storage" }
mm2_io = { path = "../mm2_io" }
mm2_libp2p = { path = "../mm2_p2p", package = "mm2_p2p" }
mm2_libp2p = { path = "../mm2_p2p", package = "mm2_p2p", features = ["application"] }
mm2_metrics = { path = "../mm2_metrics" }
mm2_net = { path = "../mm2_net", features = ["event-stream", "p2p"] }
mm2_net = { path = "../mm2_net" }
mm2_number = { path = "../mm2_number" }
mm2_rpc = { path = "../mm2_rpc", features = ["rpc_facilities"]}
mm2_state_machine = { path = "../mm2_state_machine" }
Expand Down
6 changes: 3 additions & 3 deletions mm2src/mm2_main/src/lp_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use instant::{Duration, Instant};
use lazy_static::lazy_static;
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::MmError;
use mm2_libp2p::p2p_ctx::P2PContext;
use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerAddress, TopicPrefix};
use mm2_net::p2p::P2PContext;
use ser_error_derive::SerializeErrorType;
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
Expand Down Expand Up @@ -265,7 +265,7 @@ pub async fn peer_connection_healthcheck_rpc(

{
let mut book = ctx.healthcheck_response_handler.lock().await;
book.insert(target_peer_address, tx, address_record_exp);
book.insert(target_peer_address.into(), tx, address_record_exp);
}

broadcast_p2p_msg(
Expand Down Expand Up @@ -328,7 +328,7 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li
} else {
// The requested peer is healthy; signal the response channel.
let mut response_handler = ctx.healthcheck_response_handler.lock().await;
if let Some(tx) = response_handler.remove(&sender_peer) {
if let Some(tx) = response_handler.remove(&sender_peer.into()) {
if tx.send(()).is_err() {
log::error!("Result channel isn't present for peer '{sender_peer}'.");
};
Expand Down
4 changes: 2 additions & 2 deletions mm2src/mm2_main/src/lp_native_dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ use mm2_core::mm_ctx::{MmArc, MmCtx};
use mm2_err_handle::common_errors::InternalError;
use mm2_err_handle::prelude::*;
use mm2_event_stream::behaviour::{EventBehaviour, EventInitStatus};
use mm2_libp2p::application::network_event::NetworkEvent;
use mm2_libp2p::behaviours::atomicdex::{generate_ed25519_keypair, GossipsubConfig, DEPRECATED_NETID_LIST};
use mm2_libp2p::p2p_ctx::P2PContext;
use mm2_libp2p::{spawn_gossipsub, AdexBehaviourError, NodeType, RelayAddress, RelayAddressError, SeedNodeInfo,
SwarmRuntime, WssCerts};
use mm2_metrics::mm_gauge;
use mm2_net::network_event::NetworkEvent;
use mm2_net::p2p::P2PContext;
use rpc_task::RpcTaskError;
use serde_json as json;
use std::convert::TryInto;
Expand Down
9 changes: 2 additions & 7 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ use instant::Instant;
use keys::KeyPair;
use mm2_core::mm_ctx::{MmArc, MmWeak};
use mm2_err_handle::prelude::*;
use mm2_libp2p::application::request_response::P2PRequest;
use mm2_libp2p::p2p_ctx::P2PContext;
use mm2_libp2p::{decode_message, encode_message, DecodingError, GossipsubEvent, GossipsubMessage, Libp2pPublic,
Libp2pSecpPublic, MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR};
use mm2_libp2p::{AdexBehaviourCmd, AdexBehaviourEvent, AdexEventRx, AdexResponse};
use mm2_libp2p::{PeerAddresses, RequestResponseBehaviourEvent};
use mm2_metrics::{mm_label, mm_timing};
use mm2_net::p2p::P2PContext;
use serde::de;
use std::net::ToSocketAddrs;

Expand Down Expand Up @@ -87,12 +88,6 @@ impl From<rmp_serde::decode::Error> for P2PRequestError {
fn from(e: rmp_serde::decode::Error) -> Self { P2PRequestError::DecodeError(e.to_string()) }
}

#[derive(Eq, Debug, Deserialize, PartialEq, Serialize)]
pub enum P2PRequest {
Ordermatch(lp_ordermatch::OrdermatchRequest),
NetworkInfo(lp_stats::NetworkInfoRequest),
}

pub async fn p2p_event_process_loop(ctx: MmWeak, mut rx: AdexEventRx, i_am_relay: bool) {
loop {
let adex_event = rx.next().await;
Expand Down
34 changes: 3 additions & 31 deletions mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
//

use async_trait::async_trait;
use best_orders::BestOrdersAction;
use blake2::digest::{Update, VariableOutput};
use blake2::Blake2bVar;
use coins::utxo::{compressed_pub_key_from_priv_raw, ChecksumType, UtxoAddressFormat};
Expand All @@ -42,6 +41,8 @@ use http::Response;
use keys::{AddressFormat, KeyPair};
use mm2_core::mm_ctx::{from_ctx, MmArc, MmWeak};
use mm2_err_handle::prelude::*;
use mm2_libp2p::application::request_response::ordermatch::OrdermatchRequest;
use mm2_libp2p::application::request_response::P2PRequest;
use mm2_libp2p::{decode_signed, encode_and_sign, encode_message, pub_sub_topic, PublicKey, TopicHash, TopicPrefix,
TOPIC_SEPARATOR};
use mm2_metrics::mm_gauge;
Expand Down Expand Up @@ -69,8 +70,7 @@ use std::time::Duration;
use trie_db::NodeCodec as NodeCodecT;
use uuid::Uuid;

use crate::lp_network::{broadcast_p2p_msg, request_any_relay, request_one_peer, subscribe_to_topic, P2PRequest,
P2PRequestError};
use crate::lp_network::{broadcast_p2p_msg, request_any_relay, request_one_peer, subscribe_to_topic, P2PRequestError};
use crate::lp_swap::maker_swap_v2::{self, MakerSwapStateMachine, MakerSwapStorage};
use crate::lp_swap::taker_swap_v2::{self, TakerSwapStateMachine, TakerSwapStorage};
use crate::lp_swap::{calc_max_maker_vol, check_balance_for_maker_swap, check_balance_for_taker_swap,
Expand Down Expand Up @@ -600,34 +600,6 @@ pub async fn process_msg(ctx: MmArc, from_peer: String, msg: &[u8], i_am_relay:
}
}

#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
pub enum OrdermatchRequest {
/// Get an orderbook for the given pair.
GetOrderbook {
base: String,
rel: String,
},
/// Sync specific pubkey orderbook state if our known Patricia trie state doesn't match the latest keep alive message
SyncPubkeyOrderbookState {
pubkey: String,
/// Request using this condition
trie_roots: HashMap<AlbOrderedOrderbookPair, H64>,
},
BestOrders {
coin: String,
action: BestOrdersAction,
volume: BigRational,
},
OrderbookDepth {
pairs: Vec<(String, String)>,
},
BestOrdersByNumber {
coin: String,
action: BestOrdersAction,
number: usize,
},
}

#[derive(Debug)]
struct TryFromBytesError(String);

Expand Down
13 changes: 4 additions & 9 deletions mm2src/mm2_main/src/lp_ordermatch/best_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use derive_more::Display;
use http::{Response, StatusCode};
use mm2_core::mm_ctx::MmArc;
use mm2_err_handle::prelude::*;
use mm2_libp2p::application::request_response::{ordermatch::{BestOrdersAction, OrdermatchRequest},
P2PRequest};
use mm2_number::{BigRational, MmNumber};
use mm2_rpc::data::legacy::OrderConfirmationsSettings;
use num_traits::Zero;
Expand All @@ -12,15 +14,8 @@ use std::collections::{HashMap, HashSet};
use uuid::Uuid;

use super::{addr_format_from_protocol_info, is_my_order, mm2_internal_pubkey_hex, orderbook_address,
BaseRelProtocolInfo, OrderbookP2PItemWithProof, OrdermatchContext, OrdermatchRequest, RpcOrderbookEntryV2};
use crate::lp_network::{request_any_relay, P2PRequest};

#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum BestOrdersAction {
Buy,
Sell,
}
BaseRelProtocolInfo, OrderbookP2PItemWithProof, OrdermatchContext, RpcOrderbookEntryV2};
use crate::lp_network::request_any_relay;

#[derive(Debug, Deserialize)]
pub struct BestOrdersRequest {
Expand Down
5 changes: 3 additions & 2 deletions mm2src/mm2_main/src/lp_ordermatch/orderbook_depth.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::{orderbook_topic_from_base_rel, OrdermatchContext, OrdermatchRequest};
use crate::lp_network::{request_any_relay, P2PRequest};
use super::{orderbook_topic_from_base_rel, OrdermatchContext};
use crate::lp_network::request_any_relay;
use coins::is_wallet_only_ticker;
use common::log;
use http::Response;
use mm2_core::mm_ctx::MmArc;
use mm2_libp2p::application::request_response::{ordermatch::OrdermatchRequest, P2PRequest};
use serde_json::{self as json, Value as Json};
use std::collections::HashMap;

Expand Down
Loading

0 comments on commit a538a02

Please sign in to comment.