From c66f9b0a65b1192e8001c908bfd6f39cbb7b9e2f Mon Sep 17 00:00:00 2001 From: Arnaud Fontaine Date: Sun, 12 Feb 2023 14:22:18 +0100 Subject: [PATCH] improve RaptorQ parameters --- src/bin/diode-receive.rs | 24 ++++++--------------- src/bin/diode-send.rs | 25 ++++++---------------- src/protocol.rs | 27 ++++++++++++++++++++---- src/receive/decoding.rs | 36 +++++++++++++++++++------------- src/send/encoding.rs | 45 +++++++++++++++++++++------------------- 5 files changed, 81 insertions(+), 76 deletions(-) diff --git a/src/bin/diode-receive.rs b/src/bin/diode-receive.rs index 5aefe2a0..3b3c5f6f 100644 --- a/src/bin/diode-receive.rs +++ b/src/bin/diode-receive.rs @@ -3,7 +3,7 @@ use crossbeam_channel::{unbounded, SendError}; use diode::protocol; use diode::receive::decoding; use diode::receive::deserialize; -use log::{debug, error, info}; +use log::{error, info}; use std::{ env, fmt, io, net::{self, SocketAddr, UdpSocket}, @@ -177,18 +177,14 @@ fn main_loop(config: Config) -> Result<(), Error> { .spawn(move || deserialize::new(deserialize_config, decoding_recvs)) .unwrap(); + let object_transmission_info = + protocol::object_transmission_information(config.from_udp_mtu, config.encoding_block_size); + let decoding_config = decoding::Config { - logical_block_size: config.encoding_block_size, - input_mtu: config.from_udp_mtu, + object_transmission_info, flush_timeout: config.flush_timeout, }; - info!( - "decoding with block size of {} bytes and flush timeout of {} miiilseconds", - decoding_config.logical_block_size, - decoding_config.flush_timeout.as_millis(), - ); - thread::Builder::new() .name("decoding".to_string()) .spawn(move || decoding::new(decoding_config, udp_recvq, decoding_sends)) @@ -211,18 +207,10 @@ fn main_loop(config: Config) -> Result<(), Error> { } fn main() { - let mut config = command_args(); + let config = command_args(); init_logger(); - config.encoding_block_size = - protocol::adjust_encoding_block_size(config.from_udp_mtu, config.encoding_block_size); - - debug!( - "adjusting encoding_block_size to {} bytes", - config.encoding_block_size - ); - if let Err(e) = main_loop(config) { error!("failed to launch main_loop: {e}"); } diff --git a/src/bin/diode-send.rs b/src/bin/diode-send.rs index 61bc246c..06500e1a 100644 --- a/src/bin/diode-send.rs +++ b/src/bin/diode-send.rs @@ -4,7 +4,7 @@ use diode::{ protocol, semaphore, send::{devector, encoding, tcp_client, udp_send}, }; -use log::{debug, error, info}; +use log::{error, info}; use std::{ env, fmt, net::{SocketAddr, TcpListener, TcpStream}, @@ -201,18 +201,10 @@ fn connect_loop( } fn main() { - let mut config = command_args(); + let config = command_args(); init_logger(); - config.encoding_block_size = - protocol::adjust_encoding_block_size(config.to_udp_mtu, config.encoding_block_size); - - debug!( - "adjusting encoding_block_size to {} bytes", - config.encoding_block_size - ); - info!( "accepting TCP clients at {} with read buffer of {} bytes", config.from_tcp, config.from_tcp_buffer_size @@ -222,20 +214,15 @@ fn main() { buffer_size: config.from_tcp_buffer_size, }; + let object_transmission_info = + protocol::object_transmission_information(config.to_udp_mtu, config.encoding_block_size); + let encoding_config = encoding::Config { - logical_block_size: config.encoding_block_size, + object_transmission_info, repair_block_size: config.repair_block_size, - output_mtu: config.to_udp_mtu, flush_timeout: config.flush_timeout, }; - info!( - "encoding with block size of {} bytes and repair block size of {} bytes and a flush timeout of {} milliseconds", - encoding_config.logical_block_size, - encoding_config.repair_block_size, - encoding_config.flush_timeout.as_millis(), - ); - let (connect_sendq, connect_recvq) = bounded::(1); let (tcp_sendq, tcp_recvq) = bounded::(config.nb_clients as usize); let (devector_sendq, devector_recvq) = unbounded::>(); diff --git a/src/protocol.rs b/src/protocol.rs index 42bd2aa1..eae7d386 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -135,9 +135,28 @@ impl fmt::Display for ClientMessage { } } -pub(crate) const RAPTORQ_PAYLOAD_SIZE: u64 = 4; +const PACKET_HEADER_SIZE: u16 = 20 + 8; +const RAPTORQ_ALIGNMENT: u16 = 8; -pub fn adjust_encoding_block_size(mtu: u16, encoding_block_size: u64) -> u64 { - (mtu as u64 - RAPTORQ_PAYLOAD_SIZE) - * (encoding_block_size / (mtu as u64 - RAPTORQ_PAYLOAD_SIZE)) +pub fn object_transmission_information( + mtu: u16, + logical_block_size: u64, +) -> raptorq::ObjectTransmissionInformation { + let data_mtu: u16 = RAPTORQ_ALIGNMENT * ((mtu - PACKET_HEADER_SIZE) / RAPTORQ_ALIGNMENT); + + let nb_encoding_packets = logical_block_size / data_mtu as u64; + + let encoding_block_size = data_mtu as u64 * nb_encoding_packets; + + let data_mtu = (encoding_block_size / nb_encoding_packets) as u16; + + raptorq::ObjectTransmissionInformation::with_defaults(encoding_block_size, data_mtu) +} + +pub(crate) fn data_mtu(oti: &raptorq::ObjectTransmissionInformation) -> u16 { + oti.symbol_size() +} + +pub(crate) fn nb_encoding_packets(oti: &raptorq::ObjectTransmissionInformation) -> u64 { + oti.transfer_length() / data_mtu(oti) as u64 } diff --git a/src/receive/decoding.rs b/src/receive/decoding.rs index 0dee89bc..9225b51d 100644 --- a/src/receive/decoding.rs +++ b/src/receive/decoding.rs @@ -1,6 +1,6 @@ use crate::protocol; use crossbeam_channel::{Receiver, RecvTimeoutError}; -use log::{debug, error, trace, warn}; +use log::{debug, error, info, trace, warn}; use raptorq::{self, EncodingPacket, ObjectTransmissionInformation, SourceBlockDecoder}; use std::{ fmt, @@ -10,9 +10,8 @@ use std::{ }; pub struct Config { - pub logical_block_size: u64, + pub object_transmission_info: ObjectTransmissionInformation, pub flush_timeout: Duration, - pub input_mtu: u16, } enum Error { @@ -54,17 +53,19 @@ fn main_loop( udp_recvq: Receiver, deserialize_socket: UnixStream, ) -> Result<(), Error> { - let oti = - ObjectTransmissionInformation::with_defaults(config.logical_block_size, config.input_mtu); + let encoding_block_size = config.object_transmission_info.transfer_length(); let mut deserialize_socket = - io::BufWriter::with_capacity(config.logical_block_size as usize, deserialize_socket); + io::BufWriter::with_capacity(encoding_block_size as usize, deserialize_socket); - let nb_normal_packets = - config.logical_block_size / (config.input_mtu as u64 - protocol::RAPTORQ_PAYLOAD_SIZE); - debug!( - "need at least {} packets for normal decoding", - nb_normal_packets + let nb_normal_packets = config.object_transmission_info.transfer_length() + / config.object_transmission_info.symbol_size() as u64; + + info!( + "decoding will expect {} packets ({} bytes per block) + flush timeout of {} ms", + protocol::nb_encoding_packets(&config.object_transmission_info), + encoding_block_size, + config.flush_timeout.as_millis() ); let mut desynchro = true; @@ -81,8 +82,11 @@ fn main_loop( if nb_normal_packets as usize <= qlen { debug!("trying to decode"); - let mut decoder = - SourceBlockDecoder::new2(block_id, &oti, config.logical_block_size); + let mut decoder = SourceBlockDecoder::new2( + block_id, + &config.object_transmission_info, + encoding_block_size, + ); match decoder.decode(queue) { None => { @@ -136,7 +140,11 @@ fn main_loop( } // message block_id is from next block, flushing current block - let mut decoder = SourceBlockDecoder::new2(block_id, &oti, config.logical_block_size); + let mut decoder = SourceBlockDecoder::new2( + block_id, + &config.object_transmission_info, + encoding_block_size, + ); match decoder.decode(queue) { None => warn!("lost block {block_id}"), diff --git a/src/send/encoding.rs b/src/send/encoding.rs index 1f2a4e61..038a48ca 100644 --- a/src/send/encoding.rs +++ b/src/send/encoding.rs @@ -7,9 +7,8 @@ use std::{collections::VecDeque, fmt, time::Duration}; use super::devector; pub struct Config { - pub logical_block_size: u64, + pub object_transmission_info: ObjectTransmissionInformation, pub repair_block_size: u32, - pub output_mtu: u16, pub flush_timeout: Duration, } @@ -62,31 +61,30 @@ fn main_loop( recvq: Receiver, sendq: Sender, ) -> Result<(), Error> { - let nb_repair_packets = config.repair_block_size / config.output_mtu as u32; + let nb_repair_packets = + config.repair_block_size / protocol::data_mtu(&config.object_transmission_info) as u32; + + let encoding_block_size = config.object_transmission_info.transfer_length() as usize; + + info!( + "encoding will produce {} packets ({} bytes per block) + {} repair packets + flush timeout of {} ms", + protocol::nb_encoding_packets(&config.object_transmission_info), encoding_block_size, nb_repair_packets, config.flush_timeout.as_millis() + ); if nb_repair_packets == 0 { - warn!("configuration produces 0 repair packets"); - } else { - info!( - "{nb_repair_packets} repair packets ({} bytes) per encoding block will be produced", - nb_repair_packets * config.output_mtu as u32 - ); + warn!("configuration produces 0 repair packet"); } - let oti = - ObjectTransmissionInformation::with_defaults(config.logical_block_size, config.output_mtu); - let sbep = SourceBlockEncodingPlan::generate( - (config.logical_block_size / oti.symbol_size() as u64) as u16, + (config.object_transmission_info.transfer_length() + / config.object_transmission_info.symbol_size() as u64) as u16, ); - debug!("object transformation information = {:?} ", oti); - let overhead = protocol::ClientMessage::serialize_padding_overhead(); debug!("padding encoding overhead is {} bytes", overhead); - let mut queue = VecDeque::with_capacity(config.logical_block_size as usize); + let mut queue = VecDeque::with_capacity(encoding_block_size); let mut block_id = 0; @@ -97,7 +95,7 @@ fn main_loop( if queue.is_empty() { continue; } - let padding_needed = config.logical_block_size as usize - queue.len(); + let padding_needed = encoding_block_size - queue.len(); let padding_len = if padding_needed < overhead { debug!("top much padding overhead !"); 0 @@ -124,14 +122,19 @@ fn main_loop( _ => (), } - while (config.logical_block_size as usize) <= queue.len() { + while encoding_block_size <= queue.len() { // full block, we can flush trace!("flushing queue len = {}", queue.len()); - let data = &queue.make_contiguous()[..config.logical_block_size as usize]; + let data = &queue.make_contiguous()[..encoding_block_size]; - let encoder = SourceBlockEncoder::with_encoding_plan2(block_id, &oti, data, &sbep); + let encoder = SourceBlockEncoder::with_encoding_plan2( + block_id, + &config.object_transmission_info, + data, + &sbep, + ); - let _ = queue.drain(0..config.logical_block_size as usize); + let _ = queue.drain(0..encoding_block_size); trace!("after flushing queue len = {}", queue.len()); sendq.send(encoder.source_packets())?;