-
Notifications
You must be signed in to change notification settings - Fork 56
/
lazyvoter.go
284 lines (232 loc) · 7.15 KB
/
lazyvoter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package app
import (
"context"
"sync"
"time"
atypes "github.com/omni-network/omni/halo/attest/types"
"github.com/omni-network/omni/halo/attest/voter"
"github.com/omni-network/omni/halo/comet"
vtypes "github.com/omni-network/omni/halo/valsync/types"
"github.com/omni-network/omni/lib/cchain"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/ethclient"
"github.com/omni-network/omni/lib/expbackoff"
"github.com/omni-network/omni/lib/k1util"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/xchain"
xprovider "github.com/omni-network/omni/lib/xchain/provider"
"github.com/cometbft/cometbft/crypto"
"github.com/ethereum/go-ethereum/common"
)
// Compile-time checks that the types in this file satisfy the attest interfaces.
var (
	_ atypes.Voter     = (*voterLoader)(nil)
	_ atypes.VoterDeps = voteDeps{}
)
// voteDeps bundles the comet API and the consensus chain provider into a single
// value by embedding both, so that their combined method set can be handed to the
// voter as its atypes.VoterDeps.
type voteDeps struct {
	comet.API       // embedded comet API
	cchain.Provider // embedded consensus chain provider
}
// voterLoader wraps a voter instances that is lazy loaded from the on-chain registry.
// It is basically a noop while not loaded.
type voterLoader struct {
mu sync.Mutex
voter *voter.Voter
proposed []*atypes.AttestHeader
committed []*atypes.AttestHeader
lastValSet *vtypes.ValidatorSetResponse
isVal bool
localAddr common.Address
}
// newVoterLoader returns a voterLoader with no voter bound yet. Its local address
// is derived from the public key of privKey; an error is returned if that
// derivation fails.
func newVoterLoader(privKey crypto.PrivKey) (*voterLoader, error) {
	addr, err := k1util.PubKeyToAddress(privKey.PubKey())
	if err != nil {
		return nil, err
	}

	loader := new(voterLoader)
	loader.localAddr = addr

	return loader, nil
}
// LazyLoad blocks until the local node is a validator and the network config can be loaded from the on-chain
// registry, then it initializes and starts the voter instance and binds it to the lazy wrapper.
//
// Steps, in order:
//  1. Poll isValidator (with a periodic backoff) until it reports true or ctx is canceled.
//  2. Fail if endpoints is empty, since a validator cannot vote without xchain RPC endpoints.
//  3. Load the network via netconf.AwaitOnConsensusChain.
//  4. Build the xchain provider (a mock for netconf.Simnet, otherwise one backed by per-chain eth clients).
//  5. Load the voter, replay all cached proposed/committed headers and the cached validator set into it,
//     clear the caches, bind the voter and start it.
//
// It returns an error if ctx is canceled while waiting, if endpoints is empty once the node is a validator,
// or if any of the setup or replay steps fails. asyncAbort is passed through to voter.LoadVoter unchanged.
//
//nolint:nestif // 2 levels is not that bad
func (l *voterLoader) LazyLoad(
ctx context.Context,
netID netconf.ID,
omniEVMCl ethclient.Client,
endpoints xchain.RPCEndpoints,
cprov cchain.Provider,
privKey crypto.PrivKey,
voterStateFile string,
cmtAPI comet.API,
asyncAbort chan<- error,
) error {
// Only warn here: empty endpoints become a hard error further down, once the node is a validator.
if len(endpoints) == 0 {
log.Warn(ctx, "Flag --xchain-evm-rpc-endpoints empty. The app will crash if it becomes a validator since it cannot perform xchain voting duties", nil)
}
if !l.isValidator() {
log.Info(ctx, "Local halo node is not a validator")
}
// Wait until this node becomes a validator before initializing voter.
// This mitigates crashes due to invalid rpc endpoint config in non-validator nodes.
backoff := expbackoff.New(ctx, expbackoff.WithPeriodicConfig(time.Second))
for !l.isValidator() {
backoff()
// Check for cancellation after each backoff so the loop cannot spin on a canceled context.
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "lazy loading canceled")
}
}
log.Info(ctx, "🫡 Local halo node is a validator, starting voter")
if len(endpoints) == 0 {
// Note that this negatively affects chain liveness, but xchain liveness already negatively affected so rather
// highlight the issue to the operator by crashing. #allornothing
return errors.New("flag --xchain-evm-rpc-endpoints empty so cannot perform xchain voting duties")
}
// Use the RPCEndpoints config as the list of expected chains to load from the registry.
// This is required for fresh genesis chains since portals are registered one at a time.
// So netconf.AwaitOnConsensusChain can wait for all to be registered before returning.
//
// For existing chains however, clear expected, since we take what we get on-chain
// and avoid a dependency on possibly mismatching/incorrect RPCEndpoints config.
//
// TODO(corver): Dynamic reloading of voter when on-chain registry is updated.
expected := endpoints.Keys()
const day = 100_000 // At least a day old
// Best effort: if BlockNumber returns an error, expected is left as the configured endpoint keys.
if height, err := omniEVMCl.BlockNumber(ctx); err == nil && height > day {
expected = nil
}
network, err := netconf.AwaitOnConsensusChain(ctx, netID, cprov, expected)
if err != nil {
return err
}
var xprov xchain.Provider
if netID == netconf.Simnet {
// Simnet uses a mock xchain provider instead of real eth clients.
omni, ok := network.OmniConsensusChain()
if !ok {
return errors.New("omni chain not found in network")
}
// NOTE(review): the first argument is 8/10 of omni.BlockPeriod — presumably the mock's block period; confirm against xprovider.NewMock.
xprov, err = xprovider.NewMock(omni.BlockPeriod*8/10, omni.ID, cprov)
if err != nil {
return err
}
} else {
// Build one eth client per EVM chain in the loaded network, keyed by chain ID.
ethClients := make(map[uint64]ethclient.Client)
for _, chain := range network.EVMChains() {
// Use EngineAPI as omni_evm RPC client.
if netconf.IsOmniExecution(netID, chain.ID) {
ethClients[chain.ID] = omniEVMCl
continue
}
// All other chains need an RPC endpoint from the config; a missing one aborts loading.
rpc, err := endpoints.ByNameOrID(chain.Name, chain.ID)
if err != nil {
return err
}
ethCl, err := ethclient.Dial(chain.Name, rpc)
if err != nil {
return err
}
ethClients[chain.ID] = ethCl
}
xprov = xprovider.New(network, ethClients, cprov)
}
deps := voteDeps{
API: cmtAPI,
Provider: cprov,
}
v, err := voter.LoadVoter(privKey, voterStateFile, xprov, deps, network, asyncAbort)
if err != nil {
return errors.Wrap(err, "create voter")
}
// Hold the lock for the whole replay-and-bind sequence below; the caching paths in
// SetProposed, SetCommitted and UpdateValidatorSet take the same lock before writing the caches.
l.mu.Lock()
defer l.mu.Unlock()
// Process all cached values
if err := v.SetProposed(ctx, l.proposed); err != nil {
return errors.Wrap(err, "set cached proposed")
}
if err := v.SetCommitted(ctx, l.committed); err != nil {
return errors.Wrap(err, "set cached committed")
}
if l.lastValSet != nil {
if err := v.UpdateValidatorSet(l.lastValSet); err != nil {
return errors.Wrap(err, "update validator set")
}
}
// Clear all cached values
l.proposed = nil
l.committed = nil
l.lastValSet = nil
// Set voter and start it
l.voter = v
v.Start(ctx)
return nil
}
// getVoter returns the bound voter and true, or nil and false if no voter has
// been bound yet. The field is read under the loader's mutex.
func (l *voterLoader) getVoter() (*voter.Voter, bool) {
	l.mu.Lock()
	bound := l.voter
	l.mu.Unlock()

	return bound, bound != nil
}
// isValidator reports the cached validator flag, read under the loader's mutex.
// The flag is written by UpdateValidatorSet while no voter is bound.
func (l *voterLoader) isValidator() bool {
	l.mu.Lock()
	flag := l.isVal
	l.mu.Unlock()

	return flag
}
// GetAvailable delegates to the bound voter's GetAvailable.
// It returns nil (an empty list) while the voter is not available yet.
func (l *voterLoader) GetAvailable() []*atypes.Vote {
	bound, loaded := l.getVoter()
	if !loaded {
		return nil
	}

	return bound.GetAvailable()
}
// SetProposed forwards the proposed headers to the bound voter and returns its
// result. While no voter is bound, the headers are cached so they can be provided
// to the voter once it becomes available (these could be votes sent right before
// a restart), and nil is returned.
//
// The "is a voter bound?" check and the cache append happen in a single critical
// section. LazyLoad drains the cache and binds the voter under the same mutex, so
// headers can no longer be appended to the cache after it has been drained (which
// would silently drop them).
func (l *voterLoader) SetProposed(ctx context.Context, headers []*atypes.AttestHeader) error {
	l.mu.Lock()
	v := l.voter
	if v == nil {
		l.proposed = append(l.proposed, headers...)
	}
	l.mu.Unlock()

	if v == nil {
		return nil
	}

	// Call the voter outside the lock, as the original did.
	return v.SetProposed(ctx, headers)
}
// SetCommitted forwards the committed headers to the bound voter and returns its
// result. While no voter is bound, the headers are cached so they can be provided
// to the voter once it becomes available (these could be votes sent right before
// a restart), and nil is returned.
//
// The "is a voter bound?" check and the cache append happen in a single critical
// section. LazyLoad drains the cache and binds the voter under the same mutex, so
// headers can no longer be appended to the cache after it has been drained (which
// would silently drop them).
func (l *voterLoader) SetCommitted(ctx context.Context, headers []*atypes.AttestHeader) error {
	l.mu.Lock()
	v := l.voter
	if v == nil {
		l.committed = append(l.committed, headers...)
	}
	l.mu.Unlock()

	if v == nil {
		return nil
	}

	// Call the voter outside the lock, as the original did.
	return v.SetCommitted(ctx, headers)
}
// LocalAddress returns the bound voter's local address, or the address stored on
// the loader itself while no voter is bound.
func (l *voterLoader) LocalAddress() common.Address {
	bound, loaded := l.getVoter()
	if !loaded {
		return l.localAddr
	}

	return bound.LocalAddress()
}
// TrimBehind delegates to the bound voter's TrimBehind and returns its result.
// It returns 0 while no voter is bound.
func (l *voterLoader) TrimBehind(minsByChain map[xchain.ChainVersion]uint64) int {
	bound, loaded := l.getVoter()
	if !loaded {
		return 0
	}

	return bound.TrimBehind(minsByChain)
}
// UpdateValidatorSet records whether the local address is part of valset in the
// cometValidator gauge, then either forwards valset to the bound voter (returning
// its result) or, while no voter is bound, caches valset together with the
// validator flag that LazyLoad polls.
//
// It returns the error from valset.IsValidator, if any, before touching any state.
//
// The "is a voter bound?" check and the cache write happen in a single critical
// section. LazyLoad replays the cached validator set and binds the voter under the
// same mutex, so a validator set can no longer be written to the cache after it
// has been drained (which would mean it is never forwarded to the voter).
func (l *voterLoader) UpdateValidatorSet(valset *vtypes.ValidatorSetResponse) error {
	isVal, err := valset.IsValidator(l.localAddr)
	if err != nil {
		return err
	}

	setConstantGauge(cometValidator, isVal)

	l.mu.Lock()
	v := l.voter
	if v == nil {
		l.isVal = isVal
		l.lastValSet = valset
	}
	l.mu.Unlock()

	if v == nil {
		return nil
	}

	// Call the voter outside the lock, as the original did.
	return v.UpdateValidatorSet(valset)
}
// WaitDone delegates to the bound voter's WaitDone.
// It returns immediately while no voter is bound.
func (l *voterLoader) WaitDone() {
	bound, loaded := l.getVoter()
	if !loaded {
		return
	}

	bound.WaitDone()
}