Skip to content

Commit

Permalink
quic: add more compatibility tests
Browse files Browse the repository at this point in the history
  • Loading branch information
riptl authored and ripatel-fd committed Dec 19, 2024
1 parent 801cce6 commit e947a13
Show file tree
Hide file tree
Showing 7 changed files with 584 additions and 5 deletions.
10 changes: 5 additions & 5 deletions contrib/quic/agave_compat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ crossbeam-channel = "0.5"
env_logger = "0.11"
libc = "0.2"
rand = "0.8.5"
solana-connection-cache = "2.1.4"
solana-client = "2.1.4"
solana-sdk = "2.1.4"
solana-streamer = "2.1.4"
solana-quic-client = "2.1.4"
solana-connection-cache = "2.1.6"
solana-client = "2.1.6"
solana-sdk = "2.1.6"
solana-streamer = "2.1.6"
solana-quic-client = "2.1.6"
18 changes: 18 additions & 0 deletions contrib/quic/rust_compat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "firedancer-rust-quic-test"
version = "0.1.0"
edition = "2021"
publish = false
build = "build.rs"

[build-dependencies]
bindgen = "0.71"

[dependencies]
env_logger = "0"
libc = "0.2"
quiche = { version = "0.22", features = ["qlog"] }
quinn = { version = "0.11", features = ["rustls-aws-lc-rs", "rustls-ring"] }
rustls = { version = "0.23", features = ["aws_lc_rs", "ring"] }
rustls-post-quantum = "0.2.1"
tokio = { version = "1.42", features = ["net", "rt"] }
52 changes: 52 additions & 0 deletions contrib/quic/rust_compat/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use std::env;
use std::path::PathBuf;

fn main() {
let cargo_manifest_dir = env::var("CARGO_MANIFEST_DIR").unwrap();
let mut build_path = PathBuf::new().join(cargo_manifest_dir);
build_path.pop();
build_path.pop();
build_path.pop();
build_path.push("build");
build_path.push("native");
build_path.push("gcc");

let mut lib_path = build_path.clone();
lib_path.push("lib");
println!("cargo:rustc-link-search={}", lib_path.to_str().unwrap());
for lib in &[
"fd_quic",
"fd_waltz", // net
"fd_tls",
"fd_tango", // spmc queues
"fd_ballet", // crypto
"fd_util",
] {
println!("cargo:rustc-link-lib=static={}", lib);
println!(
"cargo:rerun-if-changed={}/lib{}.a",
lib_path.to_str().unwrap(),
lib
);
}
println!("cargo:rustc-link-lib=static=stdc++"); // fd_tile_threads.cxx

let mut include_path = build_path.clone();
include_path.push("include");

let bindings = bindgen::Builder::default()
.header("wrapper.h")
.clang_args(&["-isystem", include_path.to_str().unwrap(), "-std=c17"])
.allowlist_type("fd_.*")
.allowlist_function("fd_.*")
.allowlist_var("FD_.*")
.parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))
.generate()
.expect("Unable to generate bindings");
println!("cargo:rerun-if-changed=wrapper.h");

let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
bindings
.write_to_file(out_path.join("bindings.rs"))
.expect("Couldn't write bindings!");
}
138 changes: 138 additions & 0 deletions contrib/quic/rust_compat/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use libc::{in_addr, sockaddr_in, socket, AF_INET, IPPROTO_UDP, SOCK_DGRAM};
use std::ffi::c_char;
use std::io::Write;
use std::net::Ipv4Addr;
use std::sync::Mutex;

mod quiche;
mod quinn;

#[allow(non_upper_case_globals)]
#[allow(non_camel_case_types)]
#[allow(non_snake_case)]
#[allow(unused)]
#[allow(clippy::all)]
pub(crate) mod bindings {
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
}

use crate::bindings::{fd_boot, fd_wksp_new_anon, fd_wksp_t};

pub(crate) unsafe fn fd_wksp_new_anonymous(
page_sz: u64,
page_cnt: u64,
cpu_idx: u64,
name: *const c_char,
opt_part_max: u64,
) -> *mut fd_wksp_t {
let sub_page_cnt = [page_cnt];
let sub_cpu_idx = [cpu_idx];
fd_wksp_new_anon(
name,
page_sz,
1,
sub_page_cnt.as_ptr(),
sub_cpu_idx.as_ptr(),
0,
opt_part_max,
)
}

pub(crate) unsafe fn new_udp_socket() -> (i32, u16) {
let udp_sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
assert!(udp_sock_fd > 0);

let mut listen_addr: sockaddr_in = std::mem::zeroed();
listen_addr.sin_family = AF_INET as u16;
listen_addr.sin_addr = in_addr {
s_addr: u32::from(Ipv4Addr::new(127, 0, 0, 1)).to_be(),
};
listen_addr.sin_port = 0;
assert!(
0 == libc::bind(
udp_sock_fd,
&listen_addr as *const sockaddr_in as *const libc::sockaddr,
std::mem::size_of_val(&listen_addr) as u32
)
);

let mut listen_addr_size = std::mem::size_of_val(&listen_addr) as u32;
assert!(
0 == libc::getsockname(
udp_sock_fd,
&mut listen_addr as *mut sockaddr_in as *mut libc::sockaddr,
&mut listen_addr_size
)
);
assert!(listen_addr_size == std::mem::size_of_val(&listen_addr) as u32);
let listen_port = u16::from_be(listen_addr.sin_port);
(udp_sock_fd, listen_port)
}

struct StdoutWriter {
lock: Mutex<()>,
}

impl StdoutWriter {
fn new() -> Self {
Self {
lock: Mutex::new(()),
}
}
}

impl Write for StdoutWriter {
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let guard = self.lock.lock().unwrap();
print!("{}", unsafe { std::str::from_utf8_unchecked(buf) });
drop(guard);
Ok(buf.len())
}

fn flush(&mut self) -> Result<(), std::io::Error> {
Ok(())
}
}

static USAGE: &str = r"Usage: ./firedancer-quiche-quic-test <command>
Available commands are:
quiche-fd: Ping quiche client (BoringSSL) to fd_quic server
quinn-awslc-fd: Ping quinn client (rustls aws-ls-rc) to fd_quic server
quinn-pq-fd: Ping quinn client (rustls aws-ls-rc post quantum) to fd_quic server
quinn-ring-fd: Ping quinn client (rustls ring) to fd_quic server";

fn main() {
env_logger::init();
let arg = if let Some(arg) = std::env::args().nth(1) {
arg
} else {
eprintln!("{}", USAGE);
std::process::exit(1);
};

std::env::set_var("FD_LOG_PATH", "");
std::env::set_var("FD_LOG_LEVEL_LOGFILE", "0");
std::env::set_var("FD_LOG_LEVEL_STDERR", "0");
let mut argc = 1;
let mut argv = vec![b"test\0".as_ptr() as *mut c_char, std::ptr::null_mut()];
let mut argv_ptr = argv.as_mut_ptr();
unsafe {
fd_boot(&mut argc, &mut argv_ptr);
}

match arg.as_str() {
"quiche-fd" => unsafe { crate::quiche::quiche_to_fdquic() },
"quinn-awslc-fd" => unsafe {
crate::quinn::quinn_to_fdquic(rustls::crypto::aws_lc_rs::default_provider())
},
"quinn-pq-fd" => unsafe {
crate::quinn::quinn_to_fdquic(rustls_post_quantum::provider())
},
"quinn-ring-fd" => unsafe {
crate::quinn::quinn_to_fdquic(rustls::crypto::ring::default_provider())
},
_ => panic!("Unknown arg"),
}
}
167 changes: 167 additions & 0 deletions contrib/quic/rust_compat/src/quiche.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use crate::bindings::{
fd_aio_pcapng_get_aio, fd_aio_pcapng_join, fd_aio_pcapng_start, fd_aio_pcapng_t, fd_halt,
fd_pcapng_fwrite_tls_key_log, fd_quic_get_aio_net_rx, fd_quic_init,
fd_quic_new_anonymous_small, fd_quic_service, fd_quic_set_aio_net_tx, fd_quic_t, fd_rng_t,
fd_udpsock_align, fd_udpsock_footprint, fd_udpsock_get_tx, fd_udpsock_join, fd_udpsock_new,
fd_udpsock_service, fd_udpsock_set_rx, fd_udpsock_t, FD_QUIC_ROLE_SERVER,
};
use libc::{fflush, fopen, strlen, FILE};
use quiche::{ConnectionId, QlogLevel};
use std::ffi::{c_char, c_void, CString};
use std::mem::MaybeUninit;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicU32, Ordering};

pub(crate) unsafe fn quiche_to_fdquic() {
// Set up Firedancer components

let (udp_sock_fd, listen_port) = crate::new_udp_socket();

let wksp = crate::fd_wksp_new_anonymous(4096, 16384, 0, b"test\0".as_ptr() as *const c_char, 0);
assert!(!wksp.is_null(), "Failed to create workspace");

let mut rng = fd_rng_t {
idx: 0,
seq: 0x172046447c516741,
};

let udpsock_mem = std::alloc::alloc(std::alloc::Layout::from_size_align_unchecked(
fd_udpsock_footprint(2048, 256, 256) as usize,
fd_udpsock_align() as usize,
)) as *mut c_void;
let udpsock = fd_udpsock_join(fd_udpsock_new(udpsock_mem, 2048, 256, 256), udp_sock_fd);
assert!(!udpsock.is_null(), "Failed to create fd_udpsock_t");

let quic = fd_quic_new_anonymous_small(wksp, FD_QUIC_ROLE_SERVER as i32, &mut rng);
assert!(!quic.is_null(), "Failed to create fd_quic_t");
(*quic).config.retry = 1;

let pcap = std::env::var("PCAP").unwrap_or_default();
if !pcap.is_empty() {
let pcap_path_cstr = CString::new(pcap).unwrap();
let pcap_file = fopen(
pcap_path_cstr.as_ptr() as *const c_char,
"wb\x00".as_ptr() as *const c_char,
);
assert!(!pcap_file.is_null());
fd_aio_pcapng_start(pcap_file as *mut c_void);
fflush(pcap_file);

static mut PCAP_FILE_GLOB: *mut FILE = std::ptr::null_mut();
PCAP_FILE_GLOB = pcap_file;

let mut aio_pcapng1_mem: fd_aio_pcapng_t = MaybeUninit::zeroed().assume_init();
let mut aio_pcapng2_mem: fd_aio_pcapng_t = MaybeUninit::zeroed().assume_init();
let aio_pcapng1 = fd_aio_pcapng_join(
&mut aio_pcapng1_mem as *mut fd_aio_pcapng_t as *mut c_void,
fd_udpsock_get_tx(udpsock),
pcap_file as *mut c_void,
);
let aio_pcapng2 = fd_aio_pcapng_join(
&mut aio_pcapng2_mem as *mut fd_aio_pcapng_t as *mut c_void,
fd_quic_get_aio_net_rx(quic),
pcap_file as *mut c_void,
);
assert!(!aio_pcapng1.is_null());
assert!(!aio_pcapng2.is_null());

fd_quic_set_aio_net_tx(quic, fd_aio_pcapng_get_aio(aio_pcapng1));
fd_udpsock_set_rx(udpsock, fd_aio_pcapng_get_aio(aio_pcapng2));

unsafe extern "C" fn tls_keylog_cb(_ctx: *mut c_void, line: *const c_char) {
fd_pcapng_fwrite_tls_key_log(
line as *const u8,
strlen(line) as u32,
PCAP_FILE_GLOB as *mut c_void,
);
}
(*quic).cb.tls_keylog = Some(tls_keylog_cb);
} else {
fd_quic_set_aio_net_tx(quic, fd_udpsock_get_tx(udpsock));
fd_udpsock_set_rx(udpsock, fd_quic_get_aio_net_rx(quic));
}

assert!(!fd_quic_init(quic).is_null(), "fd_quic_init failed");

let udpsock2 = udpsock as usize;
let quic2 = quic as usize;
let stop_ptr = Box::leak(Box::new(AtomicU32::new(0))) as *mut AtomicU32 as usize;
let fd_quic_thread = std::thread::spawn(move || {
let stop = stop_ptr as *mut AtomicU32;
let udpsock3: *mut fd_udpsock_t = udpsock2 as *mut fd_udpsock_t;
let quic3: *mut fd_quic_t = quic2 as *mut fd_quic_t;
while (*stop).load(Ordering::Relaxed) == 0 {
fd_udpsock_service(udpsock3);
fd_quic_service(quic3);
}
let metrics = &(*quic3).metrics.__bindgen_anon_1;
// Limit packet counts to reasonable numbers
eprintln!("Received {} packets", metrics.net_rx_pkt_cnt);
assert!(metrics.net_rx_pkt_cnt < 64);
assert!(metrics.net_tx_pkt_cnt < metrics.net_rx_pkt_cnt);
assert!(metrics.net_tx_byte_cnt < metrics.net_rx_byte_cnt);
assert!(metrics.conn_active_cnt <= 1);
assert!(metrics.conn_created_cnt == 1);
assert!(metrics.conn_closed_cnt <= 1);
assert!(metrics.conn_aborted_cnt <= 1);
assert!(metrics.conn_retry_cnt == 1);
assert!(metrics.conn_err_no_slots_cnt == 0);
assert!(metrics.conn_err_retry_fail_cnt == 0);
assert!(metrics.hs_created_cnt == 1);
assert!(metrics.hs_err_alloc_fail_cnt == 0);
eprintln!("Shutting down fd_quic");
});

// Set up quiche components

let mut config = quiche::Config::new(1).unwrap();
config
.set_application_protos_wire_format(b"\x0asolana-tpu")
.unwrap();

let socket2 = UdpSocket::bind("127.0.0.1:0").unwrap();
let local = socket2.local_addr().unwrap();

let mut conn = quiche::connect(
None,
&ConnectionId::from_ref(&[0x77]),
local,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), listen_port),
&mut config,
)
.expect("quiche::connect failed");

conn.set_qlog_with_level(
Box::new(crate::StdoutWriter::new()),
"".to_string(),
"".to_string(),
QlogLevel::Extra,
);

let mut buf = [0u8; 1232];
while !conn.is_established() {
loop {
let (write, send_info) = match conn.send(&mut buf) {
Ok(v) => v,
Err(quiche::Error::Done) => break,
Err(e) => panic!("quiche Error {:?}", e),
};
socket2.send_to(&buf[..write], send_info.to).unwrap();
}

let (read, from) = socket2.recv_from(&mut buf).unwrap();
let recv_info = quiche::RecvInfo { from, to: local };

match conn.recv(&mut buf[..read], recv_info) {
Ok(_) | Err(quiche::Error::Done) => {}
Err(e) => panic!("quiche Error {:?}", e),
};
}
conn.close(true, 0, b"bye")
.expect("quiche::Connection::close failed");

let stop = stop_ptr as *mut AtomicU32;
(*stop).store(1, Ordering::Relaxed);
fd_quic_thread.join().unwrap();
fd_halt();
}
Loading

0 comments on commit e947a13

Please sign in to comment.