Skip to content

Commit

Permalink
fix: Only closing UA should close transport layer
Browse files Browse the repository at this point in the history
  • Loading branch information
emiago committed Oct 3, 2023
1 parent aba02f7 commit e3aa4b7
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 32 deletions.
7 changes: 2 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,9 @@ func NewClient(ua *UserAgent, options ...ClientOption) (*Client, error) {
return c, nil
}

// Close client handle. UserAgent must be closed for full transaction and transport layer closing.
func (c *Client) Close() error {
// stop transaction layer
c.tx.Close()

// stop transport layer
return c.tp.Close()
return nil
}

func (c *Client) GetHostname() string {
Expand Down
9 changes: 3 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,9 @@ func (srv *Server) WriteResponse(r *sip.Response) error {
return srv.tp.WriteMsg(r)
}

// Shutdown gracefully shutdowns SIP server
func (srv *Server) Close() {
// stop transaction layer
srv.tx.Close()
// stop transport layer
srv.tp.Close()
// Close server handle. UserAgent must be closed for full transaction and transport layer closing.
func (srv *Server) Close() error {
return nil
}

// OnRequest registers new request callback. Can be used as generic way to add handler
Expand Down
2 changes: 2 additions & 0 deletions sip/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type ServerTransaction interface {

type ClientTransaction interface {
Transaction
// Responses returns channel with all responses for transaction
Responses() <-chan *Response
// Cancel sends cancel request
Cancel() error
}
2 changes: 1 addition & 1 deletion transaction/client_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,5 +260,5 @@ func (tx *ClientTx) delete() {
tx.timer_d = nil
}
tx.mu.Unlock()
tx.log.Debug().Str("tx", tx.Key()).Msg("Destroyed")
tx.log.Debug().Str("tx", tx.Key()).Msg("Client transaction destroyed")
}
8 changes: 7 additions & 1 deletion transaction/server_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ func (tx *ServerTx) passResp() error {
}

func (tx *ServerTx) Terminate() {
tx.log.Debug().Msg("Server transaction terminating")
tx.delete()
}

Expand Down Expand Up @@ -253,6 +254,11 @@ func (tx *ServerTx) delete() {
close(tx.done)
tx.mu.Unlock()
tx.onTerminate(tx.key)

// TODO with ref this can be added, but normally we expect client does closing
// if _, err := tx.conn.TryClose(); err != nil {
// tx.log.Info().Err(err).Msg("Closing connection returned error")
// }
})

// time.Sleep(time.Microsecond)
Expand Down Expand Up @@ -280,5 +286,5 @@ func (tx *ServerTx) delete() {
tx.timer_1xx = nil
}
tx.mu.Unlock()
tx.log.Debug().Str("tx", tx.Key()).Msg("Destroyed")
tx.log.Debug().Str("tx", tx.Key()).Msg("Server transaction destroyed")
}
6 changes: 6 additions & 0 deletions transport/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ func (l *Layer) WriteMsgTo(msg sip.Message, addr string, network string) error {
// ClientRequestConnection is based on
// https://www.rfc-editor.org/rfc/rfc3261#section-18.1.1
// It is wrapper for getting and creating connection
//
// In case req destination is DNS resolved, destination will be cached or in
// other words SetDestination will be called
func (l *Layer) ClientRequestConnection(req *sip.Request) (c Connection, err error) {
network := NetworkToLower(req.Transport())
transport, ok := l.transports[network]
Expand All @@ -293,9 +296,12 @@ func (l *Layer) ClientRequestConnection(req *sip.Request) (c Connection, err err
}
if raddr.IP == nil {
ctx := context.Background()
// TODO: how to cache this address, for example reusing in dialog routing
if err := l.resolveAddr(ctx, network, host, &raddr); err != nil {
return nil, err
}
// Save destination in request to avoid repeated resolving
req.SetDestination(raddr.String())
}

// Now use Via header to determine our local address
Expand Down
29 changes: 10 additions & 19 deletions transport/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ var (
type UDPTransport struct {
// listener *net.UDPConn
parser *parser.Parser
conn *UDPConnection

pool ConnectionPool
listeners []*UDPConnection
Expand All @@ -40,7 +39,6 @@ type UDPTransport struct {
func NewUDPTransport(par *parser.Parser) *UDPTransport {
p := &UDPTransport{
parser: par,
conn: nil, // Making sure interface is nil in returns
pool: NewConnectionPool(),
}
p.log = log.Logger.With().Str("caller", "transport<UDP>").Logger()
Expand Down Expand Up @@ -71,14 +69,6 @@ func (t *UDPTransport) Serve(conn net.PacketConn, handler sip.MessageHandler) er

c := &UDPConnection{PacketConn: conn}

// In case single connection avoid pool
if len(t.listeners) == 0 {
t.conn = c
} else {
t.conn = nil
}

// t.listenersPool.Add(conn.LocalAddr().String(), c)
t.listeners = append(t.listeners, c)

for i := 0; i < UDPReadWorkers-1; i++ {
Expand All @@ -104,10 +94,6 @@ func (t *UDPTransport) GetConnection(addr string) (Connection, error) {
return conn, nil
}

if t.conn != nil {
return t.conn, nil
}

// TODO: How to pick listener. Some address range mapping
if len(t.listeners) > 0 {
return t.listeners[0], nil
Expand Down Expand Up @@ -160,7 +146,6 @@ func (t *UDPTransport) readConnection(conn *UDPConnection, handler sip.MessageHa
defer conn.Close()
for {
num, raddr, err := conn.ReadFrom(buf)

if err != nil {
if errors.Is(err, net.ErrClosed) {
t.log.Debug().Err(err).Msg("Read connection closed")
Expand Down Expand Up @@ -293,6 +278,7 @@ func (c *UDPConnection) Close() error {
c.mu.Lock()
c.refcount = 0
c.mu.Unlock()
log.Debug().Str("ip", c.LocalAddr().String()).Str("dst", c.Conn.RemoteAddr().String()).Int("ref", 0).Msg("UDP doing hard close")
return c.Conn.Close()
}

Expand All @@ -315,7 +301,7 @@ func (c *UDPConnection) TryClose() (int, error) {
return 0, nil
}

// log.Debug().Str("ip", c.LocalAddr().String()).Str("dst", c.RemoteAddr().String()).Int("ref", ref).Msg("TCP closing")
log.Debug().Str("ip", c.LocalAddr().String()).Str("dst", c.Conn.RemoteAddr().String()).Int("ref", ref).Msg("UDP closing")
return 0, c.Conn.Close()
}

Expand Down Expand Up @@ -373,13 +359,18 @@ func (c *UDPConnection) WriteMsg(msg sip.Message) error {
} else {
var err error

dst := msg.Destination()
raddr, err := net.ResolveUDPAddr("udp", dst)
// TODO lets return this better
dst := msg.Destination() // Destination should be already resolved by transport layer
host, port, err := sip.ParseAddr(dst)
if err != nil {
return err
}
raddr := net.UDPAddr{
IP: net.ParseIP(host),
Port: port,
}

n, err = c.WriteTo(data, raddr)
n, err = c.WriteTo(data, &raddr)
if err != nil {
return fmt.Errorf("udp conn %s err. %w", c.PacketConn.LocalAddr().String(), err)
}
Expand Down
8 changes: 8 additions & 0 deletions ua.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ func NewUA(options ...UserAgentOption) (*UserAgent, error) {
return ua, nil
}

func (ua *UserAgent) Close() error {
// stop transaction layer
ua.tx.Close()

// stop transport layer
return ua.tp.Close()
}

// Listen adds listener for serve
func (ua *UserAgent) setIP(ip net.IP) (err error) {
ua.ip = ip
Expand Down

0 comments on commit e3aa4b7

Please sign in to comment.