diff --git a/example/proxysip/go.mod b/example/proxysip/go.mod index 12dab23..ca0bcd9 100644 --- a/example/proxysip/go.mod +++ b/example/proxysip/go.mod @@ -19,7 +19,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect - github.com/gobwas/ws v1.2.1 // indirect + github.com/gobwas/ws v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect diff --git a/example/proxysip/go.sum b/example/proxysip/go.sum index 3a8a0e9..fe580ed 100644 --- a/example/proxysip/go.sum +++ b/example/proxysip/go.sum @@ -15,6 +15,8 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk= github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= +github.com/gobwas/ws v1.3.2 h1:zlnbNHxumkRvfPWgfXu8RBwyNR1x8wh9cf5PTOCqs9Q= +github.com/gobwas/ws v1.3.2/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= diff --git a/go.mod b/go.mod index df1a70b..b282088 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/emiago/sipgo -go 1.19 +go 1.21 require ( github.com/gobwas/ws v1.2.1 diff --git a/server_integration_test.go b/server_integration_test.go index 1bb14d3..98b75ff 100644 --- a/server_integration_test.go +++ b/server_integration_test.go @@ -240,28 +240,47 @@ func BenchmarkIntegrationClientServer(t *testing.B) { for _, tc := range testCases { t.Run(tc.transport, func(t *testing.B) { - // Build UAC - ua, _ := NewUA(WithUserAgenTLSConfig(clientTLS)) - client, err := NewClient(ua) - require.NoError(t, err) - proto := "sip" if tc.encrypted { proto = "sips" } - t.ResetTimer() - for i := 0; i < t.N; i++ { - req, _, _ := createTestInvite(t, proto+":bob@"+tc.serverAddr, tc.transport, client.ip.String()) - tx, err := client.TransactionRequest(ctx, req) + + t.RunParallel(func(p *testing.PB) { + // Build UAC + ua, _ := NewUA(WithUserAgenTLSConfig(clientTLS)) + client, err := NewClient(ua) require.NoError(t, err) + for p.Next() { + req, _, _ := createTestInvite(t, proto+":bob@"+tc.serverAddr, tc.transport, client.ip.String()) + tx, err := client.TransactionRequest(ctx, req) + require.NoError(t, err) - res := <-tx.Responses() - assert.Equal(t, sip.StatusCode(200), res.StatusCode) + res := <-tx.Responses() + assert.Equal(t, sip.StatusCode(200), res.StatusCode) - tx.Terminate() - } - t.ReportMetric(float64(t.N)/t.Elapsed().Seconds(), "req/s") + tx.Terminate() + } + + // t.ReportMetric(float64(t.N)/max(t.Elapsed().Seconds(), 1), "req/s") + }) + t.ReportMetric(float64(t.N)/max(t.Elapsed().Seconds(), 1), "req/s") + + // ua, _ := NewUA(WithUserAgenTLSConfig(clientTLS)) + // client, err := NewClient(ua) + // require.NoError(t, err) + + // for i := 0; i < t.N; i++ { + // req, _, _ := createTestInvite(t, proto+":bob@"+tc.serverAddr, tc.transport, client.ip.String()) + // tx, err := client.TransactionRequest(ctx, req) + // require.NoError(t, err) + + // res := <-tx.Responses() + // assert.Equal(t, sip.StatusCode(200), res.StatusCode) + + // tx.Terminate() + // } + // t.ReportMetric(float64(t.N)/max(t.Elapsed().Seconds(), 1), "req/s") }) } } diff --git a/sip/transport_ws.go b/sip/transport_ws.go index 6d9c6d8..27a669e 100644 --- a/sip/transport_ws.go +++ b/sip/transport_ws.go @@ -111,7 +111,7 @@ func (t *transportWS) initConnection(conn net.Conn, addr string, clientSide bool t.log.Debug().Str("raddr", addr).Msg("New WS connection") c := &WSConnection{ Conn: conn, - refcount: 1, + refcount: 1 + IdleConnection, clientSide: clientSide, } t.pool.Add(addr, c) @@ -125,6 +125,7 @@ func (t *transportWS) readConnection(conn *WSConnection, raddr string, handler M // defer conn.Close() // defer t.pool.Del(raddr) defer t.pool.CloseAndDelete(conn, raddr) + defer t.log.Debug().Str("raddr", raddr).Msg("Websocket read connection stopped") // Create stream parser context par := t.parser.NewSIPStream() @@ -271,7 +272,7 @@ func (c *WSConnection) Close() error { c.mu.Lock() c.refcount = 0 c.mu.Unlock() - log.Debug().Str("ip", c.RemoteAddr().String()).Int("ref", c.refcount).Msg("WS doing hard close") + log.Debug().Str("ip", c.RemoteAddr().String()).Msg("WS doing hard close") return c.Conn.Close() } @@ -280,16 +281,16 @@ func (c *WSConnection) TryClose() (int, error) { c.refcount-- ref := c.refcount c.mu.Unlock() - log.Debug().Str("ip", c.RemoteAddr().String()).Int("ref", c.refcount).Msg("WS reference decrement") + log.Debug().Str("ip", c.RemoteAddr().String()).Int("ref", ref).Msg("WS reference decrement") if ref > 0 { return ref, nil } if ref < 0 { - log.Warn().Str("ip", c.RemoteAddr().String()).Int("ref", c.refcount).Msg("WS ref went negative") + log.Warn().Str("ip", c.RemoteAddr().String()).Int("ref", ref).Msg("WS ref went negative") return 0, nil } - log.Debug().Str("ip", c.RemoteAddr().String()).Int("ref", c.refcount).Msg("WS closing") + log.Debug().Str("ip", c.RemoteAddr().String()).Int("ref", ref).Msg("WS closing") return ref, c.Conn.Close() } @@ -312,8 +313,25 @@ func (c *WSConnection) Read(b []byte) (n int, err error) { log.Debug().Str("caller", c.RemoteAddr().String()).Msgf("WS read connection header <- %s opcode=%d len=%d", c.Conn.RemoteAddr(), header.OpCode, header.Length) } - if header.OpCode == ws.OpClose { - return n, net.ErrClosed + if header.OpCode.IsControl() { + if header.OpCode == ws.OpClose { + return n, net.ErrClosed + } + continue + } + // if header.OpCode.IsReserved() { + // continue + // } + + // if !header.OpCode.IsData() { + // continue + // } + + if header.OpCode&ws.OpText == 0 { + if err := reader.Discard(); err != nil { + return 0, err + } + continue } data := make([]byte, header.Length) @@ -330,14 +348,14 @@ func (c *WSConnection) Read(b []byte) (n int, err error) { // continue // } - if SIPDebug { - log.Debug().Msgf("WS read %s <- %s:\n%s", c.Conn.LocalAddr().String(), c.Conn.RemoteAddr(), string(data)) - } - if header.Masked { ws.Cipher(data, header.Mask, 0) } + // header.Masked = false + if SIPDebug { + log.Debug().Msgf("WS read %s <- %s:\n%s", c.Conn.LocalAddr().String(), c.Conn.RemoteAddr(), string(data)) + } n += copy(b[n:], data)