Skip to content

Commit

Permalink
Improve errors and speedup tx search (#87)
Browse files Browse the repository at this point in the history
* Improve errors

* Search all epochs in parallel
  • Loading branch information
gagliardetto authored Feb 20, 2024
1 parent dfc08d2 commit 47b4f23
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 58 deletions.
62 changes: 53 additions & 9 deletions bucketteer/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package bucketteer
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"math"
"os"

bin "github.com/gagliardetto/binary"
"github.com/rpcpool/yellowstone-faithful/indexmeta"
Expand Down Expand Up @@ -49,20 +51,62 @@ func uint16ToPrefix(num uint16) [2]byte {
// Open opens a Bucketteer file in read-only mode,
// using memory-mapped IO.
func Open(path string) (*Reader, error) {
empty, err := isEmptyFile(path)
if err != nil {
return nil, err
}
if empty {
return nil, fmt.Errorf("file is empty: %s", path)
}
file, err := mmap.Open(path)
if err != nil {
return nil, err
}
return NewReader(file)
}

func isEmptyFile(path string) (bool, error) {
file, err := os.Open(path)
if err != nil {
return false, err
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return false, err
}
return stat.Size() == 0, nil
}

func isReaderEmpty(reader io.ReaderAt) (bool, error) {
if reader == nil {
return false, errors.New("reader is nil")
}
buf := make([]byte, 1)
_, err := reader.ReadAt(buf, 0)
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return true, nil
}
return false, err
}
return len(buf) == 0, nil
}

func NewReader(reader io.ReaderAt) (*Reader, error) {
empty, err := isReaderEmpty(reader)
if err != nil {
return nil, fmt.Errorf("failed to check if reader is empty: %w", err)
}
if empty {
return nil, fmt.Errorf("reader is empty")
}
r := &Reader{
prefixToOffset: newUint16LayoutPointer(),
}
prefixToOffset, meta, headerTotalSize, err := readHeader(reader)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to read header: %w", err)
}
r.meta = meta
r.prefixToOffset = prefixToOffset
Expand Down Expand Up @@ -95,12 +139,12 @@ func readHeader(reader io.ReaderAt) (*bucketToOffset, *indexmeta.Meta, int64, er
// read header size:
headerSize, err := readHeaderSize(reader)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read header size: %w", err)
}
// read header bytes:
headerBuf := make([]byte, headerSize)
if _, err := reader.ReadAt(headerBuf, 4); err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read header bytes: %w", err)
}
// decode header:
decoder := bin.NewBorshDecoder(headerBuf)
Expand All @@ -110,7 +154,7 @@ func readHeader(reader io.ReaderAt) (*bucketToOffset, *indexmeta.Meta, int64, er
magicBuf := make([]byte, len(_Magic[:]))
_, err := decoder.Read(magicBuf)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read magic: %w", err)
}
if !bytes.Equal(magicBuf, _Magic[:]) {
return nil, nil, 0, fmt.Errorf("invalid magic: %x", string(magicBuf))
Expand All @@ -120,7 +164,7 @@ func readHeader(reader io.ReaderAt) (*bucketToOffset, *indexmeta.Meta, int64, er
{
got, err := decoder.ReadUint64(bin.LE)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read version: %w", err)
}
if got != Version {
return nil, nil, 0, fmt.Errorf("expected version %d, got %d", Version, got)
Expand All @@ -135,19 +179,19 @@ func readHeader(reader io.ReaderAt) (*bucketToOffset, *indexmeta.Meta, int64, er
// numPrefixes:
numPrefixes, err := decoder.ReadUint64(bin.LE)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read numPrefixes: %w", err)
}
// prefix -> offset:
prefixToOffset := newUint16Layout()
for i := uint64(0); i < numPrefixes; i++ {
var prefix [2]byte
_, err := decoder.Read(prefix[:])
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read prefixes[%d]: %w", i, err)
}
offset, err := decoder.ReadUint64(bin.LE)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read offsets[%d]: %w", i, err)
}
prefixToOffset[prefixToUint16(prefix)] = offset
}
Expand Down Expand Up @@ -176,7 +220,7 @@ func (r *Reader) Has(sig [64]byte) (bool, error) {
return readUint64Le(bucketReader, pos)
})
if err != nil {
if err == ErrNotFound {
if errors.Is(err, ErrNotFound) {
return false, nil
}
return false, err
Expand Down
48 changes: 39 additions & 9 deletions bucketteer/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

type Writer struct {
path string
destination *os.File
writer *bufio.Writer
prefixToHashes *prefixToHashes // prefix -> hashes
Expand All @@ -22,7 +23,11 @@ type Writer struct {
type prefixToHashes [math.MaxUint16 + 1][]uint64 // prefix -> hashes

func newPrefixToHashes() *prefixToHashes {
return &prefixToHashes{}
var out prefixToHashes
for i := range out {
out[i] = make([]uint64, 0, 16_000)
}
return &out
}

const (
Expand All @@ -41,13 +46,9 @@ func NewWriter(path string) (*Writer, error) {
} else if !ok {
return nil, fmt.Errorf("file already exists and is not empty: %s", path)
}
file, err := os.Create(path)
if err != nil {
return nil, err
}

return &Writer{
writer: bufio.NewWriterSize(file, writeBufSize),
destination: file,
path: path,
prefixToHashes: newPrefixToHashes(),
}, nil
}
Expand All @@ -57,7 +58,8 @@ func NewWriter(path string) (*Writer, error) {
func (b *Writer) Put(sig [64]byte) {
var prefix [2]byte
copy(prefix[:], sig[:2])
b.prefixToHashes[prefixToUint16(prefix)] = append(b.prefixToHashes[prefixToUint16(prefix)], Hash(sig))
pU16 := prefixToUint16(prefix)
b.prefixToHashes[pU16] = append(b.prefixToHashes[pU16], Hash(sig))
}

// Has returns true if the Bucketteer has seen the given signature.
Expand All @@ -74,11 +76,29 @@ func (b *Writer) Has(sig [64]byte) bool {
}

func (b *Writer) Close() error {
if b.writer != nil {
if err := b.writer.Flush(); err != nil {
return fmt.Errorf("failed to flush writer: %w", err)
}
}
if b.destination == nil {
return nil
}
if err := b.destination.Sync(); err != nil {
return fmt.Errorf("failed to sync file: %w", err)
}
return b.destination.Close()
}

// Seal writes the Bucketteer's state to the given writer.
func (b *Writer) Seal(meta indexmeta.Meta) (int64, error) {
file, err := os.Create(b.path)
if err != nil {
return 0, fmt.Errorf("failed to create file: %w", err)
}
b.writer = bufio.NewWriterSize(file, writeBufSize)
b.destination = file

// truncate file and seek to beginning:
if err := b.destination.Truncate(0); err != nil {
return 0, err
Expand All @@ -90,6 +110,16 @@ func (b *Writer) Seal(meta indexmeta.Meta) (int64, error) {
if err != nil {
return 0, err
}
{
// flush the writer:
if err := b.writer.Flush(); err != nil {
return 0, fmt.Errorf("failed to flush writer: %w", err)
}
// sync the file:
if err := b.destination.Sync(); err != nil {
return 0, fmt.Errorf("failed to sync file: %w", err)
}
}
return size, overwriteFileContentAt(b.destination, 0, newHeader)
}

Expand Down Expand Up @@ -232,7 +262,7 @@ func seal(
updatedHeader, err := createHeader(
_Magic,
Version,
uint32(headerSize-4), // -4 because we don't count the header size itself
uint32(headerSize-4), // -4 because we don't count the header size itself (it's a uint32, so 4 bytes long)
meta,
prefixToOffset,
)
Expand Down
2 changes: 1 addition & 1 deletion cmd-x-index-all.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newCmd_Index_all() *cli.Command {
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "verify",
Usage: "verify the index after creating it",
Usage: "verify the indexes after creating them",
Destination: &verify,
},
&cli.StringFlag{
Expand Down
64 changes: 54 additions & 10 deletions deprecated/bucketteer/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package bucketteer
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"os"

bin "github.com/gagliardetto/binary"
"golang.org/x/exp/mmap"
Expand All @@ -19,14 +21,56 @@ type Reader struct {
// Open opens a Bucketteer file in read-only mode,
// using memory-mapped IO.
func Open(path string) (*Reader, error) {
empty, err := isEmptyFile(path)
if err != nil {
return nil, err
}
if empty {
return nil, fmt.Errorf("file is empty: %s", path)
}
file, err := mmap.Open(path)
if err != nil {
return nil, err
}
return NewReader(file)
}

func isEmptyFile(path string) (bool, error) {
file, err := os.Open(path)
if err != nil {
return false, err
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return false, err
}
return stat.Size() == 0, nil
}

func isReaderEmpty(reader io.ReaderAt) (bool, error) {
if reader == nil {
return false, errors.New("reader is nil")
}
buf := make([]byte, 1)
_, err := reader.ReadAt(buf, 0)
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return true, nil
}
return false, err
}
return len(buf) == 0, nil
}

func NewReader(reader io.ReaderAt) (*Reader, error) {
empty, err := isReaderEmpty(reader)
if err != nil {
return nil, fmt.Errorf("failed to check if reader is empty: %w", err)
}
if empty {
return nil, fmt.Errorf("reader is empty")
}
r := &Reader{
prefixToOffset: make(map[[2]byte]uint64),
}
Expand Down Expand Up @@ -71,12 +115,12 @@ func readHeader(reader io.ReaderAt) (map[[2]byte]uint64, map[string]string, int6
// read header size:
headerSize, err := readHeaderSize(reader)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read header size: %w", err)
}
// read header bytes:
headerBuf := make([]byte, headerSize)
if _, err := reader.ReadAt(headerBuf, 4); err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read header bytes: %w", err)
}
// decode header:
decoder := bin.NewBorshDecoder(headerBuf)
Expand All @@ -86,7 +130,7 @@ func readHeader(reader io.ReaderAt) (map[[2]byte]uint64, map[string]string, int6
magicBuf := make([]byte, len(_Magic[:]))
_, err := decoder.Read(magicBuf)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read magic: %w", err)
}
if !bytes.Equal(magicBuf, _Magic[:]) {
return nil, nil, 0, fmt.Errorf("invalid magic: %x", string(magicBuf))
Expand All @@ -96,7 +140,7 @@ func readHeader(reader io.ReaderAt) (map[[2]byte]uint64, map[string]string, int6
{
got, err := decoder.ReadUint64(bin.LE)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read version: %w", err)
}
if got != Version {
return nil, nil, 0, fmt.Errorf("expected version %d, got %d", Version, got)
Expand All @@ -106,37 +150,37 @@ func readHeader(reader io.ReaderAt) (map[[2]byte]uint64, map[string]string, int6
// read meta:
numMeta, err := decoder.ReadUint64(bin.LE)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read numMeta: %w", err)
}
meta := make(map[string]string, numMeta)
for i := uint64(0); i < numMeta; i++ {
key, err := decoder.ReadString()
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read meta[%d].key: %w", i, err)
}
value, err := decoder.ReadString()
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read meta[%d].value: %w", i, err)
}
meta[key] = value
}
}
// numPrefixes:
numPrefixes, err := decoder.ReadUint64(bin.LE)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read numPrefixes: %w", err)
}
// prefix -> offset:
prefixToOffset := make(map[[2]byte]uint64, numPrefixes)
for i := uint64(0); i < numPrefixes; i++ {
var prefix [2]byte
_, err := decoder.Read(prefix[:])
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read prefixes[%d]: %w", i, err)
}
offset, err := decoder.ReadUint64(bin.LE)
if err != nil {
return nil, nil, 0, err
return nil, nil, 0, fmt.Errorf("failed to read offsets[%d]: %w", i, err)
}
prefixToOffset[prefix] = offset
}
Expand Down
Loading

0 comments on commit 47b4f23

Please sign in to comment.