Add slot to gsfa index
Adds slot to the gsfa index.
linuskendall authored Nov 14, 2024
1 parent 4808f83 commit 98b3d0d
Showing 7 changed files with 50 additions and 46 deletions.
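
At a glance, the change threads a fourth field, Slot, through the index entry type (renamed from OffsetAndSizeAndBlocktime to OffsetAndSizeAndBlocktimeSlot) and through every reader, writer, and callback that touches it. The sketch below is not the repo's code: it uses a stand-in struct and the standard library's uvarint helpers to show how such an entry round-trips, with Slot appended after Blocktime as in this diff.

package main

import (
	"encoding/binary"
	"fmt"
)

// Stand-in for linkedlog.OffsetAndSizeAndBlocktimeSlot (illustrative only).
type entry struct {
	Offset, Size, Blocktime, Slot uint64
}

func main() {
	e := entry{Offset: 1024, Size: 512, Blocktime: 1731542400, Slot: 300_000_000}
	// Encode: four uvarints, in field order.
	buf := make([]byte, 0, binary.MaxVarintLen64*4)
	buf = binary.AppendUvarint(buf, e.Offset)
	buf = binary.AppendUvarint(buf, e.Size)
	buf = binary.AppendUvarint(buf, e.Blocktime)
	buf = binary.AppendUvarint(buf, e.Slot)
	// Decode in the same order.
	var d entry
	rest := buf
	for _, field := range []*uint64{&d.Offset, &d.Size, &d.Blocktime, &d.Slot} {
		v, n := binary.Uvarint(rest)
		*field, rest = v, rest[n:]
	}
	fmt.Println(d == e, len(buf)) // true, plus the encoded length in bytes
}

Because entries are varint-encoded back to back, old and new entries are not length-compatible, which is consistent with renaming the type everywhere rather than reusing the old name.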
6 changes: 3 additions & 3 deletions gsfa/gsfa-read-multiepoch.go
@@ -50,7 +50,7 @@ func (gsfa *GsfaReaderMultiepoch) Get(
ctx context.Context,
pk solana.PublicKey,
limit int,
- fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
+ fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return nil, nil
@@ -102,7 +102,7 @@ func (multi *GsfaReaderMultiepoch) GetBeforeUntil(
limit int,
before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
- fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
+ fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return make(EpochToTransactionObjects), nil
@@ -118,7 +118,7 @@ func (multi *GsfaReaderMultiepoch) iterBeforeUntil(
limit int,
before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
- fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
+ fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return make(EpochToTransactionObjects), nil
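
The before/until parameters repeated in these signatures follow the getSignaturesForAddress convention spelled out in the comments: before is exclusive, until is inclusive. Below is a toy model of that window over a newest-first list, using plain strings instead of solana.Signature; the helper and data are invented for illustration.

package main

import "fmt"

// window keeps entries strictly after `before` and stops at `until`,
// which is itself included, mirroring the comments in GetBeforeUntil.
func window(sigs []string, before, until string, limit int) []string {
	var out []string
	started := before == "" // with no `before`, start at the newest entry
	for _, s := range sigs {
		if !started {
			started = s == before // skip everything down to and including `before`
			continue
		}
		out = append(out, s)
		if s == until || len(out) == limit {
			break
		}
	}
	return out
}

func main() {
	sigs := []string{"E", "D", "C", "B", "A"} // newest first
	fmt.Println(window(sigs, "E", "B", 10))   // [D C B]: E excluded, B included
}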
14 changes: 7 additions & 7 deletions gsfa/gsfa-read.go
@@ -93,9 +93,9 @@ func (index *GsfaReader) Get(
ctx context.Context,
pk solana.PublicKey,
limit int,
- ) ([]linkedlog.OffsetAndSizeAndBlocktime, error) {
+ ) ([]linkedlog.OffsetAndSizeAndBlocktimeSlot, error) {
if limit <= 0 {
- return []linkedlog.OffsetAndSizeAndBlocktime{}, nil
+ return []linkedlog.OffsetAndSizeAndBlocktimeSlot{}, nil
}
lastOffset, err := index.offsets.Get(pk)
if err != nil {
@@ -106,7 +106,7 @@ func (index *GsfaReader) Get(
}
debugln("locs.OffsetToFirst:", lastOffset)

- var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktime
+ var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktimeSlot
next := lastOffset // Start from the latest, and go back in time.

for {
@@ -138,10 +138,10 @@ func (index *GsfaReader) GetBeforeUntil(
limit int,
before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
- fetcher func(sigIndex linkedlog.OffsetAndSizeAndBlocktime) (solana.Signature, error),
- ) ([]linkedlog.OffsetAndSizeAndBlocktime, error) {
+ fetcher func(sigIndex linkedlog.OffsetAndSizeAndBlocktimeSlot) (solana.Signature, error),
+ ) ([]linkedlog.OffsetAndSizeAndBlocktimeSlot, error) {
if limit <= 0 {
- return []linkedlog.OffsetAndSizeAndBlocktime{}, nil
+ return []linkedlog.OffsetAndSizeAndBlocktimeSlot{}, nil
}
locs, err := index.offsets.Get(pk)
if err != nil {
@@ -152,7 +152,7 @@ func (index *GsfaReader) GetBeforeUntil(
}
debugln("locs.OffsetToFirst:", locs)

- var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktime
+ var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktimeSlot
next := locs // Start from the latest, and go back in time.

reachedBefore := false
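
Both Get and GetBeforeUntil share the loop behind the "Start from the latest, and go back in time" comments: the offset stored for a pubkey points at its newest block, and each block yields the offset of the previous one. A compressed sketch of that traversal, where readAt stands in for the linked log's Read method and a zero next-offset is assumed to end the chain:

package main

import "fmt"

type entry struct{ Offset, Size, Blocktime, Slot uint64 }

// walkLog follows a key's chain from its newest block back in time,
// stopping at the end of the chain or once `limit` entries are collected.
func walkLog(readAt func(offset uint64) ([]entry, uint64, error), head uint64, limit int) ([]entry, error) {
	var all []entry
	next := head
	for next != 0 && len(all) < limit {
		entries, prev, err := readAt(next)
		if err != nil {
			return nil, err
		}
		all = append(all, entries...)
		next = prev
	}
	return all, nil
}

func main() {
	// Two blocks chained: the block at offset 200 links back to offset 100.
	type block struct {
		entries []entry
		prev    uint64
	}
	blocks := map[uint64]block{
		200: {[]entry{{Slot: 2}}, 100},
		100: {[]entry{{Slot: 1}}, 0},
	}
	readAt := func(off uint64) ([]entry, uint64, error) {
		b := blocks[off]
		return b.entries, b.prev, nil
	}
	got, _ := walkLog(readAt, 200, 10)
	fmt.Println(got[0].Slot, got[1].Slot) // 2 1: newest first
}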
27 changes: 14 additions & 13 deletions gsfa/gsfa-write.go
@@ -27,8 +27,8 @@ type GsfaWriter struct {
offsets *hashmap.Map[solana.PublicKey, [2]uint64]
ll *linkedlog.LinkedLog
man *manifest.Manifest
- fullBufferWriterChan chan linkedlog.KeyToOffsetAndSizeAndBlocktime
- accum *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime]
+ fullBufferWriterChan chan linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot
+ accum *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktimeSlot]
offsetsWriter *indexes.PubkeyToOffsetAndSize_Writer
ctx context.Context
cancel context.CancelFunc
@@ -61,10 +61,10 @@ func NewGsfaWriter(
}
ctx, cancel := context.WithCancel(context.Background())
index := &GsfaWriter{
- fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktime, 50), // TODO: make this configurable
+ fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot, 50), // TODO: make this configurable
popRank: newRollingRankOfTopPerformers(10_000),
offsets: hashmap.New[solana.PublicKey, [2]uint64](int(1_000_000)),
- accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime](int(1_000_000)),
+ accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktimeSlot](int(1_000_000)),
ctx: ctx,
cancel: cancel,
fullBufferWriterDone: make(chan struct{}),
@@ -104,7 +104,7 @@ func NewGsfaWriter(
func (a *GsfaWriter) fullBufferWriter() {
numReadFromChan := uint64(0)
howManyBuffersToFlushConcurrently := 256
- tmpBuf := make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlice, howManyBuffersToFlushConcurrently)
+ tmpBuf := make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlotSlice, howManyBuffersToFlushConcurrently)

for {
// fmt.Println("numReadFromChan", numReadFromChan, "len(a.fullBufferWriterChan)", len(a.fullBufferWriterChan), "a.exiting.Load()", a.exiting.Load())
@@ -131,7 +131,7 @@ func (a *GsfaWriter) fullBufferWriter() {
klog.Errorf("Error while flushing transactions for key %s: %v", buf.Key, err)
}
}
- tmpBuf = make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlice, howManyBuffersToFlushConcurrently)
+ tmpBuf = make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlotSlice, howManyBuffersToFlushConcurrently)
}
tmpBuf = append(tmpBuf, buffer)
}
@@ -151,10 +151,11 @@ func (a *GsfaWriter) Push(
a.mu.Lock()
defer a.mu.Unlock()

- oas := &linkedlog.OffsetAndSizeAndBlocktime{
+ oas := &linkedlog.OffsetAndSizeAndBlocktimeSlot{
Offset: offset,
Size: length,
Blocktime: blocktime,
+ Slot: slot,
}
publicKeys = publicKeys.Dedupe()
publicKeys.Sort()
@@ -177,7 +178,7 @@
// if this key has less than 100 values and is not in the top list of keys by flush count, then
// it's very likely that this key isn't going to get a lot of values soon
if len(values) < 100 && len(values) > 0 && !a.popRank.has(key) {
- if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
+ if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot{
Key: key,
Values: values,
}); err != nil {
@@ -190,14 +191,14 @@
for _, publicKey := range publicKeys {
current, ok := a.accum.Get(publicKey)
if !ok {
- current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0, itemsPerBatch)
+ current = make([]*linkedlog.OffsetAndSizeAndBlocktimeSlot, 0, itemsPerBatch)
current = append(current, oas)
a.accum.Set(publicKey, current)
} else {
current = append(current, oas)
if len(current) >= itemsPerBatch {
a.popRank.Incr(publicKey, 1)
- a.fullBufferWriterChan <- linkedlog.KeyToOffsetAndSizeAndBlocktime{
+ a.fullBufferWriterChan <- linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot{
Key: publicKey,
Values: clone(current),
}
@@ -259,13 +260,13 @@ func (a *GsfaWriter) Close() error {
)
}

- func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime]) error {
+ func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktimeSlot]) error {
keys := solana.PublicKeySlice(m.Keys())
keys.Sort()
for ii := range keys {
key := keys[ii]
vals, _ := m.Get(key)
- if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
+ if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot{
Key: key,
Values: vals,
}); err != nil {
@@ -276,7 +277,7 @@ func (a *GsfaWriter) flushKVs(kvs ...linkedlog.KeyToOffsetAndSizeAndBlocktime) error {
return nil
}

- func (a *GsfaWriter) flushKVs(kvs ...linkedlog.KeyToOffsetAndSizeAndBlocktime) error {
+ func (a *GsfaWriter) flushKVs(kvs ...linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot) error {
if len(kvs) == 0 {
return nil
}
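
The writer's hot path, visible in Push above, accumulates entries per pubkey and hands each full batch to the background fullBufferWriter goroutine over a channel. A minimal model of that policy follows; itemsPerBatch, the string key, and the batch struct are simplifications rather than the repo's definitions.

package main

import "fmt"

const itemsPerBatch = 3 // assumed for the demo; the real constant is not shown in this diff

type entry struct{ Offset, Size, Blocktime, Slot uint64 }

type batch struct {
	key    string
	values []*entry
}

// push appends to a key's buffer and, once the buffer is full, sends a
// clone to the flush channel, like Push feeding fullBufferWriterChan.
func push(accum map[string][]*entry, full chan<- batch, key string, e *entry) {
	cur := append(accum[key], e)
	if len(cur) >= itemsPerBatch {
		full <- batch{key: key, values: append([]*entry(nil), cur...)} // clone, as with clone(current)
		cur = cur[:0]
	}
	accum[key] = cur
}

func main() {
	full := make(chan batch, 1)
	accum := map[string][]*entry{}
	for slot := uint64(1); slot <= 3; slot++ {
		push(accum, full, "somePubkey", &entry{Slot: slot})
	}
	b := <-full
	fmt.Println(b.key, len(b.values), len(accum["somePubkey"])) // somePubkey 3 0
}

Cloning before the channel send matters because the caller keeps reusing the buffer, which is why the original code sends clone(current) rather than current.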
14 changes: 7 additions & 7 deletions gsfa/linkedlog/linked-log.go
@@ -111,7 +111,7 @@ func (s *LinkedLog) write(b []byte) (uint64, uint32, error) {
const mib = 1024 * 1024

// Read reads the block stored at the given offset.
- func (s *LinkedLog) Read(offset uint64) ([]OffsetAndSizeAndBlocktime, indexes.OffsetAndSize, error) {
+ func (s *LinkedLog) Read(offset uint64) ([]OffsetAndSizeAndBlocktimeSlot, indexes.OffsetAndSize, error) {
lenBuf := make([]byte, binary.MaxVarintLen64)
_, err := s.file.ReadAt(lenBuf, int64(offset))
if err != nil {
@@ -130,7 +130,7 @@ func sizeOfUvarint(n uint64) int {
return binary.PutUvarint(make([]byte, binary.MaxVarintLen64), n)
}

- func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAndBlocktime, indexes.OffsetAndSize, error) {
+ func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAndBlocktimeSlot, indexes.OffsetAndSize, error) {
if size > 256*mib {
return nil, indexes.OffsetAndSize{}, fmt.Errorf("compacted indexes length too large: %d", size)
}
@@ -158,7 +158,7 @@ func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAnd
return sigIndexes, nextOffset, nil
}

- func decompressIndexes(data []byte) ([]OffsetAndSizeAndBlocktime, error) {
+ func decompressIndexes(data []byte) ([]OffsetAndSizeAndBlocktimeSlot, error) {
decompressed, err := tooling.DecompressZstd(data)
if err != nil {
return nil, fmt.Errorf("error while decompressing data: %w", err)
@@ -178,15 +178,15 @@ func (s KeyToOffsetAndSizeAndBlocktimeSlice) Has(key solana.PublicKey) bool {
return false
}

- type KeyToOffsetAndSizeAndBlocktime struct {
+ type KeyToOffsetAndSizeAndBlocktimeSlot struct {
Key solana.PublicKey
Values []*OffsetAndSizeAndBlocktimeSlot
}

func (s *LinkedLog) Put(
callbackBefore func(pk solana.PublicKey) (indexes.OffsetAndSize, error),
callbackAfter func(pk solana.PublicKey, offset uint64, ln uint32) error,
- values ...KeyToOffsetAndSizeAndBlocktime,
+ values ...KeyToOffsetAndSizeAndBlocktimeSlot,
) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -205,7 +205,7 @@ func (s *LinkedLog) Put(
if len(val.Values) == 0 {
continue
}
- slices.Reverse[[]*OffsetAndSizeAndBlocktime](val.Values) // reverse the slice so that the most recent indexes are first
+ slices.Reverse[[]*OffsetAndSizeAndBlocktimeSlot](val.Values) // reverse the slice so that the most recent indexes are first
err := func() error {
encodedIndexes, err := createIndexesPayload(val.Values)
if err != nil {
@@ -245,7 +245,7 @@ func (s *LinkedLog) Put(
return uint64(previousSize), nil
}

- func createIndexesPayload(indexes []*OffsetAndSizeAndBlocktime) ([]byte, error) {
+ func createIndexesPayload(indexes []*OffsetAndSizeAndBlocktimeSlot) ([]byte, error) {
buf := make([]byte, 0, 9*len(indexes))
for _, index := range indexes {
buf = append(buf, index.Bytes()...)
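
On the write side, Put's callbackBefore/callbackAfter pair maintains the chain the readers walk: look up the key's current head offset, write the new compressed block with that offset as a backlink, then install the new block as the head. A schematic, with simplified function shapes that are assumptions rather than the file's exact API:

package main

import "fmt"

// appendBlock sketches Put's per-key linking: the new block records the
// previous head so that reads can walk newest-to-oldest.
func appendBlock(
	prevHead func(key string) (uint64, error), // role of callbackBefore
	setHead func(key string, off uint64) error, // role of callbackAfter
	write func(payload []byte, backlink uint64) (uint64, error),
	key string, payload []byte,
) error {
	prev, err := prevHead(key)
	if err != nil {
		return err
	}
	off, err := write(payload, prev) // the new block points at the old head
	if err != nil {
		return err
	}
	return setHead(key, off) // future reads start from the new block
}

func main() {
	heads := map[string]uint64{}
	next := uint64(100)
	write := func(payload []byte, backlink uint64) (uint64, error) {
		off := next
		next += uint64(len(payload)) + 8 // pretend the backlink costs 8 bytes
		fmt.Printf("block at %d, backlink %d\n", off, backlink)
		return off, nil
	}
	prevHead := func(k string) (uint64, error) { return heads[k], nil }
	setHead := func(k string, off uint64) error { heads[k] = off; return nil }
	_ = appendBlock(prevHead, setHead, write, "somePubkey", []byte("first"))  // backlink 0
	_ = appendBlock(prevHead, setHead, write, "somePubkey", []byte("second")) // backlink 100
}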
21 changes: 12 additions & 9 deletions gsfa/linkedlog/offset-size-blocktime.go
@@ -8,32 +8,35 @@ import (
"slices"
)

- func NewOffsetAndSizeAndBlocktime(offset uint64, size uint64, blocktime uint64) *OffsetAndSizeAndBlocktime {
- return &OffsetAndSizeAndBlocktime{
+ func NewOffsetAndSizeAndBlocktimeSlot(offset uint64, size uint64, slot uint64, blocktime uint64) *OffsetAndSizeAndBlocktimeSlot {
+ return &OffsetAndSizeAndBlocktimeSlot{
Offset: offset,
Size: size,

Blocktime: blocktime,
+ Slot: slot,
}
}


- type OffsetAndSizeAndBlocktime struct {
+ type OffsetAndSizeAndBlocktimeSlot struct {
Offset uint64 // uint48, 6 bytes, max 281.5 TB (terabytes)
Size uint64 // uint24, 3 bytes, max 16.7 MB (megabytes)
Blocktime uint64 // uint40, 5 bytes, max 1099511627775 (seconds since epoch)
+ Slot uint64
}

// Bytes returns the offset and size as a byte slice.
- func (oas OffsetAndSizeAndBlocktime) Bytes() []byte {
+ func (oas OffsetAndSizeAndBlocktimeSlot) Bytes() []byte {
buf := make([]byte, 0, binary.MaxVarintLen64*4)
buf = binary.AppendUvarint(buf, oas.Offset)
buf = binary.AppendUvarint(buf, oas.Size)
buf = binary.AppendUvarint(buf, oas.Blocktime)
+ buf = binary.AppendUvarint(buf, oas.Slot)
buf = slices.Clip(buf)
return buf
}

// FromBytes parses the offset and size from a byte slice.
- func (oas *OffsetAndSizeAndBlocktime) FromBytes(buf []byte) error {
+ func (oas *OffsetAndSizeAndBlocktimeSlot) FromBytes(buf []byte) error {
if len(buf) > binary.MaxVarintLen64*4 {
return errors.New("invalid byte slice length")
}
@@ -55,7 +55,7 @@ func (oas *OffsetAndSizeAndBlocktime) FromBytes(buf []byte) error {
return nil
}

- func (oas *OffsetAndSizeAndBlocktime) FromReader(r UvarintReader) error {
+ func (oas *OffsetAndSizeAndBlocktimeSlot) FromReader(r UvarintReader) error {
var err error
oas.Offset, err = r.ReadUvarint()
if err != nil {
@@ -92,11 +95,11 @@ func (r *uvarintReader) ReadUvarint() (uint64, error) {
return v, nil
}

- func OffsetAndSizeAndBlocktimeSliceFromBytes(buf []byte) ([]OffsetAndSizeAndBlocktime, error) {
+ func OffsetAndSizeAndBlocktimeSlotSliceFromBytes(buf []byte) ([]OffsetAndSizeAndBlocktimeSlot, error) {
r := &uvarintReader{buf: buf}
- oass := make([]OffsetAndSizeAndBlocktime, 0)
+ oass := make([]OffsetAndSizeAndBlocktimeSlot, 0)
for {
- oas := OffsetAndSizeAndBlocktime{}
+ oas := OffsetAndSizeAndBlocktimeSlot{}
err := oas.FromReader(r)
if err != nil {
if errors.Is(err, io.EOF) {
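
The struct comments quote fixed-width maxima (uint48, uint24, uint40), but Bytes() actually emits uvarints, so the encoded width varies with the value; sizeOfUvarint in linked-log.go uses the same trick as below. A quick check of the widths at those maxima, plus an assumed present-day slot number:

package main

import (
	"encoding/binary"
	"fmt"
)

// uvarintLen reports how many bytes a value occupies as a uvarint.
func uvarintLen(v uint64) int {
	return binary.PutUvarint(make([]byte, binary.MaxVarintLen64), v)
}

func main() {
	fmt.Println(uvarintLen(281474976710655)) // Offset max (uint48): 7 bytes
	fmt.Println(uvarintLen(16777215))        // Size max (uint24): 4 bytes
	fmt.Println(uvarintLen(1099511627775))   // Blocktime max (uint40): 6 bytes
	fmt.Println(uvarintLen(300_000_000))     // an assumed Slot value: 5 bytes
}

So adding Slot typically costs about five extra bytes per entry at current slot heights.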
12 changes: 6 additions & 6 deletions gsfa/linkedlog/offset-size-blocktime_test.go
@@ -6,17 +6,17 @@ import (
"testing"
)

- func TestOffsetAndSizeAndBlocktime(t *testing.T) {
+ func TestOffsetAndSizeAndBlocktimeSlot(t *testing.T) {

{
- ca := OffsetAndSizeAndBlocktime{
+ ca := OffsetAndSizeAndBlocktimeSlot{
Offset: 1,
Size: 2,
Blocktime: 3,
}
buf := ca.Bytes()

{
- ca2 := OffsetAndSizeAndBlocktime{}
+ ca2 := OffsetAndSizeAndBlocktimeSlot{}
err := ca2.FromBytes(buf)
if err != nil {
panic(err)
@@ -28,15 +28,15 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) {
}
{
// now with very high values
- ca := OffsetAndSizeAndBlocktime{
+ ca := OffsetAndSizeAndBlocktimeSlot{
Offset: 281474976710655,
Size: 16777215,
Blocktime: 1099511627775,
}
buf := ca.Bytes()

{
- ca2 := OffsetAndSizeAndBlocktime{}
+ ca2 := OffsetAndSizeAndBlocktimeSlot{}
err := ca2.FromBytes(buf)
if err != nil {
panic(err)
@@ -47,7 +47,7 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) {
}
}
{
- many := []OffsetAndSizeAndBlocktime{
+ many := []OffsetAndSizeAndBlocktimeSlot{
{
Offset: 1,
Size: 2,
2 changes: 1 addition & 1 deletion multiepoch-getSignaturesForAddress.go
@@ -113,7 +113,7 @@ func (multi *MultiEpoch) handleGetSignaturesForAddress(ctx context.Context, conn
limit,
params.Before,
params.Until,
- func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) {
+ func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error) {
epoch, err := multi.GetEpoch(epochNum)
if err != nil {
return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
