Skip to content

Commit

Permalink
Update subscribe api
Browse files Browse the repository at this point in the history
Signed-off-by: JaylinYu <[email protected]>
  • Loading branch information
JaylinYu committed Nov 20, 2024
1 parent 01177b1 commit f8f89be
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions src/supplemental/mqtt/mqtt_public.c
Original file line number Diff line number Diff line change
Expand Up @@ -944,23 +944,41 @@ nng_mqtt_unsubscribe_async(nng_mqtt_client *client, nng_mqtt_topic *sbs, size_t
return 0;
}

// send a blocking sub msg until suback returns
int
nng_mqtt_subscribe(nng_socket sock, nng_mqtt_topic_qos *sbs, size_t count, property *pl)
{
int rv = 0;
// create a SUBSCRIBE message
nng_msg *submsg;
nng_mqtt_msg_alloc(&submsg, 0);
nng_msg *submsg, *msg;
nng_aio *aio;
rv = nng_mqtt_msg_alloc(&submsg, 0);
if (rv != 0) {
return (NNG_EINVAL);
}
nng_mqtt_msg_set_packet_type(submsg, NNG_MQTT_SUBSCRIBE);
nng_mqtt_msg_set_subscribe_topics(submsg, sbs, count);

if (pl) {
nng_mqtt_msg_set_subscribe_property(submsg, pl);
}

if ((rv = nng_sendmsg(sock, submsg, NNG_FLAG_ALLOC)) != 0) {
nng_msg_free(submsg);
nng_aio_alloc(&aio, NULL, NULL);
nng_aio_set_timeout(aio, 5000);

nng_aio_set_msg(aio, submsg);
nng_send_aio(sock, aio);
nng_aio_wait(aio);

rv = nng_aio_result(aio);
if (rv != 0)
return (NNG_ECLOSED); // connection shall be closed this time
msg = nng_aio_get_msg(aio);
if (msg) {
uint8_t *code = nng_mqtt_msg_get_suback_return_codes(msg, &count);
nng_msg_free(msg);
}
nng_aio_free(aio);

return rv;
}
Expand Down

0 comments on commit f8f89be

Please sign in to comment.