Skip to content

Commit

Permalink
feat(client): Add new function to retrieve the user supplied subscrip…
Browse files Browse the repository at this point in the history
…tion and monitored item contexts (open62541#6877)
  • Loading branch information
RolfKal authored Nov 18, 2024
1 parent 31947a9 commit 22e1bd9
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 16 deletions.
14 changes: 14 additions & 0 deletions include/open62541/client_subscriptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ UA_Client_Subscriptions_delete_async(UA_Client *client,
UA_StatusCode UA_EXPORT UA_THREADSAFE
UA_Client_Subscriptions_deleteSingle(UA_Client *client, UA_UInt32 subscriptionId);

/* Retrieve or change the user supplied subscription contexts */
UA_StatusCode UA_EXPORT UA_THREADSAFE
UA_Client_Subscriptions_getContext(UA_Client *client, UA_UInt32 subscriptionId, void **subContext);

UA_StatusCode UA_EXPORT UA_THREADSAFE
UA_Client_Subscriptions_setContext(UA_Client *client, UA_UInt32 subscriptionId, void *subContext);

static UA_INLINE UA_THREADSAFE UA_SetPublishingModeResponse
UA_Client_Subscriptions_setPublishingMode(UA_Client *client,
const UA_SetPublishingModeRequest request) {
Expand Down Expand Up @@ -277,6 +284,13 @@ UA_Client_MonitoredItems_setTriggering_async(UA_Client *client,
userdata, requestId);
}

/* Retrieve or change the user supplied monitored item context */
UA_StatusCode UA_EXPORT UA_THREADSAFE
UA_Client_MonitoredItem_getContext(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId, void **monContext);

UA_StatusCode UA_EXPORT UA_THREADSAFE
UA_Client_MonitoredItem_setContext(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId, void *monContext);

_UA_END_DECLS

#endif /* UA_CLIENT_SUBSCRIPTIONS_H_ */
130 changes: 115 additions & 15 deletions src/client/ua_client_subscriptions.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ UA_Client_Subscriptions_create_async(UA_Client *client,
}

static UA_Client_Subscription *
findSubscription(const UA_Client *client, UA_UInt32 subscriptionId) {
findSubscriptionById(const UA_Client *client, UA_UInt32 subscriptionId) {
UA_Client_Subscription *sub = NULL;
LIST_FOREACH(sub, &client->subscriptions, listEntry) {
if(sub->subscriptionId == subscriptionId)
Expand All @@ -183,7 +183,7 @@ ua_Subscriptions_modify_handler(UA_Client *client, void *data, UA_UInt32 request
CustomCallback *cc = (CustomCallback *)data;
UA_LOCK(&client->clientMutex);
UA_Client_Subscription *sub =
findSubscription(client, (UA_UInt32)(uintptr_t)cc->clientData);
findSubscriptionById(client, (UA_UInt32)(uintptr_t)cc->clientData);
if(sub) {
ua_Subscriptions_modify(client, sub, response);
} else {
Expand All @@ -198,6 +198,42 @@ ua_Subscriptions_modify_handler(UA_Client *client, void *data, UA_UInt32 request
UA_free(cc);
}

UA_StatusCode
UA_Client_Subscriptions_getContext(UA_Client *client, UA_UInt32 subscriptionId, void **subContext)
{
if (!client || !subContext)
return UA_STATUSCODE_BADINVALIDARGUMENT;

UA_LOCK(&client->clientMutex);
UA_Client_Subscription *sub = findSubscriptionById(client, subscriptionId);
if (!sub) {
UA_UNLOCK(&client->clientMutex);
return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
}

*subContext = sub->context;
UA_UNLOCK(&client->clientMutex);
return UA_STATUSCODE_GOOD;
}

UA_StatusCode
UA_Client_Subscriptions_setContext(UA_Client *client, UA_UInt32 subscriptionId, void *subContext)
{
if (!client)
return UA_STATUSCODE_BADINVALIDARGUMENT;

UA_LOCK(&client->clientMutex);
UA_Client_Subscription *sub = findSubscriptionById(client, subscriptionId);
if (!sub) {
UA_UNLOCK(&client->clientMutex);
return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
}

sub->context = subContext;
UA_UNLOCK(&client->clientMutex);
return UA_STATUSCODE_GOOD;
}

UA_ModifySubscriptionResponse
UA_Client_Subscriptions_modify(UA_Client *client,
const UA_ModifySubscriptionRequest request) {
Expand All @@ -206,7 +242,7 @@ UA_Client_Subscriptions_modify(UA_Client *client,

/* Find the internal representation */
UA_LOCK(&client->clientMutex);
UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
UA_UNLOCK(&client->clientMutex);
if(!sub) {
response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
Expand All @@ -220,7 +256,7 @@ UA_Client_Subscriptions_modify(UA_Client *client,

/* Adjust the internal representation. Lookup again for thread-safety. */
UA_LOCK(&client->clientMutex);
sub = findSubscription(client, request.subscriptionId);
sub = findSubscriptionById(client, request.subscriptionId);
if(!sub) {
response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
UA_UNLOCK(&client->clientMutex);
Expand All @@ -238,7 +274,7 @@ UA_Client_Subscriptions_modify_async(UA_Client *client,
void *userdata, UA_UInt32 *requestId) {
/* Find the internal representation */
UA_LOCK(&client->clientMutex);
UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
UA_UNLOCK(&client->clientMutex);
if(!sub)
return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
Expand Down Expand Up @@ -313,7 +349,7 @@ __Client_Subscription_processDelete(UA_Client *client,

/* Get the Subscription */
UA_Client_Subscription *sub =
findSubscription(client, request->subscriptionIds[i]);
findSubscriptionById(client, request->subscriptionIds[i]);
if(!sub) {
UA_LOG_INFO(client->config.logging, UA_LOGCATEGORY_CLIENT,
"No internal representation of subscription %" PRIu32,
Expand Down Expand Up @@ -465,7 +501,7 @@ ua_MonitoredItems_create(UA_Client *client, MonitoredItems_CreateData *data,
UA_CreateMonitoredItemsRequest *request = &data->request;
UA_Client_DeleteMonitoredItemCallback *deleteCallbacks = data->deleteCallbacks;

UA_Client_Subscription *sub = findSubscription(client, data->request.subscriptionId);
UA_Client_Subscription *sub = findSubscriptionById(client, data->request.subscriptionId);
if(!sub)
goto cleanup;

Expand Down Expand Up @@ -607,7 +643,7 @@ ua_Client_MonitoredItems_create(UA_Client *client,
}

/* Test if the subscription is valid */
UA_Client_Subscription *sub = findSubscription(client, request->subscriptionId);
UA_Client_Subscription *sub = findSubscriptionById(client, request->subscriptionId);
if(!sub) {
response->responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
return;
Expand Down Expand Up @@ -644,7 +680,7 @@ createDataChanges_async(UA_Client *client, const UA_CreateMonitoredItemsRequest
UA_UInt32 *requestId) {
UA_LOCK_ASSERT(&client->clientMutex);

UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
if(!sub)
return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;

Expand Down Expand Up @@ -830,7 +866,7 @@ ua_MonitoredItems_delete_handler(UA_Client *client, void *d, UA_UInt32 requestId
if(response->responseHeader.serviceResult != UA_STATUSCODE_GOOD)
goto cleanup;

sub = findSubscription(client, request->subscriptionId);
sub = findSubscriptionById(client, request->subscriptionId);
if(!sub) {
UA_LOG_INFO(client->config.logging, UA_LOGCATEGORY_CLIENT,
"No internal representation of subscription %" PRIu32,
Expand Down Expand Up @@ -864,7 +900,7 @@ UA_Client_MonitoredItems_delete(UA_Client *client,
UA_LOCK(&client->clientMutex);

/* Find the internal subscription representation */
UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
if(!sub) {
UA_LOG_INFO(client->config.logging, UA_LOGCATEGORY_CLIENT,
"No internal representation of subscription %" PRIu32,
Expand Down Expand Up @@ -962,7 +998,7 @@ UA_Client_MonitoredItems_modify(UA_Client *client,
UA_ModifyMonitoredItemsResponse_init(&response);

UA_LOCK(&client->clientMutex);
UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
if(!sub) {
UA_UNLOCK(&client->clientMutex);
response.responseHeader.serviceResult = UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
Expand All @@ -988,7 +1024,7 @@ UA_Client_MonitoredItems_modify_async(UA_Client *client,
UA_ClientAsyncServiceCallback callback,
void *userdata, UA_UInt32 *requestId) {
UA_LOCK(&client->clientMutex);
UA_Client_Subscription *sub = findSubscription(client, request.subscriptionId);
UA_Client_Subscription *sub = findSubscriptionById(client, request.subscriptionId);
if(!sub) {
UA_UNLOCK(&client->clientMutex);
return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
Expand All @@ -1007,6 +1043,70 @@ UA_Client_MonitoredItems_modify_async(UA_Client *client,
return statusCode;
}

static void *
ua_MonitoredItem_findByID(void *data, UA_Client_MonitoredItem *mon) {
UA_UInt32 monitorId = *(UA_UInt32*)data;
if (monitorId && (mon->monitoredItemId == monitorId)) {
return mon;
}
return NULL;
}

static UA_Client_MonitoredItem
*findMonitoredItemById(UA_Client_Subscription *sub, UA_UInt32 monitoredItemId)
{
return (UA_Client_MonitoredItem *)
ZIP_ITER(MonitorItemsTree, &sub->monitoredItems, ua_MonitoredItem_findByID, &monitoredItemId);
}

UA_StatusCode
UA_Client_MonitoredItem_getContext(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId, void **monContext)
{
if (!client || !monContext)
return UA_STATUSCODE_BADINVALIDARGUMENT;

*monContext = NULL;

UA_LOCK(&client->clientMutex);
UA_Client_Subscription *sub = findSubscriptionById(client, subscriptionId);
if (!sub) {
UA_UNLOCK(&client->clientMutex);
return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
}

UA_StatusCode status = UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
UA_Client_MonitoredItem *monItem = findMonitoredItemById(sub, monitoredItemId);
if (monItem) {
*monContext = monItem->context;
status = UA_STATUSCODE_GOOD;
}
UA_UNLOCK(&client->clientMutex);
return status;
}

UA_StatusCode
UA_Client_MonitoredItem_setContext(UA_Client *client, UA_UInt32 subscriptionId, UA_UInt32 monitoredItemId, void *monContext)
{
if (!client)
return UA_STATUSCODE_BADINVALIDARGUMENT;

UA_LOCK(&client->clientMutex);
UA_Client_Subscription *sub = findSubscriptionById(client, subscriptionId);
if (!sub) {
UA_UNLOCK(&client->clientMutex);
return UA_STATUSCODE_BADSUBSCRIPTIONIDINVALID;
}

UA_StatusCode status = UA_STATUSCODE_BADMONITOREDITEMIDINVALID;
UA_Client_MonitoredItem *monItem = findMonitoredItemById(sub, monitoredItemId);
if (monItem) {
monItem->context = monContext;
status = UA_STATUSCODE_GOOD;
}
UA_UNLOCK(&client->clientMutex);
return status;
}

/*************************************/
/* Async Processing of Notifications */
/*************************************/
Expand Down Expand Up @@ -1208,7 +1308,7 @@ __Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishReque
{
UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
"Received BadNoSubscription, delete internal information about subscription");
UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
UA_Client_Subscription *sub = findSubscriptionById(client, response->subscriptionId);
if(sub != NULL)
__Client_Subscription_deleteInternal(client, sub);
return;
Expand All @@ -1219,7 +1319,7 @@ __Client_Subscriptions_processPublishResponse(UA_Client *client, UA_PublishReque
return;
}

UA_Client_Subscription *sub = findSubscription(client, response->subscriptionId);
UA_Client_Subscription *sub = findSubscriptionById(client, response->subscriptionId);
if(!sub) {
response->responseHeader.serviceResult = UA_STATUSCODE_BADINTERNALERROR;
UA_LOG_WARNING(client->config.logging, UA_LOGCATEGORY_CLIENT,
Expand Down
2 changes: 1 addition & 1 deletion src/pubsub/ua_pubsub_dataset.c
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ UA_DataSetFieldConfig_copy(const UA_DataSetFieldConfig *src,
}

UA_DataSetField *
UA_DataSetField_find(UA_PubSubManager *psm, UA_NodeId id) {
UA_DataSetField_find(UA_PubSubManager *psm, const UA_NodeId id) {
if(!psm)
return NULL;
UA_PublishedDataSet *tmpPDS;
Expand Down

0 comments on commit 22e1bd9

Please sign in to comment.