-
Notifications
You must be signed in to change notification settings - Fork 0
/
respawn.go
89 lines (74 loc) · 1.94 KB
/
respawn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package main
import (
"context"
"net/netip"
"time"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/wavesplatform/gowaves/pkg/proto"
"github.com/rhansen/go-kairos/kairos"
"github.com/alexeykiselev/waves-fork-detector/peers"
)
const respawnInterval = 10 * time.Second
type Respawn struct {
ctx context.Context
wait func() error
timer *kairos.Timer
reg *peers.Registry
cm *ConnectionManager
}
func NewRespawn(reg *peers.Registry, cm *ConnectionManager) *Respawn {
return &Respawn{
timer: kairos.NewStoppedTimer(),
reg: reg,
cm: cm,
}
}
func (r *Respawn) Run(ctx context.Context) {
g, gc := errgroup.WithContext(ctx)
r.ctx = gc
r.wait = g.Wait
r.timer.Reset(respawnInterval)
g.Go(r.handleEvents)
}
func (r *Respawn) Shutdown() {
if err := r.wait(); err != nil {
zap.S().Warnf("Failed to shutdown Respawn: %v", err)
}
zap.S().Info("Respawn shutdown successfully")
}
func (r *Respawn) handleEvents() error {
for {
select {
case <-r.ctx.Done():
return nil
case <-r.timer.C:
addresses, err := r.reg.TakeAvailableAddresses()
if len(addresses) > 0 {
zap.S().Infof("Trying to establish connections to %d available addresses", len(addresses))
} else {
zap.S().Debugf("[RSP] No available addresses to establish connections")
}
if err != nil {
zap.S().Warnf("Failed to take available addresses: %v", err)
continue
}
r.establishConnections(addresses)
r.timer.Reset(respawnInterval)
}
}
}
func (r *Respawn) establishConnections(addresses []netip.AddrPort) {
for _, a := range addresses {
go func(ap netip.AddrPort) {
addr := proto.NewTCPAddrFromString(ap.String())
if cErr := r.cm.Connect(r.ctx, addr); cErr != nil {
zap.S().Debugf("[RSP] Failed to establish outbound connection: %v", cErr)
if urErr := r.reg.UnregisterPeer(ap.Addr()); urErr != nil {
zap.S().Warnf("Failed to unregister peer on connection failure: %v", urErr)
return
}
}
}(a)
}
}