diff --git a/grpc-server.go b/grpc-server.go index 7eabe30c..3215c3a4 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -28,6 +28,8 @@ import ( "k8s.io/klog/v2" ) +const maxSlotsToStream uint64 = 100 + // ListeAndServe starts listening on the configured address and serves the RPC API. func (me *MultiEpoch) ListenAndServeGRPC(ctx context.Context, listenOn string) error { lis, err := net.Listen("tcp", listenOn) @@ -623,5 +625,69 @@ func (multi *MultiEpoch) GetBlockTime(ctx context.Context, params *old_faithful_ } func (multi *MultiEpoch) StreamBlocks(params *old_faithful_grpc.StreamBlocksRequest, ser old_faithful_grpc.OldFaithful_StreamBlocksServer) error { + ctx := ser.Context() + + startSlot := params.StartSlot + endSlot := startSlot + maxSlotsToStream + + if params.EndSlot != nil { + endSlot = *params.EndSlot + } + + filterFunc := func(block *old_faithful_grpc.BlockResponse) bool { + if params.Filter == nil || len(params.Filter.AccountInclude) == 0 { + retrun true + } + + return blockContainsAccounts(block, params.Filter.AccountInclude) + } + + for slot := startSlot; slot <= endSlot; slot++ { + select { + case <- ctx.Done(): + return ctx.Err() + default: + } + + epochNumber := CalcEpochForSlot(slot) + epochHandler, err := multi.GetEpoch(epochNumber) + if err != nil { + klog.Warningf("Epoch %d not available, skipping slot %d", epochNumber, slot) + continue + } + + block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot}) + if err != nil { + if status.Code(err) == codes.NotFound { + continue // is this the right thing to do? + } + return err + } + + if filterFunc(block) { + if err := ser.Send(block); err != nil { + return err + } + } + } + return nil } + +func blockContainsAccounts(block *old_faithful_grpc.BlockResponse, accounts []string) bool { + accountSet := make(map[string]struct{}, len(accounts)) + for _, acc := range accounts { + accountSet[acc] = struct{}{} + } + + for _, tx := range block.Transactions { + for _, acc := range tx.Transation.Message.AccountKeys { + if _, exists := accountSet[string(acc)]; exists { + return true + } + } + + } + + return false +}