diff --git a/grpc-server.go b/grpc-server.go index d51832f1..d3c5e3d2 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -696,5 +696,60 @@ func blockContainsAccounts(block *old_faithful_grpc.BlockResponse, accounts []st } return false +} + +func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTransactionsRequest, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer) error { + ctx := ser.Context() + + startSlot := params.StartSlot + endSlot := startSlot + maxSlotsToStream + + if params.EndSlot != nil { + endSlot = *params.EndSlot + } + + filterFunc := func(tx *old_faithful_grpc.Transaction) bool { + if params.Filter == nil || len(params.Filter.AccountInclude) == 0 { + return true + } + return txContainsAccounts(tx, params.Filter.AccountInclude) + } + + for slot := startSlot; slot <= endSlot; slot++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) + if err != nil { + if status.Code(err) == codes.NotFound { + continue // is this the right thing to do? + } + return err + } + + for _, tx := range block.Transactions { + if filterFunc(tx) { + if err := ser.Send(constructTransactionResponse(tx)); err != nil { + return err + } + } + } + } + + return nil +} + +func txContainsAccounts(tx *old_faithful_grpc.Transaction, accounts []string) bool { + return true +} + +func constructTransactionResponse(tx *old_faithful_grpc.Transaction) *old_faithful_grpc.TransactionResponse { + // to do + return &old_faithful_grpc.TransactionResponse{ + Transaction: tx, + } }