Skip to content

Commit

Permalink
Merge branch 'finos:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
topilski authored Dec 30, 2024
2 parents 02094a6 + b6af4db commit 25077fc
Show file tree
Hide file tree
Showing 37 changed files with 341 additions and 198 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/pr-unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ jobs:
fail-fast: false
matrix:
include:
- os: aws-linux
image: latest
build:
type: Release
- os: el8
image: latest
build:
Expand Down
15 changes: 15 additions & 0 deletions doc/api/READMEapi.md
Original file line number Diff line number Diff line change
Expand Up @@ -1048,3 +1048,18 @@ GET /ice-candidates
"ufrag": "y2uBVS3RATC4Sd"
}
```

## Data Channel notifications
SMB send messages over the data channel to all participants. The messages carrry information about which endpoint is dominant speaker at the moment. It also sends messages listing all the last-N active users in order.
This means that the dominant speaker can be derived from that list, but also that the enpoints that are in the list at the moment and which main video ssrc they are sending. This can be used by client to associate view port with the user being shown in that port.

User media map contains the list of forwarded user streams
```
{"type":"UserMediaMap","endpoints":[{"endpoint":"28885389-571f-4a43-8c20-d5fb225884bb", "ssrcs":[2035838989]}]}
```


Dominant speaker message looks like this:
```
{"type":"DominantSpeaker", "endpoint":"28885389-571f-4a43-8c20-d5fb225884bb"}
```
5 changes: 4 additions & 1 deletion logger/PacketLogger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ PacketLogReader::PacketLogReader(FILE* logFile) : _logFile(logFile) {}

PacketLogReader::~PacketLogReader()
{
::fclose(_logFile);
if (_logFile)
{
::fclose(_logFile);
}
}

bool PacketLogReader::getNext(PacketLogItem& item)
Expand Down
2 changes: 2 additions & 0 deletions logger/PacketLogger.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class PacketLogReader

void rewind();

bool isOpen() const { return _logFile != nullptr; }

private:
FILE* _logFile;
};
Expand Down
2 changes: 1 addition & 1 deletion test/bwe/FakeAudioSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ size_t FakeAudioSource::randomPacketSize()
}
}
++_counter;
return _talkSprint + randomSize(_talkSprint, 0.1);
return _talkSprint + randomSize(_talkSprint / 4, 0.75);
}
} // namespace fakenet
30 changes: 25 additions & 5 deletions test/bwe/FakeVideoSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ FakeVideoSource::FakeVideoSource(memory::PacketPoolAllocator& allocator,
_frameSize(0),
_fps(30),
_pacing(0),
_mtu(1400),
_mtu(1250),
_ssrc(ssrc),
_sequenceCounter(0),
_avgRate(0.0005),
Expand Down Expand Up @@ -110,11 +110,11 @@ memory::UniquePacket FakeVideoSource::getPacket(uint64_t timestamp)
bool lastInFrame = false;
if (_frameSize > 0)
{
_releaseTime += _counter % 2 == 0 ? 0 : _pacing;
_releaseTime += _packetsInFrame < 3 ? 0 : _pacing;
}
else
{
_releaseTime = _frameReleaseTime;
_releaseTime += utils::Time::us * 100;
lastInFrame = true;
}
_avgRate.update(packet->getLength() * 8, timestamp);
Expand All @@ -140,8 +140,8 @@ memory::UniquePacket FakeVideoSource::getPacket(uint64_t timestamp)
{
setNextFrameSize();
_rtpTimestamp += 90000 / _fps;
_releaseTime = timestamp;
_frameReleaseTime += utils::Time::sec / _fps;
_releaseTime = _frameReleaseTime;
return getPacket(timestamp);
}

Expand All @@ -150,15 +150,35 @@ memory::UniquePacket FakeVideoSource::getPacket(uint64_t timestamp)

void FakeVideoSource::setNextFrameSize()
{
if (_bandwidthKbps < 100)
{
_frameSize = 0;
return;
}
auto meanSize = _bandwidthKbps * 125 / _fps;
// key frame every 15s
if (_counter % (_fps * 15) == 0)
{
meanSize *= 4;
meanSize *= std::max(1u, 4 * _fps / 30);
_keyFrame = true;
}
++_counter;
_packetsInFrame = 0;
_frameSize = randomSize(meanSize, 0.2);
_pacing = (utils::Time::sec / _fps) * _mtu / (2 * (_frameSize + _mtu));
}

void FakeVideoSource::setBandwidth(uint32_t kbps)
{
_bandwidthKbps = kbps;
_fps = 30;
if (_bandwidthKbps < 400)
{
_fps = 15;
}
if (_bandwidthKbps < 200)
{
_fps = 7;
}
}
} // namespace fakenet
2 changes: 1 addition & 1 deletion test/bwe/FakeVideoSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class FakeVideoSource : public MediaSource
return std::max(int64_t(0), static_cast<int64_t>(_releaseTime - timestamp));
}

void setBandwidth(uint32_t kbps) override { _bandwidthKbps = kbps; }
void setBandwidth(uint32_t kbps) override;
uint32_t getBandwidth() const override { return _bandwidthKbps; }

double getBitRate() const { return _avgRate.get() / 1000; }
Expand Down
9 changes: 9 additions & 0 deletions test/concurrency/LockFreeListTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,24 @@ class QueueWrapper

TEST(LFList, consistencyPlenty)
{
#ifdef NOPERF_TEST
GTEST_SKIP();
#endif
consistencyTest<LockFreeList>(8, 7 * BATCH_SIZE);
}
TEST(LFList, consistencyFew)
{
#ifdef NOPERF_TEST
GTEST_SKIP();
#endif
consistencyTest<LockFreeList>(8, 128);
}

TEST(LFList, plainConsistencyPlenty)
{
#ifdef NOPERF_TEST
GTEST_SKIP();
#endif
consistencyTest<QueueWrapper>(8, 7 * BATCH_SIZE);
}

Expand Down
3 changes: 3 additions & 0 deletions test/concurrency/MpmcMapTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ TEST(MpmcMap, concurrency)

TEST(MpmcMap, performance)
{
#ifdef NOPERF_TEST
GTEST_SKIP();
#endif
using HMap = concurrency::MpmcHashmap32<uint64_t, Complicated<uint32_t>>;
HMap hmap(4096);
bool running = true;
Expand Down
6 changes: 5 additions & 1 deletion test/gtest_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ class TestEventSink : public ::testing::TestEventListener
virtual void OnTestEnd(const TestInfo& test_info) override
{
utils::Time::initialize(); // the time source may be deleted by now
logger::info("Test Ended %s.%s <<<", "gtest", test_info.test_case_name(), test_info.name());
logger::info("Test Ended %s.%s (%" PRIi64 " ms) <<<",
"gtest",
test_info.test_case_name(),
test_info.name(),
test_info.result()->elapsed_time());
logger::awaitLogDrained();
}
virtual void OnEnvironmentsTearDownStart(const UnitTest& unit_test) override {}
Expand Down
6 changes: 5 additions & 1 deletion test/gtest_main2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ class TestEventSink : public ::testing::TestEventListener
virtual void OnTestEnd(const TestInfo& test_info) override
{
utils::Time::initialize(); // the time source may be deleted by now
logger::info("Test Ended %s.%s <<<", "gtest", test_info.test_case_name(), test_info.name());
logger::info("Test Ended %s.%s (%" PRIi64 " ms) <<<",
"gtest",
test_info.test_case_name(),
test_info.name(),
test_info.result()->elapsed_time());
logger::awaitLogDrained();
}
virtual void OnEnvironmentsTearDownStart(const UnitTest& unit_test) override {}
Expand Down
2 changes: 1 addition & 1 deletion test/include/mocks/TimeSourceMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace test

struct TimeSourceMock : public utils::TimeSource
{
MOCK_METHOD(uint64_t, getAbsoluteTime, (), (override));
MOCK_METHOD(uint64_t, getAbsoluteTime, (), (const, override));
MOCK_METHOD(void, nanoSleep, (uint64_t nanoSeconds), (override));
MOCK_METHOD(std::chrono::system_clock::time_point, wallClock, (), (const, override));
MOCK_METHOD(void, advance, (uint64_t ns), (override));
Expand Down
23 changes: 13 additions & 10 deletions test/integration/BarbellTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ Test setup:
*/
TEST_F(BarbellTest, packetLossViaBarbell)
{
#ifdef NOPERF_TEST
GTEST_SKIP();
#endif
runTestInThread(expectedTestThreadCount(2), [this]() {
constexpr auto PACKET_LOSS_RATE = 0.03;

Expand Down Expand Up @@ -235,12 +238,12 @@ TEST_F(BarbellTest, packetLossViaBarbell)
if (id == 0)
{
EXPECT_NEAR(fps, 30.0, 5.0);
EXPECT_NEAR(videoStats.numDecodedFrames, 146, 11);
EXPECT_NEAR(videoStats.numDecodedFrames, 146, 20);
}
else
{
EXPECT_NEAR(fps, 30.0, 2.0);
EXPECT_NEAR(videoStats.numDecodedFrames, 150, 11);
EXPECT_NEAR(videoStats.numDecodedFrames, 150, 20);
}
}
}
Expand Down Expand Up @@ -799,18 +802,18 @@ TEST_F(BarbellTest, barbellStats)
EXPECT_EQ(s2["video"]["inbound"]["activeStreamCount"], 3);

// Packates per second for audio (expectations for values per second we can hardcode):
EXPECT_NEAR(s1["video"]["inbound"]["packetsPerSecond"], 660, 30.0);
EXPECT_NEAR(s2["video"]["outbound"]["packetsPerSecond"], 660, 30.0);
EXPECT_NEAR(s1["video"]["inbound"]["packetsPerSecond"], 500, 30.0);
EXPECT_NEAR(s2["video"]["outbound"]["packetsPerSecond"], 500, 30.0);

EXPECT_NEAR(s1["video"]["outbound"]["packetsPerSecond"], 330, 15.0);
EXPECT_NEAR(s2["video"]["inbound"]["packetsPerSecond"], 330, 15.0);
EXPECT_NEAR(s1["video"]["outbound"]["packetsPerSecond"], 250, 15.0);
EXPECT_NEAR(s2["video"]["inbound"]["packetsPerSecond"], 250, 15.0);

// Audio bitrate symmetry (expectations for values per second we can hardcode):
EXPECT_NEAR(s1["video"]["inbound"]["bitrateKbps"], 6222, 200.0);
EXPECT_NEAR(s2["video"]["outbound"]["bitrateKbps"], 6220, 200.0);
EXPECT_NEAR(s1["video"]["inbound"]["bitrateKbps"], 4500, 200.0);
EXPECT_NEAR(s2["video"]["outbound"]["bitrateKbps"], 4500, 200.0);

EXPECT_NEAR(s1["video"]["outbound"]["bitrateKbps"], 3100, 100.0);
EXPECT_NEAR(s2["video"]["inbound"]["bitrateKbps"], 3100, 100.0);
EXPECT_NEAR(s1["video"]["outbound"]["bitrateKbps"], 2235, 100.0);
EXPECT_NEAR(s2["video"]["inbound"]["bitrateKbps"], 2235, 100.0);

// Packets sent / received symmetry (exact value could vary, but s1.inbound ~=~ s2.outbount):
EXPECT_NE(s1["video"]["inbound"]["packets"], 0);
Expand Down
3 changes: 3 additions & 0 deletions test/integration/ConfIntegrationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,9 @@ TEST_F(IntegrationTest, neighbours)

TEST_F(IntegrationTest, dynamicNeighbours_removeNeighbours)
{
#ifdef NOPERF_TEST
GTEST_SKIP();
#endif
runTestInThread(expectedTestThreadCount(1), [this]() {
_config.readFromString(_defaultSmbConfig);

Expand Down
3 changes: 2 additions & 1 deletion test/integration/FFTanalysis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ void fftProducer(const std::vector<int16_t>& recording,
const size_t threadId,
CmplxArray& spectrum)
{
for (size_t cursor = fftWindowSize * threadId; cursor < size - fftWindowSize; cursor += fftWindowSize * numThreads)
for (size_t cursor = fftWindowSize * threadId; cursor + fftWindowSize < std::min(recording.size(), size);
cursor += fftWindowSize * numThreads)
{
CmplxArray testVector(fftWindowSize);
for (size_t x = 0; x < fftWindowSize; ++x)
Expand Down
3 changes: 3 additions & 0 deletions test/integration/IntegrationAudioTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class IntegrationAudioTest : public IntegrationTest

TEST_F(IntegrationAudioTest, longMute)
{
#ifdef NOPERF_TEST
GTEST_SKIP();
#endif
runTestInThread(
expectedTestThreadCount(1),
[this]() {
Expand Down
14 changes: 8 additions & 6 deletions test/integration/IntegrationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ IntegrationTest::~IntegrationTest()
// Fake internet thread, JobManager timer thread, worker threads.
void IntegrationTest::SetUp()
{
#ifdef NOPERF_TEST
// GTEST_SKIP();
#endif
#if !ENABLE_LEGACY_API
GTEST_SKIP();
#endif
Expand Down Expand Up @@ -90,13 +87,18 @@ void IntegrationTest::SetUp()

void IntegrationTest::TearDown()
{
#ifdef NOPERF_TEST
// GTEST_SKIP();
#endif
#if !ENABLE_LEGACY_API
GTEST_SKIP();
#endif

// if test ran, it will have re initialized, otherwise it is only threads started in Setup that runs.
if (!utils::Time::isDefaultTimeSource())
{
_timeSource.waitForThreadsToSleep(_workerThreads.size() + 2, 3 * utils::Time::sec);
utils::Time::initialize();
}
_timeSource.shutdown();

_bridge.reset();
_clientTransportFactory.reset();
_publicTransportFactory.reset();
Expand Down
11 changes: 8 additions & 3 deletions test/integration/emulator/FakeTcpEndpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FakeTcpEndpoint::FakeTcpEndpoint(jobmanager::JobManager& jobManager,
_sendJobs(jobManager, 256 * 1024),
_fakeFd(++_fdGenerator)
{
while (!_network->isLocalPortFree(_localPort.setPort(_portCounter++)))
while (!_network->isLocalPortFree(_localPort.setPort(_portCounter++), fakenet::Protocol::TCPDATA))
{
}
}
Expand Down Expand Up @@ -95,7 +95,11 @@ void FakeTcpEndpoint::sendStunTo(const transport::SocketAddress& target,

void FakeTcpEndpoint::connect(const transport::SocketAddress& target)
{
_network->addLocal(this);
if (!_network->addLocal(this))
{
logger::error("Cannot open tcp port. IP clash", _name.c_str());
return;
}
_state = State::CONNECTING;
_peerPort = target;

Expand Down Expand Up @@ -148,8 +152,9 @@ void FakeTcpEndpoint::onReceive(fakenet::Protocol protocol,
size_t length,
uint64_t timestamp)
{
assert(hasIp(target));
assert(hasIp(target, protocol));
assert(protocol != fakenet::Protocol::UDP);
assert(source == _peerPort);
auto packet = memory::makeUniquePacket(_networkLinkAllocator, data, length);
assert(!isWeirdPacket(*packet));
_networkLink->push(serializeInbound(_networkLinkAllocator, protocol, source, data, length), timestamp);
Expand Down
8 changes: 6 additions & 2 deletions test/integration/emulator/FakeTcpEndpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,12 @@ class FakeTcpEndpoint : public transport::TcpEndpoint, public FakeEndpointImpl
const void* data,
size_t length,
uint64_t timestamp) override;
bool hasIp(const transport::SocketAddress& target) override { return target == _localPort; }

bool hasIp(const transport::SocketAddress& target, fakenet::Protocol protocol) const override
{
return target == _localPort && protocol > fakenet::Protocol::SYN_ACK;
}
bool hasIpClash(const NetworkNode& node) const override { return false; }
fakenet::Protocol getProtocol() const override { return fakenet::Protocol::TCPDATA; }
void process(uint64_t timestamp) override;
std::shared_ptr<fakenet::NetworkLink> getDownlink() override { return _networkLink; }

Expand Down
Loading

0 comments on commit 25077fc

Please sign in to comment.