Skip to content

Commit

Permalink
refactor(server): Simplify the Subscription state machine to a single…
Browse files Browse the repository at this point in the history
… enum
  • Loading branch information
jpfr committed Oct 17, 2023
1 parent 95e3ada commit 60a9af8
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/server/ua_server_ns0_diagnostics.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fillSubscriptionDiagnostics(UA_Subscription *sub,
diag->maxKeepAliveCount = sub->maxKeepAliveCount;
diag->maxLifetimeCount = sub->lifeTimeCount;
diag->maxNotificationsPerPublish = sub->notificationsPerPublish;
diag->publishingEnabled = sub->publishingEnabled;
diag->publishingEnabled = (sub->state >= UA_SUBSCRIPTIONSTATE_NORMAL);
diag->modifyCount = sub->modifyCount;
diag->enableCount = sub->enableCount;
diag->disableCount = sub->disableCount;
Expand Down
38 changes: 14 additions & 24 deletions src/server/ua_services_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,6 @@

#ifdef UA_ENABLE_SUBSCRIPTIONS /* conditional compilation */

static void
setPublishingEnabled(UA_Subscription *sub, UA_Boolean publishingEnabled) {
if(sub->publishingEnabled == publishingEnabled)
return;

sub->publishingEnabled = publishingEnabled;

#ifdef UA_ENABLE_DIAGNOSTICS
if(publishingEnabled)
sub->enableCount++;
else
sub->disableCount++;
#endif
}

static void
setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
UA_Double requestedPublishingInterval,
Expand Down Expand Up @@ -95,14 +80,15 @@ Service_CreateSubscription(UA_Server *server, UA_Session *session,
request->requestedLifetimeCount,
request->requestedMaxKeepAliveCount,
request->maxNotificationsPerPublish, request->priority);
setPublishingEnabled(sub, request->publishingEnabled);
sub->currentKeepAliveCount = sub->maxKeepAliveCount; /* set settings first */
sub->subscriptionId = ++server->lastSubscriptionId; /* Assign the SubscriptionId */

/* Assign the SubscriptionId */
sub->subscriptionId = ++server->lastSubscriptionId;

/* Register the cyclic callback */
UA_StatusCode retval = Subscription_registerPublishCallback(server, sub);
/* Set the state, this also registers the callback */
UA_StatusCode retval = UA_STATUSCODE_GOOD;
if(request->publishingEnabled)
retval = Subscription_enable(server, sub);
else
Subscription_disable(server, sub);
if(retval != UA_STATUSCODE_GOOD) {
UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session,
"Subscription %" PRIu32 " | "
Expand Down Expand Up @@ -206,7 +192,11 @@ Operation_SetPublishingMode(UA_Server *server, UA_Session *session,
return;
}

setPublishingEnabled(sub, *publishingEnabled); /* Set the publishing mode */
/* Enable/disable */
if(*publishingEnabled)
*result = Subscription_enable(server, sub);
else
Subscription_disable(server, sub);

/* Reset the lifetime counter */
Subscription_resetLifetime(sub);
Expand Down Expand Up @@ -541,8 +531,8 @@ Operation_TransferSubscription(UA_Server *server, UA_Session *session,
* that all backpointers are set correctly. */
memcpy(newSub, sub, sizeof(UA_Subscription));

/* Register cyclic publish callback */
result->statusCode = Subscription_registerPublishCallback(server, newSub);
/* Enable / Register cyclic publish callback */
result->statusCode = Subscription_enable(server, newSub);
if(result->statusCode != UA_STATUSCODE_GOOD) {
UA_Array_delete(result->availableSequenceNumbers,
sub->retransmissionQueueSize, &UA_TYPES[UA_TYPES_UINT32]);
Expand Down
45 changes: 33 additions & 12 deletions src/server/ua_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ UA_Subscription_new(void) {
return NULL;

/* The first publish response is sent immediately */
newSub->state = UA_SUBSCRIPTIONSTATE_NORMAL;
newSub->state = UA_SUBSCRIPTIONSTATE_DISABLED;

/* Even if the first publish response is a keepalive the sequence number is 1.
* This can happen by a subscription without a monitored item (see CTT test scripts). */
Expand All @@ -56,7 +56,7 @@ UA_Subscription_delete(UA_Server *server, UA_Subscription *sub) {
UA_EventLoop *el = server->config.eventLoop;

/* Unregister the publish callback and possible delayed callback */
Subscription_unregisterPublishCallback(server, sub);
Subscription_disable(server, sub);
if(sub->delayedCallbackRegistered) {
el->removeDelayedCallback(el, &sub->delayedMoreNotifications);
sub->delayedCallbackRegistered = false;
Expand Down Expand Up @@ -502,7 +502,8 @@ UA_Subscription_publish(UA_Server *server, UA_Subscription *sub) {
}

/* Count the available notifications */
UA_UInt32 notifications = (sub->publishingEnabled) ? sub->notificationQueueSize : 0;
UA_UInt32 notifications = (sub->state >= UA_SUBSCRIPTIONSTATE_NORMAL) ?
sub->notificationQueueSize : 0;
if(notifications > sub->notificationsPerPublish)
notifications = sub->notificationsPerPublish;

Expand Down Expand Up @@ -747,24 +748,44 @@ repeatedPublishCallback(UA_Server *server, UA_Subscription *sub) {
}

UA_StatusCode
Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
UA_LOG_DEBUG_SUBSCRIPTION(&server->config.logger, sub,
"Register subscription publishing callback");
Subscription_enable(UA_Server *server, UA_Subscription *sub) {
UA_LOCK_ASSERT(&server->serviceMutex, 1);
if(sub->publishCallbackId > 0)
UA_LOG_DEBUG_SUBSCRIPTION(&server->config.logger, sub,
"Enable Subscription");

/* Already enabled? */
if(sub->state >= UA_SUBSCRIPTIONSTATE_NORMAL)
return UA_STATUSCODE_GOOD;
return addRepeatedCallback(server, (UA_ServerCallback)repeatedPublishCallback,
sub, sub->publishingInterval, &sub->publishCallbackId);

UA_StatusCode res =
addRepeatedCallback(server, (UA_ServerCallback)repeatedPublishCallback,
sub, sub->publishingInterval, &sub->publishCallbackId);
if(res != UA_STATUSCODE_GOOD)
return res;

sub->state = UA_SUBSCRIPTIONSTATE_NORMAL;
#ifdef UA_ENABLE_DIAGNOSTICS
sub->enableCount++;
#endif
return UA_STATUSCODE_GOOD;
}

void
Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub) {
Subscription_disable(UA_Server *server, UA_Subscription *sub) {
UA_LOCK_ASSERT(&server->serviceMutex, 1);
UA_LOG_DEBUG_SUBSCRIPTION(&server->config.logger, sub,
"Unregister subscription publishing callback");
if(sub->publishCallbackId == 0)
"Disable Subscription");

/* Alreadu disabled? */
if(sub->state < UA_SUBSCRIPTIONSTATE_NORMAL)
return;

removeCallback(server, sub->publishCallbackId);
sub->publishCallbackId = 0;
sub->state = UA_SUBSCRIPTIONSTATE_DISABLED;
#ifdef UA_ENABLE_DIAGNOSTICS
sub->disableCount++;
#endif
}

#endif /* UA_ENABLE_SUBSCRIPTIONS */
12 changes: 4 additions & 8 deletions src/server/ua_subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,11 @@ UA_MonitoredItem_ensureQueueSpace(UA_Server *server, UA_MonitoredItem *mon);

/* We use only a subset of the states defined in the standard */
typedef enum {
/* UA_SUBSCRIPTIONSTATE_CLOSED */
/* UA_SUBSCRIPTIONSTATE_CREATING */
UA_SUBSCRIPTIONSTATE_DISABLED,
UA_SUBSCRIPTIONSTATE_REMOVING,
UA_SUBSCRIPTIONSTATE_NORMAL,
UA_SUBSCRIPTIONSTATE_LATE,
UA_SUBSCRIPTIONSTATE_KEEPALIVE,
UA_SUBSCRIPTIONSTATE_REMOVING
} UA_SubscriptionState;

/* Subscriptions are managed in a server-wide linked list. If they are attached
Expand All @@ -253,7 +252,6 @@ struct UA_Subscription {
UA_UInt32 maxKeepAliveCount;
UA_Double publishingInterval; /* in ms */
UA_UInt32 notificationsPerPublish;
UA_Boolean publishingEnabled;
UA_Byte priority;

/* Runtime information */
Expand Down Expand Up @@ -319,12 +317,10 @@ void
UA_Subscription_delete(UA_Server *server, UA_Subscription *sub);

UA_StatusCode
Subscription_registerPublishCallback(UA_Server *server,
UA_Subscription *sub);
Subscription_enable(UA_Server *server, UA_Subscription *sub);

void
Subscription_unregisterPublishCallback(UA_Server *server,
UA_Subscription *sub);
Subscription_disable(UA_Server *server, UA_Subscription *sub);

void
Subscription_resetLifetime(UA_Subscription *sub);
Expand Down

0 comments on commit 60a9af8

Please sign in to comment.