Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/1.4' into merge_14_master_24
Browse files Browse the repository at this point in the history
  • Loading branch information
jpfr committed Nov 15, 2024
2 parents 6b17b97 + d4c5aaa commit bfaec71
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 37 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
# overwritten with more detailed information if git is available.
set(OPEN62541_VER_MAJOR 1)
set(OPEN62541_VER_MINOR 4)
set(OPEN62541_VER_PATCH 6)
set(OPEN62541_VER_PATCH 7)
set(OPEN62541_VER_LABEL "-undefined") # like "-rc1" or "-g4538abcd" or "-g4538abcd-dirty"
set(OPEN62541_VER_COMMIT "unknown-commit")

Expand Down
2 changes: 1 addition & 1 deletion examples/pubsub/tutorial_pubsub_mqtt_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ addPubSubConnection(UA_Server *server, char *addressUrl) {
UA_PubSubConnectionConfig connectionConfig;
memset(&connectionConfig, 0, sizeof(connectionConfig));
connectionConfig.name = UA_STRING(CONNECTION_NAME);
if (useJson) {
if(useJson) {
connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI_JSON);
} else {
connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI_UADP);
Expand Down
8 changes: 6 additions & 2 deletions examples/pubsub/tutorial_pubsub_mqtt_subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
#include <open62541/plugin/log_stdout.h>
#include <open62541/server.h>
#include <open62541/server_pubsub.h>
#if defined(UA_ENABLE_PUBSUB_ENCRYPTION)
#include <open62541/plugin/securitypolicy_default.h>
#endif

#include <stdio.h>

#include <stdio.h>

Expand Down Expand Up @@ -111,8 +115,8 @@ addReaderGroup(UA_Server *server) {
readerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;

/* configure the mqtt publish topic */
UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
UA_BrokerDataSetReaderTransportDataType brokerTransportSettings;
memset(&brokerTransportSettings, 0, sizeof(UA_BrokerDataSetReaderTransportDataType));
/* Assign the Topic at which MQTT publish should happen */
/*ToDo: Pass the topic as argument from the reader group */
brokerTransportSettings.queueName = UA_STRING(SUBSCRIBER_TOPIC);
Expand Down
54 changes: 48 additions & 6 deletions src/pubsub/ua_pubsub_ns0.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,23 +380,65 @@ addWriterGroupConfig(UA_Server *server, UA_NodeId connectionId,
writerGroupConfig.name = writerGroup->name;
writerGroupConfig.publishingInterval = writerGroup->publishingInterval;
writerGroupConfig.writerGroupId = writerGroup->writerGroupId;
//TODO remove hard coded UADP
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
writerGroupConfig.priority = writerGroup->priority;

UA_UadpWriterGroupMessageDataType writerGroupMessage;
UA_ExtensionObject *eoWG = &writerGroup->messageSettings;
UA_UadpWriterGroupMessageDataType uadpWriterGroupMessage;
UA_JsonWriterGroupMessageDataType jsonWriterGroupMessage;
if(eoWG->encoding == UA_EXTENSIONOBJECT_DECODED){
writerGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
if(eoWG->content.decoded.type == &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE]){
if(UA_UadpWriterGroupMessageDataType_copy((UA_UadpWriterGroupMessageDataType *) eoWG->content.decoded.data,
&writerGroupMessage) != UA_STATUSCODE_GOOD){
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
if(UA_UadpWriterGroupMessageDataType_copy(
(UA_UadpWriterGroupMessageDataType *)eoWG->content.decoded.data,
&uadpWriterGroupMessage) != UA_STATUSCODE_GOOD) {
return UA_STATUSCODE_BADOUTOFMEMORY;
}
writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
writerGroupConfig.messageSettings.content.decoded.data = &writerGroupMessage;
writerGroupConfig.messageSettings.content.decoded.data = &uadpWriterGroupMessage;
} else if(eoWG->content.decoded.type == &UA_TYPES[UA_TYPES_JSONWRITERGROUPMESSAGEDATATYPE]) {
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;
if(UA_JsonWriterGroupMessageDataType_copy(
(UA_JsonWriterGroupMessageDataType *)eoWG->content.decoded.data,
&jsonWriterGroupMessage) != UA_STATUSCODE_GOOD) {
return UA_STATUSCODE_BADOUTOFMEMORY;
}
writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONWRITERGROUPMESSAGEDATATYPE];
writerGroupConfig.messageSettings.content.decoded.data = &jsonWriterGroupMessage;
}
}

eoWG = &writerGroup->transportSettings;
UA_BrokerWriterGroupTransportDataType brokerWriterGroupTransport;
UA_DatagramWriterGroupTransportDataType datagramWriterGroupTransport;
if(eoWG->encoding == UA_EXTENSIONOBJECT_DECODED) {
writerGroupConfig.transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
if(eoWG->content.decoded.type == &UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE]) {
if(UA_BrokerWriterGroupTransportDataType_copy(
(UA_BrokerWriterGroupTransportDataType*)eoWG->content.decoded.data,
&brokerWriterGroupTransport) != UA_STATUSCODE_GOOD) {
return UA_STATUSCODE_BADOUTOFMEMORY;
}
writerGroupConfig.transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE];
writerGroupConfig.transportSettings.content.decoded.data = &brokerWriterGroupTransport;
} else if(eoWG->content.decoded.type == &UA_TYPES[UA_TYPES_DATAGRAMWRITERGROUPTRANSPORTDATATYPE]) {
if(UA_DatagramWriterGroupTransportDataType_copy(
(UA_DatagramWriterGroupTransportDataType *)eoWG->content.decoded.data,
&datagramWriterGroupTransport) != UA_STATUSCODE_GOOD) {
return UA_STATUSCODE_BADOUTOFMEMORY;
}
writerGroupConfig.transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_DATAGRAMWRITERGROUPTRANSPORTDATATYPE];
writerGroupConfig.transportSettings.content.decoded.data = &datagramWriterGroupTransport;
}
}
if (writerGroupConfig.encodingMimeType == UA_PUBSUB_ENCODING_JSON
&& (writerGroupConfig.transportSettings.encoding != UA_EXTENSIONOBJECT_DECODED ||
writerGroupConfig.transportSettings.content.decoded.type !=
&UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE])) {
UA_LOG_ERROR(server->config.logging, UA_LOGCATEGORY_SERVER,
"JSON encoding is supported only for MQTT transport");
return UA_STATUSCODE_BADCONFIGURATIONERROR;
}

return UA_WriterGroup_create(psm, connectionId, &writerGroupConfig, writerGroupId);
}
Expand Down
2 changes: 2 additions & 0 deletions src/pubsub/ua_pubsub_readergroup.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ UA_ReaderGroupConfig_copy(const UA_ReaderGroupConfig *src,
res |= UA_String_copy(&src->name, &dst->name);
res |= UA_KeyValueMap_copy(&src->groupProperties, &dst->groupProperties);
res |= UA_String_copy(&src->securityGroupId, &dst->securityGroupId);
res |= UA_ExtensionObject_copy(&src->transportSettings, &dst->transportSettings);
if(res != UA_STATUSCODE_GOOD)
UA_ReaderGroupConfig_clear(dst);
return res;
Expand All @@ -76,6 +77,7 @@ UA_ReaderGroupConfig_clear(UA_ReaderGroupConfig *readerGroupConfig) {
UA_String_clear(&readerGroupConfig->name);
UA_KeyValueMap_clear(&readerGroupConfig->groupProperties);
UA_String_clear(&readerGroupConfig->securityGroupId);
UA_ExtensionObject_clear(&readerGroupConfig->transportSettings);
}

UA_StatusCode
Expand Down
21 changes: 0 additions & 21 deletions src/server/ua_services_discovery.c
Original file line number Diff line number Diff line change
Expand Up @@ -491,27 +491,6 @@ Service_GetEndpoints(UA_Server *server, UA_Session *session,
setCurrentEndPointsArray(server, request->endpointUrl,
request->profileUris, request->profileUrisSize,
&response->endpoints, &response->endpointsSize);

/* Check if the ServerUrl is already present in the DiscoveryUrl array.
* Add if not already there. */
UA_SecureChannel *channel = session->channel;
for(size_t i = 0; i < server->config.applicationDescription.discoveryUrlsSize; i++) {
if(UA_String_equal(&channel->endpointUrl,
&server->config.applicationDescription.discoveryUrls[i])) {
return;
}
}
if(server->config.applicationDescription.discoveryUrls == NULL){
server->config.applicationDescription.discoveryUrls = (UA_String*)UA_Array_new(1, &UA_TYPES[UA_TYPES_STRING]);
server->config.applicationDescription.discoveryUrlsSize = 0;
}
UA_StatusCode retval = UA_STATUSCODE_GOOD;
retval = UA_Array_appendCopy((void**)&server->config.applicationDescription.discoveryUrls,
&server->config.applicationDescription.discoveryUrlsSize,
&request->endpointUrl, &UA_TYPES[UA_TYPES_STRING]);
if(retval != UA_STATUSCODE_GOOD)
UA_LOG_ERROR(server->config.logging, UA_LOGCATEGORY_SERVER,
"Error adding the ServerUrl to theDiscoverUrl list.");
}

#ifdef UA_ENABLE_DISCOVERY
Expand Down
5 changes: 5 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
if(MSVC)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /W1") # Reduce warnings level
add_compile_options(
$<$<CONFIG:>:/MD>
$<$<CONFIG:Debug>:/MDd>
$<$<CONFIG:Release>:/MD>
)
else()
add_compile_options(-Wno-unused-variable -Wno-unused-function)
add_compile_options(-Wno-gnu-zero-variadic-macro-arguments) # silence warning for the check library
Expand Down
60 changes: 55 additions & 5 deletions tests/pubsub/check_pubsub_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ static void setup(void) {
connectionConfig.publisherId.id.uint16 = 2234;

/* configure options, set mqtt client id */
const int connectionOptionsCount = 1;

UA_KeyValuePair connectionOptions[connectionOptionsCount];
UA_KeyValuePair connectionOptions[1];

size_t connectionOptionIndex = 0;
connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, CONNECTIONOPTION_NAME);
Expand Down Expand Up @@ -224,8 +222,8 @@ START_TEST(SinglePublishSubscribeDateTime){
readerGroupConfig.name = UA_STRING("ReaderGroup1");

/* configure the mqtt publish topic */
UA_BrokerWriterGroupTransportDataType brokerTransportSettingsSubscriber;
memset(&brokerTransportSettingsSubscriber, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
UA_BrokerDataSetReaderTransportDataType brokerTransportSettingsSubscriber;
memset(&brokerTransportSettingsSubscriber, 0, sizeof(UA_BrokerDataSetReaderTransportDataType));

brokerTransportSettingsSubscriber.queueName = UA_STRING(SUBSCRIBE_TOPIC);
brokerTransportSettingsSubscriber.resourceUri = UA_STRING_NULL;
Expand Down Expand Up @@ -326,10 +324,62 @@ START_TEST(SinglePublishSubscribeDateTime){

} END_TEST

START_TEST(CreateReaderGroup) {
UA_StatusCode retval = UA_STATUSCODE_GOOD;

// add reader group
UA_ReaderGroupConfig readerGroupConfig;
memset(&readerGroupConfig, 0, sizeof(UA_ReaderGroupConfig));
readerGroupConfig.name = UA_STRING("ReaderGroup1");

/* configure the mqtt publish topic */
UA_BrokerDataSetReaderTransportDataType transportSettingsData;
memset(&transportSettingsData, 0, sizeof(UA_BrokerDataSetReaderTransportDataType));

transportSettingsData.queueName = UA_STRING(SUBSCRIBE_TOPIC);
transportSettingsData.resourceUri = UA_STRING_NULL;
transportSettingsData.authenticationProfileUri = UA_STRING_NULL;

transportSettingsData.requestedDeliveryGuarantee =
UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;

UA_ExtensionObject transportSettings;
memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
transportSettings.content.decoded.type =
&UA_TYPES[UA_TYPES_BROKERDATASETREADERTRANSPORTDATATYPE];
transportSettings.content.decoded.data = &transportSettingsData;

readerGroupConfig.transportSettings = transportSettings;

retval = UA_Server_addReaderGroup(server, connectionIdent, &readerGroupConfig,
&readerGroupIdent);
ck_assert_int_eq(retval, UA_STATUSCODE_GOOD);

// Check if reader group was created correctly (Issue #6808)
memset(&transportSettingsData, 0,
sizeof(UA_BrokerDataSetReaderTransportDataType));

UA_ReaderGroup *rg = UA_ReaderGroup_findRGbyId(server, readerGroupIdent);
ck_assert(rg != 0);
UA_ExtensionObject *ts = &rg->config.transportSettings;

ck_assert((ts->encoding == UA_EXTENSIONOBJECT_DECODED ||
ts->encoding == UA_EXTENSIONOBJECT_DECODED_NODELETE) &&
ts->content.decoded.type ==
&UA_TYPES[UA_TYPES_BROKERDATASETREADERTRANSPORTDATATYPE]);
UA_String *topic =
&((UA_BrokerDataSetReaderTransportDataType *)ts->content.decoded.data)->queueName;
ck_assert(topic->data != 0 && topic->length != 0 &&
strncmp(SUBSCRIBE_TOPIC, (const char *)topic->data,
strlen(SUBSCRIBE_TOPIC)) == 0);
} END_TEST

int main(void) {
TCase *tc_pubsub_subscribe_mqtt = tcase_create("PubSub subscribe mqtt");
tcase_add_checked_fixture(tc_pubsub_subscribe_mqtt, setup, teardown);
tcase_add_test(tc_pubsub_subscribe_mqtt, SinglePublishSubscribeDateTime);
tcase_add_test(tc_pubsub_subscribe_mqtt, CreateReaderGroup);

Suite *s = suite_create("PubSub subscribe via mqtt");
suite_add_tcase(s, tc_pubsub_subscribe_mqtt);
Expand Down
12 changes: 11 additions & 1 deletion tests/pubsub/check_pubsub_publisherid.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <stdlib.h>

static UA_Server *server = NULL;
UA_Logger logger;

/* global variables for test configuration */
static UA_Boolean UseFastPath = UA_FALSE;
Expand All @@ -25,6 +26,15 @@ static void setup(void) {
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "setup");
server = UA_Server_newForUnitTest();
ck_assert(server != NULL);

UA_ServerConfig *config = UA_Server_getConfig(server);
ck_assert(config != 0);

/* Silence the log, because this test might produce an enormous amount of noise */
logger = UA_Log_Stdout_withLevel(UA_LOGLEVEL_ERROR);
config->logging->clear(config->logging);
*config->logging = logger;

ck_assert_int_eq(UA_STATUSCODE_GOOD, UA_Server_run_startup(server));
}

Expand Down Expand Up @@ -295,7 +305,7 @@ ValidatePublishSubscribe(
tmpValue = TestValue + (UA_Int32)i;
if(UseFastPath) {
ck_assert(fastPathSubscriberValues[i] != 0);
if(tmpValue != *(UA_Int32 *)fastPathSubscriberValues[i]->value.data) {
if(tmpValue != *((UA_Int32 *)fastPathSubscriberValues[i]->value.data)) {
done = false;
break;
}
Expand Down

0 comments on commit bfaec71

Please sign in to comment.