Skip to content

Commit

Permalink
filters
Browse files Browse the repository at this point in the history
  • Loading branch information
anjor committed Dec 3, 2024
1 parent 3e5da45 commit f76cb83
Showing 1 changed file with 81 additions and 65 deletions.
146 changes: 81 additions & 65 deletions grpc-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ipld/go-car/util"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/rpcpool/yellowstone-faithful/compactindexsized"
"github.com/rpcpool/yellowstone-faithful/gsfa"
"github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog"
"github.com/rpcpool/yellowstone-faithful/indexes"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
Expand Down Expand Up @@ -718,75 +719,15 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran
default:
}

if params.Filter == nil || len(params.Filter.AccountInclude) == 0 {
if err := multi.streamAllTxns(ctx, ser, slot); err != nil {
return err
}
} else if params.Filter != nil && len(params.Filter.AccountInclude) > 0 {
for _, account := range params.Filter.AccountInclude {
pKey := solana.MustPublicKeyFromBase58(account)
epochToTxns, err := gsfaReader.Get(
ctx,
pKey,
1000,
func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) {
epoch, err := multi.GetEpoch(epochNum)
if err != nil {
return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
}
raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{
Offset: oas.Offset,
Size: oas.Size,
})
if err != nil {
return nil, fmt.Errorf("failed to get signature: %w", err)
}
decoded, err := iplddecoders.DecodeTransaction(raw)
if err != nil {
return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err)
}
return decoded, nil
},
)

if err != nil {
return err
}

for epochNumber, txns := range epochToTxns {
epochHandler, err := multi.GetEpoch(epochNumber)
if err != nil {
return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber)
}
for _, txn := range txns {
txResp := new(old_faithful_grpc.TransactionResponse)
txResp.Transaction = new(old_faithful_grpc.Transaction)
{
pos, ok := txn.GetPositionIndex()
if ok {
txResp.Index = ptrToUint64(uint64(pos))
txResp.Transaction.Index = ptrToUint64(uint64(pos))
}
txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid)
if err != nil {
return status.Errorf(codes.Internal, "Failed to get transaction: %v", err)
}
txResp.Slot = uint64(txn.Slot)
// What to do for blocktime?
}

if err := ser.Send(txResp); err != nil {
return err
}
}
}
}
}
if err := multi.processSlotTransactions(ctx, ser, slot, params.filter, gsfareader); err != nil {
return err
}
}

return nil
}



func constructTransactionResponse(tx *old_faithful_grpc.Transaction, block *old_faithful_grpc.BlockResponse) *old_faithful_grpc.TransactionResponse {
return &old_faithful_grpc.TransactionResponse{
Transaction: tx,
Expand All @@ -809,3 +750,78 @@ func (multi *MultiEpoch) streamAllTxns(ctx context.Context, ser old_faithful_grp
}
return nil
}


func (multi *MultiEpoch) processSlotTransactions(ctx context.Context, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer, slot uint64, filter *old_faithful_grpc.StreamTransactionsFilter, gsfaReader *gsfa.GsfaReaderMultiepoch) error {
if filter == nil {
return multi.streamAllTxns(ctx, ser, slot)
}

if len(filter.AccountInclude) == 0 {
if err := multi.streamAllTxns(ctx, ser, slot); err != nil {
return err
}
} else if params.Filter != nil && len(params.Filter.AccountInclude) > 0 {
for _, account := range params.Filter.AccountInclude {
pKey := solana.MustPublicKeyFromBase58(account)
epochToTxns, err := gsfaReader.Get(
ctx,
pKey,
1000,
func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) {
epoch, err := multi.GetEpoch(epochNum)
if err != nil {
return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
}
raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{
Offset: oas.Offset,
Size: oas.Size,
})
if err != nil {
return nil, fmt.Errorf("failed to get signature: %w", err)
}
decoded, err := iplddecoders.DecodeTransaction(raw)
if err != nil {
return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err)
}
return decoded, nil
},
)

if err != nil {
return err
}

for epochNumber, txns := range epochToTxns {
epochHandler, err := multi.GetEpoch(epochNumber)
if err != nil {
return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber)
}
for _, txn := range txns {
txResp := new(old_faithful_grpc.TransactionResponse)
txResp.Transaction = new(old_faithful_grpc.Transaction)
{
pos, ok := txn.GetPositionIndex()
if ok {
txResp.Index = ptrToUint64(uint64(pos))
txResp.Transaction.Index = ptrToUint64(uint64(pos))
}
txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid)
if err != nil {
return status.Errorf(codes.Internal, "Failed to get transaction: %v", err)
}
txResp.Slot = uint64(txn.Slot)
// What to do for blocktime?
}

if err := ser.Send(txResp); err != nil {
return err
}
}
}
}
}
}

return nil
}

0 comments on commit f76cb83

Please sign in to comment.