diff --git a/quaistats/quaistats.go b/quaistats/quaistats.go index 3a804cd961..0434aaeb34 100644 --- a/quaistats/quaistats.go +++ b/quaistats/quaistats.go @@ -56,10 +56,6 @@ import ( ) const ( - // historyUpdateRange is the number of blocks a node should report upon login or - // history request. - historyUpdateRange = 50 - // chainHeadChanSize is the size of channel listening to ChainHeadEvent. chainHeadChanSize = 10 chainSideChanSize = 10 @@ -67,10 +63,12 @@ const ( // reportInterval is the time interval between two reports. reportInterval = 15 - c_alpha = 8 - c_tpsLookupCacheLimit = 100 - c_gasLookupCacheLimit = 100 - c_statsErrorValue = int64(-1) + c_alpha = 8 + c_txBatchSize = 20 + c_blocksPerMinute = 5 + c_blocksPerHour = c_blocksPerMinute * 60 + c_txLookupCacheLimit = c_blocksPerHour / c_txBatchSize + c_statsErrorValue = int64(-1) ) // backend encompasses the bare-minimum functionality needed for quaistats reporting @@ -111,8 +109,7 @@ type Service struct { headSub event.Subscription sideSub event.Subscription - tpsLookupCache *lru.Cache - gasLookupCache *lru.Cache + txLookupCache *lru.Cache chainID *big.Int @@ -196,22 +193,20 @@ func New(node *node.Node, backend backend, engine consensus.Engine, url string, return err } - tpsLookupCache, _ := lru.New(c_tpsLookupCacheLimit) - gasLookupCache, _ := lru.New(c_gasLookupCacheLimit) + txLookupCache, _ := lru.New(c_txLookupCacheLimit) quaistats := &Service{ - backend: backend, - engine: engine, - server: node.Server(), - node: parts[0], - pass: parts[1], - host: parts[2], - pongCh: make(chan struct{}), - chainID: backend.ChainConfig().ChainID, - trusted: trustednode, - tpsLookupCache: tpsLookupCache, - gasLookupCache: gasLookupCache, - instanceDir: node.InstanceDir(), + backend: backend, + engine: engine, + server: node.Server(), + node: parts[0], + pass: parts[1], + host: parts[2], + pongCh: make(chan struct{}), + chainID: backend.ChainConfig().ChainID, + trusted: trustednode, + txLookupCache: txLookupCache, + instanceDir: node.InstanceDir(), } node.RegisterLifecycle(quaistats) @@ -397,7 +392,7 @@ func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, chainSideCh chan co } // Report internal blockstats every 20 block stats if trusted node - if head.NumberU64()%20 == 0 && s.trusted { + if head.NumberU64()%c_txBatchSize == 0 && s.trusted { if err = s.reportInternalBlockStats(conns["internalBlockStats"], head); err != nil { noErrs = false log.Warn("Block internal stats report failed", "err", err) @@ -507,54 +502,6 @@ type loginSecret struct { Password string `json:"password"` } -// login tries to authorize the client at the remote server. -func (s *Service) login(conn *connWrapper) error { - // Construct and send the login authentication - infos := s.server.NodeInfo() - - var protocols []string - for _, proto := range s.server.Protocols { - protocols = append(protocols, fmt.Sprintf("%s/%d", proto.Name, proto.Version)) - } - var network string - if info := infos.Protocols["eth"]; info != nil { - network = fmt.Sprintf("%d", info.(*ethproto.NodeInfo).Network) - } - auth := &authMsg{ - ID: s.node, - Info: nodeInfo{ - Name: s.node, - Node: infos.Name, - Port: infos.Ports.Listener, - Network: network, - Protocol: strings.Join(protocols, ", "), - API: "No", - Os: runtime.GOOS, - OsVer: runtime.GOARCH, - Client: "0.1.1", - History: true, - Chain: common.NodeLocation.Name(), - ChainID: s.chainID.Uint64(), - }, - Secret: loginSecret{ - Name: "admin", - Password: s.pass, - }, - } - login := map[string][]interface{}{ - "emit": {"hello", auth}, - } - if err := conn.WriteJSON(login); err != nil { - return err - } - // Retrieve the remote ack or connection termination - var ack map[string][]string - if err := conn.ReadJSON(&ack); err != nil || len(ack["emit"]) != 1 || ack["emit"][0] != "ready" { - return errors.New("unauthorized") - } - return nil -} - type Credentials struct { Name string `json:"name"` Password string `json:"password"` @@ -671,64 +618,6 @@ func (s *Service) report(dataType string, conn *connWrapper) error { return nil } -type latencyReport struct { - Latency int `json:"latency"` -} - -// reportLatency sends a ping request to the server, measures the RTT time and -// finally sends a latency update. -func (s *Service) reportLatency(conn *connWrapper) error { - // Send the current time to the quaistats server - start := time.Now() - - ping := map[string][]interface{}{ - "emit": {"node-ping", map[string]string{ - "id": s.node, - "clientTime": start.String(), - }}, - } - if err := conn.WriteJSON(ping); err != nil { - return err - } - // Wait for the pong request to arrive back - select { - case <-s.pongCh: - // Pong delivered, report the latency - case <-time.After(5 * time.Second): - // Ping timeout, abort - return errors.New("ping timed out") - } - - latency := int((time.Since(start) / time.Duration(2)).Nanoseconds() / 1000000) - - // Send back the measured latency - log.Trace("Sending measured latency to ethstats", "latency", strconv.Itoa(latency)) - - latencyReport := map[string]interface{}{ - "id": s.node, - "latency": &latencyReport{ - Latency: latency, - }, - } - - report := map[string][]interface{}{ - "emit": {"latency", latencyReport}, - } - - return conn.WriteJSON(report) -} - -// uncleStats is a custom wrapper around an uncle array to force serializing -// empty arrays instead of returning null for them. -type uncleStats []*types.Header - -func (s uncleStats) MarshalJSON() ([]byte, error) { - if uncles := ([]*types.Header)(s); len(uncles) > 0 { - return json.Marshal(uncles) - } - return []byte("[]"), nil -} - // reportSideBlock retrieves the current chain side event and reports it to the stats server. func (s *Service) reportSideBlock(conn *connWrapper, block *types.Block) error { log.Trace("Sending new side block to quaistats", "number", block.Number(), "hash", block.Hash()) @@ -843,16 +732,18 @@ type blockAppendTime struct { } type nodeStats struct { - Name string `json:"name"` - Timestamp *big.Int `json:"timestamp"` - RAMUsagePercent int64 `json:"ramUsagePercent"` - RAMFreePercent int64 `json:"ramFreePercent"` - RAMAvailablePercent int64 `json:"ramAvailablePercent"` - CPUUsagePercent int64 `json:"cpuPercent"` - DiskUsagePercent int64 `json:"diskUsagePercent"` - DiskUsageValue int64 `json:"diskUsageValue"` - CurrentBlockNumber []*big.Int `json:"currentBlockNumber"` - Location common.Location `json:"location"` + Name string `json:"name"` + Timestamp *big.Int `json:"timestamp"` + RAMUsage int64 `json:"ramUsage"` + RAMUsagePercent int64 `json:"ramUsagePercent"` + RAMFreePercent int64 `json:"ramFreePercent"` + RAMAvailablePercent int64 `json:"ramAvailablePercent"` + CPUUsagePercent int64 `json:"cpuPercent"` + DiskUsagePercent int64 `json:"diskUsagePercent"` + DiskUsageValue int64 `json:"diskUsageValue"` + CurrentBlockNumber []*big.Int `json:"currentBlockNumber"` + RegionLocation int `json:"regionLocation"` + ZoneLocation int `json:"zoneLocation"` } type totalTransactions struct { @@ -860,59 +751,87 @@ type totalTransactions struct { TotalNoTransactions1m uint64 } -func (s *Service) calculateTotalNoTransactions20Blocks(block *types.Block) *totalTransactions { +func (s *Service) evictOutdatedEntries(currentMaxBlock uint64) { + minAcceptableBlock := currentMaxBlock - c_blocksPerHour + for key := minAcceptableBlock - 20; key >= minAcceptableBlock-c_blocksPerHour; key -= 20 { + // Check if the key exists before trying to delete + if _, found := s.txLookupCache.Get(key); found { + s.txLookupCache.Remove(key) + } else { + return + } + } +} + +func (s *Service) calculateTotalNoTransactions(block *types.Block) *totalTransactions { var totalTransactions1h uint64 var totalTransactions1m uint64 - blockCount := 0 currentBlock := block - for i := 0; i < 20; i++ { // iterate through the last 20 blocks - if currentBlock == nil { - log.Error("Encountered a nil block, stopping iteration") - break - } + batchesNeeded := c_blocksPerHour / c_txBatchSize // calculate how many batches of c_txBatchSize are needed - // Add the number of transactions in the current block to the total - totalTransactions1h += uint64(len(currentBlock.Transactions())) - if i < 5 { - totalTransactions1m += uint64(len(currentBlock.Transactions())) - } + for i := 0; i < batchesNeeded; i++ { + startBlockNum := currentBlock.NumberU64() - uint64(i*c_txBatchSize) - // Get the parent block for the next iteration - fullBackend, ok := s.backend.(fullNodeBackend) + // Try to get the data from the LRU cache + cachedTxCount, ok := s.txLookupCache.Get(startBlockNum) if !ok { - log.Error("Not running fullnode, cannot get parent block") - break + // Not in cache, so we need to calculate the transaction count for this batch + txCount := uint64(0) + + for j := 0; j < c_txBatchSize; j++ { + if currentBlock == nil { + log.Error("Encountered a nil block, stopping iteration") + break + } + + // Add the number of transactions in the current block to the total + txCount += uint64(len(currentBlock.Transactions())) + + // If within the last 5 blocks, add to the 1-minute total + if i == 0 && j < c_blocksPerMinute { + totalTransactions1m += uint64(len(currentBlock.Transactions())) + } + + // Get the parent block for the next iteration + fullBackend, ok := s.backend.(fullNodeBackend) + if !ok { + log.Error("Not running fullnode, cannot get parent block") + break + } + + var err error + currentBlock, err = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(currentBlock.NumberU64()-1)) + if err != nil { + log.Error(fmt.Sprintf("Error getting block number %d: %s", int(currentBlock.NumberU64())-1, err.Error())) + break + } + } + + // Store the sum in the cache + s.txLookupCache.Add(startBlockNum, txCount) + + cachedTxCount = txCount } - var err error - currentBlock, err = fullBackend.BlockByNumber(context.Background(), rpc.BlockNumber(currentBlock.NumberU64()-1)) - if err != nil { - log.Error(fmt.Sprintf("Error getting block %d: %s", int(block.NumberU64())-i, err.Error())) + // Add the transactions from this batch + txCount, ok := cachedTxCount.(uint64) + if !ok { + log.Error("Error casting cachedTxCount to uint64") break } - blockCount += 1 + totalTransactions1h += txCount } - if blockCount < 1 { - log.Error("No blocks found in the last 20 blocks") - return &totalTransactions{ - TotalNoTransactions1h: 0, - TotalNoTransactions1m: 0, - } + if s.txLookupCache.Len() > c_txLookupCacheLimit { + s.evictOutdatedEntries(block.NumberU64()) } + // Now totalTransactions1h and totalTransactions1m have the transaction counts for the last c_blocksPerHour and c_txBatchSize blocks respectively return &totalTransactions{ - TotalNoTransactions1h: totalTransactions1h * (300 / uint64(blockCount)), - TotalNoTransactions1m: totalTransactions1m * (5 / min(blockCount, 5)), - } -} - -func min(a, b int) uint64 { - if a < b { - return uint64(a) + TotalNoTransactions1h: totalTransactions1h, + TotalNoTransactions1m: totalTransactions1m, } - return uint64(b) } func (s *Service) assembleBlockLocationStats(block *types.Block) *blockLocation { @@ -949,7 +868,7 @@ func (s *Service) assembleBlockAppendTimeStats(block *types.Block) *blockAppendT func (s *Service) assembleInternalBlockStats(block *types.Block) *internalBlockStats { header := block.Header() - totalTransactions := s.calculateTotalNoTransactions20Blocks(block) + totalTransactions := s.calculateTotalNoTransactions(block) // Assemble and return the block stats return &internalBlockStats{ @@ -965,11 +884,12 @@ func (s *Service) assembleInternalBlockStats(block *types.Block) *internalBlockS // mining layer and reports it to the stats server. func (s *Service) reportNodeStats(conn *connWrapper) error { // Get RAM usage - var ramUsagePercent, ramFreePercent, ramAvailablePercent float64 + var ramUsagePercent, ramFreePercent, ramAvailablePercent, ramUsage float64 if vmStat, err := mem.VirtualMemory(); err == nil { ramUsagePercent = float64(vmStat.UsedPercent) ramFreePercent = float64(vmStat.Free) / float64(vmStat.Total) * 100 ramAvailablePercent = float64(vmStat.Available) / float64(vmStat.Total) * 100 + ramUsage = float64(vmStat.Used) } else { log.Warn("Error getting RAM usage:", err) // Handle error, possibly with default values or by returning an error @@ -1013,6 +933,7 @@ func (s *Service) reportNodeStats(conn *connWrapper) error { "stats": &nodeStats{ Name: s.node, Timestamp: big.NewInt(time.Now().Unix()), // Current timestamp + RAMUsage: int64(ramUsage), RAMUsagePercent: int64(ramUsagePercent), RAMFreePercent: int64(ramFreePercent), RAMAvailablePercent: int64(ramAvailablePercent), @@ -1020,7 +941,8 @@ func (s *Service) reportNodeStats(conn *connWrapper) error { DiskUsageValue: int64(diskUsage), DiskUsagePercent: int64(diskUsagePercent), CurrentBlockNumber: currentBlockHeight, - Location: location, + RegionLocation: location.Region(), + ZoneLocation: location.Zone(), }, }