Skip to content

Commit

Permalink
gSFA: fix performance issues (#99)
Browse files Browse the repository at this point in the history
* Refactor gSFA index

* Make sure everything gets flushed

* Cleanup

* Add note

* Cleanup

* Save more data in the gsfa index to not require looking up the block.

* Cleanup

* Add carreader

* Accum

* Accum cleanup

* Remove store

* Use accum to create gSFA index

* Fix jsonParsed

* Fix jsonParsed

* Cleanup

* debug

* Refactor

* Fix all

* Cleanup

* Don't convert data to base58

* Fix gSFA

* Fix legacy error decoding

* Unknown error

* Ignore target

* Fix tests

* Fix test_decoder_u16

* Fix test
  • Loading branch information
gagliardetto authored Jun 27, 2024
1 parent e433d66 commit 7a528ee
Show file tree
Hide file tree
Showing 87 changed files with 2,424 additions and 10,945 deletions.
179 changes: 179 additions & 0 deletions accum/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package accum

import (
"context"
"errors"
"io"
"sync"

"github.com/ipfs/go-cid"
"github.com/rpcpool/yellowstone-faithful/carreader"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
)

type ObjectAccumulator struct {
flushOnKind iplddecoders.Kind
reader *carreader.CarReader
ignoreKinds iplddecoders.KindSlice
callback func(*ObjectWithMetadata, []ObjectWithMetadata) error
flushWg sync.WaitGroup
flushQueue chan *flushBuffer
}

var ErrStop = errors.New("stop")

func isStop(err error) bool {
return errors.Is(err, ErrStop)
}

func NewObjectAccumulator(
reader *carreader.CarReader,
flushOnKind iplddecoders.Kind,
callback func(*ObjectWithMetadata, []ObjectWithMetadata) error,
ignoreKinds ...iplddecoders.Kind,
) *ObjectAccumulator {
return &ObjectAccumulator{
reader: reader,
ignoreKinds: ignoreKinds,
flushOnKind: flushOnKind,
callback: callback,
flushQueue: make(chan *flushBuffer, 1000),
}
}

var flushBufferPool = sync.Pool{
New: func() interface{} {
return &flushBuffer{}
},
}

func getFlushBuffer() *flushBuffer {
return flushBufferPool.Get().(*flushBuffer)
}

func putFlushBuffer(fb *flushBuffer) {
fb.Reset()
flushBufferPool.Put(fb)
}

type flushBuffer struct {
head *ObjectWithMetadata
other []ObjectWithMetadata
}

// Reset resets the flushBuffer.
func (fb *flushBuffer) Reset() {
fb.head = nil
clear(fb.other)
}

type ObjectWithMetadata struct {
Cid cid.Cid
Offset uint64
SectionLength uint64
ObjectData []byte
}

func (oa *ObjectAccumulator) startFlusher(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case fb := <-oa.flushQueue:
if fb == nil {
return
}
if err := oa.flush(fb.head, fb.other); err != nil {
if isStop(err) {
return
}
panic(err)
}
oa.flushWg.Done()
putFlushBuffer(fb)
}
}
}

func (oa *ObjectAccumulator) sendToFlusher(head *ObjectWithMetadata, other []ObjectWithMetadata) {
oa.flushWg.Add(1)
fb := getFlushBuffer()
fb.head = head
fb.other = clone(other)
oa.flushQueue <- fb
}

func (oa *ObjectAccumulator) Run(ctx context.Context) error {
go oa.startFlusher(ctx)
defer func() {
oa.flushWg.Wait()
close(oa.flushQueue)
}()
totalOffset := uint64(0)
{
if size, err := oa.reader.HeaderSize(); err != nil {
return err
} else {
totalOffset += size
}
}
objectCap := 5000
buffersLoop:
for {
objects := make([]ObjectWithMetadata, 0, objectCap)
currentBufferLoop:
for {
if ctx.Err() != nil {
return ctx.Err()
}
c, sectionLength, data, err := oa.reader.NextNodeBytes()
if err != nil {
if errors.Is(err, io.EOF) {
oa.sendToFlusher(nil, objects)
break buffersLoop
}
return err
}
currentOffset := totalOffset
totalOffset += sectionLength

if data == nil {
oa.sendToFlusher(nil, objects)
break buffersLoop
}

objm := ObjectWithMetadata{
Cid: c,
Offset: currentOffset,
SectionLength: sectionLength,
ObjectData: data,
}

kind := iplddecoders.Kind(data[1])
if kind == oa.flushOnKind {
oa.sendToFlusher(&objm, (objects))
break currentBufferLoop
} else {
if len(oa.ignoreKinds) > 0 && oa.ignoreKinds.Has(kind) {
continue
}
objects = append(objects, objm)
}
}
}

return nil
}

func (oa *ObjectAccumulator) flush(head *ObjectWithMetadata, other []ObjectWithMetadata) error {
if head == nil && len(other) == 0 {
return nil
}
return oa.callback(head, other)
}

func clone[T any](s []T) []T {
v := make([]T, len(s))
copy(v, s)
return v
}
23 changes: 12 additions & 11 deletions adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,18 @@ func adaptTransactionMetaToExpectedOutput(m map[string]any) map[string]any {
} else {
instruction["accounts"] = []any{}
}
if data, ok := instruction["data"]; ok {
// as string
dataStr, ok := data.(string)
if ok {
decoded, err := base64.StdEncoding.DecodeString(dataStr)
if err == nil {
// TODO: the data in the `innerInstructions` is always base58 encoded (even if the transaction is base64 encoded)
instruction["data"] = base58.Encode(decoded)
}
}
}
// if data, ok := instruction["data"]; ok {
// // as string
// dataStr, ok := data.(string)
// if ok {
// decoded, err := base64.StdEncoding.DecodeString(dataStr)
// if err == nil {
// // TODO: the data in the `innerInstructions` is always base58 encoded (even if the transaction is base64 encoded)
// // instruction["data"] = base58.Encode(decoded)
// _ = decoded
// }
// }
// }
}
}
meta["innerInstructions"].([]any)[i] = innerInstruction
Expand Down
188 changes: 188 additions & 0 deletions carreader/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package carreader

import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"os"

"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/ipfs/go-libipfs/blocks"
carv1 "github.com/ipld/go-car"
"github.com/ipld/go-car/util"
"github.com/rpcpool/yellowstone-faithful/readahead"
)

type CarReader struct {
headerSize *uint64
br *bufio.Reader
Header *carv1.CarHeader
}

func alignValueToPageSize(value int) int {
pageSize := os.Getpagesize()
return (value + pageSize - 1) &^ (pageSize - 1)
}

func New(r io.ReadCloser) (*CarReader, error) {
br := bufio.NewReaderSize(r, alignValueToPageSize(readahead.DefaultChunkSize))
ch, err := ReadHeader(br)
if err != nil {
return nil, err
}

if ch.Version != 1 {
return nil, fmt.Errorf("invalid car version: %d", ch.Version)
}

if len(ch.Roots) == 0 {
return nil, fmt.Errorf("empty car, no roots")
}

return &CarReader{
br: br,
Header: ch,
}, nil
}

func ReadHeader(br io.Reader) (*carv1.CarHeader, error) {
hb, err := util.LdRead(bufio.NewReader(br))
if err != nil {
return nil, err
}

var ch carv1.CarHeader
if err := cbor.DecodeInto(hb, &ch); err != nil {
return nil, fmt.Errorf("invalid header: %v", err)
}

return &ch, nil
}

func (cr *CarReader) NextInfo() (cid.Cid, uint64, error) {
c, sectionLen, err := ReadNodeInfoWithoutData(cr.br)
if err != nil {
return c, 0, err
}
return c, sectionLen, nil
}

func (cr *CarReader) NextNode() (cid.Cid, uint64, *blocks.BasicBlock, error) {
c, sectionLen, data, err := ReadNodeInfoWithData(cr.br)
if err != nil {
return c, 0, nil, fmt.Errorf("failed to read node info: %w", err)
}
bl, err := blocks.NewBlockWithCid(data, c)
if err != nil {
return c, 0, nil, fmt.Errorf("failed to create block: %w", err)
}
return c, sectionLen, bl, nil
}

func (cr *CarReader) NextNodeBytes() (cid.Cid, uint64, []byte, error) {
c, sectionLen, data, err := ReadNodeInfoWithData(cr.br)
if err != nil {
return c, 0, nil, fmt.Errorf("failed to read node info: %w", err)
}
return c, sectionLen, data, nil
}

func (cr *CarReader) HeaderSize() (uint64, error) {
if cr.headerSize == nil {
var buf bytes.Buffer
if err := carv1.WriteHeader(cr.Header, &buf); err != nil {
return 0, err
}
size := uint64(buf.Len())
cr.headerSize = &size
}
return *cr.headerSize, nil
}

func ReadNodeInfoWithoutData(br *bufio.Reader) (cid.Cid, uint64, error) {
sectionLen, ll, err := ReadSectionLength(br)
if err != nil {
return cid.Cid{}, 0, err
}

cidLen, c, err := cid.CidFromReader(br)
if err != nil {
return cid.Cid{}, 0, err
}

// Seek to the next section by skipping the block.
// The section length includes the CID, so subtract it.
remainingSectionLen := int64(sectionLen) - int64(cidLen)

_, err = io.CopyN(io.Discard, br, remainingSectionLen)
if err != nil {
return cid.Cid{}, 0, err
}

return c, sectionLen + ll, nil
}

func ReadNodeInfoWithData(br *bufio.Reader) (cid.Cid, uint64, []byte, error) {
sectionLen, ll, err := ReadSectionLength(br)
if err != nil {
return cid.Cid{}, 0, nil, fmt.Errorf("failed to read section length: %w", err)
}

cidLen, c, err := cid.CidFromReader(br)
if err != nil {
return cid.Cid{}, 0, nil, fmt.Errorf("failed to read cid: %w", err)
}

// Seek to the next section by skipping the block.
// The section length includes the CID, so subtract it.
remainingSectionLen := int64(sectionLen) - int64(cidLen)

buf := make([]byte, remainingSectionLen)
_, err = io.ReadFull(br, buf)
if err != nil {
return cid.Cid{}, 0, nil, fmt.Errorf("failed to read block: %w", err)
}

return c, sectionLen + ll, buf, nil
}

func ReadSectionLength(r *bufio.Reader) (uint64, uint64, error) {
if _, err := r.Peek(1); err != nil { // no more blocks, likely clean io.EOF
if errors.Is(err, io.ErrNoProgress) {
return 0, 0, io.EOF
}
return 0, 0, fmt.Errorf("failed to peek: %w", err)
}

br := byteReaderWithCounter{r, 0}
l, err := binary.ReadUvarint(&br)
if err != nil {
if errors.Is(err, io.EOF) {
return 0, 0, io.ErrUnexpectedEOF // don't silently pretend this is a clean EOF
}
return 0, 0, err
}

if l > uint64(util.MaxAllowedSectionSize) { // Don't OOM
return 0, 0, errors.New("malformed car; header is bigger than util.MaxAllowedSectionSize")
}

return l, br.Offset, nil
}

type byteReaderWithCounter struct {
io.ByteReader
Offset uint64
}

func (b *byteReaderWithCounter) ReadByte() (byte, error) {
c, err := b.ByteReader.ReadByte()
if err == nil {
b.Offset++
}
return c, err
}
Loading

0 comments on commit 7a528ee

Please sign in to comment.