Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CI issues #5

Merged
merged 5 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ log = "0.4.17"
prost = "0.12"
prost-types = "0.12"
protobuf = { version = "3.3" }
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", branch = "main" }
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "68c8a1d94f0006daf4ba135c9cbbfddcd793108d" }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay, and then if there's a tag, we can use that instead. I'm always super impressed by Rust tooling. Using a git tag has been supported since 2014 🙂

zenoh = { version = "0.10.1-rc", features = ["unstable"]}
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Rust UPClient implementation for the Zenoh transport

```shell
# Check clippy
cargo clippy
cargo clippy --all-targets
# Build
cargo build
# Run test
Expand Down
36 changes: 12 additions & 24 deletions examples/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
use async_std::task;
use std::time;
use up_rust::{
transport::builder::UAttributesBuilder,
transport::datamodel::UTransport,
uprotocol::{Data, UEntity, UMessage, UPayload, UPayloadFormat, UPriority, UResource, UUri},
transport::{builder::UMessageBuilder, datamodel::UTransport},
uprotocol::{UEntity, UPayloadFormat, UResource, UUri},
uuid::builder::UUIDBuilder,
};
use uprotocol_zenoh::UPClientZenoh;
use zenoh::config::Config;
Expand Down Expand Up @@ -50,30 +50,18 @@ async fn main() {
..Default::default()
};

// create uattributes
let mut attributes = UAttributesBuilder::publish(UPriority::UPRIORITY_CS4).build();
attributes.sink = Some(uuri.clone()).into();
// TODO: We need to check how to set the source
attributes.source = Some(uuri.clone()).into();

let mut cnt: u64 = 0;
loop {
let data = format!("{}", cnt);
let payload = UPayload {
length: Some(0),
format: UPayloadFormat::UPAYLOAD_FORMAT_TEXT.into(),
data: Some(Data::Value(data.as_bytes().to_vec())),
..Default::default()
};
println!("Sending {} to {}...", data, uuri.to_string());
publisher
.send(UMessage {
attributes: Some(attributes.clone()).into(),
payload: Some(payload).into(),
..Default::default()
})
.await
let data = format!("{cnt}");
let umessage = UMessageBuilder::publish(&uuri)
.build_with_payload(
&UUIDBuilder::new(),
data.as_bytes().to_vec().into(),
UPayloadFormat::UPAYLOAD_FORMAT_TEXT,
)
.unwrap();
println!("Sending {data} to {uuri}...");
publisher.send(umessage).await.unwrap();
task::sleep(time::Duration::from_millis(1000)).await;
cnt += 1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ async fn main() {
};

// invoke RPC method
println!("Send request to {}", uuri.to_string());
println!("Send request to {uuri}");
let result = rpc_client
.invoke_method(uuri, payload, CallOptionsBuilder::default().build())
.await;

// process the result
if let Data::Value(v) = result.unwrap().payload.unwrap().data.unwrap() {
let value = v.into_iter().map(|c| c as char).collect::<String>();
println!("Receive {}", value);
println!("Receive {value}");
} else {
println!("Failed to get result from invoke_method.");
}
Expand Down
9 changes: 2 additions & 7 deletions examples/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,7 @@ async fn main() {
// Build the payload to send back
if let Data::Value(v) = payload.unwrap().data.unwrap() {
let value = v.into_iter().map(|c| c as char).collect::<String>();
println!(
"Receive {} from {} to {}",
value,
source.to_string(),
sink.to_string()
);
println!("Receive {value} from {source} to {sink}");
}
// Get current time
let upayload = UPayload {
Expand All @@ -92,7 +87,7 @@ async fn main() {
.unwrap();
}
Err(ustatus) => {
println!("Internal Error: {:?}", ustatus);
println!("Internal Error: {ustatus:?}");
}
}
};
Expand Down
4 changes: 2 additions & 2 deletions examples/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ fn callback(result: Result<UMessage, UStatus>) {
let uri = msg.attributes.unwrap().source.unwrap().to_string();
if let Data::Value(v) = msg.payload.unwrap().data.unwrap() {
let value = v.into_iter().map(|c| c as char).collect::<String>();
println!("Receiving {} from {}", value, uri);
println!("Receiving {value} from {uri}");
}
}
Err(ustatus) => println!("Internal Error: {:?}", ustatus),
Err(ustatus) => println!("Internal Error: {ustatus:?}"),
}
}

Expand Down
46 changes: 17 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@ pub mod rpc;
pub mod utransport;

use protobuf::{Enum, Message};
use std::io::Write;
use std::{
collections::HashMap,
sync::{atomic::AtomicU64, Arc, Mutex},
};
use up_rust::{
uprotocol::{UAttributes, UCode, UMessage, UPayloadFormat, UPriority, UStatus, UUri},
uri::serializer::{MicroUriSerializer, UriSerializer},
};
use up_rust::uprotocol::{UAttributes, UCode, UMessage, UPayloadFormat, UPriority, UStatus, UUri};
use zenoh::{
config::Config,
prelude::r#async::*,
Expand Down Expand Up @@ -70,39 +66,31 @@ impl UPClientZenoh {
})
}

// TODO: Workaround function. Should be added in up-rust
fn get_uauth_from_uuri(uri: &UUri) -> Result<String, UStatus> {
let mut buf = vec![];
if let Some(authority) = uri.authority.as_ref() {
if authority.has_id() {
let id = authority.id().to_vec();
let len = u8::try_from(id.len()).map_err(|_| {
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Wrong authority")
})?;
buf.write(&[len]).map_err(|_| {
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Wrong authority")
})?;
buf.write_all(&id).map_err(|_| {
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Wrong authority")
})?;
} else if authority.has_ip() {
let ip = authority.ip().to_vec();
buf.write_all(&ip).map_err(|_| {
UStatus::fail_with_code(UCode::INVALID_ARGUMENT, "Wrong authority")
})?;
}
let buf: Vec<u8> = authority.try_into().map_err(|_| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to transform UAuthority into micro form",
)
})?;
Ok(buf
.iter()
.fold(String::new(), |s, c| s + &format!("{c:02x}")))
} else {
Err(UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Empty UAuthority",
))
}
Ok(buf
.iter()
.fold(String::new(), |s, c| s + &format!("{c:02x}")))
}

// The UURI format should be "upr/<UAuthority id or ip>/<the rest of remote UUri>" or "upl/<local UUri>"
fn to_zenoh_key_string(uri: &UUri) -> Result<String, UStatus> {
if uri.authority.is_some() && uri.entity.is_none() && uri.resource.is_none() {
Ok(String::from("upr/") + &UPClientZenoh::get_uauth_from_uuri(uri)? + "/**")
} else {
let micro_uuri = MicroUriSerializer::serialize(uri).map_err(|_| {
let micro_uuri: Vec<u8> = uri.try_into().map_err(|_| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

UStatus::fail_with_code(
UCode::INVALID_ARGUMENT,
"Unable to serialize into micro format",
Expand Down Expand Up @@ -223,7 +211,7 @@ mod tests {
let uuri = UUri {
authority: Some(UAuthority {
name: Some("UAuthName".to_string()),
number: Some(Number::Id(vec![01, 02, 03, 10, 11, 12])),
number: Some(Number::Id(vec![1, 2, 3, 10, 11, 12])),
..Default::default()
})
.into(),
Expand Down
36 changes: 25 additions & 11 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use async_trait::async_trait;
use std::{string::ToString, time::Duration};
use up_rust::{
rpc::{CallOptions, RpcClient, RpcClientResult, RpcMapperError, RpcServer},
transport::{builder::UAttributesBuilder, datamodel::UTransport},
uprotocol::{Data, UMessage, UPayload, UPriority, UStatus, UUri},
transport::{builder::UMessageBuilder, datamodel::UTransport},
uprotocol::{Data, UMessage, UPayload, UStatus, UUri},
uri::{builder::resourcebuilder::UResourceBuilder, validator::UriValidator},
uuid::builder::UUIDBuilder,
};
Expand Down Expand Up @@ -52,16 +52,30 @@ impl RpcClient for UPClientZenoh {
};

// Generate UAttributes
// TODO: Check the ttl
let mut uattributes =
UAttributesBuilder::request(UPriority::UPRIORITY_CS4, topic.clone(), 255).build();
// TODO: How to create the source address for Response
let uuid_builder = UUIDBuilder::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this RPC flow still needs some thought though.

We should not be creating a new UUIDBuilder here and getting the reqid from it. I believe this is where the UUIDBuilder that's been constructed already for the uE sending the request should be used.

As is stated over here in the UUID spec, the rand_b portion of the UUID is used to uniquely identify the uE, thus it should be "stable" and thus we should use the UUIDBuilder already associated with this uE.

I suppose that may mean that on construction, you'd need to construct a UUIDBuilder and then use it within RpcClient::invoke_method().

WDYT?

let reqid = UUIDBuilder::new().build();
// Create response address
let mut source = topic.clone();
source.resource = Some(UResourceBuilder::for_rpc_response()).into();
uattributes.source = Some(source).into();
uattributes.reqid = Some(UUIDBuilder::new().build()).into();
// TODO: how to map CallOptions timeout into uAttributes
uattributes.token = options.token().map(ToString::to_string);
// TODO: Check the ttl
let umessage = if let Some(token) = options.token() {
UMessageBuilder::request(&topic, &source, &reqid, 255)
.with_token(&token.to_string())
.build(&uuid_builder)
} else {
UMessageBuilder::request(&topic, &source, &reqid, 255).build(&uuid_builder)
};
// Extract uAttributes
let Ok(UMessage {
attributes: uattributes,
..
}) = umessage
else {
return Err(RpcMapperError::UnexpectedError(String::from(
"Unable to create uAttributes",
)));
};
// Put into attachment
let Ok(attachment) = UPClientZenoh::uattributes_to_attachment(&uattributes) else {
return Err(RpcMapperError::UnexpectedError(String::from(
"Invalid uAttributes",
Expand Down Expand Up @@ -103,7 +117,7 @@ impl RpcClient for UPClientZenoh {
};
// TODO: Need to check attributes is correct or not
Ok(UMessage {
attributes: Some(uattributes).into(),
attributes: uattributes,
payload: Some(UPayload {
length: Some(0),
format: encoding.into(),
Expand Down
2 changes: 1 addition & 1 deletion src/utransport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ impl UTransport for UPClientZenoh {
)
})?;
// Get Zenoh key
let topic = attributes.clone().sink;
let topic = attributes.clone().source;
let zenoh_key = UPClientZenoh::to_zenoh_key_string(&topic)?;
// Send Publish
self.send_publish(&zenoh_key, payload, attributes).await
Expand Down
Loading