diff --git a/go.mod b/go.mod index f3c5878..46bb311 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/jfreymuth/oggvorbis v1.0.5 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.26.1-0.20241016113321-d16f740cf07b + github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12 diff --git a/go.sum b/go.sum index 28a3f2f..2b14ce4 100644 --- a/go.sum +++ b/go.sum @@ -118,8 +118,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.26.1-0.20241016113321-d16f740cf07b h1:kbGOwqbLMPTw8zMn8vluHoJIR+NjLDWbsMBI8GWJwzQ= -github.com/livekit/protocol v1.26.1-0.20241016113321-d16f740cf07b/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= +github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a h1:31YXXJLEwCflp7KEe9rRAwmONyCwHFujTl4MdxegTxw= +github.com/livekit/protocol v1.26.1-0.20241022031344-538889e5de0a/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o= diff --git a/pkg/sip/client.go b/pkg/sip/client.go index 0bfecd2..1a3d9e1 100644 --- a/pkg/sip/client.go +++ b/pkg/sip/client.go @@ -187,7 +187,7 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea user: req.Username, pass: req.Password, dtmf: req.Dtmf, - ringtone: req.PlayRingtone, + dialtone: req.PlayDialtone, headers: req.Headers, headersToAttrs: req.HeadersToAttributes, ringingTimeout: req.RingingTimeout.AsDuration(), diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 235a972..4a667af 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "math" "net/netip" "slices" "sync" @@ -25,6 +26,7 @@ import ( "time" "github.com/emiago/sipgo/sip" + "github.com/frostbyte73/core" "github.com/icholy/digest" "github.com/livekit/protocol/livekit" @@ -39,6 +41,7 @@ import ( "github.com/livekit/sip/pkg/media" "github.com/livekit/sip/pkg/media/dtmf" "github.com/livekit/sip/pkg/media/rtp" + "github.com/livekit/sip/pkg/media/tones" "github.com/livekit/sip/pkg/stats" "github.com/livekit/sip/res" ) @@ -274,6 +277,7 @@ type inboundCall struct { joinDur func() time.Duration forwardDTMF atomic.Bool done atomic.Bool + started core.Fuse } func (s *Server) newInboundCall( @@ -427,6 +431,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI return // already sent a response } } + + c.started.Break() + // Wait for the caller to terminate the call. select { case <-ctx.Done(): @@ -766,8 +773,34 @@ func (c *inboundCall) handleDTMF(tone dtmf.Event) { } } -func (c *inboundCall) transferCall(ctx context.Context, transferTo string) error { - err := c.cc.TransferCall(ctx, transferTo) +func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) (retErr error) { + var err error + + if dialtone && c.started.IsBroken() && !c.done.Load() { + const ringVolume = math.MaxInt16 / 2 + rctx, rcancel := context.WithCancel(ctx) + defer rcancel() + + // mute the room audio to the SIP participant + w := c.lkRoom.SwapOutput(nil) + + defer func() { + if retErr != nil && !c.done.Load() { + c.lkRoom.SwapOutput(w) + } else { + w.Close() + } + }() + + go func() { + aw := c.media.GetAudioWriter() + + tones.Play(rctx, aw, ringVolume, tones.ETSIRinging) + aw.Close() + }() + } + + err = c.cc.TransferCall(ctx, transferTo) if err != nil { c.log.Infow("inbound call failed to transfer", "error", err, "transferTo", transferTo) return err diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 7f01c65..68b8662 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -51,7 +51,7 @@ type sipOutboundConfig struct { user string pass string dtmf string - ringtone bool + dialtone bool headers map[string]string headersToAttrs map[string]string ringingTimeout time.Duration @@ -63,15 +63,15 @@ type outboundCall struct { log logger.Logger cc *sipOutbound media *MediaPort + started core.Fuse stopped core.Fuse closing core.Fuse - mu sync.RWMutex - mon *stats.CallMonitor - lkRoom *Room - lkRoomIn media.PCM16Writer // output to room; OPUS at 48k - sipConf sipOutboundConfig - sipRunning bool + mu sync.RWMutex + mon *stats.CallMonitor + lkRoom *Room + lkRoomIn media.PCM16Writer // output to room; OPUS at 48k + sipConf sipOutboundConfig } func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig) (*outboundCall, error) { @@ -223,6 +223,7 @@ func (c *outboundCall) ConnectSIP(ctx context.Context) error { return fmt.Errorf("update SIP failed: %w", err) } c.connectMedia() + c.started.Break() c.lkRoom.Subscribe() c.log.Infow("Outbound SIP call established") return nil @@ -247,7 +248,7 @@ func (c *outboundCall) connectToRoom(ctx context.Context, lkNew RoomConfig) erro if err := r.Connect(c.c.conf, lkNew); err != nil { return err } - // We have to create the track early because we might play a ringtone while SIP connects. + // We have to create the track early because we might play a dialtone while SIP connects. // Thus, we are forced to set full sample rate here instead of letting the codec adapt to the SIP source sample rate. local, err := r.NewParticipantTrack(RoomSampleRate) if err != nil { @@ -260,12 +261,12 @@ func (c *outboundCall) connectToRoom(ctx context.Context, lkNew RoomConfig) erro } func (c *outboundCall) dialSIP(ctx context.Context) error { - if c.sipConf.ringtone { + if c.sipConf.dialtone { const ringVolume = math.MaxInt16 / 2 rctx, rcancel := context.WithCancel(ctx) defer rcancel() - // Play a ringtone to the room while participant connects + // Play dialtone to the room while participant connects go func() { rctx, span := tracer.Start(rctx, "tones.Play") defer span.End() @@ -286,7 +287,6 @@ func (c *outboundCall) dialSIP(ctx context.Context) error { } c.setStatus(CallActive) - c.sipRunning = true return nil } @@ -327,7 +327,6 @@ func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan stru func (c *outboundCall) stopSIP(reason string) { c.mon.CallTerminate(reason) c.cc.Close() - c.sipRunning = false } func (c *outboundCall) setStatus(v CallStatus) { @@ -433,8 +432,34 @@ func (c *outboundCall) handleDTMF(ev dtmf.Event) { }, lksdk.WithDataPublishReliable(true)) } -func (c *outboundCall) transferCall(ctx context.Context, transferTo string) error { - err := c.cc.transferCall(ctx, transferTo) +func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) (retErr error) { + var err error + + if dialtone && c.started.IsBroken() && !c.stopped.IsBroken() { + const ringVolume = math.MaxInt16 / 2 + rctx, rcancel := context.WithCancel(ctx) + defer rcancel() + + // mute the room audio to the SIP participant + w := c.lkRoom.SwapOutput(nil) + + defer func() { + if retErr != nil && !c.stopped.IsBroken() { + c.lkRoom.SwapOutput(w) + } else { + w.Close() + } + }() + + go func() { + aw := c.media.GetAudioWriter() + + tones.Play(rctx, aw, ringVolume, tones.ETSIRinging) + aw.Close() + }() + } + + err = c.cc.transferCall(ctx, transferTo) if err != nil { c.log.Infow("outound call failed to transfer", "error", err, "transferTo", transferTo) return err diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 6982f3e..5f5d330 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -151,7 +151,7 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT ctx, cdone := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) defer cdone() - err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo) + err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo, req.PlayDialtone) transfetResult.Store(&err) close(done) @@ -177,14 +177,14 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT } } -func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string) error { +func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string, dialtone bool) error { // Look for call both in client (outbound) and server (inbound) s.cli.cmu.Lock() out := s.cli.activeCalls[LocalTag(callID)] s.cli.cmu.Unlock() if out != nil { - err := out.transferCall(ctx, transferTo) + err := out.transferCall(ctx, transferTo, dialtone) if err != nil { return err } @@ -197,7 +197,7 @@ func (s *Service) processParticipantTransfer(ctx context.Context, callID string, s.srv.cmu.Unlock() if in != nil { - err := in.transferCall(ctx, transferTo) + err := in.transferCall(ctx, transferTo, dialtone) if err != nil { return err }