Skip to content

Commit

Permalink
Always set contact header for outbound to IP.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Oct 10, 2024
1 parent 43a70ba commit c46cb2a
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 59 deletions.
16 changes: 9 additions & 7 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package config
import (
"fmt"
"net"
"net/netip"
"os"
"time"

Expand Down Expand Up @@ -164,14 +165,14 @@ func (c *Config) GetLoggerFields() logrus.Fields {
return fields
}

func GetLocalIP() (string, error) {
func GetLocalIP() (netip.Addr, error) {
ifaces, err := net.Interfaces()
if err != nil {
return "", nil
return netip.Addr{}, nil
}
type Iface struct {
Name string
Addr net.IP
Addr netip.Addr
}
var candidates []Iface
for _, ifc := range ifaces {
Expand All @@ -191,15 +192,16 @@ func GetLocalIP() (string, error) {
continue
}
if ip4 := ipnet.IP.To4(); ip4 != nil {
ip, _ := netip.AddrFromSlice(ip4)
candidates = append(candidates, Iface{
Name: ifc.Name, Addr: ip4,
Name: ifc.Name, Addr: ip,
})
logger.Debugw("considering interface", "iface", ifc.Name, "ip", ip4)
logger.Debugw("considering interface", "iface", ifc.Name, "ip", ip)
}
}
}
if len(candidates) == 0 {
return "", fmt.Errorf("No local IP found")
return netip.Addr{}, fmt.Errorf("No local IP found")
}
return candidates[0].Addr.String(), nil
return candidates[0].Addr, nil
}
14 changes: 10 additions & 4 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"log/slog"
"net/netip"
"sync"

"github.com/emiago/sipgo"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"

"github.com/livekit/sip/pkg/config"
siperrors "github.com/livekit/sip/pkg/errors"
"github.com/livekit/sip/pkg/stats"
Expand All @@ -39,8 +41,8 @@ type Client struct {
mon *stats.Monitor

sipCli *sipgo.Client
signalingIp string
signalingIpLocal string
signalingIp netip.Addr
signalingIpLocal netip.Addr

closing core.Fuse
cmu sync.Mutex
Expand Down Expand Up @@ -74,7 +76,11 @@ func (c *Client) Start(agent *sipgo.UserAgent) error {
return err
}
} else if c.conf.NAT1To1IP != "" {
c.signalingIp = c.conf.NAT1To1IP
ip, err := netip.ParseAddr(c.conf.NAT1To1IP)
if err != nil {
return err
}
c.signalingIp = ip
c.signalingIpLocal = c.signalingIp
} else {
if c.signalingIp, err = getLocalIP(c.conf.LocalNet); err != nil {
Expand All @@ -95,7 +101,7 @@ func (c *Client) Start(agent *sipgo.UserAgent) error {
}

c.sipCli, err = sipgo.NewClient(agent,
sipgo.WithClientHostname(c.signalingIp),
sipgo.WithClientHostname(c.signalingIp.String()),
sipgo.WithClientLogger(slog.New(logger.ToSlogHandler(c.log))),
)
if err != nil {
Expand Down
48 changes: 30 additions & 18 deletions pkg/sip/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,47 @@ import (
"io"
"net"
"net/http"
"net/netip"
)

func getPublicIP() (string, error) {
func getPublicIP() (netip.Addr, error) {
req, err := http.Get("http://ip-api.com/json/")
if err != nil {
return "", err
return netip.Addr{}, err
}
defer req.Body.Close()

body, err := io.ReadAll(req.Body)
if err != nil {
return "", err
return netip.Addr{}, err
}

ip := struct {
Query string
}{}
if err = json.Unmarshal(body, &ip); err != nil {
return "", err
return netip.Addr{}, err
}

if ip.Query == "" {
return "", fmt.Errorf("Query entry was not populated")
return netip.Addr{}, fmt.Errorf("Query entry was not populated")
}

return ip.Query, nil
return netip.ParseAddr(ip.Query)
}

func getLocalIP(localNet string) (string, error) {
func getLocalIP(localNet string) (netip.Addr, error) {
ifaces, err := net.Interfaces()
if err != nil {
return "", err
return netip.Addr{}, err
}
var netw *net.IPNet
var netw *netip.Prefix
if localNet != "" {
_, netw, err = net.ParseCIDR(localNet)
nw, err := netip.ParsePrefix(localNet)
if err != nil {
return "", err
return netip.Addr{}, err
}
netw = &nw
}
for _, i := range ifaces {
addrs, err := i.Addrs()
Expand All @@ -69,23 +71,33 @@ func getLocalIP(localNet string) (string, error) {
for _, a := range addrs {
switch v := a.(type) {
case *net.IPAddr:
if netw != nil && !netw.Contains(v.IP) {
if v.IP.To4() == nil {
continue
}
if !v.IP.IsLoopback() && v.IP.To4() != nil {
return v.IP.String(), nil
ip, ok := netip.AddrFromSlice(v.IP.To4())
if !ok || ip.IsLoopback() {
continue
}
if netw != nil && !netw.Contains(ip) {
continue
}
return ip, nil
case *net.IPNet:
if netw != nil && !netw.Contains(v.IP) {
if v.IP.To4() == nil {
continue
}
if !v.IP.IsLoopback() && v.IP.To4() != nil {
return v.IP.String(), nil
ip, ok := netip.AddrFromSlice(v.IP.To4())
if !ok || ip.IsLoopback() {
continue
}
if netw != nil && !netw.Contains(ip) {
continue
}
return ip, nil
}

}
}

return "", fmt.Errorf("No local interface found")
return netip.Addr{}, fmt.Errorf("No local interface found")
}
9 changes: 5 additions & 4 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"net/netip"
"slices"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -362,7 +363,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
}
acceptCall := func() bool {
c.log.Infow("Accepting the call", "headers", disp.Headers)
if err = c.cc.Accept(ctx, c.s.signalingIp, c.s.conf.SIPPort, answerData, disp.Headers); err != nil {
if err = c.cc.Accept(ctx, netip.AddrPortFrom(c.s.signalingIp, uint16(c.s.conf.SIPPort)), answerData, disp.Headers); err != nil {
c.log.Errorw("Cannot respond to INVITE", err)
return false
}
Expand Down Expand Up @@ -965,7 +966,7 @@ func (c *sipInbound) setDestFromVia(r *sip.Response) {
}
}

func (c *sipInbound) Accept(ctx context.Context, contactHost string, contactPort int, sdpData []byte, headers map[string]string) error {
func (c *sipInbound) Accept(ctx context.Context, contactHost netip.AddrPort, sdpData []byte, headers map[string]string) error {
ctx, span := tracer.Start(ctx, "sipInbound.Accept")
defer span.End()
c.mu.Lock()
Expand All @@ -976,7 +977,7 @@ func (c *sipInbound) Accept(ctx context.Context, contactHost string, contactPort
r := sip.NewResponseFromRequest(c.invite, 200, "OK", sdpData)

// This will effectively redirect future SIP requests to this server instance (if host address is not LB).
r.AppendHeader(&sip.ContactHeader{Address: sip.Uri{Host: contactHost, Port: contactPort}})
r.AppendHeader(&sip.ContactHeader{Address: sip.Uri{Host: contactHost.Addr().String(), Port: int(contactHost.Port())}})

c.setDestFromVia(r)

Expand Down Expand Up @@ -1081,7 +1082,7 @@ func (c *sipInbound) newReferReq(transferTo string) (*sip.Request, error) {
}

// This will effectively redirect future SIP requests to this server instance (if host address is not LB).
contactHeader := &sip.ContactHeader{Address: sip.Uri{Host: c.s.signalingIp, Port: c.s.conf.SIPPort}}
contactHeader := &sip.ContactHeader{Address: sip.Uri{Host: c.s.signalingIp.String(), Port: c.s.conf.SIPPort}}

req := NewReferRequest(c.invite, c.inviteOk, contactHeader, transferTo)
c.setCSeq(req)
Expand Down
5 changes: 3 additions & 2 deletions pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package sip

import (
"context"
"net/netip"
"sync"
"sync/atomic"
"time"
Expand All @@ -33,7 +34,7 @@ import (
)

type MediaConfig struct {
IP string
IP netip.Addr
Ports rtcconfig.PortRange
MediaTimeoutInitial time.Duration
MediaTimeout time.Duration
Expand Down Expand Up @@ -71,7 +72,7 @@ func NewMediaPortWith(log logger.Logger, mon *stats.CallMonitor, conn rtp.UDPCon
type MediaPort struct {
log logger.Logger
mon *stats.CallMonitor
externalIP string
externalIP netip.Addr
conn *rtp.Conn
mediaTimeout <-chan struct{}
dtmfAudioEnabled bool
Expand Down
13 changes: 11 additions & 2 deletions pkg/sip/media_port_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"math"
"net"
"net/netip"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -111,6 +112,14 @@ func PrintAudioInWriter(p *MediaPort) string {
return p.audioInHandler.(fmt.Stringer).String()
}

func newIP(v string) netip.Addr {
ip, err := netip.ParseAddr(v)
if err != nil {
panic(err)
}
return ip
}

func TestMediaPort(t *testing.T) {
codecs := media.Codecs()
disableAll := func() {
Expand Down Expand Up @@ -151,14 +160,14 @@ func TestMediaPort(t *testing.T) {
log := logger.GetLogger()

m1, err := NewMediaPortWith(log.WithName("one"), nil, c1, &MediaConfig{
IP: "1.1.1.1",
IP: newIP("1.1.1.1"),
Ports: rtcconfig.PortRange{Start: 10000},
}, rate)
require.NoError(t, err)
defer m1.Close()

m2, err := NewMediaPortWith(log.WithName("two"), nil, c2, &MediaConfig{
IP: "2.2.2.2",
IP: newIP("2.2.2.2"),
Ports: rtcconfig.PortRange{Start: 20000},
}, rate)
require.NoError(t, err)
Expand Down
17 changes: 11 additions & 6 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ type outboundCall struct {

func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig) (*outboundCall, error) {
if sipConf.host == "" {
sipConf.host = c.signalingIp
sipConf.host = c.signalingIp.String()
}
call := &outboundCall{
c: c,
log: log,
cc: c.newOutbound(id, URI{
User: sipConf.from,
Host: sipConf.host,
Addr: netip.AddrPortFrom(netip.Addr{}, uint16(conf.SIPPort)),
Addr: netip.AddrPortFrom(c.signalingIp, uint16(conf.SIPPort)),
}),
sipConf: sipConf,
}
Expand Down Expand Up @@ -438,19 +438,24 @@ func (c *Client) newOutbound(id LocalTag, from URI) *sipOutbound {
Address: *from.GetURI(),
Params: sip.NewParams(),
}
contactHeader := &sip.ContactHeader{
Address: *from.GetContactURI(),
}
fromHeader.Params.Add("tag", string(id))
return &sipOutbound{
c: c,
id: id,
from: fromHeader,
contact: contactHeader,
referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request
}
}

type sipOutbound struct {
c *Client
id LocalTag
from *sip.FromHeader
c *Client
id LocalTag
from *sip.FromHeader
contact *sip.ContactHeader

mu sync.RWMutex
tag RemoteTag
Expand Down Expand Up @@ -631,7 +636,7 @@ func (c *sipOutbound) attemptInvite(ctx context.Context, dest string, to *sip.To
req.SetBody(offer)
req.AppendHeader(to)
req.AppendHeader(c.from)
req.AppendHeader(&sip.ContactHeader{Address: c.from.Address})
req.AppendHeader(c.contact)

req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE"))
Expand Down
11 changes: 8 additions & 3 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"

"github.com/livekit/sip/pkg/media"

"github.com/livekit/sip/pkg/config"
Expand Down Expand Up @@ -110,8 +111,8 @@ type Server struct {
sipConnUDP *net.UDPConn
sipConnTCP *net.TCPListener
sipUnhandled RequestHandler
signalingIp string
signalingIpLocal string
signalingIp netip.Addr
signalingIpLocal netip.Addr

inProgressInvites []*inProgressInvite

Expand Down Expand Up @@ -208,7 +209,11 @@ func (s *Server) Start(agent *sipgo.UserAgent, unhandled RequestHandler) error {
return err
}
} else if s.conf.NAT1To1IP != "" {
s.signalingIp = s.conf.NAT1To1IP
ip, err := netip.ParseAddr(s.conf.NAT1To1IP)
if err != nil {
return err
}
s.signalingIp = ip
s.signalingIpLocal = s.signalingIp
} else {
if s.signalingIp, err = getLocalIP(s.conf.LocalNet); err != nil {
Expand Down
Loading

0 comments on commit c46cb2a

Please sign in to comment.