Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add stream transactions protobuf #183

Merged
merged 15 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions grpc-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@ import (
"sync"
"time"

bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/util"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/rpcpool/yellowstone-faithful/compactindexsized"
"github.com/rpcpool/yellowstone-faithful/gsfa"
"github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog"
"github.com/rpcpool/yellowstone-faithful/indexes"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
old_faithful_grpc "github.com/rpcpool/yellowstone-faithful/old-faithful-proto/old-faithful-grpc"
solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers"
"github.com/rpcpool/yellowstone-faithful/tooling"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand Down Expand Up @@ -696,5 +701,195 @@ func blockContainsAccounts(block *old_faithful_grpc.BlockResponse, accounts []st
}

return false
}

func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTransactionsRequest, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer) error {
ctx := ser.Context()

startSlot := params.StartSlot
endSlot := startSlot + maxSlotsToStream

if params.EndSlot != nil {
endSlot = *params.EndSlot
}
gsfaReader, _ := multi.getGsfaReadersInEpochDescendingOrderForSlotRange(ctx, startSlot, endSlot)

for slot := startSlot; slot <= endSlot; slot++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := multi.processSlotTransactions(ctx, ser, slot, params.Filter, gsfaReader); err != nil {
return err
}
}
return nil
}

func (multi *MultiEpoch) processSlotTransactions(
ctx context.Context,
ser old_faithful_grpc.OldFaithful_StreamTransactionsServer,
slot uint64, filter *old_faithful_grpc.StreamTransactionsFilter,
gsfaReader *gsfa.GsfaReaderMultiepoch,
) error {

filterOutTxn := func(tx solana.Transaction, meta any) bool {
if filter == nil {
return true
}

if !(*filter.Vote) && IsSimpleVoteTransaction(&tx) { // If vote is false, we should filter out vote transactions
return false
}

if !(*filter.Failed) { // If failed is false, we should filter out failed transactions
err := getErr(meta)
if err != nil {
return false
}
}

// AccountInclude is handled in the main function

for _, acc := range filter.AccountExclude {
pkey := solana.MustPublicKeyFromBase58(acc)
ok, err := tx.HasAccount(pkey)
if err != nil {
klog.Errorf("Failed to check if transaction %v has account %s", tx, acc)
return false
}
if ok { // If any excluded account is present, filter out the transaction
return false
}
}

for _, acc := range filter.AccountRequired {
pkey := solana.MustPublicKeyFromBase58(acc)
ok, err := tx.HasAccount(pkey)
if err != nil {
klog.Errorf("Failed to check if transaction %v has account %s", tx, acc)
return false
}
if !ok { // If any required account is missing, filter out the transaction
return false
}
}

return true
}

if filter == nil || len(filter.AccountInclude) == 0 {

block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot})
if err != nil {
if status.Code(err) == codes.NotFound {
return nil
}
return err
}

for _, tx := range block.Transactions {
decoder := bin.NewBinDecoder(tx.Transaction)
txn, err := solana.TransactionFromDecoder(decoder)
if err != nil {
return status.Errorf(codes.Internal, "Failed to decode transaction: %v", err)
}

meta, err := solanatxmetaparsers.ParseAnyTransactionStatusMeta(tx.Meta)
if err != nil {
return status.Errorf(codes.Internal, "Failed to parse transaction meta: %v", err)
}

if !filterOutTxn(*txn, meta) {

txResp := new(old_faithful_grpc.TransactionResponse)
txResp.Transaction = new(old_faithful_grpc.Transaction)

{
txResp.Transaction.Transaction = tx.Transaction
txResp.Transaction.Meta = tx.Meta
txResp.Transaction.Index = tx.Index

// To do: add blocketime after index work is done
}

if err := ser.Send(txResp); err != nil {
return err
}
}
}
} else {
for _, account := range filter.AccountInclude {
pKey := solana.MustPublicKeyFromBase58(account)
epochToTxns, err := gsfaReader.Get(
ctx,
pKey,
1000,
func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) {
anjor marked this conversation as resolved.
Show resolved Hide resolved
epoch, err := multi.GetEpoch(epochNum)
if err != nil {
return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
}
raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{
Offset: oas.Offset,
Size: oas.Size,
})
if err != nil {
return nil, fmt.Errorf("failed to get signature: %w", err)
}
decoded, err := iplddecoders.DecodeTransaction(raw)
if err != nil {
return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err)
}
return decoded, nil
},
)
if err != nil {
return err
}

for epochNumber, txns := range epochToTxns {
epochHandler, err := multi.GetEpoch(epochNumber)
if err != nil {
return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber)
}
for _, txn := range txns {
if slot != uint64(txn.Slot) { // If the transaction is not in the requested slot, skip
continue
}
tx, meta, err := parseTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid)
if err != nil {
return status.Errorf(codes.Internal, "Failed to parse transaction from node: %v", err)
}

if !filterOutTxn(tx, meta) {

txResp := new(old_faithful_grpc.TransactionResponse)
txResp.Transaction = new(old_faithful_grpc.Transaction)
{
pos, ok := txn.GetPositionIndex()
if ok {
txResp.Index = ptrToUint64(uint64(pos))
txResp.Transaction.Index = ptrToUint64(uint64(pos))
}
txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid)
if err != nil {
return status.Errorf(codes.Internal, "Failed to get transaction: %v", err)
}
txResp.Slot = uint64(txn.Slot)

// To do: add blocketime after index work is done
}

if err := ser.Send(txResp); err != nil {
return err
}
}
}
}
}
}
return nil
}
Loading
Loading