Skip to content

Commit

Permalink
Tx meta fixes (#175)
Browse files Browse the repository at this point in the history
* Cleanup gsfa handling of tx metadata

* Add `find-missing-tx-metadata` command for finding transactions with missing metadata.

* Cleanup

* Cleanup

* Cleanup

* Undefined kind is -1

* Cleanup

* More checking

* Cleanup

* Cleanup

* More options

* More errors

* Move solana errors to dedicated dir

* Solanaerrors

* make gen-proto

* Stub multireader

* Handle InsufficientFundsForRent
  • Loading branch information
gagliardetto authored Oct 30, 2024
1 parent 453db82 commit b569841
Show file tree
Hide file tree
Showing 32 changed files with 1,759 additions and 1,140 deletions.
43 changes: 28 additions & 15 deletions accum/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

type ObjectAccumulator struct {
skipNodes uint64
flushOnKind iplddecoders.Kind
reader *carreader.CarReader
ignoreKinds iplddecoders.KindSlice
Expand Down Expand Up @@ -42,6 +43,11 @@ func NewObjectAccumulator(
}
}

// SetSkip(n)
func (oa *ObjectAccumulator) SetSkip(n uint64) {
oa.skipNodes = n
}

var flushBufferPool = sync.Pool{
New: func() interface{} {
return &flushBuffer{}
Expand All @@ -58,14 +64,14 @@ func putFlushBuffer(fb *flushBuffer) {
}

type flushBuffer struct {
head *ObjectWithMetadata
other []ObjectWithMetadata
parent *ObjectWithMetadata
children []ObjectWithMetadata
}

// Reset resets the flushBuffer.
func (fb *flushBuffer) Reset() {
fb.head = nil
clear(fb.other)
fb.parent = nil
fb.children = fb.children[:0]
}

type ObjectWithMetadata struct {
Expand All @@ -84,7 +90,7 @@ func (oa *ObjectAccumulator) startFlusher(ctx context.Context) {
if fb == nil {
return
}
if err := oa.flush(fb.head, fb.other); err != nil {
if err := oa.flush(fb.parent, fb.children); err != nil {
if isStop(err) {
return
}
Expand All @@ -99,8 +105,8 @@ func (oa *ObjectAccumulator) startFlusher(ctx context.Context) {
func (oa *ObjectAccumulator) sendToFlusher(head *ObjectWithMetadata, other []ObjectWithMetadata) {
oa.flushWg.Add(1)
fb := getFlushBuffer()
fb.head = head
fb.other = clone(other)
fb.parent = head
fb.children = other
oa.flushQueue <- fb
}

Expand All @@ -118,47 +124,54 @@ func (oa *ObjectAccumulator) Run(ctx context.Context) error {
totalOffset += size
}
}
numSkipped := uint64(0)
objectCap := 5000
buffersLoop:
for {
objects := make([]ObjectWithMetadata, 0, objectCap)
children := make([]ObjectWithMetadata, 0, objectCap)
currentBufferLoop:
for {
if ctx.Err() != nil {
return ctx.Err()
}
c, sectionLength, data, err := oa.reader.NextNodeBytes()
cid_, sectionLength, data, err := oa.reader.NextNodeBytes()
if err != nil {
if errors.Is(err, io.EOF) {
oa.sendToFlusher(nil, objects)
oa.sendToFlusher(nil, children)
break buffersLoop
}
return err
}
currentOffset := totalOffset
totalOffset += sectionLength

if numSkipped < oa.skipNodes {
numSkipped++
continue
}

if data == nil {
oa.sendToFlusher(nil, objects)
oa.sendToFlusher(nil, children)
break buffersLoop
}

objm := ObjectWithMetadata{
Cid: c,
element := ObjectWithMetadata{
Cid: cid_,
Offset: currentOffset,
SectionLength: sectionLength,
ObjectData: data,
}

kind := iplddecoders.Kind(data[1])
if kind == oa.flushOnKind {
oa.sendToFlusher(&objm, (objects))
// element is parent
oa.sendToFlusher(&element, children)
break currentBufferLoop
} else {
if len(oa.ignoreKinds) > 0 && oa.ignoreKinds.Has(kind) {
continue
}
objects = append(objects, objm)
children = append(children, element)
}
}
}
Expand Down
245 changes: 245 additions & 0 deletions accum/tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
package accum

import (
"context"
"fmt"
"sync"

"github.com/gagliardetto/solana-go"
"github.com/ipfs/go-cid"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers"
"github.com/rpcpool/yellowstone-faithful/tooling"
)

type TransactionWithSlot struct {
Offset uint64
Length uint64
Slot uint64
Blocktime uint64
Error error
Transaction solana.Transaction
Metadata *solanatxmetaparsers.TransactionStatusMetaContainer
}

// IsMetaNotFound returns true if the error is a not found error.
func (obj TransactionWithSlot) IsMetaNotFound() bool {
e, ok := obj.Error.(txMetaError)
if !ok {
return false
}
return e.IsMetaNotFound()
}

// IsMetaParseError returns true if the error is a parsing error.
func (obj TransactionWithSlot) IsMetaParseError() bool {
e, ok := obj.Error.(txMetaError)
if !ok {
return false
}
return e.IsMetaParseError()
}

// Ok returns true if the error is nil.
func (obj TransactionWithSlot) Ok() bool {
return obj.Error == nil
}

type txMetaError struct {
Sig solana.Signature
Err error
isNotFound bool
isParseError bool
}

func (obj txMetaError) Error() string {
switch {
case obj.isNotFound:
return fmt.Sprintf("not found: %s", obj.Err)
case obj.isParseError:
return fmt.Sprintf("parse error: %s", obj.Err)
default:
return fmt.Sprintf("error: %s", obj.Err)
}
}

func (obj txMetaError) Is(target error) bool {
if _, ok := target.(txMetaError); ok {
return true
}
return false
}

func (obj txMetaError) Unwrap() error {
return obj.Err
}

func (obj txMetaError) IsMetaNotFound() bool {
return obj.isNotFound
}

func (obj txMetaError) IsMetaParseError() bool {
return obj.isParseError
}

func (obj txMetaError) Ok() bool {
return obj.Err == nil
}

func newTxMetaErrorNotFound(sig solana.Signature, err error) txMetaError {
return txMetaError{
Sig: sig,
Err: err,
isNotFound: true,
}
}

func newTxMetaErrorParseError(sig solana.Signature, err error) txMetaError {
return txMetaError{
Sig: sig,
Err: err,
isParseError: true,
}
}

var poolOfTransactionWithSlotSlices = sync.Pool{
New: func() interface{} {
return make([]*TransactionWithSlot, 0, 1000)
},
}

func getTransactionWithSlotSlice() []*TransactionWithSlot {
return poolOfTransactionWithSlotSlices.Get().([]*TransactionWithSlot)
}

func PutTransactionWithSlotSlice(slice []*TransactionWithSlot) {
slice = slice[:0]
poolOfTransactionWithSlotSlices.Put(slice)
}

var poolDataBlocksMap = sync.Pool{
New: func() interface{} {
return make(map[string]ObjectWithMetadata, 0)
},
}

func clearDataBlocksMap(m map[string]ObjectWithMetadata) {
for k := range m {
delete(m, k)
}
}

func getDatablocksMap() map[string]ObjectWithMetadata {
return poolDataBlocksMap.Get().(map[string]ObjectWithMetadata)
}

func putDataBlocksMap(m map[string]ObjectWithMetadata) {
clearDataBlocksMap(m)
poolDataBlocksMap.Put(m)
}

func ObjectsToTransactionsAndMetadata(
block *ipldbindcode.Block,
objects []ObjectWithMetadata,
) ([]*TransactionWithSlot, error) {
transactions := getTransactionWithSlotSlice()
dataBlocksMap := getDatablocksMap()
defer putDataBlocksMap(dataBlocksMap)
for objI := range objects {
object := objects[objI]
// check if the object is a transaction:
kind := iplddecoders.Kind(object.ObjectData[1])
if kind == iplddecoders.KindDataFrame {
dataBlocksMap[object.Cid.String()] = object
continue
}
if kind != iplddecoders.KindTransaction {
continue
}
decodedTxObj, err := iplddecoders.DecodeTransaction(object.ObjectData)
if err != nil {
return nil, fmt.Errorf("error while decoding transaction from nodex %s: %w", object.Cid, err)
}
tws := &TransactionWithSlot{
Offset: object.Offset,
Length: object.SectionLength,
Slot: uint64(decodedTxObj.Slot),
Blocktime: uint64(block.Meta.Blocktime),
}
tx, err := decodedTxObj.GetSolanaTransaction()
if err != nil {
return nil, fmt.Errorf("error while getting solana transaction from object %s: %w", object.Cid, err)
}
tws.Transaction = *tx
sigs := tx.Signatures
if len(sigs) == 0 {
return nil, fmt.Errorf("transaction has no signatures: %s", object.Cid)
}
sig := sigs[0]

if total, ok := decodedTxObj.Metadata.GetTotal(); !ok || total == 1 {
// metadata fit into the transaction object:
completeBuffer := decodedTxObj.Metadata.Bytes()
if ha, ok := decodedTxObj.Metadata.GetHash(); ok {
err := ipldbindcode.VerifyHash(completeBuffer, ha)
if err != nil {
return nil, fmt.Errorf("failed to verify metadata hash: %w", err)
}
}
if len(completeBuffer) > 0 {
uncompressedMeta, err := tooling.DecompressZstd(completeBuffer)
if err != nil {
return nil, fmt.Errorf("failed to decompress metadata: %w", err)
}
status, err := solanatxmetaparsers.ParseTransactionStatusMetaContainer(uncompressedMeta)
if err == nil {
tws.Metadata = status
} else {
tws.Error = newTxMetaErrorParseError(sig, err)
}
} else {
tws.Error = newTxMetaErrorNotFound(sig, fmt.Errorf("metadata is empty"))
}
clearDataBlocksMap(dataBlocksMap)
} else {
// metadata didn't fit into the transaction object, and was split into multiple dataframes:
metaBuffer, err := tooling.LoadDataFromDataFrames(
&decodedTxObj.Metadata,
func(ctx context.Context, wantedCid cid.Cid) (*ipldbindcode.DataFrame, error) {
if dataBlock, ok := dataBlocksMap[wantedCid.String()]; ok {
df, err := iplddecoders.DecodeDataFrame(dataBlock.ObjectData)
if err != nil {
return nil, err
}
return df, nil
}
return nil, fmt.Errorf("dataframe not found")
})
if err != nil {
return nil, fmt.Errorf("failed to load metadata: %w", err)
}
// clear dataBlocksMap so it can accumulate dataframes for the next transaction:
clearDataBlocksMap(dataBlocksMap)

// if we have a metadata buffer, try to decompress it:
if len(metaBuffer) > 0 {
uncompressedMeta, err := tooling.DecompressZstd(metaBuffer)
if err != nil {
return nil, fmt.Errorf("failed to decompress metadata: %w", err)
}
status, err := solanatxmetaparsers.ParseTransactionStatusMetaContainer(uncompressedMeta)
if err == nil {
tws.Metadata = status
} else {
tws.Error = newTxMetaErrorParseError(sig, err)
}
} else {
tws.Error = newTxMetaErrorNotFound(sig, fmt.Errorf("metadata is empty"))
}
}

transactions = append(transactions, tws)
}
return transactions, nil
}
3 changes: 2 additions & 1 deletion adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/base64"

"github.com/mr-tron/base58"
solanaerrors "github.com/rpcpool/yellowstone-faithful/solana-errors"
)

func ptrToUint64(v uint64) *uint64 {
Expand Down Expand Up @@ -34,7 +35,7 @@ func adaptTransactionMetaToExpectedOutput(m map[string]any) map[string]any {
}
{
if _, ok := meta["err"]; ok {
meta["err"], _ = parseTransactionError(meta["err"])
meta["err"], _ = solanaerrors.ParseTransactionError(meta["err"])
} else {
meta["err"] = nil
}
Expand Down
2 changes: 1 addition & 1 deletion carreader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

type CarReader struct {
headerSize *uint64
br *bufio.Reader
Header *carv1.CarHeader
br *bufio.Reader
}

func alignValueToPageSize(value int) int {
Expand Down
Loading

0 comments on commit b569841

Please sign in to comment.