diff --git a/grpc-server.go b/grpc-server.go index e8a514b5..9a474875 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -707,6 +707,7 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran if params.EndSlot != nil { endSlot = *params.EndSlot } + gsfaReader, _ := multi.getGsfaReadersInEpochDescendingOrderForSlotRange(ctx, startSlot, endSlot) for slot := startSlot; slot <= endSlot; slot++ { select { @@ -716,19 +717,14 @@ func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTran } if params.Filter == nil || len(params.Filter.AccountInclude) == 0 { - block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) - if err != nil { - if status.Code(err) == codes.NotFound { - continue // is this the right thing to do? - } + if err := multi.streamAllTxns(ctx, ser, slot); err != nil { return err } - - for _, tx := range block.Transactions { - if err := ser.Send(constructTransactionResponse(tx, block)); err != nil { - return err - } + } else if params.Filter != nil && len(params.Filter.AccountInclude) > 0 { + for _, account := range params.Filter.AccountInclude { + gsfaReader.Get() } + } } @@ -743,3 +739,16 @@ func constructTransactionResponse(tx *old_faithful_grpc.Transaction, block *old_ // What to do for index? } } + +func (multi *MultiEpoch) streamAllTxns(ctx context.Context, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer, slot uint64) error { + block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) + if err != nil && status.Code(err) != codes.NotFound { + return err + } + + for _, tx := range block.Transactions { + if err := ser.Send(constructTransactionResponse(tx, block)); err != nil { + return err + } + } +}