From 711a46ea06cc56d088b2e71cf2d6f49fb1767b49 Mon Sep 17 00:00:00 2001 From: Benjamin Pracht Date: Fri, 11 Oct 2024 08:09:52 -0700 Subject: [PATCH] Ignore repeated TransferSIPParticipant RPC request with same parameters (#195) --- pkg/sip/service.go | 83 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 16 deletions(-) diff --git a/pkg/sip/service.go b/pkg/sip/service.go index 239ccb6..6982f3e 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -16,6 +16,8 @@ package sip import ( "context" + "sync" + "sync/atomic" "time" "github.com/emiago/sipgo" @@ -36,6 +38,14 @@ type Service struct { mon *stats.Monitor cli *Client srv *Server + + mu sync.Mutex + pendingTransfers map[transferKey]chan struct{} +} + +type transferKey struct { + SipCallId string + TransferTo string } func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Service { @@ -43,11 +53,12 @@ func NewService(conf *config.Config, mon *stats.Monitor, log logger.Logger) *Ser log = logger.GetLogger() } s := &Service{ - conf: conf, - log: log, - mon: mon, - cli: NewClient(conf, log, mon), - srv: NewServer(conf, log, mon), + conf: conf, + log: log, + mon: mon, + cli: NewClient(conf, log, mon), + srv: NewServer(conf, log, mon), + pendingTransfers: make(map[transferKey]chan struct{}), } return s } @@ -124,35 +135,75 @@ func (s *Service) CreateSIPParticipantAffinity(ctx context.Context, req *rpc.Int func (s *Service) TransferSIPParticipant(ctx context.Context, req *rpc.InternalTransferSIPParticipantRequest) (*emptypb.Empty, error) { s.log.Infow("transfering SIP call", "callID", req.SipCallId, "transferTo", req.TransferTo) - ctx, done := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) - defer done() + var transfetResult atomic.Pointer[error] + + s.mu.Lock() + k := transferKey{ + SipCallId: req.SipCallId, + TransferTo: req.TransferTo, + } + done, ok := s.pendingTransfers[k] + if !ok { + done = make(chan struct{}) + s.pendingTransfers[k] = done + + go func() { + ctx, cdone := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cdone() + + err := s.processParticipantTransfer(ctx, req.SipCallId, req.TransferTo) + transfetResult.Store(&err) + close(done) + + s.mu.Lock() + delete(s.pendingTransfers, k) + s.mu.Unlock() + }() + } else { + s.log.Debugw("repeated request for call transfer", "callID", req.SipCallId, "transferTo", req.TransferTo) + } + s.mu.Unlock() + + select { + case <-done: + var err error + errPtr := transfetResult.Load() + if errPtr != nil { + err = *errPtr + } + return &emptypb.Empty{}, err + case <-ctx.Done(): + return &emptypb.Empty{}, psrpc.NewError(psrpc.Canceled, ctx.Err()) + } +} +func (s *Service) processParticipantTransfer(ctx context.Context, callID string, transferTo string) error { // Look for call both in client (outbound) and server (inbound) s.cli.cmu.Lock() - out := s.cli.activeCalls[LocalTag(req.SipCallId)] + out := s.cli.activeCalls[LocalTag(callID)] s.cli.cmu.Unlock() if out != nil { - err := out.transferCall(ctx, req.TransferTo) + err := out.transferCall(ctx, transferTo) if err != nil { - return nil, err + return err } - return &emptypb.Empty{}, nil + return nil } s.srv.cmu.Lock() - in := s.srv.byLocal[LocalTag(req.SipCallId)] + in := s.srv.byLocal[LocalTag(callID)] s.srv.cmu.Unlock() if in != nil { - err := in.transferCall(ctx, req.TransferTo) + err := in.transferCall(ctx, transferTo) if err != nil { - return nil, err + return err } - return &emptypb.Empty{}, nil + return nil } - return nil, psrpc.NewErrorf(psrpc.NotFound, "unknown call") + return psrpc.NewErrorf(psrpc.NotFound, "unknown call") }