Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TLS support #210

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,25 @@ import (
)

const (
DefaultSIPPort int = 5060
DefaultSIPPort int = 5060
DefaultSIPPortTLS int = 5061
)

var (
DefaultRTPPortRange = rtcconfig.PortRange{Start: 10000, End: 20000}
)

type TLSCert struct {
CertFile string `yaml:"cert_file"`
KeyFile string `yaml:"key_file"`
}

type TLSConfig struct {
Port int `yaml:"port"` // announced SIP signaling port
ListenPort int `yaml:"port_listen"` // SIP signaling port to listen on
Certs []TLSCert `yaml:"certs"`
}

type Config struct {
Redis *redis.RedisConfig `yaml:"redis"` // required
ApiKey string `yaml:"api_key"` // required (env LIVEKIT_API_KEY)
Expand All @@ -53,6 +65,8 @@ type Config struct {
PProfPort int `yaml:"pprof_port"`
SIPPort int `yaml:"sip_port"` // announced SIP signaling port
SIPPortListen int `yaml:"sip_port_listen"` // SIP signaling port to listen on
SIPHostname string `yaml:"sip_hostname"`
TLS *TLSConfig `yaml:"tls"`
RTPPort rtcconfig.PortRange `yaml:"rtp_port"`
Logging logger.Config `yaml:"logging"`
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
Expand Down Expand Up @@ -109,6 +123,14 @@ func (c *Config) Init() error {
if c.SIPPortListen == 0 {
c.SIPPortListen = c.SIPPort
}
if tc := c.TLS; tc != nil {
if tc.Port == 0 {
tc.Port = DefaultSIPPortTLS
}
if tc.ListenPort == 0 {
tc.ListenPort = tc.Port
}
}
if c.RTPPort.Start == 0 {
c.RTPPort.Start = DefaultRTPPortRange.Start
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (c *Client) SetHandler(handler Handler) {
c.handler = handler
}

func (c *Client) ContactURI(tr Transport) URI {
return getContactURI(c.conf, c.signalingIp, tr)
}

func (c *Client) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (*rpc.InternalCreateSIPParticipantResponse, error) {
ctx, span := tracer.Start(ctx, "Client.CreateSIPParticipant")
defer span.End()
Expand Down
34 changes: 20 additions & 14 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
"toIP", req.Destination(),
)

cc := s.newInbound(LocalTag(callID), req, tx)
tr := transportFromReq(req)
cc := s.newInbound(LocalTag(callID), s.ContactURI(tr), req, tx)
log = LoggerWithParams(log, cc)
log = LoggerWithHeaders(log, cc)
log.Infow("processing invite")
Expand Down Expand Up @@ -341,7 +342,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
case DispatchNoRuleDrop:
c.log.Debugw("Rejecting inbound flood")
c.cc.Drop()
c.close(false, callDropped, "flood")
c.close(false, callFlood, "flood")
return
case DispatchNoRuleReject:
c.log.Infow("Rejecting inbound call, doesn't match any Dispatch Rules")
Expand All @@ -364,7 +365,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
}
acceptCall := func() bool {
c.log.Infow("Accepting the call", "headers", disp.Headers)
if err = c.cc.Accept(ctx, netip.AddrPortFrom(c.s.signalingIp, uint16(c.s.conf.SIPPort)), answerData, disp.Headers); err != nil {
if err = c.cc.Accept(ctx, answerData, disp.Headers); err != nil {
c.log.Errorw("Cannot respond to INVITE", err)
return false
}
Expand Down Expand Up @@ -620,7 +621,10 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {
} else {
c.log.Infow("Closing inbound call", "reason", reason)
}
defer c.log.Infow("Inbound call closed", "reason", reason)
if status != callFlood {
defer c.log.Infow("Inbound call closed", "reason", reason)
}

c.closeMedia()
c.cc.Close()
if c.callDur != nil {
Expand Down Expand Up @@ -782,12 +786,15 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string) error

}

func (s *Server) newInbound(id LocalTag, invite *sip.Request, inviteTx sip.ServerTransaction) *sipInbound {
func (s *Server) newInbound(id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction) *sipInbound {
c := &sipInbound{
s: s,
id: id,
invite: invite,
inviteTx: inviteTx,
s: s,
id: id,
invite: invite,
inviteTx: inviteTx,
contact: &sip.ContactHeader{
Address: *contact.GetContactURI(),
},
cancelled: make(chan struct{}),
referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request
}
Expand All @@ -810,6 +817,7 @@ type sipInbound struct {
tag RemoteTag
invite *sip.Request
inviteTx sip.ServerTransaction
contact *sip.ContactHeader
cancelled chan struct{}
from *sip.FromHeader
to *sip.ToHeader
Expand Down Expand Up @@ -976,7 +984,7 @@ func (c *sipInbound) setDestFromVia(r *sip.Response) {
}
}

func (c *sipInbound) Accept(ctx context.Context, contactHost netip.AddrPort, sdpData []byte, headers map[string]string) error {
func (c *sipInbound) Accept(ctx context.Context, sdpData []byte, headers map[string]string) error {
ctx, span := tracer.Start(ctx, "sipInbound.Accept")
defer span.End()
c.mu.Lock()
Expand All @@ -987,7 +995,7 @@ func (c *sipInbound) Accept(ctx context.Context, contactHost netip.AddrPort, sdp
r := sip.NewResponseFromRequest(c.invite, 200, "OK", sdpData)

// This will effectively redirect future SIP requests to this server instance (if host address is not LB).
r.AppendHeader(&sip.ContactHeader{Address: sip.Uri{Host: contactHost.Addr().String(), Port: int(contactHost.Port())}})
r.AppendHeader(c.contact)

c.setDestFromVia(r)

Expand Down Expand Up @@ -1092,9 +1100,7 @@ func (c *sipInbound) newReferReq(transferTo string) (*sip.Request, error) {
}

// This will effectively redirect future SIP requests to this server instance (if host address is not LB).
contactHeader := &sip.ContactHeader{Address: sip.Uri{Host: c.s.signalingIp.String(), Port: c.s.conf.SIPPort}}

req := NewReferRequest(c.invite, c.inviteOk, contactHeader, transferTo)
req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! :)

c.setCSeq(req)
c.swapSrcDst(req)

Expand Down
38 changes: 17 additions & 21 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"math"
"net/netip"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -75,23 +74,26 @@ type outboundCall struct {
}

func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig) (*outboundCall, error) {
if sipConf.host == "" {
sipConf.host = c.signalingIp.String()
}
if sipConf.maxCallDuration <= 0 || sipConf.maxCallDuration > maxCallDuration {
sipConf.maxCallDuration = maxCallDuration
}
if sipConf.ringingTimeout <= 0 {
sipConf.ringingTimeout = defaultRingingTimeout
}
tr := TransportFrom(sipConf.transport)
contact := c.ContactURI(tr)
if sipConf.host == "" {
sipConf.host = contact.GetHost()
}
call := &outboundCall{
c: c,
log: log,
cc: c.newOutbound(id, URI{
User: sipConf.from,
Host: sipConf.host,
Addr: netip.AddrPortFrom(c.signalingIp, uint16(conf.SIPPort)),
}),
User: sipConf.from,
Host: sipConf.host,
Addr: contact.Addr,
Transport: tr,
}, contact),
sipConf: sipConf,
}
call.mon = c.mon.NewCall(stats.Outbound, sipConf.host, sipConf.address)
Expand Down Expand Up @@ -376,9 +378,10 @@ func (c *outboundCall) sipSignal(ctx context.Context) error {
joinDur := c.mon.JoinDur()

c.mon.InviteReq()
sdpResp, err := c.cc.Invite(ctx, c.sipConf.transport, URI{
User: c.sipConf.to,
Host: c.sipConf.address,
sdpResp, err := c.cc.Invite(ctx, URI{
User: c.sipConf.to,
Host: c.sipConf.address,
Transport: TransportFrom(c.sipConf.transport),
}, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOffer)
if err != nil {
// TODO: should we retry? maybe new offer will work
Expand Down Expand Up @@ -448,15 +451,15 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string) erro
return nil
}

func (c *Client) newOutbound(id LocalTag, from URI) *sipOutbound {
func (c *Client) newOutbound(id LocalTag, from, contact URI) *sipOutbound {
from = from.Normalize()
fromHeader := &sip.FromHeader{
DisplayName: from.User,
Address: *from.GetURI(),
Params: sip.NewParams(),
}
contactHeader := &sip.ContactHeader{
Address: *from.GetContactURI(),
Address: *contact.GetContactURI(),
}
fromHeader.Params.Add("tag", string(id))
return &sipOutbound{
Expand Down Expand Up @@ -517,20 +520,13 @@ func (c *sipOutbound) RemoteHeaders() Headers {
return c.inviteOk.Headers()
}

func (c *sipOutbound) Invite(ctx context.Context, transport livekit.SIPTransport, to URI, user, pass string, headers map[string]string, sdpOffer []byte) ([]byte, error) {
func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, headers map[string]string, sdpOffer []byte) ([]byte, error) {
ctx, span := tracer.Start(ctx, "sipOutbound.Invite")
defer span.End()
c.mu.Lock()
defer c.mu.Unlock()
to = to.Normalize()
toHeader := &sip.ToHeader{Address: *to.GetURI()}
toHeader.Address.UriParams = make(sip.HeaderParams)
switch transport {
case livekit.SIPTransport_SIP_TRANSPORT_UDP:
toHeader.Address.UriParams.Add("transport", "udp")
case livekit.SIPTransport_SIP_TRANSPORT_TCP:
toHeader.Address.UriParams.Add("transport", "tcp")
}

dest := to.GetDest()

Expand Down
1 change: 1 addition & 0 deletions pkg/sip/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (v CallStatus) DisconnectReason() livekit.DisconnectReason {

const (
callDropped = CallStatus(iota)
callFlood
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constant should not be exported?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't distinguish it in participant attributes, so it's unexported.

CallDialing
CallAutomation
CallActive
Expand Down
32 changes: 32 additions & 0 deletions pkg/sip/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package sip
import (
"context"
"fmt"
"net/netip"
"regexp"
"strconv"
"strings"
Expand All @@ -25,6 +26,8 @@ import (
"github.com/emiago/sipgo/sip"
"github.com/livekit/psrpc"
"github.com/pkg/errors"

"github.com/livekit/sip/pkg/config"
)

const (
Expand Down Expand Up @@ -61,6 +64,35 @@ type Signaling interface {
Drop()
}

func transportFromReq(req *sip.Request) Transport {
if to, _ := req.To(); to != nil {
if tr, _ := to.Params.Get("transport"); tr != "" {
return Transport(strings.ToLower(tr))
}
}
if via, _ := req.Via(); via != nil {
return Transport(strings.ToLower(via.Transport))
}
return ""
}

func transportPort(c *config.Config, t Transport) int {
if t == TransportTLS {
if tc := c.TLS; tc != nil {
return tc.Port
}
}
return c.SIPPort
}

func getContactURI(c *config.Config, ip netip.Addr, t Transport) URI {
return URI{
Host: c.SIPHostname,
Addr: netip.AddrPortFrom(ip, uint16(transportPort(c, t))),
Transport: t,
}
}

func sendAndACK(ctx context.Context, c Signaling, req *sip.Request) {
tx, err := c.Transaction(req)
if err != nil {
Expand Down
Loading
Loading