diff --git a/CMakeLists.txt b/CMakeLists.txt index f5e524d4d0b..26b632ac8ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -612,10 +612,10 @@ if(CMAKE_COMPILER_IS_GNUCC OR CMAKE_C_COMPILER_ID STREQUAL "Clang") # Force 32bit build if(UA_FORCE_32BIT) - if(MSVC) - message(FATAL_ERROR "Select the 32bit (cross-) compiler instead of forcing compiler options") - endif() - set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m32") # GCC and Clang, possibly more + if(MSVC) + message(FATAL_ERROR "Select the 32bit (cross-) compiler instead of forcing compiler options") + endif() + check_add_cc_flag("-m32") # GCC and Clang, possibly more endif() if(NOT MINGW AND NOT UA_BUILD_OSS_FUZZ) @@ -1233,17 +1233,6 @@ ua_generate_nodeset(NAME "ns0" FILE ${UA_FILE_NODESETS} ${UA_NODESET_FILE_DA} IGNORE "${PROJECT_SOURCE_DIR}/tools/nodeset_compiler/NodeID_NS0_Base.txt" DEPENDS_TARGET "open62541-generator-types") -# stack protector and optimization needs to be disabled for the huge ns0 file, otherwise debian packaging fails due to long build times. -# We also disable optimization on Appveyor builds, since they take almost an hour otherwise -if(UA_PACK_DEBIAN OR (NOT "$ENV{APPVEYOR}" STREQUAL "") OR ( - (CMAKE_BUILD_TYPE STREQUAL "MinSizeRel" OR CMAKE_BUILD_TYPE STREQUAL "Release" OR CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo") AND ( - # List of compilers which have problems with the huge ns0 optimization - (("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") AND (CMAKE_C_COMPILER_VERSION VERSION_LESS 7.0)) - ) - )) - set_source_files_properties(${PROJECT_BINARY_DIR}/src_generated/open62541/namespace0_generated.c PROPERTIES COMPILE_FLAGS "-fno-stack-protector -O0") -endif() - if(UA_ENABLE_NODESET_INJECTOR) message(STATUS "Nodesetinjector feature enabled") cmake_minimum_required(VERSION 3.20) @@ -1306,6 +1295,7 @@ if(UA_ENABLE_AMALGAMATION) add_dependencies(open62541-amalgamation-header open62541-generator-namespace) else() add_library(open62541-object OBJECT ${lib_sources} ${lib_headers} ${exported_headers}) + target_include_directories(open62541-object PRIVATE ${PROJECT_SOURCE_DIR}/src) add_custom_target(open62541-code-generation DEPENDS open62541-generator-types @@ -1321,7 +1311,11 @@ else() add_coverage(open62541-object) endif() - target_include_directories(open62541-object PRIVATE ${PROJECT_SOURCE_DIR}/src) + # stack protector and optimization are disabled for the huge ns0 file + if(UA_NAMESPACE_ZERO STREQUAL "FULL" AND NOT MSVC) + set_source_files_properties(${PROJECT_BINARY_DIR}/src_generated/open62541/namespace0_generated.c + PROPERTIES COMPILE_FLAGS "-fno-stack-protector -O0") + endif() add_library(open62541-plugins OBJECT ${plugin_sources} ${architecture_sources} ${exported_headers}) add_dependencies(open62541-plugins open62541-generator-types open62541-generator-transport open62541-generator-namespace) diff --git a/arch/eventloop_posix_eth.c b/arch/eventloop_posix_eth.c index 4ed043db092..66dfba48501 100644 --- a/arch/eventloop_posix_eth.c +++ b/arch/eventloop_posix_eth.c @@ -580,7 +580,7 @@ ETH_openConnection(UA_ConnectionManager *cm, const UA_KeyValueMap *params, /* Validate the parameters */ UA_StatusCode res = UA_KeyValueRestriction_validate(el->eventLoop.logger, "ETH", ETHConfigParameters, - ETH_PARAMETERSSIZE, params); + ethParams, params); if(res != UA_STATUSCODE_GOOD) { UA_UNLOCK(&el->elMutex); return res; diff --git a/deps/itoa.c b/deps/itoa.c index ac85b57128d..b8d4b75852c 100644 --- a/deps/itoa.c +++ b/deps/itoa.c @@ -66,49 +66,34 @@ UA_UInt16 itoaUnsigned(UA_UInt64 value, char* buffer, UA_Byte base) { return i; } -/* adapted from http://www.techiedelight.com/implement-itoa-function-in-c/ to use UA_... types */ +/* adapted from http://www.techiedelight.com/implement-itoa-function-in-c/ */ UA_UInt16 itoaSigned(UA_Int64 value, char* buffer) { - /* consider absolute value of number */ - - - UA_UInt64 n; - /* Special case for UA_INT64_MIN which can not simply be negated */ /* it will cause a signed integer overflow */ - if (value == UA_INT64_MIN) { + UA_UInt64 n; + if(value == UA_INT64_MIN) { n = (UA_UInt64)UA_INT64_MAX + 1; - } - else { + } else { n = (UA_UInt64)value; - if(value < 0){ n = (UA_UInt64)-value; } } UA_UInt16 i = 0; - while (n) { + while(n) { UA_UInt64 r = n % 10; - - if (r >= 10) - buffer[i++] = (char)(65 + (r - 10)); - else - buffer[i++] = (char)(48 + r); - + buffer[i++] = (char)('0' + r); n = n / 10; } - /* if number is 0 */ - if (i == 0) - buffer[i++] = '0'; - - if (value < 0) + if(i == 0) + buffer[i++] = '0'; /* if number is 0 */ + if(value < 0) buffer[i++] = '-'; - buffer[i] = '\0'; /* null terminate string */ i--; - /* reverse the string and return it */ - reverse(buffer, 0, i); + reverse(buffer, 0, i); /* reverse the string and return it */ i++; return i; } diff --git a/include/open62541/client.h b/include/open62541/client.h index bcd0d7d752b..dcdd3fc6475 100644 --- a/include/open62541/client.h +++ b/include/open62541/client.h @@ -726,27 +726,18 @@ UA_Client_Service_queryNext(UA_Client *client, * --------------------- * All OPC UA services are asynchronous in nature. So several service calls can * be made without waiting for the individual responses. Depending on the - * server's priorities responses may come in a different ordering than sent. - * - * As noted in :ref:`the client overview` currently no means - * of handling asynchronous events automatically is provided. However, some - * synchronous function calls will trigger handling, but to ensure this - * happens a client should periodically call `UA_Client_run_iterate` - * explicitly. - * - * Connection and session management are also performed in - * `UA_Client_run_iterate`, so to keep a connection healthy any client needs to - * consider how and when it is appropriate to do the call. - * This is especially true for the periodic renewal of a SecureChannel's - * SecurityToken which is designed to have a limited lifetime and will - * invalidate the connection if not renewed. - * - * Use the typed wrappers instead of `__UA_Client_AsyncService` directly. See - * :ref:`client_async`. However, the general mechanism of async service calls is - * explained here. - */ - -/* We say that an async service call has been dispatched once + * server's priorities responses may come in a different ordering than sent. Use + * the typed wrappers for async service requests instead of + * `__UA_Client_AsyncService` directly. See :ref:`client_async`. However, the + * general mechanism of async service calls is explained here. + * + * Connection and session management are performed in `UA_Client_run_iterate`, + * so to keep a connection healthy any client needs to consider how and when it + * is appropriate to do the call. This is especially true for the periodic + * renewal of a SecureChannel's SecurityToken which is designed to have a + * limited lifetime and will invalidate the connection if not renewed. + * + * We say that an async service call has been dispatched once * __UA_Client_AsyncService returns UA_STATUSCODE_GOOD. If there is an error * after an async service has been dispatched, the callback is called with an * "empty" response where the StatusCode has been set accordingly. This is also @@ -754,13 +745,18 @@ UA_Client_Service_queryNext(UA_Client *client, * is emptied. * * The StatusCode received when the client is shutting down is - * UA_STATUSCODE_BADSHUTDOWN. - * - * The StatusCode received when the client doesn't receive response after the - * specified in config->timeout (can be overridden via the "timeoutHint" in the - * request header) is UA_STATUSCODE_BADTIMEOUT. - * - * The userdata and requestId arguments can be NULL. */ + * UA_STATUSCODE_BADSHUTDOWN. The StatusCode received when the client doesn't + * receive response after the specified in config->timeout (can be overridden + * via the "timeoutHint" in the request header) is UA_STATUSCODE_BADTIMEOUT. + * + * The userdata and requestId arguments can be NULL. The (optional) requestId + * output can be used to cancel the service while it is still pending. The + * requestId is unique for each service request. Alternatively the requestHandle + * can be manually set (non necessarily unique) in the request header for full + * service call. This can be used to cancel all outstanding requests using that + * handle together. Note that the client will auto-generate a requestHandle + * >100,000 if none is defined. Avoid these when manually setting a requetHandle + * in the requestHeader to avoid clashes. */ typedef void (*UA_ClientAsyncServiceCallback)(UA_Client *client, void *userdata, UA_UInt32 requestId, void *response); @@ -772,6 +768,19 @@ __UA_Client_AsyncService(UA_Client *client, const void *request, const UA_DataType *responseType, void *userdata, UA_UInt32 *requestId); +/* Cancel all dispatched requests with the given requestHandle. + * The number if cancelled requests is returned by the server. + * The output argument cancelCount is not set if NULL. */ +UA_EXPORT UA_THREADSAFE UA_StatusCode +UA_Client_cancelByRequestHandle(UA_Client *client, UA_UInt32 requestHandle, + UA_UInt32 *cancelCount); + +/* Map the requestId to the requestHandle used for that request and call the + * Cancel service for that requestHandle. */ +UA_EXPORT UA_THREADSAFE UA_StatusCode +UA_Client_cancelByRequestId(UA_Client *client, UA_UInt32 requestId, + UA_UInt32 *cancelCount); + /* Set new userdata and callback for an existing request. * * @param client Pointer to the UA_Client diff --git a/src/client/ua_client.c b/src/client/ua_client.c index b18d5d10a79..d80d3c2ffa2 100644 --- a/src/client/ua_client.c +++ b/src/client/ua_client.c @@ -362,7 +362,15 @@ sendRequest(UA_Client *client, const void *request, UA_NodeId oldToken = rr->authenticationToken; /* Put back in place later */ rr->authenticationToken = client->authenticationToken; rr->timestamp = UA_DateTime_now(); - rr->requestHandle = ++client->requestHandle; + + /* Create a unique handle >100,000 if not manually defined. The handle is + * not necessarily unique when manually defined and used to cancel async + * service requests. */ + if(rr->requestHandle == 0) { + if(UA_UNLIKELY(client->requestHandle < 100000)) + client->requestHandle = 100000; + rr->requestHandle = ++client->requestHandle; + } /* Set the timeout hint if not manually defined */ if(rr->timeoutHint == 0) @@ -625,6 +633,7 @@ __Client_Service(UA_Client *client, const void *request, ac.requestId = requestId; ac.start = UA_DateTime_nowMonotonic(); /* Start timeout after sending */ ac.timeout = rh->timeoutHint; + ac.requestHandle = rh->requestHandle; if(ac.timeout == 0) ac.timeout = UA_UINT32_MAX; /* 0 -> unlimited */ @@ -809,6 +818,7 @@ __Client_AsyncService(UA_Client *client, const void *request, ac->syncResponse = NULL; ac->start = UA_DateTime_nowMonotonic(); ac->timeout = rh->timeoutHint; + ac->requestHandle = rh->requestHandle; if(ac->timeout == 0) ac->timeout = UA_UINT32_MAX; /* 0 -> unlimited */ @@ -838,6 +848,47 @@ __UA_Client_AsyncService(UA_Client *client, const void *request, return res; } +static UA_StatusCode +cancelByRequestHandle(UA_Client *client, UA_UInt32 requestHandle, UA_UInt32 *cancelCount) { + UA_CancelRequest creq; + UA_CancelRequest_init(&creq); + creq.requestHandle = requestHandle; + UA_CancelResponse cresp; + UA_CancelResponse_init(&cresp); + __Client_Service(client, &creq, &UA_TYPES[UA_TYPES_CANCELREQUEST], + &cresp, &UA_TYPES[UA_TYPES_CANCELRESPONSE]); + if(cancelCount) + *cancelCount = cresp.cancelCount; + UA_StatusCode res = cresp.responseHeader.serviceResult; + UA_CancelResponse_clear(&cresp); + return res; +} + +UA_StatusCode +UA_Client_cancelByRequestHandle(UA_Client *client, UA_UInt32 requestHandle, + UA_UInt32 *cancelCount) { + UA_LOCK(&client->clientMutex); + UA_StatusCode res = cancelByRequestHandle(client, requestHandle, cancelCount); + UA_UNLOCK(&client->clientMutex); + return res; +} + +UA_StatusCode +UA_Client_cancelByRequestId(UA_Client *client, UA_UInt32 requestId, + UA_UInt32 *cancelCount) { + UA_LOCK(&client->clientMutex); + UA_StatusCode res = UA_STATUSCODE_BADNOTFOUND; + AsyncServiceCall *ac; + LIST_FOREACH(ac, &client->asyncServiceCalls, pointers) { + if(ac->requestId != requestId) + continue; + res = cancelByRequestHandle(client, ac->requestHandle, cancelCount); + break; + } + UA_UNLOCK(&client->clientMutex); + return res; +} + /*******************/ /* Timed Callbacks */ /*******************/ diff --git a/src/client/ua_client_connect.c b/src/client/ua_client_connect.c index f0e134ccfe1..51960e763b8 100644 --- a/src/client/ua_client_connect.c +++ b/src/client/ua_client_connect.c @@ -779,7 +779,7 @@ responseGetEndpoints(UA_Client *client, void *userdata, client->endpointsHandshake = false; UA_LOG_DEBUG(&client->config.logger, UA_LOGCATEGORY_CLIENT, - "Received FindServersResponse"); + "Received GetEndpointsResponse"); UA_GetEndpointsResponse *resp = (UA_GetEndpointsResponse*)response; @@ -1059,7 +1059,7 @@ responseFindServers(UA_Client *client, void *userdata, } } - /* The current EndpointURL is not usable. Pick the first DiscoveryUrl of a + /* The current EndpointURL is not usable. Pick the first "opc.tcp" DiscoveryUrl of a * returned server. */ for(size_t i = 0; i < fsr->serversSize; i++) { UA_ApplicationDescription *server = &fsr->servers[i]; @@ -1068,8 +1068,6 @@ responseFindServers(UA_Client *client, void *userdata, server->applicationType != UA_APPLICATIONTYPE_DISCOVERYSERVER ) continue; - if(server->discoveryUrlsSize == 0) - continue; /* Filter by the ApplicationURI if defined */ if(client->config.applicationUri.length > 0 && @@ -1077,23 +1075,35 @@ responseFindServers(UA_Client *client, void *userdata, &server->applicationUri)) continue; - /* Use this DiscoveryUrl in the client */ - UA_String_clear(&client->discoveryUrl); - client->discoveryUrl = server->discoveryUrls[0]; - UA_String_init(&server->discoveryUrls[0]); + for(size_t j = 0; j < server->discoveryUrlsSize; j++) { + /* Try to parse the DiscoveryUrl. This weeds out http schemas (etc.) + * and invalid DiscoveryUrls in general. */ + UA_String hostname, path; + UA_UInt16 port; + UA_StatusCode res = + UA_parseEndpointUrl(&server->discoveryUrls[j], &hostname, + &port, &path); + if(res != UA_STATUSCODE_GOOD) + continue; - UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT, - "Use the EndpointURL %.*s returned from FindServers", - (int)client->discoveryUrl.length, - client->discoveryUrl.data); + /* Use this DiscoveryUrl in the client */ + UA_String_clear(&client->discoveryUrl); + client->discoveryUrl = server->discoveryUrls[j]; + UA_String_init(&server->discoveryUrls[j]); - /* Close the SecureChannel to build it up new with the correct - * EndpointURL in the HEL/ACK handshake */ - closeSecureChannel(client); - return; + UA_LOG_INFO(&client->config.logger, UA_LOGCATEGORY_CLIENT, + "Use the EndpointURL %.*s returned from FindServers", + (int)client->discoveryUrl.length, client->discoveryUrl.data); + + /* Close the SecureChannel to build it up new with the correct + * EndpointURL in the HEL/ACK handshake */ + closeSecureChannel(client); + return; + } } - /* Could not find a suitable server. Try to continue. */ + /* Could not find a suitable server. Try to continue with the + * original EndpointURL. */ UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, "FindServers did not returned a suitable DiscoveryURL. " "Continue with the EndpointURL %.*s.", @@ -1221,6 +1231,27 @@ createSessionAsync(UA_Client *client) { return res; } +/* A workaround if the DiscoveryUrl returned by the FindServers service doesn't work. + * Then default back to the initial EndpointUrl and pretend that was returned + * by FindServers. */ +static void +fixBadDiscoveryUrl(UA_Client* client) { + if(client->connectStatus == UA_STATUSCODE_GOOD) + return; + if(client->discoveryUrl.length == 0 || + UA_String_equal(&client->discoveryUrl, &client->config.endpointUrl)) + return; + + UA_LOG_WARNING(&client->config.logger, UA_LOGCATEGORY_CLIENT, + "The DiscoveryUrl returned by the FindServers service (%.*s) could not be " + "connected. Trying with the original EndpointUrl.", + (int)client->discoveryUrl.length, client->discoveryUrl.data); + + UA_String_clear(&client->discoveryUrl); + UA_String_copy(&client->config.endpointUrl, &client->discoveryUrl); + client->connectStatus = UA_STATUSCODE_GOOD; +} + static void initSecurityPolicy(UA_Client *client) { /* Already initialized */ @@ -1506,6 +1537,7 @@ __Client_networkCallback(UA_ConnectionManager *cm, uintptr_t connectionId, /* Trigger the next action from our end to fully open up the connection */ continue_connect: + fixBadDiscoveryUrl(client); if(!isFullyConnected(client)) connectActivity(client); @@ -1516,6 +1548,7 @@ __Client_networkCallback(UA_ConnectionManager *cm, uintptr_t connectionId, refuse_connection: client->connectStatus = UA_STATUSCODE_BADCONNECTIONREJECTED; + fixBadDiscoveryUrl(client); notifyClientState(client); UA_UNLOCK(&client->clientMutex); } @@ -1656,8 +1689,7 @@ connectSync(UA_Client *client) { } UA_StatusCode -__UA_Client_connect(UA_Client *client, UA_Boolean async) { - UA_LOCK(&client->clientMutex); +connectInternal(UA_Client *client, UA_Boolean async) { /* Reset the connectStatus. This should be the only place where we can * recover from a bad connectStatus. */ client->connectStatus = UA_STATUSCODE_GOOD; @@ -1667,6 +1699,22 @@ __UA_Client_connect(UA_Client *client, UA_Boolean async) { else connectSync(client); notifyClientState(client); + return client->connectStatus; +} + +UA_StatusCode +connectSecureChannel(UA_Client *client, const char *endpointUrl) { + UA_ClientConfig *cc = UA_Client_getConfig(client); + cc->noSession = true; + UA_String_clear(&cc->endpointUrl); + cc->endpointUrl = UA_STRING_ALLOC(endpointUrl); + return connectInternal(client, false); +} + +UA_StatusCode +__UA_Client_connect(UA_Client *client, UA_Boolean async) { + UA_LOCK(&client->clientMutex); + connectInternal(client, async); UA_UNLOCK(&client->clientMutex); return client->connectStatus; } diff --git a/src/client/ua_client_discovery.c b/src/client/ua_client_discovery.c index e84240d5efc..7f948b157e4 100644 --- a/src/client/ua_client_discovery.c +++ b/src/client/ua_client_discovery.c @@ -89,11 +89,11 @@ UA_Client_getEndpoints(UA_Client *client, const char *serverUrl, UA_StatusCode retval; const UA_String url = UA_STRING((char*)(uintptr_t)serverUrl); if(!connected) { - UA_UNLOCK(&client->clientMutex); - retval = UA_Client_connectSecureChannel(client, serverUrl); - if(retval != UA_STATUSCODE_GOOD) + retval = connectSecureChannel(client, serverUrl); + if(retval != UA_STATUSCODE_GOOD) { + UA_UNLOCK(&client->clientMutex); return retval; - UA_LOCK(&client->clientMutex); + } } retval = getEndpointsInternal(client, url, endpointDescriptionsSize, endpointDescriptions); @@ -121,11 +121,11 @@ UA_Client_findServers(UA_Client *client, const char *serverUrl, UA_StatusCode retval; if(!connected) { - UA_UNLOCK(&client->clientMutex); - retval = UA_Client_connectSecureChannel(client, serverUrl); - if(retval != UA_STATUSCODE_GOOD) + retval = connectSecureChannel(client, serverUrl); + if(retval != UA_STATUSCODE_GOOD) { + UA_UNLOCK(&client->clientMutex); return retval; - UA_LOCK(&client->clientMutex); + } } /* Prepare the request */ @@ -179,11 +179,11 @@ UA_Client_findServersOnNetwork(UA_Client *client, const char *serverUrl, UA_StatusCode retval; if(!connected) { - UA_UNLOCK(&client->clientMutex); - retval = UA_Client_connectSecureChannel(client, serverUrl); - if(retval != UA_STATUSCODE_GOOD) + retval = connectSecureChannel(client, serverUrl); + if(retval != UA_STATUSCODE_GOOD) { + UA_LOCK(&client->clientMutex); return retval; - UA_LOCK(&client->clientMutex); + } } /* Prepare the request */ diff --git a/src/client/ua_client_internal.h b/src/client/ua_client_internal.h index 1f2e2db4bf1..2b7c4fd7ca6 100644 --- a/src/client/ua_client_internal.h +++ b/src/client/ua_client_internal.h @@ -82,7 +82,9 @@ __Client_Subscriptions_backgroundPublishInactivityCheck(UA_Client *client); typedef struct AsyncServiceCall { LIST_ENTRY(AsyncServiceCall) pointers; - UA_UInt32 requestId; + UA_UInt32 requestId; /* Unique id */ + UA_UInt32 requestHandle; /* Potentially non-unique if manually defined in + * the request header*/ UA_ClientAsyncServiceCallback callback; const UA_DataType *responseType; void *userdata; @@ -132,13 +134,14 @@ struct UA_Client { /* SecureChannel */ UA_SecureChannel channel; - UA_UInt32 requestId; + UA_UInt32 requestId; /* Unique, internally defined for each request */ UA_DateTime nextChannelRenewal; /* Session */ UA_SessionState sessionState; UA_NodeId authenticationToken; - UA_UInt32 requestHandle; + UA_UInt32 requestHandle; /* Unique handles >100,000 are generated if the + * request header contains a zero-handle. */ UA_ByteString serverSessionNonce; UA_ByteString clientSessionNonce; @@ -188,6 +191,8 @@ processServiceResponse(void *application, UA_SecureChannel *channel, UA_MessageType messageType, UA_UInt32 requestId, UA_ByteString *message); +UA_StatusCode connectInternal(UA_Client *client, UA_Boolean async); +UA_StatusCode connectSecureChannel(UA_Client *client, const char *endpointUrl); UA_Boolean isFullyConnected(UA_Client *client); void connectSync(UA_Client *client); void notifyClientState(UA_Client *client); diff --git a/src/pubsub/ua_pubsub_connection.c b/src/pubsub/ua_pubsub_connection.c index 10f5a451eca..0b6866a49a5 100644 --- a/src/pubsub/ua_pubsub_connection.c +++ b/src/pubsub/ua_pubsub_connection.c @@ -87,7 +87,7 @@ decodeNetworkMessage(UA_Server *server, UA_ByteString *buffer, size_t *pos, } #endif - rv = UA_NetworkMessage_decodePayload(buffer, pos, nm, server->config.customDataTypes); + rv = UA_NetworkMessage_decodePayload(buffer, pos, nm, server->config.customDataTypes, NULL); if(rv != UA_STATUSCODE_GOOD) { UA_NetworkMessage_clear(nm); return rv; diff --git a/src/pubsub/ua_pubsub_networkmessage.c b/src/pubsub/ua_pubsub_networkmessage.c index 509f532fc20..4c6065418c1 100644 --- a/src/pubsub/ua_pubsub_networkmessage.c +++ b/src/pubsub/ua_pubsub_networkmessage.c @@ -759,21 +759,20 @@ UA_NetworkMessage_decodeHeaders(const UA_ByteString *src, size_t *offset, UA_CHECK_STATUS(rv, return rv); } + rv = UA_ExtendedNetworkMessageHeader_decodeBinary(src, offset, dst); + UA_CHECK_STATUS(rv, return rv); + if(dst->securityEnabled) { rv = UA_SecurityHeader_decodeBinary(src, offset, dst); UA_CHECK_STATUS(rv, return rv); } - rv = UA_ExtendedNetworkMessageHeader_decodeBinary(src, offset, dst); - UA_CHECK_STATUS(rv, return rv); - return UA_STATUSCODE_GOOD; } UA_StatusCode -UA_NetworkMessage_decodePayload(const UA_ByteString *src, size_t *offset, - UA_NetworkMessage *dst, - const UA_DataTypeArray *customTypes) { +UA_NetworkMessage_decodePayload(const UA_ByteString *src, size_t *offset, UA_NetworkMessage *dst, + const UA_DataTypeArray *customTypes, UA_DataSetMetaDataType *dsm) { // Payload if(dst->networkMessageType != UA_NETWORKMESSAGE_DATASET) return UA_STATUSCODE_BADNOTIMPLEMENTED; @@ -802,12 +801,13 @@ UA_NetworkMessage_decodePayload(const UA_ByteString *src, size_t *offset, if(count == 1) { rv = UA_DataSetMessage_decodeBinary(src, offset, &dst->payload.dataSetPayload.dataSetMessages[0], - 0, customTypes); + 0, customTypes, dsm); } else { for(UA_Byte i = 0; i < count; i++) { rv = UA_DataSetMessage_decodeBinary(src, offset, &dst->payload.dataSetPayload.dataSetMessages[i], - dst->payload.dataSetPayload.sizes[i], customTypes); + dst->payload.dataSetPayload.sizes[i], customTypes, + dsm); } } UA_CHECK_STATUS(rv, return rv); @@ -862,7 +862,7 @@ UA_NetworkMessage_decodeBinary(const UA_ByteString *src, size_t *offset, UA_StatusCode rv = UA_NetworkMessage_decodeHeaders(src, offset, dst); UA_CHECK_STATUS(rv, return rv); - rv = UA_NetworkMessage_decodePayload(src, offset, dst, customTypes); + rv = UA_NetworkMessage_decodePayload(src, offset, dst, customTypes, NULL); UA_CHECK_STATUS(rv, return rv); rv = UA_NetworkMessage_decodeFooters(src, offset, dst); @@ -1424,9 +1424,9 @@ UA_DataSetMessage_encodeBinary(const UA_DataSetMessage* src, UA_Byte **bufPos, static UA_StatusCode UA_DataSetMessage_keyFrame_decodeBinary(const UA_ByteString *src, size_t *offset, - size_t initialOffset, - UA_DataSetMessage* dst, UA_UInt16 dsmSize, - const UA_DataTypeArray *customTypes) { + size_t initialOffset, UA_DataSetMessage* dst, + UA_UInt16 dsmSize, const UA_DataTypeArray *customTypes, + UA_DataSetMetaDataType *dsm) { if(*offset == src->length) return UA_STATUSCODE_GOOD; /* Messages ends after the header --> Heartbeat */ @@ -1475,14 +1475,47 @@ UA_DataSetMessage_keyFrame_decodeBinary(const UA_ByteString *src, size_t *offset case UA_FIELDENCODING_RAWDATA: kfd->rawFields.data = &src->data[*offset]; kfd->rawFields.length = dsmSize; - if(dsmSize == 0){ + if(dsmSize != 0) { + *offset += (dsmSize - (*offset - initialOffset)); + break; + } + + if(dsm == NULL) { //TODO calculate the length of the DSM-Payload for a single DSM //Problem: Size is not set and MetaData information are needed. //Increase offset to avoid endless chunk loop. Needs to be fixed when //pubsub security footer and signatur is enabled. *offset += 1500; - } else { - *offset += (dsmSize - (*offset - initialOffset)); + break; + } + + // calculate the length of the DSM-Payload for a single DSM + size_t tmpOffset = 0; + dst->data.keyFrameData.fieldCount = (UA_UInt16)dsm->fieldsSize; + for(size_t i = 0; i < dsm->fieldsSize; i++) { + /* TODO The datatype reference should be part of the internal + * pubsub configuration to avoid the time-expensive lookup */ + const UA_DataType *type = + UA_findDataTypeWithCustom(&dsm->fields[i].dataType, customTypes); + if(!type) + return UA_STATUSCODE_BADINTERNALERROR; + dst->data.keyFrameData.rawFields.length += type->memSize; + UA_STACKARRAY(UA_Byte, value, type->memSize); + rv = UA_decodeBinaryInternal(&dst->data.keyFrameData.rawFields, + &tmpOffset, value, type, NULL); + UA_CHECK_STATUS(rv, return rv); + if(dsm->fields[i].maxStringLength != 0) { + if(type->typeKind == UA_DATATYPEKIND_STRING || + type->typeKind == UA_DATATYPEKIND_BYTESTRING) { + UA_ByteString *bs = (UA_ByteString *) value; + // Check if length < maxStringLength, The types ByteString + // and String are equal in their base definition + size_t lengthDifference = dsm->fields[i].maxStringLength - bs->length; + tmpOffset += lengthDifference; + dst->data.keyFrameData.rawFields.length += lengthDifference; + } + } + UA_clear(value, type); } break; @@ -1516,8 +1549,7 @@ UA_DataSetMessage_deltaFrame_decodeBinary(const UA_ByteString *src, size_t *offs } for(UA_UInt16 i = 0; i < dfd->fieldCount; i++) { - rv = UA_UInt16_decodeBinary(src, offset, - &dfd->deltaFrameFields[i].fieldIndex); + rv = UA_UInt16_decodeBinary(src, offset, &dfd->deltaFrameFields[i].fieldIndex); UA_CHECK_STATUS(rv, return rv); if(dst->header.fieldEncoding == UA_FIELDENCODING_VARIANT) { @@ -1540,7 +1572,7 @@ UA_DataSetMessage_deltaFrame_decodeBinary(const UA_ByteString *src, size_t *offs UA_StatusCode UA_DataSetMessage_decodeBinary(const UA_ByteString *src, size_t *offset, UA_DataSetMessage* dst, UA_UInt16 dsmSize, - const UA_DataTypeArray *customTypes) { + const UA_DataTypeArray *customTypes, UA_DataSetMetaDataType *dsm) { size_t initialOffset = *offset; memset(dst, 0, sizeof(UA_DataSetMessage)); UA_StatusCode rv = UA_DataSetMessageHeader_decodeBinary(src, offset, &dst->header); @@ -1549,7 +1581,7 @@ UA_DataSetMessage_decodeBinary(const UA_ByteString *src, size_t *offset, switch(dst->header.dataSetMessageType) { case UA_DATASETMESSAGE_DATAKEYFRAME: rv = UA_DataSetMessage_keyFrame_decodeBinary(src, offset, initialOffset, dst, - dsmSize, customTypes); + dsmSize, customTypes, dsm); break; case UA_DATASETMESSAGE_DATADELTAFRAME: rv = UA_DataSetMessage_deltaFrame_decodeBinary(src, offset, dst, @@ -1634,8 +1666,12 @@ UA_DataSetMessage_calcSizeBinary(UA_DataSetMessage* p, return 0; nmo = &offsetBuffer->offsets[pos]; nmo->offset = size; - nmo->content.value = *v; - nmo->content.value.value.storageType = UA_VARIANT_DATA_NODELETE; + if(p->data.keyFrameData.dataSetFields != NULL) { + nmo->content.value = *v; + nmo->content.value.value.storageType = UA_VARIANT_DATA_NODELETE; + } else { + UA_DataValue_init(&nmo->content.value); + } } if(p->header.fieldEncoding == UA_FIELDENCODING_VARIANT) { @@ -1643,28 +1679,39 @@ UA_DataSetMessage_calcSizeBinary(UA_DataSetMessage* p, nmo->contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_VARIANT; size += UA_calcSizeBinary(&v->value, &UA_TYPES[UA_TYPES_VARIANT]); } else if(p->header.fieldEncoding == UA_FIELDENCODING_RAWDATA) { - if(offsetBuffer) { - if(!v->value.type->pointerFree) - return 0; /* only integer types for now */ - /* Count the memory size of the specific field */ - offsetBuffer->rawMessageLength += v->value.type->memSize; - nmo->contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_RAW; - } - size += UA_calcSizeBinary(v->value.data, v->value.type); - - /* Handle zero-padding for strings with max-string-length. - * Currently not supported for strings that are a part of larger - * structures. */ - UA_FieldMetaData *fmd = - &p->data.keyFrameData.dataSetMetaDataType->fields[i]; - if(fmd->maxStringLength != 0 && - (v->value.type->typeKind == UA_DATATYPEKIND_STRING || - v->value.type->typeKind == UA_DATATYPEKIND_BYTESTRING)) { - /* Check if length < maxStringLength, The types ByteString - * and String are equal in their base definition */ - size_t lengthDifference = fmd->maxStringLength - - ((UA_String *)v->value.data)->length; - size += lengthDifference; + if(p->data.keyFrameData.dataSetFields != NULL) { + if(offsetBuffer) { + if(!v->value.type->pointerFree) + return 0; /* only integer types for now */ + /* Count the memory size of the specific field */ + offsetBuffer->rawMessageLength += v->value.type->memSize; + nmo->contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_RAW; + } + size += UA_calcSizeBinary(v->value.data, v->value.type); + + /* Handle zero-padding for strings with max-string-length. + * Currently not supported for strings that are a part of larger + * structures. */ + UA_FieldMetaData *fmd = + &p->data.keyFrameData.dataSetMetaDataType->fields[i]; + if(fmd->maxStringLength != 0 && + (v->value.type->typeKind == UA_DATATYPEKIND_STRING || + v->value.type->typeKind == UA_DATATYPEKIND_BYTESTRING)) { + /* Check if length < maxStringLength, The types ByteString + * and String are equal in their base definition */ + size_t lengthDifference = fmd->maxStringLength - + ((UA_String *)v->value.data)->length; + size += lengthDifference; + } + } else { + /* get length calculated in UA_DataSetMessage_decodeBinary */ + if(offsetBuffer) { + offsetBuffer->rawMessageLength = p->data.keyFrameData.rawFields.length; + nmo->contentType = UA_PUBSUB_OFFSETTYPE_PAYLOAD_RAW; + } + size += p->data.keyFrameData.rawFields.length; + /* no iteration needed */ + break; } } else if(p->header.fieldEncoding == UA_FIELDENCODING_DATAVALUE) { if(offsetBuffer) diff --git a/src/pubsub/ua_pubsub_networkmessage.h b/src/pubsub/ua_pubsub_networkmessage.h index 4554e891c71..71812613b03 100644 --- a/src/pubsub/ua_pubsub_networkmessage.h +++ b/src/pubsub/ua_pubsub_networkmessage.h @@ -248,7 +248,8 @@ UA_DataSetMessage_encodeBinary(const UA_DataSetMessage* src, UA_Byte **bufPos, UA_StatusCode UA_DataSetMessage_decodeBinary(const UA_ByteString *src, size_t *offset, UA_DataSetMessage* dst, UA_UInt16 dsmSize, - const UA_DataTypeArray *customTypes); + const UA_DataTypeArray *customTypes, + UA_DataSetMetaDataType *dsm); size_t UA_DataSetMessage_calcSizeBinary(UA_DataSetMessage *p, @@ -290,7 +291,8 @@ UA_NetworkMessage_decodeHeaders(const UA_ByteString *src, size_t *offset, UA_StatusCode UA_NetworkMessage_decodePayload(const UA_ByteString *src, size_t *offset, - UA_NetworkMessage *dst, const UA_DataTypeArray *customTypes); + UA_NetworkMessage *dst, const UA_DataTypeArray *customTypes, + UA_DataSetMetaDataType *dsm); UA_StatusCode UA_NetworkMessage_decodeFooters(const UA_ByteString *src, size_t *offset, diff --git a/src/pubsub/ua_pubsub_reader.c b/src/pubsub/ua_pubsub_reader.c index 1fd47b1d49e..b5f1ad02104 100644 --- a/src/pubsub/ua_pubsub_reader.c +++ b/src/pubsub/ua_pubsub_reader.c @@ -38,6 +38,9 @@ UA_DataSetReader_handleMessageReceiveTimeout(UA_Server *server, UA_DataSetReader static UA_Boolean publisherIdIsMatching(UA_NetworkMessage *msg, UA_Variant publisherId) { + if(!msg->publisherIdEnabled) { + return true; + } switch(msg->publisherIdType) { case UA_PUBLISHERIDTYPE_BYTE: return (publisherId.type == &UA_TYPES[UA_TYPES_BYTE] && @@ -803,6 +806,8 @@ DataSetReader_processRaw(UA_Server *server, UA_ReaderGroup *rg, dsr->config.dataSetMetaData.fieldsSize; size_t offset = 0; + /* start iteration from beginning of rawFields buffer */ + msg->data.keyFrameData.rawFields.length = 0; for(size_t i = 0; i < dsr->config.dataSetMetaData.fieldsSize; i++) { /* TODO The datatype reference should be part of the internal * pubsub configuration to avoid the time-expensive lookup */ @@ -919,6 +924,17 @@ UA_DataSetReader_process(UA_Server *server, UA_ReaderGroup *rg, if(!dsr || !rg || !msg || !server) return; + /* Received a (first) message for the Reader. + * Transition from PreOperational to Operational. */ + if(dsr->state == UA_PUBSUBSTATE_PREOPERATIONAL) { + dsr->state = UA_PUBSUBSTATE_OPERATIONAL; + UA_ServerConfig *config = &server->config; + if(config->pubSubConfig.stateChangeCallback != 0) { + config->pubSubConfig.stateChangeCallback(server, &dsr->identifier, + dsr->state, UA_STATUSCODE_GOOD); + } + } + /* Check the metadata, to see if this reader is configured for a heartbeat */ if(dsr->config.dataSetMetaData.fieldsSize == 0 && dsr->config.dataSetMetaData.configurationVersion.majorVersion == 0 && @@ -1135,9 +1151,12 @@ UA_ReaderGroup_process(UA_Server *server, UA_ReaderGroup *readerGroup, /* Received a (first) message for the ReaderGroup. * Transition from PreOperational to Operational. */ if(readerGroup->state == UA_PUBSUBSTATE_PREOPERATIONAL) { - UA_ReaderGroup_setPubSubState(server, readerGroup, - UA_PUBSUBSTATE_OPERATIONAL, - UA_STATUSCODE_GOOD); + readerGroup->state = UA_PUBSUBSTATE_OPERATIONAL; + UA_ServerConfig *config = &server->config; + if(config->pubSubConfig.stateChangeCallback != 0) { + config->pubSubConfig.stateChangeCallback(server, &readerGroup->identifier, + readerGroup->state, UA_STATUSCODE_GOOD); + } } LIST_FOREACH(reader, &readerGroup->readers, listEntry) { UA_StatusCode res = @@ -1164,14 +1183,11 @@ prepareOffsetBuffer(UA_Server *server, UA_ReaderGroup *rg, UA_DataSetReader *rea /* Decode using the non-rt decoding */ UA_StatusCode rv = UA_NetworkMessage_decodeHeaders(buf, pos, nm); - rv |= UA_NetworkMessage_decodePayload(buf, pos, nm, server->config.customDataTypes); - rv |= UA_NetworkMessage_decodeFooters(buf, pos, nm); if(rv != UA_STATUSCODE_GOOD) { UA_NetworkMessage_clear(nm); UA_free(nm); return rv; } - /* Check if the message was intended for this reader */ rv = UA_DataSetReader_checkIdentifier(server, nm, reader, rg->config); if(rv != UA_STATUSCODE_GOOD) { @@ -1181,6 +1197,13 @@ prepareOffsetBuffer(UA_Server *server, UA_ReaderGroup *rg, UA_DataSetReader *rea UA_free(nm); return UA_STATUSCODE_BADINTERNALERROR; } + rv = UA_NetworkMessage_decodePayload(buf, pos, nm, server->config.customDataTypes, &reader->config.dataSetMetaData); + rv |= UA_NetworkMessage_decodeFooters(buf, pos, nm); + if(rv != UA_STATUSCODE_GOOD) { + UA_NetworkMessage_clear(nm); + UA_free(nm); + return rv; + } /* Compute and store the offsets necessary to decode */ size_t nmSize = UA_NetworkMessage_calcSizeBinary(nm, &reader->bufferedMessage); @@ -1196,9 +1219,12 @@ prepareOffsetBuffer(UA_Server *server, UA_ReaderGroup *rg, UA_DataSetReader *rea /* If pre-operational, set to operational after the first message was * processed */ if(rg->state == UA_PUBSUBSTATE_PREOPERATIONAL) { - rv = UA_ReaderGroup_setPubSubState(server, rg, - UA_PUBSUBSTATE_OPERATIONAL, - UA_STATUSCODE_GOOD); + rg->state = UA_PUBSUBSTATE_OPERATIONAL; + UA_ServerConfig *config = &server->config; + if(config->pubSubConfig.stateChangeCallback != 0) { + config->pubSubConfig.stateChangeCallback(server, &rg->identifier, + rg->state, UA_STATUSCODE_GOOD); + } } return rv; @@ -1259,6 +1285,17 @@ decodeAndProcessRT(UA_Server *server, UA_ReaderGroup *readerGroup, UA_Boolean UA_ReaderGroup_decodeAndProcessRT(UA_Server *server, UA_ReaderGroup *readerGroup, UA_ByteString *buf) { + /* Received a (first) message for the ReaderGroup. + * Transition from PreOperational to Operational. */ + if(readerGroup->state == UA_PUBSUBSTATE_PREOPERATIONAL) { + readerGroup->state = UA_PUBSUBSTATE_OPERATIONAL; + UA_ServerConfig *config = &server->config; + if(config->pubSubConfig.stateChangeCallback != 0) { + config->pubSubConfig.stateChangeCallback(server, &readerGroup->identifier, + readerGroup->state, UA_STATUSCODE_GOOD); + } + } + #ifdef UA_ENABLE_PUBSUB_BUFMALLOC useMembufAlloc(); #endif diff --git a/src/server/ua_server_async.c b/src/server/ua_server_async.c index b4854bc82fc..2d0bfe091ab 100644 --- a/src/server/ua_server_async.c +++ b/src/server/ua_server_async.c @@ -17,57 +17,66 @@ UA_AsyncOperation_delete(UA_AsyncOperation *ar) { UA_free(ar); } -static UA_StatusCode +static void UA_AsyncManager_sendAsyncResponse(UA_AsyncManager *am, UA_Server *server, UA_AsyncResponse *ar) { + UA_LOCK_ASSERT(&server->serviceMutex, 1); + UA_LOCK_ASSERT(&am->queueLock, 1); + /* Get the session */ - UA_StatusCode res = UA_STATUSCODE_GOOD; UA_Session* session = getSessionById(server, &ar->sessionId); - UA_SecureChannel* channel = NULL; - UA_ResponseHeader *responseHeader = NULL; if(!session) { - res = UA_STATUSCODE_BADSESSIONIDINVALID; + UA_String sessionId = UA_STRING_NULL; + UA_NodeId_print(&ar->sessionId, &sessionId); UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, - "UA_Server_InsertMethodResponse: Session is gone"); - goto clean_up; + "Async Service: Session %.*s no longer exists", + (int)sessionId.length, sessionId.data); + UA_String_clear(&sessionId); + UA_AsyncManager_removeAsyncResponse(&server->asyncManager, ar); + return; } /* Check the channel */ - channel = session->header.channel; + UA_SecureChannel *channel = session->header.channel; if(!channel) { - res = UA_STATUSCODE_BADSECURECHANNELCLOSED; - UA_LOG_WARNING(&server->config.logger, UA_LOGCATEGORY_SERVER, - "UA_Server_InsertMethodResponse: Channel is gone"); - goto clean_up; + UA_LOG_WARNING_SESSION(&server->config.logger, session, + "Async Service Response cannot be sent. " + "No SecureChannel for the session."); + UA_AsyncManager_removeAsyncResponse(&server->asyncManager, ar); + return; } - /* Okay, here we go, send the UA_CallResponse */ - responseHeader = (UA_ResponseHeader*) + /* Set the request handle */ + UA_ResponseHeader *responseHeader = (UA_ResponseHeader*) &ar->response.callResponse.responseHeader; responseHeader->requestHandle = ar->requestHandle; - res = sendResponse(server, session, channel, ar->requestId, - (UA_Response*)&ar->response, &UA_TYPES[UA_TYPES_CALLRESPONSE]); - UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, - "UA_Server_SendResponse: Response for Req# %" PRIu32 " sent", ar->requestId); - clean_up: - /* Remove from the AsyncManager */ + /* Send the Response */ + UA_StatusCode res = + sendResponse(server, session, channel, ar->requestId, + (UA_Response*)&ar->response, &UA_TYPES[UA_TYPES_CALLRESPONSE]); + if(res != UA_STATUSCODE_GOOD) { + UA_LOG_WARNING_SESSION(&server->config.logger, session, + "Async Response for Req# %" PRIu32 " failed " + "with StatusCode %s", ar->requestId, + UA_StatusCode_name(res)); + } UA_AsyncManager_removeAsyncResponse(&server->asyncManager, ar); - return res; } /* Integrate operation result in the AsyncResponse and send out the response if * it is ready. */ -static void +static UA_Boolean integrateOperationResult(UA_AsyncManager *am, UA_Server *server, UA_AsyncOperation *ao) { + UA_LOCK_ASSERT(&server->serviceMutex, 1); + UA_LOCK_ASSERT(&am->queueLock, 1); + /* Grab the open request, so we can continue to construct the response */ UA_AsyncResponse *ar = ao->parent; /* Reduce the number of open results */ - UA_LOCK(&am->queueLock); ar->opCountdown -= 1; - UA_UNLOCK(&am->queueLock); UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, "Return result in the server thread with %" PRIu32 " remaining", @@ -77,33 +86,34 @@ integrateOperationResult(UA_AsyncManager *am, UA_Server *server, ar->response.callResponse.results[ao->index] = ao->response; UA_CallMethodResult_init(&ao->response); - /* Are we done with all operations? */ - if(ar->opCountdown == 0) { - UA_LOCK(&server->serviceMutex); + /* Done with all operations -> send the response */ + UA_Boolean done = (ar->opCountdown == 0); + if(done) UA_AsyncManager_sendAsyncResponse(am, server, ar); - UA_UNLOCK(&server->serviceMutex); - } + return done; } /* Process all operations in the result queue -> move content over to the - * AsyncResponse. This is only done by the server thread. */ -static void -processAsyncResults(UA_Server *server, void *data) { + * AsyncResponse. This is only done by the server thread. Returns the nmber of + * completed async sesponses. */ +static UA_UInt32 +processAsyncResults(UA_Server *server) { UA_AsyncManager *am = &server->asyncManager; - while(true) { - UA_LOCK(&am->queueLock); - UA_AsyncOperation *ao = TAILQ_FIRST(&am->resultQueue); - if(ao) - TAILQ_REMOVE(&am->resultQueue, ao, pointers); - UA_UNLOCK(&am->queueLock); - if(!ao) - break; - UA_LOG_DEBUG(&server->config.logger, UA_LOGCATEGORY_SERVER, - "UA_Server_CallMethodResponse: Got Response: OKAY"); - integrateOperationResult(am, server, ao); + UA_LOCK_ASSERT(&server->serviceMutex, 1); + UA_LOCK_ASSERT(&am->queueLock, 0); + + UA_UInt32 count = 0; + UA_AsyncOperation *ao; + UA_LOCK(&am->queueLock); + while((ao = TAILQ_FIRST(&am->resultQueue))) { + TAILQ_REMOVE(&am->resultQueue, ao, pointers); + if(integrateOperationResult(am, server, ao)) + count++; UA_AsyncOperation_delete(ao); am->opsCount--; } + UA_UNLOCK(&am->queueLock); + return count; } /* Check if any operations have timed out */ @@ -150,7 +160,9 @@ checkTimeouts(UA_Server *server, void *_) { UA_UNLOCK(&am->queueLock); /* Integrate async results and send out complete responses */ - processAsyncResults(server, NULL); + UA_LOCK(&server->serviceMutex); + processAsyncResults(server); + UA_UNLOCK(&server->serviceMutex); } void @@ -423,4 +435,49 @@ UA_Server_processServiceOperationsAsync(UA_Server *server, UA_Session *session, return UA_STATUSCODE_GOOD; } +UA_UInt32 +UA_AsyncManager_cancel(UA_Server *server, UA_Session *session, UA_UInt32 requestHandle) { + UA_LOCK_ASSERT(&server->serviceMutex, 1); + UA_AsyncManager *am = &server->asyncManager; + + UA_LOCK(&am->queueLock); + + /* Loop over the queue of dispatched ops */ + UA_AsyncOperation *op = NULL, *op_tmp = NULL; + TAILQ_FOREACH_SAFE(op, &am->dispatchedQueue, pointers, op_tmp) { + if(op->parent->requestHandle != requestHandle || + !UA_NodeId_equal(&session->sessionId, &op->parent->sessionId)) + continue; + + /* Set status and put it into the result queue */ + op->response.statusCode = UA_STATUSCODE_BADREQUESTCANCELLEDBYCLIENT; + TAILQ_REMOVE(&am->dispatchedQueue, op, pointers); + TAILQ_INSERT_TAIL(&am->resultQueue, op, pointers); + + /* Also set the status of the overall response */ + op->parent->response.callResponse.responseHeader. + serviceResult = UA_STATUSCODE_BADREQUESTCANCELLEDBYCLIENT; + } + + /* Idem for waiting ops */ + TAILQ_FOREACH_SAFE(op, &am->newQueue, pointers, op_tmp) { + if(op->parent->requestHandle != requestHandle || + !UA_NodeId_equal(&session->sessionId, &op->parent->sessionId)) + continue; + + /* Mark as timed out and put it into the result queue */ + op->response.statusCode = UA_STATUSCODE_BADREQUESTCANCELLEDBYCLIENT; + TAILQ_REMOVE(&am->newQueue, op, pointers); + TAILQ_INSERT_TAIL(&am->resultQueue, op, pointers); + + op->parent->response.callResponse.responseHeader. + serviceResult = UA_STATUSCODE_BADREQUESTCANCELLEDBYCLIENT; + } + + UA_UNLOCK(&am->queueLock); + + /* Process messages that have all ops completed */ + return processAsyncResults(server); +} + #endif diff --git a/src/server/ua_server_async.h b/src/server/ua_server_async.h index 3c7c539486e..ecedc12f440 100644 --- a/src/server/ua_server_async.h +++ b/src/server/ua_server_async.h @@ -62,7 +62,10 @@ typedef struct { /* Operations for the workers. The queues are all FIFO: Put in at the tail, * take out at the head.*/ - UA_Lock queueLock; + UA_Lock queueLock; /* Either take this lock free-standing (with no other + * locks). Or take server->serviceMutex first and then + * the queueLock. Never take the server->serviceMutex + * when the queueLock is already acquired (deadlock)! */ UA_AsyncOperationQueue newQueue; /* New operations for the workers */ UA_AsyncOperationQueue dispatchedQueue; /* Operations taken by a worker. When a result is * returned, we search for the op here to see if it @@ -93,6 +96,12 @@ UA_AsyncManager_createAsyncOp(UA_AsyncManager *am, UA_Server *server, UA_AsyncResponse *ar, size_t opIndex, const UA_CallMethodRequest *opRequest); +/* Send out the response with status set. Also removes all outstanding + * operations from the dispatch queue. The queuelock needs to be taken before + * calling _cancel. */ +UA_UInt32 +UA_AsyncManager_cancel(UA_Server *server, UA_Session *session, UA_UInt32 requestHandle); + typedef void (*UA_AsyncServiceOperation)(UA_Server *server, UA_Session *session, UA_UInt32 requestId, UA_UInt32 requestHandle, size_t opIndex, const void *requestOperation, diff --git a/src/server/ua_server_binary.c b/src/server/ua_server_binary.c index f0cae3fe9ff..9da611fdf81 100644 --- a/src/server/ua_server_binary.c +++ b/src/server/ua_server_binary.c @@ -280,8 +280,12 @@ getServicePointers(UA_UInt32 requestTypeId, const UA_DataType **requestType, *requestType = &UA_TYPES[UA_TYPES_CLOSESESSIONREQUEST]; *responseType = &UA_TYPES[UA_TYPES_CLOSESESSIONRESPONSE]; break; + case UA_NS0ID_CANCELREQUEST_ENCODING_DEFAULTBINARY: + *service = (UA_Service)Service_Cancel; + *requestType = &UA_TYPES[UA_TYPES_CANCELREQUEST]; + *responseType = &UA_TYPES[UA_TYPES_CANCELRESPONSE]; + break; case UA_NS0ID_READREQUEST_ENCODING_DEFAULTBINARY: - *service = NULL; *service = (UA_Service)Service_Read; *requestType = &UA_TYPES[UA_TYPES_READREQUEST]; *responseType = &UA_TYPES[UA_TYPES_READRESPONSE]; diff --git a/src/server/ua_server_ns0.c b/src/server/ua_server_ns0.c index 874a8da5357..5b4885d58fc 100644 --- a/src/server/ua_server_ns0.c +++ b/src/server/ua_server_ns0.c @@ -730,6 +730,10 @@ minimalServerObject(UA_Server *server) { UA_NS0ID_SERVER, UA_NS0ID_HASCOMPONENT, UA_VALUERANK_SCALAR, UA_NS0ID_BASEDATATYPE); + retval |= addVariableNode(server, "StartTime", UA_NS0ID_SERVER_SERVERSTATUS_STARTTIME, + UA_NS0ID_SERVER_SERVERSTATUS, UA_NS0ID_HASCOMPONENT, + UA_VALUERANK_SCALAR, UA_NS0ID_BASEDATATYPE); + retval |= addVariableNode(server, "CurrentTime", UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME, UA_NS0ID_SERVER_SERVERSTATUS, UA_NS0ID_HASCOMPONENT, UA_VALUERANK_SCALAR, UA_NS0ID_BASEDATATYPE); diff --git a/src/server/ua_services.h b/src/server/ua_services.h index bc1ffc986fe..61928558904 100644 --- a/src/server/ua_services.h +++ b/src/server/ua_services.h @@ -172,7 +172,9 @@ void Service_CloseSession(UA_Server *server, UA_SecureChannel *channel, * ^^^^^^^^^^^^^^ * Used to cancel outstanding Service requests. Successfully cancelled service * requests shall respond with Bad_RequestCancelledByClient. */ -/* Not Implemented */ +void Service_Cancel(UA_Server *server, UA_Session *session, + const UA_CancelRequest *request, + UA_CancelResponse *response); /** * NodeManagement Service Set diff --git a/src/server/ua_services_attribute.c b/src/server/ua_services_attribute.c index 65aaf5dd901..9f339393131 100644 --- a/src/server/ua_services_attribute.c +++ b/src/server/ua_services_attribute.c @@ -364,7 +364,7 @@ getStructureDefinition(const UA_DataType *type, UA_StructureDefinition *def) { for(size_t cnt = 0; cnt < def->fieldsSize; cnt++) { const UA_DataTypeMember *m = &type->members[cnt]; - def->fields[cnt].valueRank = (m->isArray) ? 1 : -1; + def->fields[cnt].valueRank = (m->isArray) ? UA_VALUERANK_ONE_DIMENSION : UA_VALUERANK_SCALAR; def->fields[cnt].arrayDimensions = NULL; def->fields[cnt].arrayDimensionsSize = 0; def->fields[cnt].name = UA_STRING((char *)(uintptr_t)m->memberName); diff --git a/src/server/ua_services_nodemanagement.c b/src/server/ua_services_nodemanagement.c index ac299ba7339..ffd27c54d49 100644 --- a/src/server/ua_services_nodemanagement.c +++ b/src/server/ua_services_nodemanagement.c @@ -702,7 +702,7 @@ copyChild(UA_Server *server, UA_Session *session, UA_Node_deleteReferencesSubset(node, &reftypes_skipped); /* Add the node to the nodestore */ - UA_NodeId newNodeId; + UA_NodeId newNodeId = UA_NODEID_NULL; retval = UA_NODESTORE_INSERT(server, node, &newNodeId); /* node = NULL; The pointer is no longer valid */ if(retval != UA_STATUSCODE_GOOD) diff --git a/src/server/ua_services_session.c b/src/server/ua_services_session.c index e946c0d4b0e..b079d0d1125 100644 --- a/src/server/ua_services_session.c +++ b/src/server/ua_services_session.c @@ -934,3 +934,44 @@ Service_CloseSession(UA_Server *server, UA_SecureChannel *channel, UA_Server_removeSessionByToken(server, &session->header.authenticationToken, UA_SHUTDOWNREASON_CLOSE); } + +void Service_Cancel(UA_Server *server, UA_Session *session, + const UA_CancelRequest *request, + UA_CancelResponse *response) { + /* If multithreading is disabled, then there are no async services. If all + * services are answered "right away", then there are no services that can + * be cancelled. */ +#if UA_MULTITHREADING >= 100 + response->cancelCount = UA_AsyncManager_cancel(server, session, request->requestHandle); +#endif + + /* Publish requests for Subscriptions are stored separately */ +#ifdef UA_ENABLE_SUBSCRIPTIONS + UA_PublishResponseEntry *pre, *pre_tmp; + UA_PublishResponseEntry *prev = NULL; + SIMPLEQ_FOREACH_SAFE(pre, &session->responseQueue, listEntry, pre_tmp) { + /* Skip entry and set as the previous entry that is kept in the list */ + if(pre->response.responseHeader.requestHandle != request->requestHandle) { + prev = pre; + continue; + } + + /* Dequeue */ + if(prev) + SIMPLEQ_REMOVE_AFTER(&session->responseQueue, prev, listEntry); + else + SIMPLEQ_REMOVE_HEAD(&session->responseQueue, listEntry); + session->responseQueueSize--; + + /* Send response and clean up */ + response->responseHeader.serviceResult = UA_STATUSCODE_BADREQUESTCANCELLEDBYCLIENT; + sendResponse(server, session, session->header.channel, pre->requestId, + (UA_Response *)response, &UA_TYPES[UA_TYPES_PUBLISHRESPONSE]); + UA_PublishResponse_clear(&pre->response); + UA_free(pre); + + /* Increase the CancelCount */ + response->cancelCount++; + } +#endif +} diff --git a/src/server/ua_services_view.c b/src/server/ua_services_view.c index ede8ad7c002..2dbe087efba 100644 --- a/src/server/ua_services_view.c +++ b/src/server/ua_services_view.c @@ -167,12 +167,12 @@ isNodeInTree(UA_Server *server, const UA_NodeId *leafNode, const UA_NodeId *nodeToFind, const UA_ReferenceTypeSet *relevantRefs) { struct IsNodeInTreeContext ctx; + memset(&ctx, 0, sizeof(struct IsNodeInTreeContext)); ctx.server = server; ctx.nodeToFind = UA_NodePointer_fromNodeId(nodeToFind); - ctx.parents.root = NULL; ctx.relevantRefs = *relevantRefs; - ctx.depth = 0; UA_ReferenceTarget tmpTarget; + memset(&tmpTarget, 0, sizeof(UA_ReferenceTarget)); tmpTarget.targetId = UA_NodePointer_fromNodeId(leafNode); return (isNodeInTreeIterateCallback(&ctx, &tmpTarget) != NULL); } diff --git a/src/server/ua_subscription_events.c b/src/server/ua_subscription_events.c index bb8fde7b790..6e8a6945f24 100644 --- a/src/server/ua_subscription_events.c +++ b/src/server/ua_subscription_events.c @@ -176,15 +176,17 @@ eventSetStandardFields(UA_Server *server, const UA_NodeId *event, UA_StatusCode UA_MonitoredItem_addEvent(UA_Server *server, UA_MonitoredItem *mon, const UA_NodeId *event) { - UA_Notification *notification = UA_Notification_new(); - if(!notification) - return UA_STATUSCODE_BADOUTOFMEMORY; - + /* Get the filter */ if(mon->parameters.filter.content.decoded.type != &UA_TYPES[UA_TYPES_EVENTFILTER]) return UA_STATUSCODE_BADFILTERNOTALLOWED; UA_EventFilter *eventFilter = (UA_EventFilter*) mon->parameters.filter.content.decoded.data; + /* Allocate memory for the notification */ + UA_Notification *notification = UA_Notification_new(); + if(!notification) + return UA_STATUSCODE_BADOUTOFMEMORY; + /* The MonitoredItem must be attached to a Subscription. This code path is * not taken for local MonitoredItems (once they are enabled for Events). */ UA_Subscription *sub = mon->subscription; diff --git a/src/server/ua_subscription_events_filter.c b/src/server/ua_subscription_events_filter.c index 4ff8d7d57a5..f5b3567f230 100644 --- a/src/server/ua_subscription_events_filter.c +++ b/src/server/ua_subscription_events_filter.c @@ -864,14 +864,13 @@ inListOperator(UA_FilterEvalContext *ctx, size_t index) { UA_Variant *op1 = &ctx->stack[ctx->top++]; UA_StatusCode res = resolveOperand(ctx, &elm->filterOperands[0], op0); UA_CHECK_STATUS(res, return res); - for(size_t i = 1; i < elm->filterOperandsSize; i++) { + for(size_t i = 1; i < elm->filterOperandsSize && !found; i++) { res = resolveOperand(ctx, &elm->filterOperands[i], op1); - UA_CHECK_STATUS(res, continue); + if(res != UA_STATUSCODE_GOOD) + continue; if(op0->type == op1->type && - UA_order(op0->data, op1->data, op0->type) == UA_ORDER_EQ) { + UA_order(op0->data, op1->data, op0->type) == UA_ORDER_EQ) found = true; - break; - } UA_Variant_clear(op1); } ctx->results[index] = t2v((found) ? UA_TERNARY_TRUE: UA_TERNARY_FALSE); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7048db7ba5e..abcce71d512 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -394,21 +394,19 @@ ua_add_test(server/check_server_reverseconnect.c) # Test Client ua_add_test(client/check_client.c) +ua_add_test(client/check_client_discovery.c) ua_add_test(client/check_activateSession.c) -if(UA_ENABLE_SUBSCRIPTIONS) - ua_add_test(client/check_subscriptionWithactivateSession.c) -endif() ua_add_test(client/check_activateSessionAsync.c) ua_add_test(client/check_client_securechannel.c) ua_add_test(client/check_client_async.c) ua_add_test(client/check_client_async_connect.c) +ua_add_test(client/check_client_highlevel.c) if(UA_ENABLE_SUBSCRIPTIONS) ua_add_test(client/check_client_subscriptions.c) + ua_add_test(client/check_subscriptionWithactivateSession.c) endif() -ua_add_test(client/check_client_highlevel.c) - if(UA_ENABLE_HISTORIZING) ua_add_test(client/check_client_historical_data.c) endif() diff --git a/tests/client/check_client_discovery.c b/tests/client/check_client_discovery.c new file mode 100644 index 00000000000..b1bbe6bc5e2 --- /dev/null +++ b/tests/client/check_client_discovery.c @@ -0,0 +1,81 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include +#include + +#include "client/ua_client_internal.h" + +#include +#include + +#include "thread_wrapper.h" + +UA_Server *server; +UA_Boolean running; +THREAD_HANDLE server_thread; + + +THREAD_CALLBACK(serverloop) { + while(running) + UA_Server_run_iterate(server, true); + return 0; +} + +static void setup(void) { + running = true; + server = UA_Server_new(); + UA_ServerConfig_setDefault(UA_Server_getConfig(server)); + UA_Server_run_startup(server); + THREAD_CREATE(server_thread, serverloop); +} + +static void teardown(void) { + running = false; + THREAD_JOIN(server_thread); + UA_Server_run_shutdown(server); + UA_Server_delete(server); +} + +START_TEST(Client_connect_badEndpointUrl) { + UA_Client *client = UA_Client_new(); + UA_ClientConfig_setDefault(UA_Client_getConfig(client)); + + /* Use the internal API to force a bad DiscoveryUrl */ + UA_String_clear(&client->config.endpointUrl); + UA_String_clear(&client->discoveryUrl); + client->config.endpointUrl = UA_STRING_ALLOC("opc.tcp://localhost:4840"); + client->discoveryUrl = UA_STRING_ALLOC("abc://xxx:4840"); + + /* Open a Session when possible */ + client->config.noSession = false; + + UA_LOCK(&client->clientMutex); + connectSync(client); + UA_UNLOCK(&client->clientMutex); + ck_assert_uint_eq(client->connectStatus, UA_STATUSCODE_GOOD); + + UA_Client_disconnect(client); + UA_Client_delete(client); +} +END_TEST + +static Suite* testSuite_Client(void) { + Suite *s = suite_create("Client"); + TCase *tc_client = tcase_create("Client Discovery"); + tcase_add_checked_fixture(tc_client, setup, teardown); + tcase_add_test(tc_client, Client_connect_badEndpointUrl); + suite_add_tcase(s,tc_client); + return s; +} + +int main(void) { + Suite *s = testSuite_Client(); + SRunner *sr = srunner_create(s); + srunner_set_fork_status(sr, CK_NOFORK); + srunner_run_all(sr,CK_NORMAL); + int number_failed = srunner_ntests_failed(sr); + srunner_free(sr); + return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE; +} diff --git a/tests/pubsub/check_pubsub_subscribe_msgrcvtimeout.c b/tests/pubsub/check_pubsub_subscribe_msgrcvtimeout.c index 710872071bc..d11b6374ba5 100644 --- a/tests/pubsub/check_pubsub_subscribe_msgrcvtimeout.c +++ b/tests/pubsub/check_pubsub_subscribe_msgrcvtimeout.c @@ -8,6 +8,12 @@ #include +#define CHK_UNLOCK(pMutex, cond) \ +if (!(cond)) { \ + UA_UNLOCK(pMutex); \ + ck_assert(cond); \ +} \ + static UA_Server *server = NULL; /* global variables to check PubSubStateChangeCallback */ @@ -414,7 +420,7 @@ static void PubSubStateChangeCallback_basic (UA_Server *hostServer, UA_NodeId *pubsubComponentId, UA_PubSubState state, UA_StatusCode reason) { - ck_assert(hostServer == server); + CHK_UNLOCK(&hostServer->serviceMutex, hostServer == server); UA_String strId; UA_String_init(&strId); @@ -425,17 +431,17 @@ static void PubSubStateChangeCallback_basic (UA_Server *hostServer, UA_String_clear(&strId); if(ExpectedCallbackStateChange == UA_PUBSUBSTATE_OPERATIONAL) { - ck_assert(state == UA_PUBSUBSTATE_OPERATIONAL || - state == UA_PUBSUBSTATE_PREOPERATIONAL); + CHK_UNLOCK(&hostServer->serviceMutex, state == UA_PUBSUBSTATE_OPERATIONAL || + state == UA_PUBSUBSTATE_PREOPERATIONAL); } else { - ck_assert(ExpectedCallbackStateChange == state); + CHK_UNLOCK(&hostServer->serviceMutex, ExpectedCallbackStateChange == state); } UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSubStateChangeCallback(): " "Callback Cnt = %u", CallbackCnt); if(ExpectedCallbackCnt > 0) { - ck_assert(CallbackCnt <= ExpectedCallbackCnt); + CHK_UNLOCK(&hostServer->serviceMutex, CallbackCnt <= ExpectedCallbackCnt); /* UA_String_init(&strId); */ /* UA_NodeId_print(&(pExpectedComponentCallbackIds[CallbackCnt]), &strId); */ /* UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSubStateChangeCallback(): " */ @@ -555,6 +561,9 @@ START_TEST(Test_basic) { /* state change to operational of ReaderGroup */ ExpectedCallbackCnt = 1; + //ExpectedCallbackCnt = 2; + pExpectedComponentCallbackIds[0] = DSRId_Conn2_RG1_DSR1; + pExpectedComponentCallbackIds[1] = RGId_Conn2_RG1; ExpectedCallbackStateChange = UA_PUBSUBSTATE_OPERATIONAL; ExpectedCallbackStatus = UA_STATUSCODE_GOOD; @@ -579,6 +588,9 @@ START_TEST(Test_basic) { ExpectedCallbackStateChange = UA_PUBSUBSTATE_OPERATIONAL; ExpectedCallbackStatus = UA_STATUSCODE_GOOD; + /* enable the reader */ + ck_assert(UA_Server_enableDataSetReader(server, DSRId_Conn2_RG1_DSR1) == UA_STATUSCODE_GOOD); + /* check that publish/subscribe works -> set some test values */ ValidatePublishSubscribe(VarId_Conn1_WG1, VarId_Conn2_RG1_DSR1, 10, (UA_UInt32) PublishingInterval_Conn1WG1, 3); @@ -586,15 +598,20 @@ START_TEST(Test_basic) { ValidatePublishSubscribe(VarId_Conn1_WG1, VarId_Conn2_RG1_DSR1, 44, (UA_UInt32) PublishingInterval_Conn1WG1, 3); + ck_assert(UA_Server_ReaderGroup_getState(server, RGId_Conn2_RG1, &state) == UA_STATUSCODE_GOOD); + ck_assert(state == UA_PUBSUBSTATE_OPERATIONAL); ck_assert(UA_Server_DataSetReader_getState(server, DSRId_Conn2_RG1_DSR1, &state) == UA_STATUSCODE_GOOD); ck_assert(state == UA_PUBSUBSTATE_OPERATIONAL); /* check that callback has been called for reader group and dataset */ - ck_assert_int_eq(ExpectedCallbackCnt, CallbackCnt); - CallbackCnt = 0; + //ck_assert_int_eq(ExpectedCallbackCnt, CallbackCnt); + //CallbackCnt = 0; /* there should not be a callback notification for MessageReceiveTimeout */ - ck_assert(CallbackCnt == 0); + //ck_assert(CallbackCnt == 0); + /* check that callback has been called for reader group */ + ck_assert_int_eq(3, CallbackCnt); + CallbackCnt = 0; /* now we disable the publisher WriterGroup and check if a MessageReceiveTimeout occurs at Subscriber */ ExpectedCallbackCnt = 2; @@ -773,7 +790,7 @@ START_TEST(Test_basic) { static void PubSubStateChangeCallback_different_timeouts(UA_Server *hostServer, UA_NodeId *pubsubComponentId, UA_PubSubState state, UA_StatusCode reason) { - ck_assert(hostServer == server); + CHK_UNLOCK(&hostServer->serviceMutex, hostServer == server); /* Disable some checks during shutdown */ if(!runtime) @@ -790,18 +807,18 @@ PubSubStateChangeCallback_different_timeouts(UA_Server *hostServer, UA_NodeId *p UA_String_clear(&strId); if(ExpectedCallbackStateChange == UA_PUBSUBSTATE_OPERATIONAL) { - ck_assert(state == UA_PUBSUBSTATE_OPERATIONAL || - state == UA_PUBSUBSTATE_PREOPERATIONAL); + CHK_UNLOCK(&hostServer->serviceMutex, state == UA_PUBSUBSTATE_OPERATIONAL || + state == UA_PUBSUBSTATE_PREOPERATIONAL); } else { - ck_assert(ExpectedCallbackStateChange == state); + CHK_UNLOCK(&hostServer->serviceMutex, ExpectedCallbackStateChange == state); } - ck_assert(ExpectedCallbackStatus == reason); + CHK_UNLOCK(&hostServer->serviceMutex, ExpectedCallbackStatus == reason); UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSubStateChangeCallback(): " "Callback Cnt = %u", CallbackCnt); if(ExpectedCallbackCnt > 0) { - ck_assert(CallbackCnt <= ExpectedCallbackCnt); + CHK_UNLOCK(&hostServer->serviceMutex, CallbackCnt <= ExpectedCallbackCnt); /* UA_String_init(&strId); */ /* UA_NodeId_print(&(pExpectedComponentCallbackIds[CallbackCnt]), &strId); */ /* UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSubStateChangeCallback(): " */ @@ -957,7 +974,7 @@ START_TEST(Test_different_timeouts) { ck_assert_int_eq(ExpectedCallbackCnt, CallbackCnt); CallbackCnt = 0; - ExpectedCallbackCnt = 4; + ExpectedCallbackCnt = 2; ExpectedCallbackStateChange = UA_PUBSUBSTATE_OPERATIONAL; ExpectedCallbackStatus = UA_STATUSCODE_GOOD; /* TODO: 2nd datasetreader */ @@ -967,10 +984,21 @@ START_TEST(Test_different_timeouts) { pExpectedComponentCallbackIds[2] = DSRId_Conn2_RG1_DSR1; pExpectedComponentCallbackIds[3] = RGId_Conn2_RG1; + + ServerDoProcess("1", (UA_UInt32) (PublishingInterval_Conn1_WG1+1), 2); + /* check that callback has been called for writer and reader groups and datasets */ ck_assert_int_eq(ExpectedCallbackCnt, CallbackCnt); + CallbackCnt = 0; + ExpectedCallbackCnt = 0; + + /* enable the reader */ + ck_assert(UA_Server_enableDataSetReader(server, DSRId_Conn1_RG1_DSR1) == UA_STATUSCODE_GOOD); + ck_assert(UA_Server_enableDataSetReader(server, DSRId_Conn2_RG1_DSR1) == UA_STATUSCODE_GOOD); + + ServerDoProcess("1", (UA_UInt32) (PublishingInterval_Conn1_WG1+1), 2); /* check that all dataset writers- and readers are operational */ UA_PubSubState state = UA_PUBSUBSTATE_DISABLED; @@ -997,6 +1025,7 @@ START_TEST(Test_different_timeouts) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "disable writergroup"); ExpectedCallbackCnt = 2; ExpectedCallbackStateChange = UA_PUBSUBSTATE_DISABLED; + CallbackCnt = 0; pExpectedComponentCallbackIds[0] = DsWId_Conn1_WG1_DS1; pExpectedComponentCallbackIds[1] = WGId_Conn1_WG1; ExpectedCallbackStatus = UA_STATUSCODE_BADRESOURCEUNAVAILABLE; @@ -1039,7 +1068,7 @@ static void PubSubStateChangeCallback_wrong_timeout (UA_Server *hostServer, UA_NodeId *pubsubComponentId, UA_PubSubState state, UA_StatusCode reason) { - ck_assert(hostServer == server); + CHK_UNLOCK(&hostServer->serviceMutex, hostServer == server); UA_String strId; UA_String_init(&strId); @@ -1163,7 +1192,7 @@ START_TEST(Test_wrong_timeout) { /* static void */ /* PubSubStateChangeCallback_many_components(UA_Server *hostServer, UA_NodeId *pubsubComponentId, */ /* UA_PubSubState state, UA_StatusCode reason) { */ -/* ck_assert(hostServer == server); */ +/* CHK_UNLOCK(&hostServer->serviceMutex, hostServer == server); */ /* if(!runtime) */ /* return; */ @@ -1176,7 +1205,7 @@ START_TEST(Test_wrong_timeout) { /* UA_String_clear(&strId); */ /* if(ExpectedCallbackCnt > 0) { */ -/* ck_assert(CallbackCnt <= ExpectedCallbackCnt); */ +/* CHK_UNLOCK(&hostServer->serviceMutex, CallbackCnt <= ExpectedCallbackCnt); */ /* UA_String_init(&strId); */ /* UA_NodeId_print(&(pExpectedComponentCallbackIds[CallbackCnt]), &strId); */ /* UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PubSubStateChangeCallback(): " */ @@ -1185,16 +1214,16 @@ START_TEST(Test_wrong_timeout) { /* } */ /* if(ExpectedCallbackStateChange == UA_PUBSUBSTATE_OPERATIONAL) { */ -/* ck_assert(state == UA_PUBSUBSTATE_OPERATIONAL || */ +/* CHK_UNLOCK(&hostServer->serviceMutex, state == UA_PUBSUBSTATE_OPERATIONAL || */ /* state == UA_PUBSUBSTATE_PREOPERATIONAL); */ /* } else { */ -/* ck_assert(ExpectedCallbackStateChange == state); */ +/* CHK_UNLOCK(&hostServer->serviceMutex, ExpectedCallbackStateChange == state); */ /* } */ -/* ck_assert(reason == ExpectedCallbackStatus); */ +/* CHK_UNLOCK(&hostServer->serviceMutex, reason == ExpectedCallbackStatus); */ /* if (ExpectedCallbackStateChange == UA_PUBSUBSTATE_ERROR) { */ /* /\* On error we want to verify the order of DataSetReader timeouts *\/ */ -/* ck_assert(UA_NodeId_equal(pubsubComponentId, &pExpectedComponentCallbackIds[CallbackCnt]) == UA_TRUE); */ +/* CHK_UNLOCK(&hostServer->serviceMutex, UA_NodeId_equal(pubsubComponentId, &pExpectedComponentCallbackIds[CallbackCnt]) == UA_TRUE); */ /* } /\* when the state is set back to operational we cannot verify the order of StateChanges, because we */ /* cannot know which DataSetReader will be operational first *\/ */ /* CallbackCnt++; */ @@ -1744,7 +1773,7 @@ START_TEST(Test_wrong_timeout) { static void PubSubStateChangeCallback_update_config(UA_Server *hostServer, UA_NodeId *pubsubComponentId, UA_PubSubState state, UA_StatusCode reason) { - ck_assert(hostServer == server); + CHK_UNLOCK(&hostServer->serviceMutex, hostServer == server); if(!runtime) return; @@ -1759,8 +1788,8 @@ PubSubStateChangeCallback_update_config(UA_Server *hostServer, UA_NodeId *pubsub UA_String_clear(&strId); if (UA_NodeId_equal(pubsubComponentId, &ExpectedCallbackComponentNodeId) == UA_TRUE) { - ck_assert(ExpectedCallbackStateChange == state); - ck_assert(ExpectedCallbackStatus == reason); + CHK_UNLOCK(&hostServer->serviceMutex, ExpectedCallbackStateChange == state); + CHK_UNLOCK(&hostServer->serviceMutex, ExpectedCallbackStatus == reason); CallbackCnt++; } } @@ -1812,7 +1841,7 @@ START_TEST(Test_update_config) { AddDataSetReader(&RGId_Conn1_RG1, "Conn1_RG1_DSR1", 1, 1, 1, MessageReceiveTimeout, &VarId_Conn1_RG1_DSR1, &DSRId_Conn1_RG1_DSR1); UA_NodeId_copy(&DSRId_Conn1_RG1_DSR1, &ExpectedCallbackComponentNodeId); - ExpectedCallbackStateChange = UA_PUBSUBSTATE_OPERATIONAL; + ExpectedCallbackStateChange = UA_PUBSUBSTATE_PREOPERATIONAL; ExpectedCallbackStatus = UA_STATUSCODE_GOOD; const UA_UInt32 SleepTime = 50; @@ -1824,15 +1853,17 @@ START_TEST(Test_update_config) { /* set ReaderGroup operational */ ck_assert(UA_Server_enableReaderGroup(server, RGId_Conn1_RG1) == UA_STATUSCODE_GOOD); + ck_assert(UA_Server_enableDataSetReader(server, DSRId_Conn1_RG1_DSR1) == UA_STATUSCODE_GOOD); + ExpectedCallbackStateChange = UA_PUBSUBSTATE_OPERATIONAL; ServerDoProcess("1", SleepTime, NoOfRunIterateCycles); /* check number of state changes */ - ck_assert(CallbackCnt == 1); + ck_assert_int_eq(CallbackCnt, 2); CallbackCnt = 0; - ExpectedCallbackStateChange = UA_PUBSUBSTATE_ERROR; - ExpectedCallbackStatus = UA_STATUSCODE_BADTIMEOUT; + ExpectedCallbackStateChange = UA_PUBSUBSTATE_OPERATIONAL; + ExpectedCallbackStatus = UA_STATUSCODE_GOOD; /* check that publish/subscribe works -> set some test values */ ValidatePublishSubscribe(VarId_Conn1_WG1, VarId_Conn1_RG1_DSR1, 10, SleepTime, NoOfRunIterateCycles); @@ -1846,6 +1877,8 @@ START_TEST(Test_update_config) { UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "disable writer group"); ck_assert(UA_Server_setWriterGroupDisabled(server, WGId_Conn1_WG1) == UA_STATUSCODE_GOOD); + ExpectedCallbackStateChange = UA_PUBSUBSTATE_ERROR; + ExpectedCallbackStatus = UA_STATUSCODE_BADTIMEOUT; ServerDoProcess("2", SleepTime, NoOfRunIterateCycles); @@ -1930,7 +1963,7 @@ START_TEST(Test_add_remove) { static void PubSubStateChangeCallback_fast_path (UA_Server *hostServer, UA_NodeId *pubsubComponentId, UA_PubSubState state, UA_StatusCode reason) { - ck_assert(hostServer == server); + CHK_UNLOCK(&hostServer->serviceMutex, hostServer == server); UA_String strId; UA_String_init(&strId); @@ -1944,8 +1977,8 @@ PubSubStateChangeCallback_fast_path (UA_Server *hostServer, UA_NodeId *pubsubCom if(state == UA_PUBSUBSTATE_PREOPERATIONAL && ExpectedCallbackStateChange == UA_PUBSUBSTATE_OPERATIONAL) state = UA_PUBSUBSTATE_OPERATIONAL; - ck_assert(ExpectedCallbackStateChange == state); - ck_assert(ExpectedCallbackStatus == reason); + CHK_UNLOCK(&hostServer->serviceMutex, ExpectedCallbackStateChange == state); + CHK_UNLOCK(&hostServer->serviceMutex, ExpectedCallbackStatus == reason); CallbackCnt++; } } @@ -2018,6 +2051,9 @@ START_TEST(Test_fast_path) { ServerDoProcess("0", (UA_UInt32) (PublishingInterval_Conn1WG1), 3); + ck_assert(UA_Server_DataSetWriter_getState(server, DsWId_Conn1_WG1_DS1, &state) == UA_STATUSCODE_GOOD); + ck_assert(state == UA_PUBSUBSTATE_OPERATIONAL); + /* there should not be a MessageReceiveTimeout, writers are running, readers are still disabled */ ck_assert(CallbackCnt == 0); @@ -2034,10 +2070,16 @@ START_TEST(Test_fast_path) { ck_assert(UA_Server_freezeReaderGroupConfiguration(server, RGId_Conn2_RG1) == UA_STATUSCODE_GOOD); ck_assert(UA_Server_enableReaderGroup(server, RGId_Conn2_RG1) == UA_STATUSCODE_GOOD); + ck_assert(UA_Server_ReaderGroup_getState(server, RGId_Conn2_RG1, &state) == UA_STATUSCODE_GOOD); + ck_assert(state == UA_PUBSUBSTATE_PREOPERATIONAL); + + ck_assert(UA_Server_enableDataSetReader(server, DSRId_Conn2_RG1_DSR1) == UA_STATUSCODE_GOOD); + ck_assert(UA_Server_DataSetReader_getState(server, DSRId_Conn2_RG1_DSR1, &state) == UA_STATUSCODE_GOOD); + ck_assert(state == UA_PUBSUBSTATE_PREOPERATIONAL); ServerDoProcess("0", (UA_UInt32) (PublishingInterval_Conn1WG1), 1); /* check that PubSubStateChange callback has been called for the specific DataSetReader */ - ck_assert_int_eq(1, CallbackCnt); + ck_assert_int_eq(2, CallbackCnt); CallbackCnt = 0; ck_assert(UA_Server_ReaderGroup_getState(server, RGId_Conn2_RG1, &state) == UA_STATUSCODE_GOOD); diff --git a/tests/pubsub/check_pubsub_subscribe_rt_levels.c b/tests/pubsub/check_pubsub_subscribe_rt_levels.c index cbb50137345..5e5a3c9d027 100644 --- a/tests/pubsub/check_pubsub_subscribe_rt_levels.c +++ b/tests/pubsub/check_pubsub_subscribe_rt_levels.c @@ -478,8 +478,9 @@ START_TEST(SetupInvalidPubSubConfig) { } END_TEST /* additional SubscriberBeforeWriteCallback test data */ -static UA_UInt32 sSubscriberWriteValue = 0; -static UA_NodeId sSubscribeWriteCb_TargetVar_Id; +#define NUMVARS 2 +static UA_UInt32 sSubscriberWriteValue[NUMVARS] = {0}; +static UA_NodeId sSubscribeWriteCb_TargetVar_Id[NUMVARS]; static void SubscriberBeforeWriteCallback(UA_Server *srv, const UA_NodeId *readerId, const UA_NodeId *readerGroupId, @@ -487,20 +488,27 @@ static void SubscriberBeforeWriteCallback(UA_Server *srv, void *targetVariableContext, UA_DataValue **externalDataValue) { + int i = 0; ck_assert(srv != 0); ck_assert(UA_NodeId_equal(readerId, &readerIdentifier) == UA_TRUE); ck_assert(UA_NodeId_equal(readerGroupId, &readerGroupIdentifier) == UA_TRUE); - ck_assert(UA_NodeId_equal(targetVariableId, &sSubscribeWriteCb_TargetVar_Id) == UA_TRUE); - ck_assert(targetVariableContext != 0); - ck_assert_uint_eq(10, *((UA_UInt32*) targetVariableContext)); - ck_assert(externalDataValue != 0); - ck_assert_uint_eq((**externalDataValue).value.type->memSize, sizeof(sSubscriberWriteValue)); - memcpy(&sSubscriberWriteValue, (**externalDataValue).value.data, (**externalDataValue).value.type->memSize); + for (i = 0; i < NUMVARS; i++) { + if (UA_NodeId_equal(targetVariableId, &sSubscribeWriteCb_TargetVar_Id[i]) == UA_TRUE) { + ck_assert(targetVariableContext != 0); + ck_assert_uint_eq(10+i, *((UA_UInt32*) targetVariableContext)); + ck_assert(externalDataValue != 0); + ck_assert_uint_eq((**externalDataValue).value.type->memSize, sizeof(sSubscriberWriteValue[i])); + memcpy(&sSubscriberWriteValue[i], (**externalDataValue).value.data, (**externalDataValue).value.type->memSize); + break; + } + } + /* check if variable has been found */ + ck_assert(i < NUMVARS); } static void PublishSubscribeWithWriteCallback_Helper( - UA_NodeId publisherNode, - UA_UInt32 *publisherData, + UA_NodeId *publisherNode, + UA_UInt32 **publisherData, UA_Boolean useRawEncoding) { /* test fast-path with subscriber write callback */ @@ -508,6 +516,7 @@ static void PublishSubscribeWithWriteCallback_Helper( (useRawEncoding == UA_TRUE) ? "true" : "false"); /* configure the connection */ + int i; UA_StatusCode retVal = UA_STATUSCODE_GOOD; ck_assert(addMinimalPubSubConfiguration() == UA_STATUSCODE_GOOD); UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, connectionIdentifier); @@ -515,14 +524,17 @@ static void PublishSubscribeWithWriteCallback_Helper( /* Data Set Field */ UA_DataSetFieldConfig dataSetFieldConfig; - memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig)); - dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE; - dataSetFieldConfig.field.variable.promotedField = UA_FALSE; - dataSetFieldConfig.field.variable.publishParameters.publishedVariable = publisherNode; - dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE; - dataSetFieldConfig.field.variable.rtValueSource.rtInformationModelNode = UA_TRUE; - retVal = UA_Server_addDataSetField (server, publishedDataSetIdent, &dataSetFieldConfig, &dataSetFieldIdent).result; - ck_assert(retVal == UA_STATUSCODE_GOOD); + UA_NodeId dataSetFieldIdents[NUMVARS]; + for (i = 0; i < NUMVARS; i++) { + memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig)); + dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE; + dataSetFieldConfig.field.variable.promotedField = UA_FALSE; + dataSetFieldConfig.field.variable.publishParameters.publishedVariable = publisherNode[i]; + dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE; + dataSetFieldConfig.field.variable.rtValueSource.rtInformationModelNode = UA_TRUE; + retVal = UA_Server_addDataSetField (server, publishedDataSetIdent, &dataSetFieldConfig, &dataSetFieldIdents[i]).result; + ck_assert(retVal == UA_STATUSCODE_GOOD); + } /* Writer group */ UA_WriterGroupConfig writerGroupConfig; memset(&writerGroupConfig, 0, sizeof(writerGroupConfig)); @@ -599,34 +611,41 @@ static void PublishSubscribeWithWriteCallback_Helper( /* FilltestMetadata function in subscriber implementation */ UA_DataSetMetaDataType_init (pMetaData); pMetaData->name = UA_STRING ("DataSet Test"); - /* Static definition of number of fields size to 1 to create one targetVariable */ - pMetaData->fieldsSize = 1; + /* Static definition of number of fields size to NUMVARS to create targetVariables */ + pMetaData->fieldsSize = NUMVARS; pMetaData->fields = (UA_FieldMetaData*)UA_Array_new(pMetaData->fieldsSize, &UA_TYPES[UA_TYPES_FIELDMETADATA]); - /* Unsigned Integer DataType */ - UA_FieldMetaData_init (&pMetaData->fields[0]); - UA_NodeId_copy (&UA_TYPES[UA_TYPES_UINT32].typeId, - &pMetaData->fields[0].dataType); - pMetaData->fields[0].builtInType = UA_NS0ID_UINT32; - pMetaData->fields[0].valueRank = -1; /* scalar */ + for (i = 0; i < NUMVARS; i++) { + /* Unsigned Integer DataType */ + UA_FieldMetaData_init (&pMetaData->fields[i]); + UA_NodeId_copy (&UA_TYPES[UA_TYPES_UINT32].typeId, + &pMetaData->fields[i].dataType); + pMetaData->fields[i].builtInType = UA_NS0ID_UINT32; + pMetaData->fields[i].valueRank = -1; /* scalar */ + } retVal |= UA_Server_addDataSetReader(server, readerGroupIdentifier, &readerConfig, &readerIdentifier); UA_UadpDataSetReaderMessageDataType_delete(dsReaderMessage); dsReaderMessage = 0; ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); - UA_FieldTargetVariable targetVar; - memset(&targetVar, 0, sizeof(UA_FieldTargetVariable)); - /* For creating Targetvariable */ - UA_FieldTargetDataType_init(&targetVar.targetVariable); - targetVar.targetVariable.attributeId = UA_ATTRIBUTEID_VALUE; - targetVar.targetVariable.targetNodeId = sSubscribeWriteCb_TargetVar_Id; - targetVar.beforeWrite = SubscriberBeforeWriteCallback; /* set subscriber write callback */ - UA_UInt32 DummyTargetVariableContext = 10; - targetVar.targetVariableContext = &DummyTargetVariableContext; + UA_FieldTargetVariable targetVar[NUMVARS]; + UA_UInt32 DummyTargetVariableContext[NUMVARS]; + memset(&targetVar, 0, sizeof(targetVar)); + for (i = 0; i < NUMVARS; i++) { + /* For creating Targetvariable */ + UA_FieldTargetDataType_init(&targetVar[i].targetVariable); + targetVar[i].targetVariable.attributeId = UA_ATTRIBUTEID_VALUE; + targetVar[i].targetVariable.targetNodeId = sSubscribeWriteCb_TargetVar_Id[i]; + targetVar[i].beforeWrite = SubscriberBeforeWriteCallback; /* set subscriber write callback */ + DummyTargetVariableContext[i] = 10+i; + targetVar[i].targetVariableContext = &DummyTargetVariableContext[i]; + } retVal |= UA_Server_DataSetReader_createTargetVariables(server, readerIdentifier, - 1, &targetVar); + NUMVARS, &targetVar[0]); ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); - UA_FieldTargetDataType_clear(&targetVar.targetVariable); + for (i = 0; i < NUMVARS; i++) { + UA_FieldTargetDataType_clear(&targetVar[i].targetVariable); + } UA_free(pMetaData->fields); ck_assert_int_eq(UA_STATUSCODE_GOOD, UA_Server_freezeReaderGroupConfiguration(server, readerGroupIdentifier)); @@ -634,18 +653,33 @@ static void PublishSubscribeWithWriteCallback_Helper( ck_assert(UA_Server_enableWriterGroup(server, writerGroupIdent) == UA_STATUSCODE_GOOD); ck_assert(UA_Server_enableReaderGroup(server, readerGroupIdentifier) == UA_STATUSCODE_GOOD); - /* run server - publisher and subscriber */ - *publisherData = 42; - sSubscriberWriteValue = 0; + for (i = 0; i < NUMVARS; i++) { + /* run server - publisher and subscriber */ + *(publisherData[i]) = 42 + i; + sSubscriberWriteValue[i] = 0; + } + ServerDoProcess((UA_UInt32) writerGroupConfig.publishingInterval, 3); + for (i = 0; i < NUMVARS; i++) { + /* check that subscriber write callback has been called - verify received value */ + ck_assert_uint_eq(*(publisherData[i]), sSubscriberWriteValue[i]); + + /* set new publisher data and test again */ + *(publisherData[i]) = 42 + NUMVARS + i; + sSubscriberWriteValue[i] = 0; + } ServerDoProcess((UA_UInt32) writerGroupConfig.publishingInterval, 3); - /* check that subscriber write callback has been called - verify received value */ - ck_assert_uint_eq(*publisherData, sSubscriberWriteValue); + for (i = 0; i < NUMVARS; i++) { + /* check that subscriber write callback has been called - verify received value */ + ck_assert_uint_eq(*(publisherData[i]), sSubscriberWriteValue[i]); - /* set new publisher data and test again */ - *publisherData = 43; - sSubscriberWriteValue = 0; + /* set new publisher data and test again for checking buffered data handling */ + *(publisherData[i]) = 42 + 2*NUMVARS + i; + sSubscriberWriteValue[i] = 0; + } ServerDoProcess((UA_UInt32) writerGroupConfig.publishingInterval, 3); - ck_assert_uint_eq(*publisherData, sSubscriberWriteValue); + for (i = 0; i < NUMVARS; i++) { + ck_assert_uint_eq(*(publisherData[i]), sSubscriberWriteValue[i]); + } ck_assert_int_eq(UA_STATUSCODE_GOOD, UA_Server_setWriterGroupDisabled(server, writerGroupIdent)); ck_assert_int_eq(UA_STATUSCODE_GOOD, UA_Server_setReaderGroupDisabled(server, readerGroupIdentifier)); @@ -661,43 +695,53 @@ static void PublishSubscribeWithWriteCallback_Helper( UA_NodeId_clear(&publishedDataSetIdent); UA_NodeId_clear(&writerGroupIdent); UA_NodeId_clear(&dataSetWriterIdent); - UA_NodeId_clear(&dataSetFieldIdent); + for (i = 0; i < NUMVARS; i++) { + UA_NodeId_clear(&dataSetFieldIdents[i]); + } UA_NodeId_clear(&readerGroupIdentifier); UA_NodeId_clear(&readerIdentifier); } START_TEST(PublishSubscribeWithWriteCallback) { + int i; + UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PublishSubscribeWithWriteCallback() test start"); - /* Create variable to publish integer data */ - UA_NodeId publisherNode; + /* Create variables to publish integer data */ + UA_NodeId publisherNode[NUMVARS]; UA_VariableAttributes attr = UA_VariableAttributes_default; attr.description = UA_LOCALIZEDTEXT("en-US","Published Integer"); attr.displayName = UA_LOCALIZEDTEXT("en-US","Published Integer"); attr.dataType = UA_TYPES[UA_TYPES_UINT32].typeId; attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE; - UA_DataValue *publisherDataValue = UA_DataValue_new(); - ck_assert(publisherDataValue != 0); - UA_UInt32 *publisherData = UA_UInt32_new(); - ck_assert(publisherData != 0); - *publisherData = 42; - UA_Variant_setScalar(&publisherDataValue->value, publisherData, &UA_TYPES[UA_TYPES_UINT32]); - UA_StatusCode retVal = UA_Server_addVariableNode(server, - UA_NODEID_NUMERIC(1, 50001), - UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER), - UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES), - UA_QUALIFIEDNAME(1, "Published Integer"), - UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), - attr, NULL, &publisherNode); - ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); - /* add external value backend for fast-path */ + UA_DataValue *publisherDataValue[NUMVARS]; + UA_UInt32 *publisherData[NUMVARS]; + UA_StatusCode retVal; UA_ValueBackend valueBackend; - valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL; - valueBackend.backend.external.value = &publisherDataValue; - retVal = UA_Server_setVariableNode_valueBackend(server, publisherNode, valueBackend); - ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); + + for (i = 0; i < NUMVARS; i++) { + publisherDataValue[i] = UA_DataValue_new(); + ck_assert(publisherDataValue[i] != 0); + publisherData[i] = UA_UInt32_new(); + ck_assert(publisherData[i] != 0); + *publisherData[i] = 42*(i+1); + UA_Variant_setScalar(&publisherDataValue[i]->value, publisherData[i], &UA_TYPES[UA_TYPES_UINT32]); + retVal = UA_Server_addVariableNode(server, + UA_NODEID_NUMERIC(1, 50001+i), + UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER), + UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES), + UA_QUALIFIEDNAME(1, "Published Integer"), + UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), + attr, NULL, &publisherNode[i]); + ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); + /* add external value backend for fast-path */ + valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL; + valueBackend.backend.external.value = &publisherDataValue[i]; + retVal = UA_Server_setVariableNode_valueBackend(server, publisherNode[i], valueBackend); + ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); + } /* Add Subscribed Variables */ UA_NodeId folderId; @@ -721,39 +765,45 @@ START_TEST(PublishSubscribeWithWriteCallback) { folderBrowseName, UA_NODEID_NUMERIC (0, UA_NS0ID_BASEOBJECTTYPE), oAttr, NULL, &folderId); ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); - /* Variable to subscribe data */ + /* Variables to subscribe data */ UA_VariableAttributes vAttr = UA_VariableAttributes_default; vAttr.description = UA_LOCALIZEDTEXT ("en-US", "Subscribed UInt32"); vAttr.displayName = UA_LOCALIZEDTEXT ("en-US", "Subscribed UInt32"); vAttr.dataType = UA_TYPES[UA_TYPES_UINT32].typeId; - retVal = UA_Server_addVariableNode(server, UA_NODEID_NUMERIC(1, 50002), - folderId, - UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), UA_QUALIFIEDNAME(1, "Subscribed UInt32"), - UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), vAttr, NULL, &sSubscribeWriteCb_TargetVar_Id); - ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); - - UA_DataValue *subscriberDataValue = UA_DataValue_new(); - ck_assert(subscriberDataValue != 0); - UA_UInt32 *subscriberData = UA_UInt32_new(); - ck_assert(subscriberData != 0); - *subscriberData = 0; - UA_Variant_setScalar(&subscriberDataValue->value, subscriberData, &UA_TYPES[UA_TYPES_UINT32]); + UA_DataValue *subscriberDataValue[NUMVARS]; + UA_UInt32 *subscriberData[NUMVARS]; + for (i = 0; i < NUMVARS; i++) { + retVal = UA_Server_addVariableNode(server, UA_NODEID_NUMERIC(1, 50001 + NUMVARS + i), + folderId, + UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT), UA_QUALIFIEDNAME(1, "Subscribed UInt32"), + UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), vAttr, NULL, &sSubscribeWriteCb_TargetVar_Id[i]); + ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); - /* add external value backend for fast-path */ - memset(&valueBackend, 0, sizeof(valueBackend)); - valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL; - valueBackend.backend.external.value = &subscriberDataValue; - retVal = UA_Server_setVariableNode_valueBackend(server, sSubscribeWriteCb_TargetVar_Id, valueBackend); - ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); + subscriberDataValue[i] = UA_DataValue_new(); + ck_assert(subscriberDataValue[i] != 0); + subscriberData[i] = UA_UInt32_new(); + ck_assert(subscriberData[i] != 0); + *subscriberData[i] = 0; + UA_Variant_setScalar(&subscriberDataValue[i]->value, subscriberData[i], &UA_TYPES[UA_TYPES_UINT32]); + + /* add external value backend for fast-path */ + memset(&valueBackend, 0, sizeof(valueBackend)); + valueBackend.backendType = UA_VALUEBACKENDTYPE_EXTERNAL; + valueBackend.backend.external.value = &subscriberDataValue[i]; + retVal = UA_Server_setVariableNode_valueBackend(server, sSubscribeWriteCb_TargetVar_Id[i], valueBackend); + ck_assert_int_eq(retVal, UA_STATUSCODE_GOOD); + } PublishSubscribeWithWriteCallback_Helper(publisherNode, publisherData, UA_FALSE); PublishSubscribeWithWriteCallback_Helper(publisherNode, publisherData, UA_TRUE); /* cleanup */ - UA_DataValue_delete(subscriberDataValue); - subscriberDataValue = 0; - UA_DataValue_delete(publisherDataValue); - publisherDataValue = 0; + for (i = 0; i < NUMVARS; i++) { + UA_DataValue_delete(subscriberDataValue[i]); + subscriberDataValue[i] = 0; + UA_DataValue_delete(publisherDataValue[i]); + publisherDataValue[i] = 0; + } UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "PublishSubscribeWithWriteCallback() test end"); } END_TEST diff --git a/tests/server/check_server_asyncop.c b/tests/server/check_server_asyncop.c index 3edfae0b4b0..4352f28b7f8 100644 --- a/tests/server/check_server_asyncop.c +++ b/tests/server/check_server_asyncop.c @@ -198,6 +198,74 @@ START_TEST(Async_timeout) { UA_Client_delete(client); } END_TEST +START_TEST(Async_cancel) { + UA_Client *client = UA_Client_new(); + UA_ClientConfig *clientConfig = UA_Client_getConfig(client); + UA_ClientConfig_setDefault(clientConfig); + + UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840"); + ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); + + /* Call async method, then the sync method. + * The sync method returns first. */ + UA_UInt32 reqId = 0; + retval = UA_Client_call_async(client, + UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER), + UA_NODEID_STRING(1, "asyncMethod"), + 0, NULL, clientReceiveCallback, NULL, &reqId); + ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); + + /* Cancel the request */ + UA_UInt32 cancelCount = 0; + UA_Client_cancelByRequestId(client, reqId, &cancelCount); + ck_assert_uint_eq(cancelCount, 1); + + /* We expect to receive the cancelled response */ + UA_Client_run_iterate(client, 0); + ck_assert_uint_eq(clientCounter, 1); + + UA_Client_disconnect(client); + UA_Client_delete(client); +} END_TEST + +START_TEST(Async_cancel_multiple) { + UA_Client *client = UA_Client_new(); + UA_ClientConfig *clientConfig = UA_Client_getConfig(client); + UA_ClientConfig_setDefault(clientConfig); + + UA_StatusCode retval = UA_Client_connect(client, "opc.tcp://localhost:4840"); + ck_assert_uint_eq(retval, UA_STATUSCODE_GOOD); + + UA_CallRequest creq; + UA_CallRequest_init(&creq); + creq.requestHeader.requestHandle = 1337; + UA_CallMethodRequest cmr; + UA_CallMethodRequest_init(&cmr); + cmr.objectId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER); + cmr.methodId = UA_NODEID_STRING(1, "asyncMethod"); + creq.methodsToCall = &cmr; + creq.methodsToCallSize = 1; + + __UA_Client_AsyncService(client, + &creq, &UA_TYPES[UA_TYPES_CALLREQUEST], + NULL, &UA_TYPES[UA_TYPES_CALLRESPONSE], + NULL, NULL); + + __UA_Client_AsyncService(client, + &creq, &UA_TYPES[UA_TYPES_CALLREQUEST], + NULL, &UA_TYPES[UA_TYPES_CALLRESPONSE], + NULL, NULL); + + /* Expect two cancelled requests */ + UA_UInt32 cancelCount = 0; + UA_Client_cancelByRequestHandle(client, 1337, &cancelCount); + ck_assert_uint_eq(cancelCount, 2); + + UA_Client_run_iterate(client, 0); + UA_Client_disconnect(client); + UA_Client_delete(client); +} END_TEST + /* Force a timeout when the operation is checked out with the worker */ START_TEST(Async_timeout_worker) { UA_Client *client = UA_Client_new(); @@ -255,6 +323,8 @@ static Suite* method_async_suite(void) { tcase_add_checked_fixture(tc_manager, setup, teardown); tcase_add_test(tc_manager, Async_call); tcase_add_test(tc_manager, Async_timeout); + tcase_add_test(tc_manager, Async_cancel); + tcase_add_test(tc_manager, Async_cancel_multiple); tcase_add_test(tc_manager, Async_timeout_worker); suite_add_tcase(s, tc_manager); diff --git a/tools/schema/datatypes_minimal.txt b/tools/schema/datatypes_minimal.txt index 8578e227f2d..64fc1679185 100644 --- a/tools/schema/datatypes_minimal.txt +++ b/tools/schema/datatypes_minimal.txt @@ -117,4 +117,6 @@ ServerState ServerDiagnosticsSummaryDataType RedundancySupport KeyValuePair -NamingRuleType \ No newline at end of file +NamingRuleType +CancelRequest +CancelResponse