Skip to content

Commit

Permalink
Support new header options.
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Dec 18, 2024
1 parent e9e0294 commit c8bdd12
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 64 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/jfreymuth/oggvorbis v1.0.5
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98
github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244
github.com/livekit/protocol v1.29.5-0.20241218124228-1975b61b7e43
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8
github.com/livekit/server-sdk-go/v2 v2.4.1-0.20241211082531-7610e1639c28
github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 h1:QA7DqIC/ZSsMj8HC0+zNfMMwssHbA0alZALK68r30LQ=
github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98/go.mod h1:WIVFAGzVZ7VMjPC5+nbSfwdFjWcbuLgx97KeNSUDTEo=
github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244 h1:Eg9HK+5bMCDRKhh5g5g16oyNaMbCqMrJvxFBaBuP7Vo=
github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M=
github.com/livekit/protocol v1.29.5-0.20241218124228-1975b61b7e43 h1:IGF6U7ZncoUOnzU2697gI4xrvpEBaeumRDUssqFW7ds=
github.com/livekit/protocol v1.29.5-0.20241218124228-1975b61b7e43/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY=
github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.4.1-0.20241211082531-7610e1639c28 h1:LadsWjdymTEST6ny/huFg5n4IoS7suvXnSrF+RhzBqo=
Expand Down
4 changes: 4 additions & 0 deletions pkg/service/psrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func DispatchCall(ctx context.Context, psrpcClient rpc.IOInfoClient, log logger.
TrunkID: resp.SipTrunkId,
DispatchRuleID: resp.SipDispatchRuleId,
Headers: resp.Headers,
IncludeHeaders: resp.IncludeHeaders,
HeadersToAttributes: resp.HeadersToAttributes,
AttributesToHeaders: resp.AttributesToHeaders,
EnabledFeatures: resp.EnabledFeatures,
RingingTimeout: resp.RingingTimeout.AsDuration(),
MaxCallDuration: resp.MaxCallDuration.AsDuration(),
Expand All @@ -124,7 +126,9 @@ func DispatchCall(ctx context.Context, psrpcClient rpc.IOInfoClient, log logger.
TrunkID: resp.SipTrunkId,
DispatchRuleID: resp.SipDispatchRuleId,
Headers: resp.Headers,
IncludeHeaders: resp.IncludeHeaders,
HeadersToAttributes: resp.HeadersToAttributes,
AttributesToHeaders: resp.AttributesToHeaders,
EnabledFeatures: resp.EnabledFeatures,
RingingTimeout: resp.RingingTimeout.AsDuration(),
MaxCallDuration: resp.MaxCallDuration.AsDuration(),
Expand Down
2 changes: 2 additions & 0 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
dtmf: req.Dtmf,
dialtone: req.PlayDialtone,
headers: req.Headers,
includeHeaders: req.IncludeHeaders,
headersToAttrs: req.HeadersToAttributes,
attrsToHeaders: req.AttributesToHeaders,
ringingTimeout: req.RingingTimeout.AsDuration(),
maxCallDuration: req.MaxCallDuration.AsDuration(),
enabledFeatures: req.EnabledFeatures,
Expand Down
77 changes: 50 additions & 27 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,20 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (*liv
"toIP", req.Destination(),
)

var call *inboundCall

tr := transportFromReq(req)
cc := s.newInbound(LocalTag(callID), s.ContactURI(tr), req, tx)
cc := s.newInbound(LocalTag(callID), s.ContactURI(tr), req, tx, func(headers map[string]string) map[string]string {
c := call
if c == nil || len(c.attrsToHdr) == 0 {
return headers
}
r := c.lkRoom.Room()
if r == nil {
return headers
}
return AttrsToHeaders(r.LocalParticipant.Attributes(), c.attrsToHdr, headers)
})
log = LoggerWithParams(log, cc)
log = LoggerWithHeaders(log, cc)
log.Infow("processing invite")
Expand Down Expand Up @@ -238,7 +250,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (*liv
// ok
}

call := s.newInboundCall(log, cmon, cc, src, callInfo, ioClient, nil)
call = s.newInboundCall(log, cmon, cc, src, callInfo, ioClient, nil)
call.joinDur = joinDur
err = call.handleInvite(call.ctx, req, r.TrunkID, s.conf)
if err != nil {
Expand Down Expand Up @@ -311,6 +323,7 @@ type inboundCall struct {
mon *stats.CallMonitor
ioClient rpc.IOInfoClient
extraAttrs map[string]string
attrsToHdr map[string]string
ctx context.Context
cancel func()
callInfo *livekit.SIPCallInfo
Expand All @@ -334,8 +347,8 @@ func (s *Server) newInboundCall(
ioClient rpc.IOInfoClient,
extra map[string]string,
) *inboundCall {

extra = HeadersToAttrs(extra, nil, cc)
// Map known headers immediately on join. The rest of the mapping will be available later.
extra = HeadersToAttrs(extra, nil, 0, cc)
c := &inboundCall{
s: s,
log: log,
Expand Down Expand Up @@ -421,8 +434,13 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
return err
}
acceptCall := func() (bool, error) {
c.log.Infow("Accepting the call", "headers", disp.Headers)
if err = c.cc.Accept(ctx, answerData, disp.Headers); err != nil {
headers := disp.Headers
c.attrsToHdr = disp.AttributesToHeaders
if r := c.lkRoom.Room(); r != nil {
headers = AttrsToHeaders(r.LocalParticipant.Attributes(), c.attrsToHdr, headers)
}
c.log.Infow("Accepting the call", "headers", headers)
if err = c.cc.Accept(ctx, answerData, headers); err != nil {
c.log.Errorw("Cannot respond to INVITE", err)
return false, err
}
Expand All @@ -445,18 +463,8 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
return err // already sent a response. Could be success if user hung up
}
}
if len(disp.HeadersToAttributes) != 0 {
p := &disp.Room.Participant
if p.Attributes == nil {
p.Attributes = make(map[string]string)
}
headers := c.cc.RemoteHeaders()
for hdr, attr := range disp.HeadersToAttributes {
if h := headers.GetHeader(hdr); h != nil {
p.Attributes[attr] = h.Value()
}
}
}
p := &disp.Room.Participant
p.Attributes = HeadersToAttrs(p.Attributes, disp.HeadersToAttributes, disp.IncludeHeaders, c.cc)
if disp.MaxCallDuration <= 0 || disp.MaxCallDuration > maxCallDuration {
disp.MaxCallDuration = maxCallDuration
}
Expand Down Expand Up @@ -856,7 +864,7 @@ func (c *inboundCall) handleDTMF(tone dtmf.Event) {
}
}

func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) (retErr error) {
func (c *inboundCall) transferCall(ctx context.Context, transferTo string, headers map[string]string, dialtone bool) (retErr error) {
var err error

if dialtone && c.started.IsBroken() && !c.done.Load() {
Expand All @@ -883,7 +891,7 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialt
}()
}

err = c.cc.TransferCall(ctx, transferTo)
err = c.cc.TransferCall(ctx, transferTo, headers)
if err != nil {
c.log.Infow("inbound call failed to transfer", "error", err, "transferTo", transferTo)
return err
Expand All @@ -898,7 +906,7 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialt

}

func (s *Server) newInbound(id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction) *sipInbound {
func (s *Server) newInbound(id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound {
c := &sipInbound{
s: s,
id: id,
Expand All @@ -907,8 +915,9 @@ func (s *Server) newInbound(id LocalTag, contact URI, invite *sip.Request, invit
contact: &sip.ContactHeader{
Address: *contact.GetContactURI(),
},
cancelled: make(chan struct{}),
referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request
cancelled: make(chan struct{}),
referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request
setHeaders: getHeaders,
}
c.from = invite.From()
if c.from != nil {
Expand Down Expand Up @@ -942,6 +951,7 @@ type sipInbound struct {
nextRequestCSeq uint32
referCseq uint32
ringing chan struct{}
setHeaders setHeadersFunc
}

func (c *sipInbound) ValidateInvite() error {
Expand Down Expand Up @@ -1192,6 +1202,11 @@ func (c *sipInbound) sendBye() {
defer span.End()
// This function is for clients, so we need to swap src and dest
r := sip.NewByeRequest(c.invite, c.inviteOk, nil)
if c.setHeaders != nil {
for k, v := range c.setHeaders(nil) {
r.AppendHeader(sip.NewHeader(k, v))
}
}

c.setCSeq(r)
c.swapSrcDst(r)
Expand All @@ -1210,6 +1225,11 @@ func (c *sipInbound) sendRejected() {
defer span.End()

r := sip.NewResponseFromRequest(c.invite, sip.StatusBusyHere, "Rejected", nil)
if c.setHeaders != nil {
for k, v := range c.setHeaders(nil) {
r.AppendHeader(sip.NewHeader(k, v))
}
}
c.setDestFromVia(r)
_ = c.inviteTx.Respond(r)
c.drop()
Expand All @@ -1223,7 +1243,7 @@ func (c *sipInbound) Transaction(req *sip.Request) (sip.ClientTransaction, error
return c.s.sipSrv.TransactionLayer().Request(req)
}

func (c *sipInbound) newReferReq(transferTo string) (*sip.Request, error) {
func (c *sipInbound) newReferReq(transferTo string, headers map[string]string) (*sip.Request, error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -1235,9 +1255,12 @@ func (c *sipInbound) newReferReq(transferTo string) (*sip.Request, error) {
if from == nil {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "no From URI in invite")
}
if c.setHeaders != nil {
headers = c.setHeaders(headers)
}

// This will effectively redirect future SIP requests to this server instance (if host address is not LB).
req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo)
req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo, headers)
c.setCSeq(req)
c.swapSrcDst(req)

Expand All @@ -1249,8 +1272,8 @@ func (c *sipInbound) newReferReq(transferTo string) (*sip.Request, error) {
return req, nil
}

func (c *sipInbound) TransferCall(ctx context.Context, transferTo string) error {
req, err := c.newReferReq(transferTo)
func (c *sipInbound) TransferCall(ctx context.Context, transferTo string, headers map[string]string) error {
req, err := c.newReferReq(transferTo, headers)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit c8bdd12

Please sign in to comment.