diff --git a/tests/lib/clients/03-publish-fill-inflight.py b/tests/lib/clients/03-publish-fill-inflight.py new file mode 100644 index 00000000..7ce262d1 --- /dev/null +++ b/tests/lib/clients/03-publish-fill-inflight.py @@ -0,0 +1,38 @@ +import logging + +import paho.mqtt.client as mqtt + +from tests.paho_test import get_test_server_port, loop_until_keyboard_interrupt + +def expected_payload(i: int) -> bytes: + return f"message{i}".encode("utf8") + + +def on_message(mqttc, obj, msg): + assert msg.mid == 123, f"Invalid mid: ({msg.mid})" + assert msg.topic == "pub/qos1/receive", f"Invalid topic: ({msg.topic})" + assert msg.payload == expected_payload, f"Invalid payload: ({msg.payload})" + assert msg.qos == 1, f"Invalid qos: ({msg.qos})" + assert msg.retain is not False, f"Invalid retain: ({msg.retain})" + + +def on_connect(mqttc, obj, flags, rc): + assert rc == 0, f"Connect failed ({rc})" + for i in range(12): + mqttc.publish("topic", expected_payload(i), qos=1) + +def on_disconnect(mqttc, rc, properties): + logging.info("disconnected") + mqttc.reconnect() + +logging.basicConfig(level=logging.DEBUG) +logging.info(str(mqtt)) +mqttc = mqtt.Client("publish-qos1-test") +mqttc.max_inflight_messages_set(10) +mqttc.on_connect = on_connect +mqttc.on_disconnect = on_disconnect +mqttc.on_message = on_message +mqttc.enable_logger() + +mqttc.connect("localhost", get_test_server_port()) +loop_until_keyboard_interrupt(mqttc) diff --git a/tests/lib/test_03_publish_fill_inflight.py b/tests/lib/test_03_publish_fill_inflight.py new file mode 100644 index 00000000..c3d917f8 --- /dev/null +++ b/tests/lib/test_03_publish_fill_inflight.py @@ -0,0 +1,89 @@ +# Test whether a client responds to max-inflight and reconnect when max-inflight is reached + +# The client should connect with keepalive=60, clean session set, +# and client id publish-fill-inflight +# The test will send a CONNACK message to the client with rc=0. Upon receiving +# the CONNACK the client should verify that rc==0. +# Then client should send 10 PUBLISH with QoS == 1. On client side 12 message will be +# submitted, so 2 will be queued. +# The test will wait 0.5 seconds after received the 10 PUBLISH. After this wait, it will +# disconnect the client. +# The client should re-connect and re-sent the first 10 messages. +# The test will PUBACK one message, it should receive another PUBLISH. +# The test will wait 0.5 seconds and expect no PUBLISH. +# The test will then PUBACK all message. +# The client should disconnect once everything is acked. + +import pytest + +import tests.paho_test as paho_test + +def expected_payload(i: int) -> bytes: + return f"message{i}" + +connect_packet = paho_test.gen_connect("publish-qos1-test", keepalive=60) +connack_packet = paho_test.gen_connack(rc=0) + +disconnect_packet = paho_test.gen_disconnect() + +first_connection_publishs = [ + paho_test.gen_publish( + "topic", qos=1, mid=i+1, payload=expected_payload(i), + ) + for i in range(10) +] +second_connection_publishs = [ + paho_test.gen_publish( + # I'm not sure we should have the mid+13. + # Currently on reconnection client will do two wrong thing: + # * it sent more than max_inflight packet + # * it re-send message both with mid = old_mid + 12 AND with mid = old_mid & dup=1 + "topic", qos=1, mid=i+13, payload=expected_payload(i), + ) + for i in range(12) +] +second_connection_pubacks = [ + paho_test.gen_puback(i+13) + for i in range(12) +] + +@pytest.mark.xfail +def test_03_publish_fill_inflight(server_socket, start_client): + start_client("03-publish-fill-inflight.py") + + (conn, address) = server_socket.accept() + conn.settimeout(10) + + paho_test.expect_packet(conn, "connect", connect_packet) + conn.send(connack_packet) + + for packet in first_connection_publishs: + paho_test.expect_packet(conn, "publish", packet) + + paho_test.expect_no_packet(conn, 0.5) + + conn.close() + + (conn, address) = server_socket.accept() + conn.settimeout(10) + + paho_test.expect_packet(conn, "connect", connect_packet) + conn.send(connack_packet) + + for packet in second_connection_publishs[:10]: + paho_test.expect_packet(conn, "publish", packet) + + paho_test.expect_no_packet(conn, 0.2) + + conn.send(second_connection_pubacks[0]) + paho_test.expect_packet(conn, "publish", second_connection_publishs[10]) + + paho_test.expect_no_packet(conn, 0.5) + + for packet in second_connection_pubacks[1:11]: + conn.send(packet) + + paho_test.expect_packet(conn, "publish", second_connection_publishs[11]) + + paho_test.expect_no_packet(conn, 0.5) + diff --git a/tests/paho_test.py b/tests/paho_test.py index 3c3cb14f..572ae161 100644 --- a/tests/paho_test.py +++ b/tests/paho_test.py @@ -84,6 +84,27 @@ def expect_packet(sock, name, expected): assert packet_matches(name, packet_recvd, expected) return True +def expect_no_packet(sock, delay=1): + """ expect that nothing is received within given delay + """ + sock.settimeout(delay) + try: + previous_timeout = sock.gettimeout() + data = sock.recv(1024) + except socket.timeout: + data = None + finally: + sock.settimeout(previous_timeout) + + if data is not None: + try: + print("Received: " + to_string(data)) + except struct.error: + print("Received (not decoded): 0x" + + binascii.b2a_hex(data).decode('utf8')) + + assert data is None, "shouldn't receive any data" + def packet_matches(name, recvd, expected): if recvd != expected: