Skip to content

Commit

Permalink
Support sending CanFrame via tokio::CanFdSocket
Browse files Browse the repository at this point in the history
This is a breaking change.

This also adds tests for all the additional cases (although no mixed
tests so far).
  • Loading branch information
EliteTK committed Jun 5, 2024
1 parent f8d4273 commit 0495fda
Showing 1 changed file with 113 additions and 17 deletions.
130 changes: 113 additions & 17 deletions src/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
//! }
//! ```
use crate::{
CanAddr, CanAnyFrame, CanFdFrame, CanFrame, Error, IoResult, Result, Socket, SocketOptions,
frame::AsPtr, CanAddr, CanAnyFrame, CanFrame, Error, IoResult, Result, Socket, SocketOptions,
};
use futures::{prelude::*, ready, task::Context};
use std::{
Expand Down Expand Up @@ -138,9 +138,12 @@ pub type CanFdSocket = AsyncCanSocket<crate::CanFdSocket>;

impl CanFdSocket {
/// Write a CAN FD frame to the socket asynchronously
pub async fn write_frame(&self, frame: CanFdFrame) -> IoResult<()> {
pub async fn write_frame<F>(&self, frame: &F) -> IoResult<()>
where
F: Into<CanAnyFrame> + AsPtr,
{
self.0
.async_io(Interest::WRITABLE, |inner| inner.write_frame(&frame))
.async_io(Interest::WRITABLE, |inner| inner.write_frame(frame))
.await
}

Expand All @@ -166,7 +169,7 @@ impl Stream for CanFdSocket {
}
}

impl Sink<CanFdFrame> for CanFdSocket {
impl Sink<CanAnyFrame> for CanFdSocket {
type Error = Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Expand All @@ -184,7 +187,7 @@ impl Sink<CanFdFrame> for CanFdSocket {
Poll::Ready(Ok(()))
}

fn start_send(self: Pin<&mut Self>, item: CanFdFrame) -> Result<()> {
fn start_send(self: Pin<&mut Self>, item: CanAnyFrame) -> Result<()> {
self.0.get_ref().write_frame_insist(&item)?;
Ok(())
}
Expand All @@ -196,7 +199,7 @@ impl Sink<CanFdFrame> for CanFdSocket {
#[cfg(test)]
mod tests {
use super::*;
use crate::{CanFrame, Frame, IoErrorKind, StandardId};
use crate::{CanFdFrame, CanFrame, Frame, IoErrorKind, StandardId};
use embedded_can::Frame as EmbeddedFrame;
use futures::{select, try_join};
use futures_timer::Delay;
Expand Down Expand Up @@ -244,11 +247,18 @@ mod tests {
Ok(())
}

/// Write a test frame to the CanSocket
async fn write_frame_fd(socket: &CanFdSocket) -> Result<()> {
/// Write a test CANFD frame to the CanSocket
async fn write_frame_fd_canfd(socket: &CanFdSocket) -> Result<()> {
let test_frame =
CanFdFrame::new(StandardId::new(0x1).unwrap(), &[0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap();
socket.write_frame(test_frame).await?;
socket.write_frame(&test_frame).await?;
Ok(())
}

/// Write a test CAN frame to the CanSocket
async fn write_frame_fd_can(socket: &CanFdSocket) -> Result<()> {
let test_frame = CanFrame::new(StandardId::new(0x1).unwrap(), &[0]).unwrap();
socket.write_frame(&test_frame).await?;
Ok(())
}

Expand Down Expand Up @@ -292,11 +302,34 @@ mod tests {

#[serial]
#[tokio::test]
async fn test_receive_can_fd() -> Result<()> {
async fn test_receive_can_fd_canfd() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();

let send_frames = future::try_join(
write_frame_fd_canfd(&socket1),
write_frame_fd_canfd(&socket1),
);

let recv_frames = async {
let socket2 = recv_frame_fd(socket2).await?;
let _socket2 = recv_frame_fd(socket2).await;
Ok(())
};

try_join!(recv_frames, send_frames)?;

Ok(())
}

#[serial]
#[tokio::test]
async fn test_receive_can_fd_can() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();

let send_frames = future::try_join(write_frame_fd(&socket1), write_frame_fd(&socket1));
let send_frames =
future::try_join(write_frame_fd_can(&socket1), write_frame_fd_can(&socket1));

let recv_frames = async {
let socket2 = recv_frame_fd(socket2).await?;
Expand All @@ -311,11 +344,34 @@ mod tests {

#[serial]
#[tokio::test]
async fn test_receive_can_fd_with_stream() -> Result<()> {
async fn test_receive_can_fd_canfd_with_stream() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();

let send_frames = future::try_join(write_frame_fd(&socket1), write_frame_fd(&socket1));
let send_frames = future::try_join(
write_frame_fd_canfd(&socket1),
write_frame_fd_canfd(&socket1),
);

let recv_frames = async {
let socket2 = recv_frame_fd_with_stream(socket2).await?;
let _socket2 = recv_frame_fd_with_stream(socket2).await;
Ok(())
};

try_join!(recv_frames, send_frames)?;

Ok(())
}

#[serial]
#[tokio::test]
async fn test_receive_can_fd_can_with_stream() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();

let send_frames =
future::try_join(write_frame_fd_can(&socket1), write_frame_fd_can(&socket1));

let recv_frames = async {
let socket2 = recv_frame_fd_with_stream(socket2).await?;
Expand Down Expand Up @@ -364,7 +420,7 @@ mod tests {

#[serial]
#[tokio::test]
async fn test_sink_stream_fd() -> Result<()> {
async fn test_sink_stream_fd_canfd() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();

Expand All @@ -387,9 +443,49 @@ mod tests {
.fold(0u8, |acc, _frame| async move { acc + 1 });

let send_frames = async {
let _frame_1 = sink.send(frame_id_1).await?;
let _frame_2 = sink.send(frame_id_2).await?;
let _frame_3 = sink.send(frame_id_3).await?;
let _frame_1 = sink.send(frame_id_1.into()).await?;
let _frame_2 = sink.send(frame_id_2.into()).await?;
let _frame_3 = sink.send(frame_id_3.into()).await?;
println!("Sent 3 frames");
Ok::<(), Error>(())
};

let (x, frame_send_r) = future::join(count_ids_less_than_3, send_frames).await;
frame_send_r?;

assert_eq!(x, 2);

Ok(())
}

#[serial]
#[tokio::test]
async fn test_sink_stream_fd_can() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();

let frame_id_1 = CanFrame::from_raw_id(0x01, &[0u8]).unwrap();
let frame_id_2 = CanFrame::from_raw_id(0x02, &[0u8]).unwrap();
let frame_id_3 = CanFrame::from_raw_id(0x03, &[0u8]).unwrap();

let (mut sink, _stream) = socket1.split();
let (_sink, stream) = socket2.split();

let count_ids_less_than_3 = stream
.map(|x| x.unwrap())
.take_while(|frame| {
if let CanAnyFrame::Normal(frame) = frame {
future::ready(frame.raw_id() < 3)
} else {
future::ready(false)
}
})
.fold(0u8, |acc, _frame| async move { acc + 1 });

let send_frames = async {
let _frame_1 = sink.send(frame_id_1.into()).await?;
let _frame_2 = sink.send(frame_id_2.into()).await?;
let _frame_3 = sink.send(frame_id_3.into()).await?;
println!("Sent 3 frames");
Ok::<(), Error>(())
};
Expand Down

0 comments on commit 0495fda

Please sign in to comment.