diff --git a/src/internal_modules/roc_pipeline/receiver_endpoint.cpp b/src/internal_modules/roc_pipeline/receiver_endpoint.cpp index 210eabf5f..411cdd435 100644 --- a/src/internal_modules/roc_pipeline/receiver_endpoint.cpp +++ b/src/internal_modules/roc_pipeline/receiver_endpoint.cpp @@ -199,7 +199,7 @@ packet::IWriter& ReceiverEndpoint::inbound_writer() { return *this; } -status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_time) { +status::StatusCode ReceiverEndpoint::pull_packets() { roc_panic_if(init_status_ != status::StatusOK); roc_panic_if(!parser_); @@ -209,7 +209,7 @@ status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_ti // queue were added in a very short time or are being added currently. It's // acceptable to consider such packets late and pull them next time. while (packet::PacketPtr packet = inbound_queue_.try_pop_front_exclusive()) { - const status::StatusCode code = handle_packet_(packet, current_time); + const status::StatusCode code = handle_packet_(packet); state_tracker_.unregister_packet(); if (code != status::StatusOK) { @@ -220,14 +220,13 @@ status::StatusCode ReceiverEndpoint::pull_packets(core::nanoseconds_t current_ti return status::StatusOK; } -status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& packet, - core::nanoseconds_t current_time) { +status::StatusCode ReceiverEndpoint::handle_packet_(const packet::PacketPtr& packet) { if (!parser_->parse(*packet, packet->buffer())) { roc_log(LogDebug, "receiver endpoint: dropping bad packet: can't parse"); return status::StatusOK; } - const status::StatusCode code = session_group_.route_packet(packet, current_time); + const status::StatusCode code = session_group_.route_packet(packet); if (code == status::StatusNoRoute) { roc_log(LogDebug, "receiver endpoint: dropping bad packet: can't route"); diff --git a/src/internal_modules/roc_pipeline/receiver_endpoint.h b/src/internal_modules/roc_pipeline/receiver_endpoint.h index d997080cc..440d3313c 100644 --- a/src/internal_modules/roc_pipeline/receiver_endpoint.h +++ b/src/internal_modules/roc_pipeline/receiver_endpoint.h @@ -93,13 +93,12 @@ class ReceiverEndpoint : public core::RefCountedhas_flags(packet::Packet::FlagControl)) { - return route_control_packet_(packet, current_time); + return route_control_packet_(packet); } return route_transport_packet_(packet); @@ -344,15 +343,14 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) { } status::StatusCode -ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet, - core::nanoseconds_t current_time) { +ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet) { if (!rtcp_communicator_) { roc_panic("session group: rtcp communicator is null"); } // This will invoke IParticipant methods implemented by us, // in particular notify_recv_stream() and maybe halt_recv_stream(). - return rtcp_communicator_->process_packet(packet, current_time); + return rtcp_communicator_->process_packet(packet); } bool ReceiverSessionGroup::can_create_session_(const packet::PacketPtr& packet) { diff --git a/src/internal_modules/roc_pipeline/receiver_session_group.h b/src/internal_modules/roc_pipeline/receiver_session_group.h index 946a5d367..f3bca39c9 100644 --- a/src/internal_modules/roc_pipeline/receiver_session_group.h +++ b/src/internal_modules/roc_pipeline/receiver_session_group.h @@ -91,8 +91,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip void reclock_sessions(core::nanoseconds_t playback_time); //! Route packet to session. - ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet, - core::nanoseconds_t current_time); + ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet); //! Get number of sessions in group. size_t num_sessions() const; @@ -130,8 +129,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip virtual void halt_recv_stream(packet::stream_source_t send_source_id); status::StatusCode route_transport_packet_(const packet::PacketPtr& packet); - status::StatusCode route_control_packet_(const packet::PacketPtr& packet, - core::nanoseconds_t current_time); + status::StatusCode route_control_packet_(const packet::PacketPtr& packet); bool can_create_session_(const packet::PacketPtr& packet); diff --git a/src/internal_modules/roc_pipeline/receiver_slot.cpp b/src/internal_modules/roc_pipeline/receiver_slot.cpp index 38a4cf46a..51781d5c2 100644 --- a/src/internal_modules/roc_pipeline/receiver_slot.cpp +++ b/src/internal_modules/roc_pipeline/receiver_slot.cpp @@ -84,19 +84,19 @@ status::StatusCode ReceiverSlot::refresh(core::nanoseconds_t current_time, status::StatusCode code = status::NoStatus; if (source_endpoint_) { - if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) { + if ((code = source_endpoint_->pull_packets()) != status::StatusOK) { return code; } } if (repair_endpoint_) { - if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) { + if ((code = repair_endpoint_->pull_packets()) != status::StatusOK) { return code; } } if (control_endpoint_) { - if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) { + if ((code = control_endpoint_->pull_packets()) != status::StatusOK) { return code; } } diff --git a/src/internal_modules/roc_pipeline/sender_endpoint.cpp b/src/internal_modules/roc_pipeline/sender_endpoint.cpp index cf8748a69..90e7f3057 100644 --- a/src/internal_modules/roc_pipeline/sender_endpoint.cpp +++ b/src/internal_modules/roc_pipeline/sender_endpoint.cpp @@ -184,7 +184,7 @@ packet::IWriter* SenderEndpoint::inbound_writer() { return this; } -status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time) { +status::StatusCode SenderEndpoint::pull_packets() { roc_panic_if(init_status_ != status::StatusOK); if (!parser_) { @@ -197,7 +197,7 @@ status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time // queue were added in a very short time or are being added currently. It's // acceptable to consider such packets late and pull them next time. while (packet::PacketPtr packet = inbound_queue_.try_pop_front_exclusive()) { - const status::StatusCode code = handle_packet_(packet, current_time); + const status::StatusCode code = handle_packet_(packet); state_tracker_.unregister_packet(); if (code != status::StatusOK) { @@ -208,14 +208,13 @@ status::StatusCode SenderEndpoint::pull_packets(core::nanoseconds_t current_time return status::StatusOK; } -status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packet, - core::nanoseconds_t current_time) { +status::StatusCode SenderEndpoint::handle_packet_(const packet::PacketPtr& packet) { if (!parser_->parse(*packet, packet->buffer())) { roc_log(LogDebug, "sender endpoint: dropping bad packet: can't parse"); return status::StatusOK; } - const status::StatusCode code = sender_session_.route_packet(packet, current_time); + const status::StatusCode code = sender_session_.route_packet(packet); if (code == status::StatusNoRoute) { roc_log(LogDebug, "sender endpoint: dropping bad packet: can't route"); diff --git a/src/internal_modules/roc_pipeline/sender_endpoint.h b/src/internal_modules/roc_pipeline/sender_endpoint.h index 751a7820b..79334d5e5 100644 --- a/src/internal_modules/roc_pipeline/sender_endpoint.h +++ b/src/internal_modules/roc_pipeline/sender_endpoint.h @@ -91,13 +91,12 @@ class SenderEndpoint : public core::NonCopyable<>, private packet::IWriter { //! Packets are written to inbound_writer() from network thread. //! They don't appear in pipeline immediately. Instead, pipeline thread //! should periodically call pull_packets() to make them available. - ROC_ATTR_NODISCARD status::StatusCode pull_packets(core::nanoseconds_t current_time); + ROC_ATTR_NODISCARD status::StatusCode pull_packets(); private: virtual ROC_ATTR_NODISCARD status::StatusCode write(const packet::PacketPtr& packet); - status::StatusCode handle_packet_(const packet::PacketPtr& packet, - core::nanoseconds_t current_time); + status::StatusCode handle_packet_(const packet::PacketPtr& packet); const address::Protocol proto_; diff --git a/src/internal_modules/roc_pipeline/sender_session.cpp b/src/internal_modules/roc_pipeline/sender_session.cpp index e8936b634..d07cc4e61 100644 --- a/src/internal_modules/roc_pipeline/sender_session.cpp +++ b/src/internal_modules/roc_pipeline/sender_session.cpp @@ -281,8 +281,7 @@ status::StatusCode SenderSession::refresh(core::nanoseconds_t current_time, return status::StatusOK; } -status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet, - core::nanoseconds_t current_time) { +status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet) { roc_panic_if(init_status_ != status::StatusOK); if (fail_status_ != status::NoStatus) { @@ -294,7 +293,7 @@ status::StatusCode SenderSession::route_packet(const packet::PacketPtr& packet, roc_panic("sender session: unexpected non-control packet"); } - return route_control_packet_(packet, current_time); + return route_control_packet_(packet); } status::StatusCode SenderSession::write(audio::Frame& frame) { @@ -439,15 +438,13 @@ void SenderSession::start_feedback_monitor_() { feedback_monitor_->start(); } -status::StatusCode -SenderSession::route_control_packet_(const packet::PacketPtr& packet, - core::nanoseconds_t current_time) { +status::StatusCode SenderSession::route_control_packet_(const packet::PacketPtr& packet) { if (!rtcp_communicator_) { roc_panic("sender session: rtcp communicator is null"); } // This will invoke IParticipant methods implemented by us. - return rtcp_communicator_->process_packet(packet, current_time); + return rtcp_communicator_->process_packet(packet); } } // namespace pipeline diff --git a/src/internal_modules/roc_pipeline/sender_session.h b/src/internal_modules/roc_pipeline/sender_session.h index ba736ff5e..193a2c5fb 100644 --- a/src/internal_modules/roc_pipeline/sender_session.h +++ b/src/internal_modules/roc_pipeline/sender_session.h @@ -96,8 +96,7 @@ class SenderSession : public core::NonCopyable<>, //! This way feedback packets from receiver reach sender pipeline. //! Packets are stored inside internal pipeline queues, and then fetched //! when frame are passed from frame_writer(). - ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet, - core::nanoseconds_t current_time); + ROC_ATTR_NODISCARD status::StatusCode route_packet(const packet::PacketPtr& packet); //! Get slot metrics. //! @remarks @@ -133,8 +132,7 @@ class SenderSession : public core::NonCopyable<>, void start_feedback_monitor_(); - status::StatusCode route_control_packet_(const packet::PacketPtr& packet, - core::nanoseconds_t current_time); + status::StatusCode route_control_packet_(const packet::PacketPtr& packet); core::IArena& arena_; diff --git a/src/internal_modules/roc_pipeline/sender_slot.cpp b/src/internal_modules/roc_pipeline/sender_slot.cpp index b51cea8b1..1dcb12886 100644 --- a/src/internal_modules/roc_pipeline/sender_slot.cpp +++ b/src/internal_modules/roc_pipeline/sender_slot.cpp @@ -142,19 +142,19 @@ status::StatusCode SenderSlot::refresh(core::nanoseconds_t current_time, status::StatusCode code = status::NoStatus; if (source_endpoint_) { - if ((code = source_endpoint_->pull_packets(current_time)) != status::StatusOK) { + if ((code = source_endpoint_->pull_packets()) != status::StatusOK) { return code; } } if (repair_endpoint_) { - if ((code = repair_endpoint_->pull_packets(current_time)) != status::StatusOK) { + if ((code = repair_endpoint_->pull_packets()) != status::StatusOK) { return code; } } if (control_endpoint_) { - if ((code = control_endpoint_->pull_packets(current_time)) != status::StatusOK) { + if ((code = control_endpoint_->pull_packets()) != status::StatusOK) { return code; } } diff --git a/src/internal_modules/roc_rtcp/communicator.cpp b/src/internal_modules/roc_rtcp/communicator.cpp index ab5c27e5f..7d441bcf7 100644 --- a/src/internal_modules/roc_rtcp/communicator.cpp +++ b/src/internal_modules/roc_rtcp/communicator.cpp @@ -72,8 +72,7 @@ size_t Communicator::total_streams() const { return reporter_.total_streams(); } -status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet, - core::nanoseconds_t current_time) { +status::StatusCode Communicator::process_packet(const packet::PacketPtr& packet) { roc_panic_if(init_status_ != status::StatusOK); roc_panic_if_msg(!packet, "rtcp communicator: null packet"); diff --git a/src/internal_modules/roc_rtcp/communicator.h b/src/internal_modules/roc_rtcp/communicator.h index f4d87fcbc..d2de6bf0a 100644 --- a/src/internal_modules/roc_rtcp/communicator.h +++ b/src/internal_modules/roc_rtcp/communicator.h @@ -76,8 +76,7 @@ class Communicator : public core::NonCopyable<> { //! Parse and process incoming packet. //! Invokes IParticipant methods during processing. - ROC_ATTR_NODISCARD status::StatusCode - process_packet(const packet::PacketPtr& packet, core::nanoseconds_t current_time); + ROC_ATTR_NODISCARD status::StatusCode process_packet(const packet::PacketPtr& packet); //! When we should generate packets next time. //! Returns absolute time. diff --git a/src/tests/roc_pipeline/test_helpers/control_writer.h b/src/tests/roc_pipeline/test_helpers/control_writer.h index ce7720e59..6b8279c4d 100644 --- a/src/tests/roc_pipeline/test_helpers/control_writer.h +++ b/src/tests/roc_pipeline/test_helpers/control_writer.h @@ -180,6 +180,7 @@ class ControlWriter : public core::NonCopyable<> { pp->udp()->src_addr = src_addr_; pp->udp()->dst_addr = dst_addr_; + pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix); pp->set_buffer(buffer); diff --git a/src/tests/roc_rtcp/test_communicator.cpp b/src/tests/roc_rtcp/test_communicator.cpp index 9dff46388..71f01890f 100644 --- a/src/tests/roc_rtcp/test_communicator.cpp +++ b/src/tests/roc_rtcp/test_communicator.cpp @@ -438,13 +438,16 @@ void expect_recv_report(const RecvReport& report, } } -packet::PacketPtr read_packet(packet::FifoQueue& source) { +packet::PacketPtr read_packet(packet::FifoQueue& source, core::nanoseconds_t ts = -1) { CHECK(source.size() != 0); packet::PacketPtr pp; LONGS_EQUAL(status::StatusOK, source.read(pp, packet::ModeFetch)); CHECK(pp); CHECK(pp->rtcp()); CHECK(pp->rtcp()->payload); + if (ts >= 0) { + pp->udp()->receive_timestamp = ts; + } roc_log(LogTrace, "delivering rtcp packet"); if (core::Logger::instance().get_level() >= LogTrace) { print_packet(pp->rtcp()->payload); @@ -655,7 +658,7 @@ TEST(communicator, one_sender_one_receiver) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -678,7 +681,7 @@ TEST(communicator, one_sender_one_receiver) { // Deliver receiver report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed3)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); CHECK_EQUAL(1, send_comm.total_streams()); CHECK_EQUAL(1, send_comm.total_destinations()); @@ -729,7 +732,7 @@ TEST(communicator, two_senders_one_receiver) { // Deliver sender 1 report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send1_queue), recv_time)); + recv_comm.process_packet(read_packet(send1_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -750,7 +753,7 @@ TEST(communicator, two_senders_one_receiver) { // Deliver sender 2 report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send2_queue), recv_time)); + recv_comm.process_packet(read_packet(send2_queue, recv_time))); CHECK_EQUAL(2, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -776,15 +779,17 @@ TEST(communicator, two_senders_one_receiver) { // Deliver receiver report to sender 1 and 2 packet::PacketPtr pp = read_packet(recv_queue); + pp->udp()->receive_timestamp = send1_time; send1_part.set_send_report( make_send_report(send1_time, Send1Cname, Send1Ssrc, Seed5)); - LONGS_EQUAL(status::StatusOK, send1_comm.process_packet(pp, send1_time)); + LONGS_EQUAL(status::StatusOK, send1_comm.process_packet(pp)); CHECK_EQUAL(1, send1_comm.total_streams()); CHECK_EQUAL(1, send1_comm.total_destinations()); + pp->udp()->receive_timestamp = send2_time; send2_part.set_send_report( make_send_report(send2_time, Send2Cname, Send2Ssrc, Seed6)); - LONGS_EQUAL(status::StatusOK, send2_comm.process_packet(pp, send2_time)); + LONGS_EQUAL(status::StatusOK, send2_comm.process_packet(pp)); CHECK_EQUAL(1, send2_comm.total_streams()); CHECK_EQUAL(1, send2_comm.total_destinations()); @@ -839,10 +844,12 @@ TEST(communicator, one_sender_two_receivers) { // Deliver sender report to receiver 1 and 2 packet::PacketPtr pp = read_packet(send_queue); - LONGS_EQUAL(status::StatusOK, recv1_comm.process_packet(pp, recv1_time)); + pp->udp()->receive_timestamp = recv1_time; + LONGS_EQUAL(status::StatusOK, recv1_comm.process_packet(pp)); CHECK_EQUAL(1, recv1_comm.total_streams()); CHECK_EQUAL(0, recv1_comm.total_destinations()); - LONGS_EQUAL(status::StatusOK, recv2_comm.process_packet(pp, recv2_time)); + pp->udp()->receive_timestamp = recv2_time; + LONGS_EQUAL(status::StatusOK, recv2_comm.process_packet(pp)); CHECK_EQUAL(1, recv2_comm.total_streams()); CHECK_EQUAL(0, recv2_comm.total_destinations()); @@ -870,7 +877,7 @@ TEST(communicator, one_sender_two_receivers) { // Deliver receiver 1 report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed3)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv1_queue), send_time)); + send_comm.process_packet(read_packet(recv1_queue, send_time))); CHECK_EQUAL(1, send_comm.total_streams()); CHECK_EQUAL(1, send_comm.total_destinations()); @@ -893,7 +900,7 @@ TEST(communicator, one_sender_two_receivers) { // Deliver receiver 1 report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed5)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv2_queue), send_time)); + send_comm.process_packet(read_packet(recv2_queue, send_time))); CHECK_EQUAL(2, send_comm.total_streams()); CHECK_EQUAL(1, send_comm.total_destinations()); @@ -937,7 +944,7 @@ TEST(communicator, receiver_report_first) { // Deliver receiver report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed2)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); CHECK_EQUAL(1, send_comm.total_streams()); CHECK_EQUAL(1, send_comm.total_destinations()); @@ -960,7 +967,7 @@ TEST(communicator, receiver_report_first) { recv_part.set_recv_report( 0, make_recv_report(recv_time, RecvCname, RecvSsrc, SendSsrc, Seed3)); LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(1, recv_comm.total_destinations()); @@ -1005,7 +1012,7 @@ TEST(communicator, bidirectional_peers) { // Deliver report to peer 2 LONGS_EQUAL(status::StatusOK, - peer2_comm.process_packet(read_packet(peer1_queue), peer2_time)); + peer2_comm.process_packet(read_packet(peer1_queue, peer2_time))); CHECK_EQUAL(1, peer2_comm.total_streams()); CHECK_EQUAL(0, peer2_comm.total_destinations()); @@ -1033,7 +1040,7 @@ TEST(communicator, bidirectional_peers) { peer1_part.set_recv_report( 0, make_recv_report(peer1_time, Peer1Cname, Peer1Ssrc, Peer2Ssrc, Seed4)); LONGS_EQUAL(status::StatusOK, - peer1_comm.process_packet(read_packet(peer2_queue), peer1_time)); + peer1_comm.process_packet(read_packet(peer2_queue, peer1_time))); CHECK_EQUAL(1, peer1_comm.total_streams()); CHECK_EQUAL(1, peer1_comm.total_destinations()); @@ -1063,7 +1070,7 @@ TEST(communicator, bidirectional_peers) { peer2_part.set_recv_report( 0, make_recv_report(peer2_time, Peer2Cname, Peer2Ssrc, Peer1Ssrc, Seed6)); LONGS_EQUAL(status::StatusOK, - peer2_comm.process_packet(read_packet(peer1_queue), peer2_time)); + peer2_comm.process_packet(read_packet(peer1_queue, peer2_time))); CHECK_EQUAL(1, peer2_comm.total_streams()); CHECK_EQUAL(1, peer2_comm.total_destinations()); @@ -1134,21 +1141,23 @@ TEST(communicator, long_run) { // Deliver sender 1 report to receiver 1 and 2 pp = read_packet(send1_queue); + pp->udp()->receive_timestamp = recv1_time; if (iter != 0) { recv1_part.set_recv_report( 0, make_recv_report(recv1_time, Recv1Cname, Recv1Ssrc, Send1Ssrc, seed)); recv1_part.set_recv_report( 1, make_recv_report(recv1_time, Recv1Cname, Recv1Ssrc, Send2Ssrc, seed)); } - LONGS_EQUAL(status::StatusOK, recv1_comm.process_packet(pp, recv1_time)); + LONGS_EQUAL(status::StatusOK, recv1_comm.process_packet(pp)); + pp->udp()->receive_timestamp = recv2_time; if (iter != 0) { recv2_part.set_recv_report( 0, make_recv_report(recv2_time, Recv2Cname, Recv2Ssrc, Send1Ssrc, seed)); recv2_part.set_recv_report( 1, make_recv_report(recv2_time, Recv2Cname, Recv2Ssrc, Send2Ssrc, seed)); } - LONGS_EQUAL(status::StatusOK, recv2_comm.process_packet(pp, recv2_time)); + LONGS_EQUAL(status::StatusOK, recv2_comm.process_packet(pp)); advance_time(send1_time); advance_time(send2_time); @@ -1163,22 +1172,23 @@ TEST(communicator, long_run) { // Deliver sender 2 report to receiver 1 and 2 pp = read_packet(send2_queue); - + pp->udp()->receive_timestamp = recv1_time; if (iter != 0) { recv1_part.set_recv_report( 0, make_recv_report(recv1_time, Recv1Cname, Recv1Ssrc, Send1Ssrc, seed)); recv1_part.set_recv_report( 1, make_recv_report(recv1_time, Recv1Cname, Recv1Ssrc, Send2Ssrc, seed)); } - LONGS_EQUAL(status::StatusOK, recv1_comm.process_packet(pp, recv1_time)); + LONGS_EQUAL(status::StatusOK, recv1_comm.process_packet(pp)); + pp->udp()->receive_timestamp = recv2_time; if (iter != 0) { recv2_part.set_recv_report( 0, make_recv_report(recv2_time, Recv2Cname, Recv2Ssrc, Send1Ssrc, seed)); recv2_part.set_recv_report( 1, make_recv_report(recv2_time, Recv2Cname, Recv2Ssrc, Send2Ssrc, seed)); } - LONGS_EQUAL(status::StatusOK, recv2_comm.process_packet(pp, recv2_time)); + LONGS_EQUAL(status::StatusOK, recv2_comm.process_packet(pp)); advance_time(send1_time); advance_time(send2_time); @@ -1196,13 +1206,15 @@ TEST(communicator, long_run) { // Deliver receiver 1 report to sender 1 and 2 pp = read_packet(recv1_queue); + pp->udp()->receive_timestamp = send1_time; send1_part.set_send_report( make_send_report(send1_time, Send1Cname, Send1Ssrc, seed)); - LONGS_EQUAL(status::StatusOK, send1_comm.process_packet(pp, send1_time)); + LONGS_EQUAL(status::StatusOK, send1_comm.process_packet(pp)); + pp->udp()->receive_timestamp = send2_time; send2_part.set_send_report( make_send_report(send2_time, Send2Cname, Send2Ssrc, seed)); - LONGS_EQUAL(status::StatusOK, send2_comm.process_packet(pp, send2_time)); + LONGS_EQUAL(status::StatusOK, send2_comm.process_packet(pp)); advance_time(send1_time); advance_time(send2_time); @@ -1220,13 +1232,15 @@ TEST(communicator, long_run) { // Deliver receiver 2 report to sender 1 and 2 pp = read_packet(recv2_queue); + pp->udp()->receive_timestamp = send1_time; send1_part.set_send_report( make_send_report(send1_time, Send1Cname, Send1Ssrc, seed)); - LONGS_EQUAL(status::StatusOK, send1_comm.process_packet(pp, send1_time)); + LONGS_EQUAL(status::StatusOK, send1_comm.process_packet(pp)); + pp->udp()->receive_timestamp = send2_time; send2_part.set_send_report( make_send_report(send2_time, Send2Cname, Send2Ssrc, seed)); - LONGS_EQUAL(status::StatusOK, send2_comm.process_packet(pp, send2_time)); + LONGS_EQUAL(status::StatusOK, send2_comm.process_packet(pp)); advance_time(send1_time); advance_time(send2_time); @@ -1304,7 +1318,7 @@ TEST(communicator, halt_goodbye) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1325,7 +1339,7 @@ TEST(communicator, halt_goodbye) { // Deliver sender goodbye to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); CHECK_EQUAL(0, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1381,7 +1395,7 @@ TEST(communicator, halt_timeout) { // Deliver sender 1 report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send1_queue), recv_time)); + recv_comm.process_packet(read_packet(send1_queue, recv_time))); CHECK_EQUAL(iter == 0 ? 1 : 2, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1404,7 +1418,7 @@ TEST(communicator, halt_timeout) { // Deliver sender 2 report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send2_queue), recv_time)); + recv_comm.process_packet(read_packet(send2_queue, recv_time))); CHECK_EQUAL(2, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1427,7 +1441,7 @@ TEST(communicator, halt_timeout) { // Deliver sender 1 report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send1_queue), recv_time)); + recv_comm.process_packet(read_packet(send1_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1494,7 +1508,7 @@ TEST(communicator, halt_cname_change) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send1_queue), recv_time)); + recv_comm.process_packet(read_packet(send1_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1515,7 +1529,7 @@ TEST(communicator, halt_cname_change) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send2_queue), recv_time)); + recv_comm.process_packet(read_packet(send2_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1576,7 +1590,7 @@ TEST(communicator, cname_comes_earlier) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send1_queue), recv_time)); + recv_comm.process_packet(read_packet(send1_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1595,7 +1609,7 @@ TEST(communicator, cname_comes_earlier) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send2_queue), recv_time)); + recv_comm.process_packet(read_packet(send2_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1655,7 +1669,7 @@ TEST(communicator, cname_comes_later) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send1_queue), recv_time)); + recv_comm.process_packet(read_packet(send1_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1675,7 +1689,7 @@ TEST(communicator, cname_comes_later) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send2_queue), recv_time)); + recv_comm.process_packet(read_packet(send2_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); CHECK_EQUAL(0, recv_comm.total_destinations()); @@ -1734,7 +1748,7 @@ TEST(communicator, collision_send_report) { // Deliver report from receiver to sender 1 send1_part.set_send_report(make_send_report(send1_time, Send1Cname, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - send1_comm.process_packet(read_packet(recv_queue), send1_time)); + send1_comm.process_packet(read_packet(recv_queue, send1_time))); CHECK_EQUAL(1, send1_comm.total_streams()); CHECK_EQUAL(1, send1_comm.total_destinations()); @@ -1764,7 +1778,7 @@ TEST(communicator, collision_send_report) { recv_part.set_recv_report( 0, make_recv_report(recv_time, RecvCname, RecvSsrcA, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send2_queue), recv_time)); + recv_comm.process_packet(read_packet(send2_queue, recv_time))); CHECK_EQUAL(2, recv_comm.total_streams()); CHECK_EQUAL(1, recv_comm.total_destinations()); @@ -1795,7 +1809,7 @@ TEST(communicator, collision_send_report) { // Deliver report from receiver to sender 1 send1_part.set_send_report(make_send_report(send1_time, Send1Cname, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - send1_comm.process_packet(read_packet(recv_queue), send1_time)); + send1_comm.process_packet(read_packet(recv_queue, send1_time))); CHECK_EQUAL(0, send1_comm.total_streams()); CHECK_EQUAL(1, send1_comm.total_destinations()); @@ -1819,7 +1833,7 @@ TEST(communicator, collision_send_report) { // Deliver report from receiver to sender 1 send1_part.set_send_report(make_send_report(send1_time, Send1Cname, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - send1_comm.process_packet(read_packet(recv_queue), send1_time)); + send1_comm.process_packet(read_packet(recv_queue, send1_time))); CHECK_EQUAL(1, send1_comm.total_streams()); CHECK_EQUAL(1, send1_comm.total_destinations()); @@ -1876,7 +1890,7 @@ TEST(communicator, collision_recv_report) { // Deliver report from sender to receiver 1 LONGS_EQUAL(status::StatusOK, - recv1_comm.process_packet(read_packet(send_queue), recv1_time)); + recv1_comm.process_packet(read_packet(send_queue, recv1_time))); CHECK_EQUAL(1, recv1_comm.total_streams()); CHECK_EQUAL(0, recv1_comm.total_destinations()); @@ -1908,7 +1922,7 @@ TEST(communicator, collision_recv_report) { // same SSRC as sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrcA, Seed)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv2_queue), send_time)); + send_comm.process_packet(read_packet(recv2_queue, send_time))); CHECK_EQUAL(1, send_comm.total_streams()); CHECK_EQUAL(1, send_comm.total_destinations()); @@ -1937,7 +1951,7 @@ TEST(communicator, collision_recv_report) { // Deliver report from sender to receiver 1 LONGS_EQUAL(status::StatusOK, - recv1_comm.process_packet(read_packet(send_queue), recv1_time)); + recv1_comm.process_packet(read_packet(send_queue, recv1_time))); CHECK_EQUAL(0, recv1_comm.total_streams()); CHECK_EQUAL(0, recv1_comm.total_destinations()); @@ -1959,7 +1973,7 @@ TEST(communicator, collision_recv_report) { // Deliver report from sender to receiver 1 LONGS_EQUAL(status::StatusOK, - recv1_comm.process_packet(read_packet(send_queue), recv1_time)); + recv1_comm.process_packet(read_packet(send_queue, recv1_time))); CHECK_EQUAL(1, recv1_comm.total_streams()); CHECK_EQUAL(0, recv1_comm.total_destinations()); @@ -2018,7 +2032,7 @@ TEST(communicator, collision_unrelated_recv_report) { // Deliver report from sender 1 to receiver 1 LONGS_EQUAL(status::StatusOK, - recv1_comm.process_packet(read_packet(send1_queue), recv1_time)); + recv1_comm.process_packet(read_packet(send1_queue, recv1_time))); CHECK_EQUAL(1, recv1_comm.total_streams()); CHECK_EQUAL(0, recv1_comm.total_destinations()); @@ -2051,7 +2065,7 @@ TEST(communicator, collision_unrelated_recv_report) { send1_part.set_send_report( make_send_report(send1_time, Send1Cname, Send1SsrcA, Seed)); LONGS_EQUAL(status::StatusOK, - send1_comm.process_packet(read_packet(recv2_queue), send1_time)); + send1_comm.process_packet(read_packet(recv2_queue, send1_time))); CHECK_EQUAL(1, send1_comm.total_streams()); CHECK_EQUAL(1, send1_comm.total_destinations()); @@ -2078,7 +2092,7 @@ TEST(communicator, collision_unrelated_recv_report) { send1_part.set_send_report( make_send_report(send1_time, Send1Cname, Send1SsrcB, Seed)); LONGS_EQUAL(status::StatusOK, - recv1_comm.process_packet(read_packet(send1_queue), recv1_time)); + recv1_comm.process_packet(read_packet(send1_queue, recv1_time))); CHECK_EQUAL(0, recv1_comm.total_streams()); CHECK_EQUAL(0, recv1_comm.total_destinations()); @@ -2101,7 +2115,7 @@ TEST(communicator, collision_unrelated_recv_report) { // Deliver report from sender 1 to receiver 1 LONGS_EQUAL(status::StatusOK, - recv1_comm.process_packet(read_packet(send1_queue), recv1_time)); + recv1_comm.process_packet(read_packet(send1_queue, recv1_time))); CHECK_EQUAL(1, recv1_comm.total_streams()); CHECK_EQUAL(0, recv1_comm.total_destinations()); @@ -2165,7 +2179,7 @@ TEST(communicator, collision_sdes_different_cname) { // Deliver report from receiver to sender 1 send1_part.set_send_report(make_send_report(send1_time, Send1Cname, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - send1_comm.process_packet(read_packet(recv_queue), send1_time)); + send1_comm.process_packet(read_packet(recv_queue, send1_time))); CHECK_EQUAL(1, send1_comm.total_streams()); CHECK_EQUAL(1, send1_comm.total_destinations()); @@ -2195,7 +2209,7 @@ TEST(communicator, collision_sdes_different_cname) { recv_part.set_recv_report( 0, make_recv_report(recv_time, RecvCname, RecvSsrcA, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send2_queue), recv_time)); + recv_comm.process_packet(read_packet(send2_queue, recv_time))); CHECK_EQUAL(2, recv_comm.total_streams()); CHECK_EQUAL(1, recv_comm.total_destinations()); @@ -2224,7 +2238,7 @@ TEST(communicator, collision_sdes_different_cname) { // Deliver report from receiver to sender 1 send1_part.set_send_report(make_send_report(send1_time, Send1Cname, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - send1_comm.process_packet(read_packet(recv_queue), send1_time)); + send1_comm.process_packet(read_packet(recv_queue, send1_time))); CHECK_EQUAL(0, send1_comm.total_streams()); CHECK_EQUAL(1, send1_comm.total_destinations()); @@ -2248,7 +2262,7 @@ TEST(communicator, collision_sdes_different_cname) { // Deliver report from receiver to sender 1 send1_part.set_send_report(make_send_report(send1_time, Send1Cname, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - send1_comm.process_packet(read_packet(recv_queue), send1_time)); + send1_comm.process_packet(read_packet(recv_queue, send1_time))); CHECK_EQUAL(1, send1_comm.total_streams()); CHECK_EQUAL(1, send1_comm.total_destinations()); @@ -2312,7 +2326,7 @@ TEST(communicator, collision_sdes_same_cname) { // Deliver report from receiver to sender 1 send1_part.set_send_report(make_send_report(send1_time, Send1Cname, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - send1_comm.process_packet(read_packet(recv_queue), send1_time)); + send1_comm.process_packet(read_packet(recv_queue, send1_time))); CHECK_EQUAL(1, send1_comm.total_streams()); CHECK_EQUAL(1, send1_comm.total_destinations()); @@ -2338,7 +2352,7 @@ TEST(communicator, collision_sdes_same_cname) { recv_part.set_recv_report( 0, make_recv_report(recv_time, RecvCname, RecvSsrc, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send2_queue), recv_time)); + recv_comm.process_packet(read_packet(send2_queue, recv_time))); CHECK_EQUAL(2, recv_comm.total_streams()); CHECK_EQUAL(1, recv_comm.total_destinations()); @@ -2364,7 +2378,7 @@ TEST(communicator, collision_sdes_same_cname) { // Deliver report from receiver to sender 1 send1_part.set_send_report(make_send_report(send1_time, Send1Cname, Send1Ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - send1_comm.process_packet(read_packet(recv_queue), send1_time)); + send1_comm.process_packet(read_packet(recv_queue, send1_time))); CHECK_EQUAL(1, send1_comm.total_streams()); CHECK_EQUAL(1, send1_comm.total_destinations()); @@ -2407,7 +2421,7 @@ TEST(communicator, network_loop) { // Deliver report to remote peer LONGS_EQUAL(status::StatusOK, - remote_comm.process_packet(read_packet(local_queue), remote_time)); + remote_comm.process_packet(read_packet(local_queue, remote_time))); // Check notifications on remote peer CHECK_EQUAL(1, remote_part.pending_notifications()); @@ -2430,7 +2444,7 @@ TEST(communicator, network_loop) { local_part.set_recv_report( 0, make_recv_report(local_time, LocalCname, LocalSsrc, RemoteSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - local_comm.process_packet(read_packet(remote_queue), local_time)); + local_comm.process_packet(read_packet(remote_queue, local_time))); // Check notifications on local peer CHECK_EQUAL(2, local_part.pending_notifications()); @@ -2451,7 +2465,7 @@ TEST(communicator, network_loop) { // Loop report back to local peer LONGS_EQUAL(status::StatusOK, - local_comm.process_packet(read_packet(local_queue), local_time)); + local_comm.process_packet(read_packet(local_queue, local_time))); // Expect no notifications CHECK_EQUAL(0, local_part.pending_notifications()); @@ -2513,7 +2527,7 @@ TEST(communicator, missing_sender_sdes) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); // Check notifications on receiver @@ -2560,7 +2574,7 @@ TEST(communicator, missing_receiver_sdes) { // Deliver receiver report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); CHECK_EQUAL(1, send_comm.total_streams()); // Check notifications on sender @@ -2606,7 +2620,7 @@ TEST(communicator, missing_sender_sr) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); // Check notifications on receiver @@ -2652,7 +2666,7 @@ TEST(communicator, missing_receiver_rr) { // Deliver receiver report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); CHECK_EQUAL(1, send_comm.total_streams()); // Check notifications on sender @@ -2696,7 +2710,7 @@ TEST(communicator, missing_sender_xr) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); // Check notifications on receiver @@ -2744,7 +2758,7 @@ TEST(communicator, missing_receiver_xr) { // Deliver receiver report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); CHECK_EQUAL(1, send_comm.total_streams()); // Check notifications on sender @@ -2799,7 +2813,7 @@ TEST(communicator, split_sender_report) { // Deliver receiver report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); // Check notifications on sender CHECK_EQUAL(1, send_part.pending_notifications()); @@ -2833,7 +2847,7 @@ TEST(communicator, split_sender_report) { recv_part.set_recv_report( 0, make_recv_report(recv_time, recv_cname, recv_ssrc, SendSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); } CHECK_EQUAL(1, recv_comm.total_streams()); @@ -2887,7 +2901,7 @@ TEST(communicator, split_receiver_report) { send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); } CHECK_EQUAL(1, send_comm.total_streams()); @@ -2946,7 +2960,7 @@ TEST(communicator, split_bidirectional_report) { local_part.set_send_report( make_send_report(local_time, local_cname, LocalSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - local_comm.process_packet(read_packet(remote_queue), local_time)); + local_comm.process_packet(read_packet(remote_queue, local_time))); // Check notifications on local peer CHECK_EQUAL(1, local_part.pending_notifications()); @@ -2986,7 +3000,7 @@ TEST(communicator, split_bidirectional_report) { remote_part.set_send_report( make_send_report(remote_time, remote_cname, remote_ssrc, Seed)); LONGS_EQUAL(status::StatusOK, - remote_comm.process_packet(read_packet(local_queue), remote_time)); + remote_comm.process_packet(read_packet(local_queue, remote_time))); } CHECK_EQUAL(1, remote_comm.total_streams()); @@ -3068,10 +3082,10 @@ TEST(communicator, report_to_address_sender) { CHECK_EQUAL(1, recv1_queue.size()); // Deliver receiver 1 report to sender - pp = read_packet(recv1_queue); + pp = read_packet(recv1_queue, send_time); set_src_address(pp, recv1_addr); send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); - LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp, send_time)); + LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp)); CHECK_EQUAL(1, send_comm.total_streams()); // Check notifications on sender @@ -3093,7 +3107,7 @@ TEST(communicator, report_to_address_sender) { CHECK_EQUAL(1, send_comm.total_destinations()); CHECK_EQUAL(1, send_queue.size()); - pp = read_packet(send_queue); + pp = read_packet(send_queue, send_time); expect_has_dest_address(pp, send_dest_addr); expect_has_orig_ssrc(pp, SendSsrc, true); expect_has_dest_ssrc(pp, Recv1Ssrc, true); @@ -3110,10 +3124,10 @@ TEST(communicator, report_to_address_sender) { CHECK_EQUAL(1, recv2_queue.size()); // Deliver receiver 2 report to sender - pp = read_packet(recv2_queue); + pp = read_packet(recv2_queue, send_time); set_src_address(pp, recv2_addr); send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); - LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp, send_time)); + LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp)); CHECK_EQUAL(2, send_comm.total_streams()); // Check notifications on sender @@ -3241,10 +3255,10 @@ TEST(communicator, report_back_sender) { CHECK_EQUAL(1, recv1_queue.size()); // Deliver receiver 1 report to sender - pp = read_packet(recv1_queue); + pp = read_packet(recv1_queue, send_time); set_src_address(pp, recv1_addr); send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); - LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp, send_time)); + LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp)); CHECK_EQUAL(1, send_comm.total_streams()); // Check notifications on sender @@ -3265,7 +3279,7 @@ TEST(communicator, report_back_sender) { CHECK_EQUAL(1, send_comm.total_destinations()); CHECK_EQUAL(1, send_queue.size()); - pp = read_packet(send_queue); + pp = read_packet(send_queue, send_time); expect_has_dest_address(pp, recv1_addr); expect_has_orig_ssrc(pp, SendSsrc, true); expect_has_dest_ssrc(pp, Recv1Ssrc, true); @@ -3282,10 +3296,10 @@ TEST(communicator, report_back_sender) { CHECK_EQUAL(1, recv2_queue.size()); // Deliver receiver 2 report to sender - pp = read_packet(recv2_queue); + pp = read_packet(recv2_queue, send_time); set_src_address(pp, recv2_addr); send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); - LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp, send_time)); + LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp)); CHECK_EQUAL(2, send_comm.total_streams()); // Check notifications on sender @@ -3306,13 +3320,13 @@ TEST(communicator, report_back_sender) { CHECK_EQUAL(2, send_comm.total_destinations()); CHECK_EQUAL(2, send_queue.size()); - pp = read_packet(send_queue); + pp = read_packet(send_queue, send_time); expect_has_dest_address(pp, recv1_addr); expect_has_orig_ssrc(pp, SendSsrc, true); expect_has_dest_ssrc(pp, Recv1Ssrc, true); expect_has_dest_ssrc(pp, Recv2Ssrc, false); - pp = read_packet(send_queue); + pp = read_packet(send_queue, send_time); expect_has_dest_address(pp, recv2_addr); expect_has_orig_ssrc(pp, SendSsrc, true); expect_has_dest_ssrc(pp, Recv1Ssrc, false); @@ -3379,13 +3393,13 @@ TEST(communicator, report_back_receiver) { CHECK_EQUAL(1, send1_queue.size()); // Deliver sender 1 report to receiver - pp = read_packet(send1_queue); + pp = read_packet(send1_queue, recv_time); set_src_address(pp, send1_addr); recv_part.set_recv_report( 0, make_recv_report(recv_time, RecvCname, RecvSsrc, Send1Ssrc, Seed)); recv_part.set_recv_report( 1, make_recv_report(recv_time, RecvCname, RecvSsrc, Send2Ssrc, Seed)); - LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp, recv_time)); + LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp)); CHECK_EQUAL(2, recv_comm.total_streams()); // Check notifications on receiver @@ -3409,7 +3423,7 @@ TEST(communicator, report_back_receiver) { CHECK_EQUAL(1, recv_comm.total_destinations()); CHECK_EQUAL(1, recv_queue.size()); - pp = read_packet(recv_queue); + pp = read_packet(recv_queue, send1_time); expect_has_dest_address(pp, send1_addr); expect_has_orig_ssrc(pp, RecvSsrc, true); expect_has_dest_ssrc(pp, Send1Ssrc, true); @@ -3425,13 +3439,13 @@ TEST(communicator, report_back_receiver) { CHECK_EQUAL(1, send2_queue.size()); // Deliver sender 2 report to receiver - pp = read_packet(send2_queue); + pp = read_packet(send2_queue, recv_time); set_src_address(pp, send2_addr); recv_part.set_recv_report( 0, make_recv_report(recv_time, RecvCname, RecvSsrc, Send1Ssrc, Seed)); recv_part.set_recv_report( 1, make_recv_report(recv_time, RecvCname, RecvSsrc, Send2Ssrc, Seed)); - LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp, recv_time)); + LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp)); CHECK_EQUAL(2, recv_comm.total_streams()); // Check notifications on receiver @@ -3455,13 +3469,13 @@ TEST(communicator, report_back_receiver) { CHECK_EQUAL(2, recv_comm.total_destinations()); CHECK_EQUAL(2, recv_queue.size()); - pp = read_packet(recv_queue); + pp = read_packet(recv_queue, recv_time); expect_has_dest_address(pp, send1_addr); expect_has_orig_ssrc(pp, RecvSsrc, true); expect_has_dest_ssrc(pp, Send1Ssrc, true); expect_has_dest_ssrc(pp, Send2Ssrc, false); - pp = read_packet(recv_queue); + pp = read_packet(recv_queue, recv_time); expect_has_dest_address(pp, send2_addr); expect_has_orig_ssrc(pp, RecvSsrc, true); expect_has_dest_ssrc(pp, Send1Ssrc, false); @@ -3521,9 +3535,9 @@ TEST(communicator, report_back_combine_reports) { CHECK_EQUAL(1, send1_queue.size()); // Deliver sender 1 report to receiver - pp = read_packet(send1_queue); + pp = read_packet(send1_queue, recv_time); set_src_address(pp, send1_addr); - LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp, recv_time)); + LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp)); CHECK_EQUAL(1, recv_comm.total_streams()); // Check notifications on receiver @@ -3542,9 +3556,9 @@ TEST(communicator, report_back_combine_reports) { CHECK_EQUAL(1, send2_queue.size()); // Deliver sender 2 report to receiver - pp = read_packet(send2_queue); + pp = read_packet(send2_queue, recv_time); set_src_address(pp, send2_addr); - LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp, recv_time)); + LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp)); CHECK_EQUAL(2, recv_comm.total_streams()); // Check notifications on receiver @@ -3563,9 +3577,9 @@ TEST(communicator, report_back_combine_reports) { CHECK_EQUAL(1, send3_queue.size()); // Deliver sender 3 report to receiver - pp = read_packet(send3_queue); + pp = read_packet(send3_queue, recv_time); set_src_address(pp, send3_addr); - LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp, recv_time)); + LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp)); CHECK_EQUAL(3, recv_comm.total_streams()); // Check notifications on receiver @@ -3592,14 +3606,14 @@ TEST(communicator, report_back_combine_reports) { CHECK_EQUAL(2, recv_comm.total_destinations()); CHECK_EQUAL(2, recv_queue.size()); - pp = read_packet(recv_queue); + pp = read_packet(recv_queue, send3_time); expect_has_dest_address(pp, send1_addr); expect_has_orig_ssrc(pp, RecvSsrc, true); expect_has_dest_ssrc(pp, Send1Ssrc, true); expect_has_dest_ssrc(pp, Send2Ssrc, true); expect_has_dest_ssrc(pp, Send3Ssrc, false); - pp = read_packet(recv_queue); + pp = read_packet(recv_queue, send3_time); expect_has_dest_address(pp, send3_addr); expect_has_orig_ssrc(pp, RecvSsrc, true); expect_has_dest_ssrc(pp, Send1Ssrc, false); @@ -3660,9 +3674,9 @@ TEST(communicator, report_back_split_reports) { CHECK_EQUAL(1, remote_queue.size()); // Deliver remote peer report to local peer - packet::PacketPtr pp = read_packet(remote_queue); + packet::PacketPtr pp = read_packet(remote_queue, local_time); set_src_address(pp, group_addr[n_grp]); - LONGS_EQUAL(status::StatusOK, local_comm.process_packet(pp, local_time)); + LONGS_EQUAL(status::StatusOK, local_comm.process_packet(pp)); // Check notifications on local peer CHECK_EQUAL(1, local_part.pending_notifications()); @@ -3760,7 +3774,7 @@ TEST(communicator, rtt) { 0, make_recv_report(recv_time, RecvCname, RecvSsrc, SendSsrc, Seed)); } LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); { // Check metrics on receiver @@ -3796,7 +3810,7 @@ TEST(communicator, rtt) { // Deliver receiver report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); { // Check metrics on sender @@ -3864,7 +3878,7 @@ TEST(communicator, rtt_clock_drift) { 0, make_recv_report(recv_time, RecvCname, RecvSsrc, SendSsrc, Seed)); } LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); { // Check metrics on receiver @@ -3892,7 +3906,7 @@ TEST(communicator, rtt_clock_drift) { // Deliver receiver report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); { // Check metrics on sender @@ -3959,7 +3973,7 @@ TEST(communicator, rtt_network_jitter) { 0, make_recv_report(recv_time, RecvCname, RecvSsrc, SendSsrc, Seed)); } LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); { // Check metrics on receiver @@ -3986,7 +4000,7 @@ TEST(communicator, rtt_network_jitter) { // Deliver receiver report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); { // Check metrics on sender @@ -4049,14 +4063,14 @@ TEST(communicator, rtt_network_losses) { advance_time(recv_time, Rtt / 2); // Deliver sender report to receiver - pp = read_packet(send_queue); + pp = read_packet(send_queue, recv_time); if (!loss_send_report) { if (iter != 0) { recv_part.set_recv_report( 0, make_recv_report(recv_time, RecvCname, RecvSsrc, SendSsrc, Seed)); } - LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp, recv_time)); + LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp)); // Check metrics on receiver const SendReport report = recv_part.next_send_notification(); @@ -4080,12 +4094,12 @@ TEST(communicator, rtt_network_losses) { advance_time(recv_time, Rtt / 2); // Deliver receiver report to sender - pp = read_packet(recv_queue); + pp = read_packet(recv_queue, send_time); if (!loss_recv_report) { send_part.set_send_report( make_send_report(send_time, SendCname, SendSsrc, Seed)); - LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp, send_time)); + LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp)); // Check metrics on sender const RecvReport report = send_part.next_recv_notification(); @@ -4152,14 +4166,14 @@ TEST(communicator, rtt_network_delays) { // Deliver sender reports to receiver if (!start_send_delay && send_delay_countdown == 0) { while (send_queue.size() != 0) { - packet::PacketPtr pp = read_packet(send_queue); + packet::PacketPtr pp = read_packet(send_queue, recv_time); if (iter != 0) { recv_part.set_recv_report( 0, make_recv_report(recv_time, RecvCname, RecvSsrc, SendSsrc, Seed)); } - LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp, recv_time)); + LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp)); // Check metrics on receiver const SendReport report = recv_part.next_send_notification(); @@ -4193,11 +4207,11 @@ TEST(communicator, rtt_network_delays) { // Deliver receiver reports to sender if (!start_recv_delay && recv_delay_countdown == 0) { while (recv_queue.size() != 0) { - packet::PacketPtr pp = read_packet(recv_queue); + packet::PacketPtr pp = read_packet(recv_queue, send_time); send_part.set_send_report( make_send_report(send_time, SendCname, SendSsrc, Seed)); - LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp, send_time)); + LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp)); // Check metrics on sender const RecvReport report = send_part.next_recv_notification(); @@ -4279,14 +4293,14 @@ TEST(communicator, rtt_network_reordering) { } while (send_queue.size() != 0) { - packet::PacketPtr pp = read_packet(send_queue); + packet::PacketPtr pp = read_packet(send_queue, recv_time); if (iter != 0) { recv_part.set_recv_report( 0, make_recv_report(recv_time, RecvCname, RecvSsrc, SendSsrc, Seed)); } - LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp, recv_time)); + LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp)); // Check metrics on receiver const SendReport report = recv_part.next_send_notification(); @@ -4328,11 +4342,11 @@ TEST(communicator, rtt_network_reordering) { } while (recv_queue.size() != 0) { - packet::PacketPtr pp = read_packet(recv_queue); + packet::PacketPtr pp = read_packet(recv_queue, send_time); send_part.set_send_report( make_send_report(send_time, SendCname, SendSsrc, Seed)); - LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp, send_time)); + LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp)); // Check metrics on sender const RecvReport report = send_part.next_recv_notification(); @@ -4346,7 +4360,7 @@ TEST(communicator, rtt_network_reordering) { recv_reorder_countdown = ReorderBurst; } recv_reorder_countdown--; - packet::PacketPtr pp = read_packet(recv_queue); + packet::PacketPtr pp = read_packet(recv_queue, recv_time); recv_packet_stash.push_back(*pp); } @@ -4418,13 +4432,13 @@ TEST(communicator, rtt_network_duplicates) { // Deliver sender reports to receiver while (send_queue.size() != 0) { - pp = read_packet(send_queue); + pp = read_packet(send_queue, recv_time); if (iter != 0) { recv_part.set_recv_report( 0, make_recv_report(recv_time, RecvCname, RecvSsrc, SendSsrc, Seed)); } - LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp, recv_time)); + LONGS_EQUAL(status::StatusOK, recv_comm.process_packet(pp)); // Check metrics on receiver const SendReport report = recv_part.next_send_notification(); @@ -4462,11 +4476,11 @@ TEST(communicator, rtt_network_duplicates) { // Deliver receiver reports to sender while (recv_queue.size() != 0) { - pp = read_packet(recv_queue); + pp = read_packet(recv_queue, send_time); send_part.set_send_report( make_send_report(send_time, SendCname, SendSsrc, Seed)); - LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp, send_time)); + LONGS_EQUAL(status::StatusOK, send_comm.process_packet(pp)); // Check metrics on sender const RecvReport report = send_part.next_recv_notification(); @@ -4532,7 +4546,7 @@ TEST(communicator, rtt_missing_xr) { 0, make_recv_report(recv_time, RecvCname, RecvSsrc, SendSsrc, Seed)); } LONGS_EQUAL(status::StatusOK, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); { // Check metrics on receiver @@ -4556,7 +4570,7 @@ TEST(communicator, rtt_missing_xr) { // Deliver receiver report to sender send_part.set_send_report(make_send_report(send_time, SendCname, SendSsrc, Seed)); LONGS_EQUAL(status::StatusOK, - send_comm.process_packet(read_packet(recv_queue), send_time)); + send_comm.process_packet(read_packet(recv_queue, send_time))); { // Check metrics on sender @@ -4679,7 +4693,7 @@ TEST(communicator, processing_error) { // Deliver sender report to receiver const status::StatusCode status = - recv_comm.process_packet(read_packet(send_queue), recv_time); + recv_comm.process_packet(read_packet(send_queue, recv_time)); if (status == status::StatusOK) { // Check notifications on receiver @@ -4736,7 +4750,7 @@ TEST(communicator, notification_error) { // Deliver sender report to receiver LONGS_EQUAL(status::StatusDrain, - recv_comm.process_packet(read_packet(send_queue), recv_time)); + recv_comm.process_packet(read_packet(send_queue, recv_time))); CHECK_EQUAL(1, recv_comm.total_streams()); // Check notifications on receiver