From a78c1c3acced8d78df55d1a96653b4a6054a5d27 Mon Sep 17 00:00:00 2001
From: gagliardetto
Date: Wed, 14 Feb 2024 14:35:14 +0100
Subject: [PATCH] Speed up epoch loading, and cleanup log verbosity (#85)

* Speed up split epoch loading
* Fix miner info printing
* Use klog verbosity levels; max is 5
* Cleanup TODOs
* Cleanup logs; remove sigexists index warmup
* Add log levels to README
* Fix log defaults
---
 README.md                             |  10 +
 car-dag-traverser.go                  |   3 +-
 cmd-check-deals.go                    |  18 +-
 cmd-rpc.go                            | 263 +++++++++++++++-----------
 epoch.go                              | 127 +++++--------
 flags.go                              |   8 +-
 go.mod                                |   2 +-
 go.sum                                |   4 +-
 http-range.go                         |   6 +-
 index-cid-to-offset.go                |   2 +-
 index-slot-to-cid.go                  |   2 +-
 klog.go                               | 167 ++++++++++++++++
 main.go                               |   9 +-
 multiepoch-getBlock.go                |  26 ++-
 multiepoch-getFirstAvailableBlock.go  |   2 +-
 multiepoch-getSignaturesForAddress.go |   1 -
 multiepoch-getTransaction.go          |   6 +-
 multiepoch.go                         |  25 ++-
 range-cache.go                        |  27 +--
 request-response.go                   |   2 +-
 split-car-fetcher/fetcher.go          |  31 ++-
 split-car-fetcher/miner-info.go       |  10 +-
 storage.go                            |   5 +-
 tools.go                              |   6 +-
 24 files changed, 483 insertions(+), 279 deletions(-)
 create mode 100644 klog.go

diff --git a/README.md b/README.md
index 31d01885..e5e4e67c 100644
--- a/README.md
+++ b/README.md
@@ -126,6 +126,16 @@ The RPC server provides a proxy mode which allows it to forward traffic it can't
 
 The `proxyFailedRequests` flag will make the RPC server proxy not only RPC methods that it doesn't support, but also retry requests that failed to be served from the archives (e.g. a `getBlock` request that failed to be served from the archives because that epoch is not available).
 
+### Log Levels
+
+You can set the desired log verbosity level by using the `-v` flag. The levels are from 0 to 5, where 0 is the least verbose and 5 is the most verbose. The default level is 2.
+
+Example:
+
+```bash
+faithful-cli rpc -v=5 455.yml
+```
+
 ### RPC server from old-faithful.net
 
 We are hosting data on old-faithful.net for testing and cloning purposes. This allows you to run a sample test server without downloading any data. You can run a fully remote server like this:
diff --git a/car-dag-traverser.go b/car-dag-traverser.go
index 951d0dcb..c6f98d1b 100644
--- a/car-dag-traverser.go
+++ b/car-dag-traverser.go
@@ -310,8 +310,7 @@ func FindSubsets(
 		}
 		decoded, err := iplddecoders.DecodeSubset(block.RawData())
 		if err != nil {
-			// TODO: log error, or return error?
- continue + return fmt.Errorf("failed to decode Subset with CID %s: %w", block.Cid(), err) } err = callback(block.Cid(), decoded) if err != nil { diff --git a/cmd-check-deals.go b/cmd-check-deals.go index 3f9f06d6..276a0983 100644 --- a/cmd-check-deals.go +++ b/cmd-check-deals.go @@ -116,6 +116,14 @@ func newCmd_check_deals() *cli.Command { klog.Infof("Provider allowlist: ") } + lotusAPIAddress := "https://api.node.glif.io" + cl := jsonrpc.NewClient(lotusAPIAddress) + dm := splitcarfetcher.NewMinerInfo( + cl, + 24*time.Hour, + 5*time.Second, + ) + // Check deals: for _, config := range configs { epoch := *config.Epoch @@ -134,21 +142,13 @@ func newCmd_check_deals() *cli.Command { return fmt.Errorf("failed to read deals: %w", err) } - lotusAPIAddress := "https://api.node.glif.io" - cl := jsonrpc.NewClient(lotusAPIAddress) - dm := splitcarfetcher.NewMinerInfo( - cl, - 5*time.Minute, - 5*time.Second, - ) - err = checkAllPieces( c.Context, epoch, metadata, dealRegistry, providerAllowlist, - &dm, + dm, ) if err != nil { return fmt.Errorf( diff --git a/cmd-rpc.go b/cmd-rpc.go index 4add3088..4e269513 100644 --- a/cmd-rpc.go +++ b/cmd-rpc.go @@ -12,11 +12,12 @@ import ( "time" "github.com/allegro/bigcache/v3" - "github.com/davecgh/go-spew/spew" "github.com/fsnotify/fsnotify" hugecache "github.com/rpcpool/yellowstone-faithful/huge-cache" + splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher" "github.com/ryanuber/go-glob" "github.com/urfave/cli/v2" + "github.com/ybbus/jsonrpc/v3" "golang.org/x/sync/errgroup" "k8s.io/klog/v2" ) @@ -52,12 +53,6 @@ func newCmd_rpc() *cli.Command { Value: false, Destination: &gsfaOnlySignatures, }, - &cli.BoolFlag{ - Name: "debug", - Usage: "Enable debug logging", - Value: false, - Destination: &DebugMode, - }, &cli.StringSliceFlag{ Name: "include", Usage: "Include files or dirs matching the given glob patterns", @@ -113,7 +108,7 @@ func newCmd_rpc() *cli.Command { } klog.Infof("Found %d config files:", len(configFiles)) for _, configFile := range configFiles { - fmt.Printf(" - %s\n", configFile) + klog.V(3).Infof(" - %s", configFile) } conf := bigcache.DefaultConfig(5 * time.Minute) @@ -140,6 +135,16 @@ func newCmd_rpc() *cli.Command { klog.Infof("Loaded %d epoch configs", len(configs)) klog.Info("Initializing epochs...") + startedInitiatingEpochsAt := time.Now() + + lotusAPIAddress := "https://api.node.glif.io" + cl := jsonrpc.NewClient(lotusAPIAddress) + minerInfo := splitcarfetcher.NewMinerInfo( + cl, + 24*time.Hour, + 5*time.Second, + ) + epochs := make([]*Epoch, 0) wg := new(errgroup.Group) wg.SetLimit(epochLoadConcurrency) @@ -147,7 +152,12 @@ func newCmd_rpc() *cli.Command { for confIndex := range configs { config := configs[confIndex] wg.Go(func() error { - epoch, err := NewEpochFromConfig(config, c, allCache) + epoch, err := NewEpochFromConfig( + config, + c, + allCache, + minerInfo, + ) if err != nil { return fmt.Errorf("failed to create epoch from config %q: %s", config.ConfigFilepath(), err.Error()) } @@ -181,6 +191,8 @@ func newCmd_rpc() *cli.Command { return cli.Exit(fmt.Sprintf("failed to add epoch %d: %s", epoch.Epoch(), err.Error()), 1) } } + tookInitializingEpochs := time.Since(startedInitiatingEpochsAt) + klog.Infof("Initialized %d epochs in %s", len(epochs), tookInitializingEpochs) if watch { dirs, err := GetListOfDirectories( @@ -192,109 +204,93 @@ func newCmd_rpc() *cli.Command { return cli.Exit(err.Error(), 1) } klog.Infof("Found %d directories; will start watching them for changes ...", len(dirs)) - spew.Dump(dirs) + for 
_, dir := range dirs { + klog.V(3).Infof(" - %s", dir) + } ctx, cancel := context.WithCancel(c.Context) defer cancel() - // create a map that tracks files that are already being processed because of an event: - // this is to avoid processing the same file multiple times - // (e.g. if a file is create and then modified, we don't want to process it twice) - fileProcessingTracker := make(map[string]struct{}) - mu := &sync.Mutex{} - - err = onFileChanged(ctx, dirs, func(event fsnotify.Event) { - if !isJSONFile(event.Name) && !isYAMLFile(event.Name) { - klog.Infof("File %q is not a JSON or YAML file; do nothing", event.Name) - return - } - klog.Infof("File event: name=%q, op=%q", event.Name, event.Op) + err = onFileChanged( + ctx, + epochLoadConcurrency, + dirs, + func(event fsnotify.Event) { + if !isJSONFile(event.Name) && !isYAMLFile(event.Name) { + klog.V(3).Infof("File %q is not a JSON or YAML file; do nothing", event.Name) + return + } + klog.V(3).Infof("File event: name=%q, op=%q", event.Name, event.Op) - if event.Op != fsnotify.Remove && multi.HasEpochWithSameHashAsFile(event.Name) { - klog.Infof("Epoch with same hash as file %q is already loaded; do nothing", event.Name) - return - } - // register the file as being processed - mu.Lock() - _, ok := fileProcessingTracker[event.Name] - if ok { - klog.Infof("File %q is already being processed; do nothing", event.Name) - mu.Unlock() - return - } - fileProcessingTracker[event.Name] = struct{}{} - mu.Unlock() - // remove the file from the tracker when we're done processing it - defer func() { - mu.Lock() - delete(fileProcessingTracker, event.Name) - mu.Unlock() - }() - - switch event.Op { - case fsnotify.Write: - { - startedAt := time.Now() - klog.Infof("File %q was modified; processing...", event.Name) - // find the config file, load it, and update the epoch (replace) - config, err := LoadConfig(event.Name) - if err != nil { - klog.Errorf("error loading config file %q: %s", event.Name, err.Error()) - return - } - epoch, err := NewEpochFromConfig(config, c, allCache) - if err != nil { - klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error()) - return - } - err = multi.ReplaceOrAddEpoch(epoch.Epoch(), epoch) - if err != nil { - klog.Errorf("error replacing epoch %d: %s", epoch.Epoch(), err.Error()) - return - } - klog.Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt)) + if event.Op != fsnotify.Remove && multi.HasEpochWithSameHashAsFile(event.Name) { + klog.V(3).Infof("Epoch with same hash as file %q is already loaded; do nothing", event.Name) + return } - case fsnotify.Create: - { - startedAt := time.Now() - klog.Infof("File %q was created; processing...", event.Name) - // find the config file, load it, and add it to the multi-epoch (if not already added) - config, err := LoadConfig(event.Name) - if err != nil { - klog.Errorf("error loading config file %q: %s", event.Name, err.Error()) - return - } - epoch, err := NewEpochFromConfig(config, c, allCache) - if err != nil { - klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error()) - return + + switch event.Op { + case fsnotify.Write: + { + startedAt := time.Now() + klog.V(3).Infof("File %q was modified; processing...", event.Name) + // find the config file, load it, and update the epoch (replace) + config, err := LoadConfig(event.Name) + if err != nil { + klog.Errorf("error loading config file %q: %s", event.Name, err.Error()) + return + } + epoch, err := NewEpochFromConfig(config, c, allCache, minerInfo) + if err 
!= nil { + klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error()) + return + } + err = multi.ReplaceOrAddEpoch(epoch.Epoch(), epoch) + if err != nil { + klog.Errorf("error replacing epoch %d: %s", epoch.Epoch(), err.Error()) + return + } + klog.V(2).Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt)) } - err = multi.AddEpoch(epoch.Epoch(), epoch) - if err != nil { - klog.Errorf("error adding epoch %d: %s", epoch.Epoch(), err.Error()) - return + case fsnotify.Create: + { + startedAt := time.Now() + klog.V(3).Infof("File %q was created; processing...", event.Name) + // find the config file, load it, and add it to the multi-epoch (if not already added) + config, err := LoadConfig(event.Name) + if err != nil { + klog.Errorf("error loading config file %q: %s", event.Name, err.Error()) + return + } + epoch, err := NewEpochFromConfig(config, c, allCache, minerInfo) + if err != nil { + klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error()) + return + } + err = multi.AddEpoch(epoch.Epoch(), epoch) + if err != nil { + klog.Errorf("error adding epoch %d: %s", epoch.Epoch(), err.Error()) + return + } + klog.V(2).Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt)) } - klog.Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt)) - } - case fsnotify.Remove: - { - startedAt := time.Now() - klog.Infof("File %q was removed; processing...", event.Name) - // find the epoch that corresponds to this file, and remove it (if any) - epNumber, err := multi.RemoveEpochByConfigFilepath(event.Name) - if err != nil { - klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error()) + case fsnotify.Remove: + { + startedAt := time.Now() + klog.V(3).Infof("File %q was removed; processing...", event.Name) + // find the epoch that corresponds to this file, and remove it (if any) + epNumber, err := multi.RemoveEpochByConfigFilepath(event.Name) + if err != nil { + klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error()) + } + klog.V(2).Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt)) } - klog.Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt)) + case fsnotify.Rename: + klog.V(3).Infof("File %q was renamed; do nothing", event.Name) + case fsnotify.Chmod: + klog.V(3).Infof("File %q had its permissions changed; do nothing", event.Name) + default: + klog.V(3).Infof("File %q had an unknown event %q; do nothing", event.Name, event.Op) } - case fsnotify.Rename: - klog.Infof("File %q was renamed; do nothing", event.Name) - case fsnotify.Chmod: - klog.Infof("File %q had its permissions changed; do nothing", event.Name) - default: - klog.Infof("File %q had an unknown event %q; do nothing", event.Name, event.Op) - } - }) + }) if err != nil { return cli.Exit(err.Error(), 1) } @@ -316,14 +312,49 @@ func newCmd_rpc() *cli.Command { } } -// TODO: -// - [ ] get the list of provided arguments, and distinguish between files and directories -// - [ ] load all the config files, etc. -// - [ ] start a goroutine that monitors the config files for changes -// - [ ] when a config file changes, reload it and update the epoch -// - [ ] start a goroutine that monitors the directories and subdirectories for changes (new files, deleted files, etc.) +// create a map that tracks files that are already being processed because of an event: +// this is to avoid processing the same file multiple times +// (e.g. 
if a file is create and then modified, we don't want to process it twice) +type fileProcessingTracker struct { + mu sync.Mutex + m map[string]struct{} +} + +func newFileProcessingTracker() *fileProcessingTracker { + return &fileProcessingTracker{ + m: make(map[string]struct{}), + } +} + +func (f *fileProcessingTracker) isBeingProcessedOrAdd(filename string) bool { + f.mu.Lock() + defer f.mu.Unlock() + _, ok := f.m[filename] + if !ok { + f.m[filename] = struct{}{} + } + // if ok is true, then the file is already being processed + return ok +} + +func (f *fileProcessingTracker) removeFromList(filename string) { + f.mu.Lock() + defer f.mu.Unlock() + delete(f.m, filename) +} + +// - get the list of provided arguments, and distinguish between files and directories +// - load all the config files, etc. +// - start a goroutine that monitors the config files for changes +// - when a config file changes, reload it and update the epoch +// - start a goroutine that monitors the directories and subdirectories for changes (new files, deleted files, etc.) // - is only watching directories sufficient? or do we need to watch files too? -func onFileChanged(ctx context.Context, dirs []string, callback func(fsnotify.Event)) error { +func onFileChanged( + ctx context.Context, + epochLoadConcurrency int, + dirs []string, + callback func(fsnotify.Event), +) error { // monitor a directory for file changes watcher, err := fsnotify.NewWatcher() if err != nil { @@ -341,6 +372,10 @@ func onFileChanged(ctx context.Context, dirs []string, callback func(fsnotify.Ev // start a goroutine to handle events go func() { defer watcher.Close() + tracker := newFileProcessingTracker() + wg := new(errgroup.Group) + wg.SetLimit(epochLoadConcurrency) + defer wg.Wait() for { select { case <-ctx.Done(): @@ -349,7 +384,15 @@ func onFileChanged(ctx context.Context, dirs []string, callback func(fsnotify.Ev if !ok { return } - callback(event) + wg.Go(func() error { + if tracker.isBeingProcessedOrAdd(event.Name) { + klog.V(3).Infof("File %q is already being processed; do nothing", event.Name) + return nil + } + defer tracker.removeFromList(event.Name) + callback(event) + return nil + }) case err, ok := <-watcher.Errors: if !ok { return diff --git a/epoch.go b/epoch.go index 7f7c4c3f..861d20af 100644 --- a/epoch.go +++ b/epoch.go @@ -12,7 +12,6 @@ import ( "time" "github.com/multiformats/go-multiaddr" - "github.com/ybbus/jsonrpc/v3" "github.com/anjor/carlet" "github.com/davecgh/go-spew/spew" @@ -95,6 +94,7 @@ func NewEpochFromConfig( config *Config, c *cli.Context, allCache *hugecache.Cache, + minerInfo *splitcarfetcher.MinerInfoCache, ) (*Epoch, error) { if config == nil { return nil, fmt.Errorf("config must not be nil") @@ -129,7 +129,6 @@ func NewEpochFromConfig( cidToOffsetIndexFile, err := openIndexStorage( c.Context, string(config.Indexes.CidToOffset.URI), - DebugMode, ) if err != nil { return nil, fmt.Errorf("failed to open cid-to-offset index file: %w", err) @@ -149,7 +148,6 @@ func NewEpochFromConfig( cidToOffsetAndSizeIndexFile, err := openIndexStorage( c.Context, string(config.Indexes.CidToOffsetAndSize.URI), - DebugMode, ) if err != nil { return nil, fmt.Errorf("failed to open cid-to-offset index file: %w", err) @@ -176,7 +174,6 @@ func NewEpochFromConfig( slotToCidIndexFile, err := openIndexStorage( c.Context, string(config.Indexes.SlotToCid.URI), - DebugMode, ) if err != nil { return nil, fmt.Errorf("failed to open slot-to-cid index file: %w", err) @@ -207,7 +204,6 @@ func NewEpochFromConfig( sigToCidIndexFile, err := 
openIndexStorage( c.Context, string(config.Indexes.SigToCid.URI), - DebugMode, ) if err != nil { return nil, fmt.Errorf("failed to open sig-to-cid index file: %w", err) @@ -298,14 +294,6 @@ func NewEpochFromConfig( return nil, fmt.Errorf("failed to read deals: %w", err) } - lotusAPIAddress := "https://api.node.glif.io" - cl := jsonrpc.NewClient(lotusAPIAddress) - dm := splitcarfetcher.NewMinerInfo( - cl, - 5*time.Minute, - 5*time.Second, - ) - scr, err := splitcarfetcher.NewSplitCarReader( metadata.CarPieces, func(piece carlet.CarFile) (splitcarfetcher.ReaderAtCloserSize, error) { @@ -313,15 +301,15 @@ func NewEpochFromConfig( if !ok { return nil, fmt.Errorf("failed to find miner for piece CID %s", piece.CommP) } - klog.Infof("piece CID %s is stored on miner %s", piece.CommP, minerID) - minerInfo, err := dm.GetProviderInfo(c.Context, minerID) + klog.V(3).Infof("piece CID %s is stored on miner %s", piece.CommP, minerID) + minerInfo, err := minerInfo.GetProviderInfo(c.Context, minerID) if err != nil { return nil, fmt.Errorf("failed to get miner info for miner %s, for piece %s: %w", minerID, piece.CommP, err) } if len(minerInfo.Multiaddrs) == 0 { return nil, fmt.Errorf("miner %s has no multiaddrs", minerID) } - spew.Dump(minerInfo) + klog.V(3).Infof("miner info: %s", spew.Sdump(minerInfo)) // extract the IP address from the multiaddr: split := multiaddr.Split(minerInfo.Multiaddrs[0]) if len(split) < 2 { @@ -342,7 +330,7 @@ func NewEpochFromConfig( return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0]) } minerIP := fmt.Sprintf("%s:%s", ip, port) - klog.Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP) + klog.V(3).Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP) formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String()) return splitcarfetcher.NewRemoteFileSplitCarReader( piece.CommP.String(), @@ -424,7 +412,6 @@ func NewEpochFromConfig( sigExistsFile, err := openIndexStorage( c.Context, string(config.Indexes.SigExists.URI), - DebugMode, ) if err != nil { return nil, fmt.Errorf("failed to open sig-exists index file: %w", err) @@ -438,12 +425,12 @@ func NewEpochFromConfig( } ep.onClose = append(ep.onClose, sigExists.Close) - { - // warm up the cache - for i := 0; i < 100_000; i++ { - sigExists.Has(newRandomSignature()) - } - } + // { + // // warm up the cache + // for i := 0; i < 10; i++ { + // sigExists.Has(newRandomSignature()) + // } + // } ep.sigExists = sigExists } else { @@ -453,12 +440,12 @@ func NewEpochFromConfig( } ep.onClose = append(ep.onClose, sigExists.Close) - { - // warm up the cache - for i := 0; i < 100_000; i++ { - sigExists.Has(newRandomSignature()) - } - } + // { + // // warm up the cache + // for i := 0; i < 10; i++ { + // sigExists.Has(newRandomSignature()) + // } + // } ep.sigExists = sigExists @@ -592,8 +579,7 @@ func (s *Epoch) prefetchSubgraph(ctx context.Context, wantedCid cid.Cid) error { return nil }) } - klog.Errorf("failed to get subgraph from lassie: %v", err) - return err + return fmt.Errorf("failed to get subgraph from lassie for CID %s: %w", wantedCid, err) } return nil } @@ -617,15 +603,13 @@ func (s *Epoch) GetNodeByCid(ctx context.Context, wantedCid cid.Cid) ([]byte, er s.GetCache().PutRawCarObject(wantedCid, data) return data, nil } - klog.Errorf("failed to get node from lassie: %v", err) - return nil, err + return nil, fmt.Errorf("failed to get node from lassie for CID %s: %w", wantedCid, err) } // Find CAR file oas for CID in index. 
oas, err := s.FindOffsetAndSizeFromCid(ctx, wantedCid) if err != nil { - klog.Errorf("failed to find offset for CID %s: %v", wantedCid, err) // not found or error - return nil, err + return nil, fmt.Errorf("failed to find offset for CID %s: %w", wantedCid, err) } return s.GetNodeByOffsetAndSize(ctx, wantedCid, oas) } @@ -641,15 +625,13 @@ func (s *Epoch) ReadAtFromCar(ctx context.Context, offset uint64, length uint64) // Get reader and seek to offset, then read node. dr, err := s.localCarReader.DataReader() if err != nil { - klog.Errorf("failed to get data reader: %v", err) - return nil, err + return nil, fmt.Errorf("failed to get data reader: %w", err) } dr.Seek(int64(offset), io.SeekStart) data := make([]byte, length) _, err = io.ReadFull(dr, data) if err != nil { - klog.Errorf("failed to read node: %v", err) - return nil, err + return nil, fmt.Errorf("failed to read node from CAR: %w", err) } return data, nil } @@ -673,8 +655,7 @@ func (s *Epoch) GetNodeByOffsetAndSize(ctx context.Context, wantedCid cid.Cid, o // Get reader and seek to offset, then read node. dr, err := s.localCarReader.DataReader() if err != nil { - klog.Errorf("failed to get data reader: %v", err) - return nil, err + return nil, fmt.Errorf("failed to get local CAR data reader: %w", err) } dr.Seek(int64(offset), io.SeekStart) br := bufio.NewReader(dr) @@ -693,8 +674,7 @@ func (s *Epoch) getNodeSize(ctx context.Context, offset uint64) (uint64, error) // Get reader and seek to offset, then read node. dr, err := s.localCarReader.DataReader() if err != nil { - klog.Errorf("failed to get data reader: %v", err) - return 0, err + return 0, fmt.Errorf("failed to get local CAR data reader: %w", err) } return readNodeSizeFromReaderAtWithOffset(dr, offset) } @@ -719,8 +699,7 @@ func readNodeWithKnownSize(br *bufio.Reader, wantedCid cid.Cid, length uint64) ( section := make([]byte, length) _, err := io.ReadFull(br, section) if err != nil { - klog.Errorf("failed to read section: %v", err) - return nil, err + return nil, fmt.Errorf("failed to read section from CAR with length %d: %w", length, err) } return parseNodeFromSection(section, wantedCid) } @@ -741,7 +720,6 @@ func parseNodeFromSection(section []byte, wantedCid cid.Cid) ([]byte, error) { } // verify that the CID we read matches the one we expected. 
if !gotCid.Equals(wantedCid) { - klog.Errorf("CID mismatch: expected %s, got %s", wantedCid, gotCid) return nil, fmt.Errorf("CID mismatch: expected %s, got %s", wantedCid, gotCid) } return data[cidLen:], nil @@ -750,7 +728,7 @@ func parseNodeFromSection(section []byte, wantedCid cid.Cid) ([]byte, error) { func (ser *Epoch) FindCidFromSlot(ctx context.Context, slot uint64) (o cid.Cid, e error) { startedAt := time.Now() defer func() { - klog.Infof("Found CID for slot %d in %s: %s", slot, time.Since(startedAt), o) + klog.V(4).Infof("Found CID for slot %d in %s: %s", slot, time.Since(startedAt), o) }() // try from cache @@ -770,7 +748,7 @@ func (ser *Epoch) FindCidFromSlot(ctx context.Context, slot uint64) (o cid.Cid, func (ser *Epoch) FindCidFromSignature(ctx context.Context, sig solana.Signature) (o cid.Cid, e error) { startedAt := time.Now() defer func() { - klog.Infof("Found CID for signature %s in %s: %s", sig, time.Since(startedAt), o) + klog.V(4).Infof("Found CID for signature %s in %s: %s", sig, time.Since(startedAt), o) }() return ser.sigToCidIndex.Get(sig) } @@ -779,9 +757,9 @@ func (ser *Epoch) FindOffsetAndSizeFromCid(ctx context.Context, cid cid.Cid) (os startedAt := time.Now() defer func() { if os != nil { - klog.Infof("Found offset and size for CID %s in %s: o=%d s=%d", cid, time.Since(startedAt), os.Offset, os.Size) + klog.V(4).Infof("Found offset and size for CID %s in %s: o=%d s=%d", cid, time.Since(startedAt), os.Offset, os.Size) } else { - klog.Infof("Offset and size for CID %s in %s: not found", cid, time.Since(startedAt)) + klog.V(4).Infof("Offset and size for CID %s in %s: not found", cid, time.Since(startedAt)) } }() @@ -798,14 +776,14 @@ func (ser *Epoch) FindOffsetAndSizeFromCid(ctx context.Context, cid cid.Cid) (os return nil, err } - klog.Infof("Found offset for CID %s in %s: %d", cid, time.Since(startedAt), offset) + klog.V(4).Infof("Found offset for CID %s in %s: %d", cid, time.Since(startedAt), offset) size, err := ser.getNodeSize(ctx, offset) if err != nil { return nil, err } - klog.Infof("Found size for CID %s in %s: %d", cid, time.Since(startedAt), size) + klog.V(4).Infof("Found size for CID %s in %s: %d", cid, time.Since(startedAt), size) found := &indexes.OffsetAndSize{ Offset: offset, @@ -819,7 +797,6 @@ func (ser *Epoch) FindOffsetAndSizeFromCid(ctx context.Context, cid cid.Cid) (os if err != nil { return nil, err } - // TODO: use also the size. ser.GetCache().PutCidToOffsetAndSize(cid, found) return found, nil } @@ -828,8 +805,7 @@ func (ser *Epoch) GetBlock(ctx context.Context, slot uint64) (*ipldbindcode.Bloc // get the slot by slot number wantedCid, err := ser.FindCidFromSlot(ctx, slot) if err != nil { - klog.Errorf("failed to find CID for slot %d: %v", slot, err) - return nil, err + return nil, fmt.Errorf("failed to find CID for slot %d: %w", slot, err) } { doPrefetch := getValueFromContext(ctx, "prefetch") @@ -841,14 +817,12 @@ func (ser *Epoch) GetBlock(ctx context.Context, slot uint64) (*ipldbindcode.Bloc // get the block by CID data, err := ser.GetNodeByCid(ctx, wantedCid) if err != nil { - klog.Errorf("failed to find node by cid: %v", err) - return nil, err + return nil, fmt.Errorf("failed to get node by cid %s: %w", wantedCid, err) } // try parsing the data as a Block node. 
decoded, err := iplddecoders.DecodeBlock(data) if err != nil { - klog.Errorf("failed to decode block: %v", err) - return nil, err + return nil, fmt.Errorf("failed to decode block with CID %s: %w", wantedCid, err) } return decoded, nil } @@ -856,14 +830,12 @@ func (ser *Epoch) GetBlock(ctx context.Context, slot uint64) (*ipldbindcode.Bloc func (ser *Epoch) GetEntryByCid(ctx context.Context, wantedCid cid.Cid) (*ipldbindcode.Entry, error) { data, err := ser.GetNodeByCid(ctx, wantedCid) if err != nil { - klog.Errorf("failed to find node by cid: %v", err) - return nil, err + return nil, fmt.Errorf("failed to find node by cid %s: %w", wantedCid, err) } // try parsing the data as an Entry node. decoded, err := iplddecoders.DecodeEntry(data) if err != nil { - klog.Errorf("failed to decode entry: %v", err) - return nil, err + return nil, fmt.Errorf("failed to decode entry with CID %s: %w", wantedCid, err) } return decoded, nil } @@ -871,14 +843,12 @@ func (ser *Epoch) GetEntryByCid(ctx context.Context, wantedCid cid.Cid) (*ipldbi func (ser *Epoch) GetTransactionByCid(ctx context.Context, wantedCid cid.Cid) (*ipldbindcode.Transaction, error) { data, err := ser.GetNodeByCid(ctx, wantedCid) if err != nil { - klog.Errorf("failed to find node by cid: %v", err) - return nil, err + return nil, fmt.Errorf("failed to find node by cid %s: %w", wantedCid, err) } // try parsing the data as a Transaction node. decoded, err := iplddecoders.DecodeTransaction(data) if err != nil { - klog.Errorf("failed to decode transaction: %v", err) - return nil, err + return nil, fmt.Errorf("failed to decode transaction with CID %s: %w", wantedCid, err) } return decoded, nil } @@ -886,14 +856,12 @@ func (ser *Epoch) GetTransactionByCid(ctx context.Context, wantedCid cid.Cid) (* func (ser *Epoch) GetDataFrameByCid(ctx context.Context, wantedCid cid.Cid) (*ipldbindcode.DataFrame, error) { data, err := ser.GetNodeByCid(ctx, wantedCid) if err != nil { - klog.Errorf("failed to find node by cid: %v", err) - return nil, err + return nil, fmt.Errorf("failed to find node by cid %s: %w", wantedCid, err) } // try parsing the data as a DataFrame node. decoded, err := iplddecoders.DecodeDataFrame(data) if err != nil { - klog.Errorf("failed to decode data frame: %v", err) - return nil, err + return nil, fmt.Errorf("failed to decode data frame with CID %s: %w", wantedCid, err) } return decoded, nil } @@ -901,14 +869,12 @@ func (ser *Epoch) GetDataFrameByCid(ctx context.Context, wantedCid cid.Cid) (*ip func (ser *Epoch) GetRewardsByCid(ctx context.Context, wantedCid cid.Cid) (*ipldbindcode.Rewards, error) { data, err := ser.GetNodeByCid(ctx, wantedCid) if err != nil { - klog.Errorf("failed to find node by cid: %v", err) - return nil, err + return nil, fmt.Errorf("failed to find node by cid %s: %w", wantedCid, err) } // try parsing the data as a Rewards node. 
decoded, err := iplddecoders.DecodeRewards(data) if err != nil { - klog.Errorf("failed to decode rewards: %v", err) - return nil, err + return nil, fmt.Errorf("failed to decode rewards with CID %s: %w", wantedCid, err) } return decoded, nil } @@ -917,8 +883,7 @@ func (ser *Epoch) GetTransaction(ctx context.Context, sig solana.Signature) (*ip // get the CID by signature wantedCid, err := ser.FindCidFromSignature(ctx, sig) if err != nil { - klog.Errorf("failed to find CID for signature %s: %v", sig, err) - return nil, err + return nil, fmt.Errorf("failed to find CID for signature %s: %w", sig, err) } { doPrefetch := getValueFromContext(ctx, "prefetch") @@ -930,14 +895,12 @@ func (ser *Epoch) GetTransaction(ctx context.Context, sig solana.Signature) (*ip // get the transaction by CID data, err := ser.GetNodeByCid(ctx, wantedCid) if err != nil { - klog.Errorf("failed to get node by cid: %v", err) - return nil, err + return nil, fmt.Errorf("failed to get node by cid %s: %w", wantedCid, err) } // try parsing the data as a Transaction node. decoded, err := iplddecoders.DecodeTransaction(data) if err != nil { - klog.Errorf("failed to decode transaction: %v", err) - return nil, err + return nil, fmt.Errorf("failed to decode transaction with CID %s: %w", wantedCid, err) } return decoded, nil } diff --git a/flags.go b/flags.go index b1a50355..c0f13861 100644 --- a/flags.go +++ b/flags.go @@ -37,10 +37,10 @@ const ( // FlagVerbose enables verbose mode, which shows info information about // operations invoked in the CLI. var FlagVerbose = &cli.BoolFlag{ - Name: "verbose", - Aliases: []string{"v"}, - Usage: "enable verbose mode for logging", - Action: setLogLevel("INFO"), + Name: "verbose", + // Aliases: []string{"v"}, + Usage: "enable verbose mode for logging", + Action: setLogLevel("INFO"), } // FlagVeryVerbose enables very verbose mode, which shows debug information about diff --git a/go.mod b/go.mod index 1932a9ef..9deac589 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gagliardetto/binary v0.7.8 github.com/gagliardetto/solana-go v1.8.4 github.com/golang/protobuf v1.5.3 // indirect - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.6.0 github.com/hannahhoward/go-pubsub v1.0.0 // indirect github.com/ipfs/go-blockservice v0.5.0 // indirect github.com/ipfs/go-cid v0.4.1 diff --git a/go.sum b/go.sum index 9c1e67d4..5e5443c5 100644 --- a/go.sum +++ b/go.sum @@ -307,8 +307,8 @@ github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b/go.mod h1:czg5+yv1E0Z github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= diff --git a/http-range.go b/http-range.go index 099e3a36..b9220353 100644 --- a/http-range.go +++ b/http-range.go @@ -5,12 +5,12 @@ import ( "fmt" "io" 
"net/http" - "os" "path/filepath" "strings" "time" "github.com/goware/urlx" + "k8s.io/klog/v2" ) type ReaderAtCloser interface { @@ -186,7 +186,7 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { startedAt := time.Now() defer func() { took := time.Since(startedAt) - if DebugMode { + if klog.V(5).Enabled() { var icon string if r.isRemote { // add internet icon @@ -204,7 +204,7 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { if strings.HasSuffix(r.name, ".car") { prefix = icon + purpleBG("[READ-CAR]") } - fmt.Fprintf(os.Stderr, prefix+" %s:%d+%d (%s)\n", filepath.Base(r.name), off, len(p), took) + klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", filepath.Base(r.name), off, len(p), took) } }() return r.rac.ReadAt(p, off) diff --git a/index-cid-to-offset.go b/index-cid-to-offset.go index 776eca95..6ce43cb8 100644 --- a/index-cid-to-offset.go +++ b/index-cid-to-offset.go @@ -80,7 +80,7 @@ func CreateIndex_cid2offset( rootCid, network, tmpDir, - numItems, // TODO: what if the number of real items is less than this? + numItems, ) if err != nil { return "", fmt.Errorf("failed to open index store: %w", err) diff --git a/index-slot-to-cid.go b/index-slot-to-cid.go index 51bb2fc3..1161e134 100644 --- a/index-slot-to-cid.go +++ b/index-slot-to-cid.go @@ -68,7 +68,7 @@ func CreateIndex_slot2cid( rootCid, network, tmpDir, - numItems, // TODO: what if the number of real items is less than this? + numItems, ) if err != nil { return "", fmt.Errorf("failed to open index store: %w", err) diff --git a/klog.go b/klog.go new file mode 100644 index 00000000..ef1cee8e --- /dev/null +++ b/klog.go @@ -0,0 +1,167 @@ +package main + +import ( + "flag" + "fmt" + + "github.com/urfave/cli/v2" + "k8s.io/klog/v2" +) + +func NewKlogFlagSet() []cli.Flag { + fs := flag.NewFlagSet("klog", flag.PanicOnError) + klog.InitFlags(fs) + + fs.Set("v", "2") + fs.Set("log_file_max_size", "1800") + fs.Set("logtostderr", "true") + + return []cli.Flag{ + // "log_dir", "", "If non-empty, write log files in this directory (no effect when -logtostderr=true)") + &cli.StringFlag{ + Name: "log_dir", + Usage: "If non-empty, write log files in this directory (no effect when -logtostderr=true)", + EnvVars: []string{"FAITHFUL_LOG_DIR"}, + Action: func(cctx *cli.Context, v string) error { + if v != "" { + fs.Set("log_dir", v) + } + return nil + }, + }, + // "log_file", "", "If non-empty, use this log file (no effect when -logtostderr=true)") + &cli.StringFlag{ + Name: "log_file", + Usage: "If non-empty, use this log file (no effect when -logtostderr=true)", + EnvVars: []string{"FAITHFUL_LOG_FILE"}, + Action: func(cctx *cli.Context, v string) error { + if v != "" { + fs.Set("log_file", v) + } + return nil + }, + }, + // "log_file_max_size", 1800, + &cli.Uint64Flag{ + Name: "log_file_max_size", + Usage: "Defines the maximum size a log file can grow to (no effect when -logtostderr=true). Unit is megabytes. 
If the value is 0, the maximum file size is unlimited.", + EnvVars: []string{"FAITHFUL_LOG_FILE_MAX_SIZE"}, + DefaultText: "1800", + Action: func(cctx *cli.Context, v uint64) error { + fs.Set("log_file_max_size", fmt.Sprint(v)) + return nil + }, + }, + + // "logtostderr", true, "log to standard error instead of files") + &cli.BoolFlag{ + Name: "logtostderr", + Usage: "log to standard error instead of files", + EnvVars: []string{"FAITHFUL_LOGTOSTDERR"}, + DefaultText: "true", + Action: func(cctx *cli.Context, v bool) error { + fs.Set("logtostderr", fmt.Sprint(v)) + return nil + }, + }, + // "alsologtostderr", false, "log to standard error as well as files (no effect when -logtostderr=true)") + &cli.BoolFlag{ + Name: "alsologtostderr", + Usage: "log to standard error as well as files (no effect when -logtostderr=true)", + EnvVars: []string{"FAITHFUL_ALSOLOGTOSTDERR"}, + DefaultText: "false", + Action: func(cctx *cli.Context, v bool) error { + fs.Set("alsologtostderr", fmt.Sprint(v)) + return nil + }, + }, + // "v", "number for the log level verbosity") + &cli.IntFlag{ + Name: "v", + Usage: "number for the log level verbosity", + EnvVars: []string{"FAITHFUL_V"}, + Value: 2, + Action: func(cctx *cli.Context, v int) error { + fs.Set("v", fmt.Sprint(v)) + return nil + }, + }, + // "add_dir_header", false, "If true, adds the file directory to the header of the log messages") + &cli.BoolFlag{ + Name: "add_dir_header", + Usage: "If true, adds the file directory to the header of the log messages", + EnvVars: []string{"FAITHFUL_ADD_DIR_HEADER"}, + Action: func(cctx *cli.Context, v bool) error { + fs.Set("add_dir_header", fmt.Sprint(v)) + return nil + }, + }, + + // "skip_headers", false, "If true, avoid header prefixes in the log messages") + &cli.BoolFlag{ + Name: "skip_headers", + Usage: "If true, avoid header prefixes in the log messages", + EnvVars: []string{"FAITHFUL_SKIP_HEADERS"}, + Action: func(cctx *cli.Context, v bool) error { + fs.Set("skip_headers", fmt.Sprint(v)) + return nil + }, + }, + // "one_output", false, "If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)") + &cli.BoolFlag{ + Name: "one_output", + Usage: "If true, only write logs to their native severity level (vs also writing to each lower severity level; no effect when -logtostderr=true)", + EnvVars: []string{"FAITHFUL_ONE_OUTPUT"}, + Action: func(cctx *cli.Context, v bool) error { + fs.Set("one_output", fmt.Sprint(v)) + return nil + }, + }, + // "skip_log_headers", false, "If true, avoid headers when opening log files (no effect when -logtostderr=true)") + &cli.BoolFlag{ + Name: "skip_log_headers", + Usage: "If true, avoid headers when opening log files (no effect when -logtostderr=true)", + EnvVars: []string{"FAITHFUL_SKIP_LOG_HEADERS"}, + Action: func(cctx *cli.Context, v bool) error { + fs.Set("skip_log_headers", fmt.Sprint(v)) + return nil + }, + }, + // "stderrthreshold", "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)") + &cli.StringFlag{ + Name: "stderrthreshold", + Usage: "logs at or above this threshold go to stderr when writing to files and stderr (no effect when -logtostderr=true or -alsologtostderr=false)", + EnvVars: []string{"FAITHFUL_STDERRTHRESHOLD"}, + Action: func(cctx *cli.Context, v string) error { + if v != "" { + fs.Set("stderrthreshold", v) + } + return nil + }, + }, + // "vmodule", "comma-separated list of pattern=N settings for 
file-filtered logging") + &cli.StringFlag{ + Name: "vmodule", + Usage: "comma-separated list of pattern=N settings for file-filtered logging", + EnvVars: []string{"FAITHFUL_VMODULE"}, + Action: func(cctx *cli.Context, v string) error { + if v != "" { + fs.Set("vmodule", v) + } + return nil + }, + }, + // "log_backtrace_at", "when logging hits line file:N, emit a stack trace") + &cli.StringFlag{ + Name: "log_backtrace_at", + Usage: "when logging hits line file:N, emit a stack trace", + EnvVars: []string{"FAITHFUL_LOG_BACKTRACE_AT"}, + Action: func(cctx *cli.Context, v string) error { + if v != "" { + fs.Set("log_backtrace_at", v) + } + return nil + }, + }, + } +} diff --git a/main.go b/main.go index b8b08ca9..e75a044b 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,8 @@ import ( var gitCommitSHA = "" func main() { + defer klog.Flush() + // set up a context that is canceled when a command is interrupted ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -41,13 +43,10 @@ func main() { Name: "faithful CLI", Version: gitCommitSHA, Description: "CLI to get, manage and interact with the Solana blockchain data stored in a CAR file or on Filecoin/IPFS.", - Before: func(c *cli.Context) error { + Flags: NewKlogFlagSet(), + Before: func(cctx *cli.Context) error { return nil }, - Flags: []cli.Flag{ - FlagVerbose, - FlagVeryVerbose, - }, Action: nil, Commands: []*cli.Command{ newCmd_DumpCar(), diff --git a/multiepoch-getBlock.go b/multiepoch-getBlock.go index a959f0cb..4c91cc70 100644 --- a/multiepoch-getBlock.go +++ b/multiepoch-getBlock.go @@ -26,8 +26,24 @@ import ( var fasterJson = jsoniter.ConfigCompatibleWithStandardLibrary +type MyContextKey string + +const requestIDKey = MyContextKey("requestID") + +func setRequestIDToContext(ctx context.Context, id string) context.Context { + return context.WithValue(ctx, requestIDKey, id) +} + +func getRequestIDFromContext(ctx context.Context) string { + id, ok := ctx.Value(requestIDKey).(string) + if !ok { + return "" + } + return id +} + func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) (*jsonrpc2.Error, error) { - tim := newTimer() + tim := newTimer(getRequestIDFromContext(ctx)) params, err := parseGetBlockRequest(req.Params) if err != nil { return &jsonrpc2.Error{ @@ -103,9 +119,9 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex return err } if slot == 0 { - klog.Infof("car start to slot(0)::%s", blockCid) + klog.V(4).Infof("car start to slot(0)::%s", blockCid) } else { - klog.Infof( + klog.V(4).Infof( "slot(%d)::%s to slot(%d)::%s", uint64(block.Meta.Parent_slot), parentBlockCid, @@ -151,7 +167,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex start := parentOffset - klog.Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", start, length, parentOffset) + klog.V(4).Infof("prefetching CAR: start=%d length=%d (parent_offset=%d)", start, length, parentOffset) carSection, err := epochHandler.ReadAtFromCar(ctx, start, length) if err != nil { return err @@ -454,7 +470,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex } } else { if slot != 0 { - klog.Infof("parent slot is in a different epoch, not implemented yet (can't get previousBlockhash)") + klog.V(4).Infof("parent slot is in a different epoch, not implemented yet (can't get previousBlockhash)") } } } diff --git a/multiepoch-getFirstAvailableBlock.go b/multiepoch-getFirstAvailableBlock.go index c6fcf017..1300e04f 
100644 --- a/multiepoch-getFirstAvailableBlock.go +++ b/multiepoch-getFirstAvailableBlock.go @@ -12,7 +12,7 @@ func (multi *MultiEpoch) handleGetFirstAvailableBlock(ctx context.Context, conn if err != nil { return &jsonrpc2.Error{ Code: CodeNotFound, - Message: fmt.Sprintf("Internal error"), + Message: "Internal error", }, fmt.Errorf("failed to get first available block: %w", err) } diff --git a/multiepoch-getSignaturesForAddress.go b/multiepoch-getSignaturesForAddress.go index 655d7b38..3e5d2f54 100644 --- a/multiepoch-getSignaturesForAddress.go +++ b/multiepoch-getSignaturesForAddress.go @@ -53,7 +53,6 @@ func countSignatures(v map[uint64][]solana.Signature) int { } func (multi *MultiEpoch) handleGetSignaturesForAddress(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) (*jsonrpc2.Error, error) { - // TODO: // - parse and validate request // - get list of epochs (from most recent to oldest) // - iterate until we find the requested number of signatures diff --git a/multiepoch-getTransaction.go b/multiepoch-getTransaction.go index f061e9e9..f290cf6e 100644 --- a/multiepoch-getTransaction.go +++ b/multiepoch-getTransaction.go @@ -61,8 +61,8 @@ func (multi *MultiEpoch) findEpochNumberFromSignature(ctx context.Context, sig s found = append(found, epochNumber) } } - klog.Infof( - "Searched %d epochs in %s, and found %d candidate epochs for %s: %v", + klog.V(4).Infof( + "Searched %d epochs in %s, and found %d candidate epochs for signature %s: %v", len(numbers), time.Since(startedSearchingCandidatesAt), len(found), @@ -154,7 +154,7 @@ func (multi *MultiEpoch) handleGetTransaction(ctx context.Context, conn *request Message: "Internal error", }, fmt.Errorf("failed to get epoch for signature %s: %v", sig, err) } - klog.Infof("Found signature %s in epoch %d in %s", sig, epochNumber, time.Since(startedEpochLookupAt)) + klog.V(4).Infof("Found signature %s in epoch %d in %s", sig, epochNumber, time.Since(startedEpochLookupAt)) epochHandler, err := multi.GetEpoch(uint64(epochNumber)) if err != nil { diff --git a/multiepoch.go b/multiepoch.go index ec2e1711..f62b408b 100644 --- a/multiepoch.go +++ b/multiepoch.go @@ -2,7 +2,6 @@ package main import ( "context" - "crypto/rand" "fmt" "net/http" "sort" @@ -10,9 +9,9 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/goware/urlx" "github.com/libp2p/go-reuseport" - "github.com/mr-tron/base58" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/sourcegraph/jsonrpc2" "github.com/valyala/fasthttp" @@ -249,11 +248,8 @@ func (m *MultiEpoch) ListenAndServe(ctx context.Context, listenOn string, lsConf } func randomRequestID() string { - b := make([]byte, 4) - if _, err := rand.Read(b); err != nil { - panic(err) - } - return strings.ToUpper(base58.Encode(b)) + id := uuid.New().String() + return id } func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx *fasthttp.RequestCtx) { @@ -279,7 +275,7 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx startedAt := time.Now() reqID := randomRequestID() defer func() { - klog.Infof("[%s] request took %s", reqID, time.Since(startedAt)) + klog.V(2).Infof("[%s] request took %s", reqID, time.Since(startedAt)) }() { // make sure the method is POST @@ -307,6 +303,8 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx // read request body body := reqCtx.Request.Body() + reqCtx.Response.Header.Set("X-Request-ID", reqID) + // parse request var rpcRequest jsonrpc2.Request if err := 
fasterJson.Unmarshal(body, &rpcRequest); err != nil { @@ -319,11 +317,13 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx }) return } + method := rpcRequest.Method - klog.Infof("[%s] received request: %q", reqID, strings.TrimSpace(string(body))) + klog.V(2).Infof("[%s] method=%q", reqID, sanitizeMethod(method)) + klog.V(3).Infof("[%s] received request with body: %q", reqID, strings.TrimSpace(string(body))) if proxy != nil && !isValidLocalMethod(rpcRequest.Method) { - klog.Infof("[%s] Unhandled method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr) + klog.V(2).Infof("[%s] Unhandled method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr) // proxy the request to the target proxyToAlternativeRPCServer( handler, @@ -338,7 +338,6 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } rqCtx := &requestContext{ctx: reqCtx} - method := rpcRequest.Method if method == "getVersion" { versionInfo := make(map[string]any) @@ -362,13 +361,13 @@ func newMultiEpochHandler(handler *MultiEpoch, lsConf *ListenerConfig) func(ctx } // errorResp is the error response to be sent to the client. - errorResp, err := handler.handleRequest(reqCtx, rqCtx, &rpcRequest) + errorResp, err := handler.handleRequest(setRequestIDToContext(reqCtx, reqID), rqCtx, &rpcRequest) if err != nil { klog.Errorf("[%s] failed to handle %s: %v", reqID, sanitizeMethod(method), err) } if errorResp != nil { if proxy != nil && lsConf.ProxyConfig.ProxyFailedRequests { - klog.Infof("[%s] Failed local method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr) + klog.Warningf("[%s] Failed local method %q, proxying to %q", reqID, rpcRequest.Method, proxy.Addr) // proxy the request to the target proxyToAlternativeRPCServer( handler, diff --git a/range-cache.go b/range-cache.go index 5e3bee2d..e8c84cb4 100644 --- a/range-cache.go +++ b/range-cache.go @@ -3,9 +3,10 @@ package main import ( "context" "fmt" - "os" "sync" "time" + + "k8s.io/klog/v2" ) type RangeCache struct { @@ -124,12 +125,12 @@ func (rc *RangeCache) setRange(ctx context.Context, start, ln int64, value []byt } // check if one of the ranges in the cache contains the requested range. if r.contains(Range{start, end}) { - debugLn("there's already a cache entry for this or a superset of this range") + klog.V(5).Infof("there's already a cache entry for this or a superset of this range: %v", r) return nil } // check if the requested range contains one of the ranges in the cache. if (Range{start, end}).contains(r) { - debugLn("deleting a subset of this range") + klog.V(5).Infof("deleting a subset of this range: %v", r) delete(rc.cache, r) rc.occupiedSpace -= uint64(len(rv.Value)) } @@ -148,7 +149,7 @@ func (rc *RangeCache) GetRange(ctx context.Context, start, ln int64) ([]byte, er end := start + ln got, err := rc.getRange(ctx, start, end, func() ([]byte, error) { v := make([]byte, end-start) - debugf( + klog.V(5).Infof( orange("[cache-MISS] reading from source %s: start=%d end=%d len=%d\n"), rc.name, start, @@ -171,20 +172,6 @@ func (rc *RangeCache) GetRange(ctx context.Context, start, ln int64) ([]byte, er return got, nil } -func debugLn(a ...interface{}) { - if DebugMode { - fmt.Fprintln(os.Stderr, a...) - } -} - -func debugf(format string, a ...interface{}) { - if DebugMode { - fmt.Fprintf(os.Stderr, format, a...) 
- } -} - -var DebugMode = false - func orange(s string) string { return "\033[38;5;208m" + s + "\033[0m" } @@ -221,7 +208,7 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) ( return nil, false, nil } if v, ok := rc.cache[Range{start, end}]; ok { - debugf( + klog.V(5).Infof( lime("[exact-cache-HIT] for %s: start=%d end=%d len=%d\n"), rc.name, start, @@ -237,7 +224,7 @@ func (rc *RangeCache) getRangeFromCache(ctx context.Context, start, end int64) ( return nil, false, ctx.Err() } if r.contains(Range{start, end}) { - debugf( + klog.V(5).Infof( lime("[cache-HIT] range superset in %s: start=%d end=%d len=%d\n"), rc.name, start, diff --git a/request-response.go b/request-response.go index 3b5fec24..92e8873b 100644 --- a/request-response.go +++ b/request-response.go @@ -294,7 +294,7 @@ func (req *GetTransactionRequest) Validate() error { solana.EncodingBase64, solana.EncodingBase64Zstd, solana.EncodingJSON, - solana.EncodingJSONParsed, // TODO: add support for this + solana.EncodingJSONParsed, ) { return fmt.Errorf("unsupported encoding") } diff --git a/split-car-fetcher/fetcher.go b/split-car-fetcher/fetcher.go index 74ab1a23..c43e692a 100644 --- a/split-car-fetcher/fetcher.go +++ b/split-car-fetcher/fetcher.go @@ -9,8 +9,10 @@ import ( "math" "net/http" "os" + "sync" "github.com/anjor/carlet" + "golang.org/x/sync/errgroup" ) type SplitCarReader struct { @@ -172,11 +174,30 @@ func NewSplitCarReader( readers = append(readers, originalCarHeaderReaderAt) sizes = append(sizes, int64(originalCarHeaderSize)) } - for _, cf := range files.CarPieces { - fi, err := readerCreator(cf) - if err != nil { - return nil, fmt.Errorf("failed to open remote file %q: %s", cf.CommP, err) - } + fileHandlers := make([]ReaderAtCloserSize, len(files.CarPieces)) + // create all the handlers concurrently, max 10 at a time + wg := new(errgroup.Group) + wg.SetLimit(10) + mu := &sync.Mutex{} + for i, cf := range files.CarPieces { + i, cf := i, cf + wg.Go(func() error { + fi, err := readerCreator(cf) + if err != nil { + return fmt.Errorf("failed to open remote file %q: %s", cf.CommP, err) + } + mu.Lock() + defer mu.Unlock() + fileHandlers[i] = fi + return nil + }) + } + if err := wg.Wait(); err != nil { + return nil, err + } + + for cfi, cf := range files.CarPieces { + fi := fileHandlers[cfi] size := int(fi.Size()) diff --git a/split-car-fetcher/miner-info.go b/split-car-fetcher/miner-info.go index 54c22806..b9606f9c 100644 --- a/split-car-fetcher/miner-info.go +++ b/split-car-fetcher/miner-info.go @@ -29,25 +29,25 @@ func NewMinerInfo( lotusClient jsonrpc.RPCClient, cacheTTL time.Duration, requestTimeout time.Duration, -) MinerInfoCache { +) *MinerInfoCache { minerInfoCache := ttlcache.New[string, *MinerInfo]( ttlcache.WithTTL[string, *MinerInfo](cacheTTL), ttlcache.WithDisableTouchOnHit[string, *MinerInfo]()) - return MinerInfoCache{ + return &MinerInfoCache{ lotusClient: lotusClient, requestTimeout: requestTimeout, minerInfoCache: minerInfoCache, } } -func (d MinerInfoCache) GetProviderInfo(ctx context.Context, provider address.Address) (*MinerInfo, error) { +func (d *MinerInfoCache) GetProviderInfo(ctx context.Context, provider address.Address) (*MinerInfo, error) { file := d.minerInfoCache.Get(provider.String()) if file != nil && !file.IsExpired() { return file.Value(), nil } - minerInfo, err := MinerInfoFetcher{Client: d.lotusClient}.GetProviderInfo(ctx, provider.String()) + minerInfo, err := (&MinerInfoFetcher{Client: d.lotusClient}).GetProviderInfo(ctx, provider.String()) if err != nil 
{
 		return nil, err
 	}
@@ -59,7 +59,7 @@ type MinerInfoFetcher struct {
 	Client jsonrpc.RPCClient
 }
 
-func (m MinerInfoFetcher) GetProviderInfo(ctx context.Context, provider string) (*MinerInfo, error) {
+func (m *MinerInfoFetcher) GetProviderInfo(ctx context.Context, provider string) (*MinerInfo, error) {
 	minerInfo := new(MinerInfo)
 	err := m.Client.CallFor(ctx, minerInfo, "Filecoin.StateMinerInfo", provider, nil)
 	if err != nil {
diff --git a/storage.go b/storage.go
index 28574292..fd6d37ba 100644
--- a/storage.go
+++ b/storage.go
@@ -24,7 +24,6 @@ import (
 func openIndexStorage(
 	ctx context.Context,
 	where string,
-	debug bool,
 ) (ReaderAtCloser, error) {
 	where = strings.TrimSpace(where)
 	if strings.HasPrefix(where, "http://") || strings.HasPrefix(where, "https://") {
@@ -33,7 +32,7 @@ func openIndexStorage(
 		if err != nil {
 			return nil, fmt.Errorf("failed to open remote index file: %w", err)
 		}
-		if !debug {
+		if !klog.V(5).Enabled() {
 			return rac, nil
 		}
 		return &readCloserWrapper{
@@ -48,7 +47,7 @@ func openIndexStorage(
 		if err != nil {
 			return nil, fmt.Errorf("failed to open local index file: %w", err)
 		}
-		if !debug {
+		if !klog.V(5).Enabled() {
 			return rac, nil
 		}
 		return &readCloserWrapper{
diff --git a/tools.go b/tools.go
index 8d6f4f11..89beb021 100644
--- a/tools.go
+++ b/tools.go
@@ -72,20 +72,22 @@ func loadFromYAML(configFilepath string, dst any) error {
 }
 
 type timer struct {
+	reqID string
 	start time.Time
 	prev  time.Time
 }
 
-func newTimer() *timer {
+func newTimer(reqID string) *timer {
 	now := time.Now()
 	return &timer{
+		reqID: reqID,
 		start: now,
 		prev:  now,
 	}
 }
 
 func (t *timer) time(name string) {
-	klog.V(2).Infof("TIMED: %s: %s (overall %s)", name, time.Since(t.prev), time.Since(t.start))
+	klog.V(4).Infof("[%s]: %q: %s (overall %s)", t.reqID, name, time.Since(t.prev), time.Since(t.start))
 	t.prev = time.Now()
 }
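
Below is a minimal, self-contained sketch of the verbosity-gated logging pattern this patch standardizes on (klog `V` levels in place of the old `DebugMode` flag). It is illustrative only: the package layout and the sample messages are not taken from the repository; the defaults (`v=2`, `logtostderr=true`) mirror what `NewKlogFlagSet` sets in `klog.go`.

```go
package main

import (
	"flag"

	"k8s.io/klog/v2"
)

func main() {
	// Register klog's flags on a private FlagSet, as klog.go does, and apply
	// defaults equivalent to the patch: verbosity 2, log to stderr.
	fs := flag.NewFlagSet("klog", flag.PanicOnError)
	klog.InitFlags(fs)
	fs.Set("v", "2")
	fs.Set("logtostderr", "true")
	defer klog.Flush()

	// Emitted at the default verbosity (level 2).
	klog.V(2).Infof("request took %s", "42ms")

	// Only emitted when the process is started with -v=4 or higher.
	klog.V(4).Infof("Found CID for slot %d in %s", 123, "1.2ms")

	// Guard expensive work (e.g. spew.Sdump or per-read tracing) behind
	// Enabled() so it is skipped entirely at lower verbosity, the same way
	// storage.go and http-range.go now gate their level-5 read tracing.
	if klog.V(5).Enabled() {
		klog.V(5).Infof("read trace: offset=%d len=%d", 1024, 512)
	}
}
```

Run with `-v=5` (or, for the CLI flags defined in `klog.go`, set `FAITHFUL_V=5`) to see the level-4 and level-5 lines; at the default `-v=2` only the first message is printed.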