Skip to content

Commit

Permalink
Merge branch 'new-network-connection' into itest-light-node-state-hash
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeykiselev committed Dec 4, 2024
2 parents 4f8b6b7 + 9705342 commit 5e753de
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 40 deletions.
21 changes: 14 additions & 7 deletions cmd/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,14 +640,21 @@ func spawnPeersByAddresses(ctx context.Context, addressesByComma string, pm *pee
}
addresses := strings.Split(addressesByComma, ",")
for _, addr := range addresses {
tcpAddr := proto.NewTCPAddrFromString(addr)
if tcpAddr.Empty() {
// That means that configuration parameter is invalid
return errors.Errorf("Failed to parse TCPAddr from string %q", tcpAddr.String())
peerInfos, err := proto.NewPeerInfosFromString(addr)
if err != nil {
return errors.Wrapf(err, "failed to resolve TCP addresses from string %q", addr)
}
if pErr := pm.AddAddress(ctx, tcpAddr); pErr != nil {
// That means that we have problems with peers storage
return errors.Wrapf(pErr, "failed to add address %q into known peers storage", tcpAddr.String())
for _, pi := range peerInfos {
tcpAddr := proto.NewTCPAddr(pi.Addr, int(pi.Port))
if tcpAddr.Empty() {
return errors.Errorf("failed to create TCP address from IP %q and port %d",
fmt.Stringer(pi.Addr), pi.Port,
)
}
if pErr := pm.AddAddress(ctx, tcpAddr); pErr != nil {
// That means that we have problems with peers storage
return errors.Wrapf(pErr, "failed to add address %q into known peers storage", tcpAddr.String())
}
}
}
return nil
Expand Down
5 changes: 4 additions & 1 deletion pkg/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ func (a *App) TransactionsBroadcast(ctx context.Context, b []byte) (proto.Transa
)
defer func() {
if !delay.Stop() && !fired {
<-delay.C
select {
case <-delay.C:
default:
}
}
}()
select {
Expand Down
5 changes: 4 additions & 1 deletion pkg/api/metamask/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,10 @@ func (s RPCService) Eth_SendRawTransaction(signedTxData string) (proto.EthereumH
return proto.EthereumHash{}, errors.New("timeout waiting response from internal FSM")
case err := <-respCh:
if !timer.Stop() {
<-timer.C
select {
case <-timer.C:
default:
}
}
if err != nil {
zap.S().Debugf("Eth_SendRawTransaction: error from internal FSM for ethereum tx (ethTxID=%q, to=%q, from=%q): %v",
Expand Down
5 changes: 4 additions & 1 deletion pkg/libs/ntptime/ntptime.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ func (a *ntpTimeImpl) Run(ctx context.Context, duration time.Duration) {
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
select {
case <-timer.C:
default:
}
}
return
case <-timer.C:
Expand Down
5 changes: 5 additions & 0 deletions pkg/networking/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"log/slog"
"net"
Expand Down Expand Up @@ -85,6 +86,10 @@ func newSession(ctx context.Context, config *Config, conn io.ReadWriteCloser, tp
return s, nil
}

func (s *Session) String() string {
return fmt.Sprintf("Session{local=%s,remote=%s}", s.LocalAddr(), s.RemoteAddr())
}

// LocalAddr returns the local network address.
func (s *Session) LocalAddr() net.Addr {
if a, ok := s.conn.(addressable); ok {
Expand Down
5 changes: 4 additions & 1 deletion pkg/node/fsm/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ func (a SnapshotTimeoutTask) Run(ctx context.Context, output chan AsyncTask) err
t := time.NewTimer(a.timeout)
defer func() {
if !t.Stop() {
<-t.C
select {
case <-t.C:
default:
}
}
}()
select {
Expand Down
10 changes: 8 additions & 2 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,10 @@ func (a *Node) runInternalMetrics(ctx context.Context, ch chan peer.ProtoMessage
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
select {
case <-timer.C:
default:
}
}
return
case <-timer.C:
Expand All @@ -238,7 +241,10 @@ func (a *Node) runOutgoingConnections(ctx context.Context) {
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
select {
case <-timer.C:
default:
}
}
return
case <-timer.C:
Expand Down
104 changes: 82 additions & 22 deletions pkg/proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"fmt"
"io"
"math/rand/v2"
"net"
"strconv"
"strings"
Expand Down Expand Up @@ -380,23 +381,14 @@ func (a TCPAddr) Equal(other TCPAddr) bool {
return a.IP.Equal(other.IP) && a.Port == other.Port
}

// NewTCPAddrFromString creates TCPAddr from string.
// Returns empty TCPAddr if string can't be parsed.
func NewTCPAddrFromString(s string) TCPAddr {
host, port, err := net.SplitHostPort(s)
pi, err := NewPeerInfoFromString(s)
if err != nil {
return TCPAddr{}
return TCPAddr{} // return empty TCPAddr in case of error
}
ip := net.ParseIP(host)
if ip == nil {
ips, err := net.LookupIP(host)
if err == nil {
ip = ips[0]
}
}
p, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return TCPAddr{}
}
return NewTCPAddr(ip, int(p))
return NewTCPAddr(pi.Addr, int(pi.Port))
}

func NewTcpAddrFromUint64(value uint64) TCPAddr {
Expand Down Expand Up @@ -738,26 +730,94 @@ func (a *IpPort) String() string {
return NewTCPAddr(a.Addr(), a.Port()).String()
}

func filterToIPV4(ips []net.IP) []net.IP {
for i := 0; i < len(ips); i++ {
ipV4 := ips[i].To4()
if ipV4 == nil { // for now we support only IPv4
iLast := len(ips) - 1
ips[i], ips[iLast] = ips[iLast], nil // move last address to the current position, order is not important
ips = ips[:iLast] // remove last address
i-- // move back to check the previously last address
} else {
ips[i] = ipV4 // replace with exact IPv4 form (ipV4 can be in both forms: ipv4 and ipV4 in ipv6)
}
}
return ips
}

func resolveHostToIPsv4(host string) ([]net.IP, error) {
if ip := net.ParseIP(host); ip != nil { // try to parse host as IP address
ipV4 := ip.To4() // try to convert to IPv4
if ipV4 == nil {
return nil, errors.Errorf("non-IPv4 address %q", host)
}
return []net.IP{ipV4}, nil // host is already an IP address
}
ips, err := net.LookupIP(host) // try to resolve host
if err != nil {
return nil, errors.Wrapf(err, "failed to resolve host %q", host)
}
ips = filterToIPV4(ips)
if len(ips) == 0 {
return nil, errors.Errorf("no IPv4 addresses found for host %q", host)
}
return ips, nil
}

// PeerInfo represents the address of a single peer
type PeerInfo struct {
Addr net.IP
Port uint16
}

func NewPeerInfoFromString(addr string) (PeerInfo, error) {
parts := strings.Split(addr, ":")
if len(parts) != 2 {
return PeerInfo{}, errors.Errorf("invalid addr %s", addr)
func ipsV4PortFromString(addr string) ([]net.IP, uint16, error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to split host and port")
}
portNum, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return nil, 0, errors.Errorf("invalid port %q", port)
}
if portNum == 0 {
return nil, 0, errors.Errorf("invalid port %q", port)
}
ips, err := resolveHostToIPsv4(host)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to resolve host")
}
return ips, uint16(portNum), nil
}

// NewPeerInfosFromString creates PeerInfo slice from string 'host:port'.
// It resolves host to IPv4 addresses and creates PeerInfo for each of them.
func NewPeerInfosFromString(addr string) ([]PeerInfo, error) {
ips, portNum, err := ipsV4PortFromString(addr)
if err != nil {
return nil, err
}
res := make([]PeerInfo, 0, len(ips))
for _, ip := range ips {
res = append(res, PeerInfo{
Addr: ip,
Port: portNum,
})
}
return res, nil
}

ip := net.ParseIP(parts[0])
port, err := strconv.ParseUint(parts[1], 10, 16)
// NewPeerInfoFromString creates PeerInfo from string 'host:port'.
// It resolves host to IPv4 addresses and selects the random one using math/rand/v2.
func NewPeerInfoFromString(addr string) (PeerInfo, error) {
ips, portNum, err := ipsV4PortFromString(addr)
if err != nil {
return PeerInfo{}, errors.Errorf("invalid port %s", parts[1])
return PeerInfo{}, err
}
n := rand.IntN(len(ips)) // #nosec: it's ok to use math/rand/v2 here
ip := ips[n] // Select random IPv4 from the list
return PeerInfo{
Addr: ip,
Port: uint16(port),
Port: portNum,
}, nil
}

Expand Down
90 changes: 86 additions & 4 deletions pkg/proto/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net"
"sort"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/wavesplatform/gowaves/pkg/crypto"
)

Expand Down Expand Up @@ -277,10 +279,90 @@ func TestPeerInfoMarshalJSON(t *testing.T) {
}

func TestNewPeerInfoFromString(t *testing.T) {
rs, err := NewPeerInfoFromString("34.253.153.4:6868")
require.NoError(t, err)
assert.Equal(t, "34.253.153.4", rs.Addr.String())
assert.EqualValues(t, 6868, rs.Port)
tests := []struct {
in string
out PeerInfo
err string
platformDependentErrMsg bool
}{
{in: "34.253.153.4:6868", out: PeerInfo{net.IPv4(34, 253, 153, 4).To4(), 6868}, err: ""},
{
in: "34.444.153.4:6868",
out: PeerInfo{},
err: "failed to resolve host: failed to resolve host \"34.444.153.4\": lookup 34.444.153.4: no such host",
},
{
in: "jfhasjdhfkmnn:6868",
out: PeerInfo{},
err: "failed to resolve host: failed to resolve host \"jfhasjdhfkmnn\": ",
platformDependentErrMsg: true,
},
{
in: "localhost:6868",
out: PeerInfo{net.IPv4(127, 0, 0, 1).To4(), 6868},
err: "",
},
{
in: "127.0.0.1:6868",
out: PeerInfo{net.IPv4(127, 0, 0, 1).To4(), 6868},
err: "",
},
{
in: fmt.Sprintf("34.44.153.4:%d", math.MaxUint16+1),
out: PeerInfo{},
err: fmt.Sprintf("invalid port \"%d\"", math.MaxUint16+1),
},
{
in: fmt.Sprintf("34.44.153.4:%d", -42),
out: PeerInfo{},
err: fmt.Sprintf("invalid port \"%d\"", -42),
},
{in: "34.44.153.4:bugaga", out: PeerInfo{}, err: "invalid port \"bugaga\""},
{in: "34.44.153.4:0", out: PeerInfo{}, err: "invalid port \"0\""},
{in: "34.44.153.4:", out: PeerInfo{}, err: "invalid port \"\""},
{
in: "34.44.153.4",
out: PeerInfo{},
err: "failed to split host and port: address 34.44.153.4: missing port in address",
},
{
in: "34.44.153.4:42:",
out: PeerInfo{},
err: "failed to split host and port: address 34.44.153.4:42:: too many colons in address",
},
}
for i, tc := range tests {
t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
t.Run("NewPeerInfoFromString", func(t *testing.T) {
rs, err := NewPeerInfoFromString(tc.in)
if tc.err != "" {
if tc.platformDependentErrMsg {
assert.ErrorContains(t, err, tc.err)
} else {
assert.EqualError(t, err, tc.err)
}
} else {
require.NoError(t, err)
assert.Equal(t, tc.out, rs)
}
})
t.Run("NewPeerInfosFromString", func(t *testing.T) {
rs, err := NewPeerInfosFromString(tc.in)
if tc.err != "" {
if tc.platformDependentErrMsg {
assert.ErrorContains(t, err, tc.err)
} else {
assert.EqualError(t, err, tc.err)
}
} else {
require.NoError(t, err)
assert.Len(t, rs, 1)
res := rs[0]
assert.Equal(t, tc.out, res)
}
})
})
}
}

func TestPeerInfoUnmarshalJSON(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/util/limit_listener/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (l *limitListener) acquire() bool {
timer := time.NewTimer(l.waitConnQuotaTimeout)
stopTimer := func() {
if !timer.Stop() {
<-timer.C
select {
case <-timer.C:
default:
}
}
}
select {
Expand Down

0 comments on commit 5e753de

Please sign in to comment.