Skip to content

Commit

Permalink
Add TLS support. Resolves #71.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Oct 24, 2024
1 parent 3b1e63b commit cd4cbd3
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 56 deletions.
24 changes: 23 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,25 @@ import (
)

const (
DefaultSIPPort int = 5060
DefaultSIPPort int = 5060
DefaultSIPPortTLS int = 5061
)

var (
DefaultRTPPortRange = rtcconfig.PortRange{Start: 10000, End: 20000}
)

type TLSCert struct {
CertFile string `yaml:"cert_file"`
KeyFile string `yaml:"key_file"`
}

type TLSConfig struct {
Port int `yaml:"port"` // announced SIP signaling port
ListenPort int `yaml:"port_listen"` // SIP signaling port to listen on
Certs []TLSCert `yaml:"certs"`
}

type Config struct {
Redis *redis.RedisConfig `yaml:"redis"` // required
ApiKey string `yaml:"api_key"` // required (env LIVEKIT_API_KEY)
Expand All @@ -53,6 +65,8 @@ type Config struct {
PProfPort int `yaml:"pprof_port"`
SIPPort int `yaml:"sip_port"` // announced SIP signaling port
SIPPortListen int `yaml:"sip_port_listen"` // SIP signaling port to listen on
SIPHostname string `yaml:"sip_hostname"`
TLS *TLSConfig `yaml:"tls"`
RTPPort rtcconfig.PortRange `yaml:"rtp_port"`
Logging logger.Config `yaml:"logging"`
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
Expand Down Expand Up @@ -109,6 +123,14 @@ func (c *Config) Init() error {
if c.SIPPortListen == 0 {
c.SIPPortListen = c.SIPPort
}
if tc := c.TLS; tc != nil {
if tc.Port == 0 {
tc.Port = DefaultSIPPortTLS
}
if tc.ListenPort == 0 {
tc.ListenPort = tc.Port
}
}
if c.RTPPort.Start == 0 {
c.RTPPort.Start = DefaultRTPPortRange.Start
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (c *Client) SetHandler(handler Handler) {
c.handler = handler
}

func (c *Client) ContactURI(tr Transport) URI {
return getContactURI(c.conf, c.signalingIp, tr)
}

func (c *Client) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (*rpc.InternalCreateSIPParticipantResponse, error) {
ctx, span := tracer.Start(ctx, "Client.CreateSIPParticipant")
defer span.End()
Expand Down
34 changes: 20 additions & 14 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
"toIP", req.Destination(),
)

cc := s.newInbound(LocalTag(callID), req, tx)
tr := transportFromReq(req)
cc := s.newInbound(LocalTag(callID), s.ContactURI(tr), req, tx)
log = LoggerWithParams(log, cc)
log = LoggerWithHeaders(log, cc)
log.Infow("processing invite")
Expand Down Expand Up @@ -341,7 +342,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
case DispatchNoRuleDrop:
c.log.Debugw("Rejecting inbound flood")
c.cc.Drop()
c.close(false, callDropped, "flood")
c.close(false, callFlood, "flood")
return
case DispatchNoRuleReject:
c.log.Infow("Rejecting inbound call, doesn't match any Dispatch Rules")
Expand All @@ -364,7 +365,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
}
acceptCall := func() bool {
c.log.Infow("Accepting the call", "headers", disp.Headers)
if err = c.cc.Accept(ctx, netip.AddrPortFrom(c.s.signalingIp, uint16(c.s.conf.SIPPort)), answerData, disp.Headers); err != nil {
if err = c.cc.Accept(ctx, answerData, disp.Headers); err != nil {
c.log.Errorw("Cannot respond to INVITE", err)
return false
}
Expand Down Expand Up @@ -620,7 +621,10 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {
} else {
c.log.Infow("Closing inbound call", "reason", reason)
}
defer c.log.Infow("Inbound call closed", "reason", reason)
if status != callFlood {
defer c.log.Infow("Inbound call closed", "reason", reason)
}

c.closeMedia()
c.cc.Close()
if c.callDur != nil {
Expand Down Expand Up @@ -782,12 +786,15 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string) error

}

func (s *Server) newInbound(id LocalTag, invite *sip.Request, inviteTx sip.ServerTransaction) *sipInbound {
func (s *Server) newInbound(id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction) *sipInbound {
c := &sipInbound{
s: s,
id: id,
invite: invite,
inviteTx: inviteTx,
s: s,
id: id,
invite: invite,
inviteTx: inviteTx,
contact: &sip.ContactHeader{
Address: *contact.GetContactURI(),
},
cancelled: make(chan struct{}),
referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request
}
Expand All @@ -810,6 +817,7 @@ type sipInbound struct {
tag RemoteTag
invite *sip.Request
inviteTx sip.ServerTransaction
contact *sip.ContactHeader
cancelled chan struct{}
from *sip.FromHeader
to *sip.ToHeader
Expand Down Expand Up @@ -976,7 +984,7 @@ func (c *sipInbound) setDestFromVia(r *sip.Response) {
}
}

func (c *sipInbound) Accept(ctx context.Context, contactHost netip.AddrPort, sdpData []byte, headers map[string]string) error {
func (c *sipInbound) Accept(ctx context.Context, sdpData []byte, headers map[string]string) error {
ctx, span := tracer.Start(ctx, "sipInbound.Accept")
defer span.End()
c.mu.Lock()
Expand All @@ -987,7 +995,7 @@ func (c *sipInbound) Accept(ctx context.Context, contactHost netip.AddrPort, sdp
r := sip.NewResponseFromRequest(c.invite, 200, "OK", sdpData)

// This will effectively redirect future SIP requests to this server instance (if host address is not LB).
r.AppendHeader(&sip.ContactHeader{Address: sip.Uri{Host: contactHost.Addr().String(), Port: int(contactHost.Port())}})
r.AppendHeader(c.contact)

c.setDestFromVia(r)

Expand Down Expand Up @@ -1092,9 +1100,7 @@ func (c *sipInbound) newReferReq(transferTo string) (*sip.Request, error) {
}

// This will effectively redirect future SIP requests to this server instance (if host address is not LB).
contactHeader := &sip.ContactHeader{Address: sip.Uri{Host: c.s.signalingIp.String(), Port: c.s.conf.SIPPort}}

req := NewReferRequest(c.invite, c.inviteOk, contactHeader, transferTo)
req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo)
c.setCSeq(req)
c.swapSrcDst(req)

Expand Down
38 changes: 17 additions & 21 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"math"
"net/netip"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -75,23 +74,26 @@ type outboundCall struct {
}

func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig) (*outboundCall, error) {
if sipConf.host == "" {
sipConf.host = c.signalingIp.String()
}
if sipConf.maxCallDuration <= 0 || sipConf.maxCallDuration > maxCallDuration {
sipConf.maxCallDuration = maxCallDuration
}
if sipConf.ringingTimeout <= 0 {
sipConf.ringingTimeout = defaultRingingTimeout
}
tr := TransportFrom(sipConf.transport)
contact := c.ContactURI(tr)
if sipConf.host == "" {
sipConf.host = contact.GetHost()
}
call := &outboundCall{
c: c,
log: log,
cc: c.newOutbound(id, URI{
User: sipConf.from,
Host: sipConf.host,
Addr: netip.AddrPortFrom(c.signalingIp, uint16(conf.SIPPort)),
}),
User: sipConf.from,
Host: sipConf.host,
Addr: contact.Addr,
Transport: tr,
}, contact),
sipConf: sipConf,
}
call.mon = c.mon.NewCall(stats.Outbound, sipConf.host, sipConf.address)
Expand Down Expand Up @@ -376,9 +378,10 @@ func (c *outboundCall) sipSignal(ctx context.Context) error {
joinDur := c.mon.JoinDur()

c.mon.InviteReq()
sdpResp, err := c.cc.Invite(ctx, c.sipConf.transport, URI{
User: c.sipConf.to,
Host: c.sipConf.address,
sdpResp, err := c.cc.Invite(ctx, URI{
User: c.sipConf.to,
Host: c.sipConf.address,
Transport: TransportFrom(c.sipConf.transport),
}, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOffer)
if err != nil {
// TODO: should we retry? maybe new offer will work
Expand Down Expand Up @@ -448,15 +451,15 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string) erro
return nil
}

func (c *Client) newOutbound(id LocalTag, from URI) *sipOutbound {
func (c *Client) newOutbound(id LocalTag, from, contact URI) *sipOutbound {
from = from.Normalize()
fromHeader := &sip.FromHeader{
DisplayName: from.User,
Address: *from.GetURI(),
Params: sip.NewParams(),
}
contactHeader := &sip.ContactHeader{
Address: *from.GetContactURI(),
Address: *contact.GetContactURI(),
}
fromHeader.Params.Add("tag", string(id))
return &sipOutbound{
Expand Down Expand Up @@ -517,20 +520,13 @@ func (c *sipOutbound) RemoteHeaders() Headers {
return c.inviteOk.Headers()
}

func (c *sipOutbound) Invite(ctx context.Context, transport livekit.SIPTransport, to URI, user, pass string, headers map[string]string, sdpOffer []byte) ([]byte, error) {
func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, headers map[string]string, sdpOffer []byte) ([]byte, error) {
ctx, span := tracer.Start(ctx, "sipOutbound.Invite")
defer span.End()
c.mu.Lock()
defer c.mu.Unlock()
to = to.Normalize()
toHeader := &sip.ToHeader{Address: *to.GetURI()}
toHeader.Address.UriParams = make(sip.HeaderParams)
switch transport {
case livekit.SIPTransport_SIP_TRANSPORT_UDP:
toHeader.Address.UriParams.Add("transport", "udp")
case livekit.SIPTransport_SIP_TRANSPORT_TCP:
toHeader.Address.UriParams.Add("transport", "tcp")
}

dest := to.GetDest()

Expand Down
1 change: 1 addition & 0 deletions pkg/sip/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (v CallStatus) DisconnectReason() livekit.DisconnectReason {

const (
callDropped = CallStatus(iota)
callFlood
CallDialing
CallAutomation
CallActive
Expand Down
32 changes: 32 additions & 0 deletions pkg/sip/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package sip
import (
"context"
"fmt"
"net/netip"
"regexp"
"strconv"
"strings"
Expand All @@ -25,6 +26,8 @@ import (
"github.com/emiago/sipgo/sip"
"github.com/livekit/psrpc"
"github.com/pkg/errors"

"github.com/livekit/sip/pkg/config"
)

const (
Expand Down Expand Up @@ -61,6 +64,35 @@ type Signaling interface {
Drop()
}

func transportFromReq(req *sip.Request) Transport {
if to, _ := req.To(); to != nil {
if tr, _ := to.Params.Get("transport"); tr != "" {
return Transport(strings.ToLower(tr))
}
}
if via, _ := req.Via(); via != nil {
return Transport(strings.ToLower(via.Transport))
}
return ""
}

func transportPort(c *config.Config, t Transport) int {
if t == TransportTLS {
if tc := c.TLS; tc != nil {
return tc.Port
}
}
return c.SIPPort
}

func getContactURI(c *config.Config, ip netip.Addr, t Transport) URI {
return URI{
Host: c.SIPHostname,
Addr: netip.AddrPortFrom(ip, uint16(transportPort(c, t))),
Transport: t,
}
}

func sendAndACK(ctx context.Context, c Signaling, req *sip.Request) {
tx, err := c.Transaction(req)
if err != nil {
Expand Down
Loading

0 comments on commit cd4cbd3

Please sign in to comment.