Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tx meta #27

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 9 additions & 21 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,31 @@ DEFAULT:full
ARGS := $(wordlist 2,$(words $(MAKECMDGOALS)),$(MAKECMDGOALS))

install-deps:
sudo apt install -y libsnappy-dev build-essential cmake zlib1g-dev libbz2-dev liblz4-dev libzstd-dev
sudo apt install -y libsnappy-dev build-essential cmake zlib1g-dev libbz2-dev liblz4-dev libzstd-dev libgflags-dev
install_compatible_golang_version:
go install golang.org/dl/go1.20.5@latest
go1.20.5 download
build_rocksdb: install-deps
mkdir -p facebook ; cd facebook ; \
git clone https://github.com/facebook/rocksdb --branch v8.3.2 --depth 1 ; \
git clone https://github.com/facebook/rocksdb --branch v9.7.3 --depth 1 ; \
cd ./rocksdb ; \
mkdir -p build && cd build ; \
cmake .. \
-DCMAKE_BUILD_TYPE=Release \
-DROCKSDB_BUILD_SHARED=OFF \
-DWITH_GFLAGS=OFF \
-DWITH_BZ2=ON \
-DWITH_SNAPPY=OFF \
-DWITH_ZLIB=ON \
-DWITH_ZSTD=ON \
-DWITH_ALL_TESTS=OFF \
-DWITH_BENCHMARK_TOOLS=OFF \
-DWITH_CORE_TOOLS=OFF \
-DWITH_RUNTIME_DEBUG=OFF \
-DWITH_TESTS=OFF \
-DWITH_TOOLS=OFF \
-DWITH_TRACE_TOOLS=OFF ; \
make -j
mkdir -p build ; \
make static_lib
full: install_compatible_golang_version build_rocksdb
# replace default go tmp build dir from /tpm to ./tmp
CGO_CFLAGS="-I$$(pwd)/facebook/rocksdb/include" \
CGO_LDFLAGS="-L$$(pwd)/facebook/rocksdb/build -lbz2" \
CGO_LDFLAGS="-L$$(pwd)/facebook/rocksdb/ -lrocksdb -lstdc++ -lm -lbz2" \
go1.20.5 build \
-ldflags="-X main.GitCommit=$$(git rev-parse HEAD) -X main.GitTag=$$(git symbolic-ref -q --short HEAD || git describe --tags --exact-match)" \
./cmd/radiance

radiance: install_compatible_golang_version build_rocksdb
CGO_CFLAGS="-I$$(pwd)/facebook/rocksdb/include" \
CGO_LDFLAGS="-L$$(pwd)/facebook/rocksdb/build -lbz2" \
CGO_LDFLAGS="-L$$(pwd)/facebook/rocksdb/librocksdb.a -lbz2" \
go1.20.5 run ./cmd/radiance $(ARGS)
test-full: install_compatible_golang_version build_rocksdb
CGO_CFLAGS="-I$$(pwd)/facebook/rocksdb/include" \
CGO_LDFLAGS="-L$$(pwd)/facebook/rocksdb/build -lbz2" \
go1.20.5 test ./... -cover -count=1
clear:
rm -rf facebook/rocksdb
6 changes: 4 additions & 2 deletions cmd/radiance/blockstore/verifydata/verifydata.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ func run(c *cobra.Command, args []string) {
}
defer db.Close()

isNewTxMetaKeyFormat := false

if false {
keysToBeFound := [][]byte{
blockstore.MakeTxMetadataKey(100971705, targetTxSignature),
blockstore.MakeTxMetadataKey(isNewTxMetaKeyFormat, 100971705, targetTxSignature),
}
got, err := db.DB.MultiGet(grocksdb.NewDefaultReadOptions(), keysToBeFound...)
if err != nil {
Expand Down Expand Up @@ -391,7 +393,7 @@ func run(c *cobra.Command, args []string) {
spew.Dump(slotMeta)
fmt.Println(firstSignature.String())
}
key := blockstore.MakeTxMetadataKey(slotMeta.Slot, firstSignature)
key := blockstore.MakeTxMetadataKey(isNewTxMetaKeyFormat, slotMeta.Slot, firstSignature)
keysToBeFound = append(keysToBeFound, key)
if printFirstThenStop {
os.Exit(0)
Expand Down
94 changes: 94 additions & 0 deletions cmd/radiance/car/createcar/block-cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package createcar

import (
"sync"
"sync/atomic"

"github.com/gagliardetto/solana-go"
)

type BlockCache struct {
rwm sync.RWMutex
cache map[uint64]ParsedBlock
}

func NewBlockCache() *BlockCache {
return &BlockCache{
cache: make(map[uint64]ParsedBlock),
}
}

type ParsedBlock map[solana.Signature][]byte

func (c *BlockCache) GetBlock(slot uint64) (ParsedBlock, bool) {
c.rwm.RLock()
defer c.rwm.RUnlock()
block, ok := c.cache[slot]
return block, ok
}

func (c *BlockCache) SetBlock(slot uint64, block ParsedBlock) {
c.rwm.Lock()
defer c.rwm.Unlock()
c.cache[slot] = block
}

func (c *BlockCache) DeleteBlock(slot uint64) {
c.rwm.Lock()
defer c.rwm.Unlock()
delete(c.cache, slot)
}

// SlotLocks makes sure that a given slot is only downloaded once.
// Thus this checks if a slot is being actively downloaded.
// If it is, it blocks until the download is finished.
// If it is not, it marks the slot as being downloaded.
// This is useful to avoid downloading the same slot multiple times.
type SlotLocks struct {
mu sync.Mutex
locks map[uint64]*Once
}

func NewSlotLocks() *SlotLocks {
return &SlotLocks{
locks: make(map[uint64]*Once),
}
}

// OncePerSlot returns true if the slot was not being downloaded, and it marks the slot as being downloaded.
// If was NOT being downloaded, it runs the given function and then returns true.
// If it was being downloaded, it returns false.
func (s *SlotLocks) OncePerSlot(slot uint64, fn func() error) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.locks[slot]; ok {
return false, nil
}
s.locks[slot] = &Once{}
return s.locks[slot].Do(fn)
}

type Once struct {
done atomic.Uint32
m sync.Mutex
}

// Returns `T, true, error` if the function was executed.
// Returns `T, false, nil` if the function was not executed.
func (o *Once) Do(f func() error) (bool, error) {
if o.done.Load() == 0 {
// Outlined slow-path to allow inlining of the fast-path.
return o.doSlow(f)
}
return false, nil
}

func (o *Once) doSlow(f func() error) (bool, error) {
o.m.Lock()
defer o.m.Unlock()
if o.done.Load() == 0 {
defer o.done.Store(1)
return true, f()
}
return false, nil
}
76 changes: 75 additions & 1 deletion cmd/radiance/car/createcar/cmd-create-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
"github.com/gagliardetto/solana-go"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/linxGnu/grocksdb"
"github.com/minio/sha256-simd"
Expand Down Expand Up @@ -52,6 +53,13 @@ var (
flagNextShredRevisionActivationSlot = flags.Uint64("next-shred-revision-activation-slot", 0, "Next shred revision activation slot; maybe depends on when the validator creating the snapshot upgraded to the latest version.")
flagCheckOnly = flags.Bool("check", false, "Only check if the data is available, without creating the CAR file")
flagStopAtSlot = flags.Uint64("stop-at-slot", 0, "Stop processing at this slot, excluding any slots after it")
//
flagAllowMissingTxMeta = flags.Bool("allow-missing-tx-meta", false, "Allow missing transaction metadata")
flagRpcEndpoint = flags.String("rpc-endpoint", "", "Solana RPC endpoint to use")
flagFillTxMetaFromRPC = flags.Bool("fill-tx-meta-from-rpc", false, "Fill missing transaction metadata from RPC")
flagFillConcurrency = flags.Uint("fill-concurrency", 1, "Number of concurrent requests to make to the RPC endpoint")
fillDBPath = flags.String("fill-storage-path", "", "Path to the dir where to save the blocks fetched from the RPC endpoint")
optRocksDBVerifyChecksums = flags.Bool("rocksdb-verify-checksums", true, "Verify checksums of data read from RocksDB")
)

func init() {
Expand Down Expand Up @@ -252,10 +260,61 @@ func run(c *cobra.Command, args []string) {
time.Sleep(1 * time.Second)
os.Exit(0)
}
// if epoch is 633, the tx meta key format changed at slot blockstore.SlotBoundaryTxMetadataKeyFormatChange
if epoch == 633 && slotedges.CalcEpochForSlot(blockstore.SlotBoundaryTxMetadataKeyFormatChange) == epoch {
klog.Infof("Epoch 633: the transaction metadata key format changed at slot %d", blockstore.SlotBoundaryTxMetadataKeyFormatChange)
}

var fillDB *BlockFillerStorage
var rpcFiller *RpcFiller

if !*flagAllowMissingTxMeta && !*flagFillTxMetaFromRPC {
klog.Infof("Will not fill missing transaction metadata; missing transaction metadata will be treated as an error")
}

alternativeTxMetaSources := []func(slot uint64, sig solana.Signature) ([]byte, error){}
if *flagFillTxMetaFromRPC {
if *flagRpcEndpoint == "" {
klog.Exitf("RPC endpoint is required")
}
if *fillDBPath == "" {
klog.Exitf("fill-db is required")
}
fillDB, err = NewBlockFillerStorage(*fillDBPath)
if err != nil {
klog.Exitf("Failed to create DB filler: %s", err)
}
rpcFiller, err = NewRpcFiller(*flagRpcEndpoint, fillDB)
if err != nil {
klog.Exitf("Failed to create RPC filler: %s", err)
}
alternativeTxMetaSources = append(alternativeTxMetaSources, func(slot uint64, sig solana.Signature) ([]byte, error) {
return rpcFiller.FillTxMetaFromRPC(c.Context(), slot, sig)
})
klog.Infof("Will fill missing transaction metadata from RPC")

{
klog.Info("---")
klog.Infof("Checking (fast) for unparseable transaction metadata...")
slotsWithMissingMeta := make([]uint64, 0)
{
// TODO: define a function to check for missing transaction metadata
}
if len(slotsWithMissingMeta) > 0 {
klog.Warningf("Found %d slots with missing transaction metadata", len(slotsWithMissingMeta))
} else {
klog.Infof("All slots have transaction metadata")
}
klog.Infof("Prefetching %d blocks with concurrency %d (will take a while) ...", len(slotsWithMissingMeta), *flagFillConcurrency)
rpcFiller.FetchBlocksToFillerStorage(c.Context(), int(*flagFillConcurrency), slotsWithMissingMeta)
}
}

multi, err := NewMultistage(
finalCARFilepath,
numWorkers,
*flagAllowMissingTxMeta,
alternativeTxMetaSources,
)
if err != nil {
panic(err)
Expand All @@ -265,14 +324,16 @@ func run(c *cobra.Command, args []string) {
latestSlot := uint64(0)
latestDB := int(0) // 0 is the first DB

iter := schedule.NewIterator(limitSlots)
schedule.EnableProgressBar()

iter := schedule.NewIterator(limitSlots)
err = iter.Iterate(
c.Context(),
func(dbIdex int, h *blockstore.WalkHandle, slot uint64, shredRevision int) error {
if *flagRequireFullEpoch && slotedges.CalcEpochForSlot(slot) != epoch {
return nil
}
defer rpcFiller.RemoveFromCache(slot)
slotMeta, err := h.DB.GetSlotMeta(slot)
if err != nil {
return fmt.Errorf("failed to get slot meta for slot %d: %w", slot, err)
Expand Down Expand Up @@ -369,6 +430,9 @@ func run(c *cobra.Command, args []string) {
NumBytesWrittenToDisk uint64 `yaml:"num_bytes_written_to_disk"`
VersionInfo map[string]interface{} `yaml:"version_info"`
Cmd string `yaml:"cmd"`
SizesByKind KVSlice `yaml:"sizes_by_kind"`
NumTotalMissingTxMeta uint64 `yaml:"num_total_missing_tx_meta"`
NumMissingTxMetaByDB map[string]uint64 `yaml:"num_missing_tx_meta_by_db"`
}

thisCarRecap := Recap{
Expand All @@ -380,6 +444,16 @@ func run(c *cobra.Command, args []string) {
FirstSlot: slotRecap.FirstSlot,
LastSlot: slotRecap.LastSlot,
TookCarCreation: tookCarCreation,
SizesByKind: slotRecap.StatsBySize,
}
{
totalMissingTxMeta := multi.statsNotFoundMeta.Total()
thisCarRecap.NumTotalMissingTxMeta = totalMissingTxMeta

thisCarRecap.NumMissingTxMetaByDB = make(map[string]uint64)
for dbPath, numMissing := range multi.statsNotFoundMeta.GetMap() {
thisCarRecap.NumMissingTxMetaByDB[dbPath] = numMissing
}
}
{
versionInfo, ok := versioninfo.GetBuildSettings()
Expand Down
Loading