diff --git a/go.mod b/go.mod index bf268e7..2798fbc 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/jfreymuth/oggvorbis v1.0.5 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 - github.com/livekit/protocol v1.23.1-0.20241003220239-75af842a1264 + github.com/livekit/protocol v1.24.1-0.20241010185750-19b686d31289 github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241003085414-b42e5a1da639 github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12 diff --git a/go.sum b/go.sum index fcada2e..adb3a9c 100644 --- a/go.sum +++ b/go.sum @@ -120,8 +120,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ= github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE= -github.com/livekit/protocol v1.23.1-0.20241003220239-75af842a1264 h1:jj0lLMRFhk1Y7X1Ugi8wd47wNtgIoju36qic6mSjGPE= -github.com/livekit/protocol v1.23.1-0.20241003220239-75af842a1264/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= +github.com/livekit/protocol v1.24.1-0.20241010185750-19b686d31289 h1:Uj1kuYVzE2F7MhjpGhfefLuXdXTWKh/DMGv0vtFQY7k= +github.com/livekit/protocol v1.24.1-0.20241010185750-19b686d31289/go.mod h1:nxRzmQBKSYK64gqr7ABWwt78hvrgiO2wYuCojRYb7Gs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs= github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241003085414-b42e5a1da639 h1:5+iT4OaIukZ4TJwbOXAN+uh/wuAGArUoJyk5vmfQMY0= diff --git a/pkg/service/psrpc.go b/pkg/service/psrpc.go index 4ae8708..4a3cdbe 100644 --- a/pkg/service/psrpc.go +++ b/pkg/service/psrpc.go @@ -102,6 +102,8 @@ func DispatchCall(ctx context.Context, psrpcClient rpc.IOInfoClient, log logger. Headers: resp.Headers, HeadersToAttributes: resp.HeadersToAttributes, EnabledFeatures: resp.EnabledFeatures, + RingingTimeout: resp.RingingTimeout.AsDuration(), + MaxCallDuration: resp.MaxCallDuration.AsDuration(), } case rpc.SIPDispatchResult_ACCEPT: return sip.CallDispatch{ @@ -123,6 +125,8 @@ func DispatchCall(ctx context.Context, psrpcClient rpc.IOInfoClient, log logger. Headers: resp.Headers, HeadersToAttributes: resp.HeadersToAttributes, EnabledFeatures: resp.EnabledFeatures, + RingingTimeout: resp.RingingTimeout.AsDuration(), + MaxCallDuration: resp.MaxCallDuration.AsDuration(), } case rpc.SIPDispatchResult_REQUEST_PIN: return sip.CallDispatch{ diff --git a/pkg/sip/client.go b/pkg/sip/client.go index bad9bc6..0bfecd2 100644 --- a/pkg/sip/client.go +++ b/pkg/sip/client.go @@ -179,17 +179,19 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea }, } sipConf := sipOutboundConfig{ - address: req.Address, - transport: req.Transport, - host: req.Hostname, - from: req.Number, - to: req.CallTo, - user: req.Username, - pass: req.Password, - dtmf: req.Dtmf, - ringtone: req.PlayRingtone, - headers: req.Headers, - headersToAttrs: req.HeadersToAttributes, + address: req.Address, + transport: req.Transport, + host: req.Hostname, + from: req.Number, + to: req.CallTo, + user: req.Username, + pass: req.Password, + dtmf: req.Dtmf, + ringtone: req.PlayRingtone, + headers: req.Headers, + headersToAttrs: req.HeadersToAttributes, + ringingTimeout: req.RingingTimeout.AsDuration(), + maxCallDuration: req.MaxCallDuration.AsDuration(), } log.Infow("Creating SIP participant") call, err := c.newCall(ctx, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index ab729fd..77ba9cf 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -47,10 +47,6 @@ const ( // audioBridgeMaxDelay delays sending audio for certain time, unless RTP packet is received. // This is done because of audio cutoff at the beginning of calls observed in the wild. audioBridgeMaxDelay = 1 * time.Second - - // callSubscribeTimeout is a maximal duration which SIP participant will wait for other participant tracks. - // If no participant tracks are published by this time, the call will disconnect. - callSubscribeTimeout = 3 * time.Minute ) func (s *Server) handleInviteAuth(log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) { @@ -397,6 +393,14 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI } } } + if disp.MaxCallDuration <= 0 || disp.MaxCallDuration > maxCallDuration { + disp.MaxCallDuration = maxCallDuration + } + if disp.RingingTimeout <= 0 { + disp.RingingTimeout = defaultRingingTimeout + } + ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration) + defer cancel() if !c.joinRoom(ctx, disp.Room) { return // already sent a response } @@ -411,7 +415,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI c.log.Infow("Waiting for track subscription(s)") // For dispatches without pin, we first wait for LK participant to become available, // and also for at least one track subscription. In the meantime we keep ringing. - if !c.waitSubscribe(ctx) { + if !c.waitSubscribe(ctx, disp.RingingTimeout) { return // already sent a response } if !acceptCall() { @@ -502,11 +506,11 @@ func (c *inboundCall) waitMedia(ctx context.Context) bool { return true } -func (c *inboundCall) waitSubscribe(ctx context.Context) bool { +func (c *inboundCall) waitSubscribe(ctx context.Context, timeout time.Duration) bool { ctx, span := tracer.Start(ctx, "inboundCall.waitSubscribe") defer span.End() - timeout := time.NewTimer(callSubscribeTimeout) - defer timeout.Stop() + timer := time.NewTimer(timeout) + defer timer.Stop() select { case <-c.cc.Cancelled(): c.closeWithCancelled() @@ -520,7 +524,7 @@ func (c *inboundCall) waitSubscribe(ctx context.Context) bool { case <-c.media.Timeout(): c.closeWithTimeout() return false - case <-timeout.C: + case <-timer.C: c.close(false, callDropped, "cannot-subscribe") return false case <-c.lkRoom.Subscribed(): @@ -665,7 +669,7 @@ func (c *inboundCall) setStatus(v CallStatus) { } r.LocalParticipant.SetAttributes(map[string]string{ - AttrSIPCallStatus: attr, + livekit.AttrSIPCallStatus: attr, }) } @@ -679,7 +683,7 @@ func (c *inboundCall) createLiveKitParticipant(ctx context.Context, rconf RoomCo for k, v := range c.extraAttrs { partConf.Attributes[k] = v } - partConf.Attributes[AttrSIPCallStatus] = CallActive.Attribute() + partConf.Attributes[livekit.AttrSIPCallStatus] = CallActive.Attribute() c.forwardDTMF.Store(true) select { case <-ctx.Done(): diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index a90ace2..a63de24 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -43,17 +43,19 @@ import ( ) type sipOutboundConfig struct { - address string - transport livekit.SIPTransport - host string - from string - to string - user string - pass string - dtmf string - ringtone bool - headers map[string]string - headersToAttrs map[string]string + address string + transport livekit.SIPTransport + host string + from string + to string + user string + pass string + dtmf string + ringtone bool + headers map[string]string + headersToAttrs map[string]string + ringingTimeout time.Duration + maxCallDuration time.Duration } type outboundCall struct { @@ -76,6 +78,12 @@ func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Lo if sipConf.host == "" { sipConf.host = c.signalingIp.String() } + if sipConf.maxCallDuration <= 0 || sipConf.maxCallDuration > maxCallDuration { + sipConf.maxCallDuration = maxCallDuration + } + if sipConf.ringingTimeout <= 0 { + sipConf.ringingTimeout = defaultRingingTimeout + } call := &outboundCall{ c: c, log: log, @@ -112,6 +120,8 @@ func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Lo func (c *outboundCall) Start(ctx context.Context) { ctx = context.WithoutCancel(ctx) + ctx, cancel := context.WithTimeout(ctx, c.sipConf.maxCallDuration) + defer cancel() c.mon.CallStart() defer c.mon.CallEnd() err := c.ConnectSIP(ctx) @@ -231,7 +241,7 @@ func (c *outboundCall) connectToRoom(ctx context.Context, lkNew RoomConfig) erro c.c.RegisterTransferSIPParticipant(sipCallID, c) } - attrs[AttrSIPCallStatus] = CallDialing.Attribute() + attrs[livekit.AttrSIPCallStatus] = CallDialing.Attribute() lkNew.Participant.Attributes = attrs r := NewRoom(c.log) if err := r.Connect(c.c.conf, lkNew); err != nil { @@ -330,7 +340,7 @@ func (c *outboundCall) setStatus(v CallStatus) { return } r.LocalParticipant.SetAttributes(map[string]string{ - AttrSIPCallStatus: attr, + livekit.AttrSIPCallStatus: attr, }) } @@ -338,6 +348,12 @@ func (c *outboundCall) sipSignal(ctx context.Context) error { ctx, span := tracer.Start(ctx, "outboundCall.sipSignal") defer span.End() + if c.sipConf.ringingTimeout > 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, c.sipConf.ringingTimeout) + defer cancel() + } + ctx, cancel := context.WithCancel(ctx) defer cancel() go func() { diff --git a/pkg/sip/participant.go b/pkg/sip/participant.go index 4e324e3..f94c3c7 100644 --- a/pkg/sip/participant.go +++ b/pkg/sip/participant.go @@ -14,7 +14,22 @@ package sip -import "github.com/livekit/protocol/livekit" +import ( + "time" + + "github.com/livekit/protocol/livekit" +) + +const ( + // maxCallDuration sets a global max call duration. + maxCallDuration = 24 * time.Hour + // defaultRingingTimeout is a maximal duration which SIP participant will wait to connect. + // + // For inbound, the participant will wait this duration for other participant tracks. + // + // For outbound, this sets a timeout for the other end to pick up the call. + defaultRingingTimeout = 3 * time.Minute +) var headerToLog = map[string]string{ "X-Twilio-AccountSid": "twilioAccSID", @@ -27,10 +42,6 @@ var headerToAttr = map[string]string{ "X-Lk-Test-Id": "lktest.id", } -const ( - AttrSIPCallStatus = livekit.AttrSIPPrefix + "callStatus" -) - type CallStatus int func (v CallStatus) Attribute() string { diff --git a/pkg/sip/server.go b/pkg/sip/server.go index ffcc976..1d87463 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -22,6 +22,7 @@ import ( "net" "net/netip" "sync" + "time" "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" @@ -93,6 +94,8 @@ type CallDispatch struct { Headers map[string]string HeadersToAttributes map[string]string EnabledFeatures []rpc.SIPFeature + RingingTimeout time.Duration + MaxCallDuration time.Duration } type Handler interface {