Skip to content

Commit

Permalink
Add the rest of the metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Dec 22, 2023
1 parent c5ddec4 commit 0ff0c29
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 65 deletions.
1 change: 0 additions & 1 deletion pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func (c *Client) Stop() {
c.sipCli.Close()
c.sipCli = nil
}
// FIXME: anything else?
}

func (c *Client) UpdateSIPParticipant(ctx context.Context, req *rpc.InternalUpdateSIPParticipantRequest) (*rpc.InternalUpdateSIPParticipantResponse, error) {
Expand Down
79 changes: 52 additions & 27 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/livekit/sip/pkg/media"
"github.com/livekit/sip/pkg/media/rtp"
"github.com/livekit/sip/pkg/media/ulaw"
"github.com/livekit/sip/pkg/stats"
)

func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
Expand All @@ -54,7 +55,7 @@ func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, fr

h := req.GetHeader("Proxy-Authorization")
if h == nil {
logger.Infow(fmt.Sprintf("Requesting inbound auth for %s", from), "from", from)
logger.Infow("Requesting inbound auth", "from", from)

Check warning on line 58 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L58

Added line #L58 was not covered by tests
inviteState.challenge = digest.Challenge{
Realm: UserAgent,
Nonce: fmt.Sprintf("%d", time.Now().UnixMicro()),
Expand Down Expand Up @@ -94,7 +95,7 @@ func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, fr
}

func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
s.mon.InviteReqRaw(false)
s.mon.InviteReqRaw(stats.Inbound)
_ = tx.Respond(sip.NewResponseFromRequest(req, 180, "Ringing", nil))

tag, err := getTagValue(req)
Expand All @@ -116,26 +117,30 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
}
src := req.Source()

s.mon.InviteReq(false, from.Address.String(), to.Address.String())
logger.Infow(fmt.Sprintf("INVITE from %q to %q", from.Address.String(), to.Address.String()),
"tag", tag, "from", from, "to", to)
cmon := s.mon.NewCall(stats.Inbound, from.Address.String(), to.Address.String())

cmon.InviteReq()
defer cmon.SessionDur()()
joinDur := cmon.JoinDur()
logger.Infow("INVITE received", "tag", tag, "from", from, "to", to)

username, password, err := s.authHandler(from.Address.User, to.Address.User, to.Address.Host, src)
if err != nil {
s.mon.InviteError(false, from.Address.String(), to.Address.String(), "no-rule")
logger.Warnw(fmt.Sprintf("Rejecting inbound call to %q, doesn't match any Trunks", to.Address.String()), err,
cmon.InviteError("no-rule")
logger.Warnw("Rejecting inbound, doesn't match any Trunks", err,
"tag", tag, "src", src, "from", from, "to", to, "to-host", to.Address.Host)
sipErrorResponse(tx, req)
return
}
if !s.handleInviteAuth(req, tx, from.Address.User, username, password) {
s.mon.InviteError(false, from.Address.String(), to.Address.String(), "unauthorized")
cmon.InviteError("unauthorized")

Check warning on line 136 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L136

Added line #L136 was not covered by tests
// handleInviteAuth will generate the SIP Response as needed
return
}
s.mon.InviteReq(false, from.Address.String(), to.Address.String())
cmon.InviteAccept()

Check warning on line 140 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L140

Added line #L140 was not covered by tests

call := s.newInboundCall(tag, from, to, src)
call := s.newInboundCall(cmon, tag, from, to, src)
call.joinDur = joinDur

Check warning on line 143 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L142-L143

Added lines #L142 - L143 were not covered by tests
call.handleInvite(call.ctx, req, tx, s.conf)
}

Expand All @@ -158,6 +163,7 @@ func (s *Server) onBye(req *sip.Request, tx sip.ServerTransaction) {

type inboundCall struct {
s *Server
mon *stats.CallMonitor
tag string
ctx context.Context
cancel func()
Expand All @@ -170,12 +176,15 @@ type inboundCall struct {
audioHandler atomic.Pointer[rtp.Handler]
dtmf chan byte // buffered; DTMF digits as characters
lkRoom *Room // LiveKit room; only active after correct pin is entered
callDur func() time.Duration
joinDur func() time.Duration
done atomic.Bool
}

func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHeader, src string) *inboundCall {
func (s *Server) newInboundCall(mon *stats.CallMonitor, tag string, from *sip.FromHeader, to *sip.ToHeader, src string) *inboundCall {

Check warning on line 184 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L184

Added line #L184 was not covered by tests
c := &inboundCall{
s: s,
mon: mon,

Check warning on line 187 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L187

Added line #L187 was not covered by tests
tag: tag,
from: from,
to: to,
Expand All @@ -191,22 +200,24 @@ func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHead
}

func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, tx sip.ServerTransaction, conf *config.Config) {
c.s.mon.CallStart(false, c.from.Address.String(), c.to.Address.String())
defer c.s.mon.CallEnd(false, c.from.Address.String(), c.to.Address.String())
defer c.close()
c.mon.CallStart()
defer c.mon.CallEnd()
defer c.close("other")

Check warning on line 205 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L203-L205

Added lines #L203 - L205 were not covered by tests
// Send initial request. In the best case scenario, we will immediately get a room name to join.
// Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required.
roomName, identity, wsUrl, token, requirePin, rejectInvite := c.s.dispatchRuleHandler(ctx, c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, "", false)
if rejectInvite {
logger.Infow("Rejecting inbound call, doesn't match any Dispatch Rules", "from", c.from.Address.User, "to", c.to.Address.User, "to-host", c.to.Address.Host, "src", c.src)
sipErrorResponse(tx, req)
c.close("no-dispatch")

Check warning on line 212 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L212

Added line #L212 was not covered by tests
return
}

// We need to start media first, otherwise we won't be able to send audio prompts to the caller, or receive DTMF.
answerData, err := c.runMediaConn(req.Body(), conf)
if err != nil {
sipErrorResponse(tx, req)
c.close("media-failed")

Check warning on line 220 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L220

Added line #L220 was not covered by tests
return
}

Expand All @@ -227,6 +238,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, tx sip
}
// Wait for the caller to terminate the call.
<-ctx.Done()
c.close("hangup")

Check warning on line 241 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L241

Added line #L241 was not covered by tests
}

func (c *inboundCall) sendBye() {
Expand Down Expand Up @@ -261,7 +273,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answe
return nil, err
}
conn := NewMediaConn()
conn.OnRTP(c)
conn.OnRTP(&rtpStatsHandler{mon: c.mon, h: c})

Check warning on line 276 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L276

Added line #L276 was not covered by tests
if dst := sdpGetAudioDest(offer); dst != nil {
conn.SetDestAddr(dst)
}
Expand All @@ -272,7 +284,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config) (answe

// Encoding pipeline (LK -> SIP)
// Need to be created earlier to send the pin prompts.
s := rtp.NewMediaStreamOut[ulaw.Sample](conn, rtpPacketDur)
s := rtp.NewMediaStreamOut[ulaw.Sample](&rtpStatsWriter{mon: c.mon, w: conn}, rtpPacketDur)

Check warning on line 287 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L287

Added line #L287 was not covered by tests
c.lkRoom.SetOutput(ulaw.Encode(s))

return sdpGenerateAnswer(offer, c.s.signalingIp, conn.LocalAddr().Port)
Expand Down Expand Up @@ -305,7 +317,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context) {
if reject || requirePin || roomName == "" {
logger.Infow("Rejecting call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "pin", pin, "noPin", noPin)
c.playAudio(ctx, c.s.res.wrongPin)
c.Close()
c.close("wrong-pin")

Check warning on line 320 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L320

Added line #L320 was not covered by tests
return
}
c.joinRoom(ctx, roomName, identity, wsUrl, token)
Expand All @@ -315,24 +327,29 @@ func (c *inboundCall) pinPrompt(ctx context.Context) {
pin += string(b)
if len(pin) > pinLimit {
c.playAudio(ctx, c.s.res.wrongPin)
c.Close()
c.close("wrong-pin")

Check warning on line 330 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L330

Added line #L330 was not covered by tests
return
}
}
}
}

// close should only be called from handleInvite.
func (c *inboundCall) close() {
func (c *inboundCall) close(reason string) {

Check warning on line 338 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L338

Added line #L338 was not covered by tests
if !c.done.CompareAndSwap(false, true) {
return
}
logger.Infow("Closing inbound call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User)
c.mon.CallTerminate(reason)
logger.Infow("Closing inbound call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "reason", reason)

Check warning on line 343 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L342-L343

Added lines #L342 - L343 were not covered by tests
c.sendBye()
c.closeMedia()
if c.callDur != nil {
c.callDur()
}

Check warning on line 348 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L346-L348

Added lines #L346 - L348 were not covered by tests
c.s.cmu.Lock()
delete(c.s.activeCalls, c.tag)
c.s.cmu.Unlock()
c.cancel()

Check warning on line 352 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L352

Added line #L352 was not covered by tests
}

func (c *inboundCall) Close() error {
Expand All @@ -354,13 +371,16 @@ func (c *inboundCall) closeMedia() {
}

func (c *inboundCall) HandleRTP(p *rtp.Packet) error {
if p.Marker && p.PayloadType == 101 {
c.handleDTMF(p.Payload)
return nil
}
// TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it?
if h := c.audioHandler.Load(); h != nil {
return (*h).HandleRTP(p)
switch p.PayloadType {
case 101:
if p.Marker {
c.handleDTMF(p.Payload)
}
default:
// TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it?
if h := c.audioHandler.Load(); h != nil {
return (*h).HandleRTP(p)
}

Check warning on line 383 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L374-L383

Added lines #L374 - L383 were not covered by tests
}
return nil
}
Expand Down Expand Up @@ -390,10 +410,15 @@ func (c *inboundCall) createLiveKitParticipant(ctx context.Context, roomName, pa
}

func (c *inboundCall) joinRoom(ctx context.Context, roomName, identity, wsUrl, token string) {
if c.joinDur != nil {
c.joinDur()
}
c.callDur = c.mon.CallDur()

Check warning on line 416 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L413-L416

Added lines #L413 - L416 were not covered by tests
logger.Infow("Bridging SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "roomName", roomName, "identity", identity)
c.playAudio(ctx, c.s.res.roomJoin)
if err := c.createLiveKitParticipant(ctx, roomName, identity, wsUrl, token); err != nil {
logger.Errorw("Cannot create LiveKit participant", err, "tag", c.tag)
c.close("participant-failed")

Check warning on line 421 in pkg/sip/inbound.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/inbound.go#L421

Added line #L421 was not covered by tests
}
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/sip/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
package sip

import (
"strconv"
"time"

"github.com/pion/rtp"

srtp "github.com/livekit/sip/pkg/media/rtp"
"github.com/livekit/sip/pkg/stats"
)

const (
Expand All @@ -25,3 +31,47 @@ const (
sampleDurPart = int(time.Second / sampleDur)
rtpPacketDur = uint32(sampleRate / sampleDurPart)
)

type rtpStatsHandler struct {
h srtp.Handler
mon *stats.CallMonitor
}

func (h *rtpStatsHandler) HandleRTP(p *rtp.Packet) error {
if h.mon != nil {
if typ, ok := rtpPacketType(p); ok {
h.mon.RTPPacketRecv(typ)
}

Check warning on line 44 in pkg/sip/media.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/media.go#L40-L44

Added lines #L40 - L44 were not covered by tests
}
return h.h.HandleRTP(p)

Check warning on line 46 in pkg/sip/media.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/media.go#L46

Added line #L46 was not covered by tests
}

type rtpStatsWriter struct {
w srtp.Writer
mon *stats.CallMonitor
}

func (h *rtpStatsWriter) WriteRTP(p *rtp.Packet) error {
if h.mon != nil {
if typ, ok := rtpPacketType(p); ok {
h.mon.RTPPacketSend(typ)
}

Check warning on line 58 in pkg/sip/media.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/media.go#L54-L58

Added lines #L54 - L58 were not covered by tests
}
return h.w.WriteRTP(p)

Check warning on line 60 in pkg/sip/media.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/media.go#L60

Added line #L60 was not covered by tests
}

func rtpPacketType(p *rtp.Packet) (string, bool) {
switch p.PayloadType {
case 101:
if p.Marker {
return "dtmf", true
}
default:
if p.PayloadType == 0 {
return "audio", true
} else {
return strconv.Itoa(int(p.PayloadType)), true
}

Check warning on line 74 in pkg/sip/media.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/media.go#L63-L74

Added lines #L63 - L74 were not covered by tests
}
return "", false

Check warning on line 76 in pkg/sip/media.go

View check run for this annotation

Codecov / codecov/patch

pkg/sip/media.go#L76

Added line #L76 was not covered by tests
}
Loading

0 comments on commit 0ff0c29

Please sign in to comment.