Skip to content

Commit

Permalink
Implement DTMF Pin input and refactor the server.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc authored and Sean-Der committed Nov 17, 2023
1 parent fa7db89 commit 819608b
Show file tree
Hide file tree
Showing 3 changed files with 289 additions and 127 deletions.
7 changes: 5 additions & 2 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/version"
)
Expand Down Expand Up @@ -79,17 +80,19 @@ func (s *Service) HandleTrunkAuthentication(from, to, srcAddress string) (userna
return resp.Username, resp.Password, nil
}

func (s *Service) HandleDispatchRules(callingNumber, calledNumber, srcAddress string) (joinRoom, identity string, requestPin, rejectInvite bool) {
func (s *Service) HandleDispatchRules(callingNumber, calledNumber, srcAddress string, pin string, skipPin bool) (joinRoom, identity string, requestPin, rejectInvite bool) {
resp, err := s.psrpcClient.EvaluateSIPDispatchRules(context.TODO(), &rpc.EvaluateSIPDispatchRulesRequest{
CallingNumber: callingNumber,
CalledNumber: calledNumber,
SrcAddress: srcAddress,
Pin: pin,
// FIXME: NoPin
})

if err != nil {
log.Println(err)
return "", "", false, true
}

return resp.RoomName, resp.ParticipantIdentity, false, false
return resp.RoomName, resp.ParticipantIdentity, resp.RequestPin, false
}
261 changes: 163 additions & 98 deletions pkg/sip/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"sync/atomic"
"time"

"github.com/livekit/sip/pkg/config"
"github.com/livekit/sip/pkg/mixer"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/zaf/g711"
"gopkg.in/hraban/opus.v2"

"github.com/livekit/sip/pkg/mixer"

lksdk "github.com/livekit/server-sdk-go"
)

Expand All @@ -37,72 +37,34 @@ const (
sampleRate = 8000
)

func createLiveKitParticipant(conf *config.Config, roomName, participantIdentity string, audioMixer *mixer.Mixer) (*webrtc.TrackLocalStaticSample, *lksdk.Room, error) {
roomCB := &lksdk.RoomCallback{
ParticipantCallback: lksdk.ParticipantCallback{
OnTrackSubscribed: func(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
if track.Kind() == webrtc.RTPCodecTypeVideo {
if err := publication.SetSubscribed(false); err != nil {
log.Println(err)
}
return
}

decoder, err := opus.NewDecoder(8000, 1)
if err != nil {
return
}

input := audioMixer.AddInput()
samples := make([]int16, 1000)
for {
rtpPkt, _, err := track.ReadRTP()
if err != nil {
break
}

n, err := decoder.Decode(rtpPkt.Payload, samples)
if err != nil {
break
}

input.Push(samples[:n])
}

audioMixer.RemoveInput(input)

},
},
}
type mediaData struct {
conn *net.UDPConn
mix *mixer.Mixer
enc *opus.Encoder
dest atomic.Pointer[net.UDPAddr]
track atomic.Pointer[webrtc.TrackLocalStaticSample]
room atomic.Pointer[lksdk.Room]
dtmf chan byte
}

room, err := lksdk.ConnectToRoom(conf.WsUrl,
lksdk.ConnectInfo{
APIKey: conf.ApiKey,
APISecret: conf.ApiSecret,
RoomName: roomName,
ParticipantIdentity: participantIdentity,
},
roomCB,
)
if err != nil {
return nil, nil, err
}
func (c *inboundCall) initMedia() {
c.media.dtmf = make(chan byte, 10)
}

track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
if err != nil {
return nil, nil, err
func (c *inboundCall) closeMedia() {
if p := c.media.room.Load(); p != nil {
p.Disconnect()
c.media.room.Store(nil)
}

if _, err = room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{
Name: participantIdentity,
}); err != nil {
return nil, nil, err
if p := c.media.track.Load(); p != nil {
c.media.track.Store(nil)
}

return track, room, nil
c.media.mix.Stop()
c.media.conn.Close()
close(c.media.dtmf)
}

func createMediaSession(conf *config.Config, roomName, participantIdentity string) (*net.UDPConn, error) {
func (c *inboundCall) createMediaSession() (*net.UDPAddr, error) {
var rtpDestination atomic.Value
conn, err := net.ListenUDP("udp", &net.UDPAddr{
Port: 0,
Expand All @@ -111,14 +73,15 @@ func createMediaSession(conf *config.Config, roomName, participantIdentity strin
if err != nil {
return nil, err
}
c.media.conn = conn

mixerRtpPkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SSRC: 5000,
},
}
audioMixer := mixer.NewMixer(func(audioSample []byte) {
c.media.mix = mixer.NewMixer(func(audioSample []byte) {
dstAddr, ok := rtpDestination.Load().(*net.UDPAddr)
if !ok || dstAddr == nil {
return
Expand All @@ -131,62 +94,164 @@ func createMediaSession(conf *config.Config, roomName, participantIdentity strin
return
}

if _, err = conn.WriteTo(raw, dstAddr); err != nil {
if _, err = c.media.conn.WriteTo(raw, dstAddr); err != nil {
return
}

mixerRtpPkt.Header.Timestamp += 160
mixerRtpPkt.Header.SequenceNumber += 1
}, 8000)

track, room, err := createLiveKitParticipant(conf, roomName, participantIdentity, audioMixer)
enc, err := opus.NewEncoder(sampleRate, channels, opus.AppVoIP)
if err != nil {
return nil, err
}
c.media.enc = enc

enc, err := opus.NewEncoder(sampleRate, channels, opus.AppVoIP)
go c.readMedia()
return conn.LocalAddr().(*net.UDPAddr), nil
}

func (c *inboundCall) readMedia() {
buff := make([]byte, 1500)
var rtpPkt rtp.Packet
for {
n, srcAddr, err := c.media.conn.ReadFromUDP(buff)
if err != nil {
return
}
c.media.dest.Store(srcAddr)

if err := rtpPkt.Unmarshal(buff[:n]); err != nil {
continue
}
c.handleRTP(&rtpPkt)
}
}

func (c *inboundCall) handleRTP(p *rtp.Packet) {
if p.Marker && p.PayloadType == 101 {
c.handleDTMF(p.Payload)
return
}
// TODO: Audio data appears to be coming with PayloadType=0, so maybe enforce it?
c.handleAudio(p.Payload)
}

var dtmfEventToChar = [256]byte{
0: '0', 1: '1', 2: '2', 3: '3', 4: '4',
5: '5', 6: '6', 7: '7', 8: '8', 9: '9',
10: '*', 11: '#',
12: 'a', 13: 'b', 14: 'c', 15: 'd',
}

func (c *inboundCall) handleDTMF(data []byte) { // RFC2833
if len(data) < 4 {
return
}
ev := data[0]
b := dtmfEventToChar[ev]
// We should have enough buffer here.
select {
case c.media.dtmf <- b:
default:
}
}

func (c *inboundCall) handleAudio(audioData []byte) {
track := c.media.track.Load()
if track == nil {
return
}
decoded := g711.DecodeUlaw(audioData)

var pcm []int16
for i := 0; i < len(decoded); i += 2 {
sample := binary.LittleEndian.Uint16(decoded[i:])
pcm = append(pcm, int16(sample))
}

data := make([]byte, 1000)
n, err := c.media.enc.Encode(pcm, data)
if err != nil {
return nil, err
return
}
if err = track.WriteSample(media.Sample{Data: data[:n], Duration: time.Millisecond * 20}); err != nil {
return
}
}

go func() {
buff := make([]byte, 1500)
rtpPkt := &rtp.Packet{}
func (c *inboundCall) createLiveKitParticipant(roomName, participantIdentity string) error {
roomCB := &lksdk.RoomCallback{
ParticipantCallback: lksdk.ParticipantCallback{
OnTrackSubscribed: func(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) {
if track.Kind() == webrtc.RTPCodecTypeVideo {
if err := publication.SetSubscribed(false); err != nil {
log.Println(err)
}
return
}

for {
n, srcAddr, err := conn.ReadFromUDP(buff)
if err != nil {
break
}
decoder, err := opus.NewDecoder(8000, 1)
if err != nil {
return
}

rtpDestination.Store(srcAddr)
input := c.media.mix.AddInput()
samples := make([]int16, 1000)
for {
rtpPkt, _, err := track.ReadRTP()
if err != nil {
break
}

if err := rtpPkt.Unmarshal(buff[:n]); err != nil {
continue
}
n, err := decoder.Decode(rtpPkt.Payload, samples)
if err != nil {
break
}

decoded := g711.DecodeUlaw(rtpPkt.Payload)
input.Push(samples[:n])
}
c.media.mix.RemoveInput(input)
},
},
}

var pcm []int16
for i := 0; i < len(decoded); i += 2 {
sample := binary.LittleEndian.Uint16(decoded[i:])
pcm = append(pcm, int16(sample))
}
room, err := lksdk.ConnectToRoom(c.s.conf.WsUrl,
lksdk.ConnectInfo{
APIKey: c.s.conf.ApiKey,
APISecret: c.s.conf.ApiSecret,
RoomName: roomName,
ParticipantIdentity: participantIdentity,
},
roomCB,
)
if err != nil {
return err
}

data := make([]byte, 1000)
n, err = enc.Encode(pcm, data)
if err != nil {
continue
}
track, err := webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
if err != nil {
return err
}

if err = track.WriteSample(media.Sample{Data: data[:n], Duration: time.Millisecond * 20}); err != nil {
continue
}
}
if _, err = room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{
Name: participantIdentity,
}); err != nil {
return err
}
c.media.track.Store(track)
c.media.room.Store(room)
return nil
}

room.Disconnect()
audioMixer.Stop()
}()
func (c *inboundCall) joinRoom(roomName, identity string) {
log.Printf("Bridging SIP call %q -> %q to room %q (as %q)\n", c.from.Address.User, c.to.Address.User, roomName, identity)
if err := c.createLiveKitParticipant(roomName, identity); err != nil {
log.Println(err)
}
}

return conn, nil
func (c *inboundCall) playPleaseEnterPin() {
// FIXME: play "Please enter room pin" audio
}
Loading

0 comments on commit 819608b

Please sign in to comment.