Skip to content

Commit

Permalink
fix: moving benchmark with parallel and small ws fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
emiago committed Feb 9, 2024
1 parent f418874 commit b13aa39
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 27 deletions.
2 changes: 1 addition & 1 deletion example/proxysip/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.2.1 // indirect
github.com/gobwas/ws v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions example/proxysip/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.1 h1:F2aeBZrm2NDsc7vbovKrWSogd4wvfAxg0FQ89/iqOTk=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/gobwas/ws v1.3.2 h1:zlnbNHxumkRvfPWgfXu8RBwyNR1x8wh9cf5PTOCqs9Q=
github.com/gobwas/ws v1.3.2/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/emiago/sipgo

go 1.19
go 1.21

require (
github.com/gobwas/ws v1.2.1
Expand Down
47 changes: 33 additions & 14 deletions server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,28 +240,47 @@ func BenchmarkIntegrationClientServer(t *testing.B) {

for _, tc := range testCases {
t.Run(tc.transport, func(t *testing.B) {
// Build UAC
ua, _ := NewUA(WithUserAgenTLSConfig(clientTLS))
client, err := NewClient(ua)
require.NoError(t, err)

proto := "sip"
if tc.encrypted {
proto = "sips"
}

t.ResetTimer()
for i := 0; i < t.N; i++ {
req, _, _ := createTestInvite(t, proto+":bob@"+tc.serverAddr, tc.transport, client.ip.String())
tx, err := client.TransactionRequest(ctx, req)

t.RunParallel(func(p *testing.PB) {
// Build UAC
ua, _ := NewUA(WithUserAgenTLSConfig(clientTLS))
client, err := NewClient(ua)
require.NoError(t, err)
for p.Next() {
req, _, _ := createTestInvite(t, proto+":bob@"+tc.serverAddr, tc.transport, client.ip.String())
tx, err := client.TransactionRequest(ctx, req)
require.NoError(t, err)

res := <-tx.Responses()
assert.Equal(t, sip.StatusCode(200), res.StatusCode)
res := <-tx.Responses()
assert.Equal(t, sip.StatusCode(200), res.StatusCode)

tx.Terminate()
}
t.ReportMetric(float64(t.N)/t.Elapsed().Seconds(), "req/s")
tx.Terminate()
}

// t.ReportMetric(float64(t.N)/max(t.Elapsed().Seconds(), 1), "req/s")
})
t.ReportMetric(float64(t.N)/max(t.Elapsed().Seconds(), 1), "req/s")

// ua, _ := NewUA(WithUserAgenTLSConfig(clientTLS))
// client, err := NewClient(ua)
// require.NoError(t, err)

// for i := 0; i < t.N; i++ {
// req, _, _ := createTestInvite(t, proto+":bob@"+tc.serverAddr, tc.transport, client.ip.String())
// tx, err := client.TransactionRequest(ctx, req)
// require.NoError(t, err)

// res := <-tx.Responses()
// assert.Equal(t, sip.StatusCode(200), res.StatusCode)

// tx.Terminate()
// }
// t.ReportMetric(float64(t.N)/max(t.Elapsed().Seconds(), 1), "req/s")
})
}
}
40 changes: 29 additions & 11 deletions sip/transport_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (t *transportWS) initConnection(conn net.Conn, addr string, clientSide bool
t.log.Debug().Str("raddr", addr).Msg("New WS connection")
c := &WSConnection{
Conn: conn,
refcount: 1,
refcount: 1 + IdleConnection,
clientSide: clientSide,
}
t.pool.Add(addr, c)
Expand All @@ -125,6 +125,7 @@ func (t *transportWS) readConnection(conn *WSConnection, raddr string, handler M
// defer conn.Close()
// defer t.pool.Del(raddr)
defer t.pool.CloseAndDelete(conn, raddr)
defer t.log.Debug().Str("raddr", raddr).Msg("Websocket read connection stopped")

// Create stream parser context
par := t.parser.NewSIPStream()
Expand Down Expand Up @@ -271,7 +272,7 @@ func (c *WSConnection) Close() error {
c.mu.Lock()
c.refcount = 0
c.mu.Unlock()
log.Debug().Str("ip", c.RemoteAddr().String()).Int("ref", c.refcount).Msg("WS doing hard close")
log.Debug().Str("ip", c.RemoteAddr().String()).Msg("WS doing hard close")
return c.Conn.Close()
}

Expand All @@ -280,16 +281,16 @@ func (c *WSConnection) TryClose() (int, error) {
c.refcount--
ref := c.refcount
c.mu.Unlock()
log.Debug().Str("ip", c.RemoteAddr().String()).Int("ref", c.refcount).Msg("WS reference decrement")
log.Debug().Str("ip", c.RemoteAddr().String()).Int("ref", ref).Msg("WS reference decrement")
if ref > 0 {
return ref, nil
}

if ref < 0 {
log.Warn().Str("ip", c.RemoteAddr().String()).Int("ref", c.refcount).Msg("WS ref went negative")
log.Warn().Str("ip", c.RemoteAddr().String()).Int("ref", ref).Msg("WS ref went negative")
return 0, nil
}
log.Debug().Str("ip", c.RemoteAddr().String()).Int("ref", c.refcount).Msg("WS closing")
log.Debug().Str("ip", c.RemoteAddr().String()).Int("ref", ref).Msg("WS closing")
return ref, c.Conn.Close()
}

Expand All @@ -312,8 +313,25 @@ func (c *WSConnection) Read(b []byte) (n int, err error) {
log.Debug().Str("caller", c.RemoteAddr().String()).Msgf("WS read connection header <- %s opcode=%d len=%d", c.Conn.RemoteAddr(), header.OpCode, header.Length)
}

if header.OpCode == ws.OpClose {
return n, net.ErrClosed
if header.OpCode.IsControl() {
if header.OpCode == ws.OpClose {
return n, net.ErrClosed
}
continue
}
// if header.OpCode.IsReserved() {
// continue
// }

// if !header.OpCode.IsData() {
// continue
// }

if header.OpCode&ws.OpText == 0 {
if err := reader.Discard(); err != nil {
return 0, err
}
continue
}

data := make([]byte, header.Length)
Expand All @@ -330,14 +348,14 @@ func (c *WSConnection) Read(b []byte) (n int, err error) {
// continue
// }

if SIPDebug {
log.Debug().Msgf("WS read %s <- %s:\n%s", c.Conn.LocalAddr().String(), c.Conn.RemoteAddr(), string(data))
}

if header.Masked {
ws.Cipher(data, header.Mask, 0)
}

// header.Masked = false
if SIPDebug {
log.Debug().Msgf("WS read %s <- %s:\n%s", c.Conn.LocalAddr().String(), c.Conn.RemoteAddr(), string(data))
}

n += copy(b[n:], data)

Expand Down

0 comments on commit b13aa39

Please sign in to comment.