Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic templating for SIP hostname. #211

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/livekit-sip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/livekit/protocol/redis"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"

"github.com/livekit/sip/pkg/stats"

"github.com/livekit/sip/pkg/config"
Expand Down Expand Up @@ -90,7 +91,10 @@ func runService(c *cli.Context) error {
return err
}

sipsrv := sip.NewService(conf, mon, log)
sipsrv, err := sip.NewService(conf, mon, log)
if err != nil {
return err
}
svc := service.NewService(conf, log, sipsrv, sipsrv.Stop, sipsrv.ActiveCalls, psrpcClient, bus, mon)
sipsrv.SetHandler(svc)

Expand Down
43 changes: 11 additions & 32 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"log/slog"
"net/netip"
"sync"

"github.com/emiago/sipgo"
Expand All @@ -36,13 +35,12 @@ import (
)

type Client struct {
conf *config.Config
log logger.Logger
mon *stats.Monitor
conf *config.Config
sconf *ServiceConfig
log logger.Logger
mon *stats.Monitor

sipCli *sipgo.Client
signalingIp netip.Addr
signalingIpLocal netip.Addr
sipCli *sipgo.Client

closing core.Fuse
cmu sync.Mutex
Expand All @@ -66,29 +64,9 @@ func NewClient(conf *config.Config, log logger.Logger, mon *stats.Monitor) *Clie
return c
}

func (c *Client) Start(agent *sipgo.UserAgent) error {
var err error
if c.conf.UseExternalIP {
if c.signalingIp, err = getPublicIP(); err != nil {
return err
}
if c.signalingIpLocal, err = getLocalIP(c.conf.LocalNet); err != nil {
return err
}
} else if c.conf.NAT1To1IP != "" {
ip, err := netip.ParseAddr(c.conf.NAT1To1IP)
if err != nil {
return err
}
c.signalingIp = ip
c.signalingIpLocal = c.signalingIp
} else {
if c.signalingIp, err = getLocalIP(c.conf.LocalNet); err != nil {
return err
}
c.signalingIpLocal = c.signalingIp
}
c.log.Infow("client starting", "local", c.signalingIpLocal, "external", c.signalingIp)
func (c *Client) Start(agent *sipgo.UserAgent, sc *ServiceConfig) error {
c.sconf = sc
c.log.Infow("client starting", "local", c.sconf.SignalingIPLocal, "external", c.sconf.SignalingIP)

if agent == nil {
ua, err := sipgo.NewUA(
Expand All @@ -100,8 +78,9 @@ func (c *Client) Start(agent *sipgo.UserAgent) error {
agent = ua
}

var err error
c.sipCli, err = sipgo.NewClient(agent,
sipgo.WithClientHostname(c.signalingIp.String()),
sipgo.WithClientHostname(c.sconf.SignalingIP.String()),
sipgo.WithClientLogger(slog.New(logger.ToSlogHandler(c.log))),
)
if err != nil {
Expand Down Expand Up @@ -132,7 +111,7 @@ func (c *Client) SetHandler(handler Handler) {
}

func (c *Client) ContactURI(tr Transport) URI {
return getContactURI(c.conf, c.signalingIp, tr)
return getContactURI(c.conf, c.sconf.SignalingIP, tr)
}

func (c *Client) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (*rpc.InternalCreateSIPParticipantResponse, error) {
Expand Down
28 changes: 28 additions & 0 deletions pkg/sip/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,36 @@ import (
"net"
"net/http"
"net/netip"

"github.com/livekit/sip/pkg/config"
)

func GetServiceConfig(conf *config.Config) (*ServiceConfig, error) {
s := new(ServiceConfig)
var err error
if conf.UseExternalIP {
if s.SignalingIP, err = getPublicIP(); err != nil {
return nil, err
}
if s.SignalingIPLocal, err = getLocalIP(conf.LocalNet); err != nil {
return nil, err
}
} else if conf.NAT1To1IP != "" {
ip, err := netip.ParseAddr(conf.NAT1To1IP)
if err != nil {
return nil, err
}
s.SignalingIP = ip
s.SignalingIPLocal = s.SignalingIP
} else {
if s.SignalingIP, err = getLocalIP(conf.LocalNet); err != nil {
return nil, err
}
s.SignalingIPLocal = s.SignalingIP
}
return s, nil
}

func getPublicIP() (netip.Addr, error) {
req, err := http.Get("http://ip-api.com/json/")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config, featur
c.log.Debugw("SDP offer", "sdp", string(offerData))

mp, err := NewMediaPort(c.log, c.mon, &MediaConfig{
IP: c.s.signalingIp,
IP: c.s.sconf.SignalingIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
MediaTimeout: c.s.conf.MediaTimeout,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Lo
call.mon = c.mon.NewCall(stats.Outbound, sipConf.host, sipConf.address)
var err error
call.media, err = NewMediaPort(call.log, call.mon, &MediaConfig{
IP: c.signalingIp,
IP: c.sconf.SignalingIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.conf.MediaTimeoutInitial,
MediaTimeout: c.conf.MediaTimeout,
Expand Down
48 changes: 14 additions & 34 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,11 @@ type Handler interface {
}

type Server struct {
log logger.Logger
mon *stats.Monitor
sipSrv *sipgo.Server
sipListeners []io.Closer
sipUnhandled RequestHandler
signalingIp netip.Addr
signalingIpLocal netip.Addr
log logger.Logger
mon *stats.Monitor
sipSrv *sipgo.Server
sipListeners []io.Closer
sipUnhandled RequestHandler

inProgressInvites []*inProgressInvite

Expand All @@ -127,6 +125,7 @@ type Server struct {

handler Handler
conf *config.Config
sconf *ServiceConfig

res mediaRes
}
Expand Down Expand Up @@ -156,7 +155,7 @@ func (s *Server) SetHandler(handler Handler) {
}

func (s *Server) ContactURI(tr Transport) URI {
return getContactURI(s.conf, s.signalingIp, tr)
return getContactURI(s.conf, s.sconf.SignalingIP, tr)
}

func (s *Server) startUDP(addr netip.AddrPort) error {
Expand All @@ -169,7 +168,7 @@ func (s *Server) startUDP(addr netip.AddrPort) error {
}
s.sipListeners = append(s.sipListeners, lis)
s.log.Infow("sip signaling listening on",
"local", s.signalingIpLocal, "external", s.signalingIp,
"local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP,
"port", addr.Port(), "announce-port", s.conf.SIPPort,
"proto", "udp",
)
Expand All @@ -192,7 +191,7 @@ func (s *Server) startTCP(addr netip.AddrPort) error {
}
s.sipListeners = append(s.sipListeners, lis)
s.log.Infow("sip signaling listening on",
"local", s.signalingIpLocal, "external", s.signalingIp,
"local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP,
"port", addr.Port(), "announce-port", s.conf.SIPPort,
"proto", "tcp",
)
Expand All @@ -216,7 +215,7 @@ func (s *Server) startTLS(addr netip.AddrPort, conf *tls.Config) error {
lis := tls.NewListener(tlis, conf)
s.sipListeners = append(s.sipListeners, lis)
s.log.Infow("sip signaling listening on",
"local", s.signalingIpLocal, "external", s.signalingIp,
"local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP,
"port", addr.Port(), "announce-port", s.conf.TLS.Port,
"proto", "tls",
)
Expand All @@ -231,29 +230,9 @@ func (s *Server) startTLS(addr netip.AddrPort, conf *tls.Config) error {

type RequestHandler func(req *sip.Request, tx sip.ServerTransaction) bool

func (s *Server) Start(agent *sipgo.UserAgent, unhandled RequestHandler) error {
var err error
if s.conf.UseExternalIP {
if s.signalingIp, err = getPublicIP(); err != nil {
return err
}
if s.signalingIpLocal, err = getLocalIP(s.conf.LocalNet); err != nil {
return err
}
} else if s.conf.NAT1To1IP != "" {
ip, err := netip.ParseAddr(s.conf.NAT1To1IP)
if err != nil {
return err
}
s.signalingIp = ip
s.signalingIpLocal = s.signalingIp
} else {
if s.signalingIp, err = getLocalIP(s.conf.LocalNet); err != nil {
return err
}
s.signalingIpLocal = s.signalingIp
}
s.log.Infow("server starting", "local", s.signalingIpLocal, "external", s.signalingIp)
func (s *Server) Start(agent *sipgo.UserAgent, sc *ServiceConfig, unhandled RequestHandler) error {
s.sconf = sc
s.log.Infow("server starting", "local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP)

if agent == nil {
ua, err := sipgo.NewUA(
Expand All @@ -265,6 +244,7 @@ func (s *Server) Start(agent *sipgo.UserAgent, unhandled RequestHandler) error {
agent = ua
}

var err error
s.sipSrv, err = sipgo.NewServer(agent,
sipgo.WithServerLogger(slog.New(logger.ToSlogHandler(s.log))),
)
Expand Down
48 changes: 39 additions & 9 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ package sip

import (
"context"
"fmt"
"net/netip"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -26,18 +29,25 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/media"
"github.com/livekit/sip/pkg/stats"
"github.com/livekit/sip/version"
)

type ServiceConfig struct {
SignalingIP netip.Addr
SignalingIPLocal netip.Addr
}

type Service struct {
conf *config.Config
log logger.Logger
mon *stats.Monitor
cli *Client
srv *Server
conf *config.Config
sconf *ServiceConfig
log logger.Logger
mon *stats.Monitor
cli *Client
srv *Server

mu sync.Mutex
pendingTransfers map[transferKey]chan struct{}
Expand All @@ -48,7 +58,7 @@ type transferKey struct {
TransferTo string
}

func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Service {
func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) (*Service, error) {
if log == nil {
log = logger.GetLogger()
}
Expand All @@ -60,7 +70,27 @@ func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Ser
srv: NewServer(conf, log, mon),
pendingTransfers: make(map[transferKey]chan struct{}),
}
return s
var err error
s.sconf, err = GetServiceConfig(s.conf)
if err != nil {
return nil, err
}

s.conf.SIPHostname = strings.ReplaceAll(
s.conf.SIPHostname,
"${IP}",
strings.NewReplacer(
".", "-", // IPv4
"[", "", "]", "", ":", "-", // IPv6
).Replace(s.sconf.SignalingIP.String()),
)
if strings.ContainsAny(s.conf.SIPHostname, "$%{}[]:/| ") {
return nil, fmt.Errorf("invalid hostname: %q", s.conf.SIPHostname)
}
if s.conf.SIPHostname != "" {
log.Infow("using hostname", "hostname", s.conf.SIPHostname)
}
return s, nil
}

func (s *Service) ActiveCalls() int {
Expand Down Expand Up @@ -111,12 +141,12 @@ func (s *Service) Start() error {
if err != nil {
return err
}
if err := s.cli.Start(ua); err != nil {
if err := s.cli.Start(ua, s.sconf); err != nil {
return err
}
// Server is responsible for answering all transactions. However, the client may also receive some (e.g. BYE).
// Thus, all unhandled transactions will be checked by the client.
if err := s.srv.Start(ua, s.cli.OnRequest); err != nil {
if err := s.srv.Start(ua, s.sconf, s.cli.OnRequest); err != nil {
return err
}
s.log.Debugw("sip service ready")
Expand Down
4 changes: 3 additions & 1 deletion pkg/sip/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"

"github.com/livekit/sip/pkg/media"

"github.com/livekit/sip/pkg/stats"
Expand Down Expand Up @@ -89,12 +90,13 @@ func testInvite(t *testing.T, h Handler, hidden bool, from, to string, test func
mon, err := stats.NewMonitor(&config.Config{MaxCpuUtilization: 0.9})
require.NoError(t, err)

s := NewService(&config.Config{
s, err := NewService(&config.Config{
HideInboundPort: hidden,
SIPPort: sipPort,
SIPPortListen: sipPort,
RTPPort: rtcconfig.PortRange{Start: testPortRTPMin, End: testPortRTPMax},
}, mon, logger.GetLogger())
require.NoError(t, err)
require.NotNil(t, s)
t.Cleanup(s.Stop)

Expand Down
5 changes: 4 additions & 1 deletion test/integration/sip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func runSIPServer(t testing.TB, lk *LiveKit) *SIPServer {
if err != nil {
t.Fatal(err)
}
sipsrv := sip.NewService(conf, mon, log)
sipsrv, err := sip.NewService(conf, mon, log)
if err != nil {
t.Fatal(err)
}

svc := service.NewService(conf, log, sipsrv, sipsrv.Stop, sipsrv.ActiveCalls, psrpcCli, bus, mon)
sipsrv.SetHandler(svc)
Expand Down
Loading