Skip to content

Commit

Permalink
Cleanup logs; remove sigexists index warmup
Browse files Browse the repository at this point in the history
  • Loading branch information
gagliardetto committed Feb 8, 2024
1 parent 9cf364e commit 24f7cef
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 144 deletions.
225 changes: 128 additions & 97 deletions cmd-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/allegro/bigcache/v3"
"github.com/davecgh/go-spew/spew"
"github.com/fsnotify/fsnotify"
hugecache "github.com/rpcpool/yellowstone-faithful/huge-cache"
splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher"
Expand Down Expand Up @@ -109,7 +108,7 @@ func newCmd_rpc() *cli.Command {
}
klog.Infof("Found %d config files:", len(configFiles))
for _, configFile := range configFiles {
fmt.Printf(" - %s\n", configFile)
klog.V(3).Infof(" - %s", configFile)
}

conf := bigcache.DefaultConfig(5 * time.Minute)
Expand Down Expand Up @@ -205,109 +204,93 @@ func newCmd_rpc() *cli.Command {
return cli.Exit(err.Error(), 1)
}
klog.Infof("Found %d directories; will start watching them for changes ...", len(dirs))
spew.Dump(dirs)
for _, dir := range dirs {
klog.V(3).Infof(" - %s", dir)
}

ctx, cancel := context.WithCancel(c.Context)
defer cancel()

// create a map that tracks files that are already being processed because of an event:
// this is to avoid processing the same file multiple times
// (e.g. if a file is created and then modified, we don't want to process it twice)
fileProcessingTracker := make(map[string]struct{})
mu := &sync.Mutex{}

err = onFileChanged(ctx, dirs, func(event fsnotify.Event) {
if !isJSONFile(event.Name) && !isYAMLFile(event.Name) {
klog.V(1).Infof("File %q is not a JSON or YAML file; do nothing", event.Name)
return
}
klog.V(1).Infof("File event: name=%q, op=%q", event.Name, event.Op)
err = onFileChanged(
ctx,
epochLoadConcurrency,
dirs,
func(event fsnotify.Event) {
if !isJSONFile(event.Name) && !isYAMLFile(event.Name) {
klog.V(3).Infof("File %q is not a JSON or YAML file; do nothing", event.Name)
return
}
klog.V(3).Infof("File event: name=%q, op=%q", event.Name, event.Op)

if event.Op != fsnotify.Remove && multi.HasEpochWithSameHashAsFile(event.Name) {
klog.V(1).Infof("Epoch with same hash as file %q is already loaded; do nothing", event.Name)
return
}
// register the file as being processed
mu.Lock()
_, ok := fileProcessingTracker[event.Name]
if ok {
klog.V(1).Infof("File %q is already being processed; do nothing", event.Name)
mu.Unlock()
return
}
fileProcessingTracker[event.Name] = struct{}{}
mu.Unlock()
// remove the file from the tracker when we're done processing it
defer func() {
mu.Lock()
delete(fileProcessingTracker, event.Name)
mu.Unlock()
}()

switch event.Op {
case fsnotify.Write:
{
startedAt := time.Now()
klog.V(1).Infof("File %q was modified; processing...", event.Name)
// find the config file, load it, and update the epoch (replace)
config, err := LoadConfig(event.Name)
if err != nil {
klog.Errorf("error loading config file %q: %s", event.Name, err.Error())
return
}
epoch, err := NewEpochFromConfig(config, c, allCache, minerInfo)
if err != nil {
klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error())
return
}
err = multi.ReplaceOrAddEpoch(epoch.Epoch(), epoch)
if err != nil {
klog.Errorf("error replacing epoch %d: %s", epoch.Epoch(), err.Error())
return
}
klog.V(1).Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt))
if event.Op != fsnotify.Remove && multi.HasEpochWithSameHashAsFile(event.Name) {
klog.V(3).Infof("Epoch with same hash as file %q is already loaded; do nothing", event.Name)
return
}
case fsnotify.Create:
{
startedAt := time.Now()
klog.V(1).Infof("File %q was created; processing...", event.Name)
// find the config file, load it, and add it to the multi-epoch (if not already added)
config, err := LoadConfig(event.Name)
if err != nil {
klog.Errorf("error loading config file %q: %s", event.Name, err.Error())
return
}
epoch, err := NewEpochFromConfig(config, c, allCache, minerInfo)
if err != nil {
klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error())
return

switch event.Op {
case fsnotify.Write:
{
startedAt := time.Now()
klog.V(3).Infof("File %q was modified; processing...", event.Name)
// find the config file, load it, and update the epoch (replace)
config, err := LoadConfig(event.Name)
if err != nil {
klog.Errorf("error loading config file %q: %s", event.Name, err.Error())
return
}
epoch, err := NewEpochFromConfig(config, c, allCache, minerInfo)
if err != nil {
klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error())
return
}
err = multi.ReplaceOrAddEpoch(epoch.Epoch(), epoch)
if err != nil {
klog.Errorf("error replacing epoch %d: %s", epoch.Epoch(), err.Error())
return
}
klog.V(2).Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt))
}
err = multi.AddEpoch(epoch.Epoch(), epoch)
if err != nil {
klog.Errorf("error adding epoch %d: %s", epoch.Epoch(), err.Error())
return
case fsnotify.Create:
{
startedAt := time.Now()
klog.V(3).Infof("File %q was created; processing...", event.Name)
// find the config file, load it, and add it to the multi-epoch (if not already added)
config, err := LoadConfig(event.Name)
if err != nil {
klog.Errorf("error loading config file %q: %s", event.Name, err.Error())
return
}
epoch, err := NewEpochFromConfig(config, c, allCache, minerInfo)
if err != nil {
klog.Errorf("error creating epoch from config file %q: %s", event.Name, err.Error())
return
}
err = multi.AddEpoch(epoch.Epoch(), epoch)
if err != nil {
klog.Errorf("error adding epoch %d: %s", epoch.Epoch(), err.Error())
return
}
klog.V(2).Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt))
}
klog.V(1).Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt))
}
case fsnotify.Remove:
{
startedAt := time.Now()
klog.V(1).Infof("File %q was removed; processing...", event.Name)
// find the epoch that corresponds to this file, and remove it (if any)
epNumber, err := multi.RemoveEpochByConfigFilepath(event.Name)
if err != nil {
klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error())
case fsnotify.Remove:
{
startedAt := time.Now()
klog.V(3).Infof("File %q was removed; processing...", event.Name)
// find the epoch that corresponds to this file, and remove it (if any)
epNumber, err := multi.RemoveEpochByConfigFilepath(event.Name)
if err != nil {
klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error())
}
klog.V(2).Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt))
}
klog.V(1).Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt))
case fsnotify.Rename:
klog.V(3).Infof("File %q was renamed; do nothing", event.Name)
case fsnotify.Chmod:
klog.V(3).Infof("File %q had its permissions changed; do nothing", event.Name)
default:
klog.V(3).Infof("File %q had an unknown event %q; do nothing", event.Name, event.Op)
}
case fsnotify.Rename:
klog.V(1).Infof("File %q was renamed; do nothing", event.Name)
case fsnotify.Chmod:
klog.V(1).Infof("File %q had its permissions changed; do nothing", event.Name)
default:
klog.V(1).Infof("File %q had an unknown event %q; do nothing", event.Name, event.Op)
}
})
})
if err != nil {
return cli.Exit(err.Error(), 1)
}
Expand All @@ -329,13 +312,49 @@ func newCmd_rpc() *cli.Command {
}
}

// fileProcessingTracker records the names of files that are currently being
// processed because of a file-system event. It is used to avoid processing the
// same file multiple times at once (e.g. if a file is created and then
// modified, we don't want to process it twice).
//
// Every method locks mu before touching m. The zero value is ready to use; a
// tracker must not be copied after first use because it contains a sync.Mutex.
type fileProcessingTracker struct {
	mu sync.Mutex
	m  map[string]struct{} // set of filenames currently registered as in progress
}

// newFileProcessingTracker returns an empty tracker with its set allocated.
func newFileProcessingTracker() *fileProcessingTracker {
	return &fileProcessingTracker{
		m: make(map[string]struct{}),
	}
}

// isBeingProcessedOrAdd reports whether filename is already registered as
// being processed. When it is not, filename is registered before returning, so
// the check and the registration happen under a single lock acquisition: only
// the first caller for a given filename gets false until removeFromList is
// called for that filename.
func (f *fileProcessingTracker) isBeingProcessedOrAdd(filename string) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, ok := f.m[filename]; ok {
		// Already registered: the file is being processed.
		return true
	}
	if f.m == nil {
		// Allocate lazily so a zero-value tracker does not panic on the
		// write below (assignment to an entry in a nil map panics).
		f.m = make(map[string]struct{})
	}
	f.m[filename] = struct{}{}
	return false
}

// removeFromList unregisters filename so that a later event for the same file
// can be processed again. Removing a filename that is not registered is a
// no-op.
func (f *fileProcessingTracker) removeFromList(filename string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.m, filename)
}

// - get the list of provided arguments, and distinguish between files and directories
// - load all the config files, etc.
// - start a goroutine that monitors the config files for changes
// - when a config file changes, reload it and update the epoch
// - start a goroutine that monitors the directories and subdirectories for changes (new files, deleted files, etc.)
// - is only watching directories sufficient? or do we need to watch files too?
func onFileChanged(ctx context.Context, dirs []string, callback func(fsnotify.Event)) error {
func onFileChanged(
ctx context.Context,
epochLoadConcurrency int,
dirs []string,
callback func(fsnotify.Event),
) error {
// monitor a directory for file changes
watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand All @@ -353,6 +372,10 @@ func onFileChanged(ctx context.Context, dirs []string, callback func(fsnotify.Ev
// start a goroutine to handle events
go func() {
defer watcher.Close()
tracker := newFileProcessingTracker()
wg := new(errgroup.Group)
wg.SetLimit(epochLoadConcurrency)
defer wg.Wait()
for {
select {
case <-ctx.Done():
Expand All @@ -361,7 +384,15 @@ func onFileChanged(ctx context.Context, dirs []string, callback func(fsnotify.Ev
if !ok {
return
}
callback(event)
wg.Go(func() error {
if tracker.isBeingProcessedOrAdd(event.Name) {
klog.V(3).Infof("File %q is already being processed; do nothing", event.Name)
return nil
}
defer tracker.removeFromList(event.Name)
callback(event)
return nil
})
case err, ok := <-watcher.Errors:
if !ok {
return
Expand Down
30 changes: 15 additions & 15 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,15 +301,15 @@ func NewEpochFromConfig(
if !ok {
return nil, fmt.Errorf("failed to find miner for piece CID %s", piece.CommP)
}
klog.V(2).Infof("piece CID %s is stored on miner %s", piece.CommP, minerID)
klog.V(3).Infof("piece CID %s is stored on miner %s", piece.CommP, minerID)
minerInfo, err := minerInfo.GetProviderInfo(c.Context, minerID)
if err != nil {
return nil, fmt.Errorf("failed to get miner info for miner %s, for piece %s: %w", minerID, piece.CommP, err)
}
if len(minerInfo.Multiaddrs) == 0 {
return nil, fmt.Errorf("miner %s has no multiaddrs", minerID)
}
klog.V(2).Infof("miner info: %s", spew.Sdump(minerInfo))
klog.V(3).Infof("miner info: %s", spew.Sdump(minerInfo))
// extract the IP address from the multiaddr:
split := multiaddr.Split(minerInfo.Multiaddrs[0])
if len(split) < 2 {
Expand All @@ -330,7 +330,7 @@ func NewEpochFromConfig(
return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0])
}
minerIP := fmt.Sprintf("%s:%s", ip, port)
klog.V(2).Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP)
klog.V(3).Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP)
formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String())
return splitcarfetcher.NewRemoteFileSplitCarReader(
piece.CommP.String(),
Expand Down Expand Up @@ -425,12 +425,12 @@ func NewEpochFromConfig(
}
ep.onClose = append(ep.onClose, sigExists.Close)

{
// warm up the cache
for i := 0; i < 100_000; i++ {
sigExists.Has(newRandomSignature())
}
}
// {
// // warm up the cache
// for i := 0; i < 10; i++ {
// sigExists.Has(newRandomSignature())
// }
// }

ep.sigExists = sigExists
} else {
Expand All @@ -440,12 +440,12 @@ func NewEpochFromConfig(
}
ep.onClose = append(ep.onClose, sigExists.Close)

{
// warm up the cache
for i := 0; i < 100_000; i++ {
sigExists.Has(newRandomSignature())
}
}
// {
// // warm up the cache
// for i := 0; i < 10; i++ {
// sigExists.Has(newRandomSignature())
// }
// }

ep.sigExists = sigExists

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/gagliardetto/binary v0.7.8
github.com/gagliardetto/solana-go v1.8.4
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0
github.com/google/uuid v1.6.0
github.com/hannahhoward/go-pubsub v1.0.0 // indirect
github.com/ipfs/go-blockservice v0.5.0 // indirect
github.com/ipfs/go-cid v0.4.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b/go.mod h1:czg5+yv1E0Z
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=
github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
Expand Down
Loading

0 comments on commit 24f7cef

Please sign in to comment.