Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

July2024/improve gsfa perf #124

Merged
merged 12 commits into from
Oct 4, 2024
83 changes: 74 additions & 9 deletions cmd-x-index-gsfa.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/rpcpool/yellowstone-faithful/indexmeta"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers"
"github.com/rpcpool/yellowstone-faithful/third_party/solana_proto/confirmed_block"
"github.com/urfave/cli/v2"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -70,7 +72,7 @@ func newCmd_Index_gsfa() *cli.Command {
},
&cli.StringFlag{
Name: "tmp-dir",
Usage: "temporary directory to use for storing intermediate files",
Usage: "temporary directory to use for storing intermediate files; WILL BE DELETED",
Value: os.TempDir(),
},
},
Expand Down Expand Up @@ -137,6 +139,10 @@ func newCmd_Index_gsfa() *cli.Command {
return fmt.Errorf("failed to add network to sig_exists index metadata: %w", err)
}
tmpDir := c.String("tmp-dir")
tmpDir = filepath.Join(tmpDir, fmt.Sprintf("yellowstone-faithful-gsfa-%d", time.Now().UnixNano()))
if err := os.MkdirAll(tmpDir, 0o755); err != nil {
return fmt.Errorf("failed to create tmp dir: %w", err)
}
indexW, err := gsfa.NewGsfaWriter(
gsfaIndexDir,
meta,
Expand Down Expand Up @@ -218,12 +224,17 @@ func newCmd_Index_gsfa() *cli.Command {
for ii := range transactions {
txWithInfo := transactions[ii]
numProcessedTransactions.Add(1)
accountKeys := txWithInfo.Transaction.Message.AccountKeys
if txWithInfo.Metadata != nil {
accountKeys = append(accountKeys, byteSlicesToKeySlice(txWithInfo.Metadata.LoadedReadonlyAddresses)...)
accountKeys = append(accountKeys, byteSlicesToKeySlice(txWithInfo.Metadata.LoadedWritableAddresses)...)
}
err = indexW.Push(
txWithInfo.Offset,
txWithInfo.Length,
txWithInfo.Slot,
txWithInfo.Blocktime,
txWithInfo.Transaction.Message.AccountKeys,
accountKeys,
)
if err != nil {
klog.Exitf("Error while pushing to gsfa index: %s", err)
Expand Down Expand Up @@ -270,27 +281,80 @@ func objectsToTransactions(
objects []accum.ObjectWithMetadata,
) ([]*TransactionWithSlot, error) {
transactions := make([]*TransactionWithSlot, 0, len(objects))
dataBlocks := make([]accum.ObjectWithMetadata, 0)
for _, object := range objects {
// check if the object is a transaction:
kind := iplddecoders.Kind(object.ObjectData[1])
if kind == iplddecoders.KindDataFrame {
dataBlocks = append(dataBlocks, object)
continue
}
if kind != iplddecoders.KindTransaction {
continue
}
decoded, err := iplddecoders.DecodeTransaction(object.ObjectData)
if err != nil {
return nil, fmt.Errorf("error while decoding transaction from nodex %s: %w", object.Cid, err)
}
tws := &TransactionWithSlot{
Offset: object.Offset,
Length: object.SectionLength,
Slot: uint64(decoded.Slot),
Blocktime: uint64(block.Meta.Blocktime),
}
if total, ok := decoded.Metadata.GetTotal(); !ok || total == 1 {
completeBuffer := decoded.Metadata.Bytes()
if ha, ok := decoded.Metadata.GetHash(); ok {
err := ipldbindcode.VerifyHash(completeBuffer, ha)
if err != nil {
return nil, fmt.Errorf("failed to verify metadata hash: %w", err)
}
}
if len(completeBuffer) > 0 {
uncompressedMeta, err := decompressZstd(completeBuffer)
if err != nil {
return nil, fmt.Errorf("failed to decompress metadata: %w", err)
}
status, err := solanatxmetaparsers.ParseTransactionStatusMeta(uncompressedMeta)
if err == nil {
tws.Metadata = status
}
}
} else {
metaBuffer, err := loadDataFromDataFrames(&decoded.Metadata, func(ctx context.Context, wantedCid cid.Cid) (*ipldbindcode.DataFrame, error) {
for _, dataBlock := range dataBlocks {
if dataBlock.Cid == wantedCid {
df, err := iplddecoders.DecodeDataFrame(dataBlock.ObjectData)
if err != nil {
return nil, err
}
return df, nil
}
}
return nil, fmt.Errorf("dataframe not found")
})
if err != nil {
return nil, fmt.Errorf("failed to load metadata: %w", err)
}
// reset dataBlocks:
dataBlocks = dataBlocks[:0]
if len(metaBuffer) > 0 {
uncompressedMeta, err := decompressZstd(metaBuffer)
if err != nil {
return nil, fmt.Errorf("failed to decompress metadata: %w", err)
}
status, err := solanatxmetaparsers.ParseTransactionStatusMeta(uncompressedMeta)
if err == nil {
tws.Metadata = status
}
}
}
tx, err := decoded.GetSolanaTransaction()
if err != nil {
return nil, fmt.Errorf("error while getting solana transaction from object %s: %w", object.Cid, err)
}
transactions = append(transactions, &TransactionWithSlot{
Offset: object.Offset,
Length: object.SectionLength,
Slot: uint64(decoded.Slot),
Blocktime: uint64(block.Meta.Blocktime),
Transaction: *tx,
})
tws.Transaction = *tx
transactions = append(transactions, tws)
}
return transactions, nil
}
Expand All @@ -311,4 +375,5 @@ type TransactionWithSlot struct {
Slot uint64
Blocktime uint64
Transaction solana.Transaction
Metadata *confirmed_block.TransactionStatusMeta
}
49 changes: 30 additions & 19 deletions gsfa/gsfa-write.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
type GsfaWriter struct {
mu sync.Mutex
indexRootDir string
popRank *rollingRankOfTopPerformers // top pubkeys by flush count
offsets *hashmap.Map[solana.PublicKey, [2]uint64]
ll *linkedlog.LinkedLog
man *manifest.Manifest
Expand Down Expand Up @@ -61,6 +62,7 @@ func NewGsfaWriter(
ctx, cancel := context.WithCancel(context.Background())
index := &GsfaWriter{
fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktime, 50), // TODO: make this configurable
popRank: newRollingRankOfTopPerformers(10_000),
offsets: hashmap.New[solana.PublicKey, [2]uint64](int(1_000_000)),
accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime](int(1_000_000)),
ctx: ctx,
Expand Down Expand Up @@ -120,6 +122,9 @@ func (a *GsfaWriter) fullBufferWriter() {
has := tmpBuf.Has(buffer.Key)
if len(tmpBuf) == howManyBuffersToFlushConcurrently || has {
for _, buf := range tmpBuf {
if len(buf.Values) == 0 {
continue
}
// Write the buffer to the linked log.
klog.V(5).Infof("Flushing %d transactions for key %s", len(buf.Values), buf.Key)
if err := a.flushKVs(buf); err != nil {
Expand All @@ -131,7 +136,7 @@ func (a *GsfaWriter) fullBufferWriter() {
tmpBuf = append(tmpBuf, buffer)
}
case <-time.After(1 * time.Second):
klog.Infof("Read %d buffers from channel", numReadFromChan)
klog.V(5).Infof("Read %d buffers from channel", numReadFromChan)
}
}
}
Expand All @@ -153,39 +158,45 @@ func (a *GsfaWriter) Push(
}
publicKeys = publicKeys.Dedupe()
publicKeys.Sort()
if slot%1000 == 0 {
if a.accum.Len() > 130_000 {
// flush all
klog.Infof("Flushing all %d keys", a.accum.Len())
if slot%500 == 0 && a.accum.Len() > 100_000 {
// flush all
klog.V(4).Infof("Flushing all %d keys", a.accum.Len())

var keys solana.PublicKeySlice = a.accum.Keys()
keys.Sort()
var keys solana.PublicKeySlice = a.accum.Keys()
keys.Sort()

for iii := range keys {
key := keys[iii]
values, _ := a.accum.Get(key)
a.popRank.purge()

if len(values) < 100 && len(values) > 0 {
if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
Key: key,
Values: values,
}); err != nil {
return err
}
a.accum.Delete(key)
for iii := range keys {
key := keys[iii]
values, _ := a.accum.Get(key)
// The objective is to have as big of a batch for each key as possible (max is 1000).
// So we optimize for delaying the flush for the most popular keys (popular=has been flushed a lot of times).
// And we flush the less popular keys, periodically if they haven't seen much activity.

// if this key has less than 100 values and is not in the top list of keys by flush count, then
// it's very likely that this key isn't going to get a lot of values soon
if len(values) < 100 && len(values) > 0 && !a.popRank.has(key) {
if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
Key: key,
Values: values,
}); err != nil {
return err
}
a.accum.Delete(key)
}
}
}
for _, publicKey := range publicKeys {
current, ok := a.accum.Get(publicKey)
if !ok {
current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0)
current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0, itemsPerBatch)
current = append(current, oas)
a.accum.Set(publicKey, current)
} else {
current = append(current, oas)
if len(current) >= itemsPerBatch {
a.popRank.Incr(publicKey, 1)
a.fullBufferWriterChan <- linkedlog.KeyToOffsetAndSizeAndBlocktime{
Key: publicKey,
Values: clone(current),
Expand Down
72 changes: 72 additions & 0 deletions gsfa/pop-rank.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package gsfa

import (
"slices"
"sort"

"github.com/gagliardetto/solana-go"
"github.com/tidwall/hashmap"
)

// rollingRankOfTopPerformers tracks, per key, how many times the key has
// "performed" (here: how many times its buffer was flushed), and can be
// purged down to only the keys in the top rankListSize distinct count tiers.
// It is used to decide which pubkeys are popular enough to delay flushing.
type rollingRankOfTopPerformers struct {
	rankListSize int // max number of distinct count values retained by purge()
	maxValue     int // highest count seen (maintained by Incr and purge)
	minValue     int // lowest retained count; only meaningful after a purge (starts at 0)
	set          hashmap.Map[solana.PublicKey, int] // zero value is usable — presumably lazily initialized; verify against tidwall/hashmap
}

// newRollingRankOfTopPerformers returns a tracker that, after each purge,
// keeps only the keys belonging to the top rankListSize distinct count tiers.
// The embedded set is left as its zero value, which the map type supports.
func newRollingRankOfTopPerformers(rankListSize int) *rollingRankOfTopPerformers {
	tracker := new(rollingRankOfTopPerformers)
	tracker.rankListSize = rankListSize
	return tracker
}

// Incr adds delta to the count stored for key (treating a missing key as 0),
// stores the result, updates the max/min watermarks, and returns the new count.
// NOTE(review): minValue starts at 0, so positive-only counts never lower it
// until the first purge recomputes it — confirm this is intended.
func (r *rollingRankOfTopPerformers) Incr(key solana.PublicKey, delta int) int {
	current, found := r.set.Get(key)
	if !found {
		current = 0
	}
	current += delta
	r.set.Set(key, current)
	if r.maxValue < current {
		r.maxValue = current
	}
	if r.minValue > current {
		r.minValue = current
	}
	return current
}

// Get returns the current count for key and whether the key is tracked.
// A missing key yields (0, false).
func (r *rollingRankOfTopPerformers) Get(key solana.PublicKey) (int, bool) {
	count, tracked := r.set.Get(key)
	if !tracked {
		return 0, false
	}
	return count, true
}

// purge will remove all keys by the lowest values until the rankListSize is reached.
// keys with equivalent values are kept.
//
// Concretely: it collects the distinct count values present, and if there are
// more than rankListSize tiers, evicts every key whose count falls below the
// rankListSize-th highest tier. It then recomputes the min/max watermarks.
func (r *rollingRankOfTopPerformers) purge() {
	values := r.set.Values()
	sort.Ints(values)
	values = slices.Compact(values) // distinct counts, ascending
	if len(values) <= r.rankListSize {
		return
	}

	// Every key whose count is strictly below this threshold belongs to one of
	// the tiers being dropped. Sweeping once with this cutoff is equivalent to
	// deleting tier-by-tier, but avoids re-fetching the key list per tier
	// (the original was O(tiers*keys) with an allocation per tier).
	threshold := values[len(values)-r.rankListSize]
	for _, key := range r.set.Keys() {
		if v, _ := r.set.Get(key); v < threshold {
			r.set.Delete(key)
		}
	}

	// update the min and max values
	r.minValue = threshold
	r.maxValue = values[len(values)-1]
}

// has reports whether key is currently present in the rank set.
func (r *rollingRankOfTopPerformers) has(key solana.PublicKey) bool {
	if _, found := r.set.Get(key); found {
		return true
	}
	return false
}
55 changes: 55 additions & 0 deletions gsfa/pop-rank_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package gsfa

import (
"testing"

"github.com/gagliardetto/solana-go"
"github.com/stretchr/testify/require"
)

// TestPopRank exercises the rollingRankOfTopPerformers lifecycle:
// single-key increment, a no-op purge while under capacity, and a purge that
// evicts the lowest tier once the number of distinct counts exceeds the
// configured rank list size.
func TestPopRank(t *testing.T) {
	// Test the rollingRankOfTopPerformers type:
	{
		// Create a new rollingRankOfTopPerformers keeping the top 5 tiers:
		r := newRollingRankOfTopPerformers(5)
		if r == nil {
			t.Fatal("expected non-nil rollingRankOfTopPerformers")
		}
		// Test the Incr method: a missing key starts at 0, so +1 yields 1.
		{
			key := solana.SysVarRentPubkey
			delta := 1
			value := r.Incr(key, delta)
			require.Equal(t, 1, value)
		}
		// Test the purge method: with only one distinct value tracked (<= 5),
		// purge must be a no-op.
		{
			r.purge()
			// the value should still be 1
			value, ok := r.Get(solana.SysVarRentPubkey)
			require.True(t, ok)
			require.Equal(t, 1, value)
		}
		{
			// now add a few more values (6 distinct counts total: 1..6):
			r.Incr(solana.SysVarClockPubkey, 6)
			r.Incr(solana.SysVarEpochSchedulePubkey, 5)
			r.Incr(solana.SysVarFeesPubkey, 4)
			r.Incr(solana.SysVarInstructionsPubkey, 3)
			r.Incr(solana.SysVarRewardsPubkey, 2)

			// there should be 6 values now
			require.Equal(t, 6, r.set.Len())

			// purge should remove the lowest values (only the count-1 key,
			// since 6 tiers exceed the rank list size of 5 by one)
			r.purge()

			// there should be 5 values now (equivalent values are kept)
			require.Equal(t, 5, r.set.Len())

			// the lowest value should be 2
			require.Equal(t, 2, r.minValue)
			require.Equal(t, 6, r.maxValue)
		}
	}
}
17 changes: 15 additions & 2 deletions http-range.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,26 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) {
prefix := icon + "[READ-UNKNOWN]"
if isIndex {
prefix = icon + azureBG("[READ-INDEX]")
} else if isCar {

// get the index name, which is the part before the .index suffix, after the last .
indexName := strings.TrimSuffix(r.name, ".index")
// split the index name by . and get the last part
byDot := strings.Split(indexName, ".")
if len(byDot) > 0 {
indexName = byDot[len(byDot)-1]
}
// TODO: distinguish between remote and local index reads
metrics.IndexLookupHistogram.WithLabelValues(indexName).Observe(float64(took.Seconds()))
}
// if has suffix .car, then it's a car file
if isCar {
if r.isSplitCar {
prefix = icon + azureBG("[READ-SPLIT-CAR]")
} else {
prefix = icon + purpleBG("[READ-CAR]")
}
carName := filepath.Base(r.name)
// TODO: distinguish between remote and local index reads
metrics.CarLookupHistogram.WithLabelValues(carName).Observe(float64(took.Seconds()))
}

klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", (r.name), off, len(p), took)
Expand Down
Loading
Loading