Skip to content

Commit

Permalink
Merge pull request open62541#6045 from jpfr/merge_13_14_5
Browse files Browse the repository at this point in the history
Merge 1.3 to 1.4
  • Loading branch information
jpfr authored Oct 8, 2023
2 parents 25af928 + 9260330 commit 3f36927
Show file tree
Hide file tree
Showing 42 changed files with 721 additions and 315 deletions.
10 changes: 1 addition & 9 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
# overwritten with more detailed information if git is available.
set(OPEN62541_VER_MAJOR 1)
set(OPEN62541_VER_MINOR 3)
set(OPEN62541_VER_PATCH 6)
set(OPEN62541_VER_PATCH 8)
set(OPEN62541_VER_LABEL "-undefined") # like "-rc1" or "-g4538abcd" or "-g4538abcd-dirty"
set(OPEN62541_VER_COMMIT "unknown-commit")

Expand Down Expand Up @@ -343,14 +343,6 @@ if(UA_ENABLE_PUBSUB_MONITORING)
endif()
endif()

option(UA_ENABLE_PUBSUB_ETH_UADP "Enable the OPC UA Ethernet PubSub support to transport UADP NetworkMessages as payload of Ethernet II frame without IP or UDP headers. This option will include Publish and Subscribe based on EtherType B62C." OFF)
mark_as_advanced(UA_ENABLE_PUBSUB_ETH_UADP)
if(UA_ENABLE_PUBSUB_ETH_UADP)
if(NOT UA_ENABLE_PUBSUB)
message(FATAL_ERROR "For UA_ENABLE_PUBSUB_ETH_UADP PubSub needs to be enabled")
endif()
endif()

option(UA_ENABLE_PUBSUB_BUFMALLOC "Enable allocation with static memory buffer for time critical PubSub parts" OFF)
mark_as_advanced(UA_ENABLE_PUBSUB_BUFMALLOC)
if(UA_ENABLE_PUBSUB_BUFMALLOC)
Expand Down
48 changes: 32 additions & 16 deletions arch/eventloop_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ static ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t buf
#include "../deps/mqtt-c/src/mqtt.c"

#define MQTT_MESSAGE_MAXLEN (1u << 20) /* 1MB */
#define MQTT_PARAMETERSSIZE 7
#define MQTT_PARAMETERSSIZE 8
#define MQTT_BROKERPARAMETERSSIZE 5 /* Parameters shared by topic connections
* connected to the same broker */

Expand All @@ -82,6 +82,7 @@ static const struct {
{{0, UA_STRING_STATIC("keep-alive")}, &UA_TYPES[UA_TYPES_UINT16], false},
{{0, UA_STRING_STATIC("username")}, &UA_TYPES[UA_TYPES_STRING], false},
{{0, UA_STRING_STATIC("password")}, &UA_TYPES[UA_TYPES_STRING], false},
{{0, UA_STRING_STATIC("validate")}, &UA_TYPES[UA_TYPES_BOOLEAN], false},
{{0, UA_STRING_STATIC("subscribe")}, &UA_TYPES[UA_TYPES_BOOLEAN], false},
{{0, UA_STRING_STATIC("topic")}, &UA_TYPES[UA_TYPES_STRING], true}
};
Expand Down Expand Up @@ -586,8 +587,8 @@ MQTTNetworkCallback(UA_ConnectionManager *tcpCM, uintptr_t connectionId,
}

static MQTTBrokerConnection *
createBrokerConnection(MQTTConnectionManager *mcm,
const UA_KeyValueMap *params) {
createBrokerConnection(MQTTConnectionManager *mcm, const UA_KeyValueMap *params,
UA_Boolean validate) {
/* Allocate connection memory */
MQTTBrokerConnection *bc = (MQTTBrokerConnection*)
UA_calloc(1, sizeof(MQTTBrokerConnection));
Expand Down Expand Up @@ -628,23 +629,15 @@ createBrokerConnection(MQTTConnectionManager *mcm,
if(keepAlive && *keepAlive > 0)
bc->keepalive = *keepAlive;

UA_EventLoop *el = mcm->cm.eventSource.eventLoop;
res = el->addCyclicCallback(el, (UA_Callback)MQTTKeepAliveCallback, NULL, bc,
(UA_Double)(bc->keepalive * UA_DATETIME_SEC),
NULL, UA_TIMER_HANDLE_CYCLEMISS_WITH_CURRENTTIME,
&bc->keepAliveCallbackId);
if(res != UA_STATUSCODE_GOOD) {
removeBrokerConnection(bc);
return NULL;
}

/* Open the Connection. This also sets the broker connection id to the TCP id. */
UA_KeyValuePair tcpParams[2];
UA_KeyValuePair tcpParams[3];
tcpParams[0].key = UA_QUALIFIEDNAME(0, "address");
UA_Variant_setScalar(&tcpParams[0].value, broker, &UA_TYPES[UA_TYPES_STRING]);
tcpParams[1].key = UA_QUALIFIEDNAME(0, "port");
UA_Variant_setScalar(&tcpParams[1].value, port, &UA_TYPES[UA_TYPES_UINT16]);
UA_KeyValueMap kvm = {2, tcpParams};
tcpParams[2].key = UA_QUALIFIEDNAME(0, "validate");
UA_Variant_setScalar(&tcpParams[2].value, &validate, &UA_TYPES[UA_TYPES_BOOLEAN]);
UA_KeyValueMap kvm = {3, tcpParams};

UA_ConnectionManager *tcpCM = mcm->tcpCM;
res = tcpCM->openConnection(tcpCM, &kvm, NULL, bc, MQTTNetworkCallback);
Expand All @@ -653,6 +646,22 @@ createBrokerConnection(MQTTConnectionManager *mcm,
return NULL;
}

/* Return non-null to indicate success */
if(validate) {
removeBrokerConnection(bc);
return (MQTTBrokerConnection*)0x01;
}

UA_EventLoop *el = mcm->cm.eventSource.eventLoop;
res = el->addCyclicCallback(el, (UA_Callback)MQTTKeepAliveCallback, NULL, bc,
(UA_Double)(bc->keepalive * UA_DATETIME_SEC),
NULL, UA_TIMER_HANDLE_CYCLEMISS_WITH_CURRENTTIME,
&bc->keepAliveCallbackId);
if(res != UA_STATUSCODE_GOOD) {
removeBrokerConnection(bc);
return NULL;
}

UA_LOG_DEBUG(bc->mcm->cm.eventSource.eventLoop->logger,
UA_LOGCATEGORY_NETWORK, "MQTT-TCP %u\t| Created broker connection",
(unsigned)bc->tcpConnectionId);
Expand Down Expand Up @@ -756,10 +765,17 @@ MQTT_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
return UA_STATUSCODE_BADCONNECTIONREJECTED;
}

const UA_Boolean *validate = (const UA_Boolean*)
UA_KeyValueMap_getScalar(params, UA_QUALIFIEDNAME(0, "validate"),
&UA_TYPES[UA_TYPES_BOOLEAN]);
if(validate && *validate)
return (createBrokerConnection(mcm, params, true) == 0) ?
UA_STATUSCODE_BADCONNECTIONREJECTED : UA_STATUSCODE_GOOD;

/* Test whether an existing broker connection can be reused.
* Otherwise create a new one. */
MQTTBrokerConnection *bc = findIdenticalBrokerConnection(mcm, params);
if(!bc && !(bc = createBrokerConnection(mcm, params)))
if(!bc && !(bc = createBrokerConnection(mcm, params, false)))
return UA_STATUSCODE_BADNOTCONNECTED;

/* Create the per-topic connection */
Expand Down
35 changes: 25 additions & 10 deletions arch/eventloop_posix_eth.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <linux/if_packet.h>

/* Configuration parameters */
#define ETH_PARAMETERSSIZE 8
#define ETH_PARAMETERSSIZE 9
#define ETH_PARAMINDEX_ADDR 0
#define ETH_PARAMINDEX_LISTEN 1
#define ETH_PARAMINDEX_IFACE 2
Expand All @@ -27,6 +27,7 @@
#define ETH_PARAMINDEX_PCP 5
#define ETH_PARAMINDEX_DEI 6
#define ETH_PARAMINDEX_PROMISCUOUS 7
#define ETH_PARAMINDEX_VALIDATE 8

static UA_KeyValueRestriction ETHConfigParameters[ETH_PARAMETERSSIZE+1] = {
{{0, UA_STRING_STATIC("address")}, &UA_TYPES[UA_TYPES_STRING], false, true, false},
Expand All @@ -37,6 +38,7 @@ static UA_KeyValueRestriction ETHConfigParameters[ETH_PARAMETERSSIZE+1] = {
{{0, UA_STRING_STATIC("pcp")}, &UA_TYPES[UA_TYPES_BYTE], false, true, false},
{{0, UA_STRING_STATIC("dei")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false},
{{0, UA_STRING_STATIC("promiscuous")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false},
{{0, UA_STRING_STATIC("validate")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false},
/* Duplicated address parameter with a scalar value required. For the send-socket case. */
{{0, UA_STRING_STATIC("address")}, &UA_TYPES[UA_TYPES_STRING], true, true, false},
};
Expand Down Expand Up @@ -421,7 +423,8 @@ ETH_connectionSocketCallback(UA_ConnectionManager *cm, UA_RegisteredFD *rfd,
static UA_StatusCode
ETH_openListenConnection(UA_EventLoopPOSIX *el, ETH_FD *conn,
const UA_KeyValueMap *params,
int ifindex, UA_UInt16 etherType) {
int ifindex, UA_UInt16 etherType,
UA_Boolean validate) {
UA_LOCK_ASSERT(&el->elMutex, 1);

/* Bind the socket to interface and EtherType. Don't receive anything else. */
Expand All @@ -430,7 +433,7 @@ ETH_openListenConnection(UA_EventLoopPOSIX *el, ETH_FD *conn,
sll.sll_family = AF_PACKET;
sll.sll_protocol = htons(etherType);
sll.sll_ifindex = ifindex;
if(bind(conn->rfd.fd, (struct sockaddr*)&sll, sizeof(sll)) < 0)
if(!validate && bind(conn->rfd.fd, (struct sockaddr*)&sll, sizeof(sll)) < 0)
return UA_STATUSCODE_BADINTERNALERROR;

/* Immediately register for listen events. Don't have to wait for a
Expand Down Expand Up @@ -481,19 +484,20 @@ ETH_openListenConnection(UA_EventLoopPOSIX *el, ETH_FD *conn,
}

struct packet_mreq mreq;
memset(&mreq, 0, sizeof(struct packet_mreq));
mreq.mr_ifindex = ifindex;
mreq.mr_type = PACKET_MR_MULTICAST;
mreq.mr_alen = ETH_ALEN;
memcpy(mreq.mr_address, addr, ETHER_ADDR_LEN);
int ret = UA_setsockopt(conn->rfd.fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,
(char *)&mreq, sizeof(mreq));
if(ret < 0) {
if(!validate && UA_setsockopt(conn->rfd.fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,
(char *)&mreq, sizeof(mreq)) < 0) {
UA_LOG_SOCKET_ERRNO_WRAP(
UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH\t| Registering for multicast failed with error %s", errno_str));
"ETH\t| Registering for multicast failed with error %s",
errno_str));
return UA_STATUSCODE_BADINTERNALERROR;
}
}
}

UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH %u\t| Opened an Ethernet listen socket",
Expand Down Expand Up @@ -586,6 +590,15 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
return res;
}

/* Only validate the parameters? */
UA_Boolean validate = false;
const UA_Boolean *validateParam = (const UA_Boolean*)
UA_KeyValueMap_getScalar(params,
ETHConfigParameters[ETH_PARAMINDEX_VALIDATE].name,
&UA_TYPES[UA_TYPES_BOOLEAN]);
if(validateParam)
validate = *validateParam;

/* Get the EtherType parameter */
UA_UInt16 etherType = ETH_P_ALL;
const UA_UInt16 *etParam = (const UA_UInt16*)
Expand Down Expand Up @@ -665,9 +678,11 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
(unsigned char*)ifr.ifr_hwaddr.sa_data,
ifindex, etherType);
} else {
res = ETH_openListenConnection(el, conn, params, ifindex, etherType);
res = ETH_openListenConnection(el, conn, params, ifindex, etherType, validate);
}
if(res != UA_STATUSCODE_GOOD)

/* Don't actually open or shut down */
if(validate || res != UA_STATUSCODE_GOOD)
goto cleanup;

/* Register in the EventLoop */
Expand Down
49 changes: 38 additions & 11 deletions arch/eventloop_posix_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes)
*/

#include "open62541/types.h"
#include "eventloop_posix.h"
#include "eventloop_common.h"

Expand Down Expand Up @@ -338,7 +339,8 @@ TCP_listenSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, short event) {
static UA_StatusCode
TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai,
UA_UInt16 port, void *application, void *context,
UA_ConnectionManager_connectionCallback connectionCallback) {
UA_ConnectionManager_connectionCallback connectionCallback,
UA_Boolean validate) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop;
UA_LOCK_ASSERT(&el->elMutex, 1);

Expand Down Expand Up @@ -434,6 +436,12 @@ TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai,
return UA_STATUSCODE_BADINTERNALERROR;
}

/* Only validate, don't actually start listening */
if(validate) {
UA_close(listenSocket);
return UA_STATUSCODE_GOOD;
}

/* Start listening */
if(listen(listenSocket, UA_MAXBACKLOG) < 0) {
UA_LOG_SOCKET_ERRNO_WRAP(
Expand Down Expand Up @@ -506,7 +514,8 @@ TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai,
static UA_StatusCode
TCP_registerListenSockets(UA_POSIXConnectionManager *pcm, const char *hostname,
UA_UInt16 port, void *application, void *context,
UA_ConnectionManager_connectionCallback connectionCallback) {
UA_ConnectionManager_connectionCallback connectionCallback,
UA_Boolean validate) {
UA_LOCK_ASSERT(&((UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop)->elMutex, 1);

/* Create a string for the port */
Expand Down Expand Up @@ -546,7 +555,7 @@ TCP_registerListenSockets(UA_POSIXConnectionManager *pcm, const char *hostname,
struct addrinfo *ai = res;
while(ai) {
total_result &= TCP_registerListenSocket(pcm, ai, port, application, context,
connectionCallback);
connectionCallback, validate);
ai = ai->ai_next;
}
freeaddrinfo(res);
Expand Down Expand Up @@ -668,7 +677,8 @@ TCP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
static UA_StatusCode
TCP_openPassiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *params,
void *application, void *context,
UA_ConnectionManager_connectionCallback connectionCallback) {
UA_ConnectionManager_connectionCallback connectionCallback,
UA_Boolean validate) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop;
UA_LOCK_ASSERT(&el->elMutex, 1);

Expand All @@ -695,7 +705,7 @@ TCP_openPassiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *
UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"TCP\t| Listening on all interfaces");
return TCP_registerListenSockets(pcm, NULL, *port, application,
context, connectionCallback);
context, connectionCallback, validate);
}

/* Iterate over the configured hostnames */
Expand All @@ -707,7 +717,7 @@ TCP_openPassiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *
memcpy(hostname, hostStrings[i].data, hostStrings->length);
hostname[hostStrings->length] = '\0';
TCP_registerListenSockets(pcm, hostname, *port, application,
context, connectionCallback);
context, connectionCallback, validate);
}

return UA_STATUSCODE_GOOD;
Expand All @@ -717,7 +727,8 @@ TCP_openPassiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *
static UA_StatusCode
TCP_openActiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *params,
void *application, void *context,
UA_ConnectionManager_connectionCallback connectionCallback) {
UA_ConnectionManager_connectionCallback connectionCallback,
UA_Boolean validate) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop;
UA_LOCK_ASSERT(&el->elMutex, 1);

Expand Down Expand Up @@ -798,6 +809,13 @@ TCP_openActiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *p
return res;
}

/* Only validate, don't actually open the connection */
if(validate) {
freeaddrinfo(info);
UA_close(newSock);
return UA_STATUSCODE_GOOD;
}

/* Non-blocking connect */
error = UA_connect(newSock, info->ai_addr, info->ai_addrlen);
freeaddrinfo(info);
Expand Down Expand Up @@ -886,6 +904,15 @@ TCP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
return res;
}

/* Only validate the parameters? */
UA_Boolean validate = false;
const UA_Boolean *validateParam = (const UA_Boolean*)
UA_KeyValueMap_getScalar(params,
TCPConfigParameters[TCP_PARAMINDEX_VALIDATE].name,
&UA_TYPES[UA_TYPES_BOOLEAN]);
if(validateParam)
validate = *validateParam;

/* Listen or active connection? */
UA_Boolean listen = false;
const UA_Boolean *listenParam = (const UA_Boolean*)
Expand All @@ -896,11 +923,11 @@ TCP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
listen = *listenParam;

if(listen) {
res = TCP_openPassiveConnection(pcm, params, application,
context, connectionCallback);
res = TCP_openPassiveConnection(pcm, params, application, context,
connectionCallback, validate);
} else {
res = TCP_openActiveConnection(pcm, params, application,
context, connectionCallback);
res = TCP_openActiveConnection(pcm, params, application, context,
connectionCallback, validate);
}

UA_UNLOCK(&el->elMutex);
Expand Down
5 changes: 0 additions & 5 deletions doc/building.rst
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,6 @@ PubSub Build Options
be changed by the application to satisfy realtime requirements. Disabled by
default.

**UA_ENABLE_PUBSUB_ETH_UADP**
Enable the OPC UA Ethernet PubSub support to transport UADP NetworkMessages
as payload of Ethernet II frame without IP or UDP headers. This option will
include Publish and Subscribe based on EtherType B62C. Disabled by default.

Debug Build Options
^^^^^^^^^^^^^^^^^^^

Expand Down
14 changes: 6 additions & 8 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ if(UA_ENABLE_PUBSUB)
add_example(server_pubsub_publish_rt_level pubsub/server_pubsub_publisher_rt_level.c)
add_example(server_pubsub_rt_information_model pubsub/server_pubsub_rt_field_information_model.c)
if(CMAKE_SYSTEM MATCHES "Linux")
if(UA_ENABLE_PUBSUB_ETH_UADP AND UA_ENABLE_MALLOC_SINGLETON AND UA_ENABLE_IMMUTABLE_NODES)
if(UA_ENABLE_MALLOC_SINGLETON AND UA_ENABLE_IMMUTABLE_NODES)
add_example(rt_publisher pubsub_realtime/pubsub_interrupt_publish.c)
target_link_libraries(rt_publisher rt)
endif()
Expand All @@ -242,13 +242,11 @@ if(UA_ENABLE_PUBSUB)
# add_example(pubsub_TSN_loopback_single_thread pubsub_realtime/pubsub_TSN_loopback_single_thread.c)
# target_link_libraries(pubsub_TSN_publisher_multiple_thread rt pthread)
# target_link_libraries(pubsub_TSN_loopback_single_thread rt pthread)
# if(UA_ENABLE_PUBSUB_ETH_UADP)
# add_subdirectory(pubsub_realtime/nodeset)
# add_example(pubsub_TSN_publisher pubsub_realtime/pubsub_TSN_publisher.c)
# add_example(pubsub_TSN_loopback pubsub_realtime/pubsub_TSN_loopback.c)
# target_link_libraries(pubsub_TSN_publisher rt pthread)
# target_link_libraries(pubsub_TSN_loopback rt pthread)
# endif()
# add_subdirectory(pubsub_realtime/nodeset)
# add_example(pubsub_TSN_publisher pubsub_realtime/pubsub_TSN_publisher.c)
# add_example(pubsub_TSN_loopback pubsub_realtime/pubsub_TSN_loopback.c)
# target_link_libraries(pubsub_TSN_publisher rt pthread)
# target_link_libraries(pubsub_TSN_loopback rt pthread)
# endif()
endif()
if(UA_ENABLE_PUBSUB_ENCRYPTION)
Expand Down
Loading

0 comments on commit 3f36927

Please sign in to comment.