From 93e7559ec0a1cf8900098f9c23bd1d48d68be1ca Mon Sep 17 00:00:00 2001 From: anjor Date: Thu, 19 Sep 2024 14:36:50 +0100 Subject: [PATCH] return cids for gsfa --- .gitignore | 2 + gsfa/gsfa-read-multiepoch.go | 112 +++++++++++++++++ multiepoch-getSignaturesForAddressCids.go | 146 ++++++++++++++++++++++ multiepoch.go | 2 + 4 files changed, 262 insertions(+) create mode 100644 multiepoch-getSignaturesForAddressCids.go diff --git a/.gitignore b/.gitignore index 9bac135a..f8f31572 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ _site /.cargo /target + +.zed diff --git a/gsfa/gsfa-read-multiepoch.go b/gsfa/gsfa-read-multiepoch.go index 6141bdd3..cec31a9c 100644 --- a/gsfa/gsfa-read-multiepoch.go +++ b/gsfa/gsfa-read-multiepoch.go @@ -7,6 +7,7 @@ import ( "time" "github.com/gagliardetto/solana-go" + "github.com/ipfs/go-cid" "github.com/rpcpool/yellowstone-faithful/compactindexsized" "github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" @@ -86,6 +87,7 @@ epochLoop: } type EpochToTransactionObjects map[uint64][]*ipldbindcode.Transaction +type EpochToTransactionCids map[uint64][]cid.Cid // Count returns the number of signatures in the EpochToSignatures. func (e EpochToTransactionObjects) Count() int { @@ -96,6 +98,15 @@ func (e EpochToTransactionObjects) Count() int { return count } +// Count returns the number of signatures in the EpochToSignatures. +func (e EpochToTransactionCids) Count() int { + var count int + for _, sigs := range e { + count += len(sigs) + } + return count +} + func (multi *GsfaReaderMultiepoch) GetBeforeUntil( ctx context.Context, pk solana.PublicKey, @@ -200,3 +211,104 @@ epochLoop: } return transactions, nil } + +func (multi *GsfaReaderMultiepoch) GetCids( + ctx context.Context, + pk solana.PublicKey, + limit int, + fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (cid.Cid, error), +) (EpochToTransactionCids, error) { + if limit <= 0 { + return make(EpochToTransactionCids), nil + } + return multi.iterBeforeUntilCids(ctx, pk, limit, fetcher) +} + +// GetBeforeUntil gets the signatures for the given public key, +// before the given slot. +func (multi *GsfaReaderMultiepoch) iterBeforeUntilCids( + ctx context.Context, + pk solana.PublicKey, + limit int, + fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (cid.Cid, error), +) (EpochToTransactionCids, error) { + if limit <= 0 { + return make(EpochToTransactionCids), nil + } + + transactions := make(EpochToTransactionCids) + // reachedBefore := false + // if before == nil { + // reachedBefore = true + // } + +epochLoop: + for readerIndex, index := range multi.epochs { + if ctx.Err() != nil { + return nil, ctx.Err() + } + epochNum, ok := index.GetEpoch() + if !ok { + return nil, fmt.Errorf("epoch is not set for the #%d provided gsfa reader", readerIndex) + } + + locsStartedAt := time.Now() + locs, err := index.offsets.Get(pk) + if err != nil { + if compactindexsized.IsNotFound(err) { + continue epochLoop + } + return nil, fmt.Errorf("error while getting initial offset: %w", err) + } + klog.V(5).Infof("locs.OffsetToFirst took %s", time.Since(locsStartedAt)) + debugln("locs.OffsetToFirst:", locs) + + next := locs // Start from the latest, and go back in time. + + for { + if next == nil || next.IsZero() { // no previous. + continue epochLoop + } + if limit > 0 && transactions.Count() >= limit { + break epochLoop + } + startedReadAt := time.Now() + locations, newNext, err := index.ll.ReadWithSize(next.Offset, next.Size) + if err != nil { + return nil, fmt.Errorf("error while reading linked log with next=%v: %w", next, err) + } + klog.V(5).Infof("ReadWithSize took %s to get %d locs", time.Since(startedReadAt), len(locations)) + if len(locations) == 0 { + continue epochLoop + } + debugln("sigIndexes:", locations, "newNext:", newNext) + next = &newNext + for _, txLoc := range locations { + tx, err := fetcher(epochNum, txLoc) + if err != nil { + return nil, fmt.Errorf("error while getting signature at index=%v: %w", txLoc, err) + } + // sig, err := tx.Signature() + // if err != nil { + // return nil, fmt.Errorf("error while getting signature: %w", err) + // } + // klog.V(5).Infoln(locIndex, "sig:", sig, "epoch:", epochNum) + // if !reachedBefore && sig == *before { + // reachedBefore = true + // continue + // } + // if !reachedBefore { + // continue + // } + if limit > 0 && transactions.Count() >= limit { + break epochLoop + } + transactions[epochNum] = append(transactions[epochNum], tx) + // if until != nil && sig == *until { + // break epochLoop + // } + } + } + } + return transactions, nil +} diff --git a/multiepoch-getSignaturesForAddressCids.go b/multiepoch-getSignaturesForAddressCids.go new file mode 100644 index 00000000..bd937146 --- /dev/null +++ b/multiepoch-getSignaturesForAddressCids.go @@ -0,0 +1,146 @@ +package main + +import ( + "context" + "fmt" + + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multicodec" + "github.com/rpcpool/yellowstone-faithful/gsfa" + "github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog" + "github.com/rpcpool/yellowstone-faithful/indexes" + "github.com/sourcegraph/jsonrpc2" +) + +func countTransactionCids(v gsfa.EpochToTransactionCids) int { + var count int + for _, txs := range v { + count += len(txs) + } + return count +} + +func (multi *MultiEpoch) handleGetSignaturesForAddressCids(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) (*jsonrpc2.Error, error) { + // - parse and validate request + // - get list of epochs (from most recent to oldest) + // - iterate until we find the requested number of signatures + // - expand the signatures with tx data + signaturesOnly := multi.options.GsfaOnlySignatures + + params, err := parseGetSignaturesForAddressParams(req.Params) + if err != nil { + return &jsonrpc2.Error{ + Code: jsonrpc2.CodeInvalidParams, + Message: "Invalid params", + }, fmt.Errorf("failed to parse params: %v", err) + } + pk := params.Address + limit := params.Limit + + gsfaIndexes, _ := multi.getGsfaReadersInEpochDescendingOrder() + if len(gsfaIndexes) == 0 { + return &jsonrpc2.Error{ + Code: jsonrpc2.CodeInternalError, + Message: "getSignaturesForAddress method is not enabled", + }, fmt.Errorf("no gsfa indexes found") + } + + gsfaMulti, err := gsfa.NewGsfaReaderMultiepoch(gsfaIndexes) + if err != nil { + return &jsonrpc2.Error{ + Code: jsonrpc2.CodeInternalError, + Message: "Internal error", + }, fmt.Errorf("failed to create gsfa multiepoch reader: %w", err) + } + + // Get the transactions: + foundTransactions, err := gsfaMulti.GetCids( + ctx, + pk, + limit, + func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (cid.Cid, error) { + epoch, err := multi.GetEpoch(epochNum) + if err != nil { + return cid.Cid{}, fmt.Errorf("failed to get epoch %d: %w", epochNum, err) + } + raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{ + Offset: oas.Offset, + Size: oas.Size, + }) + if err != nil { + return cid.Cid{}, fmt.Errorf("failed to get signature: %w", err) + } + cb := cid.V1Builder{MhLength: -1, MhType: uint64(multicodec.Sha2_256), Codec: uint64(multicodec.DagCbor)} + c, err := cb.Sum(raw) + if err != nil { + return cid.Cid{}, fmt.Errorf("failed to get cid: %w", err) + } + return c, nil + }, + ) + if err != nil { + return &jsonrpc2.Error{ + Code: jsonrpc2.CodeInternalError, + Message: "Internal error", + }, fmt.Errorf("failed to get signatures: %w", err) + } + + if len(foundTransactions) == 0 { + err = conn.ReplyRaw( + ctx, + req.ID, + []map[string]any{}, + ) + if err != nil { + return nil, fmt.Errorf("failed to reply: %w", err) + } + return nil, nil + } + + // The response is an array of objects: [{sigCid: string}] + response := make([]map[string]any, countTransactionCids(foundTransactions)) + numBefore := 0 + for ei := range foundTransactions { + epoch := ei + if err != nil { + return &jsonrpc2.Error{ + Code: jsonrpc2.CodeInternalError, + Message: "Internal error", + }, fmt.Errorf("failed to get epoch %d: %w", epoch, err) + } + + sigs := foundTransactions[ei] + for i := range sigs { + ii := numBefore + i + c := sigs[i] + err := func() error { + response[ii] = map[string]any{ + "sigCid": c.String(), + } + if signaturesOnly { + return nil + } + + return nil + }() + if err != nil { + return &jsonrpc2.Error{ + Code: jsonrpc2.CodeInternalError, + Message: "Internal error", + }, fmt.Errorf("failed to get tx data: %w", err) + } + } + numBefore += len(sigs) + } + // reply with the data + err = conn.ReplyRaw( + ctx, + req.ID, + response, + ) + if err != nil { + return nil, fmt.Errorf("failed to reply: %w", err) + } + + return nil, nil +} diff --git a/multiepoch.go b/multiepoch.go index 344a92da..0f38a399 100644 --- a/multiepoch.go +++ b/multiepoch.go @@ -538,6 +538,8 @@ func (ser *MultiEpoch) handleRequest(ctx context.Context, conn *requestContext, return ser.handleGetFirstAvailableBlock(ctx, conn, req) case "getSlot": return ser.handleGetSlot(ctx, conn, req) + case "getSignaturesForAddressCids": + return ser.handleGetSignaturesForAddressCids(ctx, conn, req) default: return &jsonrpc2.Error{ Code: jsonrpc2.CodeMethodNotFound,