Skip to content

Commit

Permalink
Delay bridging audio until first audio RTP or a max delay.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Jan 11, 2024
1 parent 298649e commit 136b2f9
Showing 1 changed file with 57 additions and 25 deletions.
82 changes: 57 additions & 25 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ import (
"github.com/livekit/sip/pkg/stats"
)

const (
// audioBridgeMaxDelay is the longest time to hold off bridging audio while waiting for the first RTP packet.
// Bridging starts earlier if an RTP packet is received before the delay expires.
// This is done because of audio cutoff at the beginning of calls observed in the wild.
audioBridgeMaxDelay = 1 * time.Second
)

func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
if username == "" || password == "" {
return true
Expand Down Expand Up @@ -162,35 +168,38 @@ func (s *Server) onBye(req *sip.Request, tx sip.ServerTransaction) {
}

// inboundCall groups the state kept for a single inbound call: the SIP INVITE request/response,
// the RTP media connection, the DTMF digit buffer, and the LiveKit room the call is bridged to.
type inboundCall struct {
s *Server
mon *stats.CallMonitor
tag string
ctx context.Context
cancel func()
inviteReq *sip.Request
inviteResp *sip.Response
from *sip.FromHeader
to *sip.ToHeader
src string
rtpConn *MediaConn
audioHandler atomic.Pointer[rtp.Handler]
dtmf chan byte // buffered; DTMF digits as characters
lkRoom *Room // LiveKit room; only active after correct pin is entered
callDur func() time.Duration
joinDur func() time.Duration
done atomic.Bool
s *Server
mon *stats.CallMonitor
tag string
ctx context.Context
cancel func()
inviteReq *sip.Request
inviteResp *sip.Response
from *sip.FromHeader
to *sip.ToHeader
src string
rtpConn *MediaConn
audioHandler atomic.Pointer[rtp.Handler]
// audioReceived is flipped to true by HandleRTP the first time a packet reaches its default branch.
// The compare-and-swap on it ensures audioRecvChan is closed only once.
audioReceived atomic.Bool
// audioRecvChan is closed (never sent on) when audioReceived flips to true.
// handleInvite waits on it, for at most audioBridgeMaxDelay, before continuing with the call.
audioRecvChan chan struct{}
dtmf chan byte // buffered; DTMF digits as characters
lkRoom *Room // LiveKit room; only active after correct pin is entered
callDur func() time.Duration
joinDur func() time.Duration
done atomic.Bool
}

func (s *Server) newInboundCall(mon *stats.CallMonitor, tag string, from *sip.FromHeader, to *sip.ToHeader, src string) *inboundCall {
c := &inboundCall{
s: s,
mon: mon,
tag: tag,
from: from,
to: to,
src: src,
dtmf: make(chan byte, 10),
lkRoom: NewRoom(), // we need it created earlier so that the audio mixer is available for pin prompts
s: s,
mon: mon,
tag: tag,
from: from,
to: to,
src: src,
audioRecvChan: make(chan struct{}),
dtmf: make(chan byte, 10),
lkRoom: NewRoom(), // we need it created earlier so that the audio mixer is available for pin prompts
}
c.ctx, c.cancel = context.WithCancel(context.Background())
s.cmu.Lock()
Expand Down Expand Up @@ -246,7 +255,27 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, tx sip
}
c.inviteReq = req
c.inviteResp = res

// Wait for either a first RTP packet or a predefined delay.
//
// If the delay kicks in earlier than the caller is ready, they might miss some audio packets.
//
// On the other hand, if we always wait for RTP, it might be harder to diagnose firewall/routing issues.
// In that case both sides will hear nothing, instead of only one side having issues.
//
// Thus, we wait at most a fixed amount of time before bridging audio.

// We own this goroutine, so we can block freely.
delay := time.NewTimer(audioBridgeMaxDelay)
select {
case <-ctx.Done():
delay.Stop()
c.close("hangup")
return
case <-c.audioRecvChan:
delay.Stop()
case <-delay.C:
}
if requirePin {
c.pinPrompt(ctx)
} else {
Expand Down Expand Up @@ -394,6 +423,9 @@ func (c *inboundCall) HandleRTP(p *rtp.Packet) error {
c.handleDTMF(p.Payload)
}
default:
if c.audioReceived.CompareAndSwap(false, true) {
close(c.audioRecvChan)
}
// TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it?
if h := c.audioHandler.Load(); h != nil {
return (*h).HandleRTP(p)
Expand Down

0 comments on commit 136b2f9

Please sign in to comment.