From 54af3dca7bfca0f17e93d2095a85897e359ff2c5 Mon Sep 17 00:00:00 2001 From: Sean DuBois Date: Mon, 4 Dec 2023 14:57:57 -0500 Subject: [PATCH] Support DispatchRuleHandler responding with Token --- go.mod | 4 +-- go.sum | 6 ++-- pkg/service/service.go | 6 ++-- pkg/sip/inbound.go | 16 +++++----- pkg/sip/room.go | 68 +++++++++++++++++++++++------------------- pkg/sip/server.go | 2 +- 6 files changed, 54 insertions(+), 48 deletions(-) diff --git a/go.mod b/go.mod index 2d7e69b5..7ca1f332 100644 --- a/go.mod +++ b/go.mod @@ -9,13 +9,14 @@ require ( github.com/icholy/digest v0.1.22 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e - github.com/livekit/protocol v1.9.4-0.20231204164031-5ffa9f0c6545 + github.com/livekit/protocol v1.9.4-0.20231204184853-a64882f692f9 github.com/livekit/psrpc v0.5.2 github.com/livekit/server-sdk-go v1.1.1 github.com/pion/interceptor v0.1.25 github.com/pion/rtp v1.8.3 github.com/pion/sdp/v2 v2.4.0 github.com/pion/webrtc/v3 v3.2.23 + github.com/rs/zerolog v1.28.0 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.8.4 github.com/urfave/cli/v2 v2.25.7 @@ -76,7 +77,6 @@ require ( github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.11.1 // indirect github.com/redis/go-redis/v9 v9.3.0 // indirect - github.com/rs/zerolog v1.28.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect github.com/thoas/go-funk v0.9.3 // indirect diff --git a/go.sum b/go.sum index 48698938..d1a0c16e 100644 --- a/go.sum +++ b/go.sum @@ -90,10 +90,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e h1:yNeIo7MSMUWgoLu7LkNKnBYnJBFPFH9Wq4S6h1kS44M= github.com/livekit/mediatransportutil v0.0.0-20231017082622-43f077b4e60e/go.mod h1:+WIOYwiBMive5T81V8B2wdAc2zQNRjNQiJIcPxMTILY= -github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd h1:DlyhqC/Ge176SQFNCP/VgSabh3Gwf0cjEKqooznbO1E= -github.com/livekit/protocol v1.9.2-0.20231121183749-0cb26043c3cd/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= -github.com/livekit/protocol v1.9.4-0.20231204164031-5ffa9f0c6545 h1:PvGxeNAXwNvZY/x4akBKA1SN3sjmcoyMy56/reSWjho= -github.com/livekit/protocol v1.9.4-0.20231204164031-5ffa9f0c6545/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= +github.com/livekit/protocol v1.9.4-0.20231204184853-a64882f692f9 h1:n2CEik59FIPXqJM7nlKJfYsGYmZUWbNWBEWZ5hzSDy4= +github.com/livekit/protocol v1.9.4-0.20231204184853-a64882f692f9/go.mod h1:8f342d5nvfNp9YAEfJokSR+zbNFpaivgU0h6vwaYhes= github.com/livekit/psrpc v0.5.2 h1:+MvG8Otm/J6MTg2MP/uuMbrkxOWsrj2hDhu/I1VIU1U= github.com/livekit/psrpc v0.5.2/go.mod h1:cQjxg1oCxYHhxxv6KJH1gSvdtCHQoRZCHgPdm5N8v2g= github.com/livekit/server-sdk-go v1.1.1 h1:TkDD/Ecyh7XNuxgxhpsDQ1uzbTlDWwwJrbkyUjQmcbY= diff --git a/pkg/service/service.go b/pkg/service/service.go index 8aa478a0..6aa90240 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -87,7 +87,7 @@ func (s *Service) HandleTrunkAuthentication(from, to, toHost, srcAddress string) return resp.Username, resp.Password, nil } -func (s *Service) HandleDispatchRules(callingNumber, calledNumber, calledHost, srcAddress string, pin string, noPin bool) (joinRoom, identity string, requestPin, rejectInvite bool) { +func (s *Service) HandleDispatchRules(callingNumber, calledNumber, calledHost, srcAddress string, pin string, noPin bool) (joinRoom, identity, wsUrl, token string, requestPin, rejectInvite bool) { resp, err := s.psrpcClient.EvaluateSIPDispatchRules(context.TODO(), &rpc.EvaluateSIPDispatchRulesRequest{ CallingNumber: callingNumber, CalledNumber: calledNumber, @@ -99,10 +99,10 @@ func (s *Service) HandleDispatchRules(callingNumber, calledNumber, calledHost, s if err != nil { logger.Warnw("SIP handle dispatch rule error", err) - return "", "", false, true + return "", "", "", "", false, true } - return resp.RoomName, resp.ParticipantIdentity, resp.RequestPin, false + return resp.RoomName, resp.ParticipantIdentity, resp.WsUrl, resp.Token, resp.RequestPin, false } func (s *Service) CanAccept() bool { diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 19eef59d..eaa3ff30 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -163,7 +163,7 @@ func (s *Server) newInboundCall(tag string, from *sip.FromHeader, to *sip.ToHead func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction, conf *config.Config) { // Send initial request. In the best case scenario, we will immediately get a room name to join. // Otherwise, we could even learn that this number is not allowed and reject the call, or ask for pin if required. - roomName, identity, requirePin, rejectInvite := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, "", false) + roomName, identity, wsUrl, token, requirePin, rejectInvite := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, "", false) if rejectInvite { logger.Infow("Rejecting inbound call, doesn't match any Dispatch Rules", "from", c.from.Address.User, "to", c.to.Address.User, "to-host", c.to.Address.Host, "src", c.src) sipErrorResponse(tx, req) @@ -193,7 +193,7 @@ func (c *inboundCall) handleInvite(req *sip.Request, tx sip.ServerTransaction, c if requirePin { c.pinPrompt() } else { - c.joinRoom(roomName, identity) + c.joinRoom(roomName, identity, wsUrl, token) } } @@ -248,14 +248,14 @@ func (c *inboundCall) pinPrompt() { noPin = pin == "" logger.Infow("Checking Pin for SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "pin", pin, "noPin", noPin) - roomName, identity, requirePin, reject := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, pin, noPin) + roomName, identity, wsUrl, token, requirePin, reject := c.s.dispatchRuleHandler(c.from.Address.User, c.to.Address.User, c.to.Address.Host, c.src, pin, noPin) if reject || requirePin || roomName == "" { logger.Infow("Rejecting call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "pin", pin, "noPin", noPin) c.playAudio(c.s.res.wrongPin) c.Close() return } - c.joinRoom(roomName, identity) + c.joinRoom(roomName, identity, wsUrl, token) return } // Gather pin numbers @@ -307,8 +307,8 @@ func (c *inboundCall) HandleRTP(p *rtp.Packet) error { return nil } -func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity string) error { - err := c.lkRoom.Connect(c.s.conf, roomName, participantIdentity) +func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity, wsUrl, token string) error { + err := c.lkRoom.Connect(c.s.conf, roomName, participantIdentity, wsUrl, token) if err != nil { return err } @@ -327,10 +327,10 @@ func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity str return nil } -func (c *inboundCall) joinRoom(roomName, identity string) { +func (c *inboundCall) joinRoom(roomName, identity, wsUrl, token string) { logger.Infow("Bridging SIP call", "tag", c.tag, "from", c.from.Address.User, "to", c.to.Address.User, "roomName", roomName, "identity", identity) c.playAudio(c.s.res.roomJoin) - if err := c.createLiveKitParticipant(roomName, identity); err != nil { + if err := c.createLiveKitParticipant(roomName, identity, wsUrl, token); err != nil { logger.Errorw("Cannot create LiveKit participant", err, "tag", c.tag) } } diff --git a/pkg/sip/room.go b/pkg/sip/room.go index bcae9172..8bf3597a 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -51,39 +51,47 @@ func NewRoom() *Room { return r } -func (r *Room) Connect(conf *config.Config, roomName string, identity string) error { +func (r *Room) Connect(conf *config.Config, roomName, identity, wsUrl, token string) error { + var ( + err error + room *lksdk.Room + ) r.identity = identity - - room, err := lksdk.ConnectToRoom(conf.WsUrl, - lksdk.ConnectInfo{ - APIKey: conf.ApiKey, - APISecret: conf.ApiSecret, - RoomName: roomName, - ParticipantIdentity: identity, - }, - &lksdk.RoomCallback{ - ParticipantCallback: lksdk.ParticipantCallback{ - OnTrackSubscribed: func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { - if track.Kind() != webrtc.RTPCodecTypeAudio { - if err := pub.SetSubscribed(false); err != nil { - logger.Errorw("Cannot unsubscribe from the track", err) - } - return - } - - mtrack := r.NewTrack() - defer mtrack.Close() - - odec, err := opus.Decode(mtrack, sampleRate, channels) - if err != nil { - return + roomCallback := &lksdk.RoomCallback{ + ParticipantCallback: lksdk.ParticipantCallback{ + OnTrackSubscribed: func(track *webrtc.TrackRemote, pub *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { + if track.Kind() != webrtc.RTPCodecTypeAudio { + if err := pub.SetSubscribed(false); err != nil { + logger.Errorw("Cannot unsubscribe from the track", err) } - h := rtp.NewMediaStreamIn[opus.Sample](odec) - _ = rtp.HandleLoop(track, h) - }, + return + } + + mtrack := r.NewTrack() + defer mtrack.Close() + + odec, err := opus.Decode(mtrack, sampleRate, channels) + if err != nil { + return + } + h := rtp.NewMediaStreamIn[opus.Sample](odec) + _ = rtp.HandleLoop(track, h) }, }, - ) + } + + if wsUrl == "" || token == "" { + room, err = lksdk.ConnectToRoom(conf.WsUrl, + lksdk.ConnectInfo{ + APIKey: conf.ApiKey, + APISecret: conf.ApiSecret, + RoomName: roomName, + ParticipantIdentity: identity, + }, roomCallback) + } else { + room, err = lksdk.ConnectToRoomWithToken(wsUrl, token, roomCallback) + } + if err != nil { return err } @@ -93,7 +101,7 @@ func (r *Room) Connect(conf *config.Config, roomName string, identity string) er func ConnectToRoom(conf *config.Config, roomName string, identity string) (*Room, error) { r := NewRoom() - if err := r.Connect(conf, roomName, identity); err != nil { + if err := r.Connect(conf, roomName, identity, "", ""); err != nil { return nil, err } return r, nil diff --git a/pkg/sip/server.go b/pkg/sip/server.go index 7deb540d..1003ef58 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -40,7 +40,7 @@ var ( type ( AuthHandlerFunc func(fromUser, toUser, toHost, srcAddress string) (username, password string, err error) - DispatchRuleHandlerFunc func(fromUser, toUser, toHost, srcAddress string, pin string, noPin bool) (joinRoom, identity string, requestPin, rejectInvite bool) + DispatchRuleHandlerFunc func(fromUser, toUser, toHost, srcAddress string, pin string, noPin bool) (joinRoom, identity, wsUrl, token string, requestPin, rejectInvite bool) Server struct { sipSrv *sipgo.Server signalingIp string