Skip to content

Commit

Permalink
return cids for gsfa
Browse files Browse the repository at this point in the history
  • Loading branch information
anjor committed Sep 19, 2024
1 parent 4910c9a commit 93e7559
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
_site
/.cargo
/target

.zed
112 changes: 112 additions & 0 deletions gsfa/gsfa-read-multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/gagliardetto/solana-go"
"github.com/ipfs/go-cid"
"github.com/rpcpool/yellowstone-faithful/compactindexsized"
"github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
Expand Down Expand Up @@ -86,6 +87,7 @@ epochLoop:
}

type EpochToTransactionObjects map[uint64][]*ipldbindcode.Transaction
type EpochToTransactionCids map[uint64][]cid.Cid

// Count returns the number of signatures in the EpochToSignatures.
func (e EpochToTransactionObjects) Count() int {
Expand All @@ -96,6 +98,15 @@ func (e EpochToTransactionObjects) Count() int {
return count
}

// Count returns the number of signatures in the EpochToSignatures.
func (e EpochToTransactionCids) Count() int {
var count int
for _, sigs := range e {
count += len(sigs)
}
return count
}

func (multi *GsfaReaderMultiepoch) GetBeforeUntil(
ctx context.Context,
pk solana.PublicKey,
Expand Down Expand Up @@ -200,3 +211,104 @@ epochLoop:
}
return transactions, nil
}

func (multi *GsfaReaderMultiepoch) GetCids(
ctx context.Context,
pk solana.PublicKey,
limit int,
fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (cid.Cid, error),
) (EpochToTransactionCids, error) {
if limit <= 0 {
return make(EpochToTransactionCids), nil
}
return multi.iterBeforeUntilCids(ctx, pk, limit, fetcher)
}

// GetBeforeUntil gets the signatures for the given public key,
// before the given slot.
func (multi *GsfaReaderMultiepoch) iterBeforeUntilCids(
ctx context.Context,
pk solana.PublicKey,
limit int,
fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (cid.Cid, error),
) (EpochToTransactionCids, error) {
if limit <= 0 {
return make(EpochToTransactionCids), nil
}

transactions := make(EpochToTransactionCids)
// reachedBefore := false
// if before == nil {
// reachedBefore = true
// }

epochLoop:
for readerIndex, index := range multi.epochs {
if ctx.Err() != nil {
return nil, ctx.Err()
}
epochNum, ok := index.GetEpoch()
if !ok {
return nil, fmt.Errorf("epoch is not set for the #%d provided gsfa reader", readerIndex)
}

locsStartedAt := time.Now()
locs, err := index.offsets.Get(pk)
if err != nil {
if compactindexsized.IsNotFound(err) {
continue epochLoop
}
return nil, fmt.Errorf("error while getting initial offset: %w", err)
}
klog.V(5).Infof("locs.OffsetToFirst took %s", time.Since(locsStartedAt))
debugln("locs.OffsetToFirst:", locs)

next := locs // Start from the latest, and go back in time.

for {
if next == nil || next.IsZero() { // no previous.
continue epochLoop
}
if limit > 0 && transactions.Count() >= limit {
break epochLoop
}
startedReadAt := time.Now()
locations, newNext, err := index.ll.ReadWithSize(next.Offset, next.Size)
if err != nil {
return nil, fmt.Errorf("error while reading linked log with next=%v: %w", next, err)
}
klog.V(5).Infof("ReadWithSize took %s to get %d locs", time.Since(startedReadAt), len(locations))
if len(locations) == 0 {
continue epochLoop
}
debugln("sigIndexes:", locations, "newNext:", newNext)
next = &newNext
for _, txLoc := range locations {
tx, err := fetcher(epochNum, txLoc)
if err != nil {
return nil, fmt.Errorf("error while getting signature at index=%v: %w", txLoc, err)
}
// sig, err := tx.Signature()
// if err != nil {
// return nil, fmt.Errorf("error while getting signature: %w", err)
// }
// klog.V(5).Infoln(locIndex, "sig:", sig, "epoch:", epochNum)
// if !reachedBefore && sig == *before {
// reachedBefore = true
// continue
// }
// if !reachedBefore {
// continue
// }
if limit > 0 && transactions.Count() >= limit {
break epochLoop
}
transactions[epochNum] = append(transactions[epochNum], tx)
// if until != nil && sig == *until {
// break epochLoop
// }
}
}
}
return transactions, nil
}
146 changes: 146 additions & 0 deletions multiepoch-getSignaturesForAddressCids.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package main

import (
"context"
"fmt"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/rpcpool/yellowstone-faithful/gsfa"
"github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog"
"github.com/rpcpool/yellowstone-faithful/indexes"
"github.com/sourcegraph/jsonrpc2"
)

func countTransactionCids(v gsfa.EpochToTransactionCids) int {
var count int
for _, txs := range v {
count += len(txs)
}
return count
}

func (multi *MultiEpoch) handleGetSignaturesForAddressCids(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) (*jsonrpc2.Error, error) {
// - parse and validate request
// - get list of epochs (from most recent to oldest)
// - iterate until we find the requested number of signatures
// - expand the signatures with tx data
signaturesOnly := multi.options.GsfaOnlySignatures

params, err := parseGetSignaturesForAddressParams(req.Params)
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInvalidParams,
Message: "Invalid params",
}, fmt.Errorf("failed to parse params: %v", err)
}
pk := params.Address
limit := params.Limit

gsfaIndexes, _ := multi.getGsfaReadersInEpochDescendingOrder()
if len(gsfaIndexes) == 0 {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "getSignaturesForAddress method is not enabled",
}, fmt.Errorf("no gsfa indexes found")
}

gsfaMulti, err := gsfa.NewGsfaReaderMultiepoch(gsfaIndexes)
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to create gsfa multiepoch reader: %w", err)
}

// Get the transactions:
foundTransactions, err := gsfaMulti.GetCids(
ctx,
pk,
limit,
func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (cid.Cid, error) {
epoch, err := multi.GetEpoch(epochNum)
if err != nil {
return cid.Cid{}, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
}
raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{
Offset: oas.Offset,
Size: oas.Size,
})
if err != nil {
return cid.Cid{}, fmt.Errorf("failed to get signature: %w", err)
}
cb := cid.V1Builder{MhLength: -1, MhType: uint64(multicodec.Sha2_256), Codec: uint64(multicodec.DagCbor)}
c, err := cb.Sum(raw)
if err != nil {
return cid.Cid{}, fmt.Errorf("failed to get cid: %w", err)
}
return c, nil
},
)
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to get signatures: %w", err)
}

if len(foundTransactions) == 0 {
err = conn.ReplyRaw(
ctx,
req.ID,
[]map[string]any{},
)
if err != nil {
return nil, fmt.Errorf("failed to reply: %w", err)
}
return nil, nil
}

// The response is an array of objects: [{sigCid: string}]
response := make([]map[string]any, countTransactionCids(foundTransactions))
numBefore := 0
for ei := range foundTransactions {
epoch := ei
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to get epoch %d: %w", epoch, err)
}

sigs := foundTransactions[ei]
for i := range sigs {
ii := numBefore + i
c := sigs[i]
err := func() error {
response[ii] = map[string]any{
"sigCid": c.String(),
}
if signaturesOnly {
return nil
}

return nil
}()
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to get tx data: %w", err)
}
}
numBefore += len(sigs)
}
// reply with the data
err = conn.ReplyRaw(
ctx,
req.ID,
response,
)
if err != nil {
return nil, fmt.Errorf("failed to reply: %w", err)
}

return nil, nil
}
2 changes: 2 additions & 0 deletions multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ func (ser *MultiEpoch) handleRequest(ctx context.Context, conn *requestContext,
return ser.handleGetFirstAvailableBlock(ctx, conn, req)
case "getSlot":
return ser.handleGetSlot(ctx, conn, req)
case "getSignaturesForAddressCids":
return ser.handleGetSignaturesForAddressCids(ctx, conn, req)
default:
return &jsonrpc2.Error{
Code: jsonrpc2.CodeMethodNotFound,
Expand Down

0 comments on commit 93e7559

Please sign in to comment.