Skip to content

Commit

Permalink
Basic templating for SIP hostname. (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc authored Oct 25, 2024
1 parent 18910f1 commit 2b83d56
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 80 deletions.
6 changes: 5 additions & 1 deletion cmd/livekit-sip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"

"github.com/livekit/sip/pkg/stats"

"github.com/livekit/sip/pkg/config"
Expand Down Expand Up @@ -90,7 +91,10 @@ func runService(c *cli.Context) error {
return err
}

sipsrv := sip.NewService(conf, mon, log)
sipsrv, err := sip.NewService(conf, mon, log)
if err != nil {
return err
}
svc := service.NewService(conf, log, sipsrv, sipsrv.Stop, sipsrv.ActiveCalls, psrpcClient, bus, mon)
sipsrv.SetHandler(svc)

Expand Down
43 changes: 11 additions & 32 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"log/slog"
"net/netip"
"sync"

"github.com/emiago/sipgo"
Expand All @@ -36,13 +35,12 @@ import (
)

type Client struct {
conf *config.Config
log logger.Logger
mon *stats.Monitor
conf *config.Config
sconf *ServiceConfig
log logger.Logger
mon *stats.Monitor

sipCli *sipgo.Client
signalingIp netip.Addr
signalingIpLocal netip.Addr
sipCli *sipgo.Client

closing core.Fuse
cmu sync.Mutex
Expand All @@ -66,29 +64,9 @@ func NewClient(conf *config.Config, log logger.Logger, mon *stats.Monitor) *Clie
return c
}

func (c *Client) Start(agent *sipgo.UserAgent) error {
var err error
if c.conf.UseExternalIP {
if c.signalingIp, err = getPublicIP(); err != nil {
return err
}
if c.signalingIpLocal, err = getLocalIP(c.conf.LocalNet); err != nil {
return err
}
} else if c.conf.NAT1To1IP != "" {
ip, err := netip.ParseAddr(c.conf.NAT1To1IP)
if err != nil {
return err
}
c.signalingIp = ip
c.signalingIpLocal = c.signalingIp
} else {
if c.signalingIp, err = getLocalIP(c.conf.LocalNet); err != nil {
return err
}
c.signalingIpLocal = c.signalingIp
}
c.log.Infow("client starting", "local", c.signalingIpLocal, "external", c.signalingIp)
func (c *Client) Start(agent *sipgo.UserAgent, sc *ServiceConfig) error {
c.sconf = sc
c.log.Infow("client starting", "local", c.sconf.SignalingIPLocal, "external", c.sconf.SignalingIP)

if agent == nil {
ua, err := sipgo.NewUA(
Expand All @@ -100,8 +78,9 @@ func (c *Client) Start(agent *sipgo.UserAgent) error {
agent = ua
}

var err error
c.sipCli, err = sipgo.NewClient(agent,
sipgo.WithClientHostname(c.signalingIp.String()),
sipgo.WithClientHostname(c.sconf.SignalingIP.String()),
sipgo.WithClientLogger(slog.New(logger.ToSlogHandler(c.log))),
)
if err != nil {
Expand Down Expand Up @@ -132,7 +111,7 @@ func (c *Client) SetHandler(handler Handler) {
}

func (c *Client) ContactURI(tr Transport) URI {
return getContactURI(c.conf, c.signalingIp, tr)
return getContactURI(c.conf, c.sconf.SignalingIP, tr)
}

func (c *Client) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (*rpc.InternalCreateSIPParticipantResponse, error) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/sip/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,36 @@ import (
"net"
"net/http"
"net/netip"

"github.com/livekit/sip/pkg/config"
)

func GetServiceConfig(conf *config.Config) (*ServiceConfig, error) {
s := new(ServiceConfig)
var err error
if conf.UseExternalIP {
if s.SignalingIP, err = getPublicIP(); err != nil {
return nil, err
}
if s.SignalingIPLocal, err = getLocalIP(conf.LocalNet); err != nil {
return nil, err
}
} else if conf.NAT1To1IP != "" {
ip, err := netip.ParseAddr(conf.NAT1To1IP)
if err != nil {
return nil, err
}
s.SignalingIP = ip
s.SignalingIPLocal = s.SignalingIP
} else {
if s.SignalingIP, err = getLocalIP(conf.LocalNet); err != nil {
return nil, err
}
s.SignalingIPLocal = s.SignalingIP
}
return s, nil
}

func getPublicIP() (netip.Addr, error) {
req, err := http.Get("http://ip-api.com/json/")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config, featur
c.log.Debugw("SDP offer", "sdp", string(offerData))

mp, err := NewMediaPort(c.log, c.mon, &MediaConfig{
IP: c.s.signalingIp,
IP: c.s.sconf.SignalingIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
MediaTimeout: c.s.conf.MediaTimeout,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Lo
call.mon = c.mon.NewCall(stats.Outbound, sipConf.host, sipConf.address)
var err error
call.media, err = NewMediaPort(call.log, call.mon, &MediaConfig{
IP: c.signalingIp,
IP: c.sconf.SignalingIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.conf.MediaTimeoutInitial,
MediaTimeout: c.conf.MediaTimeout,
Expand Down
48 changes: 14 additions & 34 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,11 @@ type Handler interface {
}

type Server struct {
log logger.Logger
mon *stats.Monitor
sipSrv *sipgo.Server
sipListeners []io.Closer
sipUnhandled RequestHandler
signalingIp netip.Addr
signalingIpLocal netip.Addr
log logger.Logger
mon *stats.Monitor
sipSrv *sipgo.Server
sipListeners []io.Closer
sipUnhandled RequestHandler

inProgressInvites []*inProgressInvite

Expand All @@ -127,6 +125,7 @@ type Server struct {

handler Handler
conf *config.Config
sconf *ServiceConfig

res mediaRes
}
Expand Down Expand Up @@ -156,7 +155,7 @@ func (s *Server) SetHandler(handler Handler) {
}

func (s *Server) ContactURI(tr Transport) URI {
return getContactURI(s.conf, s.signalingIp, tr)
return getContactURI(s.conf, s.sconf.SignalingIP, tr)
}

func (s *Server) startUDP(addr netip.AddrPort) error {
Expand All @@ -169,7 +168,7 @@ func (s *Server) startUDP(addr netip.AddrPort) error {
}
s.sipListeners = append(s.sipListeners, lis)
s.log.Infow("sip signaling listening on",
"local", s.signalingIpLocal, "external", s.signalingIp,
"local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP,
"port", addr.Port(), "announce-port", s.conf.SIPPort,
"proto", "udp",
)
Expand All @@ -192,7 +191,7 @@ func (s *Server) startTCP(addr netip.AddrPort) error {
}
s.sipListeners = append(s.sipListeners, lis)
s.log.Infow("sip signaling listening on",
"local", s.signalingIpLocal, "external", s.signalingIp,
"local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP,
"port", addr.Port(), "announce-port", s.conf.SIPPort,
"proto", "tcp",
)
Expand All @@ -216,7 +215,7 @@ func (s *Server) startTLS(addr netip.AddrPort, conf *tls.Config) error {
lis := tls.NewListener(tlis, conf)
s.sipListeners = append(s.sipListeners, lis)
s.log.Infow("sip signaling listening on",
"local", s.signalingIpLocal, "external", s.signalingIp,
"local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP,
"port", addr.Port(), "announce-port", s.conf.TLS.Port,
"proto", "tls",
)
Expand All @@ -231,29 +230,9 @@ func (s *Server) startTLS(addr netip.AddrPort, conf *tls.Config) error {

type RequestHandler func(req *sip.Request, tx sip.ServerTransaction) bool

func (s *Server) Start(agent *sipgo.UserAgent, unhandled RequestHandler) error {
var err error
if s.conf.UseExternalIP {
if s.signalingIp, err = getPublicIP(); err != nil {
return err
}
if s.signalingIpLocal, err = getLocalIP(s.conf.LocalNet); err != nil {
return err
}
} else if s.conf.NAT1To1IP != "" {
ip, err := netip.ParseAddr(s.conf.NAT1To1IP)
if err != nil {
return err
}
s.signalingIp = ip
s.signalingIpLocal = s.signalingIp
} else {
if s.signalingIp, err = getLocalIP(s.conf.LocalNet); err != nil {
return err
}
s.signalingIpLocal = s.signalingIp
}
s.log.Infow("server starting", "local", s.signalingIpLocal, "external", s.signalingIp)
func (s *Server) Start(agent *sipgo.UserAgent, sc *ServiceConfig, unhandled RequestHandler) error {
s.sconf = sc
s.log.Infow("server starting", "local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP)

if agent == nil {
ua, err := sipgo.NewUA(
Expand All @@ -265,6 +244,7 @@ func (s *Server) Start(agent *sipgo.UserAgent, unhandled RequestHandler) error {
agent = ua
}

var err error
s.sipSrv, err = sipgo.NewServer(agent,
sipgo.WithServerLogger(slog.New(logger.ToSlogHandler(s.log))),
)
Expand Down
48 changes: 39 additions & 9 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package sip

import (
"context"
"fmt"
"net/netip"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -26,18 +29,25 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/media"
"github.com/livekit/sip/pkg/stats"
"github.com/livekit/sip/version"
)

type ServiceConfig struct {
SignalingIP netip.Addr
SignalingIPLocal netip.Addr
}

type Service struct {
conf *config.Config
log logger.Logger
mon *stats.Monitor
cli *Client
srv *Server
conf *config.Config
sconf *ServiceConfig
log logger.Logger
mon *stats.Monitor
cli *Client
srv *Server

mu sync.Mutex
pendingTransfers map[transferKey]chan struct{}
Expand All @@ -48,7 +58,7 @@ type transferKey struct {
TransferTo string
}

func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Service {
func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) (*Service, error) {
if log == nil {
log = logger.GetLogger()
}
Expand All @@ -60,7 +70,27 @@ func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Ser
srv: NewServer(conf, log, mon),
pendingTransfers: make(map[transferKey]chan struct{}),
}
return s
var err error
s.sconf, err = GetServiceConfig(s.conf)
if err != nil {
return nil, err
}

s.conf.SIPHostname = strings.ReplaceAll(
s.conf.SIPHostname,
"${IP}",
strings.NewReplacer(
".", "-", // IPv4
"[", "", "]", "", ":", "-", // IPv6
).Replace(s.sconf.SignalingIP.String()),
)
if strings.ContainsAny(s.conf.SIPHostname, "$%{}[]:/| ") {
return nil, fmt.Errorf("invalid hostname: %q", s.conf.SIPHostname)
}
if s.conf.SIPHostname != "" {
log.Infow("using hostname", "hostname", s.conf.SIPHostname)
}
return s, nil
}

func (s *Service) ActiveCalls() int {
Expand Down Expand Up @@ -111,12 +141,12 @@ func (s *Service) Start() error {
if err != nil {
return err
}
if err := s.cli.Start(ua); err != nil {
if err := s.cli.Start(ua, s.sconf); err != nil {
return err
}
// Server is responsible for answering all transactions. However, the client may also receive some (e.g. BYE).
// Thus, all unhandled transactions will be checked by the client.
if err := s.srv.Start(ua, s.cli.OnRequest); err != nil {
if err := s.srv.Start(ua, s.sconf, s.cli.OnRequest); err != nil {
return err
}
s.log.Debugw("sip service ready")
Expand Down
4 changes: 3 additions & 1 deletion pkg/sip/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"

"github.com/livekit/sip/pkg/media"

"github.com/livekit/sip/pkg/stats"
Expand Down Expand Up @@ -89,12 +90,13 @@ func testInvite(t *testing.T, h Handler, hidden bool, from, to string, test func
mon, err := stats.NewMonitor(&config.Config{MaxCpuUtilization: 0.9})
require.NoError(t, err)

s := NewService(&config.Config{
s, err := NewService(&config.Config{
HideInboundPort: hidden,
SIPPort: sipPort,
SIPPortListen: sipPort,
RTPPort: rtcconfig.PortRange{Start: testPortRTPMin, End: testPortRTPMax},
}, mon, logger.GetLogger())
require.NoError(t, err)
require.NotNil(t, s)
t.Cleanup(s.Stop)

Expand Down
5 changes: 4 additions & 1 deletion test/integration/sip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func runSIPServer(t testing.TB, lk *LiveKit) *SIPServer {
if err != nil {
t.Fatal(err)
}
sipsrv := sip.NewService(conf, mon, log)
sipsrv, err := sip.NewService(conf, mon, log)
if err != nil {
t.Fatal(err)
}

svc := service.NewService(conf, log, sipsrv, sipsrv.Stop, sipsrv.ActiveCalls, psrpcCli, bus, mon)
sipsrv.SetHandler(svc)
Expand Down

0 comments on commit 2b83d56

Please sign in to comment.