Skip to content

Commit

Permalink
refactor(el): Make EventLoop connection a struct
Browse files Browse the repository at this point in the history
  • Loading branch information
jpfr committed Sep 14, 2023
1 parent 9aafbdb commit 4e05df6
Show file tree
Hide file tree
Showing 29 changed files with 622 additions and 717 deletions.
6 changes: 2 additions & 4 deletions arch/eventloop_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -478,16 +478,14 @@ UA_EventLoop_new_POSIX(const UA_Logger *logger) {
/* Reusable EventSource functionality */

UA_StatusCode
UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm,
uintptr_t connectionId,
UA_EventLoopPOSIX_allocNetworkBuffer(UA_Connection *c,
UA_ByteString *buf,
size_t bufSize) {
return UA_ByteString_allocBuffer(buf, bufSize);
}

void
UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm,
uintptr_t connectionId,
UA_EventLoopPOSIX_freeNetworkBuffer(UA_Connection *c,
UA_ByteString *buf) {
UA_ByteString_clear(buf);
}
Expand Down
10 changes: 4 additions & 6 deletions arch/eventloop_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ typedef struct UA_RegisteredFD UA_RegisteredFD;
typedef void (*UA_FDCallback)(UA_EventSource *es, UA_RegisteredFD *rfd, short event);

struct UA_RegisteredFD {
UA_Connection c;
// XXX change the delayed mechanism as this is no longer first
UA_DelayedCallback dc; /* Used for async closing. Must be the first member
* because the rfd is freed by the delayed callback
* mechanism. */

ZIP_ENTRY(UA_RegisteredFD) zipPointers; /* Register FD in the EventSource */
UA_FD fd;
short listenEvents; /* UA_FDEVENT_IN | UA_FDEVENT_OUT*/

UA_EventSource *es; /* Backpointer to the EventSource */
UA_FDCallback eventSourceCB;
};

Expand Down Expand Up @@ -146,14 +146,12 @@ UA_StatusCode
UA_EventLoopPOSIX_allocateRXBuffer(UA_POSIXConnectionManager *pcm);

UA_StatusCode
UA_EventLoopPOSIX_allocNetworkBuffer(UA_ConnectionManager *cm,
uintptr_t connectionId,
UA_EventLoopPOSIX_allocNetworkBuffer(UA_Connection *c,
UA_ByteString *buf,
size_t bufSize);

void
UA_EventLoopPOSIX_freeNetworkBuffer(UA_ConnectionManager *cm,
uintptr_t connectionId,
UA_EventLoopPOSIX_freeNetworkBuffer(UA_Connection *c,
UA_ByteString *buf);

/*
Expand Down
2 changes: 1 addition & 1 deletion arch/eventloop_posix_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ UA_EventLoopPOSIX_pollFDs(UA_EventLoopPOSIX *el, UA_DateTime listenTimeout) {
} else {
revent = UA_FDEVENT_ERR;
}
rfd->eventSourceCB(rfd->es, rfd, revent);
rfd->eventSourceCB(&rfd->c.cm->eventSource, rfd, revent);
}
return UA_STATUSCODE_GOOD;
}
Expand Down
117 changes: 39 additions & 78 deletions arch/eventloop_posix_eth.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ static UA_KeyValueRestriction ETHConfigParameters[ETH_PARAMETERSSIZE+1] = {
typedef struct {
UA_RegisteredFD rfd;

UA_ConnectionManager_connectionCallback applicationCB;
void *application;
void *context;

struct sockaddr_ll sll;
/* The Ethernet header to prepend for sending frames is precomputed and reused.
* The length field (the last 2 byte) is adjusted.
Expand Down Expand Up @@ -183,35 +179,19 @@ setETHHeader(unsigned char *buf,
}

static UA_StatusCode
ETH_allocNetworkBuffer(UA_ConnectionManager *cm, uintptr_t connectionId,
UA_ByteString *buf, size_t bufSize) {
UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;

/* Get the ETH_FD */
UA_FD fd = (UA_FD)connectionId;
ETH_FD *erfd = (ETH_FD*)ZIP_FIND(UA_FDTree, &pcm->fds, &fd);
if(!erfd)
return UA_STATUSCODE_BADCONNECTIONREJECTED;

ETH_allocNetworkBuffer(UA_Connection *c, UA_ByteString *buf, size_t bufSize) {
/* Allocate the buffer with the hidden Ethernet header in front */
ETH_FD *erfd = (ETH_FD*)c;
UA_StatusCode res = UA_ByteString_allocBuffer(buf, bufSize+erfd->headerSize);
buf->data += erfd->headerSize;
buf->length -= erfd->headerSize;
return res;
}

static void
ETH_freeNetworkBuffer(UA_ConnectionManager *cm, uintptr_t connectionId,
UA_ByteString *buf) {
UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;

/* Get the ETH_FD */
UA_FD fd = (UA_FD)connectionId;
ETH_FD *erfd = (ETH_FD*)ZIP_FIND(UA_FDTree, &pcm->fds, &fd);
if(!erfd)
return;

ETH_freeNetworkBuffer(UA_Connection *c, UA_ByteString *buf) {
/* Unhide the Ethernet header */
ETH_FD *erfd = (ETH_FD*)c;
buf->data -= erfd->headerSize;
buf->length += erfd->headerSize;
UA_ByteString_clear(buf);
Expand Down Expand Up @@ -252,22 +232,20 @@ ETH_close(UA_POSIXConnectionManager *pcm, ETH_FD *conn) {

/* Signal closing to the application */
UA_UNLOCK(&el->elMutex);
conn->applicationCB(&pcm->cm, (uintptr_t)conn->rfd.fd,
conn->application, &conn->context,
UA_CONNECTIONSTATE_CLOSING,
&UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL);
conn->rfd.c.callback(&conn->rfd.c, UA_CONNECTIONSTATE_CLOSING,
UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL);
UA_LOCK(&el->elMutex);

/* Close the socket */
int ret = UA_close(conn->rfd.fd);
if(ret == 0) {
UA_LOG_INFO(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH %u\t| Socket closed", (unsigned)conn->rfd.fd);
"ETH %u\t| Socket closed", conn->rfd.c.identifier);
} else {
UA_LOG_SOCKET_ERRNO_WRAP(
UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH %u\t| Could not close the socket (%s)",
(unsigned)conn->rfd.fd, errno_str));
conn->rfd.c.identifier, errno_str));
}

/* Don't call free here. This might be done automatically via the delayed
Expand Down Expand Up @@ -410,8 +388,8 @@ ETH_connectionSocketCallback(UA_ConnectionManager *cm, UA_RegisteredFD *rfd,
response.data += headerSize;
response.length -= headerSize;
UA_UNLOCK(&el->elMutex);
conn->applicationCB(cm, (uintptr_t)rfd->fd, conn->application, &conn->context,
UA_CONNECTIONSTATE_ESTABLISHED, &map, response);
conn->rfd.c.callback(&conn->rfd.c, UA_CONNECTIONSTATE_ESTABLISHED,
map, response);
UA_LOCK(&el->elMutex);
response.data -= headerSize;
response.length += headerSize;
Expand Down Expand Up @@ -561,7 +539,7 @@ ETH_openSendConnection(UA_EventLoopPOSIX *el, ETH_FD *conn, const UA_KeyValueMap
}

static UA_StatusCode
ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap params,
void *application, void *context,
UA_ConnectionManager_connectionCallback connectionCallback) {
UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
Expand All @@ -571,7 +549,7 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,

/* Listen or send connection? */
const UA_Boolean *listen = (const UA_Boolean*)
UA_KeyValueMap_getScalar(params, ETHConfigParameters[ETH_PARAMINDEX_LISTEN].name,
UA_KeyValueMap_getScalar(&params, ETHConfigParameters[ETH_PARAMINDEX_LISTEN].name,
&UA_TYPES[UA_TYPES_BOOLEAN]);
size_t ethParams = ETH_PARAMETERSSIZE;
if(!listen || !*listen)
Expand All @@ -580,7 +558,7 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
/* Validate the parameters */
UA_StatusCode res =
UA_KeyValueRestriction_validate(el->eventLoop.logger, "ETH", ETHConfigParameters,
ethParams, params);
ethParams, &params);
if(res != UA_STATUSCODE_GOOD) {
UA_UNLOCK(&el->elMutex);
return res;
Expand All @@ -589,14 +567,14 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
/* Get the EtherType parameter */
UA_UInt16 etherType = ETH_P_ALL;
const UA_UInt16 *etParam = (const UA_UInt16*)
UA_KeyValueMap_getScalar(params, ETHConfigParameters[ETH_PARAMINDEX_ETHERTYPE].name,
UA_KeyValueMap_getScalar(&params, ETHConfigParameters[ETH_PARAMINDEX_ETHERTYPE].name,
&UA_TYPES[UA_TYPES_UINT16]);
if(etParam)
etherType = *etParam;

/* Get the interface index */
const UA_String *interface = (const UA_String*)
UA_KeyValueMap_getScalar(params, ETHConfigParameters[ETH_PARAMINDEX_IFACE].name,
UA_KeyValueMap_getScalar(&params, ETHConfigParameters[ETH_PARAMINDEX_IFACE].name,
&UA_TYPES[UA_TYPES_STRING]);
if(interface->length >= 128) {
UA_UNLOCK(&el->elMutex);
Expand Down Expand Up @@ -640,11 +618,12 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
}

conn->rfd.fd = sockfd;
conn->rfd.es = &pcm->cm.eventSource;
conn->rfd.eventSourceCB = (UA_FDCallback)ETH_connectionSocketCallback;
conn->context = context;
conn->application = application;
conn->applicationCB = connectionCallback;
conn->rfd.c.callback = connectionCallback;
conn->rfd.c.application = application;
conn->rfd.c.context = context;
conn->rfd.c.identifier = (UA_UInt32)sockfd;
conn->rfd.c.cm = &pcm->cm;

/* Configure a listen or a send connection */
if(!listen || !*listen) {
Expand All @@ -661,11 +640,11 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,
res = UA_STATUSCODE_BADCONNECTIONREJECTED;
goto cleanup;
}
res = ETH_openSendConnection(el, conn, params,
res = ETH_openSendConnection(el, conn, &params,
(unsigned char*)ifr.ifr_hwaddr.sa_data,
ifindex, etherType);
} else {
res = ETH_openListenConnection(el, conn, params, ifindex, etherType);
res = ETH_openListenConnection(el, conn, &params, ifindex, etherType);
}
if(res != UA_STATUSCODE_GOOD)
goto cleanup;
Expand All @@ -681,9 +660,8 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params,

/* Register the listen socket in the application */
UA_UNLOCK(&el->elMutex);
connectionCallback(cm, (uintptr_t)sockfd, application, &conn->context,
UA_CONNECTIONSTATE_ESTABLISHED, &UA_KEYVALUEMAP_NULL,
UA_BYTESTRING_NULL);
connectionCallback(&conn->rfd.c, UA_CONNECTIONSTATE_ESTABLISHED,
UA_KEYVALUEMAP_NULL, UA_BYTESTRING_NULL);
return UA_STATUSCODE_GOOD;

cleanup:
Expand Down Expand Up @@ -719,43 +697,26 @@ ETH_shutdown(UA_POSIXConnectionManager *pcm, ETH_FD *conn) {
}

static UA_StatusCode
ETH_shutdownConnection(UA_ConnectionManager *cm, uintptr_t connectionId) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop;
UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
ETH_shutdownConnection(UA_Connection *c) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)c->cm->eventSource.eventLoop;
UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)c->cm;
UA_LOCK(&el->elMutex);

/* Get the ETH_FD */
UA_FD fd = (UA_FD)connectionId;
UA_RegisteredFD *rfd = ZIP_FIND(UA_FDTree, &pcm->fds, &fd);
if(!rfd) {
UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH\t| Cannot close Ethernet connection %u - not found",
(unsigned)connectionId);
UA_UNLOCK(&el->elMutex);
return UA_STATUSCODE_BADNOTFOUND;
}

ETH_shutdown(pcm, (ETH_FD*)rfd);
ETH_shutdown(pcm, (ETH_FD*)c);
UA_UNLOCK(&el->elMutex);
return UA_STATUSCODE_GOOD;
}

static UA_StatusCode
ETH_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
const UA_KeyValueMap *params, UA_ByteString *buf) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)cm->eventSource.eventLoop;
UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)cm;
ETH_sendWithConnection(UA_Connection *c, const UA_KeyValueMap params,
UA_ByteString *buf) {
if(!c)
return UA_STATUSCODE_BADCONNECTIONREJECTED;
ETH_FD *conn = (ETH_FD*)c;
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)c->cm->eventSource.eventLoop;
UA_POSIXConnectionManager *pcm = (UA_POSIXConnectionManager*)c->cm;

UA_LOCK(&el->elMutex);

/* Get the ETH_FD */
UA_FD fd = (UA_FD)connectionId;
ETH_FD *conn = (ETH_FD*)ZIP_FIND(UA_FDTree, &pcm->fds, &fd);
if(!conn) {
UA_UNLOCK(&el->elMutex);
return UA_STATUSCODE_BADCONNECTIONREJECTED;
}

/* Uncover and set the Ethernet header */
buf->data -= conn->headerSize;
buf->length += conn->headerSize;
Expand All @@ -769,7 +730,7 @@ ETH_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
int flags = MSG_NOSIGNAL;

struct pollfd tmp_poll_fd;
tmp_poll_fd.fd = (UA_FD)connectionId;
tmp_poll_fd.fd = conn->rfd.fd;
tmp_poll_fd.events = UA_POLLOUT;

/* Send the full buffer. This may require several calls to send */
Expand All @@ -778,7 +739,7 @@ ETH_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
ssize_t n = 0;
do {
UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH %u\t| Attempting to send", (unsigned)connectionId);
"ETH %u\t| Attempting to send", c->identifier);
size_t bytes_to_send = buf->length - nWritten;
n = UA_sendto(conn->rfd.fd, (const char*)buf->data + nWritten, bytes_to_send,
flags, (struct sockaddr*)&conn->sll, sizeof(conn->sll));
Expand All @@ -790,7 +751,7 @@ ETH_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
UA_LOG_SOCKET_ERRNO_WRAP(
UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH %u\t| Send failed with error %s",
(unsigned)connectionId, errno_str));
c->identifier, errno_str));
ETH_shutdown(pcm, conn);
UA_UNLOCK(&el->elMutex);
UA_ByteString_clear(buf);
Expand All @@ -806,7 +767,7 @@ ETH_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
UA_LOG_SOCKET_ERRNO_WRAP(
UA_LOG_ERROR(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"ETH %u\t| Send failed with error %s",
(unsigned)connectionId, errno_str));
c->identifier, errno_str));
ETH_shutdown(pcm, conn);
UA_UNLOCK(&el->elMutex);
UA_ByteString_clear(buf);
Expand Down
7 changes: 4 additions & 3 deletions arch/eventloop_posix_interrupt.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ typedef struct UA_RegisteredSignal {

LIST_ENTRY(UA_RegisteredSignal) listPointers;

UA_EventSource *es;
UA_InterruptCallback signalCallback;
void *context;
int signal; /* POSIX identifier of the interrupt signal */
Expand Down Expand Up @@ -87,7 +88,7 @@ handlePOSIXInterruptEvent(UA_EventSource *es, UA_RegisteredFD *rfd, short event)

static void
activateSignal(UA_RegisteredSignal *rs) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)rs->rfd.es->eventLoop;
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)rs->es->eventLoop;
UA_LOCK_ASSERT(&el->elMutex, 1);

if(rs->active)
Expand Down Expand Up @@ -139,7 +140,7 @@ activateSignal(UA_RegisteredSignal *rs) {

static void
deactivateSignal(UA_RegisteredSignal *rs) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)rs->rfd.es->eventLoop;
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX *)rs->es->eventLoop;
UA_LOCK_ASSERT(&el->elMutex, 1);

/* Only dectivate if active */
Expand Down Expand Up @@ -315,7 +316,7 @@ registerPOSIXInterrupt(UA_InterruptManager *im, uintptr_t interruptHandle,
}

#ifdef UA_HAVE_EPOLL
rs->rfd.es = &im->eventSource;
rs->es = &im->eventSource;
#endif
rs->signal = (int)interruptHandle;
rs->signalCallback = callback;
Expand Down
Loading

0 comments on commit 4e05df6

Please sign in to comment.