Skip to content

Commit

Permalink
Add test for cancellation of proxy request.
Browse files Browse the repository at this point in the history
  • Loading branch information
fancycode committed Oct 28, 2024
1 parent 8077ca4 commit c3c33d7
Show file tree
Hide file tree
Showing 2 changed files with 420 additions and 1 deletion.
167 changes: 166 additions & 1 deletion proxy/proxy_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"net"
"net/http/httptest"
"os"
Expand All @@ -46,6 +48,8 @@ import (
const (
KeypairSizeForTest = 2048
TokenIdForTest = "foo"

testTimeout = 10 * time.Second
)

func getWebsocketUrl(url string) string {
Expand All @@ -58,13 +62,48 @@ func getWebsocketUrl(url string) string {
}
}

func WaitForProxyServer(ctx context.Context, t *testing.T, proxy *ProxyServer) {
// Wait for any channel messages to be processed.
time.Sleep(10 * time.Millisecond)
proxy.Stop()
for {
proxy.clientsLock.Lock()
clients := len(proxy.clients)
sessions := len(proxy.sessions)
proxy.clientsLock.Unlock()
proxy.remoteConnectionsLock.Lock()
remoteConnections := len(proxy.remoteConnections)
proxy.remoteConnectionsLock.Unlock()
if clients == 0 &&
sessions == 0 &&
remoteConnections == 0 {
break
}

select {
case <-ctx.Done():
proxy.clientsLock.Lock()
proxy.remoteConnectionsLock.Lock()
assert.Fail(t, fmt.Sprintf("Error waiting for clients %+v / sessions %+v / remoteConnections %+v to terminate: %+v", proxy.clients, proxy.sessions, proxy.remoteConnections, ctx.Err()))
proxy.remoteConnectionsLock.Unlock()
proxy.clientsLock.Unlock()
return
default:
time.Sleep(time.Millisecond)
}
}
}

func newProxyServerForTest(t *testing.T) (*ProxyServer, *rsa.PrivateKey, *httptest.Server) {
require := require.New(t)
tempdir := t.TempDir()
var proxy *ProxyServer
t.Cleanup(func() {
if proxy != nil {
proxy.Stop()
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

WaitForProxyServer(ctx, t, proxy)
}
})

Expand Down Expand Up @@ -290,3 +329,129 @@ func TestWebsocketFeatures(t *testing.T) {

assert.NoError(conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Time{}))
}

func TestProxyCreateSession(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
_, key, server := newProxyServerForTest(t)

ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

client := NewProxyTestClient(ctx, t, server.URL)
defer client.CloseWithBye()

require.NoError(client.SendHello(key))

if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}

_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)
}

type HangingTestMCU struct {
t *testing.T
ctx context.Context
}

func NewHangingTestMCU(t *testing.T) *HangingTestMCU {
ctx, closeFunc := context.WithCancel(context.Background())
t.Cleanup(func() {
closeFunc()
})

return &HangingTestMCU{
t: t,
ctx: ctx,
}
}

func (m *HangingTestMCU) Start(ctx context.Context) error {
return nil
}

func (m *HangingTestMCU) Stop() {
}

func (m *HangingTestMCU) Reload(config *goconf.ConfigFile) {
}

func (m *HangingTestMCU) SetOnConnected(f func()) {
}

func (m *HangingTestMCU) SetOnDisconnected(f func()) {
}

func (m *HangingTestMCU) GetStats() interface{} {
return nil
}

func (m *HangingTestMCU) NewPublisher(ctx context.Context, listener signaling.McuListener, id string, sid string, streamType signaling.StreamType, bitrate int, mediaTypes signaling.MediaType, initiator signaling.McuInitiator) (signaling.McuPublisher, error) {
ctx2, cancel := context.WithTimeout(m.ctx, testTimeout*2)
defer cancel()

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ctx2.Done():
return nil, errors.New("Should have been cancelled before")
}
}

func (m *HangingTestMCU) NewSubscriber(ctx context.Context, listener signaling.McuListener, publisher string, streamType signaling.StreamType, initiator signaling.McuInitiator) (signaling.McuSubscriber, error) {
ctx2, cancel := context.WithTimeout(m.ctx, testTimeout*2)
defer cancel()

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ctx2.Done():
return nil, errors.New("Should have been cancelled before")
}
}

func TestProxyCancelOnClose(t *testing.T) {
signaling.CatchLogForTest(t)
assert := assert.New(t)
require := require.New(t)
proxy, key, server := newProxyServerForTest(t)

proxy.mcu = NewHangingTestMCU(t)

ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

client := NewProxyTestClient(ctx, t, server.URL)
defer client.CloseWithBye()

require.NoError(client.SendHello(key))

if hello, err := client.RunUntilHello(ctx); assert.NoError(err) {
assert.NotEmpty(hello.Hello.SessionId, "%+v", hello)
}

_, err := client.RunUntilLoad(ctx, 0)
assert.NoError(err)

require.NoError(client.SendCommand(&signaling.CommandProxyClientMessage{
Type: "create-publisher",
StreamType: signaling.StreamTypeVideo,
}))

// Simulate expired session while request is still being processed.
go func() {
if session := proxy.GetSession(1); assert.NotNil(session) {
session.Close()
}
}()

if message, err := client.RunUntilMessage(ctx); assert.NoError(err) {
if err := checkMessageType(message, "error"); assert.NoError(err) {
assert.Equal("internal_error", message.Error.Code)
assert.Equal(context.Canceled.Error(), message.Error.Message)
}
}
}
Loading

0 comments on commit c3c33d7

Please sign in to comment.