diff --git a/demo/mqttv5_scram/mqttv5_scram.c b/demo/mqttv5_scram/mqttv5_scram.c index 2609ea4b..5b03df31 100644 --- a/demo/mqttv5_scram/mqttv5_scram.c +++ b/demo/mqttv5_scram/mqttv5_scram.c @@ -560,6 +560,28 @@ main(const int argc, const char **argv) nng_thread_create(&threads[i], publish_cb, ¶ms); } + for (i = 0; i < nthread; i++) { + nng_thread_destroy(threads[i]); + } + } else if (strcmp(TLS_PUBLISH, cmd) == 0) { + const char *data = argv[5]; + uint32_t interval = 0; + uint32_t nthread = 1; + + nng_thread *threads[nthread]; + + params.sock = &sock, params.topic = topic; + params.data = (uint8_t *) data; + params.data_len = strlen(data); + params.qos = qos; + params.interval = interval; + params.verbose = verbose; + + size_t i = 0; + for (i = 0; i < nthread; i++) { + nng_thread_create(&threads[i], publish_cb, ¶ms); + } + for (i = 0; i < nthread; i++) { nng_thread_destroy(threads[i]); } @@ -567,9 +589,64 @@ main(const int argc, const char **argv) nng_mqtt_topic_qos subscriptions[] = { { .qos = qos, - .topic = { + .topic = { + .buf = (uint8_t *) topic, + .length = strlen(topic), + }, + .nolocal = 1, + .rap = 1, + .retain_handling = 0, + }, + }; + nng_mqtt_topic unsubscriptions[] = { + { + .buf = (uint8_t *) topic, + .length = strlen(topic), + }, + }; + + property *plist = mqtt_property_alloc(); + mqtt_property_append(plist, + mqtt_property_set_value_varint( + SUBSCRIPTION_IDENTIFIER, 120)); + property *unsub_plist = NULL; + mqtt_property_dup(&unsub_plist, plist); + + // Sync subscription + // rv = nng_mqtt_subscribe(sock, subscriptions, 1, plist); + + // Asynchronous subscription + nng_mqtt_client *client = nng_mqtt_client_alloc(sock, &send_callback, NULL, true); + nng_mqtt_subscribe_async(client, subscriptions, + sizeof(subscriptions) / sizeof(nng_mqtt_topic_qos), plist); + + printf("Start receiving loop:\n"); + while (true) { + nng_msg *msg; + if ((rv = nng_recvmsg(sock, &msg, 0)) != 0) { + fatal("nng_recvmsg", rv); + continue; + } + + // we should only receive publish messages + assert(nng_mqtt_msg_get_packet_type(msg) == NNG_MQTT_PUBLISH); + msg_recv_deal(msg, verbose); + } + + // Sync unsubscription + // rv = nng_mqtt_unsubscribe(sock, subscriptions, 1, plist); + // Asynchronous unsubscription + nng_mqtt_unsubscribe_async(client, unsubscriptions, + sizeof(unsubscriptions) / sizeof(nng_mqtt_topic), + unsub_plist); + nng_mqtt_client_free(client, true); + } else if (strcmp(TLS_SUBSCRIBE, cmd) == 0) { + nng_mqtt_topic_qos subscriptions[] = { + { + .qos = qos, + .topic = { .buf = (uint8_t *) topic, - .length = strlen(topic), + .length = strlen(topic), }, .nolocal = 1, .rap = 1, @@ -620,7 +697,7 @@ main(const int argc, const char **argv) nng_mqtt_client_free(client, true); } - // disconnect + // disconnect property *plist = mqtt_property_alloc(); property *p = mqtt_property_set_value_strpair( USER_PROPERTY, "aaa", strlen("aaa"), "aaa", strlen("aaa"), true);