Skip to content

Commit

Permalink
Fix fragmentation (#297)
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland authored Dec 18, 2023
1 parent a65a8ff commit 6316838
Show file tree
Hide file tree
Showing 17 changed files with 586 additions and 17 deletions.
29 changes: 26 additions & 3 deletions .github/workflows/build-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
uses: actions/checkout@v4

- name: Run docker image
run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:master
run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:latest

- name: Build project
run: |
Expand Down Expand Up @@ -86,12 +86,35 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4

- name: Build project
- name: Build project and run test
run: |
sudo apt install -y ninja-build
CMAKE_GENERATOR=Ninja make
python3 ./build/tests/raweth.py --reth $Z_FEATURE_RAWETH_TRANSPORT
timeout-minutes: 5
env:
Z_FEATURE_RAWETH_TRANSPORT: ${{ matrix.feature_reth }}


fragment_test:
name: Test multicast and unicast fragmentation
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Run docker image
run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:latest

- name: Build project and run test
run: |
sudo apt install -y ninja-build
cmake -DBATCH_UNICAST_SIZE=4096 -B build/ -G Ninja
CMAKE_GENERATOR=Ninja make
python3 ./build/tests/fragment.py
timeout-minutes: 5

- name: Stop docker image
if: always()
run: |
docker stop zenoh_router
docker rm zenoh_router
16 changes: 16 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ option(WITH_ZEPHYR "Build for Zephyr RTOS" OFF)
option(WITH_FREERTOS_PLUS_TCP "Build for FreeRTOS RTOS and FreeRTOS-Plus-TCP network stack" OFF)
set(ZENOH_DEBUG 0 CACHE STRING "Use this to set the ZENOH_DEBUG variable")
set(FRAG_MAX_SIZE 0 CACHE STRING "Use this to override the maximum size for fragmented messages")
set(BATCH_UNICAST_SIZE 0 CACHE STRING "Use this to override the maximum unicast batch size")
set(CMAKE_EXPORT_COMPILE_COMMANDS ON CACHE INTERNAL "")
if(CMAKE_EXPORT_COMPILE_COMMANDS)
set(CMAKE_CXX_STANDARD_INCLUDE_DIRECTORIES
Expand Down Expand Up @@ -106,6 +107,9 @@ add_definition(ZENOH_DEBUG=${ZENOH_DEBUG})
if(FRAG_MAX_SIZE)
add_definition(Z_FRAG_MAX_SIZE=${FRAG_MAX_SIZE})
endif()
if (BATCH_UNICAST_SIZE)
add_definition(Z_BATCH_UNICAST_SIZE=${BATCH_UNICAST_SIZE})
endif()

# Zenoh pico feature configuration options
set(Z_FEATURE_PUBLICATION 1 CACHE STRING "Toggle publication feature")
Expand All @@ -132,6 +136,9 @@ message(STATUS "Zenoh Level Log: ${ZENOH_DEBUG}")
if(FRAG_MAX_SIZE)
message(STATUS "Fragmented message max size: ${FRAG_MAX_SIZE}")
endif()
if(BATCH_UNICAST_SIZE)
message(STATUS "Unicast batch max size: ${BATCH_UNICAST_SIZE}")
endif()
message(STATUS "Build for Zephyr RTOS: ${WITH_ZEPHYR}")
message(STATUS "Build for FreeRTOS-Plus-TCP: ${WITH_FREERTOS_PLUS_TCP}")
message(STATUS "Configuring for ${CMAKE_SYSTEM_NAME}")
Expand Down Expand Up @@ -301,6 +308,10 @@ if(UNIX OR MSVC)
add_executable(z_keyexpr_test ${PROJECT_SOURCE_DIR}/tests/z_keyexpr_test.c)
add_executable(z_api_null_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_null_drop_test.c)
add_executable(z_api_double_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_double_drop_test.c)
add_executable(z_test_fragment_tx ${PROJECT_SOURCE_DIR}/tests/z_test_fragment_tx.c)
add_executable(z_test_fragment_rx ${PROJECT_SOURCE_DIR}/tests/z_test_fragment_rx.c)
add_executable(z_perf_tx ${PROJECT_SOURCE_DIR}/tests/z_perf_tx.c)
add_executable(z_perf_rx ${PROJECT_SOURCE_DIR}/tests/z_perf_rx.c)

target_link_libraries(z_data_struct_test ${Libname})
target_link_libraries(z_endpoint_test ${Libname})
Expand All @@ -309,9 +320,14 @@ if(UNIX OR MSVC)
target_link_libraries(z_keyexpr_test ${Libname})
target_link_libraries(z_api_null_drop_test ${Libname})
target_link_libraries(z_api_double_drop_test ${Libname})
target_link_libraries(z_test_fragment_tx ${Libname})
target_link_libraries(z_test_fragment_rx ${Libname})
target_link_libraries(z_perf_tx ${Libname})
target_link_libraries(z_perf_rx ${Libname})

configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/raweth.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/raweth.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/fragment.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/fragment.py COPYONLY)

enable_testing()
add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test)
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
#define _Z_FLAG_Z_T 0x20 // 1 << 5 | QueryTarget if T==1 then the query target is present
#define _Z_FLAG_Z_X 0x00 // Unused flags are set to zero

#define _Z_FRAG_BUFF_BASE_SIZE 128 // Arbitrary base size of the buffer to encode a fragment message header

// Flags:
// - T: Timestamp If T==1 then the timestamp if present
// - E: Encoding If E==1 then the encoding is present
Expand Down
3 changes: 1 addition & 2 deletions src/protocol/codec.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,7 @@ int8_t _z_bytes_val_encode(_z_wbuf_t *wbf, const _z_bytes_t *bs) {
int8_t ret = _Z_RES_OK;

if ((wbf->_expansion_step != 0) && (bs->len > Z_TSID_LENGTH)) {
// ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len);
ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len);
ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len);
} else {
ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len);
}
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/codec/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
int8_t _z_decl_ext_keyexpr_encode(_z_wbuf_t *wbf, _z_keyexpr_t ke, _Bool has_next_ext) {
uint8_t header = _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 0x0f | (has_next_ext ? _Z_FLAG_Z_Z : 0);
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
uint32_t kelen = _z_keyexpr_has_suffix(ke) ? strlen(ke._suffix) : 0;
uint32_t kelen = (uint32_t)(_z_keyexpr_has_suffix(ke) ? strlen(ke._suffix) : 0);
header = (_z_keyexpr_is_local(&ke) ? 2 : 0) | (kelen != 0 ? 1 : 0);
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, 1 + kelen + _z_zint_len(ke._id)));
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/codec/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ int8_t _z_response_decode_extension(_z_msg_ext_t *extension, void *ctx) {
_z_n_msg_response_t *msg = (_z_n_msg_response_t *)ctx;
switch (_Z_EXT_FULL_ID(extension->_header)) {
case _Z_MSG_EXT_ENC_ZINT | 0x01: {
msg->_ext_qos._val = extension->_body._zint._val;
msg->_ext_qos._val = (uint8_t)extension->_body._zint._val;
break;
}
case _Z_MSG_EXT_ENC_ZBUF | 0x02: {
Expand Down Expand Up @@ -450,7 +450,7 @@ int8_t _z_declare_decode_extensions(_z_msg_ext_t *extension, void *ctx) {
_z_n_msg_declare_t *decl = (_z_n_msg_declare_t *)ctx;
switch (_Z_EXT_FULL_ID(extension->_header)) {
case _Z_MSG_EXT_ENC_ZINT | 0x01: {
decl->_ext_qos._val = extension->_body._zint._val;
decl->_ext_qos._val = (uint8_t)extension->_body._zint._val;
break;
}
case _Z_MSG_EXT_ENC_ZBUF | 0x02: {
Expand Down
2 changes: 1 addition & 1 deletion src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
case _Z_REQUEST_QUERY: {
#if Z_FEATURE_QUERYABLE == 1
_z_msg_query_t *query = &req._body._query;
ret = _z_trigger_queryables(zn, query, req._key, req._rid);
ret = _z_trigger_queryables(zn, query, req._key, (uint32_t)req._rid);
#else
_Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported\n");
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/transport/multicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m
} else {
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
_z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true);

ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf
if (ret == _Z_RES_OK) {
Expand Down
2 changes: 1 addition & 1 deletion src/transport/raweth/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
ztm->_transmitted = true;
} else { // The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
_z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true);
// Encode the message on the expandable wbuf
_Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Fragment and send the message
Expand Down
16 changes: 12 additions & 4 deletions src/transport/unicast/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,21 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo
if (ret == _Z_RES_OK) {
uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE;
size_t dbuf_size = 0;
size_t wbuf_size = 0;
size_t zbuf_size = 0;
_Bool expandable = false;

switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
// Add stream length field to buffer size
wbuf_size = mtu + _Z_MSG_LEN_ENC_SIZE;
zbuf_size = Z_BATCH_UNICAST_SIZE + _Z_MSG_LEN_ENC_SIZE;
expandable = true;
break;
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
wbuf_size = mtu;
zbuf_size = Z_BATCH_UNICAST_SIZE;
expandable = false;
break;
}
Expand All @@ -67,16 +74,17 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo
expandable = false;
dbuf_size = Z_FRAG_MAX_SIZE;
#endif
zt->_transport._unicast._wbuf = _z_wbuf_make(mtu, false);
zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE);
// Initialize tx rx buffers
zt->_transport._unicast._wbuf = _z_wbuf_make(wbuf_size, false);
zt->_transport._unicast._zbuf = _z_zbuf_make(zbuf_size);

// Initialize the defragmentation buffers
zt->_transport._unicast._dbuf_reliable = _z_wbuf_make(dbuf_size, expandable);
zt->_transport._unicast._dbuf_best_effort = _z_wbuf_make(dbuf_size, expandable);

// Clean up the buffers if one of them failed to be allocated
if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != mtu) ||
(_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != Z_BATCH_UNICAST_SIZE) ||
if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != wbuf_size) ||
(_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != zbuf_size) ||
#if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0
(_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != dbuf_size) ||
(_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != dbuf_size)) {
Expand Down
2 changes: 1 addition & 1 deletion src/transport/unicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg
} else {
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
_z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true);

ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf
if (ret == _Z_RES_OK) {
Expand Down
132 changes: 132 additions & 0 deletions tests/fragment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import subprocess
import sys
import time

# Specify the directory for the binaries
DIR_TESTS = "build/tests"

def check_output(tx_status, tx_output, rx_status, rx_output):
test_status = 0

# Expected tx output & status
z_tx_expected_status = 0
z_tx_expected_output = "[tx]: Sending packet on test/zenoh-pico-fragment, len: 10000"
# Expected rx output & status
z_rx_expected_status = 0
z_rx_expected_output = (
"[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1")

# Check the exit status of tx
if tx_status == z_tx_expected_status:
print("z_tx status valid")
else:
print(f"z_tx status invalid, expected: {z_tx_expected_status}, received: {tx_status}")
test_status = 1

# Check output of tx
if z_tx_expected_output in tx_output:
print("z_tx output valid")
else:
print("z_tx output invalid:")
print(f"Expected: \"{z_tx_expected_output}\"")
print(f"Received: \"{tx_output}\"")
test_status = 1

# Check the exit status of z_rx
if rx_status == z_rx_expected_status:
print("z_rx status valid")
else:
print(f"z_rx status invalid, expected: {z_rx_expected_status}, received: {rx_status}")
test_status = 1

# Check output of z_rx
if z_rx_expected_output in rx_output:
print("z_rx output valid")
else:
print("z_rx output invalid:")
print(f"Expected: \"{z_rx_expected_output}\"")
print(f"Received: \"{rx_output}\"")
test_status = 1
# Return value
return test_status

def test_client():
# Start rx in the background
print("Start rx client")
z_rx_command = f"./{DIR_TESTS}/z_test_fragment_rx"
z_rx_process = subprocess.Popen(z_rx_command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
# Introduce a delay to ensure rx starts
time.sleep(2)
# Start tx
print("Start tx client")
z_tx_command = f"./{DIR_TESTS}/z_test_fragment_tx"
z_tx_process = subprocess.Popen(z_tx_command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
# Wait for tx to finish
z_tx_process.wait()
# Wait for rx to receive
time.sleep(1)
print("Stop rx")
if z_rx_process.poll() is None:
# Send "q" command to rx to stop it
z_rx_process.stdin.write("q\n")
z_rx_process.stdin.flush()
# Wait for rx to finish
z_rx_process.wait()
# Check output
return check_output(z_tx_process.returncode, z_tx_process.stdout.read(),
z_rx_process.returncode, z_rx_process.stdout.read())

def test_peer():
# Start rx in the background
print("Start rx peer")
z_rx_command = f"./{DIR_TESTS}/z_test_fragment_rx 1"
z_rx_process = subprocess.Popen(z_rx_command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
# Introduce a delay to ensure rx starts
time.sleep(2)
# Start tx
print("Start tx peer")
z_tx_command = f"./{DIR_TESTS}/z_test_fragment_tx 1"
z_tx_process = subprocess.Popen(z_tx_command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
# Wait for tx to finish
z_tx_process.wait()
# Wait for rx to receive
time.sleep(1)
print("Stop rx")
if z_rx_process.poll() is None:
# Send "q" command to rx to stop it
z_rx_process.stdin.write("q\n")
z_rx_process.stdin.flush()
# Wait for rx to finish
z_rx_process.wait()
# Check output
return check_output(z_tx_process.returncode, z_tx_process.stdout.read(),
z_rx_process.returncode, z_rx_process.stdout.read())

if __name__ == "__main__":
EXIT_STATUS = 0

# Run tests
if test_client() == 1:
EXIT_STATUS = 1
if test_peer() == 1:
EXIT_STATUS = 1
# Exit
sys.exit(EXIT_STATUS)
Loading

0 comments on commit 6316838

Please sign in to comment.