Skip to content

Commit

Permalink
Multiepoch (#28)
Browse files Browse the repository at this point in the history
* Begin adding multiepoch RPC support

* Stable tx order inside block

* Cleanup

* multiepoch: getSignaturesForAddress

* Add script to compress gsfa indexes

* Add returnDataNone

* Update description

* More logs

* Improve mode config

* Check if URI is currently supported

* sig-to-epoch

* Allow mono-epoch

* Rename filecoin mode config

* Fix text

* Update logs

* Fix GSFA multi-epoch

* Fix checks

* Fix sig-to-epoch index

* Cleanup options

* compactindex: use eytzinger

* Update compactindexes version

* Prefetch 1 MiB of data from remote files

* Cleanup printing

* Debug

* http range cache

* Fix tests

* Cleanup prefetch

* Cleanup prefetch

* Add debug flag

* Fix matching

* Fix matching

* Add comments

* Fix file watcher

* Cleanup

* Cleanup

* Add rpc config file watcher that live (re)loads epochs

* Better handle file events

* Add transparent proxy; closes #29

* Cleanup logs

* Fix loaded addresses field names

* Add fallback for HEAD method to get size

* Fix log

* If len is zero, return error

* Fix getBlock when parent is in previous epoch

* Improve live reload

* Close before replacing or removing

* Improve closing

* Handle case where parent is in previous epoch

* Fix first block offset (consider header)

* Add getVersion enrichment

* Handle getVersion without proxy

* Make sure it's clear when the previousBlockHash

* Fix token balances and status response

* Cleanup printing

* Better ETA

* Better logs and responses

* Temporarily fix meta parsing

* Improve debug

* Improve debug

* Improve debug

* Improve debug

* Remove one roundtrip when using remote CAR files

* Fix url filepath base

* Fix tests

* If innerInstruction doesn't have index, then set to zero; closes #45

* Fix preload

* Fix rewards values

* Add lamports=0 field to rewards

* More response fixes

* More response fixes

* Move adapters

* Better names and comments

* If sig-to-epoch index is not specified, search all epochs

* Fix concurrent epoch searcher

* Bucketteer: Initial commit

* Refactor bucketteer

* Integrate bucketteer into the indexing pipelines

* Cleanup

* Cleanup naming

* Spawn returns a ok

* Remove sig-to-epoch and use sig-exists instead

* Add verification commands

* Rename

* Add flag to verify index after creating it.

* Cleanup names

* Use bucketteers in getTransaction

* Cleanup debug

* Fix log

* Improve debug

* Warmup sig-exists index

* Fix build

* Better example

* Improve timing

* Improve logs

* Fix config file watcher

* Fix error message

* Avoid processing the same file twice at the same time

* Cleanup log

* Add support for `json` encoding for transactions

* Fix `uiTokenAmount` fields

* Fix option parameters

* Improve dump-car filters

* Add concurrency to epoch loading; closes #53

* Add providers field to filecoin config; closes #52

* Add getBlockTime method; closes #50

* Proxy failed local requests to remote RPC server; closes #49

* Add readahead

* Fix error handling

* Better error handling

* Handle the case where the requested amount is larger than the chunk

* Fix split reader
  • Loading branch information
gagliardetto authored Nov 5, 2023
1 parent 55eb992 commit a7d008f
Show file tree
Hide file tree
Showing 106 changed files with 9,721 additions and 1,667 deletions.
248 changes: 248 additions & 0 deletions adapters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
package main

import (
"encoding/base64"

"github.com/mr-tron/base58"
)

func ptrToUint64(v uint64) *uint64 {
return &v
}

// byteSliceAsIntegerSlice converts a byte slice to an integer slice.
func byteSliceAsIntegerSlice(b []byte) []uint64 {
var ret []uint64
for i := 0; i < len(b); i++ {
ret = append(ret, uint64(b[i]))
}
return ret
}

// adaptTransactionMetaToExpectedOutput adapts the transaction meta to the expected output
// as per what solana RPC server returns.
func adaptTransactionMetaToExpectedOutput(m map[string]any) map[string]any {
meta, ok := m["meta"].(map[string]any)
if !ok {
return m
}
{
if _, ok := meta["err"]; ok {
meta["err"], _ = parseTransactionError(meta["err"])
} else {
meta["err"] = nil
}
}
{
if _, ok := meta["loadedAddresses"]; !ok {
meta["loadedAddresses"] = map[string]any{
"readonly": []any{},
"writable": []any{},
}
}
{
// if has loadedReadonlyAddresses and is []string, then use that for loadedAddresses.readonly
if loadedReadonlyAddresses, ok := meta["loadedReadonlyAddresses"].([]any); ok {
// the address list is base64 encoded; decode and encode to base58
for i, addr := range loadedReadonlyAddresses {
addrStr, ok := addr.(string)
if ok {
decoded, err := base64.StdEncoding.DecodeString(addrStr)
if err == nil {
loadedReadonlyAddresses[i] = base58.Encode(decoded)
}
}
}
meta["loadedAddresses"].(map[string]any)["readonly"] = loadedReadonlyAddresses
delete(meta, "loadedReadonlyAddresses")
}
// if has loadedWritableAddresses and is []string, then use that for loadedAddresses.writable
if loadedWritableAddresses, ok := meta["loadedWritableAddresses"].([]any); ok {
// the address list is base64 encoded; decode and encode to base58
for i, addr := range loadedWritableAddresses {
addrStr, ok := addr.(string)
if ok {
decoded, err := base64.StdEncoding.DecodeString(addrStr)
if err == nil {
loadedWritableAddresses[i] = base58.Encode(decoded)
}
}
}
meta["loadedAddresses"].(map[string]any)["writable"] = loadedWritableAddresses
delete(meta, "loadedWritableAddresses")
}
// remove loadedReadonlyAddresses and loadedWritableAddresses
}
if preTokenBalances, ok := meta["preTokenBalances"]; !ok {
meta["preTokenBalances"] = []any{}
} else {
// in preTokenBalances.[].uiTokenAmount.decimals if not present, set to 0
preTokenBalances, ok := preTokenBalances.([]any)
if ok {
for _, preTokenBalanceAny := range preTokenBalances {
preTokenBalance, ok := preTokenBalanceAny.(map[string]any)
if ok {
uiTokenAmountAny, ok := preTokenBalance["uiTokenAmount"]
if ok {
uiTokenAmount, ok := uiTokenAmountAny.(map[string]any)
if ok {
_, ok := uiTokenAmount["decimals"]
if !ok {
uiTokenAmount["decimals"] = 0
}
_, ok = uiTokenAmount["uiAmount"]
if !ok {
uiTokenAmount["uiAmount"] = nil
}
}
}
}
}
}
}
if postTokenBalances, ok := meta["postTokenBalances"]; !ok {
meta["postTokenBalances"] = []any{}
} else {
// in postTokenBalances.[].uiTokenAmount.decimals if not present, set to 0
postTokenBalances, ok := postTokenBalances.([]any)
if ok {
for _, postTokenBalanceAny := range postTokenBalances {
postTokenBalance, ok := postTokenBalanceAny.(map[string]any)
if ok {
uiTokenAmountAny, ok := postTokenBalance["uiTokenAmount"]
if ok {
uiTokenAmount, ok := uiTokenAmountAny.(map[string]any)
if ok {
_, ok := uiTokenAmount["decimals"]
if !ok {
uiTokenAmount["decimals"] = 0
}
_, ok = uiTokenAmount["uiAmount"]
if !ok {
uiTokenAmount["uiAmount"] = nil
}
_, ok = uiTokenAmount["amount"]
if !ok {
uiTokenAmount["amount"] = "0"
}
_, ok = uiTokenAmount["uiAmountString"]
if !ok {
uiTokenAmount["uiAmountString"] = "0"
}
}
}
}
}
}
}

delete(meta, "returnDataNone")

if _, ok := meta["rewards"]; !ok {
meta["rewards"] = []any{}
}
if _, ok := meta["status"]; !ok {
eee, ok := meta["err"]
if ok {
if eee == nil {
meta["status"] = map[string]any{
"Ok": nil,
}
} else {
meta["status"] = map[string]any{
"Err": eee,
}
}
}
}
{
// TODO: is this correct?
// if doesn't have err, but has status and it is empty, then set status to Ok
if _, ok := meta["err"]; !ok || meta["err"] == nil {
if status, ok := meta["status"].(map[string]any); ok {
if len(status) == 0 {
meta["status"] = map[string]any{
"Ok": nil,
}
}
}
}
}
}
{
if returnData, ok := meta["returnData"].(map[string]any); ok {
if data, ok := returnData["data"].(string); ok {
returnData["data"] = []any{data, "base64"}
}

if programId, ok := returnData["programId"].(string); ok {
decoded, err := base64.StdEncoding.DecodeString(programId)
if err == nil {
returnData["programId"] = base58.Encode(decoded)
}
}
}
}
{
innerInstructionsAny, ok := meta["innerInstructions"]
if !ok {
meta["innerInstructions"] = []any{}
return m
}
innerInstructions, ok := innerInstructionsAny.([]any)
if !ok {
return m
}
for i, innerInstructionAny := range innerInstructions {
innerInstruction, ok := innerInstructionAny.(map[string]any)
if !ok {
continue
}
// If doesn't have `index`, then set it to 0
if _, ok := innerInstruction["index"]; !ok {
innerInstruction["index"] = 0
}
instructionsAny, ok := innerInstruction["instructions"]
if !ok {
continue
}
instructions, ok := instructionsAny.([]any)
if !ok {
continue
}
for _, instructionAny := range instructions {
instruction, ok := instructionAny.(map[string]any)
if !ok {
continue
}
{
if accounts, ok := instruction["accounts"]; ok {
// as string
accountsStr, ok := accounts.(string)
if ok {
decoded, err := base64.StdEncoding.DecodeString(accountsStr)
if err == nil {
instruction["accounts"] = byteSliceAsIntegerSlice(decoded)
}
}
} else {
instruction["accounts"] = []any{}
}
if data, ok := instruction["data"]; ok {
// as string
dataStr, ok := data.(string)
if ok {
decoded, err := base64.StdEncoding.DecodeString(dataStr)
if err == nil {
// TODO: the data in the `innerInstructions` is always base58 encoded (even if the transaction is base64 encoded)
instruction["data"] = base58.Encode(decoded)
}
}
}
}
}
meta["innerInstructions"].([]any)[i] = innerInstruction
}
}
return m
}
38 changes: 38 additions & 0 deletions bucketteer/bucketteer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package bucketteer

import (
"sort"

"github.com/cespare/xxhash/v2"
)

var _Magic = [8]byte{'b', 'u', 'c', 'k', 'e', 't', 't', 'e'}

func Magic() [8]byte {
return _Magic
}

const Version = uint64(1)

func sortWithCompare[T any](a []T, compare func(i, j int) int) {
sort.Slice(a, func(i, j int) bool {
return compare(i, j) < 0
})
sorted := make([]T, len(a))
eytzinger(a, sorted, 0, 1)
copy(a, sorted)
}

func eytzinger[T any](in, out []T, i, k int) int {
if k <= len(in) {
i = eytzinger(in, out, i, 2*k)
out[k-1] = in[i]
i++
i = eytzinger(in, out, i, 2*k+1)
}
return i
}

func Hash(sig [64]byte) uint64 {
return xxhash.Sum64(sig[:])
}
Loading

0 comments on commit a7d008f

Please sign in to comment.