Skip to content

Commit

Permalink
[#13] Use any up-rust upstreamed code
Browse files Browse the repository at this point in the history
* Remove transport handle, collapse down to lib and transport
* Rearrange transport engine
* Implement LocalUriProvider
* Make use of new up_rust::UUri::try_from_parts(), up_rust::UUri::any()
* Show usage of uP-L2 APIs in hello_client and hello_service examples
  * Fix logic of Response handling to mean no commstatus is OK

Implements [#6], [#8]
  • Loading branch information
PLeVasseur authored Aug 5, 2024
1 parent acbb0d0 commit c9d53f3
Show file tree
Hide file tree
Showing 18 changed files with 1,082 additions and 1,276 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ regex = { version = "1.10" }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
tokio = { version = "1.35.1", features = ["rt", "rt-multi-thread", "macros", "sync", "time", "tracing"] }
up-rust = { git = "https://github.com/eclipse-uprotocol/up-rust", rev = "3a50104421a801d52e1d9c68979db54c013ce43d" }
up-rust = { version = "0.1.5" }
vsomeip-proc-macro = { path = "vsomeip-proc-macro" }
vsomeip-sys = { path = "vsomeip-sys", default-features = false }

Expand Down
121 changes: 65 additions & 56 deletions up-transport-vsomeip/examples/hello_client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
use async_trait::async_trait;
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse};
use log::trace;
use std::fs::canonicalize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use up_rust::communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload};
use up_rust::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY;
use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri};
use up_rust::{UStatus, UUri};
use up_transport_vsomeip::UPTransportVsomeip;

const HELLO_SERVICE_ID: u16 = 0x6000;
Expand All @@ -24,38 +37,12 @@ const HELLO_SERVICE_UE_VERSION_MAJOR: u8 = HELLO_SERVICE_MAJOR;
const HELLO_SERVICE_RESOURCE_ID: u16 = HELLO_METHOD_ID;

const CLIENT_AUTHORITY: &str = "me_authority";
const CLIENT_UE_ID: u16 = 0x5678;
const CLIENT_UE_ID: u32 = 0x5678;
const CLIENT_UE_VERSION_MAJOR: u8 = 1;
const CLIENT_RESOURCE_ID: u16 = 0;

const REQUEST_TTL: u32 = 1000;

struct ServiceResponseListener;

#[async_trait]
impl UListener for ServiceResponseListener {
async fn on_receive(&self, msg: UMessage) {
println!("ServiceResponseListener: Received a message: {msg:?}");

let mut msg = msg.clone();

if let Some(ref mut attributes) = msg.attributes.as_mut() {
attributes.payload_format =
::protobuf::EnumOrUnknown::new(UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
}

let Ok(hello_response) = msg.extract_protobuf_payload::<HelloResponse>() else {
panic!("Unable to parse into HelloResponse");
};

println!("Here we received response: {hello_response:?}");
}

async fn on_error(&self, err: UStatus) {
println!("ServiceResponseListener: Encountered an error: {err:?}");
}
}

#[tokio::main]
async fn main() -> Result<(), UStatus> {
env_logger::init();
Expand All @@ -70,36 +57,34 @@ async fn main() -> Result<(), UStatus> {

// There will be a single vsomeip_transport, as there is a connection into device and a streamer
// TODO: Add error handling if we fail to create a UPTransportVsomeip
let client: Arc<dyn UTransport> = Arc::new(
let client_uuri = UUri::try_from_parts(
CLIENT_AUTHORITY,
CLIENT_UE_ID,
CLIENT_UE_VERSION_MAJOR,
CLIENT_RESOURCE_ID,
)
.unwrap();
let client = Arc::new(
UPTransportVsomeip::new_with_config(
&CLIENT_AUTHORITY.to_string(),
client_uuri,
&HELLO_SERVICE_AUTHORITY.to_string(),
CLIENT_UE_ID,
&vsomeip_config.unwrap(),
None,
)
.unwrap(),
);

let source = UUri {
authority_name: CLIENT_AUTHORITY.to_string(),
ue_id: CLIENT_UE_ID as u32,
ue_version_major: CLIENT_UE_VERSION_MAJOR as u32,
resource_id: CLIENT_RESOURCE_ID as u32,
..Default::default()
};
let sink = UUri {
authority_name: HELLO_SERVICE_AUTHORITY.to_string(),
ue_id: HELLO_SERVICE_UE_ID,
ue_version_major: HELLO_SERVICE_UE_VERSION_MAJOR as u32,
resource_id: HELLO_SERVICE_RESOURCE_ID as u32,
..Default::default()
};

let service_response_listener: Arc<dyn UListener> = Arc::new(ServiceResponseListener);
client
.register_listener(&sink, Some(&source), service_response_listener)
.await?;
let l2_client = InMemoryRpcClient::new(client.clone(), client.clone())
.await
.unwrap();

let sink = UUri::try_from_parts(
HELLO_SERVICE_AUTHORITY,
HELLO_SERVICE_UE_ID,
HELLO_SERVICE_UE_VERSION_MAJOR,
HELLO_SERVICE_RESOURCE_ID,
)
.unwrap();

let mut i = 0;
loop {
Expand All @@ -110,12 +95,36 @@ async fn main() -> Result<(), UStatus> {
..Default::default()
};
i += 1;
println!("Sending Request message with payload:\n{hello_request:?}");

let call_options = CallOptions::for_rpc_request(REQUEST_TTL, None, None, None);
let invoke_res = l2_client
.invoke_method(
sink.clone(),
call_options,
Some(UPayload::try_from_protobuf(hello_request).unwrap()),
)
.await;

let Ok(response) = invoke_res else {
panic!(
"Hit an error attempting to invoke method: {:?}",
invoke_res.err().unwrap()
);
};

let request_msg = UMessageBuilder::request(sink.clone(), source.clone(), REQUEST_TTL)
.build_with_protobuf_payload(&hello_request)
.unwrap();
println!("Sending Request message:\n{request_msg:?}");
let hello_response_vsomeip_unspecified_payload_format = response.unwrap();
let hello_response_protobuf_payload_format = UPayload::new(
hello_response_vsomeip_unspecified_payload_format.payload(),
UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
);

client.send(request_msg).await?;
let Ok(hello_response) =
hello_response_protobuf_payload_format.extract_protobuf::<HelloResponse>()
else {
panic!("Unable to parse into HelloResponse");
};

println!("Here we received response: {hello_response:?}");
}
}
113 changes: 58 additions & 55 deletions up-transport-vsomeip/examples/hello_service.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use async_trait::async_trait;
use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse};
use log::{error, trace};
use std::fs::canonicalize;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use up_rust::communication::{
InMemoryRpcServer, RequestHandler, RpcServer, ServiceInvocationError, UPayload,
};
use up_rust::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY;
use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri};
use up_rust::{UCode, UStatus, UUri};
use up_transport_vsomeip::UPTransportVsomeip;
use up_transport_vsomeip::UeId;

const HELLO_SERVICE_ID: u16 = 0x6000;
const HELLO_INSTANCE_ID: u32 = 0x0001;
Expand All @@ -31,27 +46,28 @@ const _CLIENT_RESOURCE_ID: u16 = 0;

const _REQUEST_TTL: u32 = 1000;

struct ServiceRequestResponder {
client: Arc<dyn UTransport>,
}
impl ServiceRequestResponder {
pub fn new(client: Arc<dyn UTransport>) -> Self {
Self { client }
struct ServiceRequestHandler;
impl ServiceRequestHandler {
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl UListener for ServiceRequestResponder {
async fn on_receive(&self, msg: UMessage) {
println!("ServiceRequestResponder: Received a message: {msg:?}");

let mut msg = msg.clone();

if let Some(ref mut attributes) = msg.attributes.as_mut() {
attributes.payload_format =
::protobuf::EnumOrUnknown::new(UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
}

let hello_request = msg.extract_protobuf_payload::<HelloRequest>();
impl RequestHandler for ServiceRequestHandler {
async fn handle_request(
&self,
resource_id: u16,
request_payload: Option<UPayload>,
) -> Result<Option<UPayload>, ServiceInvocationError> {
println!("ServiceRequestHandler: Received a resource_id: {resource_id} request_payload: {request_payload:?}");

let hello_request_vsomeip_unspecified_payload_format = request_payload.unwrap();
let hello_request_protobuf_payload_format = UPayload::new(
hello_request_vsomeip_unspecified_payload_format.payload(),
UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
);
let hello_request =
hello_request_protobuf_payload_format.extract_protobuf::<HelloRequest>();

let hello_request = match hello_request {
Ok(hello_request) => {
Expand All @@ -60,7 +76,10 @@ impl UListener for ServiceRequestResponder {
}
Err(err) => {
error!("Unable to parse HelloRequest: {err:?}");
return;
return Err(ServiceInvocationError::RpcError(UStatus::fail_with_code(
UCode::INTERNAL,
"Unable to parse hello_request",
)));
}
};

Expand All @@ -69,14 +88,9 @@ impl UListener for ServiceRequestResponder {
..Default::default()
};

let response_msg = UMessageBuilder::response_for_request(msg.attributes.as_ref().unwrap())
.build_with_wrapped_protobuf_payload(&hello_response)
.unwrap();
self.client.send(response_msg).await.unwrap();
}
println!("Making response to send back: {hello_response:?}");

async fn on_error(&self, err: UStatus) {
println!("ServiceRequestResponder: Encountered an error: {err:?}");
Ok(Some(UPayload::try_from_protobuf(hello_response).unwrap()))
}
}

Expand All @@ -94,41 +108,30 @@ async fn main() -> Result<(), UStatus> {

// There will be a single vsomeip_transport, as there is a connection into device and a streamer
// TODO: Add error handling if we fail to create a UPTransportVsomeip
let service: Arc<dyn UTransport> = Arc::new(
let service_uuri = UUri::try_from_parts(
HELLO_SERVICE_AUTHORITY,
HELLO_SERVICE_UE_ID,
HELLO_SERVICE_MAJOR,
// HELLO_SERVICE_RESOURCE_ID,
0,
)
.unwrap();
let service = Arc::new(
UPTransportVsomeip::new_with_config(
&HELLO_SERVICE_AUTHORITY.to_string(),
service_uuri,
&CLIENT_AUTHORITY.to_string(),
HELLO_SERVICE_UE_ID as UeId,
&vsomeip_config.unwrap(),
None,
)
.unwrap(),
);
let l2_service = InMemoryRpcServer::new(service.clone(), service.clone());

let source_filter = UUri {
authority_name: "*".to_string(),
ue_id: 0x0000_FFFF,
ue_version_major: 0xFF,
resource_id: 0xFFFF,
..Default::default()
};
let sink_filter = UUri {
authority_name: HELLO_SERVICE_AUTHORITY.to_string(),
ue_id: HELLO_SERVICE_UE_ID,
ue_version_major: HELLO_SERVICE_MAJOR as u32,
resource_id: HELLO_SERVICE_RESOURCE_ID as u32,
..Default::default()
};
let service_request_responder: Arc<dyn UListener> =
Arc::new(ServiceRequestResponder::new(service.clone()));
// TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases
service
.register_listener(
&source_filter,
Some(&sink_filter),
service_request_responder.clone(),
)
.await?;
let service_request_handler = Arc::new(ServiceRequestHandler::new());
l2_service
.register_endpoint(None, HELLO_SERVICE_RESOURCE_ID, service_request_handler)
.await
.expect("Unable to register endpoint");

thread::park();
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions up-transport-vsomeip/src/determine_message_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ fn determine_type(
}
Some(ue_id) => ue_id,
},
DeterminationType::Send => source_filter.ue_id as ClientId,
DeterminationType::Send => source_filter.ue_id,
};
Ok(RegistrationType::Publish(client_id))
Ok(RegistrationType::Publish(client_id as ClientId))
}
}

Expand Down
Loading

0 comments on commit c9d53f3

Please sign in to comment.