diff --git a/client.go b/client.go index 1a0a215..70cfa4e 100644 --- a/client.go +++ b/client.go @@ -81,12 +81,9 @@ func NewClient(ua *UserAgent, options ...ClientOption) (*Client, error) { return c, nil } +// Close client handle. UserAgent must be closed for full transaction and transport layer closing. func (c *Client) Close() error { - // stop transaction layer - c.tx.Close() - - // stop transport layer - return c.tp.Close() + return nil } func (c *Client) GetHostname() string { diff --git a/server.go b/server.go index 8c7c448..a786017 100644 --- a/server.go +++ b/server.go @@ -246,12 +246,9 @@ func (srv *Server) WriteResponse(r *sip.Response) error { return srv.tp.WriteMsg(r) } -// Shutdown gracefully shutdowns SIP server -func (srv *Server) Close() { - // stop transaction layer - srv.tx.Close() - // stop transport layer - srv.tp.Close() +// Close server handle. UserAgent must be closed for full transaction and transport layer closing. +func (srv *Server) Close() error { + return nil } // OnRequest registers new request callback. Can be used as generic way to add handler diff --git a/sip/transaction.go b/sip/transaction.go index a4a89a4..ff6d07e 100644 --- a/sip/transaction.go +++ b/sip/transaction.go @@ -18,6 +18,8 @@ type ServerTransaction interface { type ClientTransaction interface { Transaction + // Responses returns channel with all responses for transaction Responses() <-chan *Response + // Cancel sends cancel request Cancel() error } diff --git a/transaction/client_tx.go b/transaction/client_tx.go index 110b83b..53c136f 100644 --- a/transaction/client_tx.go +++ b/transaction/client_tx.go @@ -260,5 +260,5 @@ func (tx *ClientTx) delete() { tx.timer_d = nil } tx.mu.Unlock() - tx.log.Debug().Str("tx", tx.Key()).Msg("Destroyed") + tx.log.Debug().Str("tx", tx.Key()).Msg("Client transaction destroyed") } diff --git a/transaction/server_tx.go b/transaction/server_tx.go index 11e2701..e5702c6 100644 --- a/transaction/server_tx.go +++ b/transaction/server_tx.go @@ -222,6 +222,7 @@ func (tx *ServerTx) passResp() error { } func (tx *ServerTx) Terminate() { + tx.log.Debug().Msg("Server transaction terminating") tx.delete() } @@ -253,6 +254,11 @@ func (tx *ServerTx) delete() { close(tx.done) tx.mu.Unlock() tx.onTerminate(tx.key) + + // TODO with ref this can be added, but normally we expect client does closing + // if _, err := tx.conn.TryClose(); err != nil { + // tx.log.Info().Err(err).Msg("Closing connection returned error") + // } }) // time.Sleep(time.Microsecond) @@ -280,5 +286,5 @@ func (tx *ServerTx) delete() { tx.timer_1xx = nil } tx.mu.Unlock() - tx.log.Debug().Str("tx", tx.Key()).Msg("Destroyed") + tx.log.Debug().Str("tx", tx.Key()).Msg("Server transaction destroyed") } diff --git a/transport/layer.go b/transport/layer.go index bc62b8e..0989cab 100644 --- a/transport/layer.go +++ b/transport/layer.go @@ -271,6 +271,9 @@ func (l *Layer) WriteMsgTo(msg sip.Message, addr string, network string) error { // ClientRequestConnection is based on // https://www.rfc-editor.org/rfc/rfc3261#section-18.1.1 // It is wrapper for getting and creating connection +// +// In case req destination is DNS resolved, destination will be cached or in +// other words SetDestination will be called func (l *Layer) ClientRequestConnection(req *sip.Request) (c Connection, err error) { network := NetworkToLower(req.Transport()) transport, ok := l.transports[network] @@ -293,9 +296,12 @@ func (l *Layer) ClientRequestConnection(req *sip.Request) (c Connection, err err } if raddr.IP == nil { ctx := context.Background() + // TODO: how to cache this address, for example reusing in dialog routing if err := l.resolveAddr(ctx, network, host, &raddr); err != nil { return nil, err } + // Save destination in request to avoid repeated resolving + req.SetDestination(raddr.String()) } // Now use Via header to determine our local address diff --git a/transport/udp.go b/transport/udp.go index ad9c870..bd6626b 100644 --- a/transport/udp.go +++ b/transport/udp.go @@ -29,7 +29,6 @@ var ( type UDPTransport struct { // listener *net.UDPConn parser *parser.Parser - conn *UDPConnection pool ConnectionPool listeners []*UDPConnection @@ -40,7 +39,6 @@ type UDPTransport struct { func NewUDPTransport(par *parser.Parser) *UDPTransport { p := &UDPTransport{ parser: par, - conn: nil, // Making sure interface is nil in returns pool: NewConnectionPool(), } p.log = log.Logger.With().Str("caller", "transport").Logger() @@ -71,14 +69,6 @@ func (t *UDPTransport) Serve(conn net.PacketConn, handler sip.MessageHandler) er c := &UDPConnection{PacketConn: conn} - // In case single connection avoid pool - if len(t.listeners) == 0 { - t.conn = c - } else { - t.conn = nil - } - - // t.listenersPool.Add(conn.LocalAddr().String(), c) t.listeners = append(t.listeners, c) for i := 0; i < UDPReadWorkers-1; i++ { @@ -104,10 +94,6 @@ func (t *UDPTransport) GetConnection(addr string) (Connection, error) { return conn, nil } - if t.conn != nil { - return t.conn, nil - } - // TODO: How to pick listener. Some address range mapping if len(t.listeners) > 0 { return t.listeners[0], nil @@ -160,7 +146,6 @@ func (t *UDPTransport) readConnection(conn *UDPConnection, handler sip.MessageHa defer conn.Close() for { num, raddr, err := conn.ReadFrom(buf) - if err != nil { if errors.Is(err, net.ErrClosed) { t.log.Debug().Err(err).Msg("Read connection closed") @@ -293,6 +278,7 @@ func (c *UDPConnection) Close() error { c.mu.Lock() c.refcount = 0 c.mu.Unlock() + log.Debug().Str("ip", c.LocalAddr().String()).Str("dst", c.Conn.RemoteAddr().String()).Int("ref", 0).Msg("UDP doing hard close") return c.Conn.Close() } @@ -315,7 +301,7 @@ func (c *UDPConnection) TryClose() (int, error) { return 0, nil } - // log.Debug().Str("ip", c.LocalAddr().String()).Str("dst", c.RemoteAddr().String()).Int("ref", ref).Msg("TCP closing") + log.Debug().Str("ip", c.LocalAddr().String()).Str("dst", c.Conn.RemoteAddr().String()).Int("ref", ref).Msg("UDP closing") return 0, c.Conn.Close() } @@ -373,13 +359,18 @@ func (c *UDPConnection) WriteMsg(msg sip.Message) error { } else { var err error - dst := msg.Destination() - raddr, err := net.ResolveUDPAddr("udp", dst) + // TODO lets return this better + dst := msg.Destination() // Destination should be already resolved by transport layer + host, port, err := sip.ParseAddr(dst) if err != nil { return err } + raddr := net.UDPAddr{ + IP: net.ParseIP(host), + Port: port, + } - n, err = c.WriteTo(data, raddr) + n, err = c.WriteTo(data, &raddr) if err != nil { return fmt.Errorf("udp conn %s err. %w", c.PacketConn.LocalAddr().String(), err) } diff --git a/ua.go b/ua.go index 888e661..7fb874d 100644 --- a/ua.go +++ b/ua.go @@ -86,6 +86,14 @@ func NewUA(options ...UserAgentOption) (*UserAgent, error) { return ua, nil } +func (ua *UserAgent) Close() error { + // stop transaction layer + ua.tx.Close() + + // stop transport layer + return ua.tp.Close() +} + // Listen adds listener for serve func (ua *UserAgent) setIP(ip net.IP) (err error) { ua.ip = ip