Skip to content

Commit

Permalink
Calculate 1h transactions using a cache rather than extrapolating & r…
Browse files Browse the repository at this point in the history
…emove more old code
  • Loading branch information
robschleusner committed Oct 19, 2023
1 parent bafa6df commit ed46410
Showing 1 changed file with 102 additions and 180 deletions.
282 changes: 102 additions & 180 deletions quaistats/quaistats.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,19 @@ import (
)

const (
// historyUpdateRange is the number of blocks a node should report upon login or
// history request.
historyUpdateRange = 50

// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10
chainSideChanSize = 10

// reportInterval is the time interval between two reports.
reportInterval = 15

c_alpha = 8
c_tpsLookupCacheLimit = 100
c_gasLookupCacheLimit = 100
c_statsErrorValue = int64(-1)
c_alpha = 8
c_txBatchSize = 20
c_blocksPerMinute = 5
c_blocksPerHour = c_blocksPerMinute * 60
c_txLookupCacheLimit = c_blocksPerHour / c_txBatchSize
c_statsErrorValue = int64(-1)
)

// backend encompasses the bare-minimum functionality needed for quaistats reporting
Expand Down Expand Up @@ -111,8 +109,7 @@ type Service struct {
headSub event.Subscription
sideSub event.Subscription

tpsLookupCache *lru.Cache
gasLookupCache *lru.Cache
txLookupCache *lru.Cache

chainID *big.Int

Expand Down Expand Up @@ -196,22 +193,20 @@ func New(node *node.Node, backend backend, engine consensus.Engine, url string,
return err
}

tpsLookupCache, _ := lru.New(c_tpsLookupCacheLimit)
gasLookupCache, _ := lru.New(c_gasLookupCacheLimit)
txLookupCache, _ := lru.New(c_txLookupCacheLimit)

quaistats := &Service{
backend: backend,
engine: engine,
server: node.Server(),
node: parts[0],
pass: parts[1],
host: parts[2],
pongCh: make(chan struct{}),
chainID: backend.ChainConfig().ChainID,
trusted: trustednode,
tpsLookupCache: tpsLookupCache,
gasLookupCache: gasLookupCache,
instanceDir: node.InstanceDir(),
backend: backend,
engine: engine,
server: node.Server(),
node: parts[0],
pass: parts[1],
host: parts[2],
pongCh: make(chan struct{}),
chainID: backend.ChainConfig().ChainID,
trusted: trustednode,
txLookupCache: txLookupCache,
instanceDir: node.InstanceDir(),
}

node.RegisterLifecycle(quaistats)
Expand Down Expand Up @@ -397,7 +392,7 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, chainSideCh chan co
}

// Report internal blockstats every 20 block stats if trusted node
if head.NumberU64()%20 == 0 && s.trusted {
if head.NumberU64()%c_txBatchSize == 0 && s.trusted {
if err = s.reportInternalBlockStats(conns["internalBlockStats"], head); err != nil {
noErrs = false
log.Warn("Block internal stats report failed", "err", err)
Expand Down Expand Up @@ -507,54 +502,6 @@ type loginSecret struct {
Password string `json:"password"`
}

// login tries to authorize the client at the remote server.
func (s *Service) login(conn *connWrapper) error {
// Construct and send the login authentication
infos := s.server.NodeInfo()

var protocols []string
for _, proto := range s.server.Protocols {
protocols = append(protocols, fmt.Sprintf("%s/%d", proto.Name, proto.Version))
}
var network string
if info := infos.Protocols["eth"]; info != nil {
network = fmt.Sprintf("%d", info.(*ethproto.NodeInfo).Network)
}
auth := &authMsg{
ID: s.node,
Info: nodeInfo{
Name: s.node,
Node: infos.Name,
Port: infos.Ports.Listener,
Network: network,
Protocol: strings.Join(protocols, ", "),
API: "No",
Os: runtime.GOOS,
OsVer: runtime.GOARCH,
Client: "0.1.1",
History: true,
Chain: common.NodeLocation.Name(),
ChainID: s.chainID.Uint64(),
},
Secret: loginSecret{
Name: "admin",
Password: s.pass,
},
}
login := map[string][]interface{}{
"emit": {"hello", auth},
}
if err := conn.WriteJSON(login); err != nil {
return err
}
// Retrieve the remote ack or connection termination
var ack map[string][]string
if err := conn.ReadJSON(&ack); err != nil || len(ack["emit"]) != 1 || ack["emit"][0] != "ready" {
return errors.New("unauthorized")
}
return nil
}

type Credentials struct {
Name string `json:"name"`
Password string `json:"password"`
Expand Down Expand Up @@ -671,64 +618,6 @@ func (s *Service) report(dataType string, conn *connWrapper) error {
return nil
}

type latencyReport struct {
Latency int `json:"latency"`
}

// reportLatency sends a ping request to the server, measures the RTT time and
// finally sends a latency update.
func (s *Service) reportLatency(conn *connWrapper) error {
// Send the current time to the quaistats server
start := time.Now()

ping := map[string][]interface{}{
"emit": {"node-ping", map[string]string{
"id": s.node,
"clientTime": start.String(),
}},
}
if err := conn.WriteJSON(ping); err != nil {
return err
}
// Wait for the pong request to arrive back
select {
case <-s.pongCh:
// Pong delivered, report the latency
case <-time.After(5 * time.Second):
// Ping timeout, abort
return errors.New("ping timed out")
}

latency := int((time.Since(start) / time.Duration(2)).Nanoseconds() / 1000000)

// Send back the measured latency
log.Trace("Sending measured latency to ethstats", "latency", strconv.Itoa(latency))

latencyReport := map[string]interface{}{
"id": s.node,
"latency": &latencyReport{
Latency: latency,
},
}

report := map[string][]interface{}{
"emit": {"latency", latencyReport},
}

return conn.WriteJSON(report)
}

// uncleStats is a custom wrapper around an uncle array to force serializing
// empty arrays instead of returning null for them.
type uncleStats []*types.Header

func (s uncleStats) MarshalJSON() ([]byte, error) {
if uncles := ([]*types.Header)(s); len(uncles) > 0 {
return json.Marshal(uncles)
}
return []byte("[]"), nil
}

// reportSideBlock retrieves the current chain side event and reports it to the stats server.
func (s *Service) reportSideBlock(conn *connWrapper, block *types.Block) error {
log.Trace("Sending new side block to quaistats", "number", block.Number(), "hash", block.Hash())
Expand Down Expand Up @@ -843,76 +732,106 @@ type blockAppendTime struct {
}

type nodeStats struct {
Name string `json:"name"`
Timestamp *big.Int `json:"timestamp"`
RAMUsagePercent int64 `json:"ramUsagePercent"`
RAMFreePercent int64 `json:"ramFreePercent"`
RAMAvailablePercent int64 `json:"ramAvailablePercent"`
CPUUsagePercent int64 `json:"cpuPercent"`
DiskUsagePercent int64 `json:"diskUsagePercent"`
DiskUsageValue int64 `json:"diskUsageValue"`
CurrentBlockNumber []*big.Int `json:"currentBlockNumber"`
Location common.Location `json:"location"`
Name string `json:"name"`
Timestamp *big.Int `json:"timestamp"`
RAMUsage int64 `json:"ramUsage"`
RAMUsagePercent int64 `json:"ramUsagePercent"`
RAMFreePercent int64 `json:"ramFreePercent"`
RAMAvailablePercent int64 `json:"ramAvailablePercent"`
CPUUsagePercent int64 `json:"cpuPercent"`
DiskUsagePercent int64 `json:"diskUsagePercent"`
DiskUsageValue int64 `json:"diskUsageValue"`
CurrentBlockNumber []*big.Int `json:"currentBlockNumber"`
RegionLocation int `json:"regionLocation"`
ZoneLocation int `json:"zoneLocation"`
}

type totalTransactions struct {
TotalNoTransactions1h uint64
TotalNoTransactions1m uint64
}

func (s *Service) calculateTotalNoTransactions20Blocks(block *types.Block) *totalTransactions {
func (s *Service) evictOutdatedEntries(currentMaxBlock uint64) {
minAcceptableBlock := currentMaxBlock - c_blocksPerHour
for key := minAcceptableBlock - 20; key >= minAcceptableBlock-c_blocksPerHour; key -= 20 {
// Check if the key exists before trying to delete
if _, found := s.txLookupCache.Get(key); found {
s.txLookupCache.Remove(key)
} else {
return
}
}
}

func (s *Service) calculateTotalNoTransactions(block *types.Block) *totalTransactions {
var totalTransactions1h uint64
var totalTransactions1m uint64

blockCount := 0
currentBlock := block
for i := 0; i < 20; i++ { // iterate through the last 20 blocks
if currentBlock == nil {
log.Error("Encountered a nil block, stopping iteration")
break
}
batchesNeeded := c_blocksPerHour / c_txBatchSize // calculate how many batches of c_txBatchSize are needed

// Add the number of transactions in the current block to the total
totalTransactions1h += uint64(len(currentBlock.Transactions()))
if i < 5 {
totalTransactions1m += uint64(len(currentBlock.Transactions()))
}
for i := 0; i < batchesNeeded; i++ {
startBlockNum := currentBlock.NumberU64() - uint64(i*c_txBatchSize)

// Get the parent block for the next iteration
fullBackend, ok := s.backend.(fullNodeBackend)
// Try to get the data from the LRU cache
cachedTxCount, ok := s.txLookupCache.Get(startBlockNum)
if !ok {
log.Error("Not running fullnode, cannot get parent block")
break
// Not in cache, so we need to calculate the transaction count for this batch
txCount := uint64(0)

for j := 0; j < c_txBatchSize; j++ {
if currentBlock == nil {
log.Error("Encountered a nil block, stopping iteration")
break
}

// Add the number of transactions in the current block to the total
txCount += uint64(len(currentBlock.Transactions()))

// If within the last 5 blocks, add to the 1-minute total
if i == 0 && j < c_blocksPerMinute {
totalTransactions1m += uint64(len(currentBlock.Transactions()))
}

// Get the parent block for the next iteration
fullBackend, ok := s.backend.(fullNodeBackend)
if !ok {
log.Error("Not running fullnode, cannot get parent block")
break
}

var err error
currentBlock, err = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(currentBlock.NumberU64()-1))
if err != nil {
log.Error(fmt.Sprintf("Error getting block number %d: %s", int(currentBlock.NumberU64())-1, err.Error()))
break
}
}

// Store the sum in the cache
s.txLookupCache.Add(startBlockNum, txCount)

cachedTxCount = txCount
}

var err error
currentBlock, err = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(currentBlock.NumberU64()-1))
if err != nil {
log.Error(fmt.Sprintf("Error getting block %d: %s", int(block.NumberU64())-i, err.Error()))
// Add the transactions from this batch
txCount, ok := cachedTxCount.(uint64)
if !ok {
log.Error("Error casting cachedTxCount to uint64")
break
}
blockCount += 1
totalTransactions1h += txCount
}

if blockCount < 1 {
log.Error("No blocks found in the last 20 blocks")
return &totalTransactions{
TotalNoTransactions1h: 0,
TotalNoTransactions1m: 0,
}
if s.txLookupCache.Len() > c_txLookupCacheLimit {
s.evictOutdatedEntries(block.NumberU64())
}

// Now totalTransactions1h and totalTransactions1m have the transaction counts for the last c_blocksPerHour and c_txBatchSize blocks respectively
return &totalTransactions{
TotalNoTransactions1h: totalTransactions1h * (300 / uint64(blockCount)),
TotalNoTransactions1m: totalTransactions1m * (5 / min(blockCount, 5)),
}
}

func min(a, b int) uint64 {
if a < b {
return uint64(a)
TotalNoTransactions1h: totalTransactions1h,
TotalNoTransactions1m: totalTransactions1m,
}
return uint64(b)
}

func (s *Service) assembleBlockLocationStats(block *types.Block) *blockLocation {
Expand Down Expand Up @@ -949,7 +868,7 @@ func (s *Service) assembleBlockAppendTimeStats(block *types.Block) *blockAppendT

func (s *Service) assembleInternalBlockStats(block *types.Block) *internalBlockStats {
header := block.Header()
totalTransactions := s.calculateTotalNoTransactions20Blocks(block)
totalTransactions := s.calculateTotalNoTransactions(block)

// Assemble and return the block stats
return &internalBlockStats{
Expand All @@ -965,11 +884,12 @@ func (s *Service) assembleInternalBlockStats(block *types.Block) *internalBlockS
// mining layer and reports it to the stats server.
func (s *Service) reportNodeStats(conn *connWrapper) error {
// Get RAM usage
var ramUsagePercent, ramFreePercent, ramAvailablePercent float64
var ramUsagePercent, ramFreePercent, ramAvailablePercent, ramUsage float64
if vmStat, err := mem.VirtualMemory(); err == nil {
ramUsagePercent = float64(vmStat.UsedPercent)
ramFreePercent = float64(vmStat.Free) / float64(vmStat.Total) * 100
ramAvailablePercent = float64(vmStat.Available) / float64(vmStat.Total) * 100
ramUsage = float64(vmStat.Used)
} else {
log.Warn("Error getting RAM usage:", err)
// Handle error, possibly with default values or by returning an error
Expand Down Expand Up @@ -1013,14 +933,16 @@ func (s *Service) reportNodeStats(conn *connWrapper) error {
"stats": &nodeStats{
Name: s.node,
Timestamp: big.NewInt(time.Now().Unix()), // Current timestamp
RAMUsage: int64(ramUsage),
RAMUsagePercent: int64(ramUsagePercent),
RAMFreePercent: int64(ramFreePercent),
RAMAvailablePercent: int64(ramAvailablePercent),
CPUUsagePercent: int64(cpuUsagePercent),
DiskUsageValue: int64(diskUsage),
DiskUsagePercent: int64(diskUsagePercent),
CurrentBlockNumber: currentBlockHeight,
Location: location,
RegionLocation: location.Region(),
ZoneLocation: location.Zone(),
},
}

Expand Down

0 comments on commit ed46410

Please sign in to comment.