Skip to content

Commit

Permalink
[eclipse-uprotocol#54] Convert up-streamer from async-std to tokio
Browse files Browse the repository at this point in the history
Implements [#11]
Implements [#29]
Implements [eclipse-uprotocol#53]
  • Loading branch information
PLeVasseur committed Nov 10, 2024
1 parent 4c233b7 commit f7351ad
Show file tree
Hide file tree
Showing 14 changed files with 555 additions and 668 deletions.
1,070 changes: 465 additions & 605 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ async-trait = { version = "0.1" }
clap = { version = "4.5" }
env_logger = { version = "0.10.1" }
futures = { version = "0.3.30" }
lazy_static = { version = "1.5.0" }
log = { version = "0.4.20" }
json5 = { version = "0.4.1" }
serde = { version = "1.0.154", features = ["derive"] }
serde_json = { version = "1.0.94" }
uuid = { version = "1.7.0" }
tokio = { version = "1.35.1", default-features = false }
tokio = { version = "1.35.1", default-features = false, features = ["rt", "sync"] }
protobuf = { version = "3.3", features = ["with-bytes"] }
up-rust = { version = "0.1.5", default-features = false }
up-rust = { version = "0.2.0", default-features = false }

[profile.dev]
debug = true
Expand Down
6 changes: 3 additions & 3 deletions example-streamer-uses/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ protobuf = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
up-rust = { workspace = true }
up-transport-zenoh = { version = "0.1.1" }
up-transport-vsomeip = { git = "https://github.com/eclipse-uprotocol/up-transport-vsomeip-rust.git", tag = "v0.1.0", default-features = false }
zenoh = { version = "1.0.0-alpha.6", features = ["unstable"]}
up-transport-zenoh = { version = "0.3.0" }
up-transport-vsomeip = { git = "https://github.com/eclipse-uprotocol/up-transport-vsomeip-rust.git", tag = "v0.2.0", default-features = false }
zenoh = { version = "1.0.0", features = ["unstable", "internal"] }
5 changes: 2 additions & 3 deletions example-streamer-uses/src/bin/ue_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ async fn main() -> Result<(), UStatus> {
// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.set_endpoints(zenoh::config::ModeDependentValue::Unique(vec![
ipv4_endpoint,
]))
.endpoints
.set(vec![ipv4_endpoint])
.expect("Unable to set Zenoh Config");
}

Expand Down
5 changes: 2 additions & 3 deletions example-streamer-uses/src/bin/ue_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,8 @@ async fn main() -> Result<(), UStatus> {
// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.set_endpoints(zenoh::config::ModeDependentValue::Unique(vec![
ipv4_endpoint,
]))
.endpoints
.set(vec![ipv4_endpoint])
.expect("Unable to set Zenoh Config");
}

Expand Down
5 changes: 2 additions & 3 deletions example-streamer-uses/src/bin/ue_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,8 @@ async fn main() -> Result<(), UStatus> {
// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.set_endpoints(zenoh::config::ModeDependentValue::Unique(vec![
ipv4_endpoint,
]))
.endpoints
.set(vec![ipv4_endpoint])
.expect("Unable to set Zenoh Config");
}

Expand Down
5 changes: 2 additions & 3 deletions example-streamer-uses/src/bin/ue_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ async fn main() -> Result<(), UStatus> {
// Add the IPv4 endpoint to the Zenoh configuration
zenoh_config
.listen
.set_endpoints(zenoh::config::ModeDependentValue::Unique(vec![
ipv4_endpoint,
]))
.endpoints
.set(vec![ipv4_endpoint])
.expect("Unable to set Zenoh Config");
}

Expand Down
16 changes: 8 additions & 8 deletions up-linux-streamer-plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ serde = { version = "1.0.154" }
serde_json = { version = "1.0.94" }
tokio = { version = "1.35.1", default-features = false }
up-rust = { workspace = true }
up-transport-zenoh = { version = "0.1.1" }
up-transport-vsomeip = { git = "https://github.com/eclipse-uprotocol/up-transport-vsomeip-rust.git", tag = "v0.1.0", default-features = false }
up-transport-zenoh = { version = "0.3.0" }
up-transport-vsomeip = { git = "https://github.com/eclipse-uprotocol/up-transport-vsomeip-rust.git", tag = "v0.2.0", default-features = false }
up-streamer = { path = "../up-streamer" }
usubscription-static-file = {path = "../utils/usubscription-static-file"}
zenoh = { version = "1.0.0-alpha.6", features = ["unstable", "internal", "plugins"]}
zenoh-core = { version = "1.0.0-alpha.6" }
zenoh-plugin-trait = { version = "1.0.0-alpha.6" }
zenoh-result = { version = "1.0.0-alpha.6" }
zenoh-util = { version = "1.0.0-alpha.6" }
zenoh_backend_traits = { version = "1.0.0-alpha.6" }
zenoh = { version = "1.0.0", features = ["unstable", "internal", "plugins"] }
zenoh-core = { version = "1.0.0" }
zenoh-plugin-trait = { version = "1.0.0" }
zenoh-result = { version = "1.0.0" }
zenoh-util = { version = "1.0.0" }
zenoh_backend_traits = { version = "1.0.0" }
env_logger = "0.10.2"
7 changes: 4 additions & 3 deletions up-linux-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ serde = { workspace = true }
tokio = { workspace = true }
up-rust = { workspace = true }
up-streamer = { path = "../up-streamer" }
up-transport-zenoh = { version = "0.1.1" }
up-transport-vsomeip = { git = "https://github.com/eclipse-uprotocol/up-transport-vsomeip-rust.git", tag = "v0.1.0", default-features = false }
zenoh = { version = "1.0.0-alpha.6", features = ["unstable"]}
up-transport-zenoh = { version = "0.3.0" }
# up-transport-vsomeip = { git = "https://github.com/PLeVasseur/up-client-vsomeip-rust", branch = "feature/debug-unable-to-send-back-result", default-features = false }
up-transport-vsomeip = { git = "https://github.com/eclipse-uprotocol/up-transport-vsomeip-rust.git", tag = "v0.2.0", default-features = false }
zenoh = { version = "1.0.0", features = ["unstable", "internal"] }
usubscription-static-file = {path = "../utils/usubscription-static-file"}

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion up-linux-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct StreamerArgs {
config: String,
}

#[tokio::main]
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), UStatus> {
env_logger::init();

Expand Down
7 changes: 7 additions & 0 deletions up-linux-streamer/vsomeip-configs/point_to_point.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,12 @@
"name" : "foo",
"id" : "0x1236"
}
],
"services" :
[
{
"service" : "0x1236",
"instance" : "0x0001"
}
]
}
3 changes: 2 additions & 1 deletion up-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { workspace = true, features = ["unstable"] }
tokio = { workspace = true }
async-trait = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
uuid = { workspace = true }
serde_json = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion up-streamer/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use async_std::sync::Arc;
use std::sync::Arc;
use log::*;
use up_rust::UTransport;

Expand Down
85 changes: 53 additions & 32 deletions up-streamer/src/ustreamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
********************************************************************************/

use crate::endpoint::Endpoint;
use async_std::channel::{Receiver, Sender};
use async_std::sync::{Arc, Mutex};
use async_std::{channel, task};
use async_trait::async_trait;
use lazy_static::lazy_static;
use log::*;
use std::collections::{HashMap, HashSet};
use std::error::Error;
Expand All @@ -24,8 +22,14 @@ use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::str;
use std::sync::Arc;
use std::thread;
use subscription_cache::SubscriptionCache;
use tokio::runtime::Builder;
use tokio::runtime::Runtime;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task;
use up_rust::core::usubscription::{FetchSubscriptionsRequest, SubscriberInfo, USubscription};
use up_rust::{UCode, UListener, UMessage, UPayloadFormat, UStatus, UTransport, UUri, UUID};

Expand All @@ -34,20 +38,21 @@ const USTREAMER_FN_NEW_TAG: &str = "new():";
const USTREAMER_FN_ADD_FORWARDING_RULE_TAG: &str = "add_forwarding_rule():";
const USTREAMER_FN_DELETE_FORWARDING_RULE_TAG: &str = "delete_forwarding_rule():";

fn uauthority_to_uuri(authority_name: &str) -> UUri {
UUri {
authority_name: authority_name.to_string(),
ue_id: 0x0000_FFFF, // any instance, any service
ue_version_major: 0xFF, // any
resource_id: 0xFFFF, // any
..Default::default()
}
const THREAD_NUM: usize = 10;

// Create a separate tokio Runtime for running the callback
lazy_static::lazy_static! {
static ref CB_RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(THREAD_NUM)
.enable_all()
.build()
.expect("Unable to create callback runtime");
}

fn any_uuri() -> UUri {
fn uauthority_to_uuri(authority_name: &str) -> UUri {
UUri {
authority_name: "*".to_string(),
ue_id: 0x0000_FFFF, // any instance, any service
authority_name: authority_name.to_string(),
ue_id: 0xFFFF_FFFF, // any instance, any service
ue_version_major: 0xFF, // any
resource_id: 0xFFFF, // any
..Default::default()
Expand Down Expand Up @@ -128,7 +133,7 @@ impl TransportForwarders {
debug!(
"{TRANSPORT_FORWARDERS_TAG}:{TRANSPORT_FORWARDERS_FN_INSERT_TAG} Inserting..."
);
let (tx, rx) = channel::bounded(self.message_queue_size);
let (tx, rx) = tokio::sync::broadcast::channel(self.message_queue_size);
(0, Arc::new(TransportForwarder::new(out_transport, rx)), tx)
});
*active += 1;
Expand Down Expand Up @@ -213,10 +218,10 @@ impl ForwardingListeners {

// Perform async registration and fetching

uuris_to_backpedal.insert((any_uuri(), Some(uauthority_to_uuri(out_authority))));
uuris_to_backpedal.insert((UUri::any(), Some(uauthority_to_uuri(out_authority))));
if let Err(err) = in_transport
.register_listener(
&any_uuri(),
&UUri::any(),
Some(&uauthority_to_uuri(out_authority)),
forwarding_listener.clone(),
)
Expand Down Expand Up @@ -329,11 +334,13 @@ impl ForwardingListeners {
warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_REMOVE_TAG} removing ForwardingListener, out_authority: {out_authority:?}");
if let Some((_, forwarding_listener)) = removed {
warn!("ForwardingListeners::remove: ForwardingListener found we can remove, out_authority: {out_authority:?}");
let unreg_res = task::block_on(in_transport.unregister_listener(
&uauthority_to_uuri(out_authority),
Some(&any_uuri()),
forwarding_listener,
));
let unreg_res = in_transport
.unregister_listener(
&uauthority_to_uuri(out_authority),
Some(&UUri::any()),
forwarding_listener,
)
.await;

if let Err(err) = unreg_res {
warn!("{FORWARDING_LISTENERS_TAG}:{FORWARDING_LISTENERS_FN_REMOVE_TAG} unable to unregister listener, error: {err}");
Expand Down Expand Up @@ -600,8 +607,11 @@ impl UStreamer {
..Default::default()
};
fetch_request.set_subscriber(subscriber_info);
let subscriptions = task::block_on(usubscription.fetch_subscriptions(fetch_request))
.expect("Failed to fetch subscriptions");
let subscriptions = task::block_in_place(|| {
CB_RUNTIME
.block_on(usubscription.fetch_subscriptions(fetch_request))
.expect("Failed to fetch subscriptions")
});

let subscription_cache_result = SubscriptionCache::new(subscriptions);

Expand Down Expand Up @@ -842,14 +852,25 @@ pub(crate) struct TransportForwarder {}
impl TransportForwarder {
fn new(out_transport: Arc<dyn UTransport>, message_receiver: Receiver<Arc<UMessage>>) -> Self {
let out_transport_clone = out_transport.clone();
let message_receiver_clone = message_receiver.clone();
let message_receiver_clone = message_receiver.resubscribe();

thread::spawn(|| {
task::block_on(Self::message_forwarding_loop(
UUID::build().to_hyphenated_string(),
out_transport_clone,
message_receiver_clone,
))
// Create a new single-threaded runtime
let runtime = Builder::new_current_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");

runtime.block_on(async move {
trace!("Within blocked runtime");
Self::message_forwarding_loop(
UUID::build().to_hyphenated_string(),
out_transport_clone,
message_receiver_clone,
)
.await;
info!("Broke out of loop! You probably dropped the UPClientVsomeip");
});
});

Self {}
Expand All @@ -858,7 +879,7 @@ impl TransportForwarder {
async fn message_forwarding_loop(
id: String,
out_transport: Arc<dyn UTransport>,
message_receiver: Receiver<Arc<UMessage>>,
mut message_receiver: Receiver<Arc<UMessage>>,
) {
while let Ok(msg) = message_receiver.recv().await {
debug!(
Expand Down Expand Up @@ -930,7 +951,7 @@ impl UListener for ForwardingListener {
);
return;
}
if let Err(e) = self.sender.send(Arc::new(msg)).await {
if let Err(e) = self.sender.send(Arc::new(msg)) {
error!(
"{}:{}:{} Unable to send message to worker pool: {e:?}",
self.forwarding_id, FORWARDING_LISTENER_TAG, FORWARDING_LISTENER_FN_ON_RECEIVE_TAG,
Expand Down

0 comments on commit f7351ad

Please sign in to comment.