From fc4f1fed371588c18a98ba35993aed15804713e8 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 18 Oct 2024 13:08:03 -0700 Subject: [PATCH 1/7] WiP --- go.mod | 2 ++ pkg/sip/outbound.go | 27 +++++++++++++++++++++++++-- pkg/sip/service.go | 6 +++--- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index f3c5878..1f1f1f5 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,8 @@ go 1.22.7 toolchain go1.22.8 +replace github.com/livekit/protocol => ../protocol + require ( github.com/at-wat/ebml-go v0.17.1 github.com/emiago/sipgo v0.13.1 diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 7f01c65..184d534 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -63,6 +63,7 @@ type outboundCall struct { log logger.Logger cc *sipOutbound media *MediaPort + started core.Fuse stopped core.Fuse closing core.Fuse @@ -223,6 +224,7 @@ func (c *outboundCall) ConnectSIP(ctx context.Context) error { return fmt.Errorf("update SIP failed: %w", err) } c.connectMedia() + c.started.Break() c.lkRoom.Subscribe() c.log.Infow("Outbound SIP call established") return nil @@ -433,8 +435,29 @@ func (c *outboundCall) handleDTMF(ev dtmf.Event) { }, lksdk.WithDataPublishReliable(true)) } -func (c *outboundCall) transferCall(ctx context.Context, transferTo string) error { - err := c.cc.transferCall(ctx, transferTo) +func (c *outboundCall) transferCall(ctx context.Context, transferTo string, ringtone bool) error { + var err error + + if ringtone && c.started.IsBroken() && !c.stopped.IsBroken() { + const ringVolume = math.MaxInt16 / 2 + rctx, rcancel := context.WithCancel(ctx) + defer rcancel() + + // mute the room audio to the SIP participant + w := c.lkRoom.SwapOutput(nil) + + defer func() { + if err != nil && !c.stopped.IsBroken() { + c.lkRoom.SwapOutput(w) + } + }() + + go func() { + tones.Play(rctx, c.media.GetAudioWriter(), ringVolume, tones.ETSIRinging) + }() + } + + err = c.cc.transferCall(ctx, transferTo) if err != nil { c.log.Infow("outound call failed to transfer", "error", err, "transferTo", transferTo) return err diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 6982f3e..9fd963d 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -151,7 +151,7 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT ctx, cdone := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) defer cdone() - err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo) + err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo, req.PlayRingtone) transfetResult.Store(&err) close(done) @@ -177,14 +177,14 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT } } -func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string) error { +func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string, ringtone bool) error { // Look for call both in client (outbound) and server (inbound) s.cli.cmu.Lock() out := s.cli.activeCalls[LocalTag(callID)] s.cli.cmu.Unlock() if out != nil { - err := out.transferCall(ctx, transferTo) + err := out.transferCall(ctx, transferTo, ringtone) if err != nil { return err } From 4c2195feceb7d12f3b5935a493c7bd201e175f8b Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Sun, 20 Oct 2024 20:24:08 -0700 Subject: [PATCH 2/7] inbound --- pkg/sip/inbound.go | 32 ++++++++++++++++++++++++++++++-- pkg/sip/outbound.go | 13 +++++-------- pkg/sip/service.go | 2 +- 3 files changed, 36 insertions(+), 11 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 235a972..c1fd0e1 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "math" "net/netip" "slices" "sync" @@ -25,6 +26,7 @@ import ( "time" "github.com/emiago/sipgo/sip" + "github.com/frostbyte73/core" "github.com/icholy/digest" "github.com/livekit/protocol/livekit" @@ -39,6 +41,7 @@ import ( "github.com/livekit/sip/pkg/media" "github.com/livekit/sip/pkg/media/dtmf" "github.com/livekit/sip/pkg/media/rtp" + "github.com/livekit/sip/pkg/media/tones" "github.com/livekit/sip/pkg/stats" "github.com/livekit/sip/res" ) @@ -274,6 +277,7 @@ type inboundCall struct { joinDur func() time.Duration forwardDTMF atomic.Bool done atomic.Bool + started core.Fuse } func (s *Server) newInboundCall( @@ -427,6 +431,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI return // already sent a response } } + + c.started.Break() + // Wait for the caller to terminate the call. select { case <-ctx.Done(): @@ -766,8 +773,29 @@ func (c *inboundCall) handleDTMF(tone dtmf.Event) { } } -func (c *inboundCall) transferCall(ctx context.Context, transferTo string) error { - err := c.cc.TransferCall(ctx, transferTo) +func (c *inboundCall) transferCall(ctx context.Context, transferTo string, ringtone bool) error { + var err error + + if ringtone && c.started.IsBroken() && !c.done.Load() { + const ringVolume = math.MaxInt16 / 2 + rctx, rcancel := context.WithCancel(ctx) + defer rcancel() + + // mute the room audio to the SIP participant + w := c.lkRoom.SwapOutput(nil) + + defer func() { + if err != nil && !c.done.Load() { + c.lkRoom.SwapOutput(w) + } + }() + + go func() { + tones.Play(rctx, c.media.GetAudioWriter(), ringVolume, tones.ETSIRinging) + }() + } + + err = c.cc.TransferCall(ctx, transferTo) if err != nil { c.log.Infow("inbound call failed to transfer", "error", err, "transferTo", transferTo) return err diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 184d534..519c630 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -67,12 +67,11 @@ type outboundCall struct { stopped core.Fuse closing core.Fuse - mu sync.RWMutex - mon *stats.CallMonitor - lkRoom *Room - lkRoomIn media.PCM16Writer // output to room; OPUS at 48k - sipConf sipOutboundConfig - sipRunning bool + mu sync.RWMutex + mon *stats.CallMonitor + lkRoom *Room + lkRoomIn media.PCM16Writer // output to room; OPUS at 48k + sipConf sipOutboundConfig } func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig) (*outboundCall, error) { @@ -288,7 +287,6 @@ func (c *outboundCall) dialSIP(ctx context.Context) error { } c.setStatus(CallActive) - c.sipRunning = true return nil } @@ -329,7 +327,6 @@ func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan stru func (c *outboundCall) stopSIP(reason string) { c.mon.CallTerminate(reason) c.cc.Close() - c.sipRunning = false } func (c *outboundCall) setStatus(v CallStatus) { diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 9fd963d..2717016 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -197,7 +197,7 @@ func (s *Service) processParticipantTransfer(ctx context.Context, callID string, s.srv.cmu.Unlock() if in != nil { - err := in.transferCall(ctx, transferTo) + err := in.transferCall(ctx, transferTo, ringtone) if err != nil { return err } From 20d6d4667641c60bb5e45fab859c310204dbb06a Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 21 Oct 2024 17:01:51 -0700 Subject: [PATCH 3/7] ringtone -> dialtone --- pkg/sip/client.go | 2 +- pkg/sip/inbound.go | 4 ++-- pkg/sip/outbound.go | 12 ++++++------ pkg/sip/service.go | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/sip/client.go b/pkg/sip/client.go index 0bfecd2..1a3d9e1 100644 --- a/pkg/sip/client.go +++ b/pkg/sip/client.go @@ -187,7 +187,7 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea user: req.Username, pass: req.Password, dtmf: req.Dtmf, - ringtone: req.PlayRingtone, + dialtone: req.PlayDialtone, headers: req.Headers, headersToAttrs: req.HeadersToAttributes, ringingTimeout: req.RingingTimeout.AsDuration(), diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index c1fd0e1..73e1941 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -773,10 +773,10 @@ func (c *inboundCall) handleDTMF(tone dtmf.Event) { } } -func (c *inboundCall) transferCall(ctx context.Context, transferTo string, ringtone bool) error { +func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) error { var err error - if ringtone && c.started.IsBroken() && !c.done.Load() { + if dialtone && c.started.IsBroken() && !c.done.Load() { const ringVolume = math.MaxInt16 / 2 rctx, rcancel := context.WithCancel(ctx) defer rcancel() diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 519c630..a53518f 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -51,7 +51,7 @@ type sipOutboundConfig struct { user string pass string dtmf string - ringtone bool + dialtone bool headers map[string]string headersToAttrs map[string]string ringingTimeout time.Duration @@ -248,7 +248,7 @@ func (c *outboundCall) connectToRoom(ctx context.Context, lkNew RoomConfig) erro if err := r.Connect(c.c.conf, lkNew); err != nil { return err } - // We have to create the track early because we might play a ringtone while SIP connects. + // We have to create the track early because we might play a dialtone while SIP connects. // Thus, we are forced to set full sample rate here instead of letting the codec adapt to the SIP source sample rate. local, err := r.NewParticipantTrack(RoomSampleRate) if err != nil { @@ -261,12 +261,12 @@ func (c *outboundCall) connectToRoom(ctx context.Context, lkNew RoomConfig) erro } func (c *outboundCall) dialSIP(ctx context.Context) error { - if c.sipConf.ringtone { + if c.sipConf.dialtone { const ringVolume = math.MaxInt16 / 2 rctx, rcancel := context.WithCancel(ctx) defer rcancel() - // Play a ringtone to the room while participant connects + // Play dialtone to the room while participant connects go func() { rctx, span := tracer.Start(rctx, "tones.Play") defer span.End() @@ -432,10 +432,10 @@ func (c *outboundCall) handleDTMF(ev dtmf.Event) { }, lksdk.WithDataPublishReliable(true)) } -func (c *outboundCall) transferCall(ctx context.Context, transferTo string, ringtone bool) error { +func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) error { var err error - if ringtone && c.started.IsBroken() && !c.stopped.IsBroken() { + if dialtone && c.started.IsBroken() && !c.stopped.IsBroken() { const ringVolume = math.MaxInt16 / 2 rctx, rcancel := context.WithCancel(ctx) defer rcancel() diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 2717016..5f5d330 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -151,7 +151,7 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT ctx, cdone := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) defer cdone() - err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo, req.PlayRingtone) + err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo, req.PlayDialtone) transfetResult.Store(&err) close(done) @@ -177,14 +177,14 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT } } -func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string, ringtone bool) error { +func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string, dialtone bool) error { // Look for call both in client (outbound) and server (inbound) s.cli.cmu.Lock() out := s.cli.activeCalls[LocalTag(callID)] s.cli.cmu.Unlock() if out != nil { - err := out.transferCall(ctx, transferTo, ringtone) + err := out.transferCall(ctx, transferTo, dialtone) if err != nil { return err } @@ -197,7 +197,7 @@ func (s *Service) processParticipantTransfer(ctx context.Context, callID string, s.srv.cmu.Unlock() if in != nil { - err := in.transferCall(ctx, transferTo, ringtone) + err := in.transferCall(ctx, transferTo, dialtone) if err != nil { return err } From db24672d3d6c402c9c86f6c04a3266414e563d4f Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 21 Oct 2024 20:44:00 -0700 Subject: [PATCH 4/7] Update protocol --- go.mod | 4 +--- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 1f1f1f5..46bb311 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,6 @@ go 1.22.7 toolchain go1.22.8 -replace github.com/livekit/protocol => ../protocol - require ( github.com/at-wat/ebml-go v0.17.1 github.com/emiago/sipgo v0.13.1 @@ -15,7 +13,7 @@ require ( github.com/jfreymuth/oggvorbis v1.0.5 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.26.1-0.20241016113321-d16f740cf07b + github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12 diff --git a/go.sum b/go.sum index 28a3f2f..2b14ce4 100644 --- a/go.sum +++ b/go.sum @@ -118,8 +118,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.26.1-0.20241016113321-d16f740cf07b h1:kbGOwqbLMPTw8zMn8vluHoJIR+NjLDWbsMBI8GWJwzQ= -github.com/livekit/protocol v1.26.1-0.20241016113321-d16f740cf07b/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= +github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a h1:31YXXJLEwCflp7KEe9rRAwmONyCwHFujTl4MdxegTxw= +github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o= From af37c40762ab79b4764a0fc9568cb7691ff0c660 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Mon, 21 Oct 2024 20:54:25 -0700 Subject: [PATCH 5/7] Close the tones AudioWriter when done --- pkg/sip/inbound.go | 5 ++++- pkg/sip/outbound.go | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 73e1941..7b6e04a 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -791,7 +791,10 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialt }() go func() { - tones.Play(rctx, c.media.GetAudioWriter(), ringVolume, tones.ETSIRinging) + aw := c.media.GetAudioWriter() + + tones.Play(rctx, aw, ringVolume, tones.ETSIRinging) + aw.Close() }() } diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index a53518f..6d0f811 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -450,7 +450,10 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dial }() go func() { - tones.Play(rctx, c.media.GetAudioWriter(), ringVolume, tones.ETSIRinging) + aw := c.media.GetAudioWriter() + + tones.Play(rctx, aw, ringVolume, tones.ETSIRinging) + aw.Close() }() } From a657692c66b439cfb92f109464611544b872a953 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Tue, 22 Oct 2024 15:44:53 -0700 Subject: [PATCH 6/7] Close output in case of success --- pkg/sip/inbound.go | 2 ++ pkg/sip/outbound.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 7b6e04a..cf9b5ca 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -787,6 +787,8 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialt defer func() { if err != nil && !c.done.Load() { c.lkRoom.SwapOutput(w) + } else { + w.Close() } }() diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 6d0f811..83c7e5a 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -446,6 +446,8 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dial defer func() { if err != nil && !c.stopped.IsBroken() { c.lkRoom.SwapOutput(w) + } else { + w.Close() } }() From 47b4452bf6b862f071e552550d6a525736af79d5 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Tue, 22 Oct 2024 16:07:47 -0700 Subject: [PATCH 7/7] cleanup --- pkg/sip/inbound.go | 4 ++-- pkg/sip/outbound.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index cf9b5ca..4a667af 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -773,7 +773,7 @@ func (c *inboundCall) handleDTMF(tone dtmf.Event) { } } -func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) error { +func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) (retErr error) { var err error if dialtone && c.started.IsBroken() && !c.done.Load() { @@ -785,7 +785,7 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialt w := c.lkRoom.SwapOutput(nil) defer func() { - if err != nil && !c.done.Load() { + if retErr != nil && !c.done.Load() { c.lkRoom.SwapOutput(w) } else { w.Close() diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 83c7e5a..68b8662 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -432,7 +432,7 @@ func (c *outboundCall) handleDTMF(ev dtmf.Event) { }, lksdk.WithDataPublishReliable(true)) } -func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) error { +func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) (retErr error) { var err error if dialtone && c.started.IsBroken() && !c.stopped.IsBroken() { @@ -444,7 +444,7 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dial w := c.lkRoom.SwapOutput(nil) defer func() { - if err != nil && !c.stopped.IsBroken() { + if retErr != nil && !c.stopped.IsBroken() { c.lkRoom.SwapOutput(w) } else { w.Close()