Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Aug 14, 2024
1 parent 82e546e commit dd092ae
Show file tree
Hide file tree
Showing 19 changed files with 474 additions and 249 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
- name: Build Injective
working-directory: ./injective
run: go build -v -ldflags "-X main.version=${{ github.event.ref }} -X main.commit=${{ github.sha }} -X main.date=$(date -u +%Y-%m-%dT%H:%MZ)" -o ../fireinjective ./cmd/fireinjective
run: go build -v -ldflags "-X main.version=${{ github.event.ref }} -X main.commit=${{ github.sha }} -X main.date=$(date -u +%Y-%m-%dT%H:%MZ)" -o ../firemantra ./cmd/firemantra

- name: Log in to the Container registry
uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
Expand Down
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
*.lz4
/.idea
/injective/.idea/
/injective/devel/.env
/injective/devel/data
8 changes: 5 additions & 3 deletions poller/loader.go → cosmos/block/loader.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package poller
package block

import (
"fmt"

block2 "github.com/streamingfast/firehose-cosmos/poller/block"

"github.com/cometbft/cometbft/state"
txIndex "github.com/cometbft/cometbft/state/txindex/kv"
"github.com/cometbft/cometbft/store"
Expand Down Expand Up @@ -80,7 +82,7 @@ func (l *BlockLoader) loadBlock(height int64) (*pbcosmos.Block, error) {
// return nil, fmt.Errorf("converting tx results: %w", err)
//}

trxResults, err := convertDeliverTxs(abciResponses.DeliverTxs)
trxResults, err := block2.convertDeliverTxs(abciResponses.DeliverTxs)
if err != nil {
return nil, fmt.Errorf("converting tx results: %w", err)
}
Expand All @@ -94,7 +96,7 @@ func (l *BlockLoader) loadBlock(height int64) (*pbcosmos.Block, error) {
return nil, fmt.Errorf("converting validators: %w", err)
}

misbehaviors, err := MisbehaviorsFromEvidences(block.Evidence.Evidence)
misbehaviors, err := block2.MisbehaviorsFromEvidences(block.Evidence.Evidence)
if err != nil {
return nil, fmt.Errorf("converting misbehaviors from evidences: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions poller/merger.go → cosmos/block/merger.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package poller
package block

import (
"context"
Expand All @@ -15,10 +15,10 @@ import (

type SimpleMerge struct {
logger *zap.Logger
blockLoader *BlockLoader
blockLoader *block.BlockLoader
}

func NewSimpleMerger(blockLoader *BlockLoader, logger *zap.Logger) *SimpleMerge {
func NewSimpleMerger(blockLoader *block.BlockLoader, logger *zap.Logger) *SimpleMerge {
return &SimpleMerge{
blockLoader: blockLoader,
logger: logger,
Expand Down
10 changes: 6 additions & 4 deletions poller/cmd/firecosmos/fetcher.go → cosmos/cmd/fetcher.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package main
package cmd

import (
"fmt"
"strconv"
"time"

"github.com/streamingfast/firehose-cosmos/poller/block"

cometBftHttp "github.com/cometbft/cometbft/rpc/client/http"
"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/blockpoller"
"github.com/streamingfast/firehose-cosmos/poller"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func NewFetchCmd(logger *zap.Logger, tracer logging.Tracer, cosmosChain string) *cobra.Command {
func NewFetchRPCCmd(logger *zap.Logger, tracer logging.Tracer, cosmosChain string) *cobra.Command {

cmd := &cobra.Command{
Use: "rpc <first-streamable-block>",
Short: "fetch blocks from rpc endpoint",
Expand Down Expand Up @@ -69,7 +71,7 @@ func fetchRunE(logger *zap.Logger, tracer logging.Tracer, cosmosChain string) fi
var rpcFetcher blockpoller.BlockFetcher

if cosmosChain == "injective" {
rpcFetcher = poller.NewRPCFetcher(rpcClients, fetchInterval, latestBlockRetryInterval, logger)
rpcFetcher = block.NewRPCFetcher(rpcClients, fetchInterval, latestBlockRetryInterval, logger)
}

poller := blockpoller.New(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package cmd

import (
"errors"
Expand All @@ -14,8 +14,8 @@ import (
"github.com/streamingfast/cli"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-cosmos/cosmos/block"
pbcosmos "github.com/streamingfast/firehose-cosmos/cosmos/pb/sf/cosmos/type/v2"
"github.com/streamingfast/firehose-cosmos/poller"
"github.com/streamingfast/logging"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -105,7 +105,7 @@ func fixUnknownTypeBlocksRunE(zlog *zap.Logger, tracer logging.Tracer) firecore.
blockCount++

injectiveBlock := &pbcosmos.Block{}
err = poller.UnmarshallerDiscardUnknown.Unmarshal(currentBlock.Payload.Value, injectiveBlock)
err = block.UnmarshallerDiscardUnknown.Unmarshal(currentBlock.Payload.Value, injectiveBlock)
if err != nil {
return fmt.Errorf("unmarshaling block: %w", err)
}
Expand Down
File renamed without changes.
9 changes: 2 additions & 7 deletions cosmos/go.mod
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
module github.com/streamingfast/firehose-cosmos/cosmos

go 1.22
go 1.22.0

toolchain go1.22.0

require google.golang.org/protobuf v1.33.0
require github.com/google/go-cmp v0.6.0 // indirect

replace github.com/jhump/protoreflect => github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d
toolchain go1.22.3
2 changes: 2 additions & 0 deletions cosmos/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/firehose-core v1.5.8-0.20240814134036-ad3d137d66d4/go.mod h1:QR6NRnN9EEXVAtMJ05YEisd5UesdMoY9liILX1WzFgs=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
65 changes: 65 additions & 0 deletions cosmos/utils/proto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package utils

import (
"fmt"
"reflect"
"unicode/utf8"

"google.golang.org/protobuf/proto"
)

var UnmarshallerDiscardUnknown = &proto.UnmarshalOptions{
DiscardUnknown: true,
}

func ArrayToPointerArray[T any](ts []T) []*T {
res := make([]*T, len(ts))
for i, t := range ts {
res[i] = &t
}
return res
}

func ArrayProtoFlip[U cosmoProto.Message, V proto.Message](origins []U, targets []V) error {
if len(origins) != len(targets) {
return fmt.Errorf("origin and target arrays have different lengths: %d != %d", len(origins), len(targets))
}
if len(origins) == 0 {
return nil
}

for i := range origins {
err := ProtoFlip(origins[i], targets[i])
if err != nil {
return fmt.Errorf("converting element %d: %w", i, err)
}
}

return nil
}

func ProtoFlip(origin cosmoProto.Message, target proto.Message) error {
if origin == nil || reflect.ValueOf(origin).IsNil() {
return nil
}
//marshall origin the unmarshall to target
data, err := cosmoProto.Marshal(origin)
if err != nil {
return fmt.Errorf("mashalling origin object %T: %w", data, err)
}

err = UnmarshallerDiscardUnknown.Unmarshal(data, target)
if err != nil {
return fmt.Errorf("unmashalling target object %T: %w", data, err)
}

return nil
}

func FixUtf(r rune) rune {
if r == utf8.RuneError {
fmt.Println("found rune error")
return '�'
}
return r
}
73 changes: 66 additions & 7 deletions poller/fetcher.go → injective/block/fetcher.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package poller
package block

import (
"context"
"encoding/hex"
"fmt"
"math"
"strings"
"time"

"github.com/streamingfast/firehose-cosmos/cosmos/utils"

abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/proto/tendermint/types"
cometBftHttp "github.com/cometbft/cometbft/rpc/client/http"
Expand Down Expand Up @@ -73,6 +76,7 @@ func (f *RPCBlockFetcher) Fetch(ctx context.Context, requestBlockNum uint64) (b
f.logger.Info("got latest block num", zap.Uint64("latest_block_num", f.latestBlockNum), zap.Uint64("requested_block_num", requestBlockNum))

if f.latestBlockNum >= requestBlockNum {

break
}
sleepDuration = f.latestBlockRetryInterval
Expand Down Expand Up @@ -185,8 +189,8 @@ func convertBlockFromResponse(rpcBlock *ctypes.ResultBlock, rpcBlockResults *cty
return nil, fmt.Errorf("converting consensus param updates: %w", err)
}

finalEvents := rpcBlockResults.BeginBlockEvents
finalEvents = append(finalEvents, rpcBlockResults.EndBlockEvents...)
finalEvents := rpcBlockResults.FinalizeBlockEvents
//finalEvents = append(finalEvents, rpcBlockResults.EndBlockEvents...)
events, err := convertEventsFromResponse(finalEvents)
if err != nil {
return nil, fmt.Errorf("converting events: %w", err)
Expand Down Expand Up @@ -229,7 +233,7 @@ func convertEventsFromResponse(responseEvents []abci.Event) ([]*pbcosmos.Event,
events[i] = &pbcosmos.Event{}
}

err := arrayProtoFlip(arrayToPointerArray(responseEvents), events)
err := utils.ArrayProtoFlip(utils.ArrayToPointerArray(responseEvents), events)
if err != nil {
return nil, fmt.Errorf("converting events: %w", err)
}
Expand All @@ -243,7 +247,7 @@ func convertTxsFromResponse(transactions cometType.Txs) (txs [][]byte) {
func convertHeaderFromResponse(responseHeader *cometType.Header) (*pbcosmos.Header, error) {
header := &pbcosmos.Header{}

err := protoFlip(responseHeader.ToProto(), header)
err := utils.ProtoFlip(responseHeader.ToProto(), header)
if err != nil {
return nil, fmt.Errorf("converting block meta header: %w", err)
}
Expand All @@ -257,7 +261,7 @@ func convertValidatorUpdatesFromResponse(validatorUpdates []abci.ValidatorUpdate
validators[i] = &pbcosmos.ValidatorUpdate{}
}

err := arrayProtoFlip(arrayToPointerArray(validatorUpdates), validators)
err := utils.ArrayProtoFlip(utils.ArrayToPointerArray(validatorUpdates), validators)
if err != nil {
return nil, fmt.Errorf("converting validators: %w", err)
}
Expand All @@ -266,9 +270,64 @@ func convertValidatorUpdatesFromResponse(validatorUpdates []abci.ValidatorUpdate

func convertConsensusParamUpdatesFromResponse(consensusParamUpdates *types.ConsensusParams) (*pbcosmos.ConsensusParams, error) {
out := &pbcosmos.ConsensusParams{}
err := protoFlip(consensusParamUpdates, out)
err := utils.ProtoFlip(consensusParamUpdates, out)
if err != nil {
return nil, fmt.Errorf("converting consensus param updates: %w", err)
}
return out, nil
}

func convertResponseDeliverTx(tx *abci.ResponseDeliverTx) (*pbcosmos.TxResults, error) {
events := make([]*pbcosmos.Event, len(tx.Events))
for i := range events {
events[i] = &pbcosmos.Event{}
}
err := utils.ArrayProtoFlip(arrayToPointerArray(tx.Events), events)
if err != nil {
return nil, fmt.Errorf("converting events: %w", err)
}

fixedLog := strings.Map(utils.FixUtf, tx.Log)

txResults := &pbcosmos.TxResults{
Code: tx.Code,
Data: tx.Data,
Log: fixedLog,
Info: tx.Info,
GasWanted: tx.GasWanted,
GasUsed: tx.GasUsed,
Events: events,
Codespace: tx.Codespace,
}

return txResults, nil
}

func convertDeliverTxs(txs []*abci.ResponseDeliverTx) ([]*pbcosmos.TxResults, error) {
txResults := make([]*pbcosmos.TxResults, len(txs))
for i, tx := range txs {
txResults[i], _ = convertResponseDeliverTx(tx)
}
return txResults, nil
}

func MisbehaviorsFromEvidences(evidences cometType.EvidenceList) ([]*pbcosmos.Misbehavior, error) {
var misbehaviors []*pbcosmos.Misbehavior
for _, e := range evidences {

abciMisbehavior := e.ABCI()

partials := make([]*pbcosmos.Misbehavior, len(abciMisbehavior))
for i := range partials {
partials[i] = &pbcosmos.Misbehavior{}
}

err := utils.ArrayProtoFlip(utils.ArrayToPointerArray(abciMisbehavior), partials)
if err != nil {
return nil, fmt.Errorf("converting abci misbehavior: %w", err)
}

misbehaviors = append(misbehaviors, partials...)
}
return misbehaviors, nil
}
39 changes: 39 additions & 0 deletions injective/cmd/fireinjective/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"fmt"
"os"

"github.com/spf13/cobra"
"github.com/streamingfast/firehose-cosmos/cosmos/cmd"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

var logger, tracer = logging.PackageLogger("firecosmos", "github.com/streamingfast/firehose-cosmos")
var rootCmd = &cobra.Command{
Use: "fireinjective",
Short: "injective block fetching and tooling",
Args: cobra.ExactArgs(1),
}

func init() {
logging.InstantiateLoggers(logging.WithDefaultLevel(zap.InfoLevel))

fetchCmd := &cobra.Command{
Use: "fetch",
Short: "fetch blocks from different sources",
Args: cobra.ExactArgs(2),
}
fetchCmd.AddCommand(cmd.NewFetchRPCCmd(logger, tracer, "mantra"))

rootCmd.AddCommand(fetchCmd)
rootCmd.AddCommand(cmd.NewToolsFixUnknownTypeBlocks(logger, tracer))
}

func main() {
if err := rootCmd.Execute(); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Whoops. There was an error while executing your CLI '%s'", err)
os.Exit(1)
}
}
Loading

0 comments on commit dd092ae

Please sign in to comment.