From 4e05df656aa6837f44bf1fa6224c0176c290bee3 Mon Sep 17 00:00:00 2001 From: Julius Pfrommer Date: Thu, 14 Sep 2023 23:33:14 +0200 Subject: [PATCH] refactor(el): Make EventLoop connection a struct --- arch/eventloop_posix.c | 6 +- arch/eventloop_posix.h | 10 +- arch/eventloop_posix_epoll.c | 2 +- arch/eventloop_posix_eth.c | 117 ++++------ arch/eventloop_posix_interrupt.c | 7 +- arch/eventloop_posix_tcp.c | 221 ++++++++---------- arch/eventloop_posix_udp.c | 101 ++++---- include/open62541/plugin/eventloop.h | 56 +++-- src/client/ua_client_connect.c | 73 +++--- src/pubsub/ua_pubsub.h | 9 +- src/pubsub/ua_pubsub_eventloop.c | 217 +++++++++-------- src/pubsub/ua_pubsub_writergroup.c | 72 +++--- src/server/ua_server_binary.c | 172 +++++++------- src/server/ua_server_internal.h | 7 +- src/server/ua_session.h | 5 +- src/ua_securechannel.c | 45 ++-- src/ua_securechannel.h | 10 +- tests/check_eventloop_eth.c | 42 ++-- tests/check_eventloop_tcp.c | 33 ++- tests/check_eventloop_udp.c | 75 +++--- tests/check_securechannel.c | 1 - tests/client/check_activateSession.c | 5 +- tests/client/check_client.c | 5 +- tests/client/check_client_async.c | 9 +- tests/client/check_client_async_connect.c | 4 +- tests/client/check_client_securechannel.c | 9 +- tests/client/check_client_subscriptions.c | 9 +- tests/pubsub/check_pubsub_get_state.c | 2 +- tests/testing-plugins/testing_networklayers.c | 15 +- 29 files changed, 622 insertions(+), 717 deletions(-) diff --git a/arch/eventloop_posix.c b/arch/eventloop_posix.c index 816f11b5341..17e46a10b1f 100644 --- a/arch/eventloop_posix.c +++ b/arch/eventloop_posix.c @@ -478,16 +478,14 @@ UA_EventLoop_new_POSIX(const UA_Logger *logger) { /* Reusable EventSource functionality */ UA_StatusCode -UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm, - uintptr_t connectionId, +UA_EventLoopPOSIX_allocNetworkBuffer(UA_Connection *c, UA_ByteString *buf, size_t bufSize) { return UA_ByteString_allocBuffer(buf, bufSize); } void -UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm, - uintptr_t connectionId, +UA_EventLoopPOSIX_freeNetworkBuffer(UA_Connection *c, UA_ByteString *buf) { UA_ByteString_clear(buf); } diff --git a/arch/eventloop_posix.h b/arch/eventloop_posix.h index 4c296b29e95..5bd23a6ed30 100644 --- a/arch/eventloop_posix.h +++ b/arch/eventloop_posix.h @@ -65,6 +65,8 @@ typedef struct UA_RegisteredFD UA_RegisteredFD; typedef void (*UA_FDCallback)(UA_EventSource *es, UA_RegisteredFD *rfd, short event); struct UA_RegisteredFD { + UA_Connection c; + // XXX change the delayed mechanism as this is no longer first UA_DelayedCallback dc; /* Used for async closing. Must be the first member * because the rfd is freed by the delayed callback * mechanism. */ @@ -72,8 +74,6 @@ struct UA_RegisteredFD { ZIP_ENTRY(UA_RegisteredFD) zipPointers; /* Register FD in the EventSource */ UA_FD fd; short listenEvents; /* UA_FDEVENT_IN | UA_FDEVENT_OUT*/ - - UA_EventSource *es; /* Backpointer to the EventSource */ UA_FDCallback eventSourceCB; }; @@ -146,14 +146,12 @@ UA_StatusCode UA_EventLoopPOSIX_allocateRXBuffer(UA_POSIXConnectionManager *pcm); UA_StatusCode -UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm, - uintptr_t connectionId, +UA_EventLoopPOSIX_allocNetworkBuffer(UA_Connection *c, UA_ByteString *buf, size_t bufSize); void -UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm, - uintptr_t connectionId, +UA_EventLoopPOSIX_freeNetworkBuffer(UA_Connection *c, UA_ByteString *buf); /* diff --git a/arch/eventloop_posix_epoll.c b/arch/eventloop_posix_epoll.c index cdcd7f40804..c614e58632d 100644 --- a/arch/eventloop_posix_epoll.c +++ b/arch/eventloop_posix_epoll.c @@ -108,7 +108,7 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) { } else { revent = UA_FDEVENT_ERR; } - rfd->eventSourceCB(rfd->es, rfd, revent); + rfd->eventSourceCB(&rfd->c.cm->eventSource, rfd, revent); } return UA_STATUSCODE_GOOD; } diff --git a/arch/eventloop_posix_eth.c b/arch/eventloop_posix_eth.c index 66dfba48501..c5feaefb434 100644 --- a/arch/eventloop_posix_eth.c +++ b/arch/eventloop_posix_eth.c @@ -46,10 +46,6 @@ static UA_KeyValueRestriction ETHConfigParameters[ETH_PARAMETERSSIZE+1] = { typedef struct { UA_RegisteredFD rfd; - UA_ConnectionManager_connectionCallback applicationCB; - void *application; - void *context; - struct sockaddr_ll sll; /* The Ethernet header to prepend for sending frames is precomputed and reused. * The length field (the last 2 byte) is adjusted. @@ -183,17 +179,9 @@ setETHHeader(unsigned char *buf, } static UA_StatusCode -ETH_allocNetworkBuffer(UA_ConnectionManager *cm, uintptr_t connectionId, - UA_ByteString *buf, size_t bufSize) { - UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; - - /* Get the ETH_FD */ - UA_FD fd = (UA_FD)connectionId; - ETH_FD *erfd = (ETH_FD*)ZIP_FIND(UA_FDTree, &pcm->fds, &fd); - if(!erfd) - return UA_STATUSCODE_BADCONNECTIONREJECTED; - +ETH_allocNetworkBuffer(UA_Connection *c, UA_ByteString *buf, size_t bufSize) { /* Allocate the buffer with the hidden Ethernet header in front */ + ETH_FD *erfd = (ETH_FD*)c; UA_StatusCode res = UA_ByteString_allocBuffer(buf, bufSize+erfd->headerSize); buf->data += erfd->headerSize; buf->length -= erfd->headerSize; @@ -201,17 +189,9 @@ ETH_allocNetworkBuffer(UA_ConnectionManager *cm, uintptr_t connectionId, } static void -ETH_freeNetworkBuffer(UA_ConnectionManager *cm, uintptr_t connectionId, - UA_ByteString *buf) { - UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; - - /* Get the ETH_FD */ - UA_FD fd = (UA_FD)connectionId; - ETH_FD *erfd = (ETH_FD*)ZIP_FIND(UA_FDTree, &pcm->fds, &fd); - if(!erfd) - return; - +ETH_freeNetworkBuffer(UA_Connection *c, UA_ByteString *buf) { /* Unhide the Ethernet header */ + ETH_FD *erfd = (ETH_FD*)c; buf->data -= erfd->headerSize; buf->length += erfd->headerSize; UA_ByteString_clear(buf); @@ -252,22 +232,20 @@ ETH_close(UA_POSIXConnectionManager *pcm, ETH_FD *conn) { /* Signal closing to the application */ UA_UNLOCK(&el->elMutex); - conn->applicationCB(&pcm->cm, (uintptr_t)conn->rfd.fd, - conn->application, &conn->context, - UA_CONNECTIONSTATE_CLOSING, - &UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); + conn->rfd.c.callback(&conn->rfd.c, UA_CONNECTIONSTATE_CLOSING, + UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); UA_LOCK(&el->elMutex); /* Close the socket */ int ret = UA_close(conn->rfd.fd); if(ret == 0) { UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, - "ETH %u\t| Socket closed", (unsigned)conn->rfd.fd); + "ETH %u\t| Socket closed", conn->rfd.c.identifier); } else { UA_LOG_SOCKET_ERRNO_WRAP( UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "ETH %u\t| Could not close the socket (%s)", - (unsigned)conn->rfd.fd, errno_str)); + conn->rfd.c.identifier, errno_str)); } /* Don't call free here. This might be done automatically via the delayed @@ -410,8 +388,8 @@ ETH_connectionSocketCallback(UA_ConnectionManager *cm, UA_RegisteredFD *rfd, response.data += headerSize; response.length -= headerSize; UA_UNLOCK(&el->elMutex); - conn->applicationCB(cm, (uintptr_t)rfd->fd, conn->application, &conn->context, - UA_CONNECTIONSTATE_ESTABLISHED, &map, response); + conn->rfd.c.callback(&conn->rfd.c, UA_CONNECTIONSTATE_ESTABLISHED, + map, response); UA_LOCK(&el->elMutex); response.data -= headerSize; response.length += headerSize; @@ -561,7 +539,7 @@ ETH_openSendConnection(UA_EventLoopPOSIX *el, ETH_FD *conn, const UA_KeyValueMap } static UA_StatusCode -ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, +ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap params, void *application, void *context, UA_ConnectionManager_connectionCallback connectionCallback) { UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; @@ -571,7 +549,7 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, /* Listen or send connection? */ const UA_Boolean *listen = (const UA_Boolean*) - UA_KeyValueMap_getScalar(params, ETHConfigParameters[ETH_PARAMINDEX_LISTEN].name, + UA_KeyValueMap_getScalar(¶ms, ETHConfigParameters[ETH_PARAMINDEX_LISTEN].name, &UA_TYPES[UA_TYPES_BOOLEAN]); size_t ethParams = ETH_PARAMETERSSIZE; if(!listen || !*listen) @@ -580,7 +558,7 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, /* Validate the parameters */ UA_StatusCode res = UA_KeyValueRestriction_validate(el->eventLoop.logger, "ETH", ETHConfigParameters, - ethParams, params); + ethParams, ¶ms); if(res != UA_STATUSCODE_GOOD) { UA_UNLOCK(&el->elMutex); return res; @@ -589,14 +567,14 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, /* Get the EtherType parameter */ UA_UInt16 etherType = ETH_P_ALL; const UA_UInt16 *etParam = (const UA_UInt16*) - UA_KeyValueMap_getScalar(params, ETHConfigParameters[ETH_PARAMINDEX_ETHERTYPE].name, + UA_KeyValueMap_getScalar(¶ms, ETHConfigParameters[ETH_PARAMINDEX_ETHERTYPE].name, &UA_TYPES[UA_TYPES_UINT16]); if(etParam) etherType = *etParam; /* Get the interface index */ const UA_String *interface = (const UA_String*) - UA_KeyValueMap_getScalar(params, ETHConfigParameters[ETH_PARAMINDEX_IFACE].name, + UA_KeyValueMap_getScalar(¶ms, ETHConfigParameters[ETH_PARAMINDEX_IFACE].name, &UA_TYPES[UA_TYPES_STRING]); if(interface->length >= 128) { UA_UNLOCK(&el->elMutex); @@ -640,11 +618,12 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, } conn->rfd.fd = sockfd; - conn->rfd.es = &pcm->cm.eventSource; conn->rfd.eventSourceCB = (UA_FDCallback)ETH_connectionSocketCallback; - conn->context = context; - conn->application = application; - conn->applicationCB = connectionCallback; + conn->rfd.c.callback = connectionCallback; + conn->rfd.c.application = application; + conn->rfd.c.context = context; + conn->rfd.c.identifier = (UA_UInt32)sockfd; + conn->rfd.c.cm = &pcm->cm; /* Configure a listen or a send connection */ if(!listen || !*listen) { @@ -661,11 +640,11 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, res = UA_STATUSCODE_BADCONNECTIONREJECTED; goto cleanup; } - res = ETH_openSendConnection(el, conn, params, + res = ETH_openSendConnection(el, conn, ¶ms, (unsigned char*)ifr.ifr_hwaddr.sa_data, ifindex, etherType); } else { - res = ETH_openListenConnection(el, conn, params, ifindex, etherType); + res = ETH_openListenConnection(el, conn, ¶ms, ifindex, etherType); } if(res != UA_STATUSCODE_GOOD) goto cleanup; @@ -681,9 +660,8 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, /* Register the listen socket in the application */ UA_UNLOCK(&el->elMutex); - connectionCallback(cm, (uintptr_t)sockfd, application, &conn->context, - UA_CONNECTIONSTATE_ESTABLISHED, &UA_KEYVALUEMAP_NULL, - UA_BYTESTRING_NULL); + connectionCallback(&conn->rfd.c, UA_CONNECTIONSTATE_ESTABLISHED, + UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); return UA_STATUSCODE_GOOD; cleanup: @@ -719,43 +697,26 @@ ETH_shutdown(UA_POSIXConnectionManager *pcm, ETH_FD *conn) { } static UA_StatusCode -ETH_shutdownConnection(UA_ConnectionManager *cm, uintptr_t connectionId) { - UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; - UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; +ETH_shutdownConnection(UA_Connection *c) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)c->cm->eventSource.eventLoop; + UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)c->cm; UA_LOCK(&el->elMutex); - - /* Get the ETH_FD */ - UA_FD fd = (UA_FD)connectionId; - UA_RegisteredFD *rfd = ZIP_FIND(UA_FDTree, &pcm->fds, &fd); - if(!rfd) { - UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, - "ETH\t| Cannot close Ethernet connection %u - not found", - (unsigned)connectionId); - UA_UNLOCK(&el->elMutex); - return UA_STATUSCODE_BADNOTFOUND; - } - - ETH_shutdown(pcm, (ETH_FD*)rfd); + ETH_shutdown(pcm, (ETH_FD*)c); UA_UNLOCK(&el->elMutex); return UA_STATUSCODE_GOOD; } static UA_StatusCode -ETH_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, - const UA_KeyValueMap *params, UA_ByteString *buf) { - UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; - UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; +ETH_sendWithConnection(UA_Connection *c, const UA_KeyValueMap params, + UA_ByteString *buf) { + if(!c) + return UA_STATUSCODE_BADCONNECTIONREJECTED; + ETH_FD *conn = (ETH_FD*)c; + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)c->cm->eventSource.eventLoop; + UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)c->cm; UA_LOCK(&el->elMutex); - /* Get the ETH_FD */ - UA_FD fd = (UA_FD)connectionId; - ETH_FD *conn = (ETH_FD*)ZIP_FIND(UA_FDTree, &pcm->fds, &fd); - if(!conn) { - UA_UNLOCK(&el->elMutex); - return UA_STATUSCODE_BADCONNECTIONREJECTED; - } - /* Uncover and set the Ethernet header */ buf->data -= conn->headerSize; buf->length += conn->headerSize; @@ -769,7 +730,7 @@ ETH_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, int flags = MSG_NOSIGNAL; struct pollfd tmp_poll_fd; - tmp_poll_fd.fd = (UA_FD)connectionId; + tmp_poll_fd.fd = conn->rfd.fd; tmp_poll_fd.events = UA_POLLOUT; /* Send the full buffer. This may require several calls to send */ @@ -778,7 +739,7 @@ ETH_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, ssize_t n = 0; do { UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, - "ETH %u\t| Attempting to send", (unsigned)connectionId); + "ETH %u\t| Attempting to send", c->identifier); size_t bytes_to_send = buf->length - nWritten; n = UA_sendto(conn->rfd.fd, (const char*)buf->data + nWritten, bytes_to_send, flags, (struct sockaddr*)&conn->sll, sizeof(conn->sll)); @@ -790,7 +751,7 @@ ETH_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, UA_LOG_SOCKET_ERRNO_WRAP( UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "ETH %u\t| Send failed with error %s", - (unsigned)connectionId, errno_str)); + c->identifier, errno_str)); ETH_shutdown(pcm, conn); UA_UNLOCK(&el->elMutex); UA_ByteString_clear(buf); @@ -806,7 +767,7 @@ ETH_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, UA_LOG_SOCKET_ERRNO_WRAP( UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "ETH %u\t| Send failed with error %s", - (unsigned)connectionId, errno_str)); + c->identifier, errno_str)); ETH_shutdown(pcm, conn); UA_UNLOCK(&el->elMutex); UA_ByteString_clear(buf); diff --git a/arch/eventloop_posix_interrupt.c b/arch/eventloop_posix_interrupt.c index e5ce5019beb..2b12c7be10c 100644 --- a/arch/eventloop_posix_interrupt.c +++ b/arch/eventloop_posix_interrupt.c @@ -25,6 +25,7 @@ typedef struct UA_RegisteredSignal { LIST_ENTRY(UA_RegisteredSignal) listPointers; + UA_EventSource *es; UA_InterruptCallback signalCallback; void *context; int signal; /* POSIX identifier of the interrupt signal */ @@ -87,7 +88,7 @@ handlePOSIXInterruptEvent(UA_EventSource *es, UA_RegisteredFD *rfd, short event) static void activateSignal(UA_RegisteredSignal *rs) { - UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)rs->rfd.es->eventLoop; + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)rs->es->eventLoop; UA_LOCK_ASSERT(&el->elMutex, 1); if(rs->active) @@ -139,7 +140,7 @@ activateSignal(UA_RegisteredSignal *rs) { static void deactivateSignal(UA_RegisteredSignal *rs) { - UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)rs->rfd.es->eventLoop; + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)rs->es->eventLoop; UA_LOCK_ASSERT(&el->elMutex, 1); /* Only dectivate if active */ @@ -315,7 +316,7 @@ registerPOSIXInterrupt(UA_InterruptManager *im, uintptr_t interruptHandle, } #ifdef UA_HAVE_EPOLL - rs->rfd.es = &im->eventSource; + rs->es = &im->eventSource; #endif rs->signal = (int)interruptHandle; rs->signalCallback = callback; diff --git a/arch/eventloop_posix_tcp.c b/arch/eventloop_posix_tcp.c index f661c48204e..6753aa97ce3 100644 --- a/arch/eventloop_posix_tcp.c +++ b/arch/eventloop_posix_tcp.c @@ -25,16 +25,8 @@ static UA_KeyValueRestriction TCPConfigParameters[TCP_PARAMETERSSIZE] = { {{0, UA_STRING_STATIC("validate")}, &UA_TYPES[UA_TYPES_BOOLEAN], false, true, false} }; -typedef struct { - UA_RegisteredFD rfd; - - UA_ConnectionManager_connectionCallback applicationCB; - void *application; - void *context; -} TCP_FD; - static void -TCP_shutdown(UA_ConnectionManager *cm, TCP_FD *conn); +TCP_shutdown(UA_ConnectionManager *cm, UA_RegisteredFD *conn); /* Do not merge packets on the socket (disable Nagle's algorithm) */ static UA_StatusCode @@ -64,40 +56,38 @@ TCP_delayedClose(void *application, void *context) { UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)application; UA_ConnectionManager *cm = &pcm->cm; UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; - TCP_FD *conn = (TCP_FD*)context; + UA_RegisteredFD *conn = (UA_RegisteredFD*)context; UA_LOCK(&el->elMutex); UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP, "TCP %u\t| Delayed closing of the connection", - (unsigned)conn->rfd.fd); + (unsigned)conn->fd); /* Deregister from the EventLoop */ - UA_EventLoopPOSIX_deregisterFD(el, &conn->rfd); + UA_EventLoopPOSIX_deregisterFD(el, conn); /* Deregister internally */ - ZIP_REMOVE(UA_FDTree, &pcm->fds, &conn->rfd); + ZIP_REMOVE(UA_FDTree, &pcm->fds, conn); UA_assert(pcm->fdsSize > 0); pcm->fdsSize--; /* Signal closing to the application */ UA_UNLOCK(&el->elMutex); - conn->applicationCB(cm, (uintptr_t)conn->rfd.fd, - conn->application, &conn->context, - UA_CONNECTIONSTATE_CLOSING, - &UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); + conn->c.callback(&conn->c, UA_CONNECTIONSTATE_CLOSING, + UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); UA_LOCK(&el->elMutex); /* Close the socket */ - int ret = UA_close(conn->rfd.fd); + int ret = UA_close(conn->fd); if(ret == 0) { UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, - "TCP %u\t| Socket closed", (unsigned)conn->rfd.fd); + "TCP %u\t| Socket closed", (unsigned)conn->fd); } else { UA_LOG_SOCKET_ERRNO_WRAP( UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Could not close the socket (%s)", - (unsigned)conn->rfd.fd, errno_str)); + (unsigned)conn->fd, errno_str)); } UA_free(conn); @@ -109,14 +99,14 @@ TCP_delayedClose(void *application, void *context) { } static int -getSockError(TCP_FD *conn) { +getSockError(UA_RegisteredFD *conn) { int error = 0; #ifndef _WIN32 socklen_t errlen = sizeof(int); - int err = getsockopt(conn->rfd.fd, SOL_SOCKET, SO_ERROR, &error, &errlen); + int err = getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, &error, &errlen); #else int errlen = (int)sizeof(int); - int err = getsockopt((SOCKET)conn->rfd.fd, SOL_SOCKET, SO_ERROR, + int err = getsockopt((SOCKET)conn->fd, SOL_SOCKET, SO_ERROR, (char*)&error, &errlen); #endif return (err == 0) ? error : err; @@ -124,20 +114,20 @@ getSockError(TCP_FD *conn) { /* Gets called when a connection socket opens, receives data or closes */ static void -TCP_connectionSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, +TCP_connectionSocketCallback(UA_ConnectionManager *cm, UA_RegisteredFD *conn, short event) { UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; UA_LOCK_ASSERT(&el->elMutex, 1); UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Activity on the socket", - (unsigned)conn->rfd.fd); + (unsigned)conn->fd); /* Error. The connection has closed. */ if(event == UA_FDEVENT_ERR) { UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| The connection closes with error %i", - (unsigned)conn->rfd.fd, getSockError(conn)); + (unsigned)conn->fd, getSockError(conn)); TCP_shutdown(cm, conn); return; } @@ -151,32 +141,30 @@ TCP_connectionSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, if(error != 0) { UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| The connection closes with error %i", - (unsigned)conn->rfd.fd, error); + (unsigned)conn->fd, error); TCP_shutdown(cm, conn); return; } UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Opening a new connection", - (unsigned)conn->rfd.fd); + (unsigned)conn->fd); /* Now we are interested in read-events. */ - conn->rfd.listenEvents = UA_FDEVENT_IN; - UA_EventLoopPOSIX_modifyFD(el, &conn->rfd); + conn->listenEvents = UA_FDEVENT_IN; + UA_EventLoopPOSIX_modifyFD(el, conn); /* A new socket has opened. Signal it to the application. */ UA_UNLOCK(&el->elMutex); - conn->applicationCB(cm, (uintptr_t)conn->rfd.fd, - conn->application, &conn->context, - UA_CONNECTIONSTATE_ESTABLISHED, - &UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); + conn->c.callback(&conn->c, UA_CONNECTIONSTATE_ESTABLISHED, + UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); UA_LOCK(&el->elMutex); return; } UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Allocate receive buffer", - (unsigned)conn->rfd.fd); + (unsigned)conn->fd); /* Use the already allocated receive-buffer */ UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; @@ -184,10 +172,10 @@ TCP_connectionSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, /* Receive */ #ifndef _WIN32 - ssize_t ret = UA_recv(conn->rfd.fd, (char*)response.data, + ssize_t ret = UA_recv(conn->fd, (char*)response.data, response.length, MSG_DONTWAIT); #else - int ret = UA_recv(conn->rfd.fd, (char*)response.data, + int ret = UA_recv(conn->fd, (char*)response.data, response.length, MSG_DONTWAIT); #endif @@ -202,40 +190,38 @@ TCP_connectionSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, UA_LOG_SOCKET_ERRNO_WRAP( UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| recv signaled the socket was shutdown (%s)", - (unsigned)conn->rfd.fd, errno_str)); + (unsigned)conn->fd, errno_str)); TCP_shutdown(cm, conn); return; } UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Received message of size %u", - (unsigned)conn->rfd.fd, (unsigned)ret); + (unsigned)conn->fd, (unsigned)ret); /* Callback to the application layer */ response.length = (size_t)ret; /* Set the length of the received buffer */ UA_UNLOCK(&el->elMutex); - conn->applicationCB(cm, (uintptr_t)conn->rfd.fd, - conn->application, &conn->context, - UA_CONNECTIONSTATE_ESTABLISHED, - &UA_KEYVALUEMAP_NULL, response); + conn->c.callback(&conn->c, UA_CONNECTIONSTATE_ESTABLISHED, + UA_KEYVALUEMAP_NULL, response); UA_LOCK(&el->elMutex); } /* Gets called when a new connection opens or if the listenSocket is closed */ static void -TCP_listenSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, short event) { +TCP_listenSocketCallback(UA_ConnectionManager *cm, UA_RegisteredFD *conn, short event) { UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; UA_LOCK_ASSERT(&el->elMutex, 1); UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Callback on server socket", - (unsigned)conn->rfd.fd); + (unsigned)conn->fd); /* Try to accept a new connection */ struct sockaddr_storage remote; socklen_t remote_size = sizeof(remote); - UA_FD newsockfd = accept(conn->rfd.fd, (struct sockaddr*)&remote, &remote_size); + UA_FD newsockfd = accept(conn->fd, (struct sockaddr*)&remote, &remote_size); if(newsockfd == UA_INVALID_FD) { /* Temporary error -- retry */ if(UA_ERRNO == UA_INTERRUPTED) @@ -246,7 +232,7 @@ TCP_listenSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, short event) { UA_LOG_SOCKET_ERRNO_WRAP( UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Error %s, closing the server socket", - (unsigned)conn->rfd.fd, errno_str)); + (unsigned)conn->fd, errno_str)); } TCP_shutdown(cm, conn); @@ -262,11 +248,11 @@ TCP_listenSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, short event) { UA_LOG_SOCKET_ERRNO_WRAP( UA_LOG_WARNING(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| getnameinfo(...) could not resolve the " - "hostname (%s)", (unsigned)conn->rfd.fd, errno_str)); + "hostname (%s)", (unsigned)conn->fd, errno_str)); } UA_LOG_INFO(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Connection opened from \"%s\" via the server socket %u", - (unsigned)newsockfd, hoststr, (unsigned)conn->rfd.fd); + (unsigned)newsockfd, hoststr, (unsigned)conn->fd); /* Configure the new socket */ UA_StatusCode res = UA_STATUSCODE_GOOD; @@ -284,7 +270,7 @@ TCP_listenSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, short event) { } /* Allocate the UA_RegisteredFD */ - TCP_FD *newConn = (TCP_FD*)UA_calloc(1, sizeof(TCP_FD)); + UA_RegisteredFD *newConn = (UA_RegisteredFD*)UA_calloc(1, sizeof(UA_RegisteredFD)); if(!newConn) { UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Error allocating memory for the socket", @@ -293,16 +279,17 @@ TCP_listenSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, short event) { return; } - newConn->rfd.fd = newsockfd; - newConn->rfd.listenEvents = UA_FDEVENT_IN; - newConn->rfd.es = &cm->eventSource; - newConn->rfd.eventSourceCB = (UA_FDCallback)TCP_connectionSocketCallback; - newConn->applicationCB = conn->applicationCB; - newConn->application = conn->application; - newConn->context = conn->context; + newConn->fd = newsockfd; + newConn->listenEvents = UA_FDEVENT_IN; + newConn->eventSourceCB = (UA_FDCallback)TCP_connectionSocketCallback; + newConn->c.callback = conn->c.callback; + newConn->c.application = conn->c.application; + newConn->c.context = conn->c.context; + newConn->c.identifier = (UA_UInt32)newsockfd; + newConn->c.cm = &pcm->cm; /* Register in the EventLoop. Signal to the user if registering failed. */ - res = UA_EventLoopPOSIX_registerFD(el, &newConn->rfd); + res = UA_EventLoopPOSIX_registerFD(el, newConn); if(res != UA_STATUSCODE_GOOD) { UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Error registering the socket", @@ -313,7 +300,7 @@ TCP_listenSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, short event) { } /* Register internally in the EventSource */ - ZIP_INSERT(UA_FDTree, &pcm->fds, &newConn->rfd); + ZIP_INSERT(UA_FDTree, &pcm->fds, newConn); pcm->fdsSize++; /* Forward the remote hostname to the application */ @@ -328,10 +315,8 @@ TCP_listenSocketCallback(UA_ConnectionManager *cm, TCP_FD *conn, short event) { /* The socket has opened. Signal it to the application. */ UA_UNLOCK(&el->elMutex); - newConn->applicationCB(cm, (uintptr_t)newsockfd, - newConn->application, &newConn->context, - UA_CONNECTIONSTATE_ESTABLISHED, - &kvm, UA_BYTESTRING_NULL); + newConn->c.callback(&newConn->c, UA_CONNECTIONSTATE_ESTABLISHED, + kvm, UA_BYTESTRING_NULL); UA_LOCK(&el->elMutex); } @@ -445,7 +430,7 @@ TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai, } /* Allocate the connection */ - TCP_FD *newConn = (TCP_FD*)UA_calloc(1, sizeof(TCP_FD)); + UA_RegisteredFD *newConn = (UA_RegisteredFD*)UA_calloc(1, sizeof(UA_RegisteredFD)); if(!newConn) { UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Error allocating memory for the socket", @@ -454,16 +439,17 @@ TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai, return UA_STATUSCODE_BADINTERNALERROR; } - newConn->rfd.fd = listenSocket; - newConn->rfd.listenEvents = UA_FDEVENT_IN; - newConn->rfd.es = &pcm->cm.eventSource; - newConn->rfd.eventSourceCB = (UA_FDCallback)TCP_listenSocketCallback; - newConn->applicationCB = connectionCallback; - newConn->application = application; - newConn->context = context; + newConn->fd = listenSocket; + newConn->listenEvents = UA_FDEVENT_IN; + newConn->eventSourceCB = (UA_FDCallback)TCP_listenSocketCallback; + newConn->c.callback = connectionCallback; + newConn->c.application = application; + newConn->c.context = context; + newConn->c.identifier = (UA_UInt32)listenSocket; + newConn->c.cm = &pcm->cm; /* Register in the EventLoop */ - UA_StatusCode res = UA_EventLoopPOSIX_registerFD(el, &newConn->rfd); + UA_StatusCode res = UA_EventLoopPOSIX_registerFD(el, newConn); if(res != UA_STATUSCODE_GOOD) { UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Error registering the socket", @@ -474,7 +460,7 @@ TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai, } /* Register internally */ - ZIP_INSERT(UA_FDTree, &pcm->fds, &newConn->rfd); + ZIP_INSERT(UA_FDTree, &pcm->fds, newConn); pcm->fdsSize++; /* Set up the callback parameters */ @@ -494,10 +480,8 @@ TCP_registerListenSocket(UA_POSIXConnectionManager *pcm, struct addrinfo *ai, /* Announce the listen-socket in the application */ UA_UNLOCK(&el->elMutex); - connectionCallback(&pcm->cm, (uintptr_t)listenSocket, - application, &newConn->context, - UA_CONNECTIONSTATE_ESTABLISHED, - ¶mMap, UA_BYTESTRING_NULL); + connectionCallback(&newConn->c, UA_CONNECTIONSTATE_ESTABLISHED, + paramMap, UA_BYTESTRING_NULL); UA_LOCK(&el->elMutex); return UA_STATUSCODE_GOOD; @@ -556,25 +540,25 @@ TCP_registerListenSockets(UA_POSIXConnectionManager *pcm, const char *hostname, /* Close the connection via a delayed callback */ static void -TCP_shutdown(UA_ConnectionManager *cm, TCP_FD *conn) { +TCP_shutdown(UA_ConnectionManager *cm, UA_RegisteredFD *conn) { /* Already closing - nothing to do */ UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; UA_LOCK_ASSERT(&el->elMutex, 1); - if(conn->rfd.dc.callback) { + if(conn->dc.callback) { UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Cannot shutdown - already triggered", - (unsigned)conn->rfd.fd); + (unsigned)conn->fd); return; } UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Shutdown triggered", - (unsigned)conn->rfd.fd); + (unsigned)conn->fd); /* Add to the delayed callback list. Will be cleaned up in the next * iteration. */ - UA_DelayedCallback *dc = &conn->rfd.dc; + UA_DelayedCallback *dc = &conn->dc; dc->callback = TCP_delayedClose; dc->application = cm; dc->context = conn; @@ -585,39 +569,39 @@ TCP_shutdown(UA_ConnectionManager *cm, TCP_FD *conn) { } static UA_StatusCode -TCP_shutdownConnection(UA_ConnectionManager *cm, uintptr_t connectionId) { - UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; - UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)cm->eventSource.eventLoop; +TCP_shutdownConnection(UA_Connection *c) { + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)c->cm->eventSource.eventLoop; UA_LOCK(&el->elMutex); - UA_FD fd = (UA_FD)connectionId; - TCP_FD *conn = (TCP_FD*)ZIP_FIND(UA_FDTree, &pcm->fds, &fd); + UA_RegisteredFD *conn = (UA_RegisteredFD*)c; if(!conn) { UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP\t| Cannot close TCP connection %u - not found", - (unsigned)connectionId); + c->identifier); UA_UNLOCK(&el->elMutex); return UA_STATUSCODE_BADNOTFOUND; } - TCP_shutdown(cm, conn); + TCP_shutdown(c->cm, conn); UA_UNLOCK(&el->elMutex); return UA_STATUSCODE_GOOD; } static UA_StatusCode -TCP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, - const UA_KeyValueMap *params, UA_ByteString *buf) { +TCP_sendWithConnection(UA_Connection *c, const UA_KeyValueMap params, + UA_ByteString *buf) { /* Don't have a lock and don't take a lock. As the connectionId is the fd, * no need to to a lookup and access internal data strucures. */ - UA_LOCK_ASSERT(&((UA_EventLoopPOSIX*)cm->eventSource.eventLoop)->elMutex, 0); + UA_LOCK_ASSERT(&((UA_EventLoopPOSIX*)c->cm->eventSource.eventLoop)->elMutex, 0); + UA_RegisteredFD *conn = (UA_RegisteredFD*)c; + UA_EventLoop *el = c->cm->eventSource.eventLoop; /* Prevent OS signals when sending to a closed socket */ int flags = MSG_NOSIGNAL; struct pollfd tmp_poll_fd; - tmp_poll_fd.fd = (UA_FD)connectionId; + tmp_poll_fd.fd = conn->fd; tmp_poll_fd.events = UA_POLLOUT; /* Send the full buffer. This may require several calls to send */ @@ -625,10 +609,10 @@ TCP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, do { ssize_t n = 0; do { - UA_LOG_DEBUG(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, - "TCP %u\t| Attempting to send", (unsigned)connectionId); + UA_LOG_DEBUG(el->logger, UA_LOGCATEGORY_NETWORK, + "TCP %u\t| Attempting to send", c->identifier); size_t bytes_to_send = buf->length - nWritten; - n = UA_send((UA_FD)connectionId, + n = UA_send((UA_FD)conn->fd, (const char*)buf->data + nWritten, bytes_to_send, flags); if(n < 0) { @@ -656,10 +640,10 @@ TCP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, shutdown: UA_LOG_SOCKET_ERRNO_WRAP( - UA_LOG_ERROR(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK, + UA_LOG_ERROR(el->logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Send failed with error %s", - (unsigned)connectionId, errno_str)); - TCP_shutdownConnection(cm, connectionId); + c->identifier, errno_str)); + TCP_shutdownConnection(c); UA_ByteString_clear(buf); return UA_STATUSCODE_BADCONNECTIONCLOSED; } @@ -813,7 +797,7 @@ TCP_openActiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *p } /* Allocate the UA_RegisteredFD */ - TCP_FD *newConn = (TCP_FD*)UA_calloc(1, sizeof(TCP_FD)); + UA_RegisteredFD *newConn = (UA_RegisteredFD*)UA_calloc(1, sizeof(UA_RegisteredFD)); if(!newConn) { UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP %u\t| Error allocating memory for the socket", @@ -822,17 +806,18 @@ TCP_openActiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *p return UA_STATUSCODE_BADOUTOFMEMORY; } - newConn->rfd.fd = newSock; - newConn->rfd.es = &pcm->cm.eventSource; - newConn->rfd.eventSourceCB = (UA_FDCallback)TCP_connectionSocketCallback; - newConn->rfd.listenEvents = UA_FDEVENT_OUT; /* Switched to _IN once the + newConn->fd = newSock; + newConn->eventSourceCB = (UA_FDCallback)TCP_connectionSocketCallback; + newConn->listenEvents = UA_FDEVENT_OUT; /* Switched to _IN once the * connection is open */ - newConn->applicationCB = connectionCallback; - newConn->application = application; - newConn->context = context; + newConn->c.callback = connectionCallback; + newConn->c.application = application; + newConn->c.context = context; + newConn->c.identifier = (UA_UInt32)newSock; + newConn->c.cm = &pcm->cm; /* Register the fd to trigger when output is possible (the connection is open) */ - res = UA_EventLoopPOSIX_registerFD(el, &newConn->rfd); + res = UA_EventLoopPOSIX_registerFD(el, newConn); if(res != UA_STATUSCODE_GOOD) { UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "TCP\t| Registering the socket to connect to %s failed", hostname); @@ -842,7 +827,7 @@ TCP_openActiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *p } /* Register internally in the EventSource */ - ZIP_INSERT(UA_FDTree, &pcm->fds, &newConn->rfd); + ZIP_INSERT(UA_FDTree, &pcm->fds, newConn); pcm->fdsSize++; UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, @@ -851,17 +836,15 @@ TCP_openActiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *p /* Signal the new connection to the application as asynchonously opening */ UA_UNLOCK(&el->elMutex); - connectionCallback(&pcm->cm, (uintptr_t)newSock, - application, &newConn->context, - UA_CONNECTIONSTATE_OPENING, &UA_KEYVALUEMAP_NULL, - UA_BYTESTRING_NULL); + connectionCallback(&newConn->c, UA_CONNECTIONSTATE_OPENING, + UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); UA_LOCK(&el->elMutex); return UA_STATUSCODE_GOOD; } static UA_StatusCode -TCP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, +TCP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap params, void *application, void *context, UA_ConnectionManager_connectionCallback connectionCallback) { UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; @@ -880,7 +863,7 @@ TCP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, UA_StatusCode res = UA_KeyValueRestriction_validate(el->eventLoop.logger, "TCP", &TCPConfigParameters[1], - TCP_PARAMETERSSIZE-1, params); + TCP_PARAMETERSSIZE-1, ¶ms); if(res != UA_STATUSCODE_GOOD) { UA_UNLOCK(&el->elMutex); return res; @@ -889,17 +872,17 @@ TCP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, /* Listen or active connection? */ UA_Boolean listen = false; const UA_Boolean *listenParam = (const UA_Boolean*) - UA_KeyValueMap_getScalar(params, + UA_KeyValueMap_getScalar(¶ms, TCPConfigParameters[TCP_PARAMINDEX_LISTEN].name, &UA_TYPES[UA_TYPES_BOOLEAN]); if(listenParam) listen = *listenParam; if(listen) { - res = TCP_openPassiveConnection(pcm, params, application, + res = TCP_openPassiveConnection(pcm, ¶ms, application, context, connectionCallback); } else { - res = TCP_openActiveConnection(pcm, params, application, + res = TCP_openActiveConnection(pcm, ¶ms, application, context, connectionCallback); } @@ -949,7 +932,7 @@ TCP_eventSourceStart(UA_ConnectionManager *cm) { static void * TCP_shutdownCB(void *application, UA_RegisteredFD *rfd) { UA_ConnectionManager *cm = (UA_ConnectionManager*)application; - TCP_shutdown(cm, (TCP_FD*)rfd); + TCP_shutdown(cm, rfd); return NULL; } diff --git a/arch/eventloop_posix_udp.c b/arch/eventloop_posix_udp.c index 913611a7422..5979f8defa4 100644 --- a/arch/eventloop_posix_udp.c +++ b/arch/eventloop_posix_udp.c @@ -46,10 +46,6 @@ static UA_KeyValueRestriction UDPConfigParameters[UDP_PARAMETERSSIZE] = { typedef struct { UA_RegisteredFD rfd; - UA_ConnectionManager_connectionCallback applicationCB; - void *application; - void *context; - struct sockaddr_storage sendAddr; #ifdef _WIN32 size_t sendAddrLength; @@ -505,10 +501,8 @@ UDP_close(UA_POSIXConnectionManager *pcm, UDP_FD *conn) { /* Signal closing to the application */ UA_UNLOCK(&el->elMutex); - conn->applicationCB(&pcm->cm, (uintptr_t)conn->rfd.fd, - conn->application, &conn->context, - UA_CONNECTIONSTATE_CLOSING, - &UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); + conn->rfd.c.callback(&conn->rfd.c, UA_CONNECTIONSTATE_CLOSING, + UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); UA_LOCK(&el->elMutex); /* Close the socket */ @@ -632,10 +626,8 @@ UDP_connectionSocketCallback(UA_POSIXConnectionManager *pcm, UDP_FD *conn, /* Callback to the application layer */ UA_UNLOCK(&el->elMutex); - conn->applicationCB(&pcm->cm, (uintptr_t)conn->rfd.fd, - conn->application, &conn->context, - UA_CONNECTIONSTATE_ESTABLISHED, - &kvm, response); + conn->rfd.c.callback(&conn->rfd.c, UA_CONNECTIONSTATE_ESTABLISHED, + kvm, response); UA_LOCK(&el->elMutex); } @@ -755,12 +747,13 @@ UDP_registerListenSocket(UA_POSIXConnectionManager *pcm, UA_UInt16 port, } newudpfd->rfd.fd = listenSocket; - newudpfd->rfd.es = &pcm->cm.eventSource; newudpfd->rfd.listenEvents = UA_FDEVENT_IN; newudpfd->rfd.eventSourceCB = (UA_FDCallback)UDP_connectionSocketCallback; - newudpfd->applicationCB = connectionCallback; - newudpfd->application = application; - newudpfd->context = context; + newudpfd->rfd.c.callback = connectionCallback; + newudpfd->rfd.c.application = application; + newudpfd->rfd.c.context = context; + newudpfd->rfd.c.identifier = (UA_UInt32)listenSocket; + newudpfd->rfd.c.cm = &pcm->cm; /* Register in the EventLoop */ res = UA_EventLoopPOSIX_registerFD(el, &newudpfd->rfd); @@ -779,10 +772,8 @@ UDP_registerListenSocket(UA_POSIXConnectionManager *pcm, UA_UInt16 port, /* Register the listen socket in the application */ UA_UNLOCK(&el->elMutex); - connectionCallback(&pcm->cm, (uintptr_t)newudpfd->rfd.fd, - application, &newudpfd->context, - UA_CONNECTIONSTATE_ESTABLISHED, - &UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); + connectionCallback(&newudpfd->rfd.c, UA_CONNECTIONSTATE_ESTABLISHED, + UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); UA_LOCK(&el->elMutex); return UA_STATUSCODE_GOOD; } @@ -862,37 +853,26 @@ UDP_shutdown(UA_ConnectionManager *cm, UA_RegisteredFD *rfd) { } static UA_StatusCode -UDP_shutdownConnection(UA_ConnectionManager *cm, uintptr_t connectionId) { - UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; - UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)cm->eventSource.eventLoop; - UA_FD fd = (UA_FD)connectionId; +UDP_shutdownConnection(UA_Connection *c) { + if(!c) + return UA_STATUSCODE_BADNOTFOUND; + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)c->cm->eventSource.eventLoop; + UDP_FD *conn = (UDP_FD*)c; UA_LOCK(&el->elMutex); - UA_RegisteredFD *rfd = ZIP_FIND(UA_FDTree, &pcm->fds, &fd); - if(!rfd) { - UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, - "UDP\t| Cannot close UDP connection %u - not found", - (unsigned)connectionId); - UA_UNLOCK(&el->elMutex); - return UA_STATUSCODE_BADNOTFOUND; - } - UDP_shutdown(cm, rfd); + UDP_shutdown(c->cm, &conn->rfd); UA_UNLOCK(&el->elMutex); return UA_STATUSCODE_GOOD; } static UA_StatusCode -UDP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, - const UA_KeyValueMap *params, +UDP_sendWithConnection(UA_Connection *c, const UA_KeyValueMap params, UA_ByteString *buf) { - UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; - UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop; + UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)c->cm->eventSource.eventLoop; UA_LOCK(&el->elMutex); - /* Look up the registered UDP socket */ - UA_FD fd = (UA_FD)connectionId; - UDP_FD *conn = (UDP_FD*)ZIP_FIND(UA_FDTree, &pcm->fds, &fd); + UDP_FD *conn = (UDP_FD*)c; if(!conn) { UA_UNLOCK(&el->elMutex); UA_ByteString_clear(buf); @@ -905,12 +885,12 @@ UDP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, ssize_t n = 0; do { UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, - "UDP %u\t| Attempting to send", (unsigned)connectionId); + "UDP %u\t| Attempting to send", c->identifier); /* Prevent OS signals when sending to a closed socket */ int flags = MSG_NOSIGNAL; size_t bytes_to_send = buf->length - nWritten; - n = UA_sendto((UA_FD)connectionId, (const char*)buf->data + nWritten, + n = UA_sendto(conn->rfd.fd, (const char*)buf->data + nWritten, bytes_to_send, flags, (struct sockaddr*)&conn->sendAddr, conn->sendAddrLength); if(n < 0) { @@ -921,9 +901,9 @@ UDP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, UA_LOG_SOCKET_ERRNO_WRAP( UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "UDP %u\t| Send failed with error %s", - (unsigned)connectionId, errno_str)); + c->identifier, errno_str)); UA_UNLOCK(&el->elMutex); - UDP_shutdownConnection(cm, connectionId); + UDP_shutdownConnection(c); UA_ByteString_clear(buf); return UA_STATUSCODE_BADCONNECTIONCLOSED; } @@ -932,7 +912,7 @@ UDP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, * (blocking) */ int poll_ret; struct pollfd tmp_poll_fd; - tmp_poll_fd.fd = (UA_FD)connectionId; + tmp_poll_fd.fd = conn->rfd.fd; tmp_poll_fd.events = UA_POLLOUT; do { poll_ret = UA_poll(&tmp_poll_fd, 1, 100); @@ -941,9 +921,9 @@ UDP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK, "UDP %u\t| Send failed with error %s", - (unsigned)connectionId, errno_str)); - UDP_shutdownConnection(cm, connectionId); + conn->rfd.c.identifier, errno_str)); UA_UNLOCK(&el->elMutex); + UDP_shutdownConnection(c); UA_ByteString_clear(buf); return UA_STATUSCODE_BADCONNECTIONCLOSED; } @@ -1079,11 +1059,12 @@ UDP_openSendConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *par conn->rfd.fd = newSock; conn->rfd.listenEvents = 0; - conn->rfd.es = &pcm->cm.eventSource; conn->rfd.eventSourceCB = (UA_FDCallback)UDP_connectionSocketCallback; - conn->applicationCB = connectionCallback; - conn->application = application; - conn->context = context; + conn->rfd.c.callback = connectionCallback; + conn->rfd.c.application = application; + conn->rfd.c.context = context; + conn->rfd.c.identifier = (UA_UInt32)newSock; + conn->rfd.c.cm = &pcm->cm; /* Register the fd to trigger when output is possible (the connection is open) */ res = UA_EventLoopPOSIX_registerFD(el, &conn->rfd); @@ -1106,9 +1087,8 @@ UDP_openSendConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap *par /* Signal the connection as opening. The connection fully opens in the next * iteration of the EventLoop */ UA_UNLOCK(&el->elMutex); - connectionCallback(&pcm->cm, (uintptr_t)newSock, application, - &conn->context, UA_CONNECTIONSTATE_ESTABLISHED, - &UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); + connectionCallback(&conn->rfd.c, UA_CONNECTIONSTATE_ESTABLISHED, + UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL); UA_LOCK(&el->elMutex); return UA_STATUSCODE_GOOD; @@ -1167,7 +1147,7 @@ UDP_openReceiveConnection(UA_POSIXConnectionManager *pcm, const UA_KeyValueMap * } static UA_StatusCode -UDP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, +UDP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap params, void *application, void *context, UA_ConnectionManager_connectionCallback connectionCallback) { UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm; @@ -1186,7 +1166,7 @@ UDP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, UA_StatusCode res = UA_KeyValueRestriction_validate(el->eventLoop.logger, "UDP", &UDPConfigParameters[1], - UDP_PARAMETERSSIZE-1, params); + UDP_PARAMETERSSIZE-1, ¶ms); if(res != UA_STATUSCODE_GOOD) { UA_UNLOCK(&el->elMutex); return res; @@ -1194,23 +1174,24 @@ UDP_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, UA_Boolean validate = false; const UA_Boolean *validationValue = (const UA_Boolean*) - UA_KeyValueMap_getScalar(params, UDPConfigParameters[UDP_PARAMINDEX_VALIDATE].name, + UA_KeyValueMap_getScalar(¶ms, + UDPConfigParameters[UDP_PARAMINDEX_VALIDATE].name, &UA_TYPES[UA_TYPES_BOOLEAN]); if(validationValue) validate = *validationValue; UA_Boolean listen = false; const UA_Boolean *listenValue = (const UA_Boolean*) - UA_KeyValueMap_getScalar(params, UDPConfigParameters[UDP_PARAMINDEX_LISTEN].name, + UA_KeyValueMap_getScalar(¶ms, UDPConfigParameters[UDP_PARAMINDEX_LISTEN].name, &UA_TYPES[UA_TYPES_BOOLEAN]); if(listenValue) listen = *listenValue; if(listen) { - res = UDP_openReceiveConnection(pcm, params, application, context, + res = UDP_openReceiveConnection(pcm, ¶ms, application, context, connectionCallback, validate); } else { - res = UDP_openSendConnection(pcm, params, application, context, + res = UDP_openSendConnection(pcm, ¶ms, application, context, connectionCallback, validate); } UA_UNLOCK(&el->elMutex); diff --git a/include/open62541/plugin/eventloop.h b/include/open62541/plugin/eventloop.h index b97a9a503e7..b8d73d4724b 100644 --- a/include/open62541/plugin/eventloop.h +++ b/include/open62541/plugin/eventloop.h @@ -22,6 +22,9 @@ typedef struct UA_EventLoop UA_EventLoop; struct UA_EventSource; typedef struct UA_EventSource UA_EventSource; +struct UA_Connection; +typedef struct UA_Connection UA_Connection; + struct UA_ConnectionManager; typedef struct UA_ConnectionManager UA_ConnectionManager; @@ -255,7 +258,11 @@ struct UA_EventSource { * it can keep a session to an MQTT broker open which is used by individual * connections that are each bound to an MQTT topic. */ -/* The ConnectionCallback is the only interface from the connection back to +/** + * Connection Callback + * ~~~~~~~~~~~~~~~~~~~ + * + * The ConnectionCallback is the only interface from the connection back to * the application. * * - The connectionId is initially unknown to the target application and @@ -276,11 +283,35 @@ struct UA_EventSource { * * - The msg ByteString is the message (or packet) received on the * connection. Can be empty. */ + typedef void (*UA_ConnectionManager_connectionCallback) - (UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, UA_ConnectionState state, - const UA_KeyValueMap *params, UA_ByteString msg); + (UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg); + +/** + * Connection + * ~~~~~~~~~~ + * + * A connection is created by the ConnectionManager based on the given + * parameters. A new connection is announced to the application via the supplied + * callback method-pointer. The user is free to modify application, context and + * callback. When the callback is called with a CLOSING connection state, then + * the connection pointer must not be accessed afterwads. + */ + +struct UA_Connection { + UA_UInt32 identifier; + UA_ConnectionManager *cm; + void *application; + void *context; + UA_ConnectionManager_connectionCallback callback; +}; + +/** + * Connection Manager + * ~~~~~~~~~~~~~~~~~~ + */ struct UA_ConnectionManager { /* Every ConnectionManager is treated like an EventSource from the @@ -327,9 +358,9 @@ struct UA_ConnectionManager { * hostname. Each protocol implementation documents whether multiple * connections might be opened at once. */ UA_StatusCode - (*openConnection)(UA_ConnectionManager *cm, const UA_KeyValueMap *params, + (*openConnection)(UA_ConnectionManager *cm, const UA_KeyValueMap params, void *application, void *context, - UA_ConnectionManager_connectionCallback connectionCallback); + UA_ConnectionManager_connectionCallback callback); /* Send a message over a Connection * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -341,8 +372,8 @@ struct UA_ConnectionManager { * Some ConnectionManagers can accept additional parameters for sending. For * example a tx-time for sending in time-synchronized TSN settings. */ UA_StatusCode - (*sendWithConnection)(UA_ConnectionManager *cm, uintptr_t connectionId, - const UA_KeyValueMap *params, UA_ByteString *buf); + (*sendWithConnection)(UA_Connection *connection, const UA_KeyValueMap params, + UA_ByteString *buf); /* Close a Connection * ~~~~~~~~~~~~~~~~~~ @@ -352,7 +383,7 @@ struct UA_ConnectionManager { * that are actively closed and those that are closed remotely. The return * code is non-good only if the connection is already closed. */ UA_StatusCode - (*closeConnection)(UA_ConnectionManager *cm, uintptr_t connectionId); + (*closeConnection)(UA_Connection *connection); /* Buffer Management * ~~~~~~~~~~~~~~~~~ @@ -361,11 +392,10 @@ struct UA_ConnectionManager { * connectionId is part of the API to enable cases where memory is * statically allocated for every connection */ UA_StatusCode - (*allocNetworkBuffer)(UA_ConnectionManager *cm, uintptr_t connectionId, - UA_ByteString *buf, size_t bufSize); + (*allocNetworkBuffer)(UA_Connection *connection, UA_ByteString *buf, + size_t bufSize); void - (*freeNetworkBuffer)(UA_ConnectionManager *cm, uintptr_t connectionId, - UA_ByteString *buf); + (*freeNetworkBuffer)(UA_Connection *connection, UA_ByteString *buf); }; /** diff --git a/src/client/ua_client_connect.c b/src/client/ua_client_connect.c index 51960e763b8..5afbb32e59a 100644 --- a/src/client/ua_client_connect.c +++ b/src/client/ua_client_connect.c @@ -392,14 +392,14 @@ processACKResponse(UA_Client *client, const UA_ByteString *chunk) { static UA_StatusCode sendHELMessage(UA_Client *client) { - UA_ConnectionManager *cm = client->channel.connectionManager; if(!UA_SecureChannel_isConnected(&client->channel)) return UA_STATUSCODE_BADNOTCONNECTED; + UA_Connection *c = client->channel.connection; + UA_ConnectionManager *cm = c->cm; /* Get a buffer */ UA_ByteString message; - UA_StatusCode retval = cm->allocNetworkBuffer(cm, client->channel.connectionId, - &message, UA_MINMESSAGESIZE); + UA_StatusCode retval = cm->allocNetworkBuffer(c, &message, UA_MINMESSAGESIZE); if(retval != UA_STATUSCODE_GOOD) return retval; @@ -433,14 +433,13 @@ sendHELMessage(UA_Client *client) { &UA_TRANSPORT[UA_TRANSPORT_TCPMESSAGEHEADER], &bufPos, &bufEnd, NULL, NULL); if(retval != UA_STATUSCODE_GOOD) { - cm->freeNetworkBuffer(cm, client->channel.connectionId, &message); + cm->freeNetworkBuffer(c, &message); return retval; } /* Send the HEL message */ message.length = messageHeader.messageSize; - retval = cm->sendWithConnection(cm, client->channel.connectionId, - &UA_KEYVALUEMAP_NULL, &message); + retval = cm->sendWithConnection(c, UA_KEYVALUEMAP_NULL, &message); if(retval != UA_STATUSCODE_GOOD) { UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT, "Sending HEL failed"); closeSecureChannel(client); @@ -1416,19 +1415,17 @@ verifyClientApplicationURI(const UA_Client *client) { } static void -__Client_networkCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, const UA_KeyValueMap *params, - UA_ByteString msg) { +__Client_networkCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg) { /* Take the client lock */ - UA_Client *client = (UA_Client*)application; + UA_Client *client = (UA_Client*)connection->application; UA_LOCK(&client->clientMutex); UA_LOG_TRACE(&client->config.logger, UA_LOGCATEGORY_CLIENT, "Client network callback"); /* A new connection is not yet registered */ - if(!*connectionContext) { + if(!connection->context) { /* Opening the connection failed. The client cannot recover from this. */ if(state != UA_CONNECTIONSTATE_OPENING && state != UA_CONNECTIONSTATE_ESTABLISHED) { @@ -1446,9 +1443,8 @@ __Client_networkCallback(UA_ConnectionManager *cm, uintptr_t connectionId, } /* Initialize the client connection and attach to the EventLoop connection */ - client->channel.connectionManager = cm; - client->channel.connectionId = connectionId; - *connectionContext = &client->channel; + client->channel.connection = connection; + connection->context = &client->channel; /* If the connection is not fully established we still save the * connectionId in the client now so that the connection can be closed @@ -1621,7 +1617,7 @@ initConnect(UA_Client *client) { /* Open the client TCP connection */ UA_UNLOCK(&client->clientMutex); UA_StatusCode res = - cm->openConnection(cm, ¶mMap, client, NULL, __Client_networkCallback); + cm->openConnection(cm, paramMap, client, NULL, __Client_networkCallback); UA_LOCK(&client->clientMutex); if(res == UA_STATUSCODE_GOOD) break; @@ -1859,28 +1855,28 @@ UA_Client_activateSessionAsync(UA_Client *client, } static void -__Client_reverseConnectCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, const UA_KeyValueMap *params, - UA_ByteString msg) { - - UA_Client *client = (UA_Client*)application; +__Client_reverseConnectCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg) { + if(!connection->application) + return; + UA_Client *client = (UA_Client*)connection->application; + UA_ConnectionManager *cm = connection->cm; UA_LOCK(&client->clientMutex); /* This is the first call for the listening socket, attach the * REVERSE_CONNECT_INDICATOR marker and set the ID to the channel */ - if(!client->channel.connectionId) { - client->channel.connectionId = connectionId; - *connectionContext = REVERSE_CONNECT_INDICATOR; + if(!client->channel.connection) { + client->channel.connection = connection; + connection->context = REVERSE_CONNECT_INDICATOR; } /* Last call for the listening connection while it is being closed. Only * notify a state change if no reverse connection is being or has been * established by now */ - if(*connectionContext == REVERSE_CONNECT_INDICATOR && + if(connection->context == REVERSE_CONNECT_INDICATOR && state == UA_CONNECTIONSTATE_CLOSING) { - if(client->channel.connectionId == connectionId) { + if(client->channel.connection == connection) { client->channel.state = UA_SECURECHANNELSTATE_CLOSED; notifyClientState(client); } @@ -1890,26 +1886,25 @@ __Client_reverseConnectCallback(UA_ConnectionManager *cm, uintptr_t connectionId /* Second callback for the listening socket, it is now listening for * incoming connections */ - if(client->channel.connectionId == connectionId && - *connectionContext == REVERSE_CONNECT_INDICATOR) { + if(client->channel.connection == connection && + connection->context == REVERSE_CONNECT_INDICATOR) { client->channel.state = UA_SECURECHANNELSTATE_REVERSE_LISTENING; notifyClientState(client); } /* This is a connection initiated by a server, disconnect the listener and * reset secure channel information */ - if(client->channel.connectionId != connectionId) { - cm->closeConnection(cm, client->channel.connectionId); - client->channel.connectionId = 0; - *connectionContext = NULL; + if(client->channel.connection != connection) { + cm->closeConnection(connection); + client->channel.connection = NULL; + connection->context = NULL; } /* Forward all calls belonging to the reverse connection estblished by the * server to the regular network callback */ - if(*connectionContext != REVERSE_CONNECT_INDICATOR) { + if(connection->context != REVERSE_CONNECT_INDICATOR) { UA_UNLOCK(&client->clientMutex); - __Client_networkCallback(cm, connectionId, application, - connectionContext, state, params, msg); + __Client_networkCallback(connection, state, params, msg); return; } @@ -1941,7 +1936,7 @@ UA_Client_startListeningForReverseConnect(UA_Client *client, client->channel.config = client->config.localConnectionConfig; client->channel.certificateVerification = &client->config.certificateVerification; client->channel.processOPNHeader = verifyClientSecurechannelHeader; - client->channel.connectionId = 0; + client->channel.connection = NULL; initSecurityPolicy(client); if(client->connectStatus != UA_STATUSCODE_GOOD) @@ -1978,8 +1973,6 @@ UA_Client_startListeningForReverseConnect(UA_Client *client, return UA_STATUSCODE_BADINTERNALERROR; } - client->channel.connectionManager = cm; - UA_KeyValuePair params[3]; bool booleanTrue = true; params[0].key = UA_QUALIFIEDNAME(0, "port"); @@ -1995,7 +1988,7 @@ UA_Client_startListeningForReverseConnect(UA_Client *client, paramMap.mapSize = 3; UA_UNLOCK(&client->clientMutex); - res = cm->openConnection(cm, ¶mMap, client, NULL, __Client_reverseConnectCallback); + res = cm->openConnection(cm, paramMap, client, NULL, __Client_reverseConnectCallback); UA_LOCK(&client->clientMutex); /* Opening the TCP connection failed */ diff --git a/src/pubsub/ua_pubsub.h b/src/pubsub/ua_pubsub.h index 18a981311c2..3979dd49cc7 100644 --- a/src/pubsub/ua_pubsub.h +++ b/src/pubsub/ua_pubsub.h @@ -207,10 +207,9 @@ typedef struct UA_PubSubConnection { * defines additional connection properties. For example an MQTT topic name * or QoS parameters. In that case a dedicated NetworkCallback is used that * takes this ReaderGroup/WriterGroup directly as context. */ - UA_ConnectionManager *cm; - uintptr_t recvChannels[UA_PUBSUB_MAXCHANNELS]; + UA_Connection *recvChannels[UA_PUBSUB_MAXCHANNELS]; size_t recvChannelsSize; - uintptr_t sendChannel; + UA_Connection *sendChannel; size_t writerGroupsSize; LIST_HEAD(, UA_WriterGroup) writerGroups; @@ -405,7 +404,7 @@ struct UA_WriterGroup { /* The ConnectionManager pointer is stored in the Connection. The channels * are either stored here or in the Connection, but never both. */ UA_PubSubConnection *linkedConnection; - uintptr_t sendChannel; + UA_Connection *sendChannel; UA_Boolean deleteFlag; #ifdef UA_ENABLE_PUBSUB_ENCRYPTION @@ -647,7 +646,7 @@ struct UA_ReaderGroup { /* The ConnectionManager pointer is stored in the Connection. The channels * are either stored here or in the Connection, but never both. */ UA_PubSubConnection *linkedConnection; - uintptr_t recvChannels[UA_PUBSUB_MAXCHANNELS]; + UA_Connection *recvChannels[UA_PUBSUB_MAXCHANNELS]; size_t recvChannelsSize; UA_Boolean deleteFlag; diff --git a/src/pubsub/ua_pubsub_eventloop.c b/src/pubsub/ua_pubsub_eventloop.c index 2e413c08fc7..913fdedd9e7 100644 --- a/src/pubsub/ua_pubsub_eventloop.c +++ b/src/pubsub/ua_pubsub_eventloop.c @@ -15,19 +15,29 @@ /********************/ static UA_StatusCode -UA_PubSubConnection_connectUDP(UA_Server *server, UA_PubSubConnection *c); +UA_PubSubConnection_connectUDP(UA_Server *server, + UA_ConnectionManager *cm, + UA_PubSubConnection *c); static UA_StatusCode -UA_PubSubConnection_connectETH(UA_Server *server, UA_PubSubConnection *c); +UA_PubSubConnection_connectETH(UA_Server *server, + UA_ConnectionManager *cm, + UA_PubSubConnection *c); static UA_StatusCode -UA_ReaderGroup_connectMQTT(UA_Server *server, UA_ReaderGroup *rg); +UA_ReaderGroup_connectMQTT(UA_Server *server, + UA_ConnectionManager *cm, + UA_ReaderGroup *rg); static UA_StatusCode -UA_WriterGroup_connectMQTT(UA_Server *server, UA_WriterGroup *wg); +UA_WriterGroup_connectMQTT(UA_Server *server, + UA_ConnectionManager *cm, + UA_WriterGroup *wg); static UA_StatusCode -UA_WriterGroup_connectUDPUnicast(UA_Server *server, UA_WriterGroup *wg); +UA_WriterGroup_connectUDPUnicast(UA_Server *server, + UA_ConnectionManager *cm, + UA_WriterGroup *wg); #define UA_PUBSUB_PROFILES_SIZE 4 @@ -35,9 +45,15 @@ typedef struct { UA_String profileURI; UA_String protocol; UA_Boolean json; - UA_StatusCode (*connect)(UA_Server *server, UA_PubSubConnection *c); - UA_StatusCode (*connectWriterGroup)(UA_Server *server, UA_WriterGroup *wg); - UA_StatusCode (*connectReaderGroup)(UA_Server *server, UA_ReaderGroup *rg); + UA_StatusCode (*connect)(UA_Server *server, + UA_ConnectionManager *cm, + UA_PubSubConnection *c); + UA_StatusCode (*connectWriterGroup)(UA_Server *server, + UA_ConnectionManager *cm, + UA_WriterGroup *wg); + UA_StatusCode (*connectReaderGroup)(UA_Server *server, + UA_ConnectionManager *cm, + UA_ReaderGroup *rg); } ProfileMapping; static ProfileMapping transportProfiles[UA_PUBSUB_PROFILES_SIZE] = { @@ -77,15 +93,15 @@ getCM(UA_EventLoop *el, UA_String protocol) { static void UA_PubSubConnection_removeConnection(UA_PubSubConnection *c, - uintptr_t connectionId) { - if(c->sendChannel == connectionId) { - c->sendChannel = 0; + UA_Connection *connection) { + if(c->sendChannel == connection) { + c->sendChannel = NULL; return; } for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { - if(c->recvChannels[i] != connectionId) + if(c->recvChannels[i] != connection) continue; - c->recvChannels[i] = 0; + c->recvChannels[i] = NULL; c->recvChannelsSize--; return; } @@ -93,26 +109,26 @@ UA_PubSubConnection_removeConnection(UA_PubSubConnection *c, static UA_StatusCode UA_PubSubConnection_addSendConnection(UA_PubSubConnection *c, - uintptr_t connectionId) { - if(c->sendChannel != 0 && c->sendChannel != connectionId) + UA_Connection *connection) { + if(c->sendChannel && c->sendChannel != connection) return UA_STATUSCODE_BADINTERNALERROR; - c->sendChannel = connectionId; + c->sendChannel = connection; return UA_STATUSCODE_GOOD; } static UA_StatusCode UA_PubSubConnection_addRecvConnection(UA_PubSubConnection *c, - uintptr_t connectionId) { + UA_Connection *connection) { for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { - if(c->recvChannels[i] == connectionId) + if(c->recvChannels[i] == connection) return UA_STATUSCODE_GOOD; } if(c->recvChannelsSize >= UA_PUBSUB_MAXCHANNELS) return UA_STATUSCODE_BADINTERNALERROR; for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { - if(c->recvChannels[i] != 0) + if(c->recvChannels[i]) continue; - c->recvChannels[i] = connectionId; + c->recvChannels[i] = connection; c->recvChannelsSize++; break; } @@ -121,27 +137,22 @@ UA_PubSubConnection_addRecvConnection(UA_PubSubConnection *c, void UA_PubSubConnection_disconnect(UA_PubSubConnection *c) { - if(!c->cm) - return; - if(c->sendChannel != 0) - c->cm->closeConnection(c->cm, c->sendChannel); + if(c->sendChannel) + c->sendChannel->cm->closeConnection(c->sendChannel); for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { - if(c->recvChannels[i] != 0) - c->cm->closeConnection(c->cm, c->recvChannels[i]); + if(c->recvChannels[i]) + c->recvChannels[i]->cm->closeConnection(c->recvChannels[i]); } } static void -PubSubChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, const UA_KeyValueMap *params, - UA_ByteString msg, UA_Boolean recv) { - if(!connectionContext) - return; - +PubSubChannelCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg, UA_Boolean recv) { /* Get the context pointers */ - UA_Server *server = (UA_Server*)application; - UA_PubSubConnection *psc = (UA_PubSubConnection*)*connectionContext; + if(!connection->context) + return; + UA_Server *server = (UA_Server*)connection->application; + UA_PubSubConnection *psc = (UA_PubSubConnection*)connection->context; UA_LOCK(&server->serviceMutex); @@ -149,7 +160,7 @@ PubSubChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, * from that connection. Clean up the SecureChannel in the client. */ if(state == UA_CONNECTIONSTATE_CLOSING) { /* Reset the connection identifiers */ - UA_PubSubConnection_removeConnection(psc, connectionId); + UA_PubSubConnection_removeConnection(psc, connection); /* PSC marked for deletion and the last EventLoop connection has closed */ if(psc->deleteFlag && psc->recvChannelsSize == 0 && psc->sendChannel == 0) { @@ -171,13 +182,12 @@ PubSubChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, /* Store the connectionId (if a new connection) */ UA_StatusCode res = (recv) ? - UA_PubSubConnection_addRecvConnection(psc, connectionId) : - UA_PubSubConnection_addSendConnection(psc, connectionId); + UA_PubSubConnection_addRecvConnection(psc, connection) : + UA_PubSubConnection_addSendConnection(psc, connection); if(res != UA_STATUSCODE_GOOD) { UA_LOG_WARNING_CONNECTION(&server->config.logger, psc, "No more space for an additional EventLoop connection"); - if(psc->cm) - psc->cm->closeConnection(psc->cm, connectionId); + connection->cm->closeConnection(connection); UA_UNLOCK(&server->serviceMutex); return; } @@ -256,25 +266,20 @@ PubSubChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, } static void -PubSubRecvChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, const UA_KeyValueMap *params, - UA_ByteString msg) { - PubSubChannelCallback(cm, connectionId, application, connectionContext, - state, params, msg, true); +PubSubRecvChannelCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg) { + PubSubChannelCallback(connection, state, params, msg, true); } static void -PubSubSendChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, const UA_KeyValueMap *params, - UA_ByteString msg) { - PubSubChannelCallback(cm, connectionId, application, connectionContext, - state, params, msg, false); +PubSubSendChannelCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg) { + PubSubChannelCallback(connection, state, params, msg, false); } static UA_StatusCode -UA_PubSubConnection_connectUDP(UA_Server *server, UA_PubSubConnection *c) { +UA_PubSubConnection_connectUDP(UA_Server *server, UA_ConnectionManager *cm, + UA_PubSubConnection *c) { UA_LOCK_ASSERT(&server->serviceMutex, 1); UA_NetworkAddressUrlDataType *addressUrl = (UA_NetworkAddressUrlDataType*) @@ -328,7 +333,7 @@ UA_PubSubConnection_connectUDP(UA_Server *server, UA_PubSubConnection *c) { } UA_UNLOCK(&server->serviceMutex); - res = c->cm->openConnection(c->cm, &kvm, server, c, PubSubRecvChannelCallback); + res = cm->openConnection(cm, kvm, server, c, PubSubRecvChannelCallback); UA_LOCK(&server->serviceMutex); if(res != UA_STATUSCODE_GOOD) { UA_LOG_ERROR_CONNECTION(&server->config.logger, c, @@ -368,7 +373,7 @@ UA_PubSubConnection_connectUDP(UA_Server *server, UA_PubSubConnection *c) { /* Open a send-channel */ listen = false; UA_UNLOCK(&server->serviceMutex); - res = c->cm->openConnection(c->cm, &kvm, server, c, PubSubSendChannelCallback); + res = cm->openConnection(cm, kvm, server, c, PubSubSendChannelCallback); UA_LOCK(&server->serviceMutex); if(res != UA_STATUSCODE_GOOD) { UA_LOG_ERROR_CONNECTION(&server->config.logger, c, @@ -380,7 +385,8 @@ UA_PubSubConnection_connectUDP(UA_Server *server, UA_PubSubConnection *c) { } static UA_StatusCode -UA_PubSubConnection_connectETH(UA_Server *server, UA_PubSubConnection *c) { +UA_PubSubConnection_connectETH(UA_Server *server, UA_ConnectionManager *cm, + UA_PubSubConnection *c) { UA_LOCK_ASSERT(&server->serviceMutex, 1); UA_NetworkAddressUrlDataType *addressUrl = (UA_NetworkAddressUrlDataType*) @@ -424,7 +430,7 @@ UA_PubSubConnection_connectETH(UA_Server *server, UA_PubSubConnection *c) { } UA_UNLOCK(&server->serviceMutex); - res = c->cm->openConnection(c->cm, &kvm, server, c, PubSubRecvChannelCallback); + res = cm->openConnection(cm, kvm, server, c, PubSubRecvChannelCallback); UA_LOCK(&server->serviceMutex); if(res != UA_STATUSCODE_GOOD) { UA_LOG_ERROR_CONNECTION(&server->config.logger, c, @@ -446,7 +452,7 @@ UA_PubSubConnection_connectETH(UA_Server *server, UA_PubSubConnection *c) { listen = false; UA_UNLOCK(&server->serviceMutex); - res = c->cm->openConnection(c->cm, &kvm, server, c, PubSubSendChannelCallback); + res = cm->openConnection(cm, kvm, server, c, PubSubSendChannelCallback); UA_LOCK(&server->serviceMutex); if(res != UA_STATUSCODE_GOOD) { UA_LOG_ERROR_CONNECTION(&server->config.logger, c, @@ -474,7 +480,7 @@ UA_PubSubConnection_connect(UA_Server *server, UA_PubSubConnection *c) { UA_ConnectionManager *cm = NULL; if(profile) cm = getCM(el, profile->protocol); - if(!cm || (c->cm && cm != c->cm)) { + if(!cm) { UA_LOG_ERROR_CONNECTION(&server->config.logger, c, "The requested protocol is not supported"); UA_PubSubConnection_setPubSubState(server, c, UA_PUBSUBSTATE_ERROR, @@ -482,7 +488,6 @@ UA_PubSubConnection_connect(UA_Server *server, UA_PubSubConnection *c) { return UA_STATUSCODE_BADINTERNALERROR; } - c->cm = cm; c->json = profile->json; /* Check the configuration address type */ @@ -496,7 +501,7 @@ UA_PubSubConnection_connect(UA_Server *server, UA_PubSubConnection *c) { /* Connect */ UA_StatusCode res = UA_STATUSCODE_GOOD; if(profile->connect) - res = profile->connect(server, c); + res = profile->connect(server, cm, c); if(res != UA_STATUSCODE_GOOD) UA_PubSubConnection_setPubSubState(server, c, UA_PUBSUBSTATE_ERROR, res); return res; @@ -507,25 +512,22 @@ UA_PubSubConnection_connect(UA_Server *server, UA_PubSubConnection *c) { /***************/ static void -WriterGroupChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, const UA_KeyValueMap *params, - UA_ByteString msg) { - if(!connectionContext) - return; - +WriterGroupChannelCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg) { /* Get the context pointers */ - UA_Server *server = (UA_Server*)application; - UA_WriterGroup *wg = (UA_WriterGroup*)*connectionContext; + if(!connection->context) + return; + UA_Server *server = (UA_Server*)connection->application; + UA_WriterGroup *wg = (UA_WriterGroup*)connection->context; UA_LOCK(&server->serviceMutex); /* The connection is closing in the EventLoop. This is the last callback * from that connection. Clean up the SecureChannel in the client. */ if(state == UA_CONNECTIONSTATE_CLOSING) { - if(wg->sendChannel == connectionId) { + if(wg->sendChannel == connection) { /* Reset the connection channel */ - wg->sendChannel = 0; + wg->sendChannel = NULL; /* PSC marked for deletion and the last EventLoop connection has closed */ if(wg->deleteFlag) { @@ -547,13 +549,13 @@ WriterGroupChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, } /* Store the connectionId (if a new connection) */ - if(wg->sendChannel && wg->sendChannel != connectionId) { + if(wg->sendChannel && wg->sendChannel != connection) { UA_LOG_WARNING_WRITERGROUP(&server->config.logger, wg, "WriterGroup is already bound to a different channel"); UA_UNLOCK(&server->serviceMutex); return; } - wg->sendChannel = connectionId; + wg->sendChannel = connection; /* Connection open, set to operational if not already done */ if(wg->state != UA_PUBSUBSTATE_OPERATIONAL) @@ -565,7 +567,8 @@ WriterGroupChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, } static UA_StatusCode -UA_WriterGroup_connectUDPUnicast(UA_Server *server, UA_WriterGroup *wg) { +UA_WriterGroup_connectUDPUnicast(UA_Server *server, UA_ConnectionManager *cm, + UA_WriterGroup *wg) { UA_LOCK_ASSERT(&server->serviceMutex, 1); /* Already connected? */ @@ -631,9 +634,8 @@ UA_WriterGroup_connectUDPUnicast(UA_Server *server, UA_WriterGroup *wg) { } /* Connect */ - UA_ConnectionManager *cm = wg->linkedConnection->cm; UA_UNLOCK(&server->serviceMutex); - res = cm->openConnection(cm, &kvm, server, wg, WriterGroupChannelCallback); + res = cm->openConnection(cm, kvm, server, wg, WriterGroupChannelCallback); UA_LOCK(&server->serviceMutex); if(res != UA_STATUSCODE_GOOD) { UA_LOG_ERROR_WRITERGROUP(&server->config.logger, wg, @@ -643,7 +645,8 @@ UA_WriterGroup_connectUDPUnicast(UA_Server *server, UA_WriterGroup *wg) { } static UA_StatusCode -UA_WriterGroup_connectMQTT(UA_Server *server, UA_WriterGroup *wg) { +UA_WriterGroup_connectMQTT(UA_Server *server, UA_ConnectionManager *cm, + UA_WriterGroup *wg) { UA_LOCK_ASSERT(&server->serviceMutex, 1); UA_PubSubConnection *c = wg->linkedConnection; @@ -690,7 +693,7 @@ UA_WriterGroup_connectMQTT(UA_Server *server, UA_WriterGroup *wg) { /* Connect */ UA_UNLOCK(&server->serviceMutex); - res = c->cm->openConnection(c->cm, &kvm, server, wg, WriterGroupChannelCallback); + res = cm->openConnection(cm, kvm, server, wg, WriterGroupChannelCallback); UA_LOCK(&server->serviceMutex); if(res != UA_STATUSCODE_GOOD) { UA_LOG_ERROR_WRITERGROUP(&server->config.logger, wg, @@ -703,9 +706,8 @@ void UA_WriterGroup_disconnect(UA_WriterGroup *wg) { if(wg->sendChannel == 0) return; - UA_PubSubConnection *c = wg->linkedConnection; - c->cm->closeConnection(c->cm, c->sendChannel); - wg->sendChannel = 0; + wg->sendChannel->cm->closeConnection(wg->sendChannel); + wg->sendChannel = NULL; } UA_StatusCode @@ -743,7 +745,7 @@ UA_WriterGroup_connect(UA_Server *server, UA_WriterGroup *wg) { UA_ConnectionManager *cm = NULL; if(profile) cm = getCM(el, profile->protocol); - if(!cm || (c->cm && cm != c->cm)) { + if(!cm) { UA_LOG_ERROR_CONNECTION(&server->config.logger, c, "The requested protocol is not supported"); UA_PubSubConnection_setPubSubState(server, c, UA_PUBSUBSTATE_ERROR, @@ -751,13 +753,12 @@ UA_WriterGroup_connect(UA_Server *server, UA_WriterGroup *wg) { return UA_STATUSCODE_BADINTERNALERROR; } - c->cm = cm; c->json = profile->json; /* Connect */ UA_StatusCode res = UA_STATUSCODE_GOOD; if(profile->connectWriterGroup) - res = profile->connectWriterGroup(server, wg); + res = profile->connectWriterGroup(server, cm, wg); if(res != UA_STATUSCODE_GOOD) { UA_WriterGroup_setPubSubState(server, wg, UA_PUBSUBSTATE_ERROR, res); return res; @@ -779,11 +780,11 @@ UA_WriterGroup_connect(UA_Server *server, UA_WriterGroup *wg) { static void UA_ReaderGroup_removeConnection(UA_ReaderGroup *rg, - uintptr_t connectionId) { + UA_Connection *connection) { for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { - if(rg->recvChannels[i] != connectionId) + if(rg->recvChannels[i] != connection) continue; - rg->recvChannels[i] = 0; + rg->recvChannels[i] = NULL; rg->recvChannelsSize--; return; } @@ -791,17 +792,17 @@ UA_ReaderGroup_removeConnection(UA_ReaderGroup *rg, static UA_StatusCode UA_ReaderGroup_addRecvConnection(UA_ReaderGroup*c, - uintptr_t connectionId) { + UA_Connection *connection) { for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { - if(c->recvChannels[i] == connectionId) + if(c->recvChannels[i] == connection) return UA_STATUSCODE_GOOD; } if(c->recvChannelsSize >= UA_PUBSUB_MAXCHANNELS) return UA_STATUSCODE_BADINTERNALERROR; for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { - if(c->recvChannels[i] != 0) + if(c->recvChannels[i]) continue; - c->recvChannels[i] = connectionId; + c->recvChannels[i] = connection; c->recvChannelsSize++; break; } @@ -809,16 +810,14 @@ UA_ReaderGroup_addRecvConnection(UA_ReaderGroup*c, } static void -ReaderGroupChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, const UA_KeyValueMap *params, - UA_ByteString msg) { - if(!connectionContext) +ReaderGroupChannelCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg) { + if(!connection->context) return; /* Get the context pointers */ - UA_Server *server = (UA_Server*)application; - UA_ReaderGroup *rg = (UA_ReaderGroup*)*connectionContext; + UA_Server *server = (UA_Server*)connection->application; + UA_ReaderGroup *rg = (UA_ReaderGroup*)connection->application; UA_LOCK(&server->serviceMutex); @@ -826,7 +825,7 @@ ReaderGroupChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, * from that connection. Clean up the SecureChannel in the client. */ if(state == UA_CONNECTIONSTATE_CLOSING) { /* Reset the connection identifiers */ - UA_ReaderGroup_removeConnection(rg, connectionId); + UA_ReaderGroup_removeConnection(rg, connection); /* PSC marked for deletion and the last EventLoop connection has closed */ if(rg->deleteFlag && rg->recvChannelsSize == 0) { @@ -842,13 +841,11 @@ ReaderGroupChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, } /* Store the connectionId (if a new connection) */ - UA_StatusCode res = UA_ReaderGroup_addRecvConnection(rg, connectionId); + UA_StatusCode res = UA_ReaderGroup_addRecvConnection(rg, connection); if(res != UA_STATUSCODE_GOOD) { UA_LOG_WARNING_READERGROUP(&server->config.logger, rg, "No more space for an additional EventLoop connection"); - UA_PubSubConnection *c = rg->linkedConnection; - if(c && c->cm) - c->cm->closeConnection(c->cm, connectionId); + connection->cm->closeConnection(connection); UA_UNLOCK(&server->serviceMutex); return; } @@ -911,7 +908,8 @@ ReaderGroupChannelCallback(UA_ConnectionManager *cm, uintptr_t connectionId, } static UA_StatusCode -UA_ReaderGroup_connectMQTT(UA_Server *server, UA_ReaderGroup *rg) { +UA_ReaderGroup_connectMQTT(UA_Server *server, UA_ConnectionManager *cm, + UA_ReaderGroup *rg) { UA_LOCK_ASSERT(&server->serviceMutex, 1); UA_PubSubConnection *c = rg->linkedConnection; @@ -958,7 +956,7 @@ UA_ReaderGroup_connectMQTT(UA_Server *server, UA_ReaderGroup *rg) { /* Connect */ UA_UNLOCK(&server->serviceMutex); - res = c->cm->openConnection(c->cm, &kvm, server, rg, ReaderGroupChannelCallback); + res = cm->openConnection(cm, kvm, server, rg, ReaderGroupChannelCallback); UA_LOCK(&server->serviceMutex); if(res != UA_STATUSCODE_GOOD) { UA_LOG_ERROR_READERGROUP(&server->config.logger, rg, @@ -974,7 +972,7 @@ UA_ReaderGroup_disconnect(UA_ReaderGroup *rg) { return; for(size_t i = 0; i < UA_PUBSUB_MAXCHANNELS; i++) { if(rg->recvChannels[i] != 0) - c->cm->closeConnection(c->cm, rg->recvChannels[i]); + rg->recvChannels[i]->cm->closeConnection(rg->recvChannels[i]); } } @@ -1008,7 +1006,7 @@ UA_ReaderGroup_connect(UA_Server *server, UA_ReaderGroup *rg) { UA_ConnectionManager *cm = NULL; if(profile) cm = getCM(el, profile->protocol); - if(!cm || (c->cm && cm != c->cm)) { + if(!cm) { UA_LOG_ERROR_CONNECTION(&server->config.logger, c, "The requested protocol is not supported"); UA_PubSubConnection_setPubSubState(server, c, UA_PUBSUBSTATE_ERROR, @@ -1016,13 +1014,12 @@ UA_ReaderGroup_connect(UA_Server *server, UA_ReaderGroup *rg) { return UA_STATUSCODE_BADINTERNALERROR; } - c->cm = cm; c->json = profile->json; /* Connect */ UA_StatusCode res = UA_STATUSCODE_GOOD; if(profile->connectReaderGroup) - res = profile->connectReaderGroup(server, rg); + res = profile->connectReaderGroup(server, cm, rg); if(res != UA_STATUSCODE_GOOD) { UA_ReaderGroup_setPubSubState(server, rg, UA_PUBSUBSTATE_ERROR, res); return res; diff --git a/src/pubsub/ua_pubsub_writergroup.c b/src/pubsub/ua_pubsub_writergroup.c index 4f2d998ba41..94bdf69564d 100644 --- a/src/pubsub/ua_pubsub_writergroup.c +++ b/src/pubsub/ua_pubsub_writergroup.c @@ -1007,18 +1007,18 @@ encodeNetworkMessage(UA_WriterGroup *wg, UA_NetworkMessage *nm, static void sendNetworkMessageBuffer(UA_Server *server, UA_WriterGroup *wg, - UA_PubSubConnection *connection, uintptr_t connectionId, + UA_PubSubConnection *psc, + UA_Connection *connection, UA_ByteString *buffer) { UA_StatusCode res = connection->cm-> - sendWithConnection(connection->cm, connectionId, - &UA_KEYVALUEMAP_NULL, buffer); + sendWithConnection(connection, UA_KEYVALUEMAP_NULL, buffer); /* Failure, set the WriterGroup into an error mode */ if(res != UA_STATUSCODE_GOOD) { UA_LOG_ERROR_WRITERGROUP(&server->config.logger, wg, "Sending NetworkMessage failed"); UA_WriterGroup_setPubSubState(server, wg, UA_PUBSUBSTATE_ERROR, res); - UA_PubSubConnection_setPubSubState(server, connection, UA_PUBSUBSTATE_ERROR, res); + UA_PubSubConnection_setPubSubState(server, psc, UA_PUBSUBSTATE_ERROR, res); return; } @@ -1028,7 +1028,7 @@ sendNetworkMessageBuffer(UA_Server *server, UA_WriterGroup *wg, #ifdef UA_ENABLE_JSON_ENCODING static UA_StatusCode -sendNetworkMessageJson(UA_Server *server, UA_PubSubConnection *connection, UA_WriterGroup *wg, +sendNetworkMessageJson(UA_Server *server, UA_PubSubConnection *psc, UA_WriterGroup *wg, UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount) { /* Prepare the NetworkMessage */ UA_NetworkMessage nm; @@ -1040,21 +1040,17 @@ sendNetworkMessageJson(UA_Server *server, UA_PubSubConnection *connection, UA_Wr nm.payloadHeader.dataSetPayloadHeader.dataSetWriterIds = writerIds; nm.payload.dataSetPayload.dataSetMessages = dsm; nm.publisherIdEnabled = true; - nm.publisherIdType = connection->config.publisherIdType; - nm.publisherId = connection->config.publisherId; + nm.publisherIdType = psc->config.publisherIdType; + nm.publisherId = psc->config.publisherId; /* Compute the message length */ size_t msgSize = UA_NetworkMessage_calcSizeJson(&nm, NULL, 0, NULL, 0, true); - UA_ConnectionManager *cm = connection->cm; - if(!cm) - return UA_STATUSCODE_BADINTERNALERROR; - /* Select the wg sendchannel if configured */ - uintptr_t sendChannel = connection->sendChannel; - if(wg->sendChannel != 0) + UA_Connection *sendChannel = psc->sendChannel; + if(wg->sendChannel) sendChannel = wg->sendChannel; - if(sendChannel == 0) { + if(!sendChannel) { UA_LOG_ERROR_WRITERGROUP(&server->config.logger, wg, "Cannot send, no open connection"); return UA_STATUSCODE_BADINTERNALERROR; @@ -1062,7 +1058,7 @@ sendNetworkMessageJson(UA_Server *server, UA_PubSubConnection *connection, UA_Wr /* Allocate the buffer */ UA_ByteString buf; - UA_StatusCode res = cm->allocNetworkBuffer(cm, sendChannel, &buf, msgSize); + UA_StatusCode res = sendChannel->cm->allocNetworkBuffer(sendChannel, &buf, msgSize); UA_CHECK_STATUS(res, return res); /* Encode the message */ @@ -1070,13 +1066,13 @@ sendNetworkMessageJson(UA_Server *server, UA_PubSubConnection *connection, UA_Wr const UA_Byte *bufEnd = &buf.data[msgSize]; res = UA_NetworkMessage_encodeJson(&nm, &bufPos, &bufEnd, NULL, 0, NULL, 0, true); if(res != UA_STATUSCODE_GOOD) { - cm->freeNetworkBuffer(cm, sendChannel, &buf); + sendChannel->cm->freeNetworkBuffer(sendChannel, &buf); return res; } UA_assert(bufPos == bufEnd); /* Send the prepared messages */ - sendNetworkMessageBuffer(server, wg, connection, sendChannel, &buf); + sendNetworkMessageBuffer(server, wg, psc, sendChannel, &buf); return UA_STATUSCODE_GOOD; } #endif @@ -1182,14 +1178,14 @@ generateNetworkMessage(UA_PubSubConnection *connection, UA_WriterGroup *wg, } static UA_StatusCode -sendNetworkMessageBinary(UA_Server *server, UA_PubSubConnection *connection, UA_WriterGroup *wg, +sendNetworkMessageBinary(UA_Server *server, UA_PubSubConnection *psc, UA_WriterGroup *wg, UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount) { UA_NetworkMessage nm; memset(&nm, 0, sizeof(UA_NetworkMessage)); /* Fill the message structure */ UA_StatusCode rv = - generateNetworkMessage(connection, wg, dsm, writerIds, dsmCount, + generateNetworkMessage(psc, wg, dsm, writerIds, dsmCount, &wg->config.messageSettings, &wg->config.transportSettings, &nm); UA_CHECK_STATUS(rv, return rv); @@ -1205,15 +1201,11 @@ sendNetworkMessageBinary(UA_Server *server, UA_PubSubConnection *connection, UA_ } #endif - UA_ConnectionManager *cm = connection->cm; - if(!cm) - return UA_STATUSCODE_BADINTERNALERROR; - /* Select the wg sendchannel if configured */ - uintptr_t sendChannel = connection->sendChannel; - if(wg->sendChannel != 0) + UA_Connection *sendChannel = psc->sendChannel; + if(wg->sendChannel) sendChannel = wg->sendChannel; - if(sendChannel == 0) { + if(!sendChannel) { UA_LOG_ERROR_WRITERGROUP(&server->config.logger, wg, "Cannot send, no open connection"); return UA_STATUSCODE_BADINTERNALERROR; @@ -1221,26 +1213,26 @@ sendNetworkMessageBinary(UA_Server *server, UA_PubSubConnection *connection, UA_ /* Allocate the buffer. Allocate on the stack if the buffer is small. */ UA_ByteString buf = UA_BYTESTRING_NULL; - rv = cm->allocNetworkBuffer(cm, sendChannel, &buf, msgSize); + rv = sendChannel->cm->allocNetworkBuffer(sendChannel, &buf, msgSize); UA_CHECK_STATUS(rv, return rv); /* Encode and encrypt the message */ rv = encodeNetworkMessage(wg, &nm, &buf); if(rv != UA_STATUSCODE_GOOD) { - cm->freeNetworkBuffer(cm, sendChannel, &buf); + sendChannel->cm->freeNetworkBuffer(sendChannel, &buf); UA_free(nm.payload.dataSetPayload.sizes); return rv; } /* Send out the message */ - sendNetworkMessageBuffer(server, wg, connection, sendChannel, &buf); + sendNetworkMessageBuffer(server, wg, psc, sendChannel, &buf); UA_free(nm.payload.dataSetPayload.sizes); return UA_STATUSCODE_GOOD; } static void -publishRT(UA_Server *server, UA_WriterGroup *writerGroup, UA_PubSubConnection *connection) { +publishRT(UA_Server *server, UA_WriterGroup *writerGroup, UA_PubSubConnection *psc) { UA_LOCK_ASSERT(&server->serviceMutex, 1); UA_StatusCode res = @@ -1281,15 +1273,11 @@ publishRT(UA_Server *server, UA_WriterGroup *writerGroup, UA_PubSubConnection *c } #endif - UA_ConnectionManager *cm = connection->cm; - if(!cm) - return; - /* Select the wg sendchannel if configured */ - uintptr_t sendChannel = connection->sendChannel; - if(writerGroup->sendChannel != 0) + UA_Connection *sendChannel = psc->sendChannel; + if(writerGroup->sendChannel) sendChannel = writerGroup->sendChannel; - if(sendChannel == 0) { + if(!sendChannel) { UA_LOG_ERROR_WRITERGROUP(&server->config.logger, writerGroup, "Cannot send, no open connection"); return; @@ -1297,27 +1285,27 @@ publishRT(UA_Server *server, UA_WriterGroup *writerGroup, UA_PubSubConnection *c /* Copy into the network buffer */ UA_ByteString outBuf; - res = cm->allocNetworkBuffer(cm, sendChannel, &outBuf, buf->length); + res = sendChannel->cm->allocNetworkBuffer(sendChannel, &outBuf, buf->length); if(res != UA_STATUSCODE_GOOD) { UA_LOG_ERROR_WRITERGROUP(&server->config.logger, writerGroup, "PubSub message memory allocation failed"); return; } memcpy(outBuf.data, buf->data, buf->length); - sendNetworkMessageBuffer(server, writerGroup, connection, sendChannel, &outBuf); + sendNetworkMessageBuffer(server, writerGroup, psc, sendChannel, &outBuf); } static void -sendNetworkMessage(UA_Server *server, UA_WriterGroup *wg, UA_PubSubConnection *connection, +sendNetworkMessage(UA_Server *server, UA_WriterGroup *wg, UA_PubSubConnection *psc, UA_DataSetMessage *dsm, UA_UInt16 *writerIds, UA_Byte dsmCount) { UA_StatusCode res = UA_STATUSCODE_GOOD; switch(wg->config.encodingMimeType) { case UA_PUBSUB_ENCODING_UADP: - res = sendNetworkMessageBinary(server, connection, wg, dsm, writerIds, dsmCount); + res = sendNetworkMessageBinary(server, psc, wg, dsm, writerIds, dsmCount); break; #ifdef UA_ENABLE_JSON_ENCODING case UA_PUBSUB_ENCODING_JSON: - res = sendNetworkMessageJson(server, connection, wg, dsm, writerIds, dsmCount); + res = sendNetworkMessageJson(server, psc, wg, dsm, writerIds, dsmCount); break; #endif default: diff --git a/src/server/ua_server_binary.c b/src/server/ua_server_binary.c index 9da611fdf81..639a2e79b09 100644 --- a/src/server/ua_server_binary.c +++ b/src/server/ua_server_binary.c @@ -52,8 +52,7 @@ typedef struct channel_entry { typedef struct { UA_ConnectionState state; - uintptr_t connectionId; - UA_ConnectionManager *connectionManager; + UA_Connection *connection; } UA_ServerConnection; /* Reverse connect */ @@ -62,7 +61,8 @@ typedef struct reverse_connect_context { UA_UInt16 port; UA_UInt64 handle; - UA_SecureChannelState state; + UA_ConnectionState state; + UA_Connection *connection; UA_Server_ReverseConnectStateCallback stateCallback; void *callbackContext; @@ -71,7 +71,6 @@ typedef struct reverse_connect_context { * closes. */ UA_Boolean destruction; - UA_ServerConnection currentConnection; UA_SecureChannel *channel; LIST_ENTRY(reverse_connect_context) next; } reverse_connect_context; @@ -466,10 +465,11 @@ getServicePointers(UA_UInt32 requestTypeId, const UA_DataType **requestType, /* HEL -> Open up the connection */ static UA_StatusCode processHEL(UA_Server *server, UA_SecureChannel *channel, const UA_ByteString *msg) { - UA_ConnectionManager *cm = channel->connectionManager; - if(!cm || (channel->state != UA_SECURECHANNELSTATE_CONNECTED && - channel->state != UA_SECURECHANNELSTATE_RHE_SENT)) + if(!UA_SecureChannel_isConnected(channel) || + (channel->state != UA_SECURECHANNELSTATE_CONNECTED && + channel->state != UA_SECURECHANNELSTATE_RHE_SENT)) return UA_STATUSCODE_BADINTERNALERROR; + UA_ConnectionManager *cm = channel->connection->cm; size_t offset = 0; /* Go to the beginning of the TcpHelloMessage */ UA_TcpHelloMessage helloMessage; @@ -495,7 +495,7 @@ processHEL(UA_Server *server, UA_SecureChannel *channel, const UA_ByteString *ms /* Get the send buffer from the network layer */ UA_ByteString ack_msg; UA_ByteString_init(&ack_msg); - retval = cm->allocNetworkBuffer(cm, channel->connectionId, + retval = cm->allocNetworkBuffer(channel->connection, &ack_msg, channel->config.sendBufferSize); if(retval != UA_STATUSCODE_GOOD) return retval; @@ -522,12 +522,12 @@ processHEL(UA_Server *server, UA_SecureChannel *channel, const UA_ByteString *ms &UA_TRANSPORT[UA_TRANSPORT_TCPACKNOWLEDGEMESSAGE], &bufPos, &bufEnd, NULL, NULL); if(retval != UA_STATUSCODE_GOOD) { - cm->freeNetworkBuffer(cm, channel->connectionId, &ack_msg); + cm->freeNetworkBuffer(channel->connection, &ack_msg); return retval; } ack_msg.length = ackHeader.messageSize; - retval = cm->sendWithConnection(cm, channel->connectionId, &UA_KEYVALUEMAP_NULL, &ack_msg); + retval = cm->sendWithConnection(channel->connection, UA_KEYVALUEMAP_NULL, &ack_msg); if(retval == UA_STATUSCODE_GOOD) channel->state = UA_SECURECHANNELSTATE_ACK_SENT; return retval; @@ -1086,8 +1086,8 @@ configServerSecureChannel(void *application, UA_SecureChannel *channel, } static UA_StatusCode -createServerSecureChannel(UA_BinaryProtocolManager *bpm, UA_ConnectionManager *cm, - uintptr_t connectionId, UA_SecureChannel **outChannel) { +createServerSecureChannel(UA_BinaryProtocolManager *bpm, UA_Connection *connection, + UA_SecureChannel **outChannel) { UA_Server *server = bpm->server; UA_ServerConfig *config = &server->config; @@ -1123,8 +1123,7 @@ createServerSecureChannel(UA_BinaryProtocolManager *bpm, UA_ConnectionManager *c entry->channel.config = connConfig; entry->channel.certificateVerification = &config->secureChannelPKI; entry->channel.processOPNHeader = configServerSecureChannel; - entry->channel.connectionManager = cm; - entry->channel.connectionId = connectionId; + entry->channel.connection = connection; /* Set the SecureChannel identifier already here. So we get the right * identifier for logging right away. The rest of the SecurityToken is set @@ -1154,19 +1153,18 @@ createServerSecureChannel(UA_BinaryProtocolManager *bpm, UA_ConnectionManager *c /* Callback of a TCP socket (server socket or an active connection) */ void -serverNetworkCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, - const UA_KeyValueMap *params, - UA_ByteString msg) { - UA_BinaryProtocolManager *bpm = (UA_BinaryProtocolManager*)application; +serverNetworkCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg) { + UA_BinaryProtocolManager *bpm = (UA_BinaryProtocolManager*) + connection->application; + UA_ConnectionManager *cm = connection->cm; /* A server socket that is not yet registered in the server. Register it and * set the connection context to the pointer in the * bpm->serverConnections list. New connections on that server socket * inherit the context (and on the first callback we set the context of * client-connections to a SecureChannel). */ - if(*connectionContext == NULL) { + if(!connection->context) { /* The socket is closing without being previously registered -> ignore */ if(state == UA_CONNECTIONSTATE_CLOSED || state == UA_CONNECTIONSTATE_CLOSING) @@ -1176,24 +1174,24 @@ serverNetworkCallback(UA_ConnectionManager *cm, uintptr_t connectionId, if(bpm->serverConnectionsSize >= UA_MAXSERVERCONNECTIONS) { UA_LOG_WARNING(bpm->logging, UA_LOGCATEGORY_SERVER, "Cannot register server socket - too many already open"); - cm->closeConnection(cm, connectionId); + cm->closeConnection(connection); return; } /* Find and use a free connection slot */ bpm->serverConnectionsSize++; UA_ServerConnection *sc = bpm->serverConnections; - while(sc->connectionId != 0) + while(sc->connection != NULL) sc++; + sc->connection = connection; sc->state = state; - sc->connectionId = connectionId; - sc->connectionManager = cm; - *connectionContext = (void*)sc; /* Set the context pointer in the connection */ + connection->context = (void*)sc; /* Set the context pointer in the connection */ return; } - UA_ServerConnection *sc = (UA_ServerConnection*)*connectionContext; - UA_SecureChannel *channel = (UA_SecureChannel*)*connectionContext; + /* Either or of the following */ + UA_ServerConnection *sc = (UA_ServerConnection*)connection->context; + UA_SecureChannel *channel = (UA_SecureChannel*)connection->context; UA_Boolean serverSocket = (sc >= bpm->serverConnections && sc < &bpm->serverConnections[UA_MAXSERVERCONNECTIONS]); @@ -1202,7 +1200,7 @@ serverNetworkCallback(UA_ConnectionManager *cm, uintptr_t connectionId, if(serverSocket) { /* Server socket is closed */ sc->state = UA_CONNECTIONSTATE_CLOSED; - sc->connectionId = 0; + sc->connection = NULL; bpm->serverConnectionsSize--; } else { /* A connection attached to a SecureChannel is closing. This is the @@ -1226,20 +1224,20 @@ serverNetworkCallback(UA_ConnectionManager *cm, uintptr_t connectionId, if(serverSocket) { /* A new connection is opening. This is the only place where * createSecureChannel is used. */ - retval = createServerSecureChannel(bpm, cm, connectionId, &channel); + retval = createServerSecureChannel(bpm, connection, &channel); if(retval != UA_STATUSCODE_GOOD) { UA_LOG_WARNING(bpm->logging, UA_LOGCATEGORY_SERVER, - "TCP %lu\t| Could not accept the connection with status %s", - (unsigned long)sc->connectionId, UA_StatusCode_name(retval)); - *connectionContext = NULL; - cm->closeConnection(cm, connectionId); + "TCP %u\t| Could not accept the connection with status %s", + connection->identifier, UA_StatusCode_name(retval)); + connection->context = NULL; + cm->closeConnection(connection); return; } UA_LOG_INFO_CHANNEL(bpm->logging, channel, "SecureChannel created"); /* Set the new channel as the new context for the connection */ - *connectionContext = (void*)channel; + connection->context = (void*)channel; return; } @@ -1319,7 +1317,7 @@ createServerConnection(UA_BinaryProtocolManager *bpm, const UA_String *serverUrl paramsMap.mapSize = paramsSize; /* Open the server connection */ - res = cm->openConnection(cm, ¶msMap, bpm, NULL, serverNetworkCallback); + res = cm->openConnection(cm, paramsMap, bpm, NULL, serverNetworkCallback); if(res == UA_STATUSCODE_GOOD) return res; } @@ -1377,14 +1375,16 @@ secureChannelHouseKeeping(UA_Server *server, void *context) { #define UA_MINMESSAGESIZE 8192 static UA_StatusCode -sendRHEMessage(UA_Server *server, uintptr_t connectionId, - UA_ConnectionManager *cm) { +sendRHEMessage(UA_Server *server, UA_SecureChannel *channel) { UA_ServerConfig *config = UA_Server_getConfig(server); + if(!UA_SecureChannel_isConnected(channel)) + return UA_STATUSCODE_BADCONNECTIONCLOSED; + UA_ConnectionManager *cm = channel->connection->cm; /* Get a buffer */ UA_ByteString message; UA_StatusCode retval = - cm->allocNetworkBuffer(cm, connectionId, &message, UA_MINMESSAGESIZE); + cm->allocNetworkBuffer(channel->connection, &message, UA_MINMESSAGESIZE); if(retval != UA_STATUSCODE_GOOD) return retval; @@ -1403,7 +1403,7 @@ sendRHEMessage(UA_Server *server, uintptr_t connectionId, &bufPos, &bufEnd, NULL, NULL); if(result != UA_STATUSCODE_GOOD) { - cm->freeNetworkBuffer(cm, connectionId, &message); + cm->freeNetworkBuffer(channel->connection, &message); return result; } @@ -1416,13 +1416,13 @@ sendRHEMessage(UA_Server *server, uintptr_t connectionId, &UA_TRANSPORT[UA_TRANSPORT_TCPMESSAGEHEADER], &bufPos, &bufEnd, NULL, NULL); if(retval != UA_STATUSCODE_GOOD) { - cm->freeNetworkBuffer(cm, connectionId, &message); + cm->freeNetworkBuffer(channel->connection, &message); return retval; } /* Send the RHE message */ message.length = messageHeader.messageSize; - return cm->sendWithConnection(cm, connectionId, NULL, &message); + return cm->sendWithConnection(channel->connection, UA_KEYVALUEMAP_NULL, &message); } static void @@ -1433,7 +1433,7 @@ retryReverseConnectCallback(UA_Server *server, void *context) { reverse_connect_context *rc = NULL; LIST_FOREACH(rc, &bpm->reverseConnects, next) { - if(rc->currentConnection.connectionId) + if(rc->connection) continue; UA_LOG_INFO(&server->config.logger, UA_LOGCATEGORY_SERVER, "Attempt to reverse reconnect to %.*s:%d", @@ -1464,21 +1464,15 @@ setReverseConnectRetryCallback(UA_BinaryProtocolManager *bpm, UA_Boolean enabled void setReverseConnectState(UA_Server *server, reverse_connect_context *context, UA_SecureChannelState newState) { - if(context->state == newState) + if(!context->channel || context->channel->state == newState || !context->stateCallback) return; - - context->state = newState; - - if(context->stateCallback) - context->stateCallback(server, context->handle, context->state, - context->callbackContext); + context->stateCallback(server, context->handle, context->channel->state, + context->callbackContext); } static void -serverReverseConnectCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, const UA_KeyValueMap *params, - UA_ByteString msg); +serverReverseConnectCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg); UA_StatusCode attemptReverseConnect(UA_BinaryProtocolManager *bpm, reverse_connect_context *context) { @@ -1513,7 +1507,7 @@ attemptReverseConnect(UA_BinaryProtocolManager *bpm, reverse_connect_context *co UA_KeyValueMap kvm = {2, params}; /* Open the connection */ - UA_StatusCode res = cm->openConnection(cm, &kvm, bpm, context, + UA_StatusCode res = cm->openConnection(cm, kvm, bpm, context, serverReverseConnectCallback); if(res != UA_STATUSCODE_GOOD) { UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, @@ -1607,10 +1601,10 @@ UA_Server_removeReverseConnect(UA_Server *server, UA_UInt64 handle) { LIST_REMOVE(rev, next); /* Connected -> disconnect, otherwise free immediately */ - if(rev->currentConnection.connectionId) { - UA_ConnectionManager *cm = rev->currentConnection.connectionManager; + if(rev->connection) { + UA_ConnectionManager *cm = rev->connection->cm; rev->destruction = true; - cm->closeConnection(cm, rev->currentConnection.connectionId); + cm->closeConnection(rev->connection); } else { setReverseConnectState(server, rev, UA_SECURECHANNELSTATE_CLOSED); UA_String_clear(&rev->hostname); @@ -1629,23 +1623,21 @@ UA_Server_removeReverseConnect(UA_Server *server, UA_UInt64 handle) { } void -serverReverseConnectCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, const UA_KeyValueMap *params, - UA_ByteString msg) { +serverReverseConnectCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg) { (void)params; - UA_BinaryProtocolManager *bpm = (UA_BinaryProtocolManager*)application; + UA_BinaryProtocolManager *bpm = (UA_BinaryProtocolManager*) + connection->application; UA_LOG_DEBUG(bpm->logging, UA_LOGCATEGORY_SERVER, - "Activity for reverse connect %lu with state %d", - (long unsigned)connectionId, state); - - reverse_connect_context *context = (reverse_connect_context *)*connectionContext; - context->currentConnection.state = state; + "Activity for reverse connect %u with state %d", + connection->identifier, state); + reverse_connect_context *context = (reverse_connect_context *) + connection->context; + context->state = state; /* New connection */ - if(context->currentConnection.connectionId == 0) { - context->currentConnection.connectionId = connectionId; - context->currentConnection.connectionManager = cm; + if(!context->connection) { + context->connection = connection; setReverseConnectState(bpm->server, context, UA_SECURECHANNELSTATE_CONNECTING); /* Fall through -- e.g. if state == ESTABLISHED already */ } @@ -1669,14 +1661,13 @@ serverReverseConnectCallback(UA_ConnectionManager *cm, uintptr_t connectionId, bpm->serverConnectionsSize == 0 && LIST_EMPTY(&bpm->reverseConnects) && TAILQ_EMPTY(&bpm->channels)) { - setBinaryProtocolManagerState(bpm->server, bpm, - UA_LIFECYCLESTATE_STOPPED); + setBinaryProtocolManagerState(bpm->server, bpm, UA_LIFECYCLESTATE_STOPPED); } return; } /* Reset. Will be picked up in the regular retry callback. */ - context->currentConnection.connectionId = 0; + context->connection = NULL; setReverseConnectState(bpm->server, context, UA_SECURECHANNELSTATE_CONNECTING); return; } @@ -1686,28 +1677,28 @@ serverReverseConnectCallback(UA_ConnectionManager *cm, uintptr_t connectionId, /* A new connection is opening. This is the only place where * createSecureChannel is used. */ + UA_ConnectionManager *cm = connection->cm; UA_StatusCode retval = UA_STATUSCODE_GOOD; if(!context->channel) { - retval = createServerSecureChannel(bpm, cm, connectionId, &context->channel); + retval = createServerSecureChannel(bpm, connection, &context->channel); if(retval != UA_STATUSCODE_GOOD) { UA_LOG_WARNING(bpm->logging, UA_LOGCATEGORY_SERVER, - "TCP %lu\t| Could not accept the reverse " + "TCP %u\t| Could not accept the reverse " "connection with status %s", - (unsigned long)context->currentConnection.connectionId, - UA_StatusCode_name(retval)); - cm->closeConnection(cm, connectionId); + connection->identifier, UA_StatusCode_name(retval)); + cm->closeConnection(connection); return; } /* Send the RHE message */ - retval = sendRHEMessage(bpm->server, connectionId, cm); + retval = sendRHEMessage(bpm->server, context->channel); if(retval != UA_STATUSCODE_GOOD) { UA_LOG_WARNING(bpm->logging, UA_LOGCATEGORY_SERVER, - "TCP %lu\t| Could not send the RHE message " + "TCP %u\t| Could not send the RHE message " "with status %s", - (unsigned long)context->currentConnection.connectionId, + context->connection->identifier, UA_StatusCode_name(retval)); - cm->closeConnection(cm, connectionId); + cm->closeConnection(connection); return; } @@ -1832,10 +1823,12 @@ UA_BinaryProtocolManager_stop(UA_Server *server, /* Close or free all reverse connections */ reverse_connect_context *rev, *rev_tmp; LIST_FOREACH_SAFE(rev, &bpm->reverseConnects, next, rev_tmp) { - if(rev->currentConnection.connectionId) { - UA_ConnectionManager *cm = rev->currentConnection.connectionManager; + if(rev->connection) { + if(!UA_SecureChannel_isConnected(rev->channel)) + continue; + UA_Connection *c = rev->channel->connection; rev->destruction = true; - cm->closeConnection(cm, rev->currentConnection.connectionId); + c->cm->closeConnection(c); } else { LIST_REMOVE(rev, next); setReverseConnectState(server, rev, UA_SECURECHANNELSTATE_CLOSED); @@ -1853,9 +1846,10 @@ UA_BinaryProtocolManager_stop(UA_Server *server, /* Stop all server sockets */ for(size_t i = 0; i < UA_MAXSERVERCONNECTIONS; i++) { UA_ServerConnection *sc = &bpm->serverConnections[i]; - UA_ConnectionManager *cm = sc->connectionManager; - if(sc->connectionId > 0) - cm->closeConnection(cm, sc->connectionId); + if(sc->connection) { + UA_ConnectionManager *cm = sc->connection->cm; + cm->closeConnection(sc->connection); + } } /* If open sockets remain, set to STOPPING */ diff --git a/src/server/ua_server_internal.h b/src/server/ua_server_internal.h index 349afb45308..8724f534b86 100644 --- a/src/server/ua_server_internal.h +++ b/src/server/ua_server_internal.h @@ -209,11 +209,8 @@ ZIP_FUNCTIONS(UA_ReferenceNameTree, UA_ReferenceTargetTreeElem, nameTreeEntry, /**************************/ void -serverNetworkCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState state, - const UA_KeyValueMap *params, - UA_ByteString msg); +serverNetworkCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg); UA_StatusCode sendServiceFault(UA_SecureChannel *channel, UA_UInt32 requestId, diff --git a/src/server/ua_session.h b/src/server/ua_session.h index 22551a1dd37..3c7a9cc0a37 100644 --- a/src/server/ua_session.h +++ b/src/server/ua_session.h @@ -136,8 +136,9 @@ UA_Session_dequeuePublishReq(UA_Session *session); int nameLen = (SESSION) ? (int)(SESSION)->sessionName.length : 0; \ const char *nameStr = (SESSION) ? \ (const char*)(SESSION)->sessionName.data : ""; \ - unsigned long sockId = ((SESSION) && (SESSION)->header.channel) ? \ - (unsigned long)(SESSION)->header.channel->connectionId : 0; \ + unsigned long sockId = ((SESSION) && (SESSION)->header.channel && \ + UA_SecureChannel_isConnected((SESSION)->header.channel)) ? \ + (unsigned long)(SESSION)->header.channel->connection->identifier : 0; \ UA_UInt32 chanId = ((SESSION) && (SESSION)->header.channel) ? \ (SESSION)->header.channel->securityToken.channelId : 0; \ UA_LOG_##LEVEL(LOGGER, UA_LOGCATEGORY_SESSION, \ diff --git a/src/ua_securechannel.c b/src/ua_securechannel.c index 311bb6056c2..a9631fed813 100644 --- a/src/ua_securechannel.c +++ b/src/ua_securechannel.c @@ -83,7 +83,9 @@ hideErrors(UA_TcpErrorMessage *const error) { } UA_Boolean -UA_SecureChannel_isConnected(UA_SecureChannel *channel) { +UA_SecureChannel_isConnected(const UA_SecureChannel *channel) { + if(!channel->connection) + return false; return (channel->state > UA_SECURECHANNELSTATE_CLOSED && channel->state < UA_SECURECHANNELSTATE_CLOSING); } @@ -101,9 +103,9 @@ UA_SecureChannel_sendError(UA_SecureChannel *channel, UA_TcpErrorMessage *error) header.messageSize = 8 + (4 + 4 + (UA_UInt32)error->reason.length); /* Get the send buffer from the network layer */ - UA_ConnectionManager *cm = channel->connectionManager; + UA_ConnectionManager *cm = channel->connection->cm; UA_ByteString msg = UA_BYTESTRING_NULL; - UA_StatusCode retval = cm->allocNetworkBuffer(cm, channel->connectionId, + UA_StatusCode retval = cm->allocNetworkBuffer(channel->connection, &msg, header.messageSize); if(retval != UA_STATUSCODE_GOOD) return; @@ -119,7 +121,7 @@ UA_SecureChannel_sendError(UA_SecureChannel *channel, UA_TcpErrorMessage *error) &bufPos, &bufEnd, NULL, NULL); (void)retval; /* Encoding of these cannot fail */ msg.length = header.messageSize; - cm->sendWithConnection(cm, channel->connectionId, &UA_KEYVALUEMAP_NULL, &msg); + cm->sendWithConnection(channel->connection, UA_KEYVALUEMAP_NULL, &msg); } static void @@ -156,8 +158,8 @@ UA_SecureChannel_shutdown(UA_SecureChannel *channel, channel->shutdownReason= shutdownReason; /* Trigger the async closing of the connection */ - UA_ConnectionManager *cm = channel->connectionManager; - cm->closeConnection(cm, channel->connectionId); + UA_ConnectionManager *cm = channel->connection->cm; + cm->closeConnection(channel->connection); channel->state = UA_SECURECHANNELSTATE_CLOSING; } @@ -191,8 +193,7 @@ UA_SecureChannel_clear(UA_SecureChannel *channel) { UA_SecureChannel_deleteBuffered(channel); /* The EventLoop connection is no longer valid */ - channel->connectionId = 0; - channel->connectionManager = NULL; + channel->connection = NULL; /* Set the state to closed */ channel->state = UA_SECURECHANNELSTATE_CLOSED; @@ -236,16 +237,16 @@ UA_SecureChannel_sendAsymmetricOPNMessage(UA_SecureChannel *channel, return UA_STATUSCODE_BADSECURITYMODEREJECTED); /* Can we use the connection manager? */ - UA_ConnectionManager *cm = channel->connectionManager; if(!UA_SecureChannel_isConnected(channel)) return UA_STATUSCODE_BADCONNECTIONCLOSED; + UA_ConnectionManager *cm = channel->connection->cm; const UA_SecurityPolicy *sp = channel->securityPolicy; UA_CHECK_MEM(sp, return UA_STATUSCODE_BADINTERNALERROR); /* Allocate the message buffer */ UA_ByteString buf = UA_BYTESTRING_NULL; - UA_StatusCode res = cm->allocNetworkBuffer(cm, channel->connectionId, &buf, + UA_StatusCode res = cm->allocNetworkBuffer(channel->connection, &buf, channel->config.sendBufferSize); UA_CHECK_STATUS(res, return res); @@ -293,10 +294,10 @@ UA_SecureChannel_sendAsymmetricOPNMessage(UA_SecureChannel *channel, /* Send the message, the buffer is freed in the network layer */ buf.length = encryptedLength; - return cm->sendWithConnection(cm, channel->connectionId, &UA_KEYVALUEMAP_NULL, &buf); + return cm->sendWithConnection(channel->connection, UA_KEYVALUEMAP_NULL, &buf); error: - cm->freeNetworkBuffer(cm, channel->connectionId, &buf); + cm->freeNetworkBuffer(channel->connection, &buf); return res; } @@ -354,9 +355,9 @@ static UA_StatusCode sendSymmetricChunk(UA_MessageContext *mc) { UA_SecureChannel *channel = mc->channel; const UA_SecurityPolicy *sp = channel->securityPolicy; - UA_ConnectionManager *cm = channel->connectionManager; if(!UA_SecureChannel_isConnected(channel)) return UA_STATUSCODE_BADCONNECTIONCLOSED; + UA_ConnectionManager *cm = channel->connection->cm; /* The size of the message payload */ size_t bodyLength = (uintptr_t)mc->buf_pos - @@ -414,14 +415,14 @@ sendSymmetricChunk(UA_MessageContext *mc) { /* Send the chunk. The buffer is freed in the network layer. If sending goes * wrong, the connection is removed in the next iteration of the * SecureChannel. Set the SecureChannel to closing already. */ - res = cm->sendWithConnection(cm, channel->connectionId, - &UA_KEYVALUEMAP_NULL, &mc->messageBuffer); + res = cm->sendWithConnection(channel->connection, UA_KEYVALUEMAP_NULL, + &mc->messageBuffer); if(res != UA_STATUSCODE_GOOD && UA_SecureChannel_isConnected(channel)) channel->state = UA_SECURECHANNELSTATE_CLOSING; error: /* Free the unused message buffer */ - cm->freeNetworkBuffer(cm, channel->connectionId, &mc->messageBuffer); + cm->freeNetworkBuffer(channel->connection, &mc->messageBuffer); return res; } @@ -439,11 +440,11 @@ sendSymmetricEncodingCallback(void *data, UA_Byte **buf_pos, UA_CHECK_STATUS(res, return res); /* Set a new buffer for the next chunk */ - UA_ConnectionManager *cm = mc->channel->connectionManager; if(!UA_SecureChannel_isConnected(mc->channel)) return UA_STATUSCODE_BADCONNECTIONCLOSED; + UA_ConnectionManager *cm = mc->channel->connection->cm; - res = cm->allocNetworkBuffer(cm, mc->channel->connectionId, + res = cm->allocNetworkBuffer(mc->channel->connection, &mc->messageBuffer, mc->channel->config.sendBufferSize); UA_CHECK_STATUS(res, return res); @@ -461,9 +462,9 @@ UA_MessageContext_begin(UA_MessageContext *mc, UA_SecureChannel *channel, UA_CHECK(messageType == UA_MESSAGETYPE_MSG || messageType == UA_MESSAGETYPE_CLO, return UA_STATUSCODE_BADINTERNALERROR); - UA_ConnectionManager *cm = channel->connectionManager; if(!UA_SecureChannel_isConnected(channel)) return UA_STATUSCODE_BADCONNECTIONCLOSED; + UA_ConnectionManager *cm = channel->connection->cm; /* Create the chunking info structure */ mc->channel = channel; @@ -476,7 +477,7 @@ UA_MessageContext_begin(UA_MessageContext *mc, UA_SecureChannel *channel, /* Allocate the message buffer */ UA_StatusCode res = - cm->allocNetworkBuffer(cm, channel->connectionId, + cm->allocNetworkBuffer(channel->connection, &mc->messageBuffer, channel->config.sendBufferSize); UA_CHECK_STATUS(res, return res); @@ -505,10 +506,10 @@ UA_MessageContext_finish(UA_MessageContext *mc) { void UA_MessageContext_abort(UA_MessageContext *mc) { - UA_ConnectionManager *cm = mc->channel->connectionManager; if(!UA_SecureChannel_isConnected(mc->channel)) return; - cm->freeNetworkBuffer(cm, mc->channel->connectionId, &mc->messageBuffer); + UA_ConnectionManager *cm = mc->channel->connection->cm; + cm->freeNetworkBuffer(mc->channel->connection, &mc->messageBuffer); } UA_StatusCode diff --git a/src/ua_securechannel.h b/src/ua_securechannel.h index 59b8cf028f8..82c32dd3248 100644 --- a/src/ua_securechannel.h +++ b/src/ua_securechannel.h @@ -106,8 +106,7 @@ struct UA_SecureChannel { UA_ConnectionConfig config; /* Connection handling in the EventLoop */ - UA_ConnectionManager *connectionManager; - uintptr_t connectionId; + UA_Connection *connection; /* Rules for revolving the token with a renew OPN request: The client is * allowed to accept messages with the old token until the OPN response has @@ -183,7 +182,7 @@ UA_SecureChannel_setSecurityPolicy(UA_SecureChannel *channel, const UA_ByteString *remoteCertificate); UA_Boolean -UA_SecureChannel_isConnected(UA_SecureChannel *channel); +UA_SecureChannel_isConnected(const UA_SecureChannel *channel); /* When a fatal error occurs the Server shall send an Error Message to the * Client and close the socket. When a Client encounters one of these errors, it @@ -348,8 +347,9 @@ signAndEncryptSym(UA_MessageContext *messageContext, #define UA_LOG_CHANNEL_INTERNAL(LOGGER, LEVEL, CHANNEL, MSG, ...) \ if(UA_LOGLEVEL <= UA_LOGLEVEL_##LEVEL) { \ UA_LOG_##LEVEL(LOGGER, UA_LOGCATEGORY_SECURECHANNEL, \ - "TCP %lu\t| SC %" PRIu32 "\t| " MSG "%.0s", \ - (long unsigned)(CHANNEL)->connectionId, \ + "TCP %lu\t| SC %" PRIu32 "\t| " MSG "%.0s", \ + UA_SecureChannel_isConnected((CHANNEL)) ? \ + (long unsigned)(CHANNEL)->connection->identifier : 0, \ (CHANNEL)->securityToken.channelId, __VA_ARGS__); \ } diff --git a/tests/check_eventloop_eth.c b/tests/check_eventloop_eth.c index 6078aead511..e57beded17f 100644 --- a/tests/check_eventloop_eth.c +++ b/tests/check_eventloop_eth.c @@ -14,7 +14,7 @@ static UA_EventLoop *el; static char *testMsg = "open62541"; -static uintptr_t clientId; +static UA_Connection *clientConnection; static UA_Boolean received; #define ETHERNET_INTERFACE "lo" /* use the loopback interface for testing */ @@ -25,38 +25,36 @@ typedef struct TestContext { } TestContext; static void -connectionCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState status, const UA_KeyValueMap *params, - UA_ByteString msg) { - TestContext *ctx = (TestContext*) *connectionContext; - if(status == UA_CONNECTIONSTATE_CLOSING) { +connectionCallback(UA_Connection *connection, UA_ConnectionState state, + const UA_KeyValueMap params, UA_ByteString msg) { + TestContext *ctx = (TestContext*)connection->context; + if(state == UA_CONNECTIONSTATE_CLOSING) { UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, - "Closing connection %u", (unsigned)connectionId); + "Closing connection %u", connection->identifier); } else { if(msg.length == 0) { UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, - "Opening connection %u", (unsigned)connectionId); + "Opening connection %u", connection->identifier); } else { UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Received a message of length %u", (unsigned)msg.length); } } - if(msg.length == 0 && status == UA_CONNECTIONSTATE_ESTABLISHED) { + if(msg.length == 0 && state == UA_CONNECTIONSTATE_ESTABLISHED) { ctx->connCount++; - clientId = connectionId; + clientConnection = connection; /* The remote-hostname is set during the first callback */ - if(params->mapSize> 0) { + if(params.mapSize> 0) { const void *hn = - UA_KeyValueMap_getScalar(params, UA_QUALIFIEDNAME(0, "remote-hostname"), + UA_KeyValueMap_getScalar(¶ms, UA_QUALIFIEDNAME(0, "remote-hostname"), &UA_TYPES[UA_TYPES_STRING]); ck_assert(hn != NULL); } } - if(status == UA_CONNECTIONSTATE_CLOSING) + if(state == UA_CONNECTIONSTATE_CLOSING) ctx->connCount--; if(msg.length > 0) { @@ -87,7 +85,7 @@ START_TEST(listenETH) { UA_Variant_setScalar(¶ms[2].value, &listen, &UA_TYPES[UA_TYPES_BOOLEAN]); UA_KeyValueMap kvm = {3, params}; - UA_StatusCode res = cm->openConnection(cm, &kvm, NULL, &testContext, connectionCallback); + UA_StatusCode res = cm->openConnection(cm, kvm, NULL, &testContext, connectionCallback); ck_assert_uint_eq(res, UA_STATUSCODE_GOOD); ck_assert(testContext.connCount == 1); @@ -140,30 +138,30 @@ START_TEST(connectETH) { /* Don't use the address parameter for listening */ UA_KeyValueMap kvm = {3, ¶ms[1]}; UA_StatusCode retval = - cm->openConnection(cm, &kvm, NULL, &testContext, connectionCallback); + cm->openConnection(cm, kvm, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); size_t listenSockets = testContext.connCount; /* Open a client connection. Don't use the listen parameter.*/ kvm.map = params; - clientId = 0; - retval = cm->openConnection(cm, &kvm, NULL, &testContext, connectionCallback); + clientConnection = NULL; + retval = cm->openConnection(cm, kvm, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); for(size_t i = 0; i < 2; i++) { UA_DateTime next = el->run(el, 1); UA_fakeSleep((UA_UInt32)((next - UA_DateTime_now()) / UA_DATETIME_MSEC)); } - ck_assert(clientId != 0); + ck_assert(clientConnection != NULL); ck_assert_uint_eq(testContext.connCount, listenSockets + 1); /* Send a message from the client */ received = false; UA_ByteString snd; - retval = cm->allocNetworkBuffer(cm, clientId, &snd, strlen(testMsg)); + retval = cm->allocNetworkBuffer(clientConnection, &snd, strlen(testMsg)); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); memcpy(snd.data, testMsg, strlen(testMsg)); - retval = cm->sendWithConnection(cm, clientId, NULL, &snd); + retval = cm->sendWithConnection(clientConnection, UA_KEYVALUEMAP_NULL, &snd); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); while(!received) { @@ -173,7 +171,7 @@ START_TEST(connectETH) { ck_assert(received); /* Close the connection */ - retval = cm->closeConnection(cm, clientId); + retval = cm->closeConnection(clientConnection); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); ck_assert_uint_eq(testContext.connCount, listenSockets + 1); for(size_t i = 0; i < 2; i++) { diff --git a/tests/check_eventloop_tcp.c b/tests/check_eventloop_tcp.c index be396b20461..2c920886926 100644 --- a/tests/check_eventloop_tcp.c +++ b/tests/check_eventloop_tcp.c @@ -15,30 +15,27 @@ static UA_EventLoop *el; static unsigned connCount; static char *testMsg = "open62541"; -static uintptr_t clientId; +static UA_Connection *clientConnection; static UA_Boolean received; static void -connectionCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState status, - const UA_KeyValueMap *params, - UA_ByteString msg) { +connectionCallback(UA_Connection *c, UA_ConnectionState status, + const UA_KeyValueMap params, UA_ByteString msg) { if(status == UA_CONNECTIONSTATE_CLOSING) { UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, - "Closing connection %u", (unsigned)connectionId); + "Closing connection %u", c->identifier); } else { if(msg.length == 0) { UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, - "Opening connection %u", (unsigned)connectionId); + "Opening connection %u", c->identifier); } else { UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Received a message of length %u", (unsigned)msg.length); } } - if(*connectionContext != NULL) - clientId = connectionId; + if(!c->context) + clientConnection = c; if(msg.length == 0 && status == UA_CONNECTIONSTATE_ESTABLISHED) connCount++; @@ -73,7 +70,7 @@ START_TEST(listenTCP) { ck_assert_uint_eq(connCount, 0); - cm->openConnection(cm, ¶msMap, NULL, NULL, connectionCallback); + cm->openConnection(cm, paramsMap, NULL, NULL, connectionCallback); ck_assert(connCount > 0); @@ -122,32 +119,32 @@ START_TEST(connectTCP) { connCount = 0; - cm->openConnection(cm, ¶msMap, NULL, NULL, connectionCallback); + cm->openConnection(cm, paramsMap, NULL, NULL, connectionCallback); size_t listenSockets = connCount; /* Open a client connection */ - clientId = 0; + clientConnection = NULL; listen = false; UA_StatusCode retval = - cm->openConnection(cm, ¶msMap, NULL, (void*)0x01, connectionCallback); + cm->openConnection(cm, paramsMap, NULL, (void*)0x01, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); for(size_t i = 0; i < 2; i++) { UA_DateTime next = el->run(el, 1); UA_fakeSleep((UA_UInt32)((next - UA_DateTime_now()) / UA_DATETIME_MSEC)); } - ck_assert(clientId != 0); + ck_assert(clientConnection != NULL); ck_assert_uint_eq(connCount, listenSockets + 2); /* Send a message from the client */ received = false; UA_ByteString snd; - retval = cm->allocNetworkBuffer(cm, clientId, &snd, strlen(testMsg)); + retval = cm->allocNetworkBuffer(clientConnection, &snd, strlen(testMsg)); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); memcpy(snd.data, testMsg, strlen(testMsg)); - retval = cm->sendWithConnection(cm, clientId, NULL, &snd); + retval = cm->sendWithConnection(clientConnection, UA_KEYVALUEMAP_NULL, &snd); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); for(size_t i = 0; i < 2; i++) { UA_DateTime next = el->run(el, 1); @@ -156,7 +153,7 @@ START_TEST(connectTCP) { ck_assert(received); /* Close the connection */ - retval = cm->closeConnection(cm, clientId); + retval = cm->closeConnection(clientConnection); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); ck_assert_uint_eq(connCount, listenSockets + 2); for(size_t i = 0; i < 2; i++) { diff --git a/tests/check_eventloop_udp.c b/tests/check_eventloop_udp.c index 021982ac67c..49eb08e9296 100644 --- a/tests/check_eventloop_udp.c +++ b/tests/check_eventloop_udp.c @@ -14,7 +14,7 @@ static UA_EventLoop *el; static char *testMsg = "open62541"; -static uintptr_t clientId; +static UA_Connection *clientConnection; static UA_Boolean received; typedef struct TestContext { @@ -22,19 +22,16 @@ typedef struct TestContext { } TestContext; static void -connectionCallback(UA_ConnectionManager *cm, uintptr_t connectionId, - void *application, void **connectionContext, - UA_ConnectionState status, - const UA_KeyValueMap *params, - UA_ByteString msg) { - TestContext *ctx = (TestContext*) *connectionContext; +connectionCallback(UA_Connection *c, UA_ConnectionState status, + const UA_KeyValueMap params, UA_ByteString msg) { + TestContext *ctx = (TestContext*)c->context; if(status == UA_CONNECTIONSTATE_CLOSING) { UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, - "Closing connection %u", (unsigned)connectionId); + "Closing connection %u", c->identifier); } else { if(msg.length == 0) { UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, - "Opening connection %u", (unsigned)connectionId); + "Opening connection %u", c->identifier); } else { UA_LOG_DEBUG(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "Received a message of length %u", (unsigned)msg.length); @@ -43,12 +40,12 @@ connectionCallback(UA_ConnectionManager *cm, uintptr_t connectionId, if(msg.length == 0 && status == UA_CONNECTIONSTATE_ESTABLISHED) { ctx->connCount++; - clientId = connectionId; + clientConnection = c; /* The remote-hostname is set during the first callback */ - if(!UA_KeyValueMap_isEmpty(params)) { + if(!UA_KeyValueMap_isEmpty(¶ms)) { const void *hn = - UA_KeyValueMap_getScalar(params, + UA_KeyValueMap_getScalar(¶ms, UA_QUALIFIEDNAME(0, "remote-hostname"), &UA_TYPES[UA_TYPES_STRING]); ck_assert(hn != NULL); @@ -83,7 +80,7 @@ START_TEST(listenUDP) { params[1].key = UA_QUALIFIEDNAME(0, "listen"); UA_Variant_setScalar(¶ms[1].value, &listen, &UA_TYPES[UA_TYPES_BOOLEAN]); - cm->openConnection(cm, ¶msMap, NULL, &testContext, connectionCallback); + cm->openConnection(cm, paramsMap, NULL, &testContext, connectionCallback); ck_assert(testContext.connCount > 0); @@ -138,16 +135,16 @@ START_TEST(connectUDPValidationSucceeds) { testContext.connCount = 0; UA_StatusCode retval = - cm->openConnection(cm, ¶msMap, NULL, &testContext, connectionCallback); + cm->openConnection(cm, paramsMap, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); /* Open a client connection */ - clientId = 0; + clientConnection = NULL; listen = false; UA_String targetHost = UA_STRING("localhost"); UA_Variant_setScalar(¶ms[1].value, &targetHost, &UA_TYPES[UA_TYPES_STRING]); - retval = cm->openConnection(cm, ¶msMap, NULL, &testContext, connectionCallback); + retval = cm->openConnection(cm, paramsMap, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); el->stop(el); @@ -185,15 +182,15 @@ START_TEST(connectUDPValidationFails) { testContext.connCount = 0; UA_StatusCode retval = - cm->openConnection(cm, ¶msMap, NULL, &testContext, connectionCallback); + cm->openConnection(cm, paramsMap, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_BADCONNECTIONREJECTED); /* Open a client connection */ - clientId = 0; + clientConnection = NULL; UA_String targetHost = UA_STRING("localho"); UA_Variant_setScalar(¶ms[1].value, &targetHost, &UA_TYPES[UA_TYPES_STRING]); - retval = cm->openConnection(cm, ¶msMap, NULL, &testContext, connectionCallback); + retval = cm->openConnection(cm, paramsMap, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_BADCONNECTIONREJECTED); el->stop(el); @@ -228,7 +225,7 @@ START_TEST(connectUDP) { testContext.connCount = 0; UA_StatusCode retval = - cm->openConnection(cm, ¶msMap, NULL, &testContext, connectionCallback); + cm->openConnection(cm, paramsMap, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); size_t listenSockets = testContext.connCount; @@ -236,22 +233,22 @@ START_TEST(connectUDP) { /* Open a client connection */ listen = false; paramsMap.mapSize = 3; - retval = cm->openConnection(cm, ¶msMap, NULL, &testContext, connectionCallback); + retval = cm->openConnection(cm, paramsMap, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); for(size_t i = 0; i < 2; i++) { UA_DateTime next = el->run(el, 1); UA_fakeSleep((UA_UInt32)((next - UA_DateTime_now()) / UA_DATETIME_MSEC)); } - ck_assert(clientId != 0); + ck_assert(clientConnection != NULL); ck_assert_uint_eq(testContext.connCount, listenSockets + 1); /* Send a message from the client */ received = false; UA_ByteString snd; - retval = cm->allocNetworkBuffer(cm, clientId, &snd, strlen(testMsg)); + retval = cm->allocNetworkBuffer(clientConnection, &snd, strlen(testMsg)); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); memcpy(snd.data, testMsg, strlen(testMsg)); - retval = cm->sendWithConnection(cm, clientId, &UA_KEYVALUEMAP_NULL, &snd); + retval = cm->sendWithConnection(clientConnection, UA_KEYVALUEMAP_NULL, &snd); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); for(size_t i = 0; i < 2; i++) { UA_DateTime next = el->run(el, 1); @@ -260,7 +257,7 @@ START_TEST(connectUDP) { ck_assert(received); /* Close the connection */ - retval = cm->closeConnection(cm, clientId); + retval = cm->closeConnection(clientConnection); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); ck_assert_uint_eq(testContext.connCount, listenSockets + 1); for(size_t i = 0; i < 2; i++) { @@ -311,14 +308,14 @@ START_TEST(udpTalkerAndListener) { testContext.connCount = 0; UA_StatusCode retval = - cmListener->openConnection(cmListener, ¶msMap, NULL, &testContext, + cmListener->openConnection(cmListener, paramsMap, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); size_t listenSockets = testContext.connCount; /* Open a talker connection */ - clientId = 0; + clientConnection = NULL; listen = false; UA_String targetHost = UA_STRING("localhost"); @@ -326,7 +323,7 @@ START_TEST(udpTalkerAndListener) { UA_Variant_setScalar(¶ms[2].value, &targetHost, &UA_TYPES[UA_TYPES_STRING]); paramsMap.mapSize = 3; - retval = cmTalker->openConnection(cmTalker, ¶msMap, NULL, &testContext, + retval = cmTalker->openConnection(cmTalker, paramsMap, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); @@ -336,16 +333,16 @@ START_TEST(udpTalkerAndListener) { UA_DateTime next = elTalker->run(elTalker, 1); UA_fakeSleep((UA_UInt32)((next - UA_DateTime_now()) / UA_DATETIME_MSEC)); } - ck_assert_uint_ne(clientId, 0); + ck_assert(clientConnection != NULL); ck_assert_uint_eq(testContext.connCount, listenSockets + 1); /* Send a message from the talker */ received = false; UA_ByteString snd; - retval = cmTalker->allocNetworkBuffer(cmTalker, clientId, &snd, strlen(testMsg)); + retval = cmTalker->allocNetworkBuffer(clientConnection, &snd, strlen(testMsg)); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); memcpy(snd.data, testMsg, strlen(testMsg)); - retval = cmTalker->sendWithConnection(cmTalker, clientId, &UA_KEYVALUEMAP_NULL, &snd); + retval = cmTalker->sendWithConnection(clientConnection, UA_KEYVALUEMAP_NULL, &snd); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); for(size_t i = 0; i < 2; i++) { UA_DateTime next = elListener->run(elListener, 1); @@ -354,7 +351,7 @@ START_TEST(udpTalkerAndListener) { ck_assert(received); /* Close the connection */ - retval = cmTalker->closeConnection(cmTalker, clientId); + retval = cmTalker->closeConnection(clientConnection); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); ck_assert_uint_eq(testContext.connCount, listenSockets + 1); for(size_t i = 0; i < 2; i++) { @@ -422,14 +419,14 @@ START_TEST(udpTalkerAndListenerDifferentDestination) { testContext.connCount = 0; UA_StatusCode retval = - cmListener->openConnection(cmListener, ¶msMap, NULL, &testContext, + cmListener->openConnection(cmListener, paramsMap, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); size_t listenSockets = testContext.connCount; /* Open a talker connection */ - clientId = 0; + clientConnection = NULL; listen = false; UA_String connectionTargetHost = UA_STRING("localhost"); @@ -437,7 +434,7 @@ START_TEST(udpTalkerAndListenerDifferentDestination) { UA_Variant_setScalar(¶ms[2].value, &connectionTargetHost, &UA_TYPES[UA_TYPES_STRING]); paramsMap.mapSize = 3; - retval = cmTalker->openConnection(cmTalker, ¶msMap, NULL, &testContext, + retval = cmTalker->openConnection(cmTalker, paramsMap, NULL, &testContext, connectionCallback); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); @@ -447,13 +444,13 @@ START_TEST(udpTalkerAndListenerDifferentDestination) { UA_DateTime next = elTalker->run(elTalker, 1); UA_fakeSleep((UA_UInt32)((next - UA_DateTime_now()) / UA_DATETIME_MSEC)); } - ck_assert_uint_ne(clientId, 0); + ck_assert(clientConnection != NULL); ck_assert_uint_eq(testContext.connCount, listenSockets + 1); /* Send a message from the talker */ received = false; UA_ByteString snd; - retval = cmTalker->allocNetworkBuffer(cmTalker, clientId, &snd, strlen(testMsg)); + retval = cmTalker->allocNetworkBuffer(clientConnection, &snd, strlen(testMsg)); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); memcpy(snd.data, testMsg, strlen(testMsg)); @@ -468,7 +465,7 @@ START_TEST(udpTalkerAndListenerDifferentDestination) { sendParamsMap.map = sendParams; sendParamsMap.mapSize = 2; - retval = cmTalker->sendWithConnection(cmTalker, clientId, &sendParamsMap, &snd); + retval = cmTalker->sendWithConnection(clientConnection, sendParamsMap, &snd); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); for(size_t i = 0; i < 2; i++) { UA_DateTime next = elListener->run(elListener, 1); @@ -477,7 +474,7 @@ START_TEST(udpTalkerAndListenerDifferentDestination) { ck_assert(received); /* Close the connection */ - retval = cmTalker->closeConnection(cmTalker, clientId); + retval = cmTalker->closeConnection(clientConnection); ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); ck_assert_uint_eq(testContext.connCount, listenSockets + 1); for(size_t i = 0; i < 2; i++) { diff --git a/tests/check_securechannel.c b/tests/check_securechannel.c index 571c3f535d3..fb249f33761 100644 --- a/tests/check_securechannel.c +++ b/tests/check_securechannel.c @@ -45,7 +45,6 @@ setup_secureChannel(void) { testChannel.config = UA_ConnectionConfig_default; UA_SecureChannel_setSecurityPolicy(&testChannel, &dummyPolicy, &dummyCertificate); - testChannel.connectionManager = &testConnectionManagerTCP; testChannel.state = UA_SECURECHANNELSTATE_OPEN; testConnectionLastSentBuf = &sentData; } diff --git a/tests/client/check_activateSession.c b/tests/client/check_activateSession.c index e4f0161cf8d..4e05b907b5f 100644 --- a/tests/client/check_activateSession.c +++ b/tests/client/check_activateSession.c @@ -410,9 +410,8 @@ START_TEST(Client_activateSessionTimeout) { /* Manually close the connection. The connection is internally closed at the * next iteration of the EventLoop. Hence the next request is sent out. But * the connection "actually closes" before receiving the response. */ - UA_ConnectionManager *cm = client->channel.connectionManager; - uintptr_t connId = client->channel.connectionId; - cm->closeConnection(cm, connId); + UA_ConnectionManager *cm = client->channel.connection->cm; + cm->closeConnection(client->channel.connection); UA_Variant_init(&val); retval = UA_Client_readValueAttribute(client, nodeId, &val); diff --git a/tests/client/check_client.c b/tests/client/check_client.c index e74781fdeb6..35b729c8b09 100644 --- a/tests/client/check_client.c +++ b/tests/client/check_client.c @@ -382,9 +382,8 @@ START_TEST(Client_activateSessionTimeout) { /* Manually close the connection. The connection is internally closed at the * next iteration of the EventLoop. Hence the next request is sent out. But * the connection "actually closes" before receiving the response. */ - UA_ConnectionManager *cm = client->channel.connectionManager; - uintptr_t connId = client->channel.connectionId; - cm->closeConnection(cm, connId); + UA_ConnectionManager *cm = client->channel.connection->cm; + cm->closeConnection(client->channel.connection); UA_Variant_init(&val); retval = UA_Client_readValueAttribute(client, nodeId, &val); diff --git a/tests/client/check_client_async.c b/tests/client/check_client_async.c index bad867333d7..2aa30fec45f 100644 --- a/tests/client/check_client_async.c +++ b/tests/client/check_client_async.c @@ -93,8 +93,8 @@ START_TEST(Client_highlevel_async_readValue) { ck_assert_uint_eq(asyncCounter, 1); /* Simulate network cable unplugged */ - UA_ConnectionManager *cm = client->channel.connectionManager; - cm->closeConnection(cm, client->channel.connectionId); + UA_ConnectionManager *cm = client->channel.connection->cm; + cm->closeConnection(client->channel.connection); UA_EventLoop *el = client->config.eventLoop; el->run(el, 0); @@ -182,9 +182,8 @@ START_TEST(Client_read_async_timed) { /* Manually close the connection. The connection is internally closed at the * next iteration of the EventLoop. Hence the next request is sent out. But * the connection "actually closes" before receiving the response. */ - UA_ConnectionManager *cm = client->channel.connectionManager; - uintptr_t connId = client->channel.connectionId; - cm->closeConnection(cm, connId); + UA_ConnectionManager *cm = client->channel.connection->cm; + cm->closeConnection(client->channel.connection); rr.requestHeader.timeoutHint = 100; retval = __UA_Client_AsyncService(client, &rr, diff --git a/tests/client/check_client_async_connect.c b/tests/client/check_client_async_connect.c index f1e05f524f8..91167f31a7f 100644 --- a/tests/client/check_client_async_connect.c +++ b/tests/client/check_client_async_connect.c @@ -148,8 +148,8 @@ START_TEST(Client_no_connection) { } /* Manually close the TCP connection */ - UA_ConnectionManager *cm = client->channel.connectionManager; - cm->closeConnection(cm, client->channel.connectionId); + UA_ConnectionManager *cm = client->channel.connection->cm; + cm->closeConnection(client->channel.connection); do { UA_Server_run_iterate(server, false); diff --git a/tests/client/check_client_securechannel.c b/tests/client/check_client_securechannel.c index 942078eb7bf..efa604dabda 100644 --- a/tests/client/check_client_securechannel.c +++ b/tests/client/check_client_securechannel.c @@ -153,8 +153,8 @@ START_TEST(SecureChannel_networkfail) { rq.nodesToReadSize = 1; /* Manually close the TCP connection */ - UA_ConnectionManager *cm = client->channel.connectionManager; - cm->closeConnection(cm, client->channel.connectionId); + UA_ConnectionManager *cm = client->channel.connection->cm; + cm->closeConnection(client->channel.connection); UA_EventLoop *el = client->config.eventLoop; el->run(el, 0); @@ -208,9 +208,8 @@ START_TEST(SecureChannel_cableunplugged) { /* Manually close the connection. The connection is internally closed at the * next iteration of the EventLoop. Hence the next request is sent out. But * the connection "actually closes" before receiving the response. */ - UA_ConnectionManager *cm = client->channel.connectionManager; - uintptr_t connId = client->channel.connectionId; - cm->closeConnection(cm, connId); + UA_ConnectionManager *cm = client->channel.connection->cm; + cm->closeConnection(client->channel.connection); UA_Variant_init(&val); retval = UA_Client_readValueAttribute(client, nodeId, &val); diff --git a/tests/client/check_client_subscriptions.c b/tests/client/check_client_subscriptions.c index 7a45a76d215..edb81b89cd8 100644 --- a/tests/client/check_client_subscriptions.c +++ b/tests/client/check_client_subscriptions.c @@ -890,9 +890,8 @@ START_TEST(Client_subscription_connectionClose) { /* Manually close the connection. The connection is internally closed at the * next iteration of the EventLoop. Hence the next request is sent out. But * the connection "actually closes" before receiving the response. */ - UA_ConnectionManager *cm = client->channel.connectionManager; - uintptr_t connId = client->channel.connectionId; - cm->closeConnection(cm, connId); + UA_ConnectionManager *cm = client->channel.connection->cm; + cm->closeConnection(client->channel.connection); notificationReceived = false; @@ -1372,8 +1371,8 @@ START_TEST(Client_subscription_async_sub) { ck_assert_uint_eq(countNotificationReceived, 5); /* Simulate network cable unplugged (no response from server) */ - UA_ConnectionManager *cm = client->channel.connectionManager; - cm->closeConnection(cm, client->channel.connectionId); + UA_ConnectionManager *cm = client->channel.connection->cm; + cm->closeConnection(client->channel.connection); UA_fakeSleep((UA_UInt32)publishingInterval * 100); ck_assert_uint_lt(client->config.outStandingPublishRequests, 10); diff --git a/tests/pubsub/check_pubsub_get_state.c b/tests/pubsub/check_pubsub_get_state.c index 945598aef26..a8b722910ba 100644 --- a/tests/pubsub/check_pubsub_get_state.c +++ b/tests/pubsub/check_pubsub_get_state.c @@ -492,7 +492,7 @@ START_TEST(Test_error_case) { UA_PubSubConnection *tmpConnection; TAILQ_FOREACH(tmpConnection, &server->pubSubManager.connections, listEntry) { if(UA_NodeId_equal(&tmpConnection->identifier, &ConnId_1)) { - close((int)tmpConnection->sendChannel); + close((int)tmpConnection->sendChannel->identifier); } } diff --git a/tests/testing-plugins/testing_networklayers.c b/tests/testing-plugins/testing_networklayers.c index fef017da15e..8fd786de291 100644 --- a/tests/testing-plugins/testing_networklayers.c +++ b/tests/testing-plugins/testing_networklayers.c @@ -7,16 +7,14 @@ UA_ByteString *testConnectionLastSentBuf; static UA_StatusCode -testOpenConnection(UA_ConnectionManager *cm, - const UA_KeyValueMap *params, +testOpenConnection(UA_ConnectionManager *cm, const UA_KeyValueMap params, void *application, void *context, UA_ConnectionManager_connectionCallback connectionCallback) { return UA_STATUSCODE_BADNOTCONNECTED; } static UA_StatusCode -testSendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, - const UA_KeyValueMap *params, +testSendWithConnection(UA_Connection *c, const UA_KeyValueMap params, UA_ByteString *buf) { if(testConnectionLastSentBuf) { UA_ByteString_clear(testConnectionLastSentBuf); @@ -29,19 +27,18 @@ testSendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId, } static UA_StatusCode -testCloseConnection(UA_ConnectionManager *cm, uintptr_t connectionId) { +testCloseConnection(UA_Connection *c) { return UA_STATUSCODE_GOOD; } static UA_StatusCode -testAllocNetworkBuffer(UA_ConnectionManager *cm, uintptr_t connectionId, - UA_ByteString *buf, size_t bufSize) { +testAllocNetworkBuffer(UA_Connection *c, UA_ByteString *buf, + size_t bufSize) { return UA_ByteString_allocBuffer(buf, bufSize); } static void -testFreeNetworkBuffer(UA_ConnectionManager *cm, uintptr_t connectionId, - UA_ByteString *buf) { +testFreeNetworkBuffer(UA_Connection *c, UA_ByteString *buf) { UA_ByteString_clear(buf); }