Skip to content
This repository has been archived by the owner on Mar 8, 2024. It is now read-only.

Goroutine fix #122

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a8fbeda
Bugfix: InternalToExternal tx estimate gas returns valid gas limit an…
jdowning100 Mar 5, 2024
d8b254b
Added base fee enforcement and sorting by fee to Qi txs
jdowning100 Mar 6, 2024
160d01f
bugfix: modify msg payload to include length prefix
wizeguyy Mar 6, 2024
7944225
Implement custom propagation hooks to prevent bad data from spreading
Djadih Mar 4, 2024
b2ce90d
Refactor getRunningSlices and getRunningRegions for clarity
Djadih Mar 8, 2024
5d242bd
No need to initialize peers in the peerdb
Djadih Mar 8, 2024
a8d21d7
Add mutex lock to peerDb counter
Djadih Mar 8, 2024
8fbfec8
Create new peerDb for each location
Djadih Mar 8, 2024
9cb62a8
Create getDoms helper method to add peers to all dom locations
Djadih Mar 8, 2024
3506c49
Rearchitect the PeerManager and DBs to support peer bucketing by topic
Djadih Mar 8, 2024
c654e99
Establish timeout for stale requests
Djadih Mar 11, 2024
729313c
Refactored ETX Set to be an ordered list, added header commitment, ET…
jdowning100 Mar 11, 2024
a618b69
Optimization: Store all ETXs, only store ETX hashes for each block
jdowning100 Mar 13, 2024
a3ef7dc
Create APIs for Qi and Quai rate by number+hash
Djadih Mar 13, 2024
4557106
add genesis utxos
alanorwick Mar 5, 2024
600fe05
Miscellaneous error handling and print fixes
Djadih Mar 12, 2024
7ef2113
Send result to resultCh for processing even when nil
Djadih Mar 12, 2024
78f3a1d
Correctly clean up old streams from the streamCache
Djadih Mar 12, 2024
2eda1d9
Parallelize message handling from main read loop
Djadih Mar 12, 2024
8b541ee
Bugfix: Worker should commit to EtxSetHash
jdowning100 Mar 15, 2024
3437f20
Maps are reference types
Djadih Mar 18, 2024
53b5891
Abstract peer manager as interface
Djadih Mar 18, 2024
a2f4583
Separate stream manager and peer manager
Djadih Mar 18, 2024
f8f2375
Protect threads
Djadih Mar 18, 2024
bfc58c1
Enable Pprof
Djadih Mar 11, 2024
a446849
Custom bootnodes
Djadih Feb 29, 2024
1431c77
Change Prime settings
Djadih Feb 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
build/bin/
nodelogs/
traces/
/data/
*/config.toml
*/private.key
.vscode/
.idea*
.idea*
36 changes: 18 additions & 18 deletions cmd/utils/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,25 @@ func StartQuaiBackend(ctx context.Context, p2p quai.NetworkingAPI, logLevel stri
// Set the p2p backend inside the quaiBackend
quaiBackend.SetP2PApiBackend(p2p)

slicesRunning := getSlicesRunning()
regionsRunning := getRegionsRunning(slicesRunning)
runningSlices := GetRunningZones()
runningRegions := GetRunningRegions(runningSlices)

// Start nodes in separate goroutines
startNode("nodelogs/prime.log", nil, slicesRunning)
for _, region := range regionsRunning {
startNode("nodelogs/prime.log", nil, runningSlices)
for _, region := range runningRegions {
nodelogsFileName := "nodelogs/region-" + fmt.Sprintf("%d", region) + ".log"
startNode(nodelogsFileName, common.Location{region}, slicesRunning)
startNode(nodelogsFileName, common.Location{region}, runningSlices)
}
for _, slice := range slicesRunning {
for _, slice := range runningSlices {
nodelogsFileName := "nodelogs/zone-" + fmt.Sprintf("%d", slice[0]) + "-" + fmt.Sprintf("%d", slice[1]) + ".log"
startNode(nodelogsFileName, slice, slicesRunning)
startNode(nodelogsFileName, slice, runningSlices)
}

return quaiBackend, nil
}

// setSlicesRunning sets the slices running flag
func getSlicesRunning() []common.Location {
// GetRunningZones returns the slices that are processing state (which are only zones)
func GetRunningZones() []common.Location {
slices := strings.Split(viper.GetString(SlicesRunningFlag.Name), ",")

// Sanity checks
Expand All @@ -86,22 +86,22 @@ func getSlicesRunning() []common.Location {
if len(slices) > common.NumRegionsInPrime*common.NumZonesInRegion {
Fatalf("number of slices exceed the current ontology")
}
slicesRunning := []common.Location{}
runningSlices := []common.Location{}
for _, slice := range slices {
slicesRunning = append(slicesRunning, common.Location{slice[1] - 48, slice[3] - 48})
runningSlices = append(runningSlices, common.Location{slice[1] - 48, slice[3] - 48})
}
return slicesRunning
return runningSlices
}

// getRegionsRunning returns the regions running
func getRegionsRunning(slicesRunning []common.Location) []byte {
regionsRunning := []byte{}
for _, slice := range slicesRunning {
if !slices.Contains(regionsRunning, slice[0]) {
regionsRunning = append(regionsRunning, slice[0])
func GetRunningRegions(runningSlices []common.Location) []byte {
runningRegions := []byte{}
for _, slice := range runningSlices {
if !slices.Contains(runningRegions, slice[0]) {
runningRegions = append(runningRegions, slice[0])
}
}
return regionsRunning
return runningRegions
}

func StartNode(stack *node.Node) {
Expand Down
60 changes: 59 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package utils

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"regexp"
"runtime"
godebug "runtime/debug"
"strconv"
"strings"
Expand Down Expand Up @@ -86,6 +90,7 @@ var NodeFlags = []Flag{
UnlockedAccountFlag,
PasswordFileFlag,
VMEnableDebugFlag,
PprofFlag,
InsecureUnlockAllowedFlag,
GpoBlocksFlag,
GpoPercentileFlag,
Expand Down Expand Up @@ -465,6 +470,12 @@ var (
Usage: "Record information useful for VM and contract debugging" + generateEnvDoc(c_NodeFlagPrefix+"vmdebug"),
}

PprofFlag = Flag{
Name: "pprof",
Value: false,
Usage: "Enable the pprof HTTP server",
}

InsecureUnlockAllowedFlag = Flag{
Name: c_NodeFlagPrefix + "allow-insecure-unlock",
Value: false,
Expand Down Expand Up @@ -859,7 +870,7 @@ func setSubUrls(cfg *quaiconfig.Config, nodeLocation common.Location) {
switch nodeLocation.Context() {
case common.PRIME_CTX:
subUrls := []string{}
regionsRunning := getRegionsRunning(slicesRunning)
regionsRunning := GetRunningRegions(slicesRunning)
for _, region := range regionsRunning {
subUrls = append(subUrls, fmt.Sprintf("ws://127.0.0.1:%d", 8002+int(region)))
}
Expand Down Expand Up @@ -1212,10 +1223,51 @@ func CheckExclusive(args ...interface{}) {
}
}

func EnablePprof(nodeLocation common.Location) {
runtime.SetBlockProfileRate(1)
runtime.SetMutexProfileFraction(1)
var port string
myContext := nodeLocation
switch {
case bytes.Equal(myContext, []byte{}): // PRIME
port = "8081"
case bytes.Equal(myContext, []byte{0}): // Region 0
port = "8090"
// case bytes.Equal(myContext, []byte{1}): // Region 1
// port = "8100"
// case bytes.Equal(myContext, []byte{2}): // Region 2
// port = "8110"
case bytes.Equal(myContext, []byte{0, 0}): // Zone 0-0
port = "8091"
// case bytes.Equal(myContext, []byte{0, 1}): // Zone 0-1
// port = "8092"
// case bytes.Equal(myContext, []byte{0, 2}): // Zone 0-2
// port = "8093"
// case bytes.Equal(myContext, []byte{1, 0}): // Zone 1-0
// port = "8101"
// case bytes.Equal(myContext, []byte{1, 1}): // Zone 1-1
// port = "8102"
// case bytes.Equal(myContext, []byte{1, 2}): // Zone 1-2
// port = "8103"
// case bytes.Equal(myContext, []byte{2, 0}): // Zone 2-0
// port = "8111"
// case bytes.Equal(myContext, []byte{2, 1}): // Zone 2-1
// port = "8112"
// case bytes.Equal(myContext, []byte{2, 2}): // Zone 2-2
// port = "8113"
default:
port = "8085"
}
go func() {
log.Global.Print(http.ListenAndServe("localhost:"+port, nil))
}()
}

// SetQuaiConfig applies quai-related command line flags to the config.
func SetQuaiConfig(stack *node.Node, cfg *quaiconfig.Config, slicesRunning []common.Location, nodeLocation common.Location, logger *log.Logger) {
cfg.NodeLocation = nodeLocation
cfg.SlicesRunning = slicesRunning

// only set etherbase if its a zone chain
if len(nodeLocation) == 2 {
setEtherbase(cfg)
Expand All @@ -1242,6 +1294,12 @@ func SetQuaiConfig(stack *node.Node, cfg *quaiconfig.Config, slicesRunning []com
// set the gas limit ceil
setGasLimitCeil(cfg)

// if cfg.PprofFlag {
if viper.IsSet(PprofFlag.Name) {
log.Global.Info("Starting pprof server")
EnablePprof(nodeLocation)
}

// Cap the cache allowance and tune the garbage collector
mem, err := gopsutil.VirtualMemory()
if err == nil {
Expand Down
5 changes: 2 additions & 3 deletions common/bootnodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package common

var (
BootstrapPeers = []string{
"/ip4/34.31.180.127/tcp/4001/p2p/12D3KooWJ8d3AnGzscdVf2kzxzTbJgDtDXqrHS8vnedrFmZr8uvK",
"/ip4/34.68.124.139/tcp/4001/p2p/12D3KooWLxXHZx2QGjJra2oWjGYQ3Vdrkew5cNZcEgfaiKi7Dfrx",
"/ip4/34.41.131.24/tcp/4001/p2p/12D3KooWD8kycTRgZAovnciaoyNzcJxW16hCFNSfJYm4Lj6ouCBd",
"/ip4/34.31.180.127/tcp/4001/p2p/12D3KooWMukBUrnZ2LqBTCg16qVjU642F2QLYj2KjhTq3dCoJiMo",
"/ip4/34.68.124.139/tcp/4001/p2p/12D3KooWRJoK7ebWk33DQmF97sfQV5R1k6avNT4E8aB6QC6fJskG",
}
)
10 changes: 4 additions & 6 deletions common/stream_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,15 @@ func WriteMessageToStream(stream network.Stream, msg []byte) error {
return errors.Wrap(err, "failed to set write deadline")
}

// Get the length of the message and convert it into 4 bytes
// Get the length of the message and encode it
msgLen := uint32(len(msg))
lenBytes := make([]byte, 4)
binary.BigEndian.PutUint32(lenBytes, msgLen)

// First write the length of the message
if _, err := stream.Write(lenBytes); err != nil {
return errors.Wrap(err, "failed to write message length to stream")
}
// Prefix the message with the encoded length
msg = append(lenBytes, msg...)

// Then write the message itself
// Then write the message
_, err := stream.Write(msg)
if err != nil {
return errors.Wrap(err, "failed to write message to stream")
Expand Down
19 changes: 19 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,25 @@ func (loc Location) SubInSlice(slice Location) Location {
return subLoc
}

// GetDoms returns the dom locations that must be running for a given location
// For example:
// - if a region-0 calls GetDoms() the result will be
// [prime, region-0]
// - if a zone-0-0 calls GetDoms() the result will be
// [prime, region-0, zone-0-0]
func (loc Location) GetDoms() []Location {
var dominantLocations []Location

// Always start with the prime location
dominantLocations = append(dominantLocations, Location{})

for i := range loc {
dominantLocations = append(dominantLocations, loc[:i+1])
}

return dominantLocations
}

func (loc Location) InSameSliceAs(cmp Location) bool {
// Figure out which location is shorter
shorter := loc
Expand Down
2 changes: 2 additions & 0 deletions consensus/blake3pow/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,8 @@ func (blake3pow *Blake3pow) Finalize(chain consensus.ChainHeaderReader, header *
continue
}
}

core.AddGenesisUtxos(state, nodeLocation, blake3pow.logger)
}
header.SetUTXORoot(state.UTXORoot())
header.SetEVMRoot(state.IntermediateRoot(true))
Expand Down
21 changes: 2 additions & 19 deletions consensus/misc/rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,8 @@ func calculateQiReward(header *types.Header) *big.Int {
return big.NewInt(1000)
}

func CalculateRewardForQi(header *types.Header) map[uint8]uint8 {
rewardFromDifficulty := new(big.Int).Add(types.Denominations[types.MaxDenomination], types.Denominations[10])
return findMinDenominations(rewardFromDifficulty)
}

func CalculateRewardForQiWithFees(header *types.Header, fees *big.Int) map[uint8]uint8 {
rewardFromDifficulty := new(big.Int).Add(types.Denominations[types.MaxDenomination], types.Denominations[10])
reward := new(big.Int).Add(rewardFromDifficulty, fees)
return findMinDenominations(reward)
}

func CalculateRewardForQiWithFeesBigInt(header *types.Header, fees *big.Int) *big.Int {
rewardFromDifficulty := new(big.Int).Add(types.Denominations[types.MaxDenomination], types.Denominations[10])
reward := new(big.Int).Add(rewardFromDifficulty, fees)
return reward
}

// findMinDenominations finds the minimum number of denominations to make up the reward
func findMinDenominations(reward *big.Int) map[uint8]uint8 {
// FindMinDenominations finds the minimum number of denominations to make up the reward
func FindMinDenominations(reward *big.Int) map[uint8]uint8 {
// Store the count of each denomination used (map denomination to count)
denominationCount := make(map[uint8]uint8)
amount := new(big.Int).Set(reward)
Expand Down
1 change: 1 addition & 0 deletions consensus/progpow/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ func (progpow *Progpow) Finalize(chain consensus.ChainHeaderReader, header *type
continue
}
}
core.AddGenesisUtxos(state, nodeLocation, progpow.logger)
}
header.SetUTXORoot(state.UTXORoot())
header.SetEVMRoot(state.IntermediateRoot(true))
Expand Down
16 changes: 15 additions & 1 deletion core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
// transition, such as amount of used gas, the receipt roots and the state root
// itself. ValidateState returns a database batch if the validation was a success
// otherwise nil and an error is returned.
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, utxoEtxs []*types.Transaction, usedGas uint64) error {
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, utxoEtxs []*types.Transaction, etxSet *types.EtxSet, usedGas uint64) error {
start := time.Now()
header := types.CopyHeader(block.Header())
time1 := common.PrettyDuration(time.Since(start))
Expand Down Expand Up @@ -139,6 +139,20 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
if etxHash := types.DeriveSha(emittedEtxs, trie.NewStackTrie(nil)); etxHash != header.EtxHash() {
return fmt.Errorf("invalid etx hash (remote: %x local: %x)", header.EtxHash(), etxHash)
}
// Confirm the ETX set used by the block matches the ETX set given in the block body
// This is the resulting ETX set after all ETXs in the block have been processed
// After validation, this ETX set should be stored in the database
if etxSet != nil {
etxSetHash := etxSet.Hash()
if etxSetHash != block.EtxSetHash() {
return fmt.Errorf("expected ETX Set hash %x does not match block ETXSetHash %x", etxSetHash, block.EtxSetHash())
}
} else {
if block.EtxSetHash() != types.EmptyEtxSetHash {
return fmt.Errorf("expected ETX Set hash %x does not match block ETXSetHash %x", types.EmptyRootHash, block.EtxSetHash())
}

}
v.hc.logger.WithFields(log.Fields{
"t1": time1,
"t2": time2,
Expand Down
4 changes: 4 additions & 0 deletions core/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ var (
// would violate the block's ETX limits.
ErrEtxLimitReached = errors.New("etx limit reached")

// ErrEtxGasLimitReached is returned when the gas limit of an ETX is greater
// than the maximum allowed.
ErrEtxGasLimitReached = errors.New("etx gas limit greater than maximum allowed")

// ErrInsufficientFundsForTransfer is returned if the transaction sender doesn't
// have enough funds for transfer(topmost call only).
ErrInsufficientFundsForTransfer = errors.New("insufficient funds for transfer")
Expand Down
Loading