fix: use bad peer removal logic only for lightpush and filter and option to restart missing message verifier #1244

Merged
merged 1 commit on Dec 9, 2024
5 changes: 4 additions & 1 deletion waku/v2/api/filter/filter_manager.go
@@ -61,7 +61,8 @@ type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error
}

func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int,
envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
@@ -162,6 +163,7 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
defer utils.LogOnPanic()
ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}

sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
@@ -188,6 +190,7 @@ func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
mgr.onlineChecker.SetOnline(newStatus)
mgr.NetworkChange()
mgr.logger.Debug("switching from offline to online")
mgr.Lock()
42 changes: 34 additions & 8 deletions waku/v2/api/missing/missing_messages.go
@@ -35,6 +35,7 @@ type MessageTracker interface {
// MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria
type MissingMessageVerifier struct {
ctx context.Context
cancel context.CancelFunc
params missingMessageVerifierParams

storenodeRequestor common.StorenodeRequestor
@@ -43,10 +44,12 @@ type MissingMessageVerifier struct {
criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
criteriaInterestMu sync.RWMutex

C <-chan *protocol.Envelope
C chan *protocol.Envelope

timesource timesource.Timesource
logger *zap.Logger
timesource timesource.Timesource
logger *zap.Logger
isRunning bool
runningMutex sync.RWMutex
}

// NewMissingMessageVerifier creates an instance of a MissingMessageVerifier
@@ -63,6 +66,8 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
messageTracker: messageTracker,
logger: logger.Named("missing-msg-verifier"),
params: params,
criteriaInterest: make(map[string]criteriaInterest),
C: make(chan *protocol.Envelope, 1000),
}
}

@@ -97,12 +102,24 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt
m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest
}

func (m *MissingMessageVerifier) setRunning(running bool) {
m.runningMutex.Lock()
defer m.runningMutex.Unlock()
m.isRunning = running
}

func (m *MissingMessageVerifier) Start(ctx context.Context) {
m.ctx = ctx
m.criteriaInterest = make(map[string]criteriaInterest)
m.runningMutex.Lock()
if m.isRunning { //make sure verifier only runs once.
m.runningMutex.Unlock()
return
}
m.isRunning = true
m.runningMutex.Unlock()

c := make(chan *protocol.Envelope, 1000)
m.C = c
ctx, cancelFunc := context.WithCancel(ctx)
m.ctx = ctx
m.cancel = cancelFunc

go func() {
defer utils.LogOnPanic()
@@ -123,24 +140,33 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
for _, interest := range critIntList {
select {
case <-ctx.Done():
m.setRunning(false)
return
default:
semaphore <- struct{}{}
go func(interest criteriaInterest) {
defer utils.LogOnPanic()
m.fetchHistory(c, interest)
m.fetchHistory(m.C, interest)
<-semaphore
}(interest)
}
}

case <-ctx.Done():
m.setRunning(false)
return
}
}
}()
}

func (m *MissingMessageVerifier) Stop() {
m.cancel()
m.runningMutex.Lock()
defer m.runningMutex.Unlock()
m.isRunning = false
}

func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, interest criteriaInterest) {
contentTopics := interest.contentFilter.ContentTopics.ToList()
for i := 0; i < len(contentTopics); i += maxContentTopicsPerRequest {
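
The changes above make the verifier restartable: Start is guarded by isRunning, Stop cancels the internal context, and the envelope channel C is allocated once in the constructor so it survives a restart. A minimal caller sketch follows (hypothetical code, not part of this PR; the verifier value and handleEnvelope are assumed to exist):

// Hypothetical caller sketch: verifier is an existing *MissingMessageVerifier
// built with NewMissingMessageVerifier. Start is a no-op while already running,
// and Stop cancels the internal context so the verifier can be started again.
verifier.Start(ctx)

go func() {
	// C is created once in the constructor, so the same drain loop keeps
	// delivering recovered envelopes across Stop/Start cycles.
	for env := range verifier.C {
		handleEnvelope(env) // hypothetical handler for recovered envelopes
	}
}()

// later, e.g. after connectivity is restored:
verifier.Stop()
verifier.Start(ctx) // restart with a fresh internal context
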
26 changes: 8 additions & 18 deletions waku/v2/peermanager/peer_manager.go
@@ -102,7 +102,6 @@ const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 3
const badPeersCleanupInterval = 1 * time.Minute
const maxDialFailures = 2

// 80% relay peers 20% service peers
@@ -258,32 +257,27 @@ func (pm *PeerManager) Start(ctx context.Context) {
}
}

func (pm *PeerManager) removeBadPeers() {
if !pm.RelayEnabled {
for _, peerID := range pm.host.Peerstore().Peers() {
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
func (pm *PeerManager) CheckAndRemoveBadPeer(peerID peer.ID) {
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures &&
pm.peerConnector.onlineChecker.IsOnline() {
if origin, _ := pm.host.Peerstore().(wps.WakuPeerstore).Origin(peerID); origin != wps.Static { // delete only if a peer is discovered and not configured statically.
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}
}

func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
defer utils.LogOnPanic()
t := time.NewTicker(prunePeerStoreInterval)
t1 := time.NewTicker(badPeersCleanupInterval)
defer t.Stop()
defer t1.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.prunePeerStore()
case <-t1.C:
pm.removeBadPeers()
}
}
}
@@ -749,6 +743,7 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
if err == nil || errors.Is(err, context.Canceled) {
return
}

if pm.peerConnector != nil {
pm.peerConnector.addConnectionBackoff(peerID)
}
@@ -762,9 +757,4 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
}
}
if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}
14 changes: 12 additions & 2 deletions waku/v2/protocol/filter/client.go
@@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
@@ -267,6 +268,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
wf.metrics.RecordError(dialFailure)
if wf.pm != nil {
wf.pm.HandleDialError(err, peerID)
if errors.Is(err, swarm.ErrAllDialsFailed) ||
errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
wf.pm.CheckAndRemoveBadPeer(peerID)
}
}
return err
}
@@ -355,7 +360,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
if params.pm != nil && reqPeerCount > 0 {

wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
params.selectedPeers, err = wf.pm.SelectPeers(
selectedPeers, err := wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
@@ -368,7 +373,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
)
if err != nil {
wf.log.Error("peer selection returned err", zap.Error(err))
return nil, nil, err
if len(params.selectedPeers) == 0 {
return nil, nil, err
}
}
if len(selectedPeers) > 0 {
params.selectedPeers = append(params.selectedPeers, selectedPeers...)
}
}
wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/filter_health_check.go
@@ -24,7 +24,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
defer cancel()
err := wf.Ping(ctxWithTimeout, peer)
if err != nil {
if err != nil && wf.onlineChecker.IsOnline() {
wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err))
//quickly retry ping again before marking subscription as failure
//Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent.
5 changes: 5 additions & 0 deletions waku/v2/protocol/lightpush/waku_lightpush.go
@@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
@@ -198,6 +199,10 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
wakuLP.metrics.RecordError(dialFailure)
if wakuLP.pm != nil {
wakuLP.pm.HandleDialError(err, peerID)
if errors.Is(err, swarm.ErrAllDialsFailed) ||
errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
wakuLP.pm.CheckAndRemoveBadPeer(peerID)
}
}
return nil, err
}