Skip to content

Commit

Permalink
Add basic call metrics (#29)
Browse files Browse the repository at this point in the history
Add basic call metrics.
  • Loading branch information
dennwc authored Dec 16, 2023
1 parent 634148a commit b1e6972
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 44 deletions.
9 changes: 5 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ type Config struct {
ApiSecret string `yaml:"api_secret"` // required (env LIVEKIT_API_SECRET)
WsUrl string `yaml:"ws_url"` // required (env LIVEKIT_WS_URL)

HealthPort int `yaml:"health_port"`
SIPPort int `yaml:"sip_port"`
RTPPort rtcconfig.PortRange `yaml:"rtp_port"`
Logging logger.Config `yaml:"logging"`
HealthPort int `yaml:"health_port"`
PrometheusPort int `yaml:"prometheus_port"`
SIPPort int `yaml:"sip_port"`
RTPPort rtcconfig.PortRange `yaml:"rtp_port"`
Logging logger.Config `yaml:"logging"`

UseExternalIP bool `yaml:"use_external_ip"`
NAT1To1IP string `yaml:"nat_1_to_1_ip"`
Expand Down
48 changes: 34 additions & 14 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,41 @@ package service
import (
"context"
"fmt"
"net"
"net/http"
"sync/atomic"
"time"

"github.com/frostbyte73/core"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/version"
)

const shutdownTimer = time.Second * 5

type (
sipServiceStopFunc func()
sipServiceActiveCallsFunc func() int
type sipServiceStopFunc func()
type sipServiceActiveCallsFunc func() int

Service struct {
conf *config.Config
type Service struct {
conf *config.Config

psrpcServer rpc.SIPInternalServerImpl
psrpcClient rpc.IOInfoClient
bus psrpc.MessageBus
psrpcServer rpc.SIPInternalServerImpl
psrpcClient rpc.IOInfoClient
bus psrpc.MessageBus

sipServiceStop sipServiceStopFunc
sipServiceActiveCalls sipServiceActiveCallsFunc
promServer *http.Server

shutdown core.Fuse
killed atomic.Bool
}
)
sipServiceStop sipServiceStopFunc
sipServiceActiveCalls sipServiceActiveCallsFunc

shutdown core.Fuse
killed atomic.Bool
}

func NewService(
conf *config.Config, srv rpc.SIPInternalServerImpl, sipServiceStop sipServiceStopFunc,
Expand All @@ -66,6 +69,12 @@ func NewService(

shutdown: core.NewFuse(),
}
if conf.PrometheusPort > 0 {
s.promServer = &http.Server{
Addr: fmt.Sprintf(":%d", conf.PrometheusPort),
Handler: promhttp.Handler(),
}
}
return s
}

Expand All @@ -77,6 +86,17 @@ func (s *Service) Stop(kill bool) {
func (s *Service) Run() error {
logger.Debugw("starting service", "version", version.Version)

if s.promServer != nil {
promListener, err := net.Listen("tcp", s.promServer.Addr)
if err != nil {
return err
}
defer promListener.Close()
go func() {
_ = s.promServer.Serve(promListener)
}()
}

srv, err := rpc.NewSIPInternalServer(s.psrpcServer, s.bus)
if err != nil {
return err
Expand Down
5 changes: 4 additions & 1 deletion pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"golang.org/x/exp/maps"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/stats"
)

type Client struct {
conf *config.Config
mon *stats.Monitor

sipCli *sipgo.Client
signalingIp string
Expand All @@ -37,9 +39,10 @@ type Client struct {
activeCalls map[string]*outboundCall
}

func NewClient(conf *config.Config) *Client {
func NewClient(conf *config.Config, mon *stats.Monitor) *Client {
c := &Client{
conf: conf,
mon: mon,
activeCalls: make(map[string]*outboundCall),
}
return c
Expand Down
8 changes: 8 additions & 0 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (s *Server) handleInviteAuth(req *sip.Request, tx sip.ServerTransaction, fr
}

func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
s.mon.InviteReqRaw(false)
_ = tx.Respond(sip.NewResponseFromRequest(req, 180, "Ringing", nil))

tag, err := getTagValue(req)
Expand All @@ -114,18 +115,23 @@ func (s *Server) onInvite(req *sip.Request, tx sip.ServerTransaction) {
}
src := req.Source()

s.mon.InviteReq(false, from.Address.String(), to.Address.String())
logger.Infow("INVITE", "tag", tag, "from", from, "to", to)

username, password, err := s.authHandler(from.Address.User, to.Address.User, to.Address.Host, src)
if err != nil {
s.mon.InviteError(false, from.Address.String(), to.Address.String(), "no-rule")
logger.Warnw("Rejecting inbound call, doesn't match any Trunks", err, "tag", tag, "src", src, "from", from, "to", to, "to-host", to.Address.Host)
sipErrorResponse(tx, req)
return
}
if !s.handleInviteAuth(req, tx, from.Address.User, username, password) {
s.mon.InviteError(false, from.Address.String(), to.Address.String(), "unauthorized")
// handleInviteAuth will generate the SIP Response as needed
return
}
s.mon.InviteReq(false, from.Address.String(), to.Address.String())

call := s.newInboundCall(tag, from, to, src)
call.handleInvite(call.ctx, req, tx, s.conf)
}
Expand Down Expand Up @@ -182,6 +188,8 @@ func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHead
}

func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, tx sip.ServerTransaction, conf *config.Config) {
c.s.mon.CallStart(false, c.from.Address.String(), c.to.Address.String())
defer c.s.mon.CallEnd(false, c.from.Address.String(), c.to.Address.String())
defer c.close()
// Send initial request. In the best case scenario, we will immediately get a room name to join.
// Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required.
Expand Down
42 changes: 22 additions & 20 deletions pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"golang.org/x/exp/maps"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/stats"
)

const (
Expand All @@ -36,34 +37,35 @@ var (
contentTypeHeaderSDP = sip.ContentTypeHeader("application/sdp")
)

type (
AuthHandlerFunc func(fromUser, toUser, toHost, srcAddress string) (username, password string, err error)
DispatchRuleHandlerFunc func(ctx context.Context, fromUser, toUser, toHost, srcAddress string, pin string, noPin bool) (joinRoom, identity, wsUrl, token string, requestPin, rejectInvite bool)
Server struct {
sipSrv *sipgo.Server
signalingIp string
type AuthHandlerFunc func(fromUser, toUser, toHost, srcAddress string) (username, password string, err error)
type DispatchRuleHandlerFunc func(ctx context.Context, fromUser, toUser, toHost, srcAddress string, pin string, noPin bool) (joinRoom, identity, wsUrl, token string, requestPin, rejectInvite bool)

inProgressInvites []*inProgressInvite
type Server struct {
mon *stats.Monitor
sipSrv *sipgo.Server
signalingIp string

cmu sync.RWMutex
activeCalls map[string]*inboundCall
inProgressInvites []*inProgressInvite

authHandler AuthHandlerFunc
dispatchRuleHandler DispatchRuleHandlerFunc
conf *config.Config
cmu sync.RWMutex
activeCalls map[string]*inboundCall

res mediaRes
}
authHandler AuthHandlerFunc
dispatchRuleHandler DispatchRuleHandlerFunc
conf *config.Config

inProgressInvite struct {
from string
challenge digest.Challenge
}
)
res mediaRes
}

type inProgressInvite struct {
from string
challenge digest.Challenge
}

func NewServer(conf *config.Config) *Server {
func NewServer(conf *config.Config, mon *stats.Monitor) *Server {
s := &Server{
conf: conf,
mon: mon,
activeCalls: make(map[string]*inboundCall),
inProgressInvites: []*inProgressInvite{},
}
Expand Down
21 changes: 16 additions & 5 deletions pkg/sip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"golang.org/x/exp/maps"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/stats"
"github.com/livekit/sip/version"
)

Expand All @@ -44,16 +45,21 @@ func init() {
}

type Service struct {
cli *Client
srv *Server
conf *config.Config
mon *stats.Monitor
cli *Client
srv *Server
}

func NewService(conf *config.Config) (*Service, error) {
cli := NewClient(conf)
mon := stats.NewMonitor()
cli := NewClient(conf, mon)
s := &Service{
cli: cli,
conf: conf,
mon: mon,
cli: cli,
}
s.srv = NewServer(conf)
s.srv = NewServer(conf, mon)
return s, nil
}

Expand All @@ -72,6 +78,7 @@ func (s *Service) ActiveCalls() int {
func (s *Service) Stop() {
s.cli.Stop()
s.srv.Stop()
s.mon.Stop()
}

func (s *Service) SetAuthHandler(handler AuthHandlerFunc) {
Expand All @@ -88,6 +95,10 @@ func (s *Service) InternalServerImpl() rpc.SIPInternalServerImpl {

func (s *Service) Start() error {
logger.Debugw("starting sip service", "version", version.Version)

if err := s.mon.Start(s.conf); err != nil {
return err
}
ua, err := sipgo.NewUA(
sipgo.WithUserAgent(UserAgent),
)
Expand Down
Loading

0 comments on commit b1e6972

Please sign in to comment.