From fa6557e3b8927e36403cb0fcb564f23378e90d29 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Fri, 25 Oct 2024 12:30:54 +0300 Subject: [PATCH] Basic templating for SIP hostname. --- cmd/livekit-sip/main.go | 6 ++++- pkg/sip/client.go | 43 +++++++++----------------------- pkg/sip/config.go | 28 +++++++++++++++++++++ pkg/sip/inbound.go | 2 +- pkg/sip/outbound.go | 2 +- pkg/sip/server.go | 48 +++++++++++------------------------- pkg/sip/service.go | 48 +++++++++++++++++++++++++++++------- pkg/sip/service_test.go | 4 ++- test/integration/sip_test.go | 5 +++- 9 files changed, 106 insertions(+), 80 deletions(-) diff --git a/cmd/livekit-sip/main.go b/cmd/livekit-sip/main.go index 92492d6..e0b7c50 100644 --- a/cmd/livekit-sip/main.go +++ b/cmd/livekit-sip/main.go @@ -26,6 +26,7 @@ import ( "github.com/livekit/protocol/redis" "github.com/livekit/protocol/rpc" "github.com/livekit/psrpc" + "github.com/livekit/sip/pkg/stats" "github.com/livekit/sip/pkg/config" @@ -90,7 +91,10 @@ func runService(c *cli.Context) error { return err } - sipsrv := sip.NewService(conf, mon, log) + sipsrv, err := sip.NewService(conf, mon, log) + if err != nil { + return err + } svc := service.NewService(conf, log, sipsrv, sipsrv.Stop, sipsrv.ActiveCalls, psrpcClient, bus, mon) sipsrv.SetHandler(svc) diff --git a/pkg/sip/client.go b/pkg/sip/client.go index c223e12..3b959f8 100644 --- a/pkg/sip/client.go +++ b/pkg/sip/client.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "log/slog" - "net/netip" "sync" "github.com/emiago/sipgo" @@ -36,13 +35,12 @@ import ( ) type Client struct { - conf *config.Config - log logger.Logger - mon *stats.Monitor + conf *config.Config + sconf *ServiceConfig + log logger.Logger + mon *stats.Monitor - sipCli *sipgo.Client - signalingIp netip.Addr - signalingIpLocal netip.Addr + sipCli *sipgo.Client closing core.Fuse cmu sync.Mutex @@ -66,29 +64,9 @@ func NewClient(conf *config.Config, log logger.Logger, mon *stats.Monitor) *Clie return c } -func (c *Client) Start(agent *sipgo.UserAgent) error { - var err error - if c.conf.UseExternalIP { - if c.signalingIp, err = getPublicIP(); err != nil { - return err - } - if c.signalingIpLocal, err = getLocalIP(c.conf.LocalNet); err != nil { - return err - } - } else if c.conf.NAT1To1IP != "" { - ip, err := netip.ParseAddr(c.conf.NAT1To1IP) - if err != nil { - return err - } - c.signalingIp = ip - c.signalingIpLocal = c.signalingIp - } else { - if c.signalingIp, err = getLocalIP(c.conf.LocalNet); err != nil { - return err - } - c.signalingIpLocal = c.signalingIp - } - c.log.Infow("client starting", "local", c.signalingIpLocal, "external", c.signalingIp) +func (c *Client) Start(agent *sipgo.UserAgent, sc *ServiceConfig) error { + c.sconf = sc + c.log.Infow("client starting", "local", c.sconf.SignalingIPLocal, "external", c.sconf.SignalingIP) if agent == nil { ua, err := sipgo.NewUA( @@ -100,8 +78,9 @@ func (c *Client) Start(agent *sipgo.UserAgent) error { agent = ua } + var err error c.sipCli, err = sipgo.NewClient(agent, - sipgo.WithClientHostname(c.signalingIp.String()), + sipgo.WithClientHostname(c.sconf.SignalingIP.String()), sipgo.WithClientLogger(slog.New(logger.ToSlogHandler(c.log))), ) if err != nil { @@ -132,7 +111,7 @@ func (c *Client) SetHandler(handler Handler) { } func (c *Client) ContactURI(tr Transport) URI { - return getContactURI(c.conf, c.signalingIp, tr) + return getContactURI(c.conf, c.sconf.SignalingIP, tr) } func (c *Client) CreateSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (*rpc.InternalCreateSIPParticipantResponse, error) { diff --git a/pkg/sip/config.go b/pkg/sip/config.go index a96d9c7..9c15648 100644 --- a/pkg/sip/config.go +++ b/pkg/sip/config.go @@ -21,8 +21,36 @@ import ( "net" "net/http" "net/netip" + + "github.com/livekit/sip/pkg/config" ) +func GetServiceConfig(conf *config.Config) (*ServiceConfig, error) { + s := new(ServiceConfig) + var err error + if conf.UseExternalIP { + if s.SignalingIP, err = getPublicIP(); err != nil { + return nil, err + } + if s.SignalingIPLocal, err = getLocalIP(conf.LocalNet); err != nil { + return nil, err + } + } else if conf.NAT1To1IP != "" { + ip, err := netip.ParseAddr(conf.NAT1To1IP) + if err != nil { + return nil, err + } + s.SignalingIP = ip + s.SignalingIPLocal = s.SignalingIP + } else { + if s.SignalingIP, err = getLocalIP(conf.LocalNet); err != nil { + return nil, err + } + s.SignalingIPLocal = s.SignalingIP + } + return s, nil +} + func getPublicIP() (netip.Addr, error) { req, err := http.Get("http://ip-api.com/json/") if err != nil { diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 7001b45..809c53d 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -451,7 +451,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, conf *config.Config, featur c.log.Debugw("SDP offer", "sdp", string(offerData)) mp, err := NewMediaPort(c.log, c.mon, &MediaConfig{ - IP: c.s.signalingIp, + IP: c.s.sconf.SignalingIP, Ports: conf.RTPPort, MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial, MediaTimeout: c.s.conf.MediaTimeout, diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index b65bd92..2d2f973 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -99,7 +99,7 @@ func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Lo call.mon = c.mon.NewCall(stats.Outbound, sipConf.host, sipConf.address) var err error call.media, err = NewMediaPort(call.log, call.mon, &MediaConfig{ - IP: c.signalingIp, + IP: c.sconf.SignalingIP, Ports: conf.RTPPort, MediaTimeoutInitial: c.conf.MediaTimeoutInitial, MediaTimeout: c.conf.MediaTimeout, diff --git a/pkg/sip/server.go b/pkg/sip/server.go index e1ce040..b57be6c 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -110,13 +110,11 @@ type Handler interface { } type Server struct { - log logger.Logger - mon *stats.Monitor - sipSrv *sipgo.Server - sipListeners []io.Closer - sipUnhandled RequestHandler - signalingIp netip.Addr - signalingIpLocal netip.Addr + log logger.Logger + mon *stats.Monitor + sipSrv *sipgo.Server + sipListeners []io.Closer + sipUnhandled RequestHandler inProgressInvites []*inProgressInvite @@ -127,6 +125,7 @@ type Server struct { handler Handler conf *config.Config + sconf *ServiceConfig res mediaRes } @@ -156,7 +155,7 @@ func (s *Server) SetHandler(handler Handler) { } func (s *Server) ContactURI(tr Transport) URI { - return getContactURI(s.conf, s.signalingIp, tr) + return getContactURI(s.conf, s.sconf.SignalingIP, tr) } func (s *Server) startUDP(addr netip.AddrPort) error { @@ -169,7 +168,7 @@ func (s *Server) startUDP(addr netip.AddrPort) error { } s.sipListeners = append(s.sipListeners, lis) s.log.Infow("sip signaling listening on", - "local", s.signalingIpLocal, "external", s.signalingIp, + "local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP, "port", addr.Port(), "announce-port", s.conf.SIPPort, "proto", "udp", ) @@ -192,7 +191,7 @@ func (s *Server) startTCP(addr netip.AddrPort) error { } s.sipListeners = append(s.sipListeners, lis) s.log.Infow("sip signaling listening on", - "local", s.signalingIpLocal, "external", s.signalingIp, + "local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP, "port", addr.Port(), "announce-port", s.conf.SIPPort, "proto", "tcp", ) @@ -216,7 +215,7 @@ func (s *Server) startTLS(addr netip.AddrPort, conf *tls.Config) error { lis := tls.NewListener(tlis, conf) s.sipListeners = append(s.sipListeners, lis) s.log.Infow("sip signaling listening on", - "local", s.signalingIpLocal, "external", s.signalingIp, + "local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP, "port", addr.Port(), "announce-port", s.conf.TLS.Port, "proto", "tls", ) @@ -231,29 +230,9 @@ func (s *Server) startTLS(addr netip.AddrPort, conf *tls.Config) error { type RequestHandler func(req *sip.Request, tx sip.ServerTransaction) bool -func (s *Server) Start(agent *sipgo.UserAgent, unhandled RequestHandler) error { - var err error - if s.conf.UseExternalIP { - if s.signalingIp, err = getPublicIP(); err != nil { - return err - } - if s.signalingIpLocal, err = getLocalIP(s.conf.LocalNet); err != nil { - return err - } - } else if s.conf.NAT1To1IP != "" { - ip, err := netip.ParseAddr(s.conf.NAT1To1IP) - if err != nil { - return err - } - s.signalingIp = ip - s.signalingIpLocal = s.signalingIp - } else { - if s.signalingIp, err = getLocalIP(s.conf.LocalNet); err != nil { - return err - } - s.signalingIpLocal = s.signalingIp - } - s.log.Infow("server starting", "local", s.signalingIpLocal, "external", s.signalingIp) +func (s *Server) Start(agent *sipgo.UserAgent, sc *ServiceConfig, unhandled RequestHandler) error { + s.sconf = sc + s.log.Infow("server starting", "local", s.sconf.SignalingIPLocal, "external", s.sconf.SignalingIP) if agent == nil { ua, err := sipgo.NewUA( @@ -265,6 +244,7 @@ func (s *Server) Start(agent *sipgo.UserAgent, unhandled RequestHandler) error { agent = ua } + var err error s.sipSrv, err = sipgo.NewServer(agent, sipgo.WithServerLogger(slog.New(logger.ToSlogHandler(s.log))), ) diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 5f5d330..123b120 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -16,6 +16,9 @@ package sip import ( "context" + "fmt" + "net/netip" + "strings" "sync" "sync/atomic" "time" @@ -26,18 +29,25 @@ import ( "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" "github.com/livekit/psrpc" + "github.com/livekit/sip/pkg/config" "github.com/livekit/sip/pkg/media" "github.com/livekit/sip/pkg/stats" "github.com/livekit/sip/version" ) +type ServiceConfig struct { + SignalingIP netip.Addr + SignalingIPLocal netip.Addr +} + type Service struct { - conf *config.Config - log logger.Logger - mon *stats.Monitor - cli *Client - srv *Server + conf *config.Config + sconf *ServiceConfig + log logger.Logger + mon *stats.Monitor + cli *Client + srv *Server mu sync.Mutex pendingTransfers map[transferKey]chan struct{} @@ -48,7 +58,7 @@ type transferKey struct { TransferTo string } -func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Service { +func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) (*Service, error) { if log == nil { log = logger.GetLogger() } @@ -60,7 +70,27 @@ func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Ser srv: NewServer(conf, log, mon), pendingTransfers: make(map[transferKey]chan struct{}), } - return s + var err error + s.sconf, err = GetServiceConfig(s.conf) + if err != nil { + return nil, err + } + + s.conf.SIPHostname = strings.ReplaceAll( + s.conf.SIPHostname, + "${IP}", + strings.NewReplacer( + ".", "-", // IPv4 + "[", "", "]", "", ":", "-", // IPv6 + ).Replace(s.sconf.SignalingIP.String()), + ) + if strings.ContainsAny(s.conf.SIPHostname, "$%{}[]:/| ") { + return nil, fmt.Errorf("invalid hostname: %q", s.conf.SIPHostname) + } + if s.conf.SIPHostname != "" { + log.Infow("using hostname", "hostname", s.conf.SIPHostname) + } + return s, nil } func (s *Service) ActiveCalls() int { @@ -111,12 +141,12 @@ func (s *Service) Start() error { if err != nil { return err } - if err := s.cli.Start(ua); err != nil { + if err := s.cli.Start(ua, s.sconf); err != nil { return err } // Server is responsible for answering all transactions. However, the client may also receive some (e.g. BYE). // Thus, all unhandled transactions will be checked by the client. - if err := s.srv.Start(ua, s.cli.OnRequest); err != nil { + if err := s.srv.Start(ua, s.sconf, s.cli.OnRequest); err != nil { return err } s.log.Debugw("sip service ready") diff --git a/pkg/sip/service_test.go b/pkg/sip/service_test.go index 37b386f..7e2c9e9 100644 --- a/pkg/sip/service_test.go +++ b/pkg/sip/service_test.go @@ -14,6 +14,7 @@ import ( "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" + "github.com/livekit/sip/pkg/media" "github.com/livekit/sip/pkg/stats" @@ -89,12 +90,13 @@ func testInvite(t *testing.T, h Handler, hidden bool, from, to string, test func mon, err := stats.NewMonitor(&config.Config{MaxCpuUtilization: 0.9}) require.NoError(t, err) - s := NewService(&config.Config{ + s, err := NewService(&config.Config{ HideInboundPort: hidden, SIPPort: sipPort, SIPPortListen: sipPort, RTPPort: rtcconfig.PortRange{Start: testPortRTPMin, End: testPortRTPMax}, }, mon, logger.GetLogger()) + require.NoError(t, err) require.NotNil(t, s) t.Cleanup(s.Stop) diff --git a/test/integration/sip_test.go b/test/integration/sip_test.go index 93c7ad0..2199d13 100644 --- a/test/integration/sip_test.go +++ b/test/integration/sip_test.go @@ -79,7 +79,10 @@ func runSIPServer(t testing.TB, lk *LiveKit) *SIPServer { if err != nil { t.Fatal(err) } - sipsrv := sip.NewService(conf, mon, log) + sipsrv, err := sip.NewService(conf, mon, log) + if err != nil { + t.Fatal(err) + } svc := service.NewService(conf, log, sipsrv, sipsrv.Stop, sipsrv.ActiveCalls, psrpcCli, bus, mon) sipsrv.SetHandler(svc)