Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
# Conflicts:
#	go.mod
  • Loading branch information
billettc committed Aug 2, 2024
1 parent ce2c890 commit abdc44d
Show file tree
Hide file tree
Showing 5 changed files with 279 additions and 4 deletions.
113 changes: 113 additions & 0 deletions block/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package block

import (
"bytes"
"fmt"
"reflect"
"strings"

"github.com/streamingfast/substreams-gear/generated/convert1420"

Check failure on line 9 in block/decoder.go

View workflow job for this annotation

GitHub Actions / build (1.22.x)

github.com/streamingfast/[email protected]: replacement directory ../substreams-gear does not exist

"github.com/centrifuge/go-substrate-rpc-client/v4/registry"
"github.com/centrifuge/go-substrate-rpc-client/v4/scale"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/gobeam/stringy"
pbgear "github.com/streamingfast/firehose-gear/pb/sf/gear/type/v1"
v1 "github.com/streamingfast/substreams-gear/pb/sf/substreams/gear/type/v1"

Check failure on line 16 in block/decoder.go

View workflow job for this annotation

GitHub Actions / build (1.22.x)

github.com/streamingfast/[email protected]: replacement directory ../substreams-gear does not exist
"go.uber.org/zap"
)

type Decoder struct {
callRegistry registry.CallRegistry
logger *zap.Logger
}

func NewDecoder(logger *zap.Logger) *Decoder {
return &Decoder{
logger: logger,
}
}

func (d *Decoder) Decoded(block *pbgear.Block) (*v1.Block, error) {

decodedExtrinsics, err := decodeExtrinsics(d.callRegistry, block.Extrinsics, d.logger)
if err != nil {
return nil, fmt.Errorf("failed to decode extrinsics: %w", err)
}

return &v1.Block{
Number: block.Number,
Hash: block.Hash,
Header: block.Header,
DigestItems: block.DigestItems,
Justification: block.Justification,
Extrinsics: decodedExtrinsics,
Events: nil,
}, nil
}

var versionFuncMap map[string]map[string]reflect.Value

func init() {
versionFuncMap = map[string]map[string]reflect.Value{
"1420": convert1420.FuncMap,
}
}

func decodeExtrinsics(version string, callRegistry registry.CallRegistry, extrinsics []*pbgear.Extrinsic, logger *zap.Logger) ([]*v1.Extrinsic, error) {
var decodedExtrinsics []*v1.Extrinsic
for i, extrinsic := range extrinsics {
logger.Info("decoding extrinsic", zap.Int("index", i), zap.Uint32("section", extrinsic.Method.CallIndex.SectionIndex), zap.Uint32("method", extrinsic.Method.CallIndex.MethodIndex))
callName, decodedFields, err := decodeCallExtrinsics(callRegistry, extrinsic)
if err != nil {
return nil, fmt.Errorf("failed to decode extrinsic: %w", err)
}

parts := strings.Split(callName, ".")
pallet := parts[0]
call := parts[1]
call = stringy.New(call).PascalCase().Get()
structName := pallet + "_" + call + "Call"
funcName := "To_" + structName

funcMap := versionFuncMap[]
if fn, found := funcMap[funcName]; found {
o := fn.Call([]reflect.Value{reflect.ValueOf(decodedFields)})

e := o[0].Interface().(*v1.Extrinsic)
decodedExtrinsics = append(decodedExtrinsics, e)

} else {
panic(fmt.Sprintf("unknown extrinsic call: %s", callName))
}
}
return decodedExtrinsics, nil
}

func decodeCallExtrinsics(callRegistry registry.CallRegistry, extrinsic *pbgear.Extrinsic) (string, registry.DecodedFields, error) {
callIndex := extrinsic.Method.CallIndex
args := extrinsic.Method.Args

callDecoder, found := callRegistry[toCallIndex(callIndex)]
if !found {
return "", nil, fmt.Errorf("failed to get call decoder for call at index %d %d", callIndex.SectionIndex, callIndex.MethodIndex)
}

if args != nil {
decoder := scale.NewDecoder(bytes.NewReader(args))
callFields, err := callDecoder.Decode(decoder)
if err != nil {
return "", nil, fmt.Errorf("failed to decode call: %w", err)
}
return callDecoder.Name, callFields, nil
}

return callDecoder.Name, nil, nil
}

func toCallIndex(ci *pbgear.CallIndex) types.CallIndex {
return types.CallIndex{
SectionIndex: uint8(ci.SectionIndex),
MethodIndex: uint8(ci.MethodIndex),
}
}
138 changes: 138 additions & 0 deletions cmd/firevara/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package main

import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"os"

"github.com/centrifuge/go-substrate-rpc-client/v4/registry"

"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/centrifuge/go-substrate-rpc-client/v4/types/codec"

pbgear "github.com/streamingfast/firehose-gear/pb/sf/gear/type/v1"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/mostynb/go-grpc-compression/zstd"
"google.golang.org/grpc"

"github.com/streamingfast/cli/sflags"

"github.com/streamingfast/firehose-core/firehose/client"

"github.com/streamingfast/firehose-gear/block"

"github.com/spf13/cobra"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func NewDecoderCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "decode-block",
Short: "Consume blocks from firehose and decode them",
RunE: decodeRunE(logger, tracer),
}
cmd.Flags().Int64("first-streamable-block", 0, "first block to decode")
cmd.Flags().BoolP("plaintext", "p", false, "Use plaintext connection to Firehose")
cmd.Flags().BoolP("insecure", "k", false, "Use SSL connection to Firehose but skip SSL certificate validation")

return cmd
}

func decodeRunE(logger *zap.Logger, tracer logging.Tracer) func(cmd *cobra.Command, args []string) error {
return func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

firehoseEndpoint := args[0]
plaintext := sflags.MustGetBool(cmd, "plaintext")
insecure := sflags.MustGetBool(cmd, "insecure")
firstStreamableBlock := sflags.MustGetInt64(cmd, "first-streamable-block")

decoder := block.NewDecoder(logger)
jwt := os.Getenv("FIREHOSE_API_TOKEN")

firehoseClient, connClose, grpcCallOpts, err := client.NewFirehoseClient(firehoseEndpoint, jwt, "", insecure, plaintext)
if err != nil {
return fmt.Errorf("creating firehose client: %w", err)
}
defer connClose()

grpcCallOpts = append(grpcCallOpts, grpc.UseCompressor(zstd.Name))

//todo: load cursor here

request := &pbfirehose.Request{
StartBlockNum: firstStreamableBlock,
StopBlockNum: 0,
FinalBlocksOnly: false,
}

stream, err := firehoseClient.Blocks(ctx, request, grpcCallOpts...)
if err != nil {
return fmt.Errorf("unable to start blocks stream: %w", err)
}
logger.Info("starting blocks stream")
lastBlockNum := uint64(firstStreamableBlock)
for {
response, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("stream error while receiving: %w", err)
}

logger.Info("got response")
block := &pbgear.Block{}
if err := anypb.UnmarshalTo(response.Block, block, proto.UnmarshalOptions{}); err != nil {
return fmt.Errorf("unmarshalling anypb: %w", err)
}
logger.Info("got block", zap.Uint64("block_num", block.Number))

if len(block.Header.UpdatedMetadata) > 0 {
logger.Info("Updating metadata", zap.Uint64("block_num", block.Number), zap.Uint32("version", block.Header.SpecVersion))
md := LoadMetadata(block.Header.UpdatedMetadata)
factory := registry.NewFactory()
callRegistry, err := factory.CreateCallRegistry(md)
if err != nil {
return fmt.Errorf("creating call registry: %w", err)
}
decoder.SetCallRegistry(callRegistry)
//todo: save has last metadata seen and reload it at startup
}

if lastBlockNum != 0 && (block.Number-lastBlockNum) != 1 {
panic(fmt.Sprintf("incorrect number of blocks sequence. Expected %d got %d", lastBlockNum+1, block.Number))
}
lastBlockNum = block.Number

decodedBlock, err := decoder.Decoded(block)
if err != nil {
return fmt.Errorf("decoding block %d %s: %w", block.Number, hex.EncodeToString(block.Hash), err)
}

j, err := json.Marshal(decodedBlock)
if err != nil {
return fmt.Errorf("marshalling block %d: %w", block.Number, err)
}
fmt.Println(string(j))
//todo: print DM log
//todo: save Cursor
}
return nil
}
}

func LoadMetadata(data []byte) *types.Metadata {
metadata := &types.Metadata{}
err := codec.Decode(data, metadata)
if err != nil {
panic(err)
}
return metadata
}
1 change: 1 addition & 0 deletions cmd/firevara/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func init() {
logging.InstantiateLoggers(logging.WithDefaultLevel(zap.InfoLevel))

rootCmd.AddCommand(NewFetchCmd(logger, tracer))
rootCmd.AddCommand(NewDecoderCmd(logger, tracer))
}

func main() {
Expand Down
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ go 1.22.0
toolchain go1.22.3

replace github.com/jhump/protoreflect => github.com/streamingfast/protoreflect v0.0.0-20231205191344-4b629d20ce8d

replace github.com/centrifuge/go-substrate-rpc-client/v4 => github.com/streamingfast/go-substrate-rpc-client/v4 v4.0.0-20240802141237-cb3f49e2ebd1

replace github.com/streamingfast/substreams-gear => ../substreams-gear
require (
github.com/centrifuge/go-substrate-rpc-client/v4 v4.2.1
github.com/gobeam/stringy v0.0.7
github.com/mostynb/go-grpc-compression v1.1.17
github.com/planetscale/vtprotobuf v0.6.0
github.com/planetscale/vtprotobuf v0.6.0
github.com/spf13/cobra v1.7.0
github.com/streamingfast/bstream v0.0.2-0.20240619142813-9d23840859bf
github.com/streamingfast/cli v0.0.4-0.20240412191021-5f81842cb71d
github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d
github.com/streamingfast/substreams-gear v0.1.1-0.20240802140944-5acc1cee4cf8
go.uber.org/zap v1.26.0
google.golang.org/grpc v1.64.0
)

require (
Expand Down Expand Up @@ -135,7 +140,6 @@ require (
github.com/streamingfast/dstore v0.1.1-0.20240325191553-bcce8892a9bb // indirect
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 // indirect
github.com/streamingfast/opaque v0.0.0-20210811180740-0c01d37ea308 // indirect
github.com/streamingfast/pbgo v0.0.6-0.20240430190514-722fe9d82e5d // indirect
github.com/streamingfast/sf-tracing v0.0.0-20240430173521-888827872b90 // indirect
github.com/streamingfast/shutter v1.5.0 // indirect
github.com/streamingfast/substreams v1.9.1 // indirect
Expand Down Expand Up @@ -168,7 +172,6 @@ require (
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/grpc v1.64.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
Expand Down
Loading

0 comments on commit abdc44d

Please sign in to comment.