From f76cb838fad95da37cf1f281aec30c6f71acedf9 Mon Sep 17 00:00:00 2001 From: anjor Date: Mon, 2 Dec 2024 10:44:41 +0000 Subject: [PATCH] filters --- grpc-server.go | 146 +++++++++++++++++++++++++++---------------------- 1 file changed, 81 insertions(+), 65 deletions(-) diff --git a/grpc-server.go b/grpc-server.go index ecd778c3..65ca1c73 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -18,6 +18,7 @@ import ( "github.com/ipld/go-car/util" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/rpcpool/yellowstone-faithful/compactindexsized" + "github.com/rpcpool/yellowstone-faithful/gsfa" "github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog" "github.com/rpcpool/yellowstone-faithful/indexes" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" @@ -718,75 +719,15 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran default: } - if params.Filter == nil || len(params.Filter.AccountInclude) == 0 { - if err := multi.streamAllTxns(ctx, ser, slot); err != nil { - return err - } - } else if params.Filter != nil && len(params.Filter.AccountInclude) > 0 { - for _, account := range params.Filter.AccountInclude { - pKey := solana.MustPublicKeyFromBase58(account) - epochToTxns, err := gsfaReader.Get( - ctx, - pKey, - 1000, - func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) { - epoch, err := multi.GetEpoch(epochNum) - if err != nil { - return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err) - } - raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{ - Offset: oas.Offset, - Size: oas.Size, - }) - if err != nil { - return nil, fmt.Errorf("failed to get signature: %w", err) - } - decoded, err := iplddecoders.DecodeTransaction(raw) - if err != nil { - return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err) - } - return decoded, nil - }, - ) - - if err != nil { - return err - } - - for epochNumber, txns := range epochToTxns { - epochHandler, err := multi.GetEpoch(epochNumber) - if err != nil { - return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber) - } - for _, txn := range txns { - txResp := new(old_faithful_grpc.TransactionResponse) - txResp.Transaction = new(old_faithful_grpc.Transaction) - { - pos, ok := txn.GetPositionIndex() - if ok { - txResp.Index = ptrToUint64(uint64(pos)) - txResp.Transaction.Index = ptrToUint64(uint64(pos)) - } - txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid) - if err != nil { - return status.Errorf(codes.Internal, "Failed to get transaction: %v", err) - } - txResp.Slot = uint64(txn.Slot) - // What to do for blocktime? - } - - if err := ser.Send(txResp); err != nil { - return err - } - } - } - } - } + if err := multi.processSlotTransactions(ctx, ser, slot, params.filter, gsfareader); err != nil { + return err + } } - return nil } + + func constructTransactionResponse(tx *old_faithful_grpc.Transaction, block *old_faithful_grpc.BlockResponse) *old_faithful_grpc.TransactionResponse { return &old_faithful_grpc.TransactionResponse{ Transaction: tx, @@ -809,3 +750,78 @@ func (multi *MultiEpoch) streamAllTxns(ctx context.Context, ser old_faithful_grp } return nil } + + +func (multi *MultiEpoch) processSlotTransactions(ctx context.Context, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer, slot uint64, filter *old_faithful_grpc.StreamTransactionsFilter, gsfaReader *gsfa.GsfaReaderMultiepoch) error { + if filter == nil { + return multi.streamAllTxns(ctx, ser, slot) + } + + if len(filter.AccountInclude) == 0 { + if err := multi.streamAllTxns(ctx, ser, slot); err != nil { + return err + } + } else if params.Filter != nil && len(params.Filter.AccountInclude) > 0 { + for _, account := range params.Filter.AccountInclude { + pKey := solana.MustPublicKeyFromBase58(account) + epochToTxns, err := gsfaReader.Get( + ctx, + pKey, + 1000, + func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) { + epoch, err := multi.GetEpoch(epochNum) + if err != nil { + return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err) + } + raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{ + Offset: oas.Offset, + Size: oas.Size, + }) + if err != nil { + return nil, fmt.Errorf("failed to get signature: %w", err) + } + decoded, err := iplddecoders.DecodeTransaction(raw) + if err != nil { + return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err) + } + return decoded, nil + }, + ) + + if err != nil { + return err + } + + for epochNumber, txns := range epochToTxns { + epochHandler, err := multi.GetEpoch(epochNumber) + if err != nil { + return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber) + } + for _, txn := range txns { + txResp := new(old_faithful_grpc.TransactionResponse) + txResp.Transaction = new(old_faithful_grpc.Transaction) + { + pos, ok := txn.GetPositionIndex() + if ok { + txResp.Index = ptrToUint64(uint64(pos)) + txResp.Transaction.Index = ptrToUint64(uint64(pos)) + } + txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid) + if err != nil { + return status.Errorf(codes.Internal, "Failed to get transaction: %v", err) + } + txResp.Slot = uint64(txn.Slot) + // What to do for blocktime? + } + + if err := ser.Send(txResp); err != nil { + return err + } + } + } + } + } +} + +return nil +}