From 136b2f972959b8e3347bf726320a06f8637a4b79 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Thu, 11 Jan 2024 19:06:58 +0200 Subject: [PATCH] Delay bridging audio until first audio RTP or a max delay. --- pkg/sip/inbound.go | 82 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 22653b9b..3155e85e 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -32,6 +32,12 @@ import ( "github.com/livekit/sip/pkg/stats" ) +const ( + // audioBridgeMaxDelay delays sending audio for certain time, unless RTP packet is received. + // This is done because of audio cutoff at the beginning of calls observed in the wild. + audioBridgeMaxDelay = 1 * time.Second +) + func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) { if username == "" || password == "" { return true @@ -162,35 +168,38 @@ func (s *Server) onBye(req *sip.Request, tx sip.ServerTransaction) { } type inboundCall struct { - s *Server - mon *stats.CallMonitor - tag string - ctx context.Context - cancel func() - inviteReq *sip.Request - inviteResp *sip.Response - from *sip.FromHeader - to *sip.ToHeader - src string - rtpConn *MediaConn - audioHandler atomic.Pointer[rtp.Handler] - dtmf chan byte // buffered; DTMF digits as characters - lkRoom *Room // LiveKit room; only active after correct pin is entered - callDur func() time.Duration - joinDur func() time.Duration - done atomic.Bool + s *Server + mon *stats.CallMonitor + tag string + ctx context.Context + cancel func() + inviteReq *sip.Request + inviteResp *sip.Response + from *sip.FromHeader + to *sip.ToHeader + src string + rtpConn *MediaConn + audioHandler atomic.Pointer[rtp.Handler] + audioReceived atomic.Bool + audioRecvChan chan struct{} + dtmf chan byte // buffered; DTMF digits as characters + lkRoom *Room // LiveKit room; only active after correct pin is entered + callDur func() time.Duration + joinDur func() time.Duration + done atomic.Bool } func (s *Server) newInboundCall(mon *stats.CallMonitor, tag string, from *sip.FromHeader, to *sip.ToHeader, src string) *inboundCall { c := &inboundCall{ - s: s, - mon: mon, - tag: tag, - from: from, - to: to, - src: src, - dtmf: make(chan byte, 10), - lkRoom: NewRoom(), // we need it created earlier so that the audio mixer is available for pin prompts + s: s, + mon: mon, + tag: tag, + from: from, + to: to, + src: src, + audioRecvChan: make(chan struct{}), + dtmf: make(chan byte, 10), + lkRoom: NewRoom(), // we need it created earlier so that the audio mixer is available for pin prompts } c.ctx, c.cancel = context.WithCancel(context.Background()) s.cmu.Lock() @@ -246,7 +255,27 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, tx sip } c.inviteReq = req c.inviteResp = res + + // Wait for either a first RTP packet or a predefined delay. + // + // If the delay kicks in earlier than the caller is ready, they might miss some audio packets. + // + // On the other hand, if we always wait for RTP, it might be harder to diagnose firewall/routing issues. + // In that case both sides will hear nothing, instead of only one side having issues. + // + // Thus, we wait at most a fixed amount of time before bridging audio. + // We own this goroutine, so can freely block. + delay := time.NewTimer(audioBridgeMaxDelay) + select { + case <-ctx.Done(): + delay.Stop() + c.close("hangup") + return + case <-c.audioRecvChan: + delay.Stop() + case <-delay.C: + } if requirePin { c.pinPrompt(ctx) } else { @@ -394,6 +423,9 @@ func (c *inboundCall) HandleRTP(p *rtp.Packet) error { c.handleDTMF(p.Payload) } default: + if c.audioReceived.CompareAndSwap(false, true) { + close(c.audioRecvChan) + } // TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it? if h := c.audioHandler.Load(); h != nil { return (*h).HandleRTP(p)