Skip to content

Commit

Permalink
feat(pubsub): Allow to configure a basetime for the cyclic publish in…
Browse files Browse the repository at this point in the history
…terval
  • Loading branch information
jpfr committed Oct 20, 2024
1 parent 5669534 commit 8a27295
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
10 changes: 10 additions & 0 deletions include/open62541/server_pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,17 @@ typedef struct {
UA_Byte priority;
UA_ExtensionObject transportSettings;
UA_ExtensionObject messageSettings;

/* The group properties are a key-value map whose members are defined in
* future versions of the OPC UA standard. We prefix the non-standard fields
* defined for open62541 with __.
*
* 0:__publish-basetime [datetime]: Defines the basetime for the cyclic
* publishing interval. This also sets the timer policy
* UA_TIMER_HANDLE_CYCLEMISS_WITH_BASETIME for the publishing interval.
*/
UA_KeyValueMap groupProperties;

UA_PubSubEncodingType encodingMimeType;
/* PubSub Manager Callback */
UA_PubSub_CallbackLifecycle pubsubManagerCallback;
Expand Down
15 changes: 10 additions & 5 deletions src/pubsub/ua_pubsub_writergroup.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,28 @@ UA_WriterGroup_addPublishCallback(UA_Server *server, UA_WriterGroup *wg) {
if(wg->publishCallbackId != 0)
return UA_STATUSCODE_BADINTERNALERROR;

const UA_DateTime *basetime = (const UA_DateTime *)
UA_KeyValueMap_getScalar(&wg->config.groupProperties,
UA_QUALIFIEDNAME(0, "__publish-basetime"),
&UA_TYPES[UA_TYPES_DATETIME]);
UA_TimerPolicy timerpolicy = (basetime) ?
UA_TIMER_HANDLE_CYCLEMISS_WITH_BASETIME :
UA_TIMER_HANDLE_CYCLEMISS_WITH_CURRENTTIME;

UA_StatusCode retval = UA_STATUSCODE_GOOD;
if(wg->config.pubsubManagerCallback.addCustomCallback) {
/* Use configured mechanism for cyclic callbacks */
retval = wg->config.pubsubManagerCallback.
addCustomCallback(server, wg->identifier,
(UA_ServerCallback)UA_WriterGroup_publishCallback,
wg, wg->config.publishingInterval,
NULL, UA_TIMER_HANDLE_CYCLEMISS_WITH_CURRENTTIME,
&wg->publishCallbackId);
basetime, timerpolicy, &wg->publishCallbackId);
} else {
/* Use EventLoop for cyclic callbacks */
UA_EventLoop *el = UA_PubSubConnection_getEL(server, wg->linkedConnection);
retval = el->addCyclicCallback(el, (UA_Callback)UA_WriterGroup_publishCallback,
server, wg, wg->config.publishingInterval,
NULL /* TODO: use basetime */,
UA_TIMER_HANDLE_CYCLEMISS_WITH_CURRENTTIME,
&wg->publishCallbackId);
basetime, timerpolicy, &wg->publishCallbackId);
}
if(retval != UA_STATUSCODE_GOOD)
return retval;
Expand Down

0 comments on commit 8a27295

Please sign in to comment.