From dcb5b24c5dffb4d8a690d35814db2b9437b0dcf1 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Fri, 1 Mar 2024 11:51:24 +0800 Subject: [PATCH 1/5] Fix clippy issue. Signed-off-by: ChenYing Kuo --- examples/publisher.rs | 4 ++-- examples/rpc_client.rs | 4 ++-- examples/rpc_server.rs | 9 ++------- examples/subscriber.rs | 4 ++-- src/lib.rs | 2 +- tests/integration_test.rs | 23 ++++++++++------------- 6 files changed, 19 insertions(+), 27 deletions(-) diff --git a/examples/publisher.rs b/examples/publisher.rs index 6895503..50d17e8 100644 --- a/examples/publisher.rs +++ b/examples/publisher.rs @@ -58,14 +58,14 @@ async fn main() { let mut cnt: u64 = 0; loop { - let data = format!("{}", cnt); + let data = format!("{cnt}"); let payload = UPayload { length: Some(0), format: UPayloadFormat::UPAYLOAD_FORMAT_TEXT.into(), data: Some(Data::Value(data.as_bytes().to_vec())), ..Default::default() }; - println!("Sending {} to {}...", data, uuri.to_string()); + println!("Sending {data} to {uuri}..."); publisher .send(UMessage { attributes: Some(attributes.clone()).into(), diff --git a/examples/rpc_client.rs b/examples/rpc_client.rs index 10879c0..5bc96f6 100644 --- a/examples/rpc_client.rs +++ b/examples/rpc_client.rs @@ -62,7 +62,7 @@ async fn main() { }; // invoke RPC method - println!("Send request to {}", uuri.to_string()); + println!("Send request to {uuri}"); let result = rpc_client .invoke_method(uuri, payload, CallOptionsBuilder::default().build()) .await; @@ -70,7 +70,7 @@ async fn main() { // process the result if let Data::Value(v) = result.unwrap().payload.unwrap().data.unwrap() { let value = v.into_iter().map(|c| c as char).collect::(); - println!("Receive {}", value); + println!("Receive {value}"); } else { println!("Failed to get result from invoke_method."); } diff --git a/examples/rpc_server.rs b/examples/rpc_server.rs index 1ee0c58..1139d4e 100644 --- a/examples/rpc_server.rs +++ b/examples/rpc_server.rs @@ -64,12 +64,7 @@ async fn main() { // Build the payload to send back if let Data::Value(v) = payload.unwrap().data.unwrap() { let value = v.into_iter().map(|c| c as char).collect::(); - println!( - "Receive {} from {} to {}", - value, - source.to_string(), - sink.to_string() - ); + println!("Receive {value} from {source} to {sink}"); } // Get current time let upayload = UPayload { @@ -92,7 +87,7 @@ async fn main() { .unwrap(); } Err(ustatus) => { - println!("Internal Error: {:?}", ustatus); + println!("Internal Error: {ustatus:?}"); } } }; diff --git a/examples/subscriber.rs b/examples/subscriber.rs index 10359e1..a90d888 100644 --- a/examples/subscriber.rs +++ b/examples/subscriber.rs @@ -26,10 +26,10 @@ fn callback(result: Result) { let uri = msg.attributes.unwrap().source.unwrap().to_string(); if let Data::Value(v) = msg.payload.unwrap().data.unwrap() { let value = v.into_iter().map(|c| c as char).collect::(); - println!("Receiving {} from {}", value, uri); + println!("Receiving {value} from {uri}"); } } - Err(ustatus) => println!("Internal Error: {:?}", ustatus), + Err(ustatus) => println!("Internal Error: {ustatus:?}"), } } diff --git a/src/lib.rs b/src/lib.rs index 17d6384..0e275fa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -223,7 +223,7 @@ mod tests { let uuri = UUri { authority: Some(UAuthority { name: Some("UAuthName".to_string()), - number: Some(Number::Id(vec![01, 02, 03, 10, 11, 12])), + number: Some(Number::Id(vec![1, 2, 3, 10, 11, 12])), ..Default::default() }) .into(), diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 490ca5f..b9f4dfb 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -67,7 +67,7 @@ fn create_rpcserver_uuri() -> UUri { fn create_authority() -> UAuthority { UAuthority { name: Some("UAuthName".to_string()), - number: Some(Number::Id(vec![01, 02, 03, 10, 11, 12])), + number: Some(Number::Id(vec![1, 2, 3, 10, 11, 12])), ..Default::default() } } @@ -92,11 +92,10 @@ async fn test_utransport_register_and_unregister() { assert_eq!(listener_string, "upl/0100162e04d20100_0"); // Able to ungister - let result = upclient + upclient .unregister_listener(uuri.clone(), &listener_string) .await .unwrap(); - assert_eq!(result, ()); // Unable to ungister let result = upclient @@ -108,7 +107,7 @@ async fn test_utransport_register_and_unregister() { UCode::INVALID_ARGUMENT, "Publish listener doesn't exist" )) - ) + ); } #[async_std::test] @@ -124,11 +123,10 @@ async fn test_rpcserver_register_and_unregister() { assert_eq!(listener_string, "upl/0100162e04d20100_0"); // Able to ungister - let result = upclient + upclient .unregister_rpc_listener(uuri.clone(), &listener_string) .await .unwrap(); - assert_eq!(result, ()); // Unable to ungister let result = upclient @@ -140,7 +138,7 @@ async fn test_rpcserver_register_and_unregister() { UCode::INVALID_ARGUMENT, "RPC request listener doesn't exist" )) - ) + ); } #[async_std::test] @@ -159,11 +157,10 @@ async fn test_utransport_special_uuri_register_and_unregister() { ); // Able to ungister - let result = upclient + upclient .unregister_listener(uuri.clone(), &listener_string) .await .unwrap(); - assert_eq!(result, ()); // Unable to ungister let result = upclient @@ -175,7 +172,7 @@ async fn test_utransport_special_uuri_register_and_unregister() { UCode::INVALID_ARGUMENT, "RPC response callback doesn't exist" )) - ) + ); } #[async_std::test] @@ -197,7 +194,7 @@ async fn test_publish_and_subscribe() { panic!("The message should be Data::Value type."); } } - Err(ustatus) => panic!("Internal Error: {:?}", ustatus), + Err(ustatus) => panic!("Internal Error: {ustatus:?}"), }; let listener_string = upclient .register_listener(uuri.clone(), Box::new(listener)) @@ -286,7 +283,7 @@ async fn test_rpc_server_client() { .unwrap(); } Err(ustatus) => { - panic!("Internal Error: {:?}", ustatus); + panic!("Internal Error: {ustatus:?}"); } } }; @@ -370,7 +367,7 @@ async fn test_register_listener_with_special_uuri() { } } } - Err(ustatus) => panic!("Internal Error: {:?}", ustatus), + Err(ustatus) => panic!("Internal Error: {ustatus:?}"), }; let listener_string = upclient1 .register_listener(listener_uuri.clone(), Box::new(listener)) From a965e7ac6a956ef87a9bbd73a599e977195e9b22 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Fri, 1 Mar 2024 11:54:51 +0800 Subject: [PATCH 2/5] Update README Signed-off-by: ChenYing Kuo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index cf80df9..900988b 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ Rust UPClient implementation for the Zenoh transport ```shell # Check clippy -cargo clippy +cargo clippy --all-targets # Build cargo build # Run test From 9652ba34b8923db69097c4d30c49cd091b3ce362 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Fri, 1 Mar 2024 16:13:19 +0800 Subject: [PATCH 3/5] Keep update with the latest up-rust. Signed-off-by: ChenYing Kuo --- examples/publisher.rs | 34 +++++++--------------- src/rpc.rs | 36 ++++++++++++++++------- src/utransport.rs | 2 +- tests/integration_test.rs | 61 ++++++++++++--------------------------- 4 files changed, 55 insertions(+), 78 deletions(-) diff --git a/examples/publisher.rs b/examples/publisher.rs index 50d17e8..9be5642 100644 --- a/examples/publisher.rs +++ b/examples/publisher.rs @@ -14,9 +14,9 @@ use async_std::task; use std::time; use up_rust::{ - transport::builder::UAttributesBuilder, - transport::datamodel::UTransport, - uprotocol::{Data, UEntity, UMessage, UPayload, UPayloadFormat, UPriority, UResource, UUri}, + transport::{builder::UMessageBuilder, datamodel::UTransport}, + uprotocol::{UEntity, UPayloadFormat, UResource, UUri}, + uuid::builder::UUIDBuilder, }; use uprotocol_zenoh::UPClientZenoh; use zenoh::config::Config; @@ -50,30 +50,18 @@ async fn main() { ..Default::default() }; - // create uattributes - let mut attributes = UAttributesBuilder::publish(UPriority::UPRIORITY_CS4).build(); - attributes.sink = Some(uuri.clone()).into(); - // TODO: We need to check how to set the source - attributes.source = Some(uuri.clone()).into(); - let mut cnt: u64 = 0; loop { let data = format!("{cnt}"); - let payload = UPayload { - length: Some(0), - format: UPayloadFormat::UPAYLOAD_FORMAT_TEXT.into(), - data: Some(Data::Value(data.as_bytes().to_vec())), - ..Default::default() - }; - println!("Sending {data} to {uuri}..."); - publisher - .send(UMessage { - attributes: Some(attributes.clone()).into(), - payload: Some(payload).into(), - ..Default::default() - }) - .await + let umessage = UMessageBuilder::publish(&uuri) + .build_with_payload( + &UUIDBuilder::new(), + data.as_bytes().to_vec().into(), + UPayloadFormat::UPAYLOAD_FORMAT_TEXT, + ) .unwrap(); + println!("Sending {data} to {uuri}..."); + publisher.send(umessage).await.unwrap(); task::sleep(time::Duration::from_millis(1000)).await; cnt += 1; } diff --git a/src/rpc.rs b/src/rpc.rs index f934939..6ea52d4 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -16,8 +16,8 @@ use async_trait::async_trait; use std::{string::ToString, time::Duration}; use up_rust::{ rpc::{CallOptions, RpcClient, RpcClientResult, RpcMapperError, RpcServer}, - transport::{builder::UAttributesBuilder, datamodel::UTransport}, - uprotocol::{Data, UMessage, UPayload, UPriority, UStatus, UUri}, + transport::{builder::UMessageBuilder, datamodel::UTransport}, + uprotocol::{Data, UMessage, UPayload, UStatus, UUri}, uri::{builder::resourcebuilder::UResourceBuilder, validator::UriValidator}, uuid::builder::UUIDBuilder, }; @@ -52,16 +52,30 @@ impl RpcClient for UPClientZenoh { }; // Generate UAttributes - // TODO: Check the ttl - let mut uattributes = - UAttributesBuilder::request(UPriority::UPRIORITY_CS4, topic.clone(), 255).build(); - // TODO: How to create the source address for Response + let uuid_builder = UUIDBuilder::new(); + let reqid = UUIDBuilder::new().build(); + // Create response address let mut source = topic.clone(); source.resource = Some(UResourceBuilder::for_rpc_response()).into(); - uattributes.source = Some(source).into(); - uattributes.reqid = Some(UUIDBuilder::new().build()).into(); - // TODO: how to map CallOptions timeout into uAttributes - uattributes.token = options.token().map(ToString::to_string); + // TODO: Check the ttl + let umessage = if let Some(token) = options.token() { + UMessageBuilder::request(&topic, &source, &reqid, 255) + .with_token(&token.to_string()) + .build(&uuid_builder) + } else { + UMessageBuilder::request(&topic, &source, &reqid, 255).build(&uuid_builder) + }; + // Extract uAttributes + let Ok(UMessage { + attributes: uattributes, + .. + }) = umessage + else { + return Err(RpcMapperError::UnexpectedError(String::from( + "Unable to create uAttributes", + ))); + }; + // Put into attachment let Ok(attachment) = UPClientZenoh::uattributes_to_attachment(&uattributes) else { return Err(RpcMapperError::UnexpectedError(String::from( "Invalid uAttributes", @@ -103,7 +117,7 @@ impl RpcClient for UPClientZenoh { }; // TODO: Need to check attributes is correct or not Ok(UMessage { - attributes: Some(uattributes).into(), + attributes: uattributes, payload: Some(UPayload { length: Some(0), format: encoding.into(), diff --git a/src/utransport.rs b/src/utransport.rs index 910a08c..560ab82 100644 --- a/src/utransport.rs +++ b/src/utransport.rs @@ -482,7 +482,7 @@ impl UTransport for UPClientZenoh { ) })?; // Get Zenoh key - let topic = attributes.clone().sink; + let topic = attributes.clone().source; let zenoh_key = UPClientZenoh::to_zenoh_key_string(&topic)?; // Send Publish self.send_publish(&zenoh_key, payload, attributes).await diff --git a/tests/integration_test.rs b/tests/integration_test.rs index b9f4dfb..4ec6e49 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -15,12 +15,13 @@ use async_std::task::{self, block_on}; use std::{sync::Arc, time}; use up_rust::{ rpc::{CallOptionsBuilder, RpcClient, RpcServer}, - transport::{builder::UAttributesBuilder, datamodel::UTransport}, + transport::{builder::UMessageBuilder, datamodel::UTransport}, uprotocol::{ uri::uauthority::Number, Data, UAuthority, UCode, UEntity, UMessage, UMessageType, - UPayload, UPayloadFormat, UPriority, UResource, UStatus, UUri, + UPayload, UPayloadFormat, UResource, UStatus, UUri, }, uri::builder::resourcebuilder::UResourceBuilder, + uuid::builder::UUIDBuilder, }; use uprotocol_zenoh::UPClientZenoh; use zenoh::config::Config; @@ -188,7 +189,7 @@ async fn test_publish_and_subscribe() { Ok(msg) => { if let Data::Value(v) = msg.payload.unwrap().data.unwrap() { let value = v.into_iter().map(|c| c as char).collect::(); - assert_eq!(msg.attributes.unwrap().sink.unwrap(), uuri_cloned); + assert_eq!(msg.attributes.unwrap().source.unwrap(), uuri_cloned); assert_eq!(value, data_cloned); } else { panic!("The message should be Data::Value type."); @@ -201,27 +202,14 @@ async fn test_publish_and_subscribe() { .await .unwrap(); - // Create uattributes - let mut attributes = UAttributesBuilder::publish(UPriority::UPRIORITY_CS4).build(); - attributes.sink = Some(uuri.clone()).into(); - // TODO: Check what source we should fill - attributes.source = Some(uuri.clone()).into(); - - // Publish the data - let payload = UPayload { - length: Some(0), - format: UPayloadFormat::UPAYLOAD_FORMAT_TEXT.into(), - data: Some(Data::Value(target_data.as_bytes().to_vec())), - ..Default::default() - }; - upclient - .send(UMessage { - attributes: Some(attributes.clone()).into(), - payload: Some(payload).into(), - ..Default::default() - }) - .await + let umessage = UMessageBuilder::publish(&uuri) + .build_with_payload( + &UUIDBuilder::new(), + target_data.as_bytes().to_vec().into(), + UPayloadFormat::UPAYLOAD_FORMAT_TEXT, + ) .unwrap(); + upclient.send(umessage).await.unwrap(); // Waiting for the subscriber to receive data task::sleep(time::Duration::from_millis(1000)).await; @@ -379,27 +367,14 @@ async fn test_register_listener_with_special_uuri() { let mut publish_uuri = create_utransport_uuri(); publish_uuri.authority = Some(create_authority()).into(); - // Create uattributes - let mut attributes = UAttributesBuilder::publish(UPriority::UPRIORITY_CS4).build(); - attributes.sink = Some(publish_uuri.clone()).into(); - // TODO: Check what source we should fill - attributes.source = Some(publish_uuri.clone()).into(); - - // Publish the data - let payload = UPayload { - length: Some(0), - format: UPayloadFormat::UPAYLOAD_FORMAT_TEXT.into(), - data: Some(Data::Value(publish_data.as_bytes().to_vec())), - ..Default::default() - }; - upclient2 - .send(UMessage { - attributes: Some(attributes.clone()).into(), - payload: Some(payload).into(), - ..Default::default() - }) - .await + let umessage = UMessageBuilder::publish(&publish_uuri) + .build_with_payload( + &UUIDBuilder::new(), + publish_data.as_bytes().to_vec().into(), + UPayloadFormat::UPAYLOAD_FORMAT_TEXT, + ) .unwrap(); + upclient2.send(umessage).await.unwrap(); // Waiting for the subscriber to receive data task::sleep(time::Duration::from_millis(1000)).await; From fe3a101ec7c082cbdafe30ef7669e88efe09b581 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Fri, 1 Mar 2024 16:39:25 +0800 Subject: [PATCH 4/5] Update the way to serialize micro-form UUri. Signed-off-by: ChenYing Kuo --- src/lib.rs | 44 ++++++++++++++++---------------------------- 1 file changed, 16 insertions(+), 28 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0e275fa..55f5fc4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,15 +15,11 @@ pub mod rpc; pub mod utransport; use protobuf::{Enum, Message}; -use std::io::Write; use std::{ collections::HashMap, sync::{atomic::AtomicU64, Arc, Mutex}, }; -use up_rust::{ - uprotocol::{UAttributes, UCode, UMessage, UPayloadFormat, UPriority, UStatus, UUri}, - uri::serializer::{MicroUriSerializer, UriSerializer}, -}; +use up_rust::uprotocol::{UAttributes, UCode, UMessage, UPayloadFormat, UPriority, UStatus, UUri}; use zenoh::{ config::Config, prelude::r#async::*, @@ -70,31 +66,23 @@ impl UPClientZenoh { }) } - // TODO: Workaround function. Should be added in up-rust fn get_uauth_from_uuri(uri: &UUri) -> Result { - let mut buf = vec![]; if let Some(authority) = uri.authority.as_ref() { - if authority.has_id() { - let id = authority.id().to_vec(); - let len = u8::try_from(id.len()).map_err(|_| { - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Wrong authority") - })?; - buf.write(&[len]).map_err(|_| { - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Wrong authority") - })?; - buf.write_all(&id).map_err(|_| { - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Wrong authority") - })?; - } else if authority.has_ip() { - let ip = authority.ip().to_vec(); - buf.write_all(&ip).map_err(|_| { - UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Wrong authority") - })?; - } + let buf: Vec = authority.try_into().map_err(|_| { + UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Unable to transform UAuthority into micro form", + ) + })?; + Ok(buf + .iter() + .fold(String::new(), |s, c| s + &format!("{c:02x}"))) + } else { + Err(UStatus::fail_with_code( + UCode::INVALID_ARGUMENT, + "Empty UAuthority", + )) } - Ok(buf - .iter() - .fold(String::new(), |s, c| s + &format!("{c:02x}"))) } // The UURI format should be "upr//" or "upl/" @@ -102,7 +90,7 @@ impl UPClientZenoh { if uri.authority.is_some() && uri.entity.is_none() && uri.resource.is_none() { Ok(String::from("upr/") + &UPClientZenoh::get_uauth_from_uuri(uri)? + "/**") } else { - let micro_uuri = MicroUriSerializer::serialize(uri).map_err(|_| { + let micro_uuri: Vec = uri.try_into().map_err(|_| { UStatus::fail_with_code( UCode::INVALID_ARGUMENT, "Unable to serialize into micro format", From 9df460d9473c315c8ef3440b4204a6ddb6a99519 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Fri, 1 Mar 2024 16:43:17 +0800 Subject: [PATCH 5/5] Stick to the specific commit of up-rust. Signed-off-by: ChenYing Kuo --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index e352a43..dca1cda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,5 +20,5 @@ log = "0.4.17" prost = "0.12" prost-types = "0.12" protobuf = { version = "3.3" } -up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", branch = "main" } +up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "68c8a1d94f0006daf4ba135c9cbbfddcd793108d" } zenoh = { version = "0.10.1-rc", features = ["unstable"]}