diff --git a/src/mqtt/transport/tcp/mqtt_tcp.c b/src/mqtt/transport/tcp/mqtt_tcp.c index f9dd2ace..4c118921 100644 --- a/src/mqtt/transport/tcp/mqtt_tcp.c +++ b/src/mqtt/transport/tcp/mqtt_tcp.c @@ -403,7 +403,9 @@ mqtt_tcptran_pipe_nego_cb(void *arg) goto mqtt_error; property_free(ep->property); property *prop = (void *)nni_mqtt_msg_get_connack_property(p->rxmsg); - property_dup((property **)&ep->property, prop); + if (property_dup((property **)&ep->property, prop) != 0) { + goto mqtt_error; + } property_data *data; data = property_get_value(ep->property, RECEIVE_MAXIMUM); if (data) { @@ -708,8 +710,6 @@ mqtt_tcptran_pipe_recv_cb(void *arg) nni_msg_header_append(p->rxmsg, p->rxlen, pos + 1); msg = p->rxmsg; p->rxmsg = NULL; - // n = nni_msg_len(msg); - nni_plat_printf("length2 %d\n", nni_msg_len(msg)); type = p->rxlen[0] & 0xf0; flags = p->rxlen[0] & 0x0f; diff --git a/src/mqtt/transport/tls/mqtt_tls.c b/src/mqtt/transport/tls/mqtt_tls.c index 3902ab90..621ab566 100644 --- a/src/mqtt/transport/tls/mqtt_tls.c +++ b/src/mqtt/transport/tls/mqtt_tls.c @@ -54,7 +54,7 @@ struct mqtts_tcptran_pipe { nni_aio * rxaio; nni_aio * rpaio; nni_aio * qsaio; - nni_lmq rslmq; + // nni_lmq rslmq; nni_aio * negoaio; nni_msg * rxmsg; nni_msg * smsg; @@ -130,7 +130,7 @@ mqtts_tcptran_pipe_close(void *arg) nni_mtx_lock(&p->mtx); p->closed = true; - nni_lmq_flush(&p->rslmq); + // nni_lmq_flush(&p->rslmq); nni_mtx_unlock(&p->mtx); nni_aio_close(p->rxaio); @@ -192,7 +192,7 @@ mqtts_tcptran_pipe_init(void *arg, nni_pipe *npipe) mqtts_tcptran_pipe *p = arg; p->npipe = npipe; - nni_lmq_init(&p->rslmq, 1024); + // nni_lmq_init(&p->rslmq, 1024); p->packmax = 0xFFFF; p->qosmax = 2; p->busy = false; @@ -225,7 +225,7 @@ mqtts_tcptran_pipe_fini(void *arg) nni_aio_free(p->rpaio); nng_stream_free(p->conn); nni_msg_free(p->rxmsg); - nni_lmq_fini(&p->rslmq); + // nni_lmq_fini(&p->rslmq); nni_mtx_fini(&p->mtx); nni_aio_fini(&p->tmaio); NNI_FREE_STRUCT(p); @@ -393,10 +393,10 @@ mqtts_tcptran_pipe_nego_cb(void *arg) property *prop = (void *) nni_mqtt_msg_get_connack_property( p->rxmsg); - property_dup((property **) &ep->property, prop); + if (property_dup((property **) &ep->property, prop) != 0) + goto mqtt_error; property_data *data; - data = - property_get_value(ep->property, RECEIVE_MAXIMUM); + data = property_get_value(ep->property, RECEIVE_MAXIMUM); if (data) { if (data->p_value.u16 == 0) { rv = MQTT_ERR_PROTOCOL; @@ -508,21 +508,21 @@ mqtts_tcptran_pipe_qos_send_cb(void *arg) msg = nni_aio_get_msg(p->qsaio); if (msg != NULL) nni_msg_free(msg); - if (nni_lmq_get(&p->rslmq, &msg) == 0) { - nni_iov iov; - // TLS transport can only use one single iov - nni_msg_insert( - msg, nni_msg_header(msg), nni_msg_header_len(msg)); - iov.iov_len = nni_msg_len(msg); - iov.iov_buf = nni_msg_body(msg); - p->busy = true; - nni_aio_set_msg(p->qsaio, msg); - // send ACK down... - nni_aio_set_iov(p->qsaio, 1, &iov); - nng_stream_send(p->conn, p->qsaio); - nni_mtx_unlock(&p->mtx); - return; - } + // if (nni_lmq_get(&p->rslmq, &msg) == 0) { + // nni_iov iov; + // // TLS transport can only use one single iov + // nni_msg_insert( + // msg, nni_msg_header(msg), nni_msg_header_len(msg)); + // iov.iov_len = nni_msg_len(msg); + // iov.iov_buf = nni_msg_body(msg); + // p->busy = true; + // nni_aio_set_msg(p->qsaio, msg); + // // send ACK down... + // nni_aio_set_iov(p->qsaio, 1, &iov); + // nng_stream_send(p->conn, p->qsaio); + // nni_mtx_unlock(&p->mtx); + // return; + // } p->busy = false; nni_aio_set_msg(qsaio, NULL); nni_mtx_unlock(&p->mtx); @@ -615,6 +615,7 @@ mqtts_tcptran_pipe_recv_cb(void *arg) rv = PACKET_TOO_LARGE; goto recv_error; } + // TLS only accept 1 iov // same packet, continue receving next byte of remaining length iov.iov_buf = &p->rxlen[p->gotrxhead]; iov.iov_len = 1; @@ -658,7 +659,6 @@ mqtts_tcptran_pipe_recv_cb(void *arg) nni_msg_header_append(p->rxmsg, p->rxlen, pos + 1); msg = p->rxmsg; p->rxmsg = NULL; - n = nni_msg_len(msg); type = p->rxlen[0] & 0xf0; flags = p->rxlen[0] & 0x0f; // set the payload pointer of msg according to packet_type @@ -677,6 +677,9 @@ mqtts_tcptran_pipe_recv_cb(void *arg) ack_cmd = CMD_PUBACK; } else if (qos_pac == 2) { ack_cmd = CMD_PUBREC; + } else { + rv = PROTOCOL_ERROR; + goto recv_error; } packet_id = nni_msg_get_pub_pid(msg); ack = true; @@ -747,30 +750,31 @@ mqtts_tcptran_pipe_recv_cb(void *arg) // send ACK down... nni_aio_set_iov(p->qsaio, 1, &iov); nng_stream_send(p->conn, p->qsaio); - } else { - if (nni_lmq_full(&p->rslmq)) { - // Make space for the new message. TODO add max - // limit of msgq len in conf - if (nni_lmq_cap(&p->rslmq) <= - NNG_TRAN_MAX_LMQ_SIZE) { - if ((rv = nni_lmq_resize(&p->rslmq, - nni_lmq_cap(&p->rslmq) * - 2)) == 0) { - nni_lmq_put(&p->rslmq, qmsg); - } else { - // memory error. - nni_msg_free(qmsg); - } - } else { - nni_msg *old; - (void) nni_lmq_get(&p->rslmq, &old); - nni_msg_free(old); - nni_lmq_put(&p->rslmq, qmsg); - } - } else { - nni_lmq_put(&p->rslmq, qmsg); - } } + // else { + // if (nni_lmq_full(&p->rslmq)) { + // // Make space for the new message. TODO add max + // // limit of msgq len in conf + // if (nni_lmq_cap(&p->rslmq) <= + // NNG_TRAN_MAX_LMQ_SIZE) { + // if ((rv = nni_lmq_resize(&p->rslmq, + // nni_lmq_cap(&p->rslmq) * + // 2)) == 0) { + // nni_lmq_put(&p->rslmq, qmsg); + // } else { + // // memory error. + // nni_msg_free(qmsg); + // } + // } else { + // nni_msg *old; + // (void) nni_lmq_get(&p->rslmq, &old); + // nni_msg_free(old); + // nni_lmq_put(&p->rslmq, qmsg); + // } + // } else { + // nni_lmq_put(&p->rslmq, qmsg); + // } + // } ack = false; } @@ -779,9 +783,9 @@ mqtts_tcptran_pipe_recv_cb(void *arg) if (!nni_list_empty(&p->recvq)) { mqtts_tcptran_pipe_recv_start(p); } + nni_aio_set_msg(aio, msg); nni_mtx_unlock(&p->mtx); - nni_aio_finish_sync(aio, 0, n); return; diff --git a/src/supplemental/mqtt/mqtt_codec.c b/src/supplemental/mqtt/mqtt_codec.c index 96d8d26e..31b8c662 100644 --- a/src/supplemental/mqtt/mqtt_codec.c +++ b/src/supplemental/mqtt/mqtt_codec.c @@ -3731,10 +3731,10 @@ int property_dup(property **dup, const property *src) { property *item = NULL; - if (src == NULL) { + property *list = property_alloc(); + if (src == NULL || list == NULL) { return -1; } - property *list = property_alloc(); for (property *p = src->next; p != NULL; p = p->next) { property_type_enum type = property_get_value_type(p->id);