From 8cd66ec45f44843239f5d6749647f1b18b2cda1a Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Wed, 9 Oct 2024 13:53:34 -0700 Subject: [PATCH 1/2] WiP --- pkg/sip/service.go | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 239ccb6..1db1106 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -16,6 +16,7 @@ package sip import ( "context" + "sync" "time" "github.com/emiago/sipgo" @@ -36,6 +37,14 @@ type Service struct { mon *stats.Monitor cli *Client srv *Server + + mu sync.Mutex + pendingTransfers map[transferKey]struct{} +} + +type transferKey struct { + SipCallId string + TransferTo string } func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Service { @@ -43,11 +52,12 @@ func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Ser log = logger.GetLogger() } s := &Service{ - conf: conf, - log: log, - mon: mon, - cli: NewClient(conf, log, mon), - srv: NewServer(conf, log, mon), + conf: conf, + log: log, + mon: mon, + cli: NewClient(conf, log, mon), + srv: NewServer(conf, log, mon), + pendingTransfers: make(map[transferKey]struct{}), } return s } @@ -127,6 +137,25 @@ func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalT ctx, done := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) defer done() + s.mu.Lock() + k := transferKey{ + SipCallId: req.SipCallId, + TransferTo: req.TransferTo, + } + if _, ok := s.pendingTransfers[k]; ok { + s.mu.Unlock() + s.log.Debugw("repeated request for call transfer", "callID", req.SipCallId, "transferTo", req.TransferTo) + return &emptypb.Empty{}, nil + } + s.pendingTransfers[k] = struct{}{} + s.mu.Unlock() + + defer func() { + s.mu.Lock() + delete(s.pendingTransfers, k) + s.mu.Unlock() + }() + // Look for call both in client (outbound) and server (inbound) s.cli.cmu.Lock() out := s.cli.activeCalls[LocalTag(req.SipCallId)] From 43ebe23368bd6f67f5399be1e1a6a2f7e999a1d8 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Thu, 10 Oct 2024 11:56:42 -0700 Subject: [PATCH 2/2] WiP --- pkg/sip/service.go | 66 ++++++++++++++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 22 deletions(-) diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 1db1106..6982f3e 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -17,6 +17,7 @@ package sip import ( "context" "sync" + "sync/atomic" "time" "github.com/emiago/sipgo" @@ -39,7 +40,7 @@ type Service struct { srv *Server mu sync.Mutex - pendingTransfers map[transferKey]struct{} + pendingTransfers map[transferKey]chan struct{} } type transferKey struct { @@ -57,7 +58,7 @@ func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Ser mon: mon, cli: NewClient(conf, log, mon), srv: NewServer(conf, log, mon), - pendingTransfers: make(map[transferKey]struct{}), + pendingTransfers: make(map[transferKey]chan struct{}), } return s } @@ -134,54 +135,75 @@ func (s *Service) CreateSIPParticipantAffinity(ctx context.Context, req *rpc.Int func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalTransferSIPParticipantRequest) (*emptypb.Empty, error) { s.log.Infow("transfering SIP call", "callID", req.SipCallId, "transferTo", req.TransferTo) - ctx, done := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) - defer done() + var transfetResult atomic.Pointer[error] s.mu.Lock() k := transferKey{ SipCallId: req.SipCallId, TransferTo: req.TransferTo, } - if _, ok := s.pendingTransfers[k]; ok { - s.mu.Unlock() + done, ok := s.pendingTransfers[k] + if !ok { + done = make(chan struct{}) + s.pendingTransfers[k] = done + + go func() { + ctx, cdone := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cdone() + + err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo) + transfetResult.Store(&err) + close(done) + + s.mu.Lock() + delete(s.pendingTransfers, k) + s.mu.Unlock() + }() + } else { s.log.Debugw("repeated request for call transfer", "callID", req.SipCallId, "transferTo", req.TransferTo) - return &emptypb.Empty{}, nil } - s.pendingTransfers[k] = struct{}{} s.mu.Unlock() - defer func() { - s.mu.Lock() - delete(s.pendingTransfers, k) - s.mu.Unlock() - }() + select { + case <-done: + var err error + errPtr := transfetResult.Load() + if errPtr != nil { + err = *errPtr + } + return &emptypb.Empty{}, err + case <-ctx.Done(): + return &emptypb.Empty{}, psrpc.NewError(psrpc.Canceled, ctx.Err()) + } +} +func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string) error { // Look for call both in client (outbound) and server (inbound) s.cli.cmu.Lock() - out := s.cli.activeCalls[LocalTag(req.SipCallId)] + out := s.cli.activeCalls[LocalTag(callID)] s.cli.cmu.Unlock() if out != nil { - err := out.transferCall(ctx, req.TransferTo) + err := out.transferCall(ctx, transferTo) if err != nil { - return nil, err + return err } - return &emptypb.Empty{}, nil + return nil } s.srv.cmu.Lock() - in := s.srv.byLocal[LocalTag(req.SipCallId)] + in := s.srv.byLocal[LocalTag(callID)] s.srv.cmu.Unlock() if in != nil { - err := in.transferCall(ctx, req.TransferTo) + err := in.transferCall(ctx, transferTo) if err != nil { - return nil, err + return err } - return &emptypb.Empty{}, nil + return nil } - return nil, psrpc.NewErrorf(psrpc.NotFound, "unknown call") + return psrpc.NewErrorf(psrpc.NotFound, "unknown call") }