diff --git a/gsfa/gsfa-read-multiepoch.go b/gsfa/gsfa-read-multiepoch.go
index 6141bdd3..c90988cb 100644
--- a/gsfa/gsfa-read-multiepoch.go
+++ b/gsfa/gsfa-read-multiepoch.go
@@ -50,7 +50,7 @@ func (gsfa *GsfaReaderMultiepoch) Get(
 	ctx context.Context,
 	pk solana.PublicKey,
 	limit int,
-	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
+	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error),
 ) (EpochToTransactionObjects, error) {
 	if limit <= 0 {
 		return nil, nil
@@ -102,7 +102,7 @@ func (multi *GsfaReaderMultiepoch) GetBeforeUntil(
 	limit int,
 	before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
 	until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
-	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
+	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error),
 ) (EpochToTransactionObjects, error) {
 	if limit <= 0 {
 		return make(EpochToTransactionObjects), nil
@@ -118,7 +118,7 @@ func (multi *GsfaReaderMultiepoch) iterBeforeUntil(
 	limit int,
 	before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
 	until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
-	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
+	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error),
 ) (EpochToTransactionObjects, error) {
 	if limit <= 0 {
 		return make(EpochToTransactionObjects), nil
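Note on the new callback shape: every reader entry point now hands the fetcher a linkedlog.OffsetAndSizeAndBlocktimeSlot instead of linkedlog.OffsetAndSizeAndBlocktime. A minimal caller sketch (resolveTx is a hypothetical helper; only the callback signature is taken from this diff):

	txs, err := multi.Get(ctx, pk, 25,
		func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error) {
			// The index entry now carries Slot alongside Offset, Size and
			// Blocktime, so the slot no longer has to be recovered elsewhere.
			return resolveTx(epochNum, oas.Offset, oas.Size) // hypothetical
		})
	if err != nil {
		return nil, err
	}
	// txs is an EpochToTransactionObjects, keyed by epoch number.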
diff --git a/gsfa/gsfa-read.go b/gsfa/gsfa-read.go
index eaa01426..1848c185 100644
--- a/gsfa/gsfa-read.go
+++ b/gsfa/gsfa-read.go
@@ -93,9 +93,9 @@ func (index *GsfaReader) Get(
 	ctx context.Context,
 	pk solana.PublicKey,
 	limit int,
-) ([]linkedlog.OffsetAndSizeAndBlocktime, error) {
+) ([]linkedlog.OffsetAndSizeAndBlocktimeSlot, error) {
 	if limit <= 0 {
-		return []linkedlog.OffsetAndSizeAndBlocktime{}, nil
+		return []linkedlog.OffsetAndSizeAndBlocktimeSlot{}, nil
 	}
 	lastOffset, err := index.offsets.Get(pk)
 	if err != nil {
@@ -106,7 +106,7 @@ func (index *GsfaReader) Get(
 	}
 	debugln("locs.OffsetToFirst:", lastOffset)
 
-	var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktime
+	var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktimeSlot
 	next := lastOffset // Start from the latest, and go back in time.
 
 	for {
@@ -138,10 +138,10 @@ func (index *GsfaReader) GetBeforeUntil(
 	limit int,
 	before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
 	until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
-	fetcher func(sigIndex linkedlog.OffsetAndSizeAndBlocktime) (solana.Signature, error),
-) ([]linkedlog.OffsetAndSizeAndBlocktime, error) {
+	fetcher func(sigIndex linkedlog.OffsetAndSizeAndBlocktimeSlot) (solana.Signature, error),
+) ([]linkedlog.OffsetAndSizeAndBlocktimeSlot, error) {
 	if limit <= 0 {
-		return []linkedlog.OffsetAndSizeAndBlocktime{}, nil
+		return []linkedlog.OffsetAndSizeAndBlocktimeSlot{}, nil
 	}
 	locs, err := index.offsets.Get(pk)
 	if err != nil {
@@ -152,7 +152,7 @@ func (index *GsfaReader) GetBeforeUntil(
 	}
 	debugln("locs.OffsetToFirst:", locs)
 
-	var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktime
+	var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktimeSlot
 	next := locs // Start from the latest, and go back in time.
 	reachedBefore := false
 
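GetBeforeUntil walks a key's linked list newest-first and only uses the fetcher to turn an index entry into a signature for the before/until cursor checks. A sketch under the renamed type (sigFromEntry is a hypothetical helper, not from this diff):

	locs, err := reader.GetBeforeUntil(ctx, pk, 100, before, until,
		func(sigIndex linkedlog.OffsetAndSizeAndBlocktimeSlot) (solana.Signature, error) {
			// Only the signature matters for the cursor comparison; Slot and
			// Blocktime ride along unchanged in the returned locations.
			return sigFromEntry(sigIndex) // hypothetical
		})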
diff --git a/gsfa/gsfa-write.go b/gsfa/gsfa-write.go
index 136e5563..6d216c0b 100644
--- a/gsfa/gsfa-write.go
+++ b/gsfa/gsfa-write.go
@@ -27,8 +27,8 @@ type GsfaWriter struct {
 	offsets              *hashmap.Map[solana.PublicKey, [2]uint64]
 	ll                   *linkedlog.LinkedLog
 	man                  *manifest.Manifest
-	fullBufferWriterChan chan linkedlog.KeyToOffsetAndSizeAndBlocktime
-	accum                *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime]
+	fullBufferWriterChan chan linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot
+	accum                *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktimeSlot]
 	offsetsWriter        *indexes.PubkeyToOffsetAndSize_Writer
 	ctx                  context.Context
 	cancel               context.CancelFunc
@@ -61,10 +61,10 @@ func NewGsfaWriter(
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 	index := &GsfaWriter{
-		fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktime, 50), // TODO: make this configurable
+		fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot, 50), // TODO: make this configurable
 		popRank:              newRollingRankOfTopPerformers(10_000),
 		offsets:              hashmap.New[solana.PublicKey, [2]uint64](int(1_000_000)),
-		accum:                hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime](int(1_000_000)),
+		accum:                hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktimeSlot](int(1_000_000)),
 		ctx:                  ctx,
 		cancel:               cancel,
 		fullBufferWriterDone: make(chan struct{}),
@@ -104,7 +104,7 @@ func NewGsfaWriter(
 func (a *GsfaWriter) fullBufferWriter() {
 	numReadFromChan := uint64(0)
 	howManyBuffersToFlushConcurrently := 256
-	tmpBuf := make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlice, howManyBuffersToFlushConcurrently)
+	tmpBuf := make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlotSlice, howManyBuffersToFlushConcurrently)
 
 	for {
 		// fmt.Println("numReadFromChan", numReadFromChan, "len(a.fullBufferWriterChan)", len(a.fullBufferWriterChan), "a.exiting.Load()", a.exiting.Load())
@@ -131,7 +131,7 @@ func (a *GsfaWriter) fullBufferWriter() {
 					klog.Errorf("Error while flushing transactions for key %s: %v", buf.Key, err)
 				}
 			}
-			tmpBuf = make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlice, howManyBuffersToFlushConcurrently)
+			tmpBuf = make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlotSlice, howManyBuffersToFlushConcurrently)
 		}
 		tmpBuf = append(tmpBuf, buffer)
 	}
@@ -151,10 +151,11 @@ func (a *GsfaWriter) Push(
 	a.mu.Lock()
 	defer a.mu.Unlock()
 
-	oas := &linkedlog.OffsetAndSizeAndBlocktime{
+	oas := &linkedlog.OffsetAndSizeAndBlocktimeSlot{
 		Offset:    offset,
 		Size:      length,
 		Blocktime: blocktime,
+		Slot:      slot,
 	}
 	publicKeys = publicKeys.Dedupe()
 	publicKeys.Sort()
@@ -177,7 +178,7 @@ func (a *GsfaWriter) Push(
 		// if this key has less than 100 values and is not in the top list of keys by flush count, then
 		// it's very likely that this key isn't going to get a lot of values soon
 		if len(values) < 100 && len(values) > 0 && !a.popRank.has(key) {
-			if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
+			if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot{
 				Key:    key,
 				Values: values,
 			}); err != nil {
@@ -190,14 +191,14 @@ func (a *GsfaWriter) Push(
 	for _, publicKey := range publicKeys {
 		current, ok := a.accum.Get(publicKey)
 		if !ok {
-			current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0, itemsPerBatch)
+			current = make([]*linkedlog.OffsetAndSizeAndBlocktimeSlot, 0, itemsPerBatch)
 			current = append(current, oas)
 			a.accum.Set(publicKey, current)
 		} else {
 			current = append(current, oas)
 			if len(current) >= itemsPerBatch {
 				a.popRank.Incr(publicKey, 1)
-				a.fullBufferWriterChan <- linkedlog.KeyToOffsetAndSizeAndBlocktime{
+				a.fullBufferWriterChan <- linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot{
 					Key:    publicKey,
 					Values: clone(current),
 				}
@@ -259,13 +260,13 @@ func (a *GsfaWriter) Close() error {
 	)
 }
 
-func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime]) error {
+func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktimeSlot]) error {
 	keys := solana.PublicKeySlice(m.Keys())
 	keys.Sort()
 	for ii := range keys {
 		key := keys[ii]
 		vals, _ := m.Get(key)
-		if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
+		if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot{
 			Key:    key,
 			Values: vals,
 		}); err != nil {
@@ -276,7 +277,7 @@ func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.Of
 	return nil
 }
 
-func (a *GsfaWriter) flushKVs(kvs ...linkedlog.KeyToOffsetAndSizeAndBlocktime) error {
+func (a *GsfaWriter) flushKVs(kvs ...linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot) error {
 	if len(kvs) == 0 {
 		return nil
 	}
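For reviewers unfamiliar with the writer: entries accumulate per public key, a key's buffer is handed to fullBufferWriterChan once it reaches itemsPerBatch, and flushAccum drains whatever remains at close time. A stand-alone model of that accumulate-then-flush pattern (all names and sizes here are illustrative, not from the codebase):

	package main

	import "fmt"

	const itemsPerBatch = 3 // illustrative; the real batch size differs

	func main() {
		accum := make(map[string][]int)
		flush := func(key string, values []int) {
			fmt.Printf("flush %s: %v\n", key, values)
		}
		push := func(key string, v int) {
			accum[key] = append(accum[key], v)
			if len(accum[key]) >= itemsPerBatch {
				// In the writer this is a channel send of clone(current),
				// followed by resetting the accumulator for the key.
				flush(key, accum[key])
				accum[key] = nil
			}
		}
		for i := 0; i < 7; i++ {
			push("keyA", i)
		}
		// Close-time behavior: flushAccum drains the partial buffers.
		for k, vs := range accum {
			if len(vs) > 0 {
				flush(k, vs)
			}
		}
	}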
diff --git a/gsfa/linkedlog/linked-log.go b/gsfa/linkedlog/linked-log.go
index be7f2c6d..387f0b9e 100644
--- a/gsfa/linkedlog/linked-log.go
+++ b/gsfa/linkedlog/linked-log.go
@@ -111,7 +111,7 @@ func (s *LinkedLog) write(b []byte) (uint64, uint32, error) {
 const mib = 1024 * 1024
 
 // Read reads the block stored at the given offset.
-func (s *LinkedLog) Read(offset uint64) ([]OffsetAndSizeAndBlocktime, indexes.OffsetAndSize, error) {
+func (s *LinkedLog) Read(offset uint64) ([]OffsetAndSizeAndBlocktimeSlot, indexes.OffsetAndSize, error) {
 	lenBuf := make([]byte, binary.MaxVarintLen64)
 	_, err := s.file.ReadAt(lenBuf, int64(offset))
 	if err != nil {
@@ -130,7 +130,7 @@ func sizeOfUvarint(n uint64) int {
 	return binary.PutUvarint(make([]byte, binary.MaxVarintLen64), n)
 }
 
-func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAndBlocktime, indexes.OffsetAndSize, error) {
+func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAndBlocktimeSlot, indexes.OffsetAndSize, error) {
 	if size > 256*mib {
 		return nil, indexes.OffsetAndSize{}, fmt.Errorf("compacted indexes length too large: %d", size)
 	}
@@ -158,7 +158,7 @@ func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAnd
 	return sigIndexes, nextOffset, nil
 }
 
-func decompressIndexes(data []byte) ([]OffsetAndSizeAndBlocktime, error) {
+func decompressIndexes(data []byte) ([]OffsetAndSizeAndBlocktimeSlot, error) {
 	decompressed, err := tooling.DecompressZstd(data)
 	if err != nil {
 		return nil, fmt.Errorf("error while decompressing data: %w", err)
@@ -178,7 +178,7 @@ func (s KeyToOffsetAndSizeAndBlocktimeSlice) Has(key solana.PublicKey) bool {
 	return false
 }
 
-type KeyToOffsetAndSizeAndBlocktime struct {
+type KeyToOffsetAndSizeAndBlocktimeSlot struct {
 	Key    solana.PublicKey
-	Values []*OffsetAndSizeAndBlocktime
+	Values []*OffsetAndSizeAndBlocktimeSlot
 }
 
@@ -186,7 +186,7 @@
 func (s *LinkedLog) Put(
 	callbackBefore func(pk solana.PublicKey) (indexes.OffsetAndSize, error),
 	callbackAfter func(pk solana.PublicKey, offset uint64, ln uint32) error,
-	values ...KeyToOffsetAndSizeAndBlocktime,
+	values ...KeyToOffsetAndSizeAndBlocktimeSlot,
 ) (uint64, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -205,7 +205,7 @@ func (s *LinkedLog) Put(
 		if len(val.Values) == 0 {
 			continue
 		}
-		slices.Reverse[[]*OffsetAndSizeAndBlocktime](val.Values) // reverse the slice so that the most recent indexes are first
+		slices.Reverse[[]*OffsetAndSizeAndBlocktimeSlot](val.Values) // reverse the slice so that the most recent indexes are first
 		err := func() error {
 			encodedIndexes, err := createIndexesPayload(val.Values)
 			if err != nil {
@@ -245,7 +245,7 @@ func (s *LinkedLog) Put(
 	return uint64(previousSize), nil
 }
 
-func createIndexesPayload(indexes []*OffsetAndSizeAndBlocktime) ([]byte, error) {
+func createIndexesPayload(indexes []*OffsetAndSizeAndBlocktimeSlot) ([]byte, error) {
 	buf := make([]byte, 0, 9*len(indexes))
 	for _, index := range indexes {
 		buf = append(buf, index.Bytes()...)
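createIndexesPayload simply concatenates the varint encoding of each entry; the payload is zstd-compressed on write, and decompressIndexes parses entries back until EOF on read. A stand-alone model of that encode/parse loop, extended to the new four-field entries (illustrative; the real code goes through Bytes/FromReader and the uvarintReader type):

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	func main() {
		// Three entries of four uvarints each, concatenated the way
		// createIndexesPayload concatenates entry.Bytes().
		var payload []byte
		for i := uint64(1); i <= 3; i++ {
			for _, v := range []uint64{i * 100, i * 10, i * 1000, i} { // offset, size, blocktime, slot
				payload = binary.AppendUvarint(payload, v)
			}
		}
		// Parse-until-EOF, as decompressIndexes does after decompression.
		for len(payload) > 0 {
			var entry [4]uint64
			for j := range entry {
				v, n := binary.Uvarint(payload)
				entry[j], payload = v, payload[n:]
			}
			fmt.Println("entry:", entry)
		}
	}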
diff --git a/gsfa/linkedlog/offset-size-blocktime.go b/gsfa/linkedlog/offset-size-blocktime.go
index 5e5977cb..9e2fb760 100644
--- a/gsfa/linkedlog/offset-size-blocktime.go
+++ b/gsfa/linkedlog/offset-size-blocktime.go
@@ -8,32 +8,35 @@ import (
 	"slices"
 )
 
-func NewOffsetAndSizeAndBlocktime(offset uint64, size uint64, blocktime uint64) *OffsetAndSizeAndBlocktime {
-	return &OffsetAndSizeAndBlocktime{
+func NewOffsetAndSizeAndBlocktimeSlot(offset uint64, size uint64, slot uint64, blocktime uint64) *OffsetAndSizeAndBlocktimeSlot {
+	return &OffsetAndSizeAndBlocktimeSlot{
 		Offset:    offset,
 		Size:      size,
 		Blocktime: blocktime,
+		Slot:      slot,
 	}
 }
 
-type OffsetAndSizeAndBlocktime struct {
+type OffsetAndSizeAndBlocktimeSlot struct {
 	Offset    uint64 // uint48, 6 bytes, max 281.5 TB (terabytes)
 	Size      uint64 // uint24, 3 bytes, max 16.7 MB (megabytes)
 	Blocktime uint64 // uint40, 5 bytes, max 1099511627775 (seconds since epoch)
+	Slot      uint64
 }
 
 // Bytes returns the offset and size as a byte slice.
-func (oas OffsetAndSizeAndBlocktime) Bytes() []byte {
+func (oas OffsetAndSizeAndBlocktimeSlot) Bytes() []byte {
 	buf := make([]byte, 0, binary.MaxVarintLen64*3)
 	buf = binary.AppendUvarint(buf, oas.Offset)
 	buf = binary.AppendUvarint(buf, oas.Size)
 	buf = binary.AppendUvarint(buf, oas.Blocktime)
+	buf = binary.AppendUvarint(buf, oas.Slot)
 	buf = slices.Clip(buf)
 	return buf
 }
 
 // FromBytes parses the offset and size from a byte slice.
-func (oas *OffsetAndSizeAndBlocktime) FromBytes(buf []byte) error {
+func (oas *OffsetAndSizeAndBlocktimeSlot) FromBytes(buf []byte) error {
 	if len(buf) > binary.MaxVarintLen64*3 {
 		return errors.New("invalid byte slice length")
 	}
@@ -55,7 +58,7 @@ func (oas *OffsetAndSizeAndBlocktime) FromBytes(buf []byte) error {
 	return nil
 }
 
-func (oas *OffsetAndSizeAndBlocktime) FromReader(r UvarintReader) error {
+func (oas *OffsetAndSizeAndBlocktimeSlot) FromReader(r UvarintReader) error {
 	var err error
 	oas.Offset, err = r.ReadUvarint()
 	if err != nil {
@@ -92,11 +95,11 @@ func (r *uvarintReader) ReadUvarint() (uint64, error) {
 	return v, nil
 }
 
-func OffsetAndSizeAndBlocktimeSliceFromBytes(buf []byte) ([]OffsetAndSizeAndBlocktime, error) {
+func OffsetAndSizeAndBlocktimeSlotSliceFromBytes(buf []byte) ([]OffsetAndSizeAndBlocktimeSlot, error) {
 	r := &uvarintReader{buf: buf}
-	oass := make([]OffsetAndSizeAndBlocktime, 0)
+	oass := make([]OffsetAndSizeAndBlocktimeSlot, 0)
 	for {
-		oas := OffsetAndSizeAndBlocktime{}
+		oas := OffsetAndSizeAndBlocktimeSlot{}
 		err := oas.FromReader(r)
 		if err != nil {
 			if errors.Is(err, io.EOF) {
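Worth double-checking in review: Bytes now appends four uvarints, but the untouched context lines still size the buffer with binary.MaxVarintLen64*3 (harmless, since append grows it) and FromBytes still rejects inputs longer than binary.MaxVarintLen64*3, which a four-field encoding can legitimately exceed for large values (four fields can take up to 40 bytes versus the 30-byte cap). A stand-alone round-trip of the four-field encoding (illustrative):

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	// Models OffsetAndSizeAndBlocktimeSlot.Bytes / FromBytes; not the repo's code.
	func main() {
		offset, size, blocktime, slot := uint64(281474976710655), uint64(16777215), uint64(1099511627775), uint64(250_000_000)

		buf := make([]byte, 0, binary.MaxVarintLen64*4) // *4 now that Slot is included
		buf = binary.AppendUvarint(buf, offset)
		buf = binary.AppendUvarint(buf, size)
		buf = binary.AppendUvarint(buf, blocktime)
		buf = binary.AppendUvarint(buf, slot)

		// Decode in the same field order.
		rest := buf
		for _, name := range []string{"offset", "size", "blocktime", "slot"} {
			v, n := binary.Uvarint(rest)
			rest = rest[n:]
			fmt.Println(name, "=", v)
		}
	}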
diff --git a/gsfa/linkedlog/offset-size-blocktime_test.go b/gsfa/linkedlog/offset-size-blocktime_test.go
index 99d1ac6a..1d8aeccb 100644
--- a/gsfa/linkedlog/offset-size-blocktime_test.go
+++ b/gsfa/linkedlog/offset-size-blocktime_test.go
@@ -6,9 +6,9 @@ import (
 	"testing"
 )
 
-func TestOffsetAndSizeAndBlocktime(t *testing.T) {
+func TestOffsetAndSizeAndBlocktimeSlot(t *testing.T) {
 	{
-		ca := OffsetAndSizeAndBlocktime{
+		ca := OffsetAndSizeAndBlocktimeSlot{
 			Offset:    1,
 			Size:      2,
 			Blocktime: 3,
@@ -16,7 +16,7 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) {
 		buf := ca.Bytes()
 
 		{
-			ca2 := OffsetAndSizeAndBlocktime{}
+			ca2 := OffsetAndSizeAndBlocktimeSlot{}
 			err := ca2.FromBytes(buf)
 			if err != nil {
 				panic(err)
@@ -28,7 +28,7 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) {
 	}
 	{
 		// now with very high values
-		ca := OffsetAndSizeAndBlocktime{
+		ca := OffsetAndSizeAndBlocktimeSlot{
 			Offset:    281474976710655,
 			Size:      16777215,
 			Blocktime: 1099511627775,
@@ -36,7 +36,7 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) {
 		buf := ca.Bytes()
 
 		{
-			ca2 := OffsetAndSizeAndBlocktime{}
+			ca2 := OffsetAndSizeAndBlocktimeSlot{}
 			err := ca2.FromBytes(buf)
 			if err != nil {
 				panic(err)
@@ -47,7 +47,7 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) {
 	}
 	}
 	{
-		many := []OffsetAndSizeAndBlocktime{
+		many := []OffsetAndSizeAndBlocktimeSlot{
 			{
 				Offset: 1,
 				Size:   2,
diff --git a/multiepoch-getSignaturesForAddress.go b/multiepoch-getSignaturesForAddress.go
index d51dcdb3..c7996435 100644
--- a/multiepoch-getSignaturesForAddress.go
+++ b/multiepoch-getSignaturesForAddress.go
@@ -113,7 +113,7 @@ func (multi *MultiEpoch) handleGetSignaturesForAddress(ctx context.Context, conn
 		limit,
 		params.Before,
 		params.Until,
-		func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) {
+		func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error) {
 			epoch, err := multi.GetEpoch(epochNum)
 			if err != nil {
 				return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
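With Slot stored on the index entry, handleGetSignaturesForAddress can populate the slot of each returned signature straight from the index rather than deriving it from the fetched transaction. A hedged sketch of such a consumer (sigRow and rowFromEntry are illustrative names, not part of this diff or the repo):

	// Illustrative only: maps an index entry to the slot/blockTime pair that a
	// getSignaturesForAddress response row needs, without touching the block.
	type sigRow struct {
		Slot      uint64
		BlockTime int64
	}

	func rowFromEntry(oas linkedlog.OffsetAndSizeAndBlocktimeSlot) sigRow {
		return sigRow{
			Slot:      oas.Slot,             // new field introduced by this change
			BlockTime: int64(oas.Blocktime), // already present before
		}
	}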