Skip to content

Commit

Permalink
Fix RTP session termination for inbound. Improve logging. (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc authored Dec 4, 2023
1 parent 4a86c34 commit 08e24b4
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 21 deletions.
3 changes: 1 addition & 2 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package service

import (
"context"
"log"

"github.com/frostbyte73/core"
"github.com/livekit/protocol/logger"
Expand Down Expand Up @@ -99,7 +98,7 @@ func (s *Service) HandleDispatchRules(callingNumber, calledNumber, calledHost, s
})

if err != nil {
log.Println(err)
logger.Warnw("SIP handle dispatch rule error", err)
return "", "", false, true
}

Expand Down
54 changes: 39 additions & 15 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ package sip

import (
"fmt"
"log"
"sync/atomic"
"time"

"github.com/emiago/sipgo/sip"
"github.com/icholy/digest"
"github.com/livekit/protocol/logger"
"github.com/pion/sdp/v2"

"github.com/livekit/sip/pkg/config"
Expand Down Expand Up @@ -92,6 +92,8 @@ func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, fr
}

func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
_ = tx.Respond(sip.NewResponseFromRequest(req, 180, "Ringing", nil))

tag, err := getTagValue(req)
if err != nil {
sipErrorResponse(tx, req)
Expand All @@ -111,9 +113,11 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
}
src := req.Source()

logger.Infow("INVITE", "tag", tag, "from", from, "to", to)

username, password, err := s.authHandler(from.Address.User, to.Address.User, to.Address.Host, src)
if err != nil {
log.Printf("Rejecting inbound call, doesn't match any Trunks %q %q %q %q\n", from.Address.User, to.Address.User, to.Address.Host, src)
logger.Warnw("Rejecting inbound call, doesn't match any Trunks", err, "tag", tag, "src", src, "from", from, "to", to, "to-host", to.Address.Host)
sipErrorResponse(tx, req)
return
}
Expand All @@ -128,6 +132,8 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
type inboundCall struct {
s *Server
tag string
inviteReq *sip.Request
inviteResp *sip.Response
from *sip.FromHeader
to *sip.ToHeader
src string
Expand All @@ -148,6 +154,9 @@ func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHead
dtmf: make(chan byte, 10),
lkRoom: NewRoom(), // we need it created earlier so that the audio mixer is available for pin prompts
}
s.cmu.Lock()
s.activeCalls[tag] = c
s.cmu.Unlock()
return c
}

Expand All @@ -156,30 +165,30 @@ func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction, c
// Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required.
roomName, identity, requirePin, rejectInvite := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, "", false)
if rejectInvite {
log.Printf("Rejecting inbound call, doesn't match any Dispatch Rules %q %q %q %q\n", c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src)
logger.Infow("Rejecting inbound call, doesn't match any Dispatch Rules", "from", c.from.Address.User, "to", c.to.Address.User, "to-host", c.to.Address.Host, "src", c.src)
sipErrorResponse(tx, req)
c.Close()
return
}

// We need to start media first, otherwise we won't be able to send audio prompts to the caller, or receive DTMF.
answerData, err := c.runMediaConn(req.Body(), conf)

if err != nil {
sipErrorResponse(tx, req)
c.Close()
return
}
c.s.cmu.Lock()
c.s.activeCalls[c.tag] = c
c.s.cmu.Unlock()

res := sip.NewResponseFromRequest(req, 200, "OK", answerData)
res.AppendHeader(&sip.ContactHeader{Address: sip.Uri{Host: c.s.signalingIp, Port: c.s.conf.SIPPort}})
res.AppendHeader(&contentTypeHeaderSDP)
if err = tx.Respond(res); err != nil {
log.Println(err)
logger.Errorw("Cannot respond to INVITE", err)
// TODO: should we close the call in this case?
return
}
c.inviteReq = req
c.inviteResp = res
// We own this goroutine, so can freely block.
if requirePin {
c.pinPrompt()
Expand All @@ -188,6 +197,16 @@ func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction, c
}
}

func (c *inboundCall) sendBye() {
if c.inviteReq == nil {
return
}
res := sip.NewByeRequest(c.inviteReq, c.inviteResp, nil)
c.s.sipSrv.TransportLayer().WriteMsg(res)
c.inviteReq = nil
c.inviteResp = nil
}

func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answerData []byte, _ error) {
conn := NewMediaConn()
conn.OnRTP(c)
Expand All @@ -209,7 +228,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answe
}

func (c *inboundCall) pinPrompt() {
log.Printf("Requesting Pin for SIP call %q -> %q\n", c.from.Address.User, c.to.Address.User)
logger.Infow("Requesting Pin for SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User)
const pinLimit = 16
c.playAudio(c.s.res.enterPin)
pin := ""
Expand All @@ -228,9 +247,10 @@ func (c *inboundCall) pinPrompt() {
// End of the pin
noPin = pin == ""

log.Printf("Checking Pin for SIP call %q -> %q = %q (noPin = %v)\n", c.from.Address.User, c.to.Address.User, pin, noPin)
logger.Infow("Checking Pin for SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "pin", pin, "noPin", noPin)
roomName, identity, requirePin, reject := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, pin, noPin)
if reject || requirePin || roomName == "" {
logger.Infow("Rejecting call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "pin", pin, "noPin", noPin)
c.playAudio(c.s.res.wrongPin)
c.Close()
return
Expand All @@ -250,14 +270,15 @@ func (c *inboundCall) pinPrompt() {
}

func (c *inboundCall) Close() error {
if c.done.CompareAndSwap(false, true) {
if !c.done.CompareAndSwap(false, true) {
return nil
}
logger.Infow("Closing inbound call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User)
c.s.cmu.Lock()
delete(c.s.activeCalls, c.tag)
c.s.cmu.Unlock()
c.closeMedia()
// FIXME: drop the actual call
c.sendBye()
return nil
}

Expand All @@ -267,7 +288,10 @@ func (c *inboundCall) closeMedia() {
p.Close()
c.lkRoom = nil
}
c.rtpConn.Close()
if c.rtpConn != nil {
c.rtpConn.Close()
c.rtpConn = nil
}
close(c.dtmf)
}

Expand Down Expand Up @@ -304,10 +328,10 @@ func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity str
}

func (c *inboundCall) joinRoom(roomName, identity string) {
log.Printf("Bridging SIP call %q -> %q to room %q (as %q)\n", c.from.Address.User, c.to.Address.User, roomName, identity)
logger.Infow("Bridging SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "roomName", roomName, "identity", identity)
c.playAudio(c.s.res.roomJoin)
if err := c.createLiveKitParticipant(roomName, identity); err != nil {
log.Println(err)
logger.Errorw("Cannot create LiveKit participant", err, "tag", c.tag)
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sip/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
package sip

import (
"log"
"sync/atomic"

"github.com/livekit/protocol/logger"
lksdk "github.com/livekit/server-sdk-go"
"github.com/pion/webrtc/v3"

Expand Down Expand Up @@ -66,7 +66,7 @@ func (r *Room) Connect(conf *config.Config, roomName string, identity string) er
OnTrackSubscribed: func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
if track.Kind() != webrtc.RTPCodecTypeAudio {
if err := pub.SetSubscribed(false); err != nil {
log.Println(err)
logger.Errorw("Cannot unsubscribe from the track", err)
}
return
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func (r *Room) SetOutput(out media.Writer[media.LPCM16Sample]) {
}

func (r *Room) Close() error {
if r.room == nil {
if r.room != nil {
r.room.Disconnect()
r.room = nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/icholy/digest"
"github.com/livekit/protocol/logger"
"golang.org/x/exp/maps"

"github.com/livekit/sip/pkg/config"
Expand Down Expand Up @@ -144,6 +145,7 @@ func (s *Server) Start(agent *sipgo.UserAgent) error {
sipErrorResponse(tx, req)
return
}
logger.Infow("BYE", "tag", tag)

s.cmu.RLock()
c := s.activeCalls[tag]
Expand All @@ -158,6 +160,7 @@ func (s *Server) Start(agent *sipgo.UserAgent) error {
// Ignore ACKs
s.sipSrv.OnAck(func(req *sip.Request, tx sip.ServerTransaction) {})

// TODO: pass proper context here
go func() {
panic(s.sipSrv.ListenAndServe(context.TODO(), "udp", fmt.Sprintf("0.0.0.0:%d", s.conf.SIPPort)))
}()
Expand Down
18 changes: 18 additions & 0 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,34 @@
package sip

import (
"errors"
"log"

"github.com/emiago/sipgo"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/rs/zerolog"
zlog "github.com/rs/zerolog/log"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/version"
)

func init() {
zlog.Logger = zerolog.New(nil).Hook(zerolog.HookFunc(func(e *zerolog.Event, level zerolog.Level, message string) {
switch level {
case zerolog.DebugLevel:
logger.Debugw(message)
case zerolog.InfoLevel:
logger.Infow(message)
case zerolog.WarnLevel:
logger.Warnw(message, errors.New(message))
case zerolog.ErrorLevel:
logger.Errorw(message, errors.New(message))
}
}))
}

type Service struct {
cli *Client
srv *Server
Expand Down
3 changes: 2 additions & 1 deletion test/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
username = flag.String("username", "", "")
password = flag.String("password", "", "")
sipUri = flag.String("sip-uri", "example.pstn.twilio.com", "")
filePath = flag.String("play", "audio.mkv", "")
)

func startMediaListener() *net.UDPConn {
Expand Down Expand Up @@ -140,7 +141,7 @@ func parseAnswer(in []byte) (string, int) {
func sendAudioPackets(conn *net.UDPConn, body []byte) {
ip, port := parseAnswer(body)

r, err := os.Open("audio.mkv")
r, err := os.Open(*filePath)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 08e24b4

Please sign in to comment.