Skip to content

Commit

Permalink
Merge remote-tracking branch 'gagliardetto/multiepoch' into multiepoch
Browse files Browse the repository at this point in the history
  • Loading branch information
linuskendall committed Aug 19, 2023
2 parents 658be27 + c6bca83 commit 140fe42
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
9 changes: 8 additions & 1 deletion cmd-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func newCmd_rpc() *cli.Command {
defer cancel()

err = onFileChanged(ctx, dirs, func(event fsnotify.Event) {
if !isJSONFile(event.Name) && !isYAMLFile(event.Name) {
klog.Infof("File %q is not a JSON or YAML file; do nothing", event.Name)
return
}
klog.Infof("File event: %s", spew.Sdump(event))

if event.Op != fsnotify.Remove && multi.HasEpochWithSameHashAsFile(event.Name) {
Expand Down Expand Up @@ -173,6 +177,7 @@ func newCmd_rpc() *cli.Command {
klog.Errorf("error replacing epoch %d: %s", epoch.Epoch(), err.Error())
return
}
klog.Infof("Epoch %d replaced", epoch.Epoch())
}
case fsnotify.Create:
{
Expand All @@ -193,15 +198,17 @@ func newCmd_rpc() *cli.Command {
klog.Errorf("error adding epoch %d: %s", epoch.Epoch(), err.Error())
return
}
klog.Infof("Epoch %d added", epoch.Epoch())
}
case fsnotify.Remove:
{
klog.Infof("File %q was removed", event.Name)
// find the epoch that corresponds to this file, and remove it (if any)
err := multi.RemoveEpochByConfigFilepath(event.Name)
epNumber, err := multi.RemoveEpochByConfigFilepath(event.Name)
if err != nil {
klog.Errorf("error removing epoch for config file %q: %s", event.Name, err.Error())
}
klog.Infof("Epoch %d removed", epNumber)
}
case fsnotify.Rename:
klog.Infof("File %q was renamed; do nothing", event.Name)
Expand Down
12 changes: 9 additions & 3 deletions multiepoch-getBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
tim.time("GetBlock")
{
prefetcherFromCar := func() error {
parentIsInPreviousEpoch := CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != CalcEpochForSlot(slot)

var blockCid, parentCid cid.Cid
wg := new(errgroup.Group)
wg.Go(func() (err error) {
Expand All @@ -73,6 +75,9 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
return nil
})
wg.Go(func() (err error) {
if parentIsInPreviousEpoch {
return nil
}
parentCid, err = epochHandler.FindCidFromSlot(ctx, uint64(block.Meta.Parent_slot))
if err != nil {
return err
Expand All @@ -94,6 +99,9 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
return nil
})
wg.Go(func() (err error) {
if parentIsInPreviousEpoch {
return nil
}
parentOffset, err = epochHandler.FindOffsetFromCid(ctx, parentCid)
if err != nil {
// If the parent is not found, it (probably) means that it's outside of the car file.
Expand All @@ -106,8 +114,6 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
return err
}

parentIsInPreviousEpoch := CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != CalcEpochForSlot(slot)

length := blockOffset - parentOffset
// cap the length to 1GB
GiB := uint64(1024 * 1024 * 1024)
Expand Down Expand Up @@ -363,7 +369,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex
return &jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Internal error",
}, fmt.Errorf("failed to decode block: %v", err)
}, fmt.Errorf("failed to get/decode block: %v", err)
}

if len(parentBlock.Entries) > 0 {
Expand Down
11 changes: 8 additions & 3 deletions multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,17 @@ func (m *MultiEpoch) RemoveEpoch(epoch uint64) error {
return nil
}

func (m *MultiEpoch) RemoveEpochByConfigFilepath(configFilepath string) error {
func (m *MultiEpoch) RemoveEpochByConfigFilepath(configFilepath string) (uint64, error) {
m.mu.Lock()
defer m.mu.Unlock()
for epoch, ep := range m.epochs {
if ep.config.ConfigFilepath() == configFilepath {
ep.Close()
delete(m.epochs, epoch)
return nil
return epoch, nil
}
}
return fmt.Errorf("epoch not found for config file %q", configFilepath)
return 0, fmt.Errorf("epoch not found for config file %q", configFilepath)
}

func (m *MultiEpoch) ReplaceEpoch(epoch uint64, ep *Epoch) error {
Expand All @@ -100,6 +101,10 @@ func (m *MultiEpoch) ReplaceEpoch(epoch uint64, ep *Epoch) error {
func (m *MultiEpoch) ReplaceOrAddEpoch(epoch uint64, ep *Epoch) error {
m.mu.Lock()
defer m.mu.Unlock()
// if the epoch already exists, close it
if oldEp, ok := m.epochs[epoch]; ok {
oldEp.Close()
}
m.epochs[epoch] = ep
return nil
}
Expand Down

0 comments on commit 140fe42

Please sign in to comment.