Skip to content

Commit

Permalink
refactor(el): Process delayed callbacks in-order
Browse files Browse the repository at this point in the history
This reduces "suprises" where enqueueing delayed callbacks results in a
inverted order of execution.
  • Loading branch information
jpfr committed Oct 10, 2024
1 parent c685301 commit e373608
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 40 deletions.
95 changes: 71 additions & 24 deletions arch/eventloop_posix/eventloop_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,48 +60,91 @@ void
UA_EventLoopPOSIX_addDelayedCallback(UA_EventLoop *public_el,
UA_DelayedCallback *dc) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
UA_DelayedCallback *old;
do {
old = el->delayedCallbacks;
dc->next = old;
} while(UA_atomic_cmpxchg((void * volatile *)&el->delayedCallbacks, old, dc) != old);
dc->next = NULL;

/* el->delayedTail points either to prev->next or to the head.
* We need to update two locations:
* 1: el->delayedTail = &dc->next;
* 2: *oldtail = dc; (equal to &dc->next)
*
* Once we have (1), we "own" the previous-to-last entry. No need to worry
* about (2), we can adjust it with a delay. This makes the queue
* "eventually consistent". */
UA_DelayedCallback **oldtail = (UA_DelayedCallback**)
UA_atomic_xchg((void**)&el->delayedTail, &dc->next);
UA_atomic_xchg((void**)oldtail, &dc->next);
}

/* Resets the delayed queue and returns the previous head and tail */
static void
resetDelayedQueue(UA_EventLoopPOSIX *el, UA_DelayedCallback **oldHead,
UA_DelayedCallback **oldTail) {
if(el->delayedHead1 <= (UA_DelayedCallback *)0x01 &&
el->delayedHead2 <= (UA_DelayedCallback *)0x01)
return; /* The queue is empty */

UA_Boolean active1 = (el->delayedHead1 != (UA_DelayedCallback*)0x01);
UA_DelayedCallback **activeHead = (active1) ? &el->delayedHead1 : &el->delayedHead2;
UA_DelayedCallback **inactiveHead = (active1) ? &el->delayedHead2 : &el->delayedHead1;

/* Switch active/inactive by resetting the sentinel values. The (old) active
* head points to an element which we return. Parallel threads continue to
* add elements to the queue "below" the first element. */
UA_atomic_xchg((void**)inactiveHead, NULL);
*oldHead = (UA_DelayedCallback *)
UA_atomic_xchg((void**)activeHead, (void*)0x01);

/* Make the tail point to the (new) active head. Return the value of last
* tail. When iterating over the queue elements, we need to find this tail
* as the last element. If we find a NULL next-pointer before hitting the
* tail spinlock until the pointer updates (eventually consistent). */
*oldTail = (UA_DelayedCallback*)
UA_atomic_xchg((void**)&el->delayedTail, inactiveHead);
}

static void
UA_EventLoopPOSIX_removeDelayedCallback(UA_EventLoop *public_el,
UA_DelayedCallback *dc) {
UA_DelayedCallback *dc) {
UA_EventLoopPOSIX *el = (UA_EventLoopPOSIX*)public_el;
UA_LOCK(&el->elMutex);
UA_DelayedCallback **prev = &el->delayedCallbacks;
while(*prev) {
if(*prev == dc) {
*prev = (*prev)->next;
UA_UNLOCK(&el->elMutex);
return;
}
prev = &(*prev)->next;

/* Reset and get the old head and tail */
UA_DelayedCallback *cur = NULL, *tail = NULL;
resetDelayedQueue(el, &cur, &tail);

/* Loop until we reach the tail (or head and tail are both NULL) */
UA_DelayedCallback *next;
for(; cur; cur = next) {
/* Spin-loop until the next-pointer of cur is updated.
* The element pointed to by tail must appear eventually. */
next = cur->next;
while(!next && cur != tail)
next = (UA_DelayedCallback *)UA_atomic_load((void**)&cur->next);
if(cur == dc)
continue;
UA_EventLoopPOSIX_addDelayedCallback(public_el, cur);
}

UA_UNLOCK(&el->elMutex);
}

/* Process and then free registered delayed callbacks */
static void
processDelayed(UA_EventLoopPOSIX *el) {
UA_LOG_TRACE(el->eventLoop.logger, UA_LOGCATEGORY_EVENTLOOP,
"Process delayed callbacks");

UA_LOCK_ASSERT(&el->elMutex);

/* First empty the linked list in the el. So a delayed callback can add
* (itself) to the list. New entries are then processed during the next
* iteration. */
UA_DelayedCallback *dc = el->delayedCallbacks, *next = NULL;
el->delayedCallbacks = NULL;
/* Reset and get the old head and tail */
UA_DelayedCallback *dc = NULL, *tail = NULL;
resetDelayedQueue(el, &dc, &tail);

/* Loop until we reach the tail (or head and tail are both NULL) */
UA_DelayedCallback *next;
for(; dc; dc = next) {
next = dc->next;
/* Delayed Callbacks might have no callback set. We don't return a
* StatusCode during "add" and don't validate. So test here. */
while(!next && dc != tail)
next = (UA_DelayedCallback *)UA_atomic_load((void**)&dc->next);
if(!dc->callback)
continue;
UA_UNLOCK(&el->elMutex);
Expand Down Expand Up @@ -229,7 +272,7 @@ checkClosed(UA_EventLoopPOSIX *el) {
}

/* Not closed until all delayed callbacks are processed */
if(el->delayedCallbacks != NULL)
if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
return;

/* Close the self-pipe when everything else is done */
Expand Down Expand Up @@ -329,7 +372,7 @@ UA_EventLoopPOSIX_run(UA_EventLoopPOSIX *el, UA_UInt32 timeout) {
* itself). In that case we don't want to wait (indefinitely) for an event
* to happen. Process queued events but don't sleep. Then process the
* delayed callbacks in the next iteration. */
if(el->delayedCallbacks != NULL)
if(el->delayedHead1 != NULL && el->delayedHead2 != NULL)
timeout = 0;

/* Compute the remaining time */
Expand Down Expand Up @@ -516,6 +559,10 @@ UA_EventLoop_new_POSIX(const UA_Logger *logger) {
UA_LOCK_INIT(&el->elMutex);
UA_Timer_init(&el->timer);

/* Initialize the queue */
el->delayedTail = &el->delayedHead1;
el->delayedHead2 = (UA_DelayedCallback*)0x01; /* sentinel value */

#ifdef _WIN32
/* Start the WSA networking subsystem on Windows */
WSADATA wsaData;
Expand Down
15 changes: 13 additions & 2 deletions arch/eventloop_posix/eventloop_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,19 @@ typedef struct {
/* Timer */
UA_Timer timer;

/* Linked List of Delayed Callbacks */
UA_DelayedCallback *delayedCallbacks;
/* Singly-linked FIFO queue (lock-free multi-producer single-consumer) of
* delayed callbacks. Insertion happens by chasing the tail-pointer. We
* "check out" the current queue and reset by switching the tail to the
* alternative head-pointer.
*
* This could be a simple singly-linked list. But we want to do in-order
* processing so we can wait until the worker jobs already in the queue get
* finished before.
*
* The currently unused head gets marked with the 0x01 sentinel. */
UA_DelayedCallback *delayedHead1;
UA_DelayedCallback *delayedHead2;
UA_DelayedCallback **delayedTail;

/* Flag determining whether the eventloop is currently within the
* "run" method */
Expand Down
5 changes: 2 additions & 3 deletions arch/eventloop_posix/eventloop_posix_eth.c
Original file line number Diff line number Diff line change
Expand Up @@ -789,9 +789,8 @@ ETH_shutdown(UA_POSIXConnectionManager *pcm, ETH_FD *conn) {
dc->application = pcm;
dc->context = conn;

/* Don't use the "public" el->addDelayedCallback. It takes a lock. */
dc->next = el->delayedCallbacks;
el->delayedCallbacks = dc;
/* Adding a delayed callback does not take a lock */
UA_EventLoopPOSIX_addDelayedCallback((UA_EventLoop*)el, dc);
}

static UA_StatusCode
Expand Down
5 changes: 2 additions & 3 deletions arch/eventloop_posix/eventloop_posix_tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -617,9 +617,8 @@ TCP_shutdown(UA_ConnectionManager *cm, TCP_FD *conn) {
dc->application = cm;
dc->context = conn;

/* Don't use the "public" el->addDelayedCallback. It takes a lock. */
dc->next = el->delayedCallbacks;
el->delayedCallbacks = dc;
/* Adding a delayed callback does not take a lock */
UA_EventLoopPOSIX_addDelayedCallback((UA_EventLoop*)el, dc);
}

static UA_StatusCode
Expand Down
5 changes: 2 additions & 3 deletions arch/eventloop_posix/eventloop_posix_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,8 @@ UDP_shutdown(UA_ConnectionManager *cm, UA_RegisteredFD *rfd) {
dc->application = cm;
dc->context = rfd;

/* Don't use the "public" el->addDelayedCallback. It takes a lock. */
dc->next = el->delayedCallbacks;
el->delayedCallbacks = dc;
/* Adding a delayed callback does not take a lock */
UA_EventLoopPOSIX_addDelayedCallback((UA_EventLoop*)el, dc);
}

static UA_StatusCode
Expand Down
13 changes: 8 additions & 5 deletions include/open62541/plugin/eventloop.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ typedef void (*UA_Callback)(void *application, void *context);
/* Delayed callbacks are executed not when they are registered, but in the
* following EventLoop cycle */
typedef struct UA_DelayedCallback {
struct UA_DelayedCallback *next; /* Singly-linked list */
struct UA_DelayedCallback *next;
UA_Callback callback;
void *application;
void *context;
Expand Down Expand Up @@ -191,13 +191,16 @@ struct UA_EventLoop {
* delay a resource cleanup to a point where it is known that the resource
* has no remaining users.
*
* The delayed callbacks are processed in each of the cycle of the EventLoop
* The delayed callbacks are processed in each cycle of the EventLoop
* between the handling of periodic callbacks and polling for (network)
* events. The memory for the delayed callback is *NOT* automatically freed
* after the execution.
* after the execution. But this can be done from within the callback.
*
* addDelayedCallback is non-blocking and can be called from an interrupt
* context. removeDelayedCallback can take a mutex and is blocking. */
* Delayed callbacks are processed in the order in which they are added.
*
* The delayed callback API is thread-safe. addDelayedCallback is
* non-blocking and can be called from an interrupt context.
* removeDelayedCallback can take a mutex and is blocking. */

void (*addDelayedCallback)(UA_EventLoop *el, UA_DelayedCallback *dc);
void (*removeDelayedCallback)(UA_EventLoop *el, UA_DelayedCallback *dc);
Expand Down

0 comments on commit e373608

Please sign in to comment.