From 63168384689bc5846cf9fdb90743824768963fdd Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Mon, 18 Dec 2023 13:00:00 +0100 Subject: [PATCH] Fix fragmentation (#297) --- .github/workflows/build-check.yaml | 29 +++- CMakeLists.txt | 16 +++ .../zenoh-pico/protocol/definitions/message.h | 2 + src/protocol/codec.c | 3 +- src/protocol/codec/declarations.c | 2 +- src/protocol/codec/network.c | 4 +- src/session/rx.c | 2 +- src/transport/multicast/tx.c | 2 +- src/transport/raweth/tx.c | 2 +- src/transport/unicast/transport.c | 16 ++- src/transport/unicast/tx.c | 2 +- tests/fragment.py | 132 ++++++++++++++++++ tests/z_perf_rx.c | 115 +++++++++++++++ tests/z_perf_tx.c | 104 ++++++++++++++ tests/z_test_fragment_rx.c | 90 ++++++++++++ tests/z_test_fragment_tx.c | 80 +++++++++++ zenohpico.pc | 2 +- 17 files changed, 586 insertions(+), 17 deletions(-) create mode 100644 tests/fragment.py create mode 100644 tests/z_perf_rx.c create mode 100644 tests/z_perf_tx.c create mode 100644 tests/z_test_fragment_rx.c create mode 100644 tests/z_test_fragment_tx.c diff --git a/.github/workflows/build-check.yaml b/.github/workflows/build-check.yaml index b5b96943c..01ab37196 100644 --- a/.github/workflows/build-check.yaml +++ b/.github/workflows/build-check.yaml @@ -56,7 +56,7 @@ jobs: uses: actions/checkout@v4 - name: Run docker image - run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:master + run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:latest - name: Build project run: | @@ -86,7 +86,7 @@ jobs: - name: Checkout code uses: actions/checkout@v4 - - name: Build project + - name: Build project and run test run: | sudo apt install -y ninja-build CMAKE_GENERATOR=Ninja make @@ -94,4 +94,27 @@ jobs: timeout-minutes: 5 env: Z_FEATURE_RAWETH_TRANSPORT: ${{ matrix.feature_reth }} - \ No newline at end of file + + fragment_test: + name: Test multicast and unicast fragmentation + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Run docker image + run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:latest + + - name: Build project and run test + run: | + sudo apt install -y ninja-build + cmake -DBATCH_UNICAST_SIZE=4096 -B build/ -G Ninja + CMAKE_GENERATOR=Ninja make + python3 ./build/tests/fragment.py + timeout-minutes: 5 + + - name: Stop docker image + if: always() + run: | + docker stop zenoh_router + docker rm zenoh_router diff --git a/CMakeLists.txt b/CMakeLists.txt index 2fc9605b9..426f85cc6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,7 @@ option(WITH_ZEPHYR "Build for Zephyr RTOS" OFF) option(WITH_FREERTOS_PLUS_TCP "Build for FreeRTOS RTOS and FreeRTOS-Plus-TCP network stack" OFF) set(ZENOH_DEBUG 0 CACHE STRING "Use this to set the ZENOH_DEBUG variable") set(FRAG_MAX_SIZE 0 CACHE STRING "Use this to override the maximum size for fragmented messages") +set(BATCH_UNICAST_SIZE 0 CACHE STRING "Use this to override the maximum unicast batch size") set(CMAKE_EXPORT_COMPILE_COMMANDS ON CACHE INTERNAL "") if(CMAKE_EXPORT_COMPILE_COMMANDS) set(CMAKE_CXX_STANDARD_INCLUDE_DIRECTORIES @@ -106,6 +107,9 @@ add_definition(ZENOH_DEBUG=${ZENOH_DEBUG}) if(FRAG_MAX_SIZE) add_definition(Z_FRAG_MAX_SIZE=${FRAG_MAX_SIZE}) endif() +if (BATCH_UNICAST_SIZE) + add_definition(Z_BATCH_UNICAST_SIZE=${BATCH_UNICAST_SIZE}) +endif() # Zenoh pico feature configuration options set(Z_FEATURE_PUBLICATION 1 CACHE STRING "Toggle publication feature") @@ -132,6 +136,9 @@ message(STATUS "Zenoh Level Log: ${ZENOH_DEBUG}") if(FRAG_MAX_SIZE) message(STATUS "Fragmented message max size: ${FRAG_MAX_SIZE}") endif() +if(BATCH_UNICAST_SIZE) + message(STATUS "Unicast batch max size: ${BATCH_UNICAST_SIZE}") +endif() message(STATUS "Build for Zephyr RTOS: ${WITH_ZEPHYR}") message(STATUS "Build for FreeRTOS-Plus-TCP: ${WITH_FREERTOS_PLUS_TCP}") message(STATUS "Configuring for ${CMAKE_SYSTEM_NAME}") @@ -301,6 +308,10 @@ if(UNIX OR MSVC) add_executable(z_keyexpr_test ${PROJECT_SOURCE_DIR}/tests/z_keyexpr_test.c) add_executable(z_api_null_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_null_drop_test.c) add_executable(z_api_double_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_double_drop_test.c) + add_executable(z_test_fragment_tx ${PROJECT_SOURCE_DIR}/tests/z_test_fragment_tx.c) + add_executable(z_test_fragment_rx ${PROJECT_SOURCE_DIR}/tests/z_test_fragment_rx.c) + add_executable(z_perf_tx ${PROJECT_SOURCE_DIR}/tests/z_perf_tx.c) + add_executable(z_perf_rx ${PROJECT_SOURCE_DIR}/tests/z_perf_rx.c) target_link_libraries(z_data_struct_test ${Libname}) target_link_libraries(z_endpoint_test ${Libname}) @@ -309,9 +320,14 @@ if(UNIX OR MSVC) target_link_libraries(z_keyexpr_test ${Libname}) target_link_libraries(z_api_null_drop_test ${Libname}) target_link_libraries(z_api_double_drop_test ${Libname}) + target_link_libraries(z_test_fragment_tx ${Libname}) + target_link_libraries(z_test_fragment_rx ${Libname}) + target_link_libraries(z_perf_tx ${Libname}) + target_link_libraries(z_perf_rx ${Libname}) configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY) configure_file(${PROJECT_SOURCE_DIR}/tests/raweth.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/raweth.py COPYONLY) + configure_file(${PROJECT_SOURCE_DIR}/tests/fragment.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/fragment.py COPYONLY) enable_testing() add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test) diff --git a/include/zenoh-pico/protocol/definitions/message.h b/include/zenoh-pico/protocol/definitions/message.h index 703ed6172..eea54a70e 100644 --- a/include/zenoh-pico/protocol/definitions/message.h +++ b/include/zenoh-pico/protocol/definitions/message.h @@ -45,6 +45,8 @@ #define _Z_FLAG_Z_T 0x20 // 1 << 5 | QueryTarget if T==1 then the query target is present #define _Z_FLAG_Z_X 0x00 // Unused flags are set to zero +#define _Z_FRAG_BUFF_BASE_SIZE 128 // Arbitrary base size of the buffer to encode a fragment message header + // Flags: // - T: Timestamp If T==1 then the timestamp if present // - E: Encoding If E==1 then the encoding is present diff --git a/src/protocol/codec.c b/src/protocol/codec.c index 7b4be95f4..225253418 100644 --- a/src/protocol/codec.c +++ b/src/protocol/codec.c @@ -296,8 +296,7 @@ int8_t _z_bytes_val_encode(_z_wbuf_t *wbf, const _z_bytes_t *bs) { int8_t ret = _Z_RES_OK; if ((wbf->_expansion_step != 0) && (bs->len > Z_TSID_LENGTH)) { - // ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len); - ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len); + ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len); } else { ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len); } diff --git a/src/protocol/codec/declarations.c b/src/protocol/codec/declarations.c index 7e474fb6a..e78a94e2a 100644 --- a/src/protocol/codec/declarations.c +++ b/src/protocol/codec/declarations.c @@ -36,7 +36,7 @@ int8_t _z_decl_ext_keyexpr_encode(_z_wbuf_t *wbf, _z_keyexpr_t ke, _Bool has_next_ext) { uint8_t header = _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 0x0f | (has_next_ext ? _Z_FLAG_Z_Z : 0); _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); - uint32_t kelen = _z_keyexpr_has_suffix(ke) ? strlen(ke._suffix) : 0; + uint32_t kelen = (uint32_t)(_z_keyexpr_has_suffix(ke) ? strlen(ke._suffix) : 0); header = (_z_keyexpr_is_local(&ke) ? 2 : 0) | (kelen != 0 ? 1 : 0); _Z_RETURN_IF_ERR(_z_zint_encode(wbf, 1 + kelen + _z_zint_len(ke._id))); _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index 28abb51d1..bdbbdfb84 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -326,7 +326,7 @@ int8_t _z_response_decode_extension(_z_msg_ext_t *extension, void *ctx) { _z_n_msg_response_t *msg = (_z_n_msg_response_t *)ctx; switch (_Z_EXT_FULL_ID(extension->_header)) { case _Z_MSG_EXT_ENC_ZINT | 0x01: { - msg->_ext_qos._val = extension->_body._zint._val; + msg->_ext_qos._val = (uint8_t)extension->_body._zint._val; break; } case _Z_MSG_EXT_ENC_ZBUF | 0x02: { @@ -450,7 +450,7 @@ int8_t _z_declare_decode_extensions(_z_msg_ext_t *extension, void *ctx) { _z_n_msg_declare_t *decl = (_z_n_msg_declare_t *)ctx; switch (_Z_EXT_FULL_ID(extension->_header)) { case _Z_MSG_EXT_ENC_ZINT | 0x01: { - decl->_ext_qos._val = extension->_body._zint._val; + decl->_ext_qos._val = (uint8_t)extension->_body._zint._val; break; } case _Z_MSG_EXT_ENC_ZBUF | 0x02: { diff --git a/src/session/rx.c b/src/session/rx.c index 28d225827..00e708a14 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -92,7 +92,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint case _Z_REQUEST_QUERY: { #if Z_FEATURE_QUERYABLE == 1 _z_msg_query_t *query = &req._body._query; - ret = _z_trigger_queryables(zn, query, req._key, req._rid); + ret = _z_trigger_queryables(zn, query, req._key, (uint32_t)req._rid); #else _Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported\n"); #endif diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index 4b9d034b5..4b2d77c6c 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -116,7 +116,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); + _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf if (ret == _Z_RES_OK) { diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 2f9ca8ed5..c9984bc48 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -271,7 +271,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, ztm->_transmitted = true; } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); + _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); // Encode the message on the expandable wbuf _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _zp_raweth_unlock_tx_mutex(ztm)); // Fragment and send the message diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 15f660bd8..b41a9ce55 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -51,14 +51,21 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo if (ret == _Z_RES_OK) { uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; size_t dbuf_size = 0; + size_t wbuf_size = 0; + size_t zbuf_size = 0; _Bool expandable = false; switch (zl->_cap._flow) { case Z_LINK_CAP_FLOW_STREAM: + // Add stream length field to buffer size + wbuf_size = mtu + _Z_MSG_LEN_ENC_SIZE; + zbuf_size = Z_BATCH_UNICAST_SIZE + _Z_MSG_LEN_ENC_SIZE; expandable = true; break; case Z_LINK_CAP_FLOW_DATAGRAM: default: + wbuf_size = mtu; + zbuf_size = Z_BATCH_UNICAST_SIZE; expandable = false; break; } @@ -67,16 +74,17 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo expandable = false; dbuf_size = Z_FRAG_MAX_SIZE; #endif - zt->_transport._unicast._wbuf = _z_wbuf_make(mtu, false); - zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); + // Initialize tx rx buffers + zt->_transport._unicast._wbuf = _z_wbuf_make(wbuf_size, false); + zt->_transport._unicast._zbuf = _z_zbuf_make(zbuf_size); // Initialize the defragmentation buffers zt->_transport._unicast._dbuf_reliable = _z_wbuf_make(dbuf_size, expandable); zt->_transport._unicast._dbuf_best_effort = _z_wbuf_make(dbuf_size, expandable); // Clean up the buffers if one of them failed to be allocated - if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != mtu) || - (_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != Z_BATCH_UNICAST_SIZE) || + if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != wbuf_size) || + (_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != zbuf_size) || #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0 (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != dbuf_size) || (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != dbuf_size)) { diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 25caabee6..7fce59560 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -125,7 +125,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); + _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf if (ret == _Z_RES_OK) { diff --git a/tests/fragment.py b/tests/fragment.py new file mode 100644 index 000000000..d29295f0f --- /dev/null +++ b/tests/fragment.py @@ -0,0 +1,132 @@ +import subprocess +import sys +import time + +# Specify the directory for the binaries +DIR_TESTS = "build/tests" + +def check_output(tx_status, tx_output, rx_status, rx_output): + test_status = 0 + + # Expected tx output & status + z_tx_expected_status = 0 + z_tx_expected_output = "[tx]: Sending packet on test/zenoh-pico-fragment, len: 10000" + # Expected rx output & status + z_rx_expected_status = 0 + z_rx_expected_output = ( + "[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1") + + # Check the exit status of tx + if tx_status == z_tx_expected_status: + print("z_tx status valid") + else: + print(f"z_tx status invalid, expected: {z_tx_expected_status}, received: {tx_status}") + test_status = 1 + + # Check output of tx + if z_tx_expected_output in tx_output: + print("z_tx output valid") + else: + print("z_tx output invalid:") + print(f"Expected: \"{z_tx_expected_output}\"") + print(f"Received: \"{tx_output}\"") + test_status = 1 + + # Check the exit status of z_rx + if rx_status == z_rx_expected_status: + print("z_rx status valid") + else: + print(f"z_rx status invalid, expected: {z_rx_expected_status}, received: {rx_status}") + test_status = 1 + + # Check output of z_rx + if z_rx_expected_output in rx_output: + print("z_rx output valid") + else: + print("z_rx output invalid:") + print(f"Expected: \"{z_rx_expected_output}\"") + print(f"Received: \"{rx_output}\"") + test_status = 1 + # Return value + return test_status + +def test_client(): + # Start rx in the background + print("Start rx client") + z_rx_command = f"./{DIR_TESTS}/z_test_fragment_rx" + z_rx_process = subprocess.Popen(z_rx_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True) + # Introduce a delay to ensure rx starts + time.sleep(2) + # Start tx + print("Start tx client") + z_tx_command = f"./{DIR_TESTS}/z_test_fragment_tx" + z_tx_process = subprocess.Popen(z_tx_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + # Wait for tx to finish + z_tx_process.wait() + # Wait for rx to receive + time.sleep(1) + print("Stop rx") + if z_rx_process.poll() is None: + # Send "q" command to rx to stop it + z_rx_process.stdin.write("q\n") + z_rx_process.stdin.flush() + # Wait for rx to finish + z_rx_process.wait() + # Check output + return check_output(z_tx_process.returncode, z_tx_process.stdout.read(), + z_rx_process.returncode, z_rx_process.stdout.read()) + +def test_peer(): + # Start rx in the background + print("Start rx peer") + z_rx_command = f"./{DIR_TESTS}/z_test_fragment_rx 1" + z_rx_process = subprocess.Popen(z_rx_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True) + # Introduce a delay to ensure rx starts + time.sleep(2) + # Start tx + print("Start tx peer") + z_tx_command = f"./{DIR_TESTS}/z_test_fragment_tx 1" + z_tx_process = subprocess.Popen(z_tx_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + # Wait for tx to finish + z_tx_process.wait() + # Wait for rx to receive + time.sleep(1) + print("Stop rx") + if z_rx_process.poll() is None: + # Send "q" command to rx to stop it + z_rx_process.stdin.write("q\n") + z_rx_process.stdin.flush() + # Wait for rx to finish + z_rx_process.wait() + # Check output + return check_output(z_tx_process.returncode, z_tx_process.stdout.read(), + z_rx_process.returncode, z_rx_process.stdout.read()) + +if __name__ == "__main__": + EXIT_STATUS = 0 + + # Run tests + if test_client() == 1: + EXIT_STATUS = 1 + if test_peer() == 1: + EXIT_STATUS = 1 + # Exit + sys.exit(EXIT_STATUS) diff --git a/tests/z_perf_rx.c b/tests/z_perf_rx.c new file mode 100644 index 000000000..7a67e153a --- /dev/null +++ b/tests/z_perf_rx.c @@ -0,0 +1,115 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include + +#include "zenoh-pico.h" + +typedef struct { + volatile unsigned long count; + unsigned long curr_len; + z_clock_t start; +} z_stats_t; + +static z_stats_t test_stats; +static volatile bool test_end; + +#if Z_FEATURE_SUBSCRIPTION == 1 +void z_stats_stop(z_stats_t *stats) { + // Ignore default value + if (stats->curr_len == 0) { + return; + } + // Print values + unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->start); + printf("End test for pkt len: %lu, msg nb: %lu, time ms: %lu\n", stats->curr_len, stats->count, elapsed_ms); + stats->count = 0; +} + +void on_sample(const z_sample_t *sample, void *context) { + z_stats_t *stats = (z_stats_t *)context; + + if (stats->curr_len != sample->payload.len) { + // End previous measurement + z_stats_stop(stats); + // Check for end packet + stats->curr_len = (unsigned long)sample->payload.len; + if (sample->payload.len == 1) { + test_end = true; + return; + } + // Start new measurement + printf("Starting test for pkt len: %lu\n", stats->curr_len); + stats->start = z_clock_now(); + } + stats->count++; +} + +int main(int argc, char **argv) { + char *keyexpr = "test/thr"; + const char *mode = "client"; + char *llocator = NULL; + (void)argv; + + // Check if peer mode + if (argc > 1) { + mode = "peer"; + llocator = "udp/224.0.0.224:7447#iface=lo"; + } + // Set config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (llocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + exit(-1); + } + // Declare Subscriber/resource + z_owned_closure_sample_t callback = z_closure(on_sample, NULL, (void *)&test_stats); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to create subscriber.\n"); + exit(-1); + } + // Listen until stopped + printf("Start listening.\n"); + while (!test_end) { + } + // Wait for everything to settle + printf("End of test\n"); + z_sleep_s(1); + // Clean up + z_undeclare_subscriber(z_move(sub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + exit(0); +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this test requires it.\n"); + return -2; +} +#endif diff --git a/tests/z_perf_tx.c b/tests/z_perf_tx.c new file mode 100644 index 000000000..454d72de2 --- /dev/null +++ b/tests/z_perf_tx.c @@ -0,0 +1,104 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include + +#include "zenoh-pico.h" + +#define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0])) +#define TEST_DURATION_US 10000000 + +#if Z_FEATURE_PUBLICATION == 1 +int send_packets(unsigned long pkt_len, z_owned_publisher_t *pub, uint8_t *value) { + z_clock_t test_start = z_clock_now(); + unsigned long elapsed_us = 0; + while (elapsed_us < TEST_DURATION_US) { + if (z_publisher_put(z_loan(*pub), (const uint8_t *)value, pkt_len, NULL) != 0) { + printf("Put failed for pkt len: %lu\n", pkt_len); + return -1; + } + elapsed_us = z_clock_elapsed_us(&test_start); + } + return 0; +} + +int main(int argc, char **argv) { + unsigned long len_array[] = {1048576, 524288, 262144, 131072, 65536, 32768, 16384, 8192, 4096, + 2048, 1024, 512, 256, 128, 64, 32, 16, 8}; // Biggest value first + uint8_t *value = (uint8_t *)malloc(len_array[0]); + memset(value, 1, len_array[0]); + char *keyexpr = "test/thr"; + const char *mode = "client"; + char *llocator = NULL; + (void)argv; + + // Check if peer mode + if (argc > 1) { + mode = "peer"; + llocator = "udp/224.0.0.224:7447#iface=lo"; + } + // Set config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (llocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + exit(-1); + } + // Declare publisher + z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL); + if (!z_check(pub)) { + printf("Unable to declare publisher for key expression!\n"); + exit(-1); + } + // Wait for joins + if (strcmp(mode, "peer") == 0) { + printf("Waiting for JOIN messages\n"); + z_sleep_s(3); + } + // Send packets + for (size_t i = 0; i < ARRAY_SIZE(len_array); i++) { + printf("Start sending pkt len: %lu\n", len_array[i]); + if (send_packets(len_array[i], &pub, value) != 0) { + break; + } + } + // Send end packet + printf("Sending end pkt\n"); + z_publisher_put(z_loan(pub), (const uint8_t *)value, 1, NULL); + // Clean up + z_undeclare_publisher(z_move(pub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + free(value); + exit(0); +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_PUBLICATION but this test requires it.\n"); + return -2; +} +#endif diff --git a/tests/z_test_fragment_rx.c b/tests/z_test_fragment_rx.c new file mode 100644 index 000000000..b236b1cfc --- /dev/null +++ b/tests/z_test_fragment_rx.c @@ -0,0 +1,90 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include +#include + +#if Z_FEATURE_SUBSCRIPTION == 1 +void data_handler(const z_sample_t *sample, void *ctx) { + (void)(ctx); + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + bool is_valid = true; + const uint8_t *data = sample->payload.start; + for (size_t i = 0; i < sample->payload.len; i++) { + if (data[i] != (uint8_t)i) { + is_valid = false; + break; + } + } + printf("[rx]: Received packet on %s, len: %d, validity: %d\n", z_loan(keystr), (int)sample->payload.len, is_valid); + z_drop(z_move(keystr)); +} + +int main(int argc, char **argv) { + const char *keyexpr = "test/zenoh-pico-fragment"; + const char *mode = "client"; + char *llocator = NULL; + (void)argv; + + // Check if peer mode + if (argc > 1) { + mode = "peer"; + llocator = "udp/224.0.0.224:7447#iface=lo"; + } + // Set config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (llocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + return -1; + } + // Declare subscriber + z_owned_closure_sample_t callback = z_closure(data_handler); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to declare subscriber.\n"); + return -1; + } + // Wait for termination + char c = '\0'; + while (c != 'q') { + fflush(stdin); + int ret = scanf("%c", &c); + (void)ret; // Remove unused result warning + } + // Clean up + z_undeclare_subscriber(z_move(sub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + return 0; +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this test requires it.\n"); + return -2; +} +#endif diff --git a/tests/z_test_fragment_tx.c b/tests/z_test_fragment_tx.c new file mode 100644 index 000000000..bec1c673d --- /dev/null +++ b/tests/z_test_fragment_tx.c @@ -0,0 +1,80 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include +#include + +#if Z_FEATURE_PUBLICATION == 1 +int main(int argc, char **argv) { + const char *keyexpr = "test/zenoh-pico-fragment"; + const char *mode = "client"; + char *llocator = NULL; + uint8_t *value = NULL; + size_t size = 10000; + (void)argv; + + // Check if peer mode + if (argc > 1) { + mode = "peer"; + llocator = "udp/224.0.0.224:7447#iface=lo"; + } + // Init value + value = malloc(size); + if (value == NULL) { + return -1; + } + for (size_t i = 0; i < size; i++) { + value[i] = (uint8_t)i; + } + // Set config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (llocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + return -1; + } + // Put data + z_put_options_t options = z_put_options_default(); + options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + for (int i = 0; i < 5; i++) { + printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size); + if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, size, &options) < 0) { + printf("Oh no! Put has failed...\n"); + } + } + // Clean up + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + free(value); + return 0; +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_PUBLICATION but this test requires it.\n"); + return -2; +} +#endif diff --git a/zenohpico.pc b/zenohpico.pc index e08e8cce2..023120da0 100644 --- a/zenohpico.pc +++ b/zenohpico.pc @@ -3,6 +3,6 @@ prefix=/usr/local Name: zenohpico Description: URL: -Version: 0.11.20231031dev +Version: 0.11.20231218dev Cflags: -I${prefix}/include Libs: -L${prefix}/lib -lzenohpico