From 8a5c2d534064479d84812284e0f6dbe2f63e3a11 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Mon, 28 Oct 2024 00:04:31 +0800 Subject: [PATCH] fix: concurrent map read and writes --- p2p/discover/portal_protocol.go | 2 +- p2p/discover/v5_udp.go | 25 +++++++++++++++++------ portalnetwork/history/storage.go | 35 +++++++++++++++++++++++++++++++- 3 files changed, 54 insertions(+), 8 deletions(-) diff --git a/p2p/discover/portal_protocol.go b/p2p/discover/portal_protocol.go index 98a563626264..56d391c9ea8a 100644 --- a/p2p/discover/portal_protocol.go +++ b/p2p/discover/portal_protocol.go @@ -326,7 +326,7 @@ func (p *PortalProtocol) setupUDPListening() error { func(buf []byte, addr *net.UDPAddr) (int, error) { p.Log.Info("will send to target data", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf)) - if n, ok := p.DiscV5.cachedAddrNode[addr.String()]; ok { + if n, ok := p.DiscV5.GetCachedNode(addr.String()); ok { //_, err := p.DiscV5.TalkRequestToID(id, addr, string(portalwire.UTPNetwork), buf) req := &v5wire.TalkRequest{Protocol: string(portalwire.Utp), Message: buf} p.DiscV5.sendFromAnotherThreadWithNode(n, netip.AddrPortFrom(netutil.IPToAddr(addr.IP), uint16(addr.Port)), req) diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 92a97929ea4a..72c9be5179f0 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -64,7 +64,7 @@ type UDPv5 struct { // static fields conn UDPConn tab *Table - cachedIds map[enode.ID]*enode.Node + nodeMu sync.Mutex cachedAddrNode map[string]*enode.Node netrestrict *netutil.Netlist priv *ecdsa.PrivateKey @@ -155,7 +155,6 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) { // static fields conn: newMeteredConn(conn), cachedAddrNode: make(map[string]*enode.Node), - cachedIds: make(map[enode.ID]*enode.Node), localNode: ln, db: ln.Database(), netrestrict: cfg.NetRestrict, @@ -729,8 +728,7 @@ func (t *UDPv5) send(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet, return nonce, err } if c != nil && c.Node != nil { - t.cachedIds[toID] = c.Node - t.cachedAddrNode[toAddr.String()] = c.Node + t.putCache(toAddr.String(), c.Node) } _, err = t.conn.WriteToUDPAddrPort(enc, toAddr) @@ -793,8 +791,7 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr netip.AddrPort) error { if fromNode != nil { // Handshake succeeded, add to table. t.tab.addInboundNode(fromNode) - t.cachedIds[fromID] = fromNode - t.cachedAddrNode[fromAddr.String()] = fromNode + t.putCache(fromAddr.String(), fromNode) } if packet.Kind() != v5wire.WhoareyouPacket { // WHOAREYOU logged separately to report errors. @@ -999,3 +996,19 @@ func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes { } return resp } + +func (t *UDPv5) putCache(addr string, node *enode.Node) { + t.nodeMu.Lock() + defer t.nodeMu.Unlock() + if n, ok := t.cachedAddrNode[addr]; ok { + t.log.Debug("Update cached node", "old", n.ID(), "new", node.ID()) + } + t.cachedAddrNode[addr] = node +} + +func (t *UDPv5) GetCachedNode(addr string) (*enode.Node, bool) { + t.nodeMu.Lock() + defer t.nodeMu.Unlock() + n, ok := t.cachedAddrNode[addr] + return n, ok +} diff --git a/portalnetwork/history/storage.go b/portalnetwork/history/storage.go index 57b8f49fd877..28632cdab6b5 100644 --- a/portalnetwork/history/storage.go +++ b/portalnetwork/history/storage.go @@ -34,6 +34,7 @@ const ( deleteSql = "DELETE FROM kvstore WHERE key = (?1);" containSql = "SELECT 1 FROM kvstore WHERE key = (?1);" getAllOrderedByDistanceSql = "SELECT key, length(value), xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC;" + getFarthestDistanceSql = "SELECT key, xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC Limit 1;" deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE greater(xor(key, (?1)), (?2)) = 1" XorFindFarthestQuery = `SELECT xor(key, (?1)) as distance @@ -117,8 +118,8 @@ func NewHistoryStorage(config storage.PortalStorageConfig) (storage.ContentStora } err = hs.initStmts() - // Check whether we already have data, and use it to set radius + hs.setRadiusToFarthestDistance() // necessary to test NetworkName==history because state also initialize HistoryStorage if strings.ToLower(config.NetworkName) == "history" { @@ -376,6 +377,38 @@ func (p *ContentStorage) EstimateNewRadius(currentRadius *uint256.Int) (*uint256 return currentRadius, nil } +func (p *ContentStorage) setRadiusToFarthestDistance() { + rows, err := p.sqliteDB.Query(getFarthestDistanceSql, p.nodeId[:]) + if err != nil { + p.log.Error("failed to query farthest distance ", "err", err) + return + } + defer func(rows *sql.Rows) { + if rows != nil { + return + } + err = rows.Close() + if err != nil { + p.log.Error("failed to close rows", "err", err) + } + }(rows) + + if rows.Next() { + var contentId []byte + var distance []byte + err = rows.Scan(&contentId, &distance) + if err != nil { + p.log.Error("failed to scan rows for farthest distance", "err", err) + } + dis := uint256.NewInt(0) + err = dis.UnmarshalSSZ(distance) + if err != nil { + p.log.Error("failed to unmarshal ssz for farthest distance", "err", err) + } + p.radius.Store(dis) + } +} + func (p *ContentStorage) deleteContentFraction(fraction float64) (deleteCount int, err error) { if fraction <= 0 || fraction >= 1 { return deleteCount, errors.New("fraction should be between 0 and 1")