Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test race #7

Merged
merged 1 commit into from
Oct 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 102 additions & 180 deletions quaistats/quaistats.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,19 @@ import (
)

const (
// historyUpdateRange is the number of blocks a node should report upon login or
// history request.
historyUpdateRange = 50

// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10
chainSideChanSize = 10

// reportInterval is the time interval between two reports.
reportInterval = 15

c_alpha = 8
c_tpsLookupCacheLimit = 100
c_gasLookupCacheLimit = 100
c_statsErrorValue = int64(-1)
c_alpha = 8
c_txBatchSize = 20
c_blocksPerMinute = 5
c_blocksPerHour = c_blocksPerMinute * 60
c_txLookupCacheLimit = c_blocksPerHour / c_txBatchSize
c_statsErrorValue = int64(-1)
)

// backend encompasses the bare-minimum functionality needed for quaistats reporting
Expand Down Expand Up @@ -111,8 +109,7 @@ type Service struct {
headSub event.Subscription
sideSub event.Subscription

tpsLookupCache *lru.Cache
gasLookupCache *lru.Cache
txLookupCache *lru.Cache

chainID *big.Int

Expand Down Expand Up @@ -196,22 +193,20 @@ func New(node *node.Node, backend backend, engine consensus.Engine, url string,
return err
}

tpsLookupCache, _ := lru.New(c_tpsLookupCacheLimit)
gasLookupCache, _ := lru.New(c_gasLookupCacheLimit)
txLookupCache, _ := lru.New(c_txLookupCacheLimit)

quaistats := &Service{
backend: backend,
engine: engine,
server: node.Server(),
node: parts[0],
pass: parts[1],
host: parts[2],
pongCh: make(chan struct{}),
chainID: backend.ChainConfig().ChainID,
trusted: trustednode,
tpsLookupCache: tpsLookupCache,
gasLookupCache: gasLookupCache,
instanceDir: node.InstanceDir(),
backend: backend,
engine: engine,
server: node.Server(),
node: parts[0],
pass: parts[1],
host: parts[2],
pongCh: make(chan struct{}),
chainID: backend.ChainConfig().ChainID,
trusted: trustednode,
txLookupCache: txLookupCache,
instanceDir: node.InstanceDir(),
}

node.RegisterLifecycle(quaistats)
Expand Down Expand Up @@ -397,7 +392,7 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, chainSideCh chan co
}

// Report internal blockstats every 20 block stats if trusted node
if head.NumberU64()%20 == 0 && s.trusted {
if head.NumberU64()%c_txBatchSize == 0 && s.trusted {
if err = s.reportInternalBlockStats(conns["internalBlockStats"], head); err != nil {
noErrs = false
log.Warn("Block internal stats report failed", "err", err)
Expand Down Expand Up @@ -507,54 +502,6 @@ type loginSecret struct {
Password string `json:"password"`
}

// login tries to authorize the client at the remote server.
func (s *Service) login(conn *connWrapper) error {
// Construct and send the login authentication
infos := s.server.NodeInfo()

var protocols []string
for _, proto := range s.server.Protocols {
protocols = append(protocols, fmt.Sprintf("%s/%d", proto.Name, proto.Version))
}
var network string
if info := infos.Protocols["eth"]; info != nil {
network = fmt.Sprintf("%d", info.(*ethproto.NodeInfo).Network)
}
auth := &authMsg{
ID: s.node,
Info: nodeInfo{
Name: s.node,
Node: infos.Name,
Port: infos.Ports.Listener,
Network: network,
Protocol: strings.Join(protocols, ", "),
API: "No",
Os: runtime.GOOS,
OsVer: runtime.GOARCH,
Client: "0.1.1",
History: true,
Chain: common.NodeLocation.Name(),
ChainID: s.chainID.Uint64(),
},
Secret: loginSecret{
Name: "admin",
Password: s.pass,
},
}
login := map[string][]interface{}{
"emit": {"hello", auth},
}
if err := conn.WriteJSON(login); err != nil {
return err
}
// Retrieve the remote ack or connection termination
var ack map[string][]string
if err := conn.ReadJSON(&ack); err != nil || len(ack["emit"]) != 1 || ack["emit"][0] != "ready" {
return errors.New("unauthorized")
}
return nil
}

type Credentials struct {
Name string `json:"name"`
Password string `json:"password"`
Expand Down Expand Up @@ -671,64 +618,6 @@ func (s *Service) report(dataType string, conn *connWrapper) error {
return nil
}

type latencyReport struct {
Latency int `json:"latency"`
}

// reportLatency sends a ping request to the server, measures the RTT time and
// finally sends a latency update.
func (s *Service) reportLatency(conn *connWrapper) error {
// Send the current time to the quaistats server
start := time.Now()

ping := map[string][]interface{}{
"emit": {"node-ping", map[string]string{
"id": s.node,
"clientTime": start.String(),
}},
}
if err := conn.WriteJSON(ping); err != nil {
return err
}
// Wait for the pong request to arrive back
select {
case <-s.pongCh:
// Pong delivered, report the latency
case <-time.After(5 * time.Second):
// Ping timeout, abort
return errors.New("ping timed out")
}

latency := int((time.Since(start) / time.Duration(2)).Nanoseconds() / 1000000)

// Send back the measured latency
log.Trace("Sending measured latency to ethstats", "latency", strconv.Itoa(latency))

latencyReport := map[string]interface{}{
"id": s.node,
"latency": &latencyReport{
Latency: latency,
},
}

report := map[string][]interface{}{
"emit": {"latency", latencyReport},
}

return conn.WriteJSON(report)
}

// uncleStats is a custom wrapper around an uncle array to force serializing
// empty arrays instead of returning null for them.
type uncleStats []*types.Header

func (s uncleStats) MarshalJSON() ([]byte, error) {
if uncles := ([]*types.Header)(s); len(uncles) > 0 {
return json.Marshal(uncles)
}
return []byte("[]"), nil
}

// reportSideBlock retrieves the current chain side event and reports it to the stats server.
func (s *Service) reportSideBlock(conn *connWrapper, block *types.Block) error {
log.Trace("Sending new side block to quaistats", "number", block.Number(), "hash", block.Hash())
Expand Down Expand Up @@ -843,76 +732,106 @@ type blockAppendTime struct {
}

type nodeStats struct {
Name string `json:"name"`
Timestamp *big.Int `json:"timestamp"`
RAMUsagePercent int64 `json:"ramUsagePercent"`
RAMFreePercent int64 `json:"ramFreePercent"`
RAMAvailablePercent int64 `json:"ramAvailablePercent"`
CPUUsagePercent int64 `json:"cpuPercent"`
DiskUsagePercent int64 `json:"diskUsagePercent"`
DiskUsageValue int64 `json:"diskUsageValue"`
CurrentBlockNumber []*big.Int `json:"currentBlockNumber"`
Location common.Location `json:"location"`
Name string `json:"name"`
Timestamp *big.Int `json:"timestamp"`
RAMUsage int64 `json:"ramUsage"`
RAMUsagePercent int64 `json:"ramUsagePercent"`
RAMFreePercent int64 `json:"ramFreePercent"`
RAMAvailablePercent int64 `json:"ramAvailablePercent"`
CPUUsagePercent int64 `json:"cpuPercent"`
DiskUsagePercent int64 `json:"diskUsagePercent"`
DiskUsageValue int64 `json:"diskUsageValue"`
CurrentBlockNumber []*big.Int `json:"currentBlockNumber"`
RegionLocation int `json:"regionLocation"`
ZoneLocation int `json:"zoneLocation"`
}

type totalTransactions struct {
TotalNoTransactions1h uint64
TotalNoTransactions1m uint64
}

func (s *Service) calculateTotalNoTransactions20Blocks(block *types.Block) *totalTransactions {
func (s *Service) evictOutdatedEntries(currentMaxBlock uint64) {
minAcceptableBlock := currentMaxBlock - c_blocksPerHour
for key := minAcceptableBlock - 20; key >= minAcceptableBlock-c_blocksPerHour; key -= 20 {
// Check if the key exists before trying to delete
if _, found := s.txLookupCache.Get(key); found {
s.txLookupCache.Remove(key)
} else {
return
}
}
}

func (s *Service) calculateTotalNoTransactions(block *types.Block) *totalTransactions {
var totalTransactions1h uint64
var totalTransactions1m uint64

blockCount := 0
currentBlock := block
for i := 0; i < 20; i++ { // iterate through the last 20 blocks
if currentBlock == nil {
log.Error("Encountered a nil block, stopping iteration")
break
}
batchesNeeded := c_blocksPerHour / c_txBatchSize // calculate how many batches of c_txBatchSize are needed

// Add the number of transactions in the current block to the total
totalTransactions1h += uint64(len(currentBlock.Transactions()))
if i < 5 {
totalTransactions1m += uint64(len(currentBlock.Transactions()))
}
for i := 0; i < batchesNeeded; i++ {
startBlockNum := currentBlock.NumberU64() - uint64(i*c_txBatchSize)

// Get the parent block for the next iteration
fullBackend, ok := s.backend.(fullNodeBackend)
// Try to get the data from the LRU cache
cachedTxCount, ok := s.txLookupCache.Get(startBlockNum)
if !ok {
log.Error("Not running fullnode, cannot get parent block")
break
// Not in cache, so we need to calculate the transaction count for this batch
txCount := uint64(0)

for j := 0; j < c_txBatchSize; j++ {
if currentBlock == nil {
log.Error("Encountered a nil block, stopping iteration")
break
}

// Add the number of transactions in the current block to the total
txCount += uint64(len(currentBlock.Transactions()))

// If within the last 5 blocks, add to the 1-minute total
if i == 0 && j < c_blocksPerMinute {
totalTransactions1m += uint64(len(currentBlock.Transactions()))
}

// Get the parent block for the next iteration
fullBackend, ok := s.backend.(fullNodeBackend)
if !ok {
log.Error("Not running fullnode, cannot get parent block")
break
}

var err error
currentBlock, err = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(currentBlock.NumberU64()-1))
if err != nil {
log.Error(fmt.Sprintf("Error getting block number %d: %s", int(currentBlock.NumberU64())-1, err.Error()))
break
}
}

// Store the sum in the cache
s.txLookupCache.Add(startBlockNum, txCount)

cachedTxCount = txCount
}

var err error
currentBlock, err = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(currentBlock.NumberU64()-1))
if err != nil {
log.Error(fmt.Sprintf("Error getting block %d: %s", int(block.NumberU64())-i, err.Error()))
// Add the transactions from this batch
txCount, ok := cachedTxCount.(uint64)
if !ok {
log.Error("Error casting cachedTxCount to uint64")
break
}
blockCount += 1
totalTransactions1h += txCount
}

if blockCount < 1 {
log.Error("No blocks found in the last 20 blocks")
return &totalTransactions{
TotalNoTransactions1h: 0,
TotalNoTransactions1m: 0,
}
if s.txLookupCache.Len() > c_txLookupCacheLimit {
s.evictOutdatedEntries(block.NumberU64())
}

// Now totalTransactions1h and totalTransactions1m have the transaction counts for the last c_blocksPerHour and c_txBatchSize blocks respectively
return &totalTransactions{
TotalNoTransactions1h: totalTransactions1h * (300 / uint64(blockCount)),
TotalNoTransactions1m: totalTransactions1m * (5 / min(blockCount, 5)),
}
}

func min(a, b int) uint64 {
if a < b {
return uint64(a)
TotalNoTransactions1h: totalTransactions1h,
TotalNoTransactions1m: totalTransactions1m,
}
return uint64(b)
}

func (s *Service) assembleBlockLocationStats(block *types.Block) *blockLocation {
Expand Down Expand Up @@ -949,7 +868,7 @@ func (s *Service) assembleBlockAppendTimeStats(block *types.Block) *blockAppendT

func (s *Service) assembleInternalBlockStats(block *types.Block) *internalBlockStats {
header := block.Header()
totalTransactions := s.calculateTotalNoTransactions20Blocks(block)
totalTransactions := s.calculateTotalNoTransactions(block)

// Assemble and return the block stats
return &internalBlockStats{
Expand All @@ -965,11 +884,12 @@ func (s *Service) assembleInternalBlockStats(block *types.Block) *internalBlockS
// mining layer and reports it to the stats server.
func (s *Service) reportNodeStats(conn *connWrapper) error {
// Get RAM usage
var ramUsagePercent, ramFreePercent, ramAvailablePercent float64
var ramUsagePercent, ramFreePercent, ramAvailablePercent, ramUsage float64
if vmStat, err := mem.VirtualMemory(); err == nil {
ramUsagePercent = float64(vmStat.UsedPercent)
ramFreePercent = float64(vmStat.Free) / float64(vmStat.Total) * 100
ramAvailablePercent = float64(vmStat.Available) / float64(vmStat.Total) * 100
ramUsage = float64(vmStat.Used)
} else {
log.Warn("Error getting RAM usage:", err)
// Handle error, possibly with default values or by returning an error
Expand Down Expand Up @@ -1013,14 +933,16 @@ func (s *Service) reportNodeStats(conn *connWrapper) error {
"stats": &nodeStats{
Name: s.node,
Timestamp: big.NewInt(time.Now().Unix()), // Current timestamp
RAMUsage: int64(ramUsage),
RAMUsagePercent: int64(ramUsagePercent),
RAMFreePercent: int64(ramFreePercent),
RAMAvailablePercent: int64(ramAvailablePercent),
CPUUsagePercent: int64(cpuUsagePercent),
DiskUsageValue: int64(diskUsage),
DiskUsagePercent: int64(diskUsagePercent),
CurrentBlockNumber: currentBlockHeight,
Location: location,
RegionLocation: location.Region(),
ZoneLocation: location.Zone(),
},
}

Expand Down
Loading