Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use shared media conn code in test client. #20

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 129 additions & 116 deletions test/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package main

import (
"encoding/binary"
"flag"
"fmt"
"io"
"log/slog"
"math/rand"
"net"
"os"
Expand All @@ -29,9 +32,12 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/icholy/digest"
"github.com/livekit/sip/pkg/media/ulaw"
"github.com/pion/rtp"
"github.com/pion/sdp/v2"

"github.com/livekit/sip/pkg/media"
lkrtp "github.com/livekit/sip/pkg/media/rtp"
"github.com/livekit/sip/pkg/media/ulaw"
lksip "github.com/livekit/sip/pkg/sip"
)

var (
Expand All @@ -45,72 +51,73 @@ var (
filePathSave = flag.String("save", "save.mkv", "")
)

func startMediaListener() *net.UDPConn {
conn, err := net.ListenUDP("udp", &net.UDPAddr{
Port: 0,
IP: net.ParseIP("0.0.0.0"),
})
func NewMKVWriter(w io.WriteCloser) (*MKVWriter, error) {
ws, err := webm.NewSimpleBlockWriter(w,
[]webm.TrackEntry{{
Name: "Audio",
TrackNumber: 1,
TrackUID: 12345,
CodecID: "A_PCM/INT/LIT",
TrackType: 2,
DefaultDuration: 20000000,
Audio: &webm.Audio{
SamplingFrequency: 8000.0,
Channels: 1,
},
}},
)
if err != nil {
panic(err)
return nil, err
}
return &MKVWriter{w: w, ws: ws[0]}, nil
}

go func() {
var (
p rtp.Packet
audioTimestamp int64
)
buf := make([]byte, 1500)

w, err := os.OpenFile(*filePathSave, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
if err != nil {
panic(err)
}

ws, err := webm.NewSimpleBlockWriter(w,
[]webm.TrackEntry{
{
Name: "Audio",
TrackNumber: 1,
TrackUID: 12345,
CodecID: "A_PCM/INT/LIT",
TrackType: 2,
DefaultDuration: 20000000,
Audio: &webm.Audio{
SamplingFrequency: 8000.0,
Channels: 1,
},
},
})
if err != nil {
panic(err)
}

for {
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
return
}
type MKVWriter struct {
w io.WriteCloser
ws webm.BlockWriteCloser
audioTimestamp int64
buf []byte
}

p = rtp.Packet{}
if err := p.Unmarshal(buf[:n]); err != nil {
continue
}
func (w *MKVWriter) WriteSample(in media.PCM16Sample) error {
w.audioTimestamp += 20
if sz := 2 * len(in); cap(w.buf) < sz {
w.buf = make([]byte, sz)
} else {
w.buf = w.buf[:sz]
}
for i, v := range in {
binary.LittleEndian.PutUint16(w.buf[2*i:], uint16(v))
}
_, err := w.ws.Write(true, w.audioTimestamp, w.buf)
return err
}

audioTimestamp += 20
decoded := ulaw.DecodeUlaw(p.Payload)
out := []byte{}
for _, sample := range decoded {
out = append(out, byte(sample&0xff))
out = append(out, byte(sample>>8))
}
func (w *MKVWriter) Close() error {
var last error
if err := w.ws.Close(); err != nil {
last = err
}
if err := w.w.Close(); err != nil {
last = err
}
return last
}

if _, err := ws[0].Write(true, audioTimestamp, out); err != nil {
panic(err)
}
}
}()
func saveToFile(conn *lksip.MediaConn, path string) func() error {
f, err := os.Create(path)
if err != nil {
panic(err)
}
wr, err := NewMKVWriter(f)
if err != nil {
f.Close()
panic(err)
}

return conn
law := ulaw.Decode(wr)
conn.OnRTP(lkrtp.NewMediaStreamIn(law))
return wr.Close
}

func getLocalIP() string {
Expand All @@ -130,15 +137,18 @@ func getLocalIP() string {
}

func getResponse(tx sip.ClientTransaction) *sip.Response {
select {
case <-tx.Done():
panic("transaction failed to complete")
case res := <-tx.Responses():
if res.StatusCode == 100 || res.StatusCode == 180 || res.StatusCode == 183 {
return getResponse(tx)
for {
select {
case <-tx.Done():
panic("transaction failed to complete")
case res := <-tx.Responses():
switch res.StatusCode {
default:
return res
case 100, 180, 183:
slog.With("status", res.StatusCode).Info(res.Reason)
}
}

return res
}
}

Expand All @@ -162,23 +172,23 @@ func createOffer(port int) ([]byte, error) {
Address: &sdp.Address{Address: getLocalIP()},
},
TimeDescriptions: []sdp.TimeDescription{
sdp.TimeDescription{
{
Timing: sdp.Timing{
StartTime: 0,
StopTime: 0,
},
},
},
MediaDescriptions: []*sdp.MediaDescription{
&sdp.MediaDescription{
{
MediaName: sdp.MediaName{
Media: "audio",
Port: sdp.RangedPort{Value: port},
Protos: []string{"RTP", "AVP"},
Formats: []string{"0"},
},
Attributes: []sdp.Attribute{
sdp.Attribute{Key: "rtpmap", Value: "0 PCMU/8000"},
{Key: "rtpmap", Value: "0 PCMU/8000"},
},
},
},
Expand All @@ -196,9 +206,7 @@ func parseAnswer(in []byte) (string, int) {
return offer.ConnectionInformation.Address.Address, offer.MediaDescriptions[0].MediaName.Port.Value
}

func sendAudioPackets(conn *net.UDPConn, body []byte) {
ip, port := parseAnswer(body)

func sendAudioPackets(conn *lksip.MediaConn) {
r, err := os.Open(*filePathPlay)
if err != nil {
panic(err)
Expand All @@ -213,68 +221,49 @@ func sendAudioPackets(conn *net.UDPConn, body []byte) {
panic(err)
}

audioFrames := [][]byte{}
var audioFrames [][]byte
for _, cluster := range ret.Segment.Cluster {
for _, block := range cluster.SimpleBlock {
audioFrames = append(audioFrames, block.Data...)
}
}

i := 0
rtpPkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
SSRC: 5000,
},
}
s := lkrtp.NewMediaStreamOut[ulaw.Sample](conn, 160)

dstAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", ip, port))
if err != nil {
panic(err)
}

for range time.NewTicker(20 * time.Millisecond).C {
tick := time.NewTicker(20 * time.Millisecond)
defer tick.Stop()
for range tick.C {
if i >= len(audioFrames) {
break
}

rtpPkt.Payload = audioFrames[i]

raw, err := rtpPkt.Marshal()
if err != nil {
panic(err)
}

if _, err = conn.WriteTo(raw, dstAddr); err != nil {
if err = s.WriteSample(audioFrames[i]); err != nil {
return
}

rtpPkt.Header.Timestamp += 160
rtpPkt.Header.SequenceNumber += 1
i++
}
}

func attemptInvite(sipClient *sipgo.Client, offer []byte, authorizationHeaderValue string) (*sip.Request, *sip.Response) {
inviteRecipent := &sip.Uri{User: *to, Host: *sipUri}
inviteRequest := sip.NewRequest(sip.INVITE, inviteRecipent)
inviteRequest.SetDestination(*sipServer)
inviteRequest.SetBody(offer)
inviteRequest.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
inviteRequest.AppendHeader(sip.NewHeader("Contact", fmt.Sprintf("<sip:livekit@%s:5060>", getLocalIP())))
inviteRequest.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE"))
req := sip.NewRequest(sip.INVITE, inviteRecipent)
req.SetDestination(*sipServer)
req.SetBody(offer)
req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
req.AppendHeader(sip.NewHeader("Contact", fmt.Sprintf("<sip:livekit@%s:5060>", getLocalIP())))
req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE"))

if authorizationHeaderValue != "" {
inviteRequest.AppendHeader(sip.NewHeader("Proxy-Authorization", authorizationHeaderValue))
req.AppendHeader(sip.NewHeader("Proxy-Authorization", authorizationHeaderValue))
}

tx, err := sipClient.TransactionRequest(inviteRequest)
tx, err := sipClient.TransactionRequest(req)
if err != nil {
panic(err)
}
defer tx.Terminate()

return inviteRequest, getResponse(tx)
return req, getResponse(tx)
}

func main() {
Expand All @@ -284,15 +273,27 @@ func main() {
*sipServer = getLocalIP() + ":5060"
}

mediaConn := startMediaListener()
offer, err := createOffer(mediaConn.LocalAddr().(*net.UDPAddr).Port)
mediaConn := lksip.NewMediaConn()
if *filePathSave != "" {
saveDone := saveToFile(mediaConn, *filePathSave)
defer func() {
if err := saveDone(); err != nil {
slog.Error("cannot save the file", err)
}
}()
}
if err := mediaConn.Start(0, 0, ""); err != nil {
panic(err)
}
defer mediaConn.Close()
slog.With("port", mediaConn.LocalAddr().Port).Info("media listener started")

offer, err := createOffer(mediaConn.LocalAddr().Port)
if err != nil {
panic(err)
}

ua, err := sipgo.NewUA(
sipgo.WithUserAgent(*from),
)
ua, err := sipgo.NewUA()
if err != nil {
panic(err)
}
Expand All @@ -308,7 +309,11 @@ func main() {
inviteRequest *sip.Request
)

for {
slog.Info("sending invite")
for try := 1; ; try++ {
if try > 1 {
slog.With("attempt", try).Info("invite attempt")
}
inviteRequest, inviteResponse = attemptInvite(sipClient, offer, authorizationHeaderValue)

if inviteResponse.StatusCode == 407 {
Expand Down Expand Up @@ -343,6 +348,7 @@ func main() {

break
}
slog.Info("invite success")

if contactHeader, ok := inviteResponse.Contact(); ok {
inviteRequest.Recipient = &contactHeader.Address
Expand Down Expand Up @@ -379,7 +385,14 @@ func main() {
byeSent.Store(true)
}()

sendAudioPackets(mediaConn, inviteResponse.Body())
ip, port := parseAnswer(inviteResponse.Body())
dstAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", ip, port))
if err != nil {
panic(err)
}
mediaConn.SetDestAddr(dstAddr)

sendAudioPackets(mediaConn)
if !byeSent.Load() {
sendBye()
}
Expand Down
Loading