diff --git a/bucketteer/read.go b/bucketteer/read.go index d9b678d1..b045832a 100644 --- a/bucketteer/read.go +++ b/bucketteer/read.go @@ -3,9 +3,11 @@ package bucketteer import ( "bytes" "encoding/binary" + "errors" "fmt" "io" "math" + "os" bin "github.com/gagliardetto/binary" "github.com/rpcpool/yellowstone-faithful/indexmeta" @@ -49,6 +51,13 @@ func uint16ToPrefix(num uint16) [2]byte { // Open opens a Bucketteer file in read-only mode, // using memory-mapped IO. func Open(path string) (*Reader, error) { + empty, err := isEmptyFile(path) + if err != nil { + return nil, err + } + if empty { + return nil, fmt.Errorf("file is empty: %s", path) + } file, err := mmap.Open(path) if err != nil { return nil, err @@ -56,13 +65,48 @@ func Open(path string) (*Reader, error) { return NewReader(file) } +func isEmptyFile(path string) (bool, error) { + file, err := os.Open(path) + if err != nil { + return false, err + } + defer file.Close() + stat, err := file.Stat() + if err != nil { + return false, err + } + return stat.Size() == 0, nil +} + +func isReaderEmpty(reader io.ReaderAt) (bool, error) { + if reader == nil { + return false, errors.New("reader is nil") + } + buf := make([]byte, 1) + _, err := reader.ReadAt(buf, 0) + if err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true, nil + } + return false, err + } + return len(buf) == 0, nil +} + func NewReader(reader io.ReaderAt) (*Reader, error) { + empty, err := isReaderEmpty(reader) + if err != nil { + return nil, fmt.Errorf("failed to check if reader is empty: %w", err) + } + if empty { + return nil, fmt.Errorf("reader is empty") + } r := &Reader{ prefixToOffset: newUint16LayoutPointer(), } prefixToOffset, meta, headerTotalSize, err := readHeader(reader) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to read header: %w", err) } r.meta = meta r.prefixToOffset = prefixToOffset @@ -95,12 +139,12 @@ func readHeader(reader io.ReaderAt) (*bucketToOffset, *indexmeta.Meta, int64, er // read header size: headerSize, err := readHeaderSize(reader) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read header size: %w", err) } // read header bytes: headerBuf := make([]byte, headerSize) if _, err := reader.ReadAt(headerBuf, 4); err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read header bytes: %w", err) } // decode header: decoder := bin.NewBorshDecoder(headerBuf) @@ -110,7 +154,7 @@ func readHeader(reader io.ReaderAt) (*bucketToOffset, *indexmeta.Meta, int64, er magicBuf := make([]byte, len(_Magic[:])) _, err := decoder.Read(magicBuf) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read magic: %w", err) } if !bytes.Equal(magicBuf, _Magic[:]) { return nil, nil, 0, fmt.Errorf("invalid magic: %x", string(magicBuf)) @@ -120,7 +164,7 @@ func readHeader(reader io.ReaderAt) (*bucketToOffset, *indexmeta.Meta, int64, er { got, err := decoder.ReadUint64(bin.LE) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read version: %w", err) } if got != Version { return nil, nil, 0, fmt.Errorf("expected version %d, got %d", Version, got) @@ -135,7 +179,7 @@ func readHeader(reader io.ReaderAt) (*bucketToOffset, *indexmeta.Meta, int64, er // numPrefixes: numPrefixes, err := decoder.ReadUint64(bin.LE) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read numPrefixes: %w", err) } // prefix -> offset: prefixToOffset := newUint16Layout() @@ -143,11 +187,11 @@ func readHeader(reader io.ReaderAt) (*bucketToOffset, *indexmeta.Meta, int64, er var prefix [2]byte _, err := decoder.Read(prefix[:]) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read prefixes[%d]: %w", i, err) } offset, err := decoder.ReadUint64(bin.LE) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read offsets[%d]: %w", i, err) } prefixToOffset[prefixToUint16(prefix)] = offset } @@ -176,7 +220,7 @@ func (r *Reader) Has(sig [64]byte) (bool, error) { return readUint64Le(bucketReader, pos) }) if err != nil { - if err == ErrNotFound { + if errors.Is(err, ErrNotFound) { return false, nil } return false, err diff --git a/bucketteer/write.go b/bucketteer/write.go index e6691f6f..bc4f8024 100644 --- a/bucketteer/write.go +++ b/bucketteer/write.go @@ -14,6 +14,7 @@ import ( ) type Writer struct { + path string destination *os.File writer *bufio.Writer prefixToHashes *prefixToHashes // prefix -> hashes @@ -22,7 +23,11 @@ type Writer struct { type prefixToHashes [math.MaxUint16 + 1][]uint64 // prefix -> hashes func newPrefixToHashes() *prefixToHashes { - return &prefixToHashes{} + var out prefixToHashes + for i := range out { + out[i] = make([]uint64, 0, 16_000) + } + return &out } const ( @@ -41,13 +46,9 @@ func NewWriter(path string) (*Writer, error) { } else if !ok { return nil, fmt.Errorf("file already exists and is not empty: %s", path) } - file, err := os.Create(path) - if err != nil { - return nil, err - } + return &Writer{ - writer: bufio.NewWriterSize(file, writeBufSize), - destination: file, + path: path, prefixToHashes: newPrefixToHashes(), }, nil } @@ -57,7 +58,8 @@ func NewWriter(path string) (*Writer, error) { func (b *Writer) Put(sig [64]byte) { var prefix [2]byte copy(prefix[:], sig[:2]) - b.prefixToHashes[prefixToUint16(prefix)] = append(b.prefixToHashes[prefixToUint16(prefix)], Hash(sig)) + pU16 := prefixToUint16(prefix) + b.prefixToHashes[pU16] = append(b.prefixToHashes[pU16], Hash(sig)) } // Has returns true if the Bucketteer has seen the given signature. @@ -74,11 +76,29 @@ func (b *Writer) Has(sig [64]byte) bool { } func (b *Writer) Close() error { + if b.writer != nil { + if err := b.writer.Flush(); err != nil { + return fmt.Errorf("failed to flush writer: %w", err) + } + } + if b.destination == nil { + return nil + } + if err := b.destination.Sync(); err != nil { + return fmt.Errorf("failed to sync file: %w", err) + } return b.destination.Close() } // Seal writes the Bucketteer's state to the given writer. func (b *Writer) Seal(meta indexmeta.Meta) (int64, error) { + file, err := os.Create(b.path) + if err != nil { + return 0, fmt.Errorf("failed to create file: %w", err) + } + b.writer = bufio.NewWriterSize(file, writeBufSize) + b.destination = file + // truncate file and seek to beginning: if err := b.destination.Truncate(0); err != nil { return 0, err @@ -90,6 +110,16 @@ func (b *Writer) Seal(meta indexmeta.Meta) (int64, error) { if err != nil { return 0, err } + { + // flush the writer: + if err := b.writer.Flush(); err != nil { + return 0, fmt.Errorf("failed to flush writer: %w", err) + } + // sync the file: + if err := b.destination.Sync(); err != nil { + return 0, fmt.Errorf("failed to sync file: %w", err) + } + } return size, overwriteFileContentAt(b.destination, 0, newHeader) } @@ -232,7 +262,7 @@ func seal( updatedHeader, err := createHeader( _Magic, Version, - uint32(headerSize-4), // -4 because we don't count the header size itself + uint32(headerSize-4), // -4 because we don't count the header size itself (it's a uint32, so 4 bytes long) meta, prefixToOffset, ) diff --git a/cmd-x-index-all.go b/cmd-x-index-all.go index 5a21e9cf..2a2e3513 100644 --- a/cmd-x-index-all.go +++ b/cmd-x-index-all.go @@ -40,7 +40,7 @@ func newCmd_Index_all() *cli.Command { Flags: []cli.Flag{ &cli.BoolFlag{ Name: "verify", - Usage: "verify the index after creating it", + Usage: "verify the indexes after creating them", Destination: &verify, }, &cli.StringFlag{ diff --git a/deprecated/bucketteer/read.go b/deprecated/bucketteer/read.go index 7c7d2c95..d71b5eaa 100644 --- a/deprecated/bucketteer/read.go +++ b/deprecated/bucketteer/read.go @@ -3,8 +3,10 @@ package bucketteer import ( "bytes" "encoding/binary" + "errors" "fmt" "io" + "os" bin "github.com/gagliardetto/binary" "golang.org/x/exp/mmap" @@ -19,6 +21,13 @@ type Reader struct { // Open opens a Bucketteer file in read-only mode, // using memory-mapped IO. func Open(path string) (*Reader, error) { + empty, err := isEmptyFile(path) + if err != nil { + return nil, err + } + if empty { + return nil, fmt.Errorf("file is empty: %s", path) + } file, err := mmap.Open(path) if err != nil { return nil, err @@ -26,7 +35,42 @@ func Open(path string) (*Reader, error) { return NewReader(file) } +func isEmptyFile(path string) (bool, error) { + file, err := os.Open(path) + if err != nil { + return false, err + } + defer file.Close() + stat, err := file.Stat() + if err != nil { + return false, err + } + return stat.Size() == 0, nil +} + +func isReaderEmpty(reader io.ReaderAt) (bool, error) { + if reader == nil { + return false, errors.New("reader is nil") + } + buf := make([]byte, 1) + _, err := reader.ReadAt(buf, 0) + if err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true, nil + } + return false, err + } + return len(buf) == 0, nil +} + func NewReader(reader io.ReaderAt) (*Reader, error) { + empty, err := isReaderEmpty(reader) + if err != nil { + return nil, fmt.Errorf("failed to check if reader is empty: %w", err) + } + if empty { + return nil, fmt.Errorf("reader is empty") + } r := &Reader{ prefixToOffset: make(map[[2]byte]uint64), } @@ -71,12 +115,12 @@ func readHeader(reader io.ReaderAt) (map[[2]byte]uint64, map[string]string, int6 // read header size: headerSize, err := readHeaderSize(reader) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read header size: %w", err) } // read header bytes: headerBuf := make([]byte, headerSize) if _, err := reader.ReadAt(headerBuf, 4); err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read header bytes: %w", err) } // decode header: decoder := bin.NewBorshDecoder(headerBuf) @@ -86,7 +130,7 @@ func readHeader(reader io.ReaderAt) (map[[2]byte]uint64, map[string]string, int6 magicBuf := make([]byte, len(_Magic[:])) _, err := decoder.Read(magicBuf) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read magic: %w", err) } if !bytes.Equal(magicBuf, _Magic[:]) { return nil, nil, 0, fmt.Errorf("invalid magic: %x", string(magicBuf)) @@ -96,7 +140,7 @@ func readHeader(reader io.ReaderAt) (map[[2]byte]uint64, map[string]string, int6 { got, err := decoder.ReadUint64(bin.LE) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read version: %w", err) } if got != Version { return nil, nil, 0, fmt.Errorf("expected version %d, got %d", Version, got) @@ -106,17 +150,17 @@ func readHeader(reader io.ReaderAt) (map[[2]byte]uint64, map[string]string, int6 // read meta: numMeta, err := decoder.ReadUint64(bin.LE) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read numMeta: %w", err) } meta := make(map[string]string, numMeta) for i := uint64(0); i < numMeta; i++ { key, err := decoder.ReadString() if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read meta[%d].key: %w", i, err) } value, err := decoder.ReadString() if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read meta[%d].value: %w", i, err) } meta[key] = value } @@ -124,7 +168,7 @@ func readHeader(reader io.ReaderAt) (map[[2]byte]uint64, map[string]string, int6 // numPrefixes: numPrefixes, err := decoder.ReadUint64(bin.LE) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read numPrefixes: %w", err) } // prefix -> offset: prefixToOffset := make(map[[2]byte]uint64, numPrefixes) @@ -132,11 +176,11 @@ func readHeader(reader io.ReaderAt) (map[[2]byte]uint64, map[string]string, int6 var prefix [2]byte _, err := decoder.Read(prefix[:]) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read prefixes[%d]: %w", i, err) } offset, err := decoder.ReadUint64(bin.LE) if err != nil { - return nil, nil, 0, err + return nil, nil, 0, fmt.Errorf("failed to read offsets[%d]: %w", i, err) } prefixToOffset[prefix] = offset } diff --git a/epoch.go b/epoch.go index 861d20af..78eb1249 100644 --- a/epoch.go +++ b/epoch.go @@ -421,17 +421,9 @@ func NewEpochFromConfig( if config.IsDeprecatedIndexes() { sigExists, err := deprecatedbucketter.NewReader(sigExistsFile) if err != nil { - return nil, fmt.Errorf("failed to open sig-exists index: %w", err) + return nil, fmt.Errorf("failed to open (deprecated) sig-exists index: %w", err) } ep.onClose = append(ep.onClose, sigExists.Close) - - // { - // // warm up the cache - // for i := 0; i < 10; i++ { - // sigExists.Has(newRandomSignature()) - // } - // } - ep.sigExists = sigExists } else { sigExists, err := bucketteer.NewReader(sigExistsFile) @@ -439,14 +431,6 @@ func NewEpochFromConfig( return nil, fmt.Errorf("failed to open sig-exists index: %w", err) } ep.onClose = append(ep.onClose, sigExists.Close) - - // { - // // warm up the cache - // for i := 0; i < 10; i++ { - // sigExists.Has(newRandomSignature()) - // } - // } - ep.sigExists = sigExists gotEpoch, ok := sigExists.Meta().GetUint64(indexmeta.MetadataKey_Epoch) diff --git a/multiepoch-getTransaction.go b/multiepoch-getTransaction.go index f290cf6e..e5da594e 100644 --- a/multiepoch-getTransaction.go +++ b/multiepoch-getTransaction.go @@ -74,18 +74,7 @@ func (multi *MultiEpoch) findEpochNumberFromSignature(ctx context.Context, sig s return 0, ErrNotFound } - for _, epochNumber := range found { - epoch, err := multi.GetEpoch(epochNumber) - if err != nil { - return 0, fmt.Errorf("failed to get epoch %d: %v", epochNumber, err) - } - if _, err := epoch.FindCidFromSignature(ctx, sig); err == nil { - return epochNumber, nil - } - } - return 0, ErrNotFound - - // TODO: Search all epochs in parallel: + // Search all epochs in parallel: wg := NewFirstResponse(ctx, multi.options.EpochSearchConcurrency) for i := range numbers { epochNumber := numbers[i]