From 7a6e58368595c5f9c1d00512d4b10a67cb7db051 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Fri, 8 Dec 2023 19:19:14 +0200 Subject: [PATCH] Use shared media conn code in test client. --- test/client/main.go | 245 +++++++++++++++++++++++--------------------- 1 file changed, 129 insertions(+), 116 deletions(-) diff --git a/test/client/main.go b/test/client/main.go index c58af721..497da590 100644 --- a/test/client/main.go +++ b/test/client/main.go @@ -15,8 +15,11 @@ package main import ( + "encoding/binary" "flag" "fmt" + "io" + "log/slog" "math/rand" "net" "os" @@ -29,9 +32,12 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" "github.com/icholy/digest" - "github.com/livekit/sip/pkg/media/ulaw" - "github.com/pion/rtp" "github.com/pion/sdp/v2" + + "github.com/livekit/sip/pkg/media" + lkrtp "github.com/livekit/sip/pkg/media/rtp" + "github.com/livekit/sip/pkg/media/ulaw" + lksip "github.com/livekit/sip/pkg/sip" ) var ( @@ -45,72 +51,73 @@ var ( filePathSave = flag.String("save", "save.mkv", "") ) -func startMediaListener() *net.UDPConn { - conn, err := net.ListenUDP("udp", &net.UDPAddr{ - Port: 0, - IP: net.ParseIP("0.0.0.0"), - }) +func NewMKVWriter(w io.WriteCloser) (*MKVWriter, error) { + ws, err := webm.NewSimpleBlockWriter(w, + []webm.TrackEntry{{ + Name: "Audio", + TrackNumber: 1, + TrackUID: 12345, + CodecID: "A_PCM/INT/LIT", + TrackType: 2, + DefaultDuration: 20000000, + Audio: &webm.Audio{ + SamplingFrequency: 8000.0, + Channels: 1, + }, + }}, + ) if err != nil { - panic(err) + return nil, err } + return &MKVWriter{w: w, ws: ws[0]}, nil +} - go func() { - var ( - p rtp.Packet - audioTimestamp int64 - ) - buf := make([]byte, 1500) - - w, err := os.OpenFile(*filePathSave, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600) - if err != nil { - panic(err) - } - - ws, err := webm.NewSimpleBlockWriter(w, - []webm.TrackEntry{ - { - Name: "Audio", - TrackNumber: 1, - TrackUID: 12345, - CodecID: "A_PCM/INT/LIT", - TrackType: 2, - DefaultDuration: 20000000, - Audio: &webm.Audio{ - SamplingFrequency: 8000.0, - Channels: 1, - }, - }, - }) - if err != nil { - panic(err) - } - - for { - n, _, err := conn.ReadFromUDP(buf) - if err != nil { - return - } +type MKVWriter struct { + w io.WriteCloser + ws webm.BlockWriteCloser + audioTimestamp int64 + buf []byte +} - p = rtp.Packet{} - if err := p.Unmarshal(buf[:n]); err != nil { - continue - } +func (w *MKVWriter) WriteSample(in media.PCM16Sample) error { + w.audioTimestamp += 20 + if sz := 2 * len(in); cap(w.buf) < sz { + w.buf = make([]byte, sz) + } else { + w.buf = w.buf[:sz] + } + for i, v := range in { + binary.LittleEndian.PutUint16(w.buf[2*i:], uint16(v)) + } + _, err := w.ws.Write(true, w.audioTimestamp, w.buf) + return err +} - audioTimestamp += 20 - decoded := ulaw.DecodeUlaw(p.Payload) - out := []byte{} - for _, sample := range decoded { - out = append(out, byte(sample&0xff)) - out = append(out, byte(sample>>8)) - } +func (w *MKVWriter) Close() error { + var last error + if err := w.ws.Close(); err != nil { + last = err + } + if err := w.w.Close(); err != nil { + last = err + } + return last +} - if _, err := ws[0].Write(true, audioTimestamp, out); err != nil { - panic(err) - } - } - }() +func saveToFile(conn *lksip.MediaConn, path string) func() error { + f, err := os.Create(path) + if err != nil { + panic(err) + } + wr, err := NewMKVWriter(f) + if err != nil { + f.Close() + panic(err) + } - return conn + law := ulaw.Decode(wr) + conn.OnRTP(lkrtp.NewMediaStreamIn(law)) + return wr.Close } func getLocalIP() string { @@ -130,15 +137,18 @@ func getLocalIP() string { } func getResponse(tx sip.ClientTransaction) *sip.Response { - select { - case <-tx.Done(): - panic("transaction failed to complete") - case res := <-tx.Responses(): - if res.StatusCode == 100 || res.StatusCode == 180 || res.StatusCode == 183 { - return getResponse(tx) + for { + select { + case <-tx.Done(): + panic("transaction failed to complete") + case res := <-tx.Responses(): + switch res.StatusCode { + default: + return res + case 100, 180, 183: + slog.With("status", res.StatusCode).Info(res.Reason) + } } - - return res } } @@ -162,7 +172,7 @@ func createOffer(port int) ([]byte, error) { Address: &sdp.Address{Address: getLocalIP()}, }, TimeDescriptions: []sdp.TimeDescription{ - sdp.TimeDescription{ + { Timing: sdp.Timing{ StartTime: 0, StopTime: 0, @@ -170,7 +180,7 @@ func createOffer(port int) ([]byte, error) { }, }, MediaDescriptions: []*sdp.MediaDescription{ - &sdp.MediaDescription{ + { MediaName: sdp.MediaName{ Media: "audio", Port: sdp.RangedPort{Value: port}, @@ -178,7 +188,7 @@ func createOffer(port int) ([]byte, error) { Formats: []string{"0"}, }, Attributes: []sdp.Attribute{ - sdp.Attribute{Key: "rtpmap", Value: "0 PCMU/8000"}, + {Key: "rtpmap", Value: "0 PCMU/8000"}, }, }, }, @@ -196,9 +206,7 @@ func parseAnswer(in []byte) (string, int) { return offer.ConnectionInformation.Address.Address, offer.MediaDescriptions[0].MediaName.Port.Value } -func sendAudioPackets(conn *net.UDPConn, body []byte) { - ip, port := parseAnswer(body) - +func sendAudioPackets(conn *lksip.MediaConn) { r, err := os.Open(*filePathPlay) if err != nil { panic(err) @@ -213,7 +221,7 @@ func sendAudioPackets(conn *net.UDPConn, body []byte) { panic(err) } - audioFrames := [][]byte{} + var audioFrames [][]byte for _, cluster := range ret.Segment.Cluster { for _, block := range cluster.SimpleBlock { audioFrames = append(audioFrames, block.Data...) @@ -221,60 +229,41 @@ func sendAudioPackets(conn *net.UDPConn, body []byte) { } i := 0 - rtpPkt := &rtp.Packet{ - Header: rtp.Header{ - Version: 2, - SSRC: 5000, - }, - } + s := lkrtp.NewMediaStreamOut[ulaw.Sample](conn, 160) - dstAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", ip, port)) - if err != nil { - panic(err) - } - - for range time.NewTicker(20 * time.Millisecond).C { + tick := time.NewTicker(20 * time.Millisecond) + defer tick.Stop() + for range tick.C { if i >= len(audioFrames) { break } - - rtpPkt.Payload = audioFrames[i] - - raw, err := rtpPkt.Marshal() - if err != nil { - panic(err) - } - - if _, err = conn.WriteTo(raw, dstAddr); err != nil { + if err = s.WriteSample(audioFrames[i]); err != nil { return } - - rtpPkt.Header.Timestamp += 160 - rtpPkt.Header.SequenceNumber += 1 i++ } } func attemptInvite(sipClient *sipgo.Client, offer []byte, authorizationHeaderValue string) (*sip.Request, *sip.Response) { inviteRecipent := &sip.Uri{User: *to, Host: *sipUri} - inviteRequest := sip.NewRequest(sip.INVITE, inviteRecipent) - inviteRequest.SetDestination(*sipServer) - inviteRequest.SetBody(offer) - inviteRequest.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) - inviteRequest.AppendHeader(sip.NewHeader("Contact", fmt.Sprintf("", getLocalIP()))) - inviteRequest.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE")) + req := sip.NewRequest(sip.INVITE, inviteRecipent) + req.SetDestination(*sipServer) + req.SetBody(offer) + req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) + req.AppendHeader(sip.NewHeader("Contact", fmt.Sprintf("", getLocalIP()))) + req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE")) if authorizationHeaderValue != "" { - inviteRequest.AppendHeader(sip.NewHeader("Proxy-Authorization", authorizationHeaderValue)) + req.AppendHeader(sip.NewHeader("Proxy-Authorization", authorizationHeaderValue)) } - tx, err := sipClient.TransactionRequest(inviteRequest) + tx, err := sipClient.TransactionRequest(req) if err != nil { panic(err) } defer tx.Terminate() - return inviteRequest, getResponse(tx) + return req, getResponse(tx) } func main() { @@ -284,15 +273,27 @@ func main() { *sipServer = getLocalIP() + ":5060" } - mediaConn := startMediaListener() - offer, err := createOffer(mediaConn.LocalAddr().(*net.UDPAddr).Port) + mediaConn := lksip.NewMediaConn() + if *filePathSave != "" { + saveDone := saveToFile(mediaConn, *filePathSave) + defer func() { + if err := saveDone(); err != nil { + slog.Error("cannot save the file", err) + } + }() + } + if err := mediaConn.Start(0, 0, ""); err != nil { + panic(err) + } + defer mediaConn.Close() + slog.With("port", mediaConn.LocalAddr().Port).Info("media listener started") + + offer, err := createOffer(mediaConn.LocalAddr().Port) if err != nil { panic(err) } - ua, err := sipgo.NewUA( - sipgo.WithUserAgent(*from), - ) + ua, err := sipgo.NewUA() if err != nil { panic(err) } @@ -308,7 +309,11 @@ func main() { inviteRequest *sip.Request ) - for { + slog.Info("sending invite") + for try := 1; ; try++ { + if try > 1 { + slog.With("attempt", try).Info("invite attempt") + } inviteRequest, inviteResponse = attemptInvite(sipClient, offer, authorizationHeaderValue) if inviteResponse.StatusCode == 407 { @@ -343,6 +348,7 @@ func main() { break } + slog.Info("invite success") if contactHeader, ok := inviteResponse.Contact(); ok { inviteRequest.Recipient = &contactHeader.Address @@ -379,7 +385,14 @@ func main() { byeSent.Store(true) }() - sendAudioPackets(mediaConn, inviteResponse.Body()) + ip, port := parseAnswer(inviteResponse.Body()) + dstAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("%s:%d", ip, port)) + if err != nil { + panic(err) + } + mediaConn.SetDestAddr(dstAddr) + + sendAudioPackets(mediaConn) if !byeSent.Load() { sendBye() }