Skip to content

Commit

Permalink
Merge remote-tracking branch 'gagliardetto/multiepoch' into multiepoch
Browse files Browse the repository at this point in the history
  • Loading branch information
linuskendall committed Sep 13, 2023
2 parents 348ee2c + d24d346 commit 297c408
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 38 deletions.
48 changes: 36 additions & 12 deletions cmd-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"path/filepath"
"runtime"
"sort"
"sync"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/fsnotify/fsnotify"
Expand Down Expand Up @@ -148,22 +150,45 @@ func newCmd_rpc() *cli.Command {
ctx, cancel := context.WithCancel(c.Context)
defer cancel()

// create a map that tracks files that are already being processed because of an event:
// this is to avoid processing the same file multiple times
// (e.g. if a file is create and then modified, we don't want to process it twice)
fileProcessingTracker := make(map[string]struct{})
mu := &sync.Mutex{}

err = onFileChanged(ctx, dirs, func(event fsnotify.Event) {
if !isJSONFile(event.Name) && !isYAMLFile(event.Name) {
klog.Infof("File %q is not a JSON or YAML file; do nothing", event.Name)
return
}
klog.Infof("File event: %s", spew.Sdump(event))
klog.Infof("File event: name=%q, op=%q", event.Name, event.Op)

if event.Op != fsnotify.Remove && multi.HasEpochWithSameHashAsFile(event.Name) {
klog.Infof("Epoch with same hash as file %q is already loaded; do nothing", event.Name)
return
}
// register the file as being processed
mu.Lock()
_, ok := fileProcessingTracker[event.Name]
if ok {
klog.Infof("File %q is already being processed; do nothing", event.Name)
mu.Unlock()
return
}
fileProcessingTracker[event.Name] = struct{}{}
mu.Unlock()
// remove the file from the tracker when we're done processing it
defer func() {
mu.Lock()
delete(fileProcessingTracker, event.Name)
mu.Unlock()
}()

switch event.Op {
case fsnotify.Write:
{
klog.Infof("File %q was modified", event.Name)
startedAt := time.Now()
klog.Infof("File %q was modified; processing...", event.Name)
// find the config file, load it, and update the epoch (replace)
config, err := LoadConfig(event.Name)
if err != nil {
Expand All @@ -180,11 +205,12 @@ func newCmd_rpc() *cli.Command {
klog.Errorf("error replacing epoch %d: %s", epoch.Epoch(), err.Error())
return
}
klog.Infof("Epoch %d replaced", epoch.Epoch())
klog.Infof("Epoch %d added/replaced in %s", epoch.Epoch(), time.Since(startedAt))
}
case fsnotify.Create:
{
klog.Infof("File %q was created", event.Name)
startedAt := time.Now()
klog.Infof("File %q was created; processing...", event.Name)
// find the config file, load it, and add it to the multi-epoch (if not already added)
config, err := LoadConfig(event.Name)
if err != nil {
Expand All @@ -201,17 +227,18 @@ func newCmd_rpc() *cli.Command {
klog.Errorf("error adding epoch %d: %s", epoch.Epoch(), err.Error())
return
}
klog.Infof("Epoch %d added", epoch.Epoch())
klog.Infof("Epoch %d added in %s", epoch.Epoch(), time.Since(startedAt))
}
case fsnotify.Remove:
{
klog.Infof("File %q was removed", event.Name)
startedAt := time.Now()
klog.Infof("File %q was removed; processing...", event.Name)
// find the epoch that corresponds to this file, and remove it (if any)
epNumber, err := multi.RemoveEpochByConfigFilepath(event.Name)
if err != nil {
klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error())
}
klog.Infof("Epoch %d removed", epNumber)
klog.Infof("Epoch %d removed in %s", epNumber, time.Since(startedAt))
}
case fsnotify.Rename:
klog.Infof("File %q was renamed; do nothing", event.Name)
Expand Down Expand Up @@ -275,15 +302,12 @@ func onFileChanged(ctx context.Context, dirs []string, callback func(fsnotify.Ev
if !ok {
return
}
klog.Infof("event: %s", event)
if event.Op&fsnotify.Write == fsnotify.Write {
callback(event)
}
callback(event)
case err, ok := <-watcher.Errors:
if !ok {
return
}
klog.Errorf("error: %s", err)
klog.Errorf("error watching files: %v", err)
}
}
}()
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7 // indirect
github.com/filecoin-project/go-state-types v0.10.0 // indirect
github.com/gagliardetto/binary v0.7.8
github.com/gagliardetto/solana-go v1.8.3-0.20230302093440-c6043ec381e3
github.com/gagliardetto/solana-go v1.8.4
github.com/gin-gonic/gin v1.9.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.3.0
Expand Down Expand Up @@ -61,6 +61,7 @@ require (
github.com/fsnotify/fsnotify v1.5.4
github.com/goware/urlx v0.3.2
github.com/ipld/go-car v0.5.0
github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1
github.com/mr-tron/base58 v1.2.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/ronanh/intcomp v1.1.0
Expand All @@ -79,6 +80,7 @@ require (
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bep/debounce v1.2.1 // indirect
Expand Down Expand Up @@ -161,7 +163,6 @@ require (
github.com/mitchellh/go-testing-interface v1.14.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/zstdpool-freelist v0.0.0-20201229113212-927304c0c3b1 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
Expand Down Expand Up @@ -201,12 +202,14 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/fx v1.19.2 // indirect
go.uber.org/ratelimit v0.2.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ filippo.io/edwards25519 v1.0.0-rc.1/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7
filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek=
filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/AlekSi/pointer v1.1.0 h1:SSDMPcXD9jSl8FPy9cRzoRaMJtm9g9ggGTxecRUbQoI=
github.com/AlekSi/pointer v1.1.0/go.mod h1:y7BvfRI3wXPWKXEBhU71nbnIEEZX0QTSB2Bj48UJIZE=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
Expand All @@ -55,6 +56,7 @@ github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkK
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI=
github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
Expand Down Expand Up @@ -202,8 +204,8 @@ github.com/gagliardetto/binary v0.7.7/go.mod h1:mUuay5LL8wFVnIlecHakSZMvcdqfs+Cs
github.com/gagliardetto/binary v0.7.8 h1:hbIUIP8BWhPm/BIdODxY2Lnv4NlJwNdbtsi1xkhNOec=
github.com/gagliardetto/binary v0.7.8/go.mod h1:Cn70Gnvyk1OWkNJXwVh3oYqSYhKLHJN+C/Wguw3fc3U=
github.com/gagliardetto/gofuzz v1.2.2/go.mod h1:bkH/3hYLZrMLbfYWA0pWzXmi5TTRZnu4pMGZBkqMKvY=
github.com/gagliardetto/solana-go v1.8.3-0.20230302093440-c6043ec381e3 h1:PtvmSQDTpZ1mwN1t7UlCrUhTyEozJhF3ixuO1m0+9q0=
github.com/gagliardetto/solana-go v1.8.3-0.20230302093440-c6043ec381e3/go.mod h1:i+7aAyNDTHG0jK8GZIBSI4OVvDqkt2Qx+LklYclRNG8=
github.com/gagliardetto/solana-go v1.8.4 h1:vmD/JmTlonyXGy39bAo0inMhmbdAwV7rXZtLDMZeodE=
github.com/gagliardetto/solana-go v1.8.4/go.mod h1:i+7aAyNDTHG0jK8GZIBSI4OVvDqkt2Qx+LklYclRNG8=
github.com/gagliardetto/treeout v0.1.4 h1:ozeYerrLCmCubo1TcIjFiOWTTGteOOHND1twdFpgwaw=
github.com/gagliardetto/treeout v0.1.4/go.mod h1:loUefvXTrlRG5rYmJmExNryyBRh8f89VZhmMOyCyqok=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
Expand Down Expand Up @@ -945,6 +947,7 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA=
go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
Expand Down Expand Up @@ -1146,6 +1149,7 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
3 changes: 2 additions & 1 deletion http-range.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ type readCloserWrapper struct {
func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) {
startedAt := time.Now()
defer func() {
took := time.Since(startedAt)
if DebugMode {
var icon string
if r.isRemote {
Expand All @@ -203,7 +204,7 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) {
if strings.HasSuffix(r.name, ".car") {
prefix = icon + purpleBG("[READ-CAR]")
}
fmt.Fprintf(os.Stderr, prefix+" %s:%d+%d (%s)\n", filepath.Base(r.name), off, len(p), time.Since(startedAt))
fmt.Fprintf(os.Stderr, prefix+" %s:%d+%d (%s)\n", filepath.Base(r.name), off, len(p), took)
}
}()
return r.rac.ReadAt(p, off)
Expand Down
13 changes: 9 additions & 4 deletions multiepoch-getBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
Message: "Invalid params",
}, fmt.Errorf("failed to parse params: %w", err)
}
if err := params.Validate(); err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInvalidParams,
Message: err.Error(),
}, fmt.Errorf("failed to validate params: %w", err)
}
tim.time("parseGetBlockRequest")
slot := params.Slot

Expand Down Expand Up @@ -243,7 +249,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
var allTransactions []GetTransactionResponse
var rewards any
hasRewards := !block.Rewards.(cidlink.Link).Cid.Equals(DummyCID)
if hasRewards {
if *params.Options.Rewards && hasRewards {
rewardsNode, err := epochHandler.GetRewardsByCid(ctx, block.Rewards.(cidlink.Link).Cid)
if err != nil {
return &jsonrpc2.Error{
Expand Down Expand Up @@ -369,15 +375,14 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
}
txResp.Meta = meta

b64Tx, err := tx.ToBase64()
encodedTx, err := encodeTransactionResponseBasedOnWantedEncoding(*params.Options.Encoding, tx)
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to encode transaction: %v", err)
}

txResp.Transaction = []any{b64Tx, "base64"}
txResp.Transaction = encodedTx
}

allTransactions = append(allTransactions, txResp)
Expand Down
11 changes: 8 additions & 3 deletions multiepoch-getTransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ func (multi *MultiEpoch) handleGetTransaction(ctx context.Context, conn *request
Message: "Invalid params",
}, fmt.Errorf("failed to parse params: %v", err)
}
if err := params.Validate(); err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInvalidParams,
Message: err.Error(),
}, fmt.Errorf("failed to validate params: %w", err)
}

sig := params.Signature

Expand Down Expand Up @@ -207,15 +213,14 @@ func (multi *MultiEpoch) handleGetTransaction(ctx context.Context, conn *request
}
response.Meta = meta

b64Tx, err := tx.ToBase64()
encodedTx, err := encodeTransactionResponseBasedOnWantedEncoding(*params.Options.Encoding, tx)
if err != nil {
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to encode transaction: %v", err)
}

response.Transaction = []any{b64Tx, "base64"}
response.Transaction = encodedTx
}

// reply with the data
Expand Down
Loading

0 comments on commit 297c408

Please sign in to comment.