Skip to content

Commit

Permalink
improve RaptorQ parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
af-anssi committed Feb 12, 2023
1 parent ea36938 commit c66f9b0
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 76 deletions.
24 changes: 6 additions & 18 deletions src/bin/diode-receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crossbeam_channel::{unbounded, SendError};
use diode::protocol;
use diode::receive::decoding;
use diode::receive::deserialize;
use log::{debug, error, info};
use log::{error, info};
use std::{
env, fmt, io,
net::{self, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -177,18 +177,14 @@ fn main_loop(config: Config) -> Result<(), Error> {
.spawn(move || deserialize::new(deserialize_config, decoding_recvs))
.unwrap();

let object_transmission_info =
protocol::object_transmission_information(config.from_udp_mtu, config.encoding_block_size);

let decoding_config = decoding::Config {
logical_block_size: config.encoding_block_size,
input_mtu: config.from_udp_mtu,
object_transmission_info,
flush_timeout: config.flush_timeout,
};

info!(
"decoding with block size of {} bytes and flush timeout of {} miiilseconds",
decoding_config.logical_block_size,
decoding_config.flush_timeout.as_millis(),
);

thread::Builder::new()
.name("decoding".to_string())
.spawn(move || decoding::new(decoding_config, udp_recvq, decoding_sends))
Expand All @@ -211,18 +207,10 @@ fn main_loop(config: Config) -> Result<(), Error> {
}

fn main() {
let mut config = command_args();
let config = command_args();

init_logger();

config.encoding_block_size =
protocol::adjust_encoding_block_size(config.from_udp_mtu, config.encoding_block_size);

debug!(
"adjusting encoding_block_size to {} bytes",
config.encoding_block_size
);

if let Err(e) = main_loop(config) {
error!("failed to launch main_loop: {e}");
}
Expand Down
25 changes: 6 additions & 19 deletions src/bin/diode-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use diode::{
protocol, semaphore,
send::{devector, encoding, tcp_client, udp_send},
};
use log::{debug, error, info};
use log::{error, info};
use std::{
env, fmt,
net::{SocketAddr, TcpListener, TcpStream},
Expand Down Expand Up @@ -201,18 +201,10 @@ fn connect_loop(
}

fn main() {
let mut config = command_args();
let config = command_args();

init_logger();

config.encoding_block_size =
protocol::adjust_encoding_block_size(config.to_udp_mtu, config.encoding_block_size);

debug!(
"adjusting encoding_block_size to {} bytes",
config.encoding_block_size
);

info!(
"accepting TCP clients at {} with read buffer of {} bytes",
config.from_tcp, config.from_tcp_buffer_size
Expand All @@ -222,20 +214,15 @@ fn main() {
buffer_size: config.from_tcp_buffer_size,
};

let object_transmission_info =
protocol::object_transmission_information(config.to_udp_mtu, config.encoding_block_size);

let encoding_config = encoding::Config {
logical_block_size: config.encoding_block_size,
object_transmission_info,
repair_block_size: config.repair_block_size,
output_mtu: config.to_udp_mtu,
flush_timeout: config.flush_timeout,
};

info!(
"encoding with block size of {} bytes and repair block size of {} bytes and a flush timeout of {} milliseconds",
encoding_config.logical_block_size,
encoding_config.repair_block_size,
encoding_config.flush_timeout.as_millis(),
);

let (connect_sendq, connect_recvq) = bounded::<TcpStream>(1);
let (tcp_sendq, tcp_recvq) = bounded::<protocol::ClientMessage>(config.nb_clients as usize);
let (devector_sendq, devector_recvq) = unbounded::<Vec<udp_send::Message>>();
Expand Down
27 changes: 23 additions & 4 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,28 @@ impl fmt::Display for ClientMessage {
}
}

pub(crate) const RAPTORQ_PAYLOAD_SIZE: u64 = 4;
const PACKET_HEADER_SIZE: u16 = 20 + 8;
const RAPTORQ_ALIGNMENT: u16 = 8;

pub fn adjust_encoding_block_size(mtu: u16, encoding_block_size: u64) -> u64 {
(mtu as u64 - RAPTORQ_PAYLOAD_SIZE)
* (encoding_block_size / (mtu as u64 - RAPTORQ_PAYLOAD_SIZE))
pub fn object_transmission_information(
mtu: u16,
logical_block_size: u64,
) -> raptorq::ObjectTransmissionInformation {
let data_mtu: u16 = RAPTORQ_ALIGNMENT * ((mtu - PACKET_HEADER_SIZE) / RAPTORQ_ALIGNMENT);

let nb_encoding_packets = logical_block_size / data_mtu as u64;

let encoding_block_size = data_mtu as u64 * nb_encoding_packets;

let data_mtu = (encoding_block_size / nb_encoding_packets) as u16;

raptorq::ObjectTransmissionInformation::with_defaults(encoding_block_size, data_mtu)
}

pub(crate) fn data_mtu(oti: &raptorq::ObjectTransmissionInformation) -> u16 {
oti.symbol_size()
}

pub(crate) fn nb_encoding_packets(oti: &raptorq::ObjectTransmissionInformation) -> u64 {
oti.transfer_length() / data_mtu(oti) as u64
}
36 changes: 22 additions & 14 deletions src/receive/decoding.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::protocol;
use crossbeam_channel::{Receiver, RecvTimeoutError};
use log::{debug, error, trace, warn};
use log::{debug, error, info, trace, warn};
use raptorq::{self, EncodingPacket, ObjectTransmissionInformation, SourceBlockDecoder};
use std::{
fmt,
Expand All @@ -10,9 +10,8 @@ use std::{
};

pub struct Config {
pub logical_block_size: u64,
pub object_transmission_info: ObjectTransmissionInformation,
pub flush_timeout: Duration,
pub input_mtu: u16,
}

enum Error {
Expand Down Expand Up @@ -54,17 +53,19 @@ fn main_loop(
udp_recvq: Receiver<Message>,
deserialize_socket: UnixStream,
) -> Result<(), Error> {
let oti =
ObjectTransmissionInformation::with_defaults(config.logical_block_size, config.input_mtu);
let encoding_block_size = config.object_transmission_info.transfer_length();

let mut deserialize_socket =
io::BufWriter::with_capacity(config.logical_block_size as usize, deserialize_socket);
io::BufWriter::with_capacity(encoding_block_size as usize, deserialize_socket);

let nb_normal_packets =
config.logical_block_size / (config.input_mtu as u64 - protocol::RAPTORQ_PAYLOAD_SIZE);
debug!(
"need at least {} packets for normal decoding",
nb_normal_packets
let nb_normal_packets = config.object_transmission_info.transfer_length()
/ config.object_transmission_info.symbol_size() as u64;

info!(
"decoding will expect {} packets ({} bytes per block) + flush timeout of {} ms",
protocol::nb_encoding_packets(&config.object_transmission_info),
encoding_block_size,
config.flush_timeout.as_millis()
);

let mut desynchro = true;
Expand All @@ -81,8 +82,11 @@ fn main_loop(

if nb_normal_packets as usize <= qlen {
debug!("trying to decode");
let mut decoder =
SourceBlockDecoder::new2(block_id, &oti, config.logical_block_size);
let mut decoder = SourceBlockDecoder::new2(
block_id,
&config.object_transmission_info,
encoding_block_size,
);

match decoder.decode(queue) {
None => {
Expand Down Expand Up @@ -136,7 +140,11 @@ fn main_loop(
}

// message block_id is from next block, flushing current block
let mut decoder = SourceBlockDecoder::new2(block_id, &oti, config.logical_block_size);
let mut decoder = SourceBlockDecoder::new2(
block_id,
&config.object_transmission_info,
encoding_block_size,
);

match decoder.decode(queue) {
None => warn!("lost block {block_id}"),
Expand Down
45 changes: 24 additions & 21 deletions src/send/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ use std::{collections::VecDeque, fmt, time::Duration};
use super::devector;

pub struct Config {
pub logical_block_size: u64,
pub object_transmission_info: ObjectTransmissionInformation,
pub repair_block_size: u32,
pub output_mtu: u16,
pub flush_timeout: Duration,
}

Expand Down Expand Up @@ -62,31 +61,30 @@ fn main_loop(
recvq: Receiver<protocol::ClientMessage>,
sendq: Sender<devector::Message>,
) -> Result<(), Error> {
let nb_repair_packets = config.repair_block_size / config.output_mtu as u32;
let nb_repair_packets =
config.repair_block_size / protocol::data_mtu(&config.object_transmission_info) as u32;

let encoding_block_size = config.object_transmission_info.transfer_length() as usize;

info!(
"encoding will produce {} packets ({} bytes per block) + {} repair packets + flush timeout of {} ms",
protocol::nb_encoding_packets(&config.object_transmission_info), encoding_block_size, nb_repair_packets, config.flush_timeout.as_millis()
);

if nb_repair_packets == 0 {
warn!("configuration produces 0 repair packets");
} else {
info!(
"{nb_repair_packets} repair packets ({} bytes) per encoding block will be produced",
nb_repair_packets * config.output_mtu as u32
);
warn!("configuration produces 0 repair packet");
}

let oti =
ObjectTransmissionInformation::with_defaults(config.logical_block_size, config.output_mtu);

let sbep = SourceBlockEncodingPlan::generate(
(config.logical_block_size / oti.symbol_size() as u64) as u16,
(config.object_transmission_info.transfer_length()
/ config.object_transmission_info.symbol_size() as u64) as u16,
);

debug!("object transformation information = {:?} ", oti);

let overhead = protocol::ClientMessage::serialize_padding_overhead();

debug!("padding encoding overhead is {} bytes", overhead);

let mut queue = VecDeque::with_capacity(config.logical_block_size as usize);
let mut queue = VecDeque::with_capacity(encoding_block_size);

let mut block_id = 0;

Expand All @@ -97,7 +95,7 @@ fn main_loop(
if queue.is_empty() {
continue;
}
let padding_needed = config.logical_block_size as usize - queue.len();
let padding_needed = encoding_block_size - queue.len();
let padding_len = if padding_needed < overhead {
debug!("top much padding overhead !");
0
Expand All @@ -124,14 +122,19 @@ fn main_loop(
_ => (),
}

while (config.logical_block_size as usize) <= queue.len() {
while encoding_block_size <= queue.len() {
// full block, we can flush
trace!("flushing queue len = {}", queue.len());
let data = &queue.make_contiguous()[..config.logical_block_size as usize];
let data = &queue.make_contiguous()[..encoding_block_size];

let encoder = SourceBlockEncoder::with_encoding_plan2(block_id, &oti, data, &sbep);
let encoder = SourceBlockEncoder::with_encoding_plan2(
block_id,
&config.object_transmission_info,
data,
&sbep,
);

let _ = queue.drain(0..config.logical_block_size as usize);
let _ = queue.drain(0..encoding_block_size);
trace!("after flushing queue len = {}", queue.len());

sendq.send(encoder.source_packets())?;
Expand Down

0 comments on commit c66f9b0

Please sign in to comment.