diff --git a/go.mod b/go.mod index 413bcc9..83ee7bb 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/jfreymuth/oggvorbis v1.0.5 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 - github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244 + github.com/livekit/protocol v1.29.5-0.20241218124228-1975b61b7e43 github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 github.com/livekit/server-sdk-go/v2 v2.4.1-0.20241211082531-7610e1639c28 github.com/livekit/sipgo v0.13.2-0.20241209123643-27500ef99c39 diff --git a/go.sum b/go.sum index 074b071..1b02cca 100644 --- a/go.sum +++ b/go.sum @@ -122,8 +122,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98 h1:QA7DqIC/ZSsMj8HC0+zNfMMwssHbA0alZALK68r30LQ= github.com/livekit/mediatransportutil v0.0.0-20241128072814-c363618d4c98/go.mod h1:WIVFAGzVZ7VMjPC5+nbSfwdFjWcbuLgx97KeNSUDTEo= -github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244 h1:Eg9HK+5bMCDRKhh5g5g16oyNaMbCqMrJvxFBaBuP7Vo= -github.com/livekit/protocol v1.29.5-0.20241209183753-f6b5078b2244/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M= +github.com/livekit/protocol v1.29.5-0.20241218124228-1975b61b7e43 h1:IGF6U7ZncoUOnzU2697gI4xrvpEBaeumRDUssqFW7ds= +github.com/livekit/protocol v1.29.5-0.20241218124228-1975b61b7e43/go.mod h1:NDg1btMpKCzr/w6QR5kDuXw/e4Y7yOBE+RUAHsc+Y/M= github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8 h1:Ibh0LoFl5NW5a1KFJEE0eLxxz7dqqKmYTj/BfCb0PbY= github.com/livekit/psrpc v0.6.1-0.20241018124827-1efff3d113a8/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/livekit/server-sdk-go/v2 v2.4.1-0.20241211082531-7610e1639c28 h1:LadsWjdymTEST6ny/huFg5n4IoS7suvXnSrF+RhzBqo= diff --git a/pkg/service/psrpc.go b/pkg/service/psrpc.go index d9b71ae..e32b042 100644 --- a/pkg/service/psrpc.go +++ b/pkg/service/psrpc.go @@ -101,7 +101,9 @@ func DispatchCall(ctx context.Context, psrpcClient rpc.IOInfoClient, log logger. TrunkID: resp.SipTrunkId, DispatchRuleID: resp.SipDispatchRuleId, Headers: resp.Headers, + IncludeHeaders: resp.IncludeHeaders, HeadersToAttributes: resp.HeadersToAttributes, + AttributesToHeaders: resp.AttributesToHeaders, EnabledFeatures: resp.EnabledFeatures, RingingTimeout: resp.RingingTimeout.AsDuration(), MaxCallDuration: resp.MaxCallDuration.AsDuration(), @@ -124,7 +126,9 @@ func DispatchCall(ctx context.Context, psrpcClient rpc.IOInfoClient, log logger. TrunkID: resp.SipTrunkId, DispatchRuleID: resp.SipDispatchRuleId, Headers: resp.Headers, + IncludeHeaders: resp.IncludeHeaders, HeadersToAttributes: resp.HeadersToAttributes, + AttributesToHeaders: resp.AttributesToHeaders, EnabledFeatures: resp.EnabledFeatures, RingingTimeout: resp.RingingTimeout.AsDuration(), MaxCallDuration: resp.MaxCallDuration.AsDuration(), diff --git a/pkg/sip/client.go b/pkg/sip/client.go index 3be9ef6..ddfc498 100644 --- a/pkg/sip/client.go +++ b/pkg/sip/client.go @@ -211,7 +211,9 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea dtmf: req.Dtmf, dialtone: req.PlayDialtone, headers: req.Headers, + includeHeaders: req.IncludeHeaders, headersToAttrs: req.HeadersToAttributes, + attrsToHeaders: req.AttributesToHeaders, ringingTimeout: req.RingingTimeout.AsDuration(), maxCallDuration: req.MaxCallDuration.AsDuration(), enabledFeatures: req.EnabledFeatures, diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 805bc0b..ea0c4e6 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -154,8 +154,20 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (*liv "toIP", req.Destination(), ) + var call *inboundCall + tr := transportFromReq(req) - cc := s.newInbound(LocalTag(callID), s.ContactURI(tr), req, tx) + cc := s.newInbound(LocalTag(callID), s.ContactURI(tr), req, tx, func(headers map[string]string) map[string]string { + c := call + if c == nil || len(c.attrsToHdr) == 0 { + return headers + } + r := c.lkRoom.Room() + if r == nil { + return headers + } + return AttrsToHeaders(r.LocalParticipant.Attributes(), c.attrsToHdr, headers) + }) log = LoggerWithParams(log, cc) log = LoggerWithHeaders(log, cc) log.Infow("processing invite") @@ -238,7 +250,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (*liv // ok } - call := s.newInboundCall(log, cmon, cc, src, callInfo, ioClient, nil) + call = s.newInboundCall(log, cmon, cc, src, callInfo, ioClient, nil) call.joinDur = joinDur err = call.handleInvite(call.ctx, req, r.TrunkID, s.conf) if err != nil { @@ -311,6 +323,7 @@ type inboundCall struct { mon *stats.CallMonitor ioClient rpc.IOInfoClient extraAttrs map[string]string + attrsToHdr map[string]string ctx context.Context cancel func() callInfo *livekit.SIPCallInfo @@ -334,8 +347,8 @@ func (s *Server) newInboundCall( ioClient rpc.IOInfoClient, extra map[string]string, ) *inboundCall { - - extra = HeadersToAttrs(extra, nil, cc) + // Map known headers immediately on join. The rest of the mapping will be available later. + extra = HeadersToAttrs(extra, nil, 0, cc) c := &inboundCall{ s: s, log: log, @@ -421,8 +434,13 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI return err } acceptCall := func() (bool, error) { - c.log.Infow("Accepting the call", "headers", disp.Headers) - if err = c.cc.Accept(ctx, answerData, disp.Headers); err != nil { + headers := disp.Headers + c.attrsToHdr = disp.AttributesToHeaders + if r := c.lkRoom.Room(); r != nil { + headers = AttrsToHeaders(r.LocalParticipant.Attributes(), c.attrsToHdr, headers) + } + c.log.Infow("Accepting the call", "headers", headers) + if err = c.cc.Accept(ctx, answerData, headers); err != nil { c.log.Errorw("Cannot respond to INVITE", err) return false, err } @@ -445,18 +463,8 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI return err // already sent a response. Could be success if user hung up } } - if len(disp.HeadersToAttributes) != 0 { - p := &disp.Room.Participant - if p.Attributes == nil { - p.Attributes = make(map[string]string) - } - headers := c.cc.RemoteHeaders() - for hdr, attr := range disp.HeadersToAttributes { - if h := headers.GetHeader(hdr); h != nil { - p.Attributes[attr] = h.Value() - } - } - } + p := &disp.Room.Participant + p.Attributes = HeadersToAttrs(p.Attributes, disp.HeadersToAttributes, disp.IncludeHeaders, c.cc) if disp.MaxCallDuration <= 0 || disp.MaxCallDuration > maxCallDuration { disp.MaxCallDuration = maxCallDuration } @@ -856,7 +864,7 @@ func (c *inboundCall) handleDTMF(tone dtmf.Event) { } } -func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) (retErr error) { +func (c *inboundCall) transferCall(ctx context.Context, transferTo string, headers map[string]string, dialtone bool) (retErr error) { var err error if dialtone && c.started.IsBroken() && !c.done.Load() { @@ -883,7 +891,7 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialt }() } - err = c.cc.TransferCall(ctx, transferTo) + err = c.cc.TransferCall(ctx, transferTo, headers) if err != nil { c.log.Infow("inbound call failed to transfer", "error", err, "transferTo", transferTo) return err @@ -898,7 +906,7 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, dialt } -func (s *Server) newInbound(id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction) *sipInbound { +func (s *Server) newInbound(id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound { c := &sipInbound{ s: s, id: id, @@ -907,8 +915,9 @@ func (s *Server) newInbound(id LocalTag, contact URI, invite *sip.Request, invit contact: &sip.ContactHeader{ Address: *contact.GetContactURI(), }, - cancelled: make(chan struct{}), - referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request + cancelled: make(chan struct{}), + referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request + setHeaders: getHeaders, } c.from = invite.From() if c.from != nil { @@ -942,6 +951,7 @@ type sipInbound struct { nextRequestCSeq uint32 referCseq uint32 ringing chan struct{} + setHeaders setHeadersFunc } func (c *sipInbound) ValidateInvite() error { @@ -1192,6 +1202,11 @@ func (c *sipInbound) sendBye() { defer span.End() // This function is for clients, so we need to swap src and dest r := sip.NewByeRequest(c.invite, c.inviteOk, nil) + if c.setHeaders != nil { + for k, v := range c.setHeaders(nil) { + r.AppendHeader(sip.NewHeader(k, v)) + } + } c.setCSeq(r) c.swapSrcDst(r) @@ -1210,6 +1225,11 @@ func (c *sipInbound) sendRejected() { defer span.End() r := sip.NewResponseFromRequest(c.invite, sip.StatusBusyHere, "Rejected", nil) + if c.setHeaders != nil { + for k, v := range c.setHeaders(nil) { + r.AppendHeader(sip.NewHeader(k, v)) + } + } c.setDestFromVia(r) _ = c.inviteTx.Respond(r) c.drop() @@ -1223,7 +1243,7 @@ func (c *sipInbound) Transaction(req *sip.Request) (sip.ClientTransaction, error return c.s.sipSrv.TransactionLayer().Request(req) } -func (c *sipInbound) newReferReq(transferTo string) (*sip.Request, error) { +func (c *sipInbound) newReferReq(transferTo string, headers map[string]string) (*sip.Request, error) { c.mu.Lock() defer c.mu.Unlock() @@ -1235,9 +1255,12 @@ func (c *sipInbound) newReferReq(transferTo string) (*sip.Request, error) { if from == nil { return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "no From URI in invite") } + if c.setHeaders != nil { + headers = c.setHeaders(headers) + } // This will effectively redirect future SIP requests to this server instance (if host address is not LB). - req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo) + req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo, headers) c.setCSeq(req) c.swapSrcDst(req) @@ -1249,8 +1272,8 @@ func (c *sipInbound) newReferReq(transferTo string) (*sip.Request, error) { return req, nil } -func (c *sipInbound) TransferCall(ctx context.Context, transferTo string) error { - req, err := c.newReferReq(transferTo) +func (c *sipInbound) TransferCall(ctx context.Context, transferTo string, headers map[string]string) error { + req, err := c.newReferReq(transferTo, headers) if err != nil { return err } diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 4d9e166..b2e71e7 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -54,7 +54,9 @@ type sipOutboundConfig struct { dtmf string dialtone bool headers map[string]string + includeHeaders livekit.SIPHeaderOptions headersToAttrs map[string]string + attrsToHeaders map[string]string ringingTimeout time.Duration maxCallDuration time.Duration enabledFeatures []livekit.SIPFeature @@ -92,18 +94,28 @@ func (c *Client) newCall(ctx context.Context, conf *config.Config, log logger.Lo sipConf.host = contact.GetHost() } call := &outboundCall{ - c: c, - log: log, - cc: c.newOutbound(log, id, URI{ - User: sipConf.from, - Host: sipConf.host, - Addr: contact.Addr, - Transport: tr, - }, contact), + c: c, + log: log, sipConf: sipConf, callInfo: callInfo, ioClient: ioClient, } + call.cc = c.newOutbound(log, id, URI{ + User: sipConf.from, + Host: sipConf.host, + Addr: contact.Addr, + Transport: tr, + }, contact, func(headers map[string]string) map[string]string { + c := call + if len(c.sipConf.attrsToHeaders) == 0 { + return headers + } + r := c.lkRoom.Room() + if r == nil { + return headers + } + return AttrsToHeaders(r.LocalParticipant.Attributes(), c.sipConf.attrsToHeaders, headers) + }) call.mon = c.mon.NewCall(stats.Outbound, sipConf.host, sipConf.address) var err error @@ -457,7 +469,7 @@ func (c *outboundCall) sipSignal(ctx context.Context) error { } joinDur() - extra := HeadersToAttrs(nil, c.sipConf.headersToAttrs, c.cc) + extra := HeadersToAttrs(nil, c.sipConf.headersToAttrs, c.sipConf.includeHeaders, c.cc) if c.lkRoom != nil && len(extra) != 0 { room := c.lkRoom.Room() if room != nil { @@ -476,7 +488,7 @@ func (c *outboundCall) handleDTMF(ev dtmf.Event) { }, lksdk.WithDataPublishReliable(true)) } -func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dialtone bool) (retErr error) { +func (c *outboundCall) transferCall(ctx context.Context, transferTo string, headers map[string]string, dialtone bool) (retErr error) { var err error if dialtone && c.started.IsBroken() && !c.stopped.IsBroken() { @@ -503,7 +515,7 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dial }() } - err = c.cc.transferCall(ctx, transferTo) + err = c.cc.transferCall(ctx, transferTo, headers) if err != nil { c.log.Infow("outound call failed to transfer", "error", err, "transferTo", transferTo) return err @@ -517,7 +529,7 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string, dial return nil } -func (c *Client) newOutbound(log logger.Logger, id LocalTag, from, contact URI) *sipOutbound { +func (c *Client) newOutbound(log logger.Logger, id LocalTag, from, contact URI, getHeaders setHeadersFunc) *sipOutbound { from = from.Normalize() fromHeader := &sip.FromHeader{ DisplayName: from.User, @@ -529,13 +541,14 @@ func (c *Client) newOutbound(log logger.Logger, id LocalTag, from, contact URI) } fromHeader.Params.Add("tag", string(id)) return &sipOutbound{ - log: log, - c: c, - id: id, - from: fromHeader, - contact: contactHeader, - referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request - nextCSeq: 1, + log: log, + c: c, + id: id, + from: fromHeader, + contact: contactHeader, + referDone: make(chan error), // Do not buffer the channel to avoid reading a result for an old request + nextCSeq: 1, + getHeaders: getHeaders, } } @@ -546,13 +559,14 @@ type sipOutbound struct { from *sip.FromHeader contact *sip.ContactHeader - mu sync.RWMutex - tag RemoteTag - callID string - invite *sip.Request - inviteOk *sip.Response - to *sip.ToHeader - nextCSeq uint32 + mu sync.RWMutex + tag RemoteTag + callID string + invite *sip.Request + inviteOk *sip.Response + to *sip.ToHeader + nextCSeq uint32 + getHeaders setHeadersFunc referCseq uint32 referDone chan error @@ -776,6 +790,11 @@ func (c *sipOutbound) sendBye() { defer span.End() r := sip.NewByeRequest(c.invite, c.inviteOk, nil) r.AppendHeader(sip.NewHeader("User-Agent", "LiveKit")) + if c.getHeaders != nil { + for k, v := range c.getHeaders(nil) { + r.AppendHeader(sip.NewHeader(k, v)) + } + } if c.c.closing.IsBroken() { // do not wait for a response _ = c.WriteRequest(r) @@ -798,7 +817,7 @@ func (c *sipOutbound) Drop() { c.drop() } -func (c *sipOutbound) transferCall(ctx context.Context, transferTo string) error { +func (c *sipOutbound) transferCall(ctx context.Context, transferTo string, headers map[string]string) error { c.mu.Lock() if c.invite == nil || c.inviteOk == nil { @@ -811,7 +830,11 @@ func (c *sipOutbound) transferCall(ctx context.Context, transferTo string) error return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't transfer hung up call") } - req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo) + if c.getHeaders != nil { + headers = c.getHeaders(headers) + } + + req := NewReferRequest(c.invite, c.inviteOk, c.contact, transferTo, headers) c.setCSeq(req) cseq := req.CSeq() diff --git a/pkg/sip/protocol.go b/pkg/sip/protocol.go index 902d601..e2414f3 100644 --- a/pkg/sip/protocol.go +++ b/pkg/sip/protocol.go @@ -116,6 +116,8 @@ func (e *ErrorStatus) Error() string { return fmt.Sprintf("sip status: %d", e.StatusCode) } +type setHeadersFunc func(headers map[string]string) map[string]string + type Signaling interface { From() sip.Uri To() sip.Uri @@ -174,7 +176,7 @@ func sendAndACK(ctx context.Context, c Signaling, req *sip.Request) { } } -func NewReferRequest(inviteRequest *sip.Request, inviteResponse *sip.Response, contactHeader *sip.ContactHeader, referToUrl string) *sip.Request { +func NewReferRequest(inviteRequest *sip.Request, inviteResponse *sip.Response, contactHeader *sip.ContactHeader, referToUrl string, headers map[string]string) *sip.Request { req := sip.NewRequest(sip.REFER, inviteRequest.Recipient) req.SipVersion = inviteRequest.SipVersion @@ -234,6 +236,10 @@ func NewReferRequest(inviteRequest *sip.Request, inviteResponse *sip.Response, c req.SetSource(inviteRequest.Source()) req.SetDestination(inviteRequest.Destination()) + for k, v := range headers { + req.AppendHeader(sip.NewHeader(k, v)) + } + return req } diff --git a/pkg/sip/server.go b/pkg/sip/server.go index eef7b70..c0fd33b 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -94,6 +94,8 @@ type CallDispatch struct { DispatchRuleID string Headers map[string]string HeadersToAttributes map[string]string + IncludeHeaders livekit.SIPHeaderOptions + AttributesToHeaders map[string]string EnabledFeatures []livekit.SIPFeature RingingTimeout time.Duration MaxCallDuration time.Duration diff --git a/pkg/sip/service.go b/pkg/sip/service.go index d1b8b40..9934af4 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -183,7 +183,7 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT ctx, cdone := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) defer cdone() - err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo, req.PlayDialtone) + err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo, req.Headers, req.PlayDialtone) transfetResult.Store(&err) close(done) @@ -209,14 +209,14 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT } } -func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string, dialtone bool) error { +func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string, headers map[string]string, dialtone bool) error { // Look for call both in client (outbound) and server (inbound) s.cli.cmu.Lock() out := s.cli.activeCalls[LocalTag(callID)] s.cli.cmu.Unlock() if out != nil { - err := out.transferCall(ctx, transferTo, dialtone) + err := out.transferCall(ctx, transferTo, headers, dialtone) if err != nil { return err } @@ -229,7 +229,7 @@ func (s *Service) processParticipantTransfer(ctx context.Context, callID string, s.srv.cmu.Unlock() if in != nil { - err := in.transferCall(ctx, transferTo, dialtone) + err := in.transferCall(ctx, transferTo, headers, dialtone) if err != nil { return err } diff --git a/pkg/sip/types.go b/pkg/sip/types.go index 1b0ef73..96f2260 100644 --- a/pkg/sip/types.go +++ b/pkg/sip/types.go @@ -19,6 +19,7 @@ import ( "net" "net/netip" "strconv" + "strings" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" @@ -240,21 +241,43 @@ func LoggerWithHeaders(log logger.Logger, c Signaling) logger.Logger { return log } -func HeadersToAttrs(attrs, hdrToAttr map[string]string, c Signaling) map[string]string { +func HeadersToAttrs(attrs, hdrToAttr map[string]string, opts livekit.SIPHeaderOptions, c Signaling) map[string]string { if attrs == nil { attrs = make(map[string]string) } headers := c.RemoteHeaders() + // Map all headers, if requested + if opts != livekit.SIPHeaderOptions_SIP_NO_HEADERS { + for _, h := range headers { + if h == nil { + continue + } + name := strings.ToLower(h.Name()) + if name == "" { + continue + } + switch opts { + case livekit.SIPHeaderOptions_SIP_X_HEADERS: + if !strings.HasPrefix(name, "x-") { + continue + } + } + attrs[livekit.AttrSIPHeaderPrefix+name] = h.Value() + } + } + // Global header mapping for hdr, name := range headerToAttr { if h := headers.GetHeader(hdr); h != nil { attrs[name] = h.Value() } } + // Request mapping for hdr, name := range hdrToAttr { if h := headers.GetHeader(hdr); h != nil { attrs[name] = h.Value() } } + // Other metadata if tag := c.Tag(); tag != "" { attrs[AttrSIPCallTag] = string(tag) } @@ -263,3 +286,20 @@ func HeadersToAttrs(attrs, hdrToAttr map[string]string, c Signaling) map[string] } return attrs } + +func AttrsToHeaders(attrs, attrToHdr, headers map[string]string) map[string]string { + if len(attrToHdr) == 0 { + return headers + } + if headers == nil { + headers = make(map[string]string) + } + for attr, hdr := range attrToHdr { + val, ok := attrs[attr] + if !ok { + continue + } + headers[hdr] = val + } + return headers +}