Skip to content

Commit

Permalink
* NEW [demo/scram] Add handler for demo tls_sub/tls_pub.
Browse files Browse the repository at this point in the history
Signed-off-by: wanghaemq <[email protected]>
  • Loading branch information
wanghaEMQ authored and JaylinYu committed Jul 29, 2024
1 parent 20ecfef commit 78c4de6
Showing 1 changed file with 80 additions and 3 deletions.
83 changes: 80 additions & 3 deletions demo/mqttv5_scram/mqttv5_scram.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,16 +560,93 @@ main(const int argc, const char **argv)
nng_thread_create(&threads[i], publish_cb, &params);
}

for (i = 0; i < nthread; i++) {
nng_thread_destroy(threads[i]);
}
} else if (strcmp(TLS_PUBLISH, cmd) == 0) {
const char *data = argv[5];
uint32_t interval = 0;
uint32_t nthread = 1;

nng_thread *threads[nthread];

params.sock = &sock, params.topic = topic;
params.data = (uint8_t *) data;
params.data_len = strlen(data);
params.qos = qos;
params.interval = interval;
params.verbose = verbose;

size_t i = 0;
for (i = 0; i < nthread; i++) {
nng_thread_create(&threads[i], publish_cb, &params);
}

for (i = 0; i < nthread; i++) {
nng_thread_destroy(threads[i]);
}
} else if (strcmp(SUBSCRIBE, cmd) == 0) {
nng_mqtt_topic_qos subscriptions[] = {
{
.qos = qos,
.topic = {
.topic = {
.buf = (uint8_t *) topic,
.length = strlen(topic),
},
.nolocal = 1,
.rap = 1,
.retain_handling = 0,
},
};
nng_mqtt_topic unsubscriptions[] = {
{
.buf = (uint8_t *) topic,
.length = strlen(topic),
},
};

property *plist = mqtt_property_alloc();
mqtt_property_append(plist,
mqtt_property_set_value_varint(
SUBSCRIPTION_IDENTIFIER, 120));
property *unsub_plist = NULL;
mqtt_property_dup(&unsub_plist, plist);

// Sync subscription
// rv = nng_mqtt_subscribe(sock, subscriptions, 1, plist);

// Asynchronous subscription
nng_mqtt_client *client = nng_mqtt_client_alloc(sock, &send_callback, NULL, true);
nng_mqtt_subscribe_async(client, subscriptions,
sizeof(subscriptions) / sizeof(nng_mqtt_topic_qos), plist);

printf("Start receiving loop:\n");
while (true) {
nng_msg *msg;
if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) {
fatal("nng_recvmsg", rv);
continue;
}

// we should only receive publish messages
assert(nng_mqtt_msg_get_packet_type(msg) == NNG_MQTT_PUBLISH);
msg_recv_deal(msg, verbose);
}

// Sync unsubscription
// rv = nng_mqtt_unsubscribe(sock, subscriptions, 1, plist);
// Asynchronous unsubscription
nng_mqtt_unsubscribe_async(client, unsubscriptions,
sizeof(unsubscriptions) / sizeof(nng_mqtt_topic),
unsub_plist);
nng_mqtt_client_free(client, true);
} else if (strcmp(TLS_SUBSCRIBE, cmd) == 0) {
nng_mqtt_topic_qos subscriptions[] = {
{
.qos = qos,
.topic = {
.buf = (uint8_t *) topic,
.length = strlen(topic),
.length = strlen(topic),
},
.nolocal = 1,
.rap = 1,
Expand Down Expand Up @@ -620,7 +697,7 @@ main(const int argc, const char **argv)
nng_mqtt_client_free(client, true);
}

// disconnect
// disconnect
property *plist = mqtt_property_alloc();
property *p = mqtt_property_set_value_strpair(
USER_PROPERTY, "aaa", strlen("aaa"), "aaa", strlen("aaa"), true);
Expand Down

0 comments on commit 78c4de6

Please sign in to comment.