Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add raweth data length header field #294

Merged
merged 8 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/zenoh-pico/protocol/codec/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@
} \
}

#define _Z_CLEAN_RETURN_IF_ERR(base_expr, clean_expr) \
{ \
int8_t __res = base_expr; \
if (__res != _Z_RES_OK) { \
clean_expr; \
return __res; \
} \
}

/*------------------ Internal Zenoh-net Macros ------------------*/
int8_t _z_encoding_prefix_encode(_z_wbuf_t *wbf, z_encoding_prefix_t en);
int8_t _z_encoding_prefix_decode(z_encoding_prefix_t *en, _z_zbuf_t *zbf);
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ typedef struct {
} _z_t_msg_fragment_t;
void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg);

#define _Z_FRAGMENT_HEADER_SIZE 12
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually not true, the fragment header size may vary depending on the used protocol extension.
As a consequence, basing the code logic on this value may lead to some bugs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say we should fix this in another PR, I opened an issue on the overall issue: #295


/*------------------ Transport Message ------------------*/
typedef union {
_z_t_msg_join_t _join;
Expand Down
6 changes: 5 additions & 1 deletion include/zenoh-pico/system/link/raweth.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
#define _ZP_MAC_ADDR_LENGTH 6

// Max frame size
#define _ZP_MAX_ETH_FRAME_SIZE 1500
#define _ZP_MAX_ETH_FRAME_SIZE 1514

// Ethernet header structure type
typedef struct {
uint8_t dmac[_ZP_MAC_ADDR_LENGTH]; // Destination mac address
uint8_t smac[_ZP_MAC_ADDR_LENGTH]; // Source mac address
uint16_t ethtype; // Ethertype of frame
uint16_t data_length; // Payload length
} _zp_eth_header_t;

typedef struct {
Expand All @@ -44,6 +45,7 @@ typedef struct {
uint16_t vlan_type; // Vlan ethtype
uint16_t tag; // Vlan tag
uint16_t ethtype; // Ethertype of frame
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need ethtype then? As far as I understand it, data_legnth replaces ethtype.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

802.3 raw requires us to use a 802.2 LLC header which would be 3 bytes of useless information for us, so I figured an ethertype and a frame length was the most efficient choice.

uint16_t data_length; // Payload length
} _zp_eth_vlan_header_t;

typedef struct {
Expand All @@ -61,6 +63,8 @@ int8_t _z_open_raweth(_z_sys_net_socket_t *sock, const char *interface);
size_t _z_send_raweth(const _z_sys_net_socket_t *sock, const void *buff, size_t buff_len);
size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buff_len, _z_bytes_t *addr);
int8_t _z_close_raweth(_z_sys_net_socket_t *sock);
size_t _z_raweth_ntohs(size_t val);
size_t _z_raweth_htons(size_t val);

#endif

Expand Down
4 changes: 4 additions & 0 deletions src/system/unix/link/raweth.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,9 @@ size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buf
return bytesRead;
}

size_t _z_raweth_ntohs(size_t val) { return ntohs(val); }

size_t _z_raweth_htons(size_t val) { return htons(val); }

#endif // defined(__linux)
#endif // Z_FEATURE_RAWETH_TRANSPORT == 1
2 changes: 1 addition & 1 deletion src/transport/multicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m
} else {
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true);
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fragmentation buffer should allocate a small amount of memory and it shouldn't be dependent on the fragment header size.


ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf
if (ret == _Z_RES_OK) {
Expand Down
27 changes: 20 additions & 7 deletions src/transport/raweth/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@

#if Z_FEATURE_RAWETH_TRANSPORT == 1

void print_buf(_z_zbuf_t *buf) {
printf("Buff info: %ld, %ld, %ld\n", buf->_ios._r_pos, buf->_ios._w_pos, buf->_ios._capacity);
}

static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z_bytes_t *addr) {
uint8_t *buff = _z_zbuf_get_wptr(zbf);
size_t rb = _z_receive_raweth(&link->_socket._raweth._sock, buff, _z_zbuf_space_left(zbf), addr);
Expand All @@ -48,14 +44,31 @@ static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z
if (has_vlan && (rb < sizeof(_zp_eth_vlan_header_t))) {
return SIZE_MAX;
}
// Update buffer but skip eth header
_z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + rb);
size_t data_length = 0;
if (has_vlan) {
_zp_eth_vlan_header_t *header = (_zp_eth_vlan_header_t *)buff;
// Retrieve data length
data_length = _z_raweth_ntohs(header->data_length);
if (rb < (data_length + sizeof(_zp_eth_vlan_header_t))) {
// Invalid data_length
return SIZE_MAX;
}
// Skip header
_z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + sizeof(_zp_eth_vlan_header_t) + data_length);
_z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_vlan_header_t));
} else {
_zp_eth_header_t *header = (_zp_eth_header_t *)buff;
// Retrieve data length
data_length = _z_raweth_ntohs(header->data_length);
if (rb < (data_length + sizeof(_zp_eth_header_t))) {
// Invalid data_length
return SIZE_MAX;
}
// Skip header
_z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + sizeof(_zp_eth_header_t) + data_length);
_z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_header_t));
}
return rb;
return data_length;
}

/*------------------ Reception helper ------------------*/
Expand Down
99 changes: 59 additions & 40 deletions src/transport/raweth/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@

#if Z_FEATURE_RAWETH_TRANSPORT == 1

int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) {
#if Z_FEATURE_MULTI_THREAD == 1
static void _zp_raweth_unlock_tx_mutex(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_tx); }
#else
static void _zp_raweth_unlock_tx_mutex(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
#endif

static int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) {
int8_t ret = _Z_RES_OK;

if (_ZP_RAWETH_CFG_SIZE < 1) {
Expand Down Expand Up @@ -85,33 +91,51 @@ static _z_zint_t __unsafe_z_raweth_get_sn(_z_transport_multicast_t *ztm, z_relia
return sn;
}

static void __unsafe_z_raweth_prepare_header(_z_link_t *zl, _z_wbuf_t *wbf) {
_z_raweth_socket_t *resocket = &zl->_socket._raweth;
// Reserve eth header in buffer
if (resocket->_has_vlan) {
_z_wbuf_set_wpos(wbf, sizeof(_zp_eth_vlan_header_t));
} else {
_z_wbuf_set_wpos(wbf, sizeof(_zp_eth_header_t));
}
}

/**
* This function is unsafe because it operates in potentially concurrent data.
* Make sure that the following mutexes are locked before calling this function:
* - ztm->_mutex_inner
*/
static int8_t __unsafe_z_raweth_write_header(_z_link_t *zl, _z_wbuf_t *wbf) {
_z_raweth_socket_t *resocket = &zl->_socket._raweth;

// Save and reset buffer position
size_t wpos = _z_wbuf_len(wbf);
_z_wbuf_set_wpos(wbf, 0);
// Write eth header in buffer
if (resocket->_has_vlan) {
_zp_eth_vlan_header_t header;
// Set header
memset(&header, 0, sizeof(header));
memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH);
memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH);
header.vlan_type = _ZP_ETH_TYPE_VLAN;
header.tag = resocket->_vlan;
header.ethtype = _ZP_RAWETH_CFG_ETHTYPE;
header.data_length = _z_raweth_htons(wpos - sizeof(header));
// Write header
_Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header)));
} else {
_zp_eth_header_t header;
// Set header
memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH);
memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH);
header.ethtype = _ZP_RAWETH_CFG_ETHTYPE;
header.data_length = _z_raweth_htons(wpos - sizeof(header));
// Write header
_Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header)));
}
// Restore wpos
_z_wbuf_set_wpos(wbf, wpos);
return _Z_RES_OK;
}

Expand Down Expand Up @@ -141,32 +165,18 @@ int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message
uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE;
_z_wbuf_t wbf = _z_wbuf_make(mtu, false);

switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
break;
}
// Discard const qualifier
_z_link_t *mzl = (_z_link_t *)zl;
// Set socket info
_Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &mzl->_socket._raweth));
// Prepare buff
__unsafe_z_raweth_prepare_header(mzl, &wbf);
// Encode the session message
_Z_RETURN_IF_ERR(_z_transport_message_encode(&wbf, t_msg));
// Write the message header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(mzl, &wbf));
// Encode the session message
ret = _z_transport_message_encode(&wbf, t_msg);
if (ret == _Z_RES_OK) {
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
break;
}
// Send the wbuf on the socket
ret = _z_raweth_link_send_wbuf(zl, &wbf);
}
// Send the wbuf on the socket
ret = _z_raweth_link_send_wbuf(zl, &wbf);
_z_wbuf_clear(&wbf);

return ret;
Expand All @@ -182,13 +192,15 @@ int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_me
// Reset wbuf
_z_wbuf_reset(&ztm->_wbuf);
// Set socket info
_Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth));
// Write the message header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth), _zp_raweth_unlock_tx_mutex(ztm));
// Prepare buff
__unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf);
// Encode the session message
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg));
_Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Write the message header
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Mark the session that we have transmitted data
ztm->_transmitted = true;

Expand Down Expand Up @@ -239,26 +251,29 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
// Reset wbuf
_z_wbuf_reset(&ztm->_wbuf);
// Set socket info
_Z_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth));
// Write the eth header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth),
_zp_raweth_unlock_tx_mutex(ztm));
// Prepare buff
__unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf);
// Set the frame header
_z_zint_t sn = __unsafe_z_raweth_get_sn(ztm, reliability);
_z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability);
// Encode the frame header
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg));
_Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Encode the network message
ret = _z_network_message_encode(&ztm->_wbuf, n_msg);
if (ret == _Z_RES_OK) {
if (_z_network_message_encode(&ztm->_wbuf, n_msg) == _Z_RES_OK) {
// Write the eth header
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf),
_zp_raweth_unlock_tx_mutex(ztm));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Mark the session that we have transmitted data
ztm->_transmitted = true;
} else { // The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true);
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
// Encode the message on the expandable wbuf
_Z_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg));
_Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Fragment and send the message
_Bool is_first = true;
while (_z_wbuf_len(&fbf) > 0) {
Expand All @@ -269,12 +284,16 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
is_first = false;
// Reset wbuf
_z_wbuf_reset(&ztm->_wbuf);
// Write the eth header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf));
// Prepare buff
__unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf);
// Serialize one fragment
_Z_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn));
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn),
_zp_raweth_unlock_tx_mutex(ztm));
// Write the eth header
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf),
_zp_raweth_unlock_tx_mutex(ztm));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Mark the session that we have transmitted data
ztm->_transmitted = true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/transport/unicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg
} else {
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - 12, true);
_z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment on the fragment header size above.


ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf
if (ret == _Z_RES_OK) {
Expand Down
Loading