Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/1.4' into merge_14_master_5
Browse files Browse the repository at this point in the history
  • Loading branch information
jpfr committed Oct 8, 2023
2 parents 46659f7 + 3f36927 commit a452fa6
Show file tree
Hide file tree
Showing 55 changed files with 1,184 additions and 480 deletions.
13 changes: 1 addition & 12 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
# overwritten with more detailed information if git is available.
set(OPEN62541_VER_MAJOR 1)
set(OPEN62541_VER_MINOR 3)
set(OPEN62541_VER_PATCH 6)
set(OPEN62541_VER_PATCH 8)
set(OPEN62541_VER_LABEL "-undefined") # like "-rc1" or "-g4538abcd" or "-g4538abcd-dirty"
set(OPEN62541_VER_COMMIT "unknown-commit")

Expand Down Expand Up @@ -298,9 +298,6 @@ option(UA_FORCE_32BIT "Force compilation as 32-bit executable" OFF)
mark_as_advanced(UA_FORCE_32BIT)

option(UA_FORCE_WERROR "Force compilation with -Werror (or /WX on MSVC)" OFF)
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
set(UA_FORCE_WERROR ON)
endif()

option(UA_ENABLE_DEBUG_SANITIZER "Use sanitizer in debug mode" ON)
mark_as_advanced(UA_ENABLE_DEBUG_SANITIZER)
Expand Down Expand Up @@ -346,14 +343,6 @@ if(UA_ENABLE_PUBSUB_MONITORING)
endif()
endif()

option(UA_ENABLE_PUBSUB_ETH_UADP "Enable the OPC UA Ethernet PubSub support to transport UADP NetworkMessages as payload of Ethernet II frame without IP or UDP headers. This option will include Publish and Subscribe based on EtherType B62C." OFF)
mark_as_advanced(UA_ENABLE_PUBSUB_ETH_UADP)
if(UA_ENABLE_PUBSUB_ETH_UADP)
if(NOT UA_ENABLE_PUBSUB)
message(FATAL_ERROR "For UA_ENABLE_PUBSUB_ETH_UADP PubSub needs to be enabled")
endif()
endif()

option(UA_ENABLE_PUBSUB_BUFMALLOC "Enable allocation with static memory buffer for time critical PubSub parts" OFF)
mark_as_advanced(UA_ENABLE_PUBSUB_BUFMALLOC)
if(UA_ENABLE_PUBSUB_BUFMALLOC)
Expand Down
48 changes: 32 additions & 16 deletions arch/eventloop_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t buf
#include "../deps/mqtt-c/src/mqtt.c"

#define MQTT_MESSAGE_MAXLEN (1u << 20) /* 1MB */
#define MQTT_PARAMETERSSIZE 7
#define MQTT_PARAMETERSSIZE 8
#define MQTT_BROKERPARAMETERSSIZE 5 /* Parameters shared by topic connections
* connected to the same broker */

Expand All @@ -83,6 +83,7 @@ static const struct {
{{0, UA_STRING_STATIC("keep-alive")}, &UA_TYPES[UA_TYPES_UINT16], false},
{{0, UA_STRING_STATIC("username")}, &UA_TYPES[UA_TYPES_STRING], false},
{{0, UA_STRING_STATIC("password")}, &UA_TYPES[UA_TYPES_STRING], false},
{{0, UA_STRING_STATIC("validate")}, &UA_TYPES[UA_TYPES_BOOLEAN], false},
{{0, UA_STRING_STATIC("subscribe")}, &UA_TYPES[UA_TYPES_BOOLEAN], false},
{{0, UA_STRING_STATIC("topic")}, &UA_TYPES[UA_TYPES_STRING], true}
};
Expand Down Expand Up @@ -589,8 +590,8 @@ MQTTNetworkCallback(UA_ConnectionManager *tcpCM, uintptr_t connectionId,
}

static MQTTBrokerConnection *
createBrokerConnection(MQTTConnectionManager *mcm,
const UA_KeyValueMap *params) {
createBrokerConnection(MQTTConnectionManager *mcm, const UA_KeyValueMap *params,
UA_Boolean validate) {
/* Allocate connection memory */
MQTTBrokerConnection *bc = (MQTTBrokerConnection*)
UA_calloc(1, sizeof(MQTTBrokerConnection));
Expand Down Expand Up @@ -630,23 +631,15 @@ createBrokerConnection(MQTTConnectionManager *mcm,
if(keepAlive && *keepAlive > 0)
bc->keepalive = *keepAlive;

UA_EventLoop *el = mcm->cm.eventSource.eventLoop;
res = el->addCyclicCallback(el, (UA_Callback)MQTTKeepAliveCallback, NULL, bc,
(UA_Double)(bc->keepalive * UA_DATETIME_SEC),
NULL, UA_TIMER_HANDLE_CYCLEMISS_WITH_CURRENTTIME,
&bc->keepAliveCallbackId);
if(res != UA_STATUSCODE_GOOD) {
removeBrokerConnection(bc);
return NULL;
}

/* Open the Connection. This also sets the broker connection id to the TCP id. */
UA_KeyValuePair tcpParams[2];
UA_KeyValuePair tcpParams[3];
tcpParams[0].key = UA_QUALIFIEDNAME(0, "address");
UA_Variant_setScalar(&tcpParams[0].value, broker, &UA_TYPES[UA_TYPES_STRING]);
tcpParams[1].key = UA_QUALIFIEDNAME(0, "port");
UA_Variant_setScalar(&tcpParams[1].value, port, &UA_TYPES[UA_TYPES_UINT16]);
UA_KeyValueMap kvm = {2, tcpParams};
tcpParams[2].key = UA_QUALIFIEDNAME(0, "validate");
UA_Variant_setScalar(&tcpParams[2].value, &validate, &UA_TYPES[UA_TYPES_BOOLEAN]);
UA_KeyValueMap kvm = {3, tcpParams};

UA_ConnectionManager *tcpCM = mcm->tcpCM;
res = tcpCM->openConnection(tcpCM, &kvm, NULL, bc, MQTTNetworkCallback);
Expand All @@ -655,6 +648,22 @@ createBrokerConnection(MQTTConnectionManager *mcm,
return NULL;
}

/* Return non-null to indicate success */
if(validate) {
removeBrokerConnection(bc);
return (MQTTBrokerConnection*)0x01;
}

UA_EventLoop *el = mcm->cm.eventSource.eventLoop;
res = el->addCyclicCallback(el, (UA_Callback)MQTTKeepAliveCallback, NULL, bc,
(UA_Double)(bc->keepalive * UA_DATETIME_SEC),
NULL, UA_TIMER_HANDLE_CYCLEMISS_WITH_CURRENTTIME,
&bc->keepAliveCallbackId);
if(res != UA_STATUSCODE_GOOD) {
removeBrokerConnection(bc);
return NULL;
}

UA_LOG_DEBUG(bc->mcm->cm.eventSource.eventLoop->logger,
UA_LOGCATEGORY_NETWORK, "MQTT-TCP %u\t| Created broker connection",
(unsigned)bc->tcpConnectionId);
Expand Down Expand Up @@ -757,10 +766,17 @@ MQTT_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
return UA_STATUSCODE_BADCONNECTIONREJECTED;
}

const UA_Boolean *validate = (const UA_Boolean*)
UA_KeyValueMap_getScalar(params, UA_QUALIFIEDNAME(0, "validate"),
&UA_TYPES[UA_TYPES_BOOLEAN]);
if(validate && *validate)
return (createBrokerConnection(mcm, params, true) == 0) ?
UA_STATUSCODE_BADCONNECTIONREJECTED : UA_STATUSCODE_GOOD;

/* Test whether an existing broker connection can be reused.
* Otherwise create a new one. */
MQTTBrokerConnection *bc = findIdenticalBrokerConnection(mcm, params);
if(!bc && !(bc = createBrokerConnection(mcm, params)))
if(!bc && !(bc = createBrokerConnection(mcm, params, false)))
return UA_STATUSCODE_BADNOTCONNECTED;

/* Create the per-topic connection */
Expand Down
7 changes: 5 additions & 2 deletions arch/eventloop_posix_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {

/* Poll the registered sockets */
struct epoll_event epoll_events[64];
int events = epoll_wait(el->epollfd, epoll_events, 64,
int epollfd = el->epollfd;
UA_UNLOCK(&el->elMutex);
int events = epoll_wait(epollfd, epoll_events, 64,
(int)(listenTimeout / UA_DATETIME_MSEC));
/* TODO: Replace with pwait2 for higher-precision timeouts once this is
* available in the standard library.
Expand All @@ -79,8 +81,9 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
* (long)(listenTimeout / UA_DATETIME_SEC),
* (long)((listenTimeout % UA_DATETIME_SEC) * 100)
* };
* int events = epoll_pwait2(el->epollfd, epoll_events, 64,
* int events = epoll_pwait2(epollfd, epoll_events, 64,
* precisionTimeout, NULL); */
UA_LOCK(&el->elMutex);

/* Handle error conditions */
if(events == -1) {
Expand Down
35 changes: 25 additions & 10 deletions arch/eventloop_posix_eth.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static UA_KeyValueRestriction ethManagerParams[ETH_MANAGERPARAMS] = {
{{0, UA_STRING_STATIC("send-bufsize")}, &UA_TYPES[UA_TYPES_UINT32], false, true, false}
};

#define ETH_PARAMETERSSIZE 14
#define ETH_PARAMETERSSIZE 15
#define ETH_PARAMINDEX_ADDR 0
#define ETH_PARAMINDEX_LISTEN 1
#define ETH_PARAMINDEX_IFACE 2
Expand All @@ -42,6 +42,7 @@ static UA_KeyValueRestriction ethManagerParams[ETH_MANAGERPARAMS] = {
#define ETH_PARAMINDEX_TXTIME 11
#define ETH_PARAMINDEX_TXTIME_PICO 12
#define ETH_PARAMINDEX_TXTIME_DROP 13
#define ETH_PARAMINDEX_VALIDATE 14

static UA_KeyValueRestriction ethConnectionParams[ETH_PARAMETERSSIZE+1] = {
{{0, UA_STRING_STATIC("address")}, &UA_TYPES[UA_TYPES_STRING], false, true, false},
Expand All @@ -58,6 +59,7 @@ static UA_KeyValueRestriction ethConnectionParams[ETH_PARAMETERSSIZE+1] = {
{{0, UA_STRING_STATIC("txtime")}, &UA_TYPES[UA_TYPES_DATETIME], false, true, false},
{{0, UA_STRING_STATIC("txtime-pico")}, &UA_TYPES[UA_TYPES_UINT16], false, true, false},
{{0, UA_STRING_STATIC("txtime-drop-late")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false},
{{0, UA_STRING_STATIC("validate")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false},
/* Duplicated address parameter with a scalar value required. For the send-socket case. */
{{0, UA_STRING_STATIC("address")}, &UA_TYPES[UA_TYPES_STRING], true, true, false},
};
Expand Down Expand Up @@ -429,7 +431,8 @@ ETH_connectionSocketCallback(UA_ConnectionManager *cm, UA_RegisteredFD *rfd,
static UA_StatusCode
ETH_openListenConnection(UA_EventLoopPOSIX *el, ETH_FD *conn,
const UA_KeyValueMap *params,
int ifindex, UA_UInt16 etherType) {
int ifindex, UA_UInt16 etherType,
UA_Boolean validate) {
UA_LOCK_ASSERT(&el->elMutex, 1);

/* Bind the socket to interface and EtherType. Don't receive anything else. */
Expand All @@ -438,7 +441,7 @@ ETH_openListenConnection(UA_EventLoopPOSIX *el, ETH_FD *conn,
sll.sll_family = AF_PACKET;
sll.sll_protocol = htons(etherType);
sll.sll_ifindex = ifindex;
if(bind(conn->rfd.fd, (struct sockaddr*)&sll, sizeof(sll)) < 0)
if(!validate && bind(conn->rfd.fd, (struct sockaddr*)&sll, sizeof(sll)) < 0)
return UA_STATUSCODE_BADINTERNALERROR;

/* Immediately register for listen events. Don't have to wait for a
Expand Down Expand Up @@ -489,19 +492,20 @@ ETH_openListenConnection(UA_EventLoopPOSIX *el, ETH_FD *conn,
}

struct packet_mreq mreq;
memset(&mreq, 0, sizeof(struct packet_mreq));
mreq.mr_ifindex = ifindex;
mreq.mr_type = PACKET_MR_MULTICAST;
mreq.mr_alen = ETH_ALEN;
memcpy(mreq.mr_address, addr, ETHER_ADDR_LEN);
int ret = UA_setsockopt(conn->rfd.fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,
(char *)&mreq, sizeof(mreq));
if(ret < 0) {
if(!validate && UA_setsockopt(conn->rfd.fd, SOL_PACKET, PACKET_ADD_MEMBERSHIP,
(char *)&mreq, sizeof(mreq)) < 0) {
UA_LOG_SOCKET_ERRNO_WRAP(
UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH\t| Registering for multicast failed with error %s", errno_str));
"ETH\t| Registering for multicast failed with error %s",
errno_str));
return UA_STATUSCODE_BADINTERNALERROR;
}
}
}

UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH %u\t| Opened an Ethernet listen socket",
Expand Down Expand Up @@ -643,6 +647,15 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
return res;
}

/* Only validate the parameters? */
UA_Boolean validate = false;
const UA_Boolean *validateParam = (const UA_Boolean*)
UA_KeyValueMap_getScalar(params,
ethConnectionParams[ETH_PARAMINDEX_VALIDATE].name,
&UA_TYPES[UA_TYPES_BOOLEAN]);
if(validateParam)
validate = *validateParam;

/* Get the EtherType parameter */
UA_UInt16 etherType = ETH_P_ALL;
const UA_UInt16 *etParam = (const UA_UInt16*)
Expand Down Expand Up @@ -724,9 +737,11 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
(unsigned char*)ifr.ifr_hwaddr.sa_data,
ifindex, etherType);
} else {
res = ETH_openListenConnection(el, conn, params, ifindex, etherType);
res = ETH_openListenConnection(el, conn, params, ifindex, etherType, validate);
}
if(res != UA_STATUSCODE_GOOD)

/* Don't actually open or shut down */
if(validate || res != UA_STATUSCODE_GOOD)
goto cleanup;

/* Register in the EventLoop */
Expand Down
2 changes: 2 additions & 0 deletions arch/eventloop_posix_select.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
#endif
};

UA_UNLOCK(&el->elMutex);
int selectStatus = UA_select(highestfd+1, &readset, &writeset, &errset, &tmptv);
UA_LOCK(&el->elMutex);
if(selectStatus < 0) {
/* We will retry, only log the error */
UA_LOG_SOCKET_ERRNO_WRAP(
Expand Down
49 changes: 38 additions & 11 deletions arch/eventloop_posix_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Copyright 2021 (c) Fraunhofer IOSB (Author: Jan Hermes)
*/

#include "open62541/types.h"
#include "eventloop_posix.h"
#include "eventloop_common.h"

Expand Down Expand Up @@ -344,7 +345,8 @@ TCP_listenSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, short event) {
static UA_StatusCode
TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai,
UA_UInt16 port, void *application, void *context,
UA_ConnectionManager_connectionCallback connectionCallback) {
UA_ConnectionManager_connectionCallback connectionCallback,
UA_Boolean validate) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop;
UA_LOCK_ASSERT(&el->elMutex, 1);

Expand Down Expand Up @@ -440,6 +442,12 @@ TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai,
return UA_STATUSCODE_BADINTERNALERROR;
}

/* Only validate, don't actually start listening */
if(validate) {
UA_close(listenSocket);
return UA_STATUSCODE_GOOD;
}

/* Start listening */
if(listen(listenSocket, UA_MAXBACKLOG) < 0) {
UA_LOG_SOCKET_ERRNO_WRAP(
Expand Down Expand Up @@ -512,7 +520,8 @@ TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai,
static UA_StatusCode
TCP_registerListenSockets(UA_POSIXConnectionManager *pcm, const char *hostname,
UA_UInt16 port, void *application, void *context,
UA_ConnectionManager_connectionCallback connectionCallback) {
UA_ConnectionManager_connectionCallback connectionCallback,
UA_Boolean validate) {
UA_LOCK_ASSERT(&((UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop)->elMutex, 1);

/* Create a string for the port */
Expand Down Expand Up @@ -552,7 +561,7 @@ TCP_registerListenSockets(UA_POSIXConnectionManager *pcm, const char *hostname,
struct addrinfo *ai = res;
while(ai) {
total_result &= TCP_registerListenSocket(pcm, ai, port, application, context,
connectionCallback);
connectionCallback, validate);
ai = ai->ai_next;
}
freeaddrinfo(res);
Expand Down Expand Up @@ -675,7 +684,8 @@ TCP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
static UA_StatusCode
TCP_openPassiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *params,
void *application, void *context,
UA_ConnectionManager_connectionCallback connectionCallback) {
UA_ConnectionManager_connectionCallback connectionCallback,
UA_Boolean validate) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop;
UA_LOCK_ASSERT(&el->elMutex, 1);

Expand All @@ -702,7 +712,7 @@ TCP_openPassiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *
UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"TCP\t| Listening on all interfaces");
return TCP_registerListenSockets(pcm, NULL, *port, application,
context, connectionCallback);
context, connectionCallback, validate);
}

/* Iterate over the configured hostnames */
Expand All @@ -714,7 +724,7 @@ TCP_openPassiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *
memcpy(hostname, hostStrings[i].data, hostStrings->length);
hostname[hostStrings->length] = '\0';
TCP_registerListenSockets(pcm, hostname, *port, application,
context, connectionCallback);
context, connectionCallback, validate);
}

return UA_STATUSCODE_GOOD;
Expand All @@ -724,7 +734,8 @@ TCP_openPassiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *
static UA_StatusCode
TCP_openActiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *params,
void *application, void *context,
UA_ConnectionManager_connectionCallback connectionCallback) {
UA_ConnectionManager_connectionCallback connectionCallback,
UA_Boolean validate) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)pcm->cm.eventSource.eventLoop;
UA_LOCK_ASSERT(&el->elMutex, 1);

Expand Down Expand Up @@ -805,6 +816,13 @@ TCP_openActiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *p
return res;
}

/* Only validate, don't actually open the connection */
if(validate) {
freeaddrinfo(info);
UA_close(newSock);
return UA_STATUSCODE_GOOD;
}

/* Non-blocking connect */
error = UA_connect(newSock, info->ai_addr, info->ai_addrlen);
freeaddrinfo(info);
Expand Down Expand Up @@ -893,6 +911,15 @@ TCP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
return res;
}

/* Only validate the parameters? */
UA_Boolean validate = false;
const UA_Boolean *validateParam = (const UA_Boolean*)
UA_KeyValueMap_getScalar(params,
tcpConnectionParams[TCP_PARAMINDEX_VALIDATE].name,
&UA_TYPES[UA_TYPES_BOOLEAN]);
if(validateParam)
validate = *validateParam;

/* Listen or active connection? */
UA_Boolean listen = false;
const UA_Boolean *listenParam = (const UA_Boolean*)
Expand All @@ -903,11 +930,11 @@ TCP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
listen = *listenParam;

if(listen) {
res = TCP_openPassiveConnection(pcm, params, application,
context, connectionCallback);
res = TCP_openPassiveConnection(pcm, params, application, context,
connectionCallback, validate);
} else {
res = TCP_openActiveConnection(pcm, params, application,
context, connectionCallback);
res = TCP_openActiveConnection(pcm, params, application, context,
connectionCallback, validate);
}

UA_UNLOCK(&el->elMutex);
Expand Down
Loading

0 comments on commit a452fa6

Please sign in to comment.