diff --git a/staker/challenge-cache/cache.go b/staker/challenge-cache/cache.go index 8cca4bb835..ed4fad6450 100644 --- a/staker/challenge-cache/cache.go +++ b/staker/challenge-cache/cache.go @@ -10,7 +10,7 @@ store them in a filesystem cache to avoid recomputing them and for hierarchical Each file contains a list of 32 byte hashes, concatenated together as bytes. Using this structure, we can namespace hashes by message number and by challenge level. -Once a validator receives a full list of computed machine hashes for the first time from a validatio node, +Once a validator receives a full list of computed machine hashes for the first time from a validation node, it will write the hashes to this filesystem hierarchy for fast access next time these hashes are needed. Example uses: @@ -18,7 +18,7 @@ Example uses: - Obtain all the hashes from step 100 to 101 at subchallenge level 1 for the execution of message num 70. wavm-module-root-0xab/ - rollup-block-hash-0x12...-message-num-70/ + message-num-70-rollup-block-hash-0x12.../ hashes.bin subchallenge-level-1-big-step-100/ hashes.bin @@ -31,18 +31,21 @@ package challengecache import ( "bufio" + "context" "errors" "fmt" "io" "os" "path/filepath" + "regexp" + "strconv" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" ) var ( - ErrNotFoundInCache = errors.New("no found in challenge cache") + ErrNotFoundInCache = errors.New("not found in challenge cache") ErrFileAlreadyExists = errors.New("file already exists") ErrNoHashes = errors.New("no hashes being written") hashesFileName = "hashes.bin" @@ -59,30 +62,46 @@ type HistoryCommitmentCacher interface { Put(lookup *Key, hashes []common.Hash) error } +// Key for cache lookups includes the wavm module root of a challenge, as well +// as the heights for messages and big steps as needed. +type Key struct { + RollupBlockHash common.Hash + WavmModuleRoot common.Hash + MessageHeight uint64 + StepHeights []uint64 +} + // Cache for history commitments on disk. type Cache struct { - baseDir string + baseDir string + tempWritesDir string } // New cache from a base directory path. func New(baseDir string) (*Cache, error) { - if _, err := os.Stat(baseDir); err != nil { - if err := os.MkdirAll(baseDir, os.ModePerm); err != nil { - return nil, fmt.Errorf("could not make base cache directory %s: %w", baseDir, err) - } - } return &Cache{ - baseDir: baseDir, + baseDir: baseDir, + tempWritesDir: "", }, nil } -// Key for cache lookups includes the wavm module root of a challenge, as well -// as the heights for messages and big steps as needed. -type Key struct { - RollupBlockHash common.Hash - WavmModuleRoot common.Hash - MessageHeight uint64 - StepHeights []uint64 +// Init a cache by verifying its base directory exists. +func (c *Cache) Init(_ context.Context) error { + if _, err := os.Stat(c.baseDir); err != nil { + if err := os.MkdirAll(c.baseDir, os.ModePerm); err != nil { + return fmt.Errorf("could not make initialize challenge cache directory %s: %w", c.baseDir, err) + } + } + // We create a temp directory to write our hashes to first when putting to the cache. + // Once writing succeeds, we rename in an atomic operation to the correct file name + // in the cache directory hierarchy in the `Put` function. All of these temporary writes + // will occur in a subdir of the base directory called temp. + tempWritesDir, err := os.MkdirTemp(c.baseDir, "temp") + if err != nil { + return err + } + c.tempWritesDir = tempWritesDir + return nil } // Get a list of hashes from the cache from index 0 up to a certain index. Hashes are saved as files in the directory @@ -122,24 +141,14 @@ func (c *Cache) Put(lookup *Key, hashes []common.Hash) error { if len(hashes) == 0 { return ErrNoHashes } - fName, err := determineFilePath(c.baseDir, lookup) - if err != nil { - return err + if c.tempWritesDir == "" { + return fmt.Errorf("cache not initialized by calling .Init(ctx)") } - // We create a tmp file to write our hashes to first. If writing fails, - // we don't want to leave a half-written file in our cache directory. - // Once writing succeeds, we rename in an atomic operation to the correct file name - // in the cache directory hierarchy. - tmp, err := os.MkdirTemp(c.baseDir, "tmpdir") + fName, err := determineFilePath(c.baseDir, lookup) if err != nil { return err } - tmpFName := filepath.Join(tmp, fName) - dir := filepath.Dir(tmpFName) - if err := os.MkdirAll(dir, os.ModePerm); err != nil { - return fmt.Errorf("could not make tmp directory %s: %w", dir, err) - } - f, err := os.Create(tmpFName) + f, err := os.CreateTemp(c.tempWritesDir, fmt.Sprintf("%s-*", hashesFileName)) if err != nil { return err } @@ -154,11 +163,57 @@ func (c *Cache) Put(lookup *Key, hashes []common.Hash) error { if err := os.MkdirAll(filepath.Dir(fName), os.ModePerm); err != nil { return fmt.Errorf("could not make file directory %s: %w", fName, err) } - // If the file writing was successful, we rename the file from the tmp directory + // If the file writing was successful, we rename the file from the temp directory // into our cache directory. This is an atomic operation. // For more information on this atomic write pattern, see: // https://stackoverflow.com/questions/2333872/how-to-make-file-creation-an-atomic-operation - return os.Rename(tmpFName /*old */, fName /* new */) + return os.Rename(f.Name() /*old */, fName /* new */) +} + +// Prune all entries in the cache with a message number <= a specified value. +func (c *Cache) Prune(ctx context.Context, messageNumber uint64) error { + // Define a regex pattern to extract the message number + numPruned := 0 + messageNumPattern := fmt.Sprintf(`%s-(\d+)-`, messageNumberPrefix) + pattern := regexp.MustCompile(messageNumPattern) + pathsToDelete := make([]string, 0) + if err := filepath.WalkDir(c.baseDir, func(path string, info os.DirEntry, err error) error { + if ctx.Err() != nil { + return ctx.Err() + } + if err != nil { + return err + } + if info.IsDir() { + matches := pattern.FindStringSubmatch(info.Name()) + if len(matches) > 1 { + dirNameMessageNum, err := strconv.Atoi(matches[1]) + if err != nil { + return err + } + // Collect the directory path if the message number is <= the specified value. + if dirNameMessageNum <= int(messageNumber) { + pathsToDelete = append(pathsToDelete, path) + } + } + } + return nil + }); err != nil { + return err + } + // We delete separately from collecting the paths, as deleting while walking + // a dir can cause issues with the filepath.Walk function. + for _, path := range pathsToDelete { + if ctx.Err() != nil { + return ctx.Err() + } + if err := os.RemoveAll(path); err != nil { + return fmt.Errorf("could not prune directory with path %s: %w", path, err) + } + numPruned += 1 + } + log.Info("Pruned challenge cache", "numDirsPruned", numPruned, "messageNumber", messageNumPattern) + return nil } // Reads 32 bytes at a time from a reader up to a specified height. If none, then read all. @@ -217,7 +272,7 @@ for the data requested within the cache directory hierarchy. The folder structur for a given filesystem challenge cache will look as follows: wavm-module-root-0xab/ - rollup-block-hash-0x12...-message-num-70/ + message-num-70-rollup-block-hash-0x12.../ hashes.bin subchallenge-level-1-big-step-100/ hashes.bin @@ -225,7 +280,7 @@ for a given filesystem challenge cache will look as follows: func determineFilePath(baseDir string, lookup *Key) (string, error) { key := make([]string, 0) key = append(key, fmt.Sprintf("%s-%s", wavmModuleRootPrefix, lookup.WavmModuleRoot.Hex())) - key = append(key, fmt.Sprintf("%s-%s-%s-%d", rollupBlockHashPrefix, lookup.RollupBlockHash.Hex(), messageNumberPrefix, lookup.MessageHeight)) + key = append(key, fmt.Sprintf("%s-%d-%s-%s", messageNumberPrefix, lookup.MessageHeight, rollupBlockHashPrefix, lookup.RollupBlockHash.Hex())) for challengeLevel, height := range lookup.StepHeights { key = append(key, fmt.Sprintf( "%s-%d-%s-%d", diff --git a/staker/challenge-cache/cache_test.go b/staker/challenge-cache/cache_test.go index 6b15d62af7..af0a058f78 100644 --- a/staker/challenge-cache/cache_test.go +++ b/staker/challenge-cache/cache_test.go @@ -4,6 +4,7 @@ package challengecache import ( "bytes" + "context" "errors" "fmt" "io" @@ -17,19 +18,19 @@ import ( var _ HistoryCommitmentCacher = (*Cache)(nil) func TestCache(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() basePath := t.TempDir() if err := os.MkdirAll(basePath, os.ModePerm); err != nil { t.Fatal(err) } - t.Cleanup(func() { - if err := os.RemoveAll(basePath); err != nil { - t.Fatal(err) - } - }) cache, err := New(basePath) if err != nil { t.Fatal(err) } + if err = cache.Init(ctx); err != nil { + t.Fatal(err) + } key := &Key{ WavmModuleRoot: common.BytesToHash([]byte("foo")), MessageHeight: 0, @@ -69,6 +70,144 @@ func TestCache(t *testing.T) { } } +func TestPrune(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + basePath := t.TempDir() + if err := os.MkdirAll(basePath, os.ModePerm); err != nil { + t.Fatal(err) + } + cache, err := New(basePath) + if err != nil { + t.Fatal(err) + } + if err = cache.Init(ctx); err != nil { + t.Fatal(err) + } + key := &Key{ + WavmModuleRoot: common.BytesToHash([]byte("foo")), + MessageHeight: 20, + StepHeights: []uint64{0}, + } + if _, err = cache.Get(key, 0); !errors.Is(err, ErrNotFoundInCache) { + t.Fatal(err) + } + t.Run("pruning non-existent dirs does nothing", func(t *testing.T) { + if err = cache.Prune(ctx, key.MessageHeight); err != nil { + t.Error(err) + } + }) + t.Run("pruning single item", func(t *testing.T) { + want := []common.Hash{ + common.BytesToHash([]byte("foo")), + common.BytesToHash([]byte("bar")), + common.BytesToHash([]byte("baz")), + } + err = cache.Put(key, want) + if err != nil { + t.Fatal(err) + } + items, err := cache.Get(key, 3) + if err != nil { + t.Fatal(err) + } + if len(items) != len(want) { + t.Fatalf("Wrong number of hashes. Expected %d, got %d", len(want), len(items)) + } + if err = cache.Prune(ctx, key.MessageHeight); err != nil { + t.Error(err) + } + if _, err = cache.Get(key, 3); !errors.Is(err, ErrNotFoundInCache) { + t.Error(err) + } + }) + t.Run("does not prune items with message number > N", func(t *testing.T) { + want := []common.Hash{ + common.BytesToHash([]byte("foo")), + common.BytesToHash([]byte("bar")), + common.BytesToHash([]byte("baz")), + } + key.MessageHeight = 30 + err = cache.Put(key, want) + if err != nil { + t.Fatal(err) + } + items, err := cache.Get(key, 3) + if err != nil { + t.Fatal(err) + } + if len(items) != len(want) { + t.Fatalf("Wrong number of hashes. Expected %d, got %d", len(want), len(items)) + } + if err = cache.Prune(ctx, 20); err != nil { + t.Error(err) + } + items, err = cache.Get(key, 3) + if err != nil { + t.Fatal(err) + } + if len(items) != len(want) { + t.Fatalf("Wrong number of hashes. Expected %d, got %d", len(want), len(items)) + } + }) + t.Run("prunes many items with message number <= N", func(t *testing.T) { + moduleRoots := []common.Hash{ + common.BytesToHash([]byte("foo")), + common.BytesToHash([]byte("bar")), + common.BytesToHash([]byte("baz")), + } + totalMessages := 10 + for _, root := range moduleRoots { + for i := 0; i < totalMessages; i++ { + hashes := []common.Hash{ + common.BytesToHash([]byte("a")), + common.BytesToHash([]byte("b")), + common.BytesToHash([]byte("c")), + } + key = &Key{ + WavmModuleRoot: root, + MessageHeight: uint64(i), + StepHeights: []uint64{0}, + } + if err = cache.Put(key, hashes); err != nil { + t.Fatal(err) + } + } + } + if err = cache.Prune(ctx, 5); err != nil { + t.Error(err) + } + for _, root := range moduleRoots { + // Expect that we deleted all entries with message number <= 5 + for i := 0; i <= 5; i++ { + key = &Key{ + WavmModuleRoot: root, + MessageHeight: uint64(i), + StepHeights: []uint64{0}, + } + if _, err = cache.Get(key, 3); !errors.Is(err, ErrNotFoundInCache) { + t.Error(err) + } + } + // But also expect that we kept all entries with message number > 5 + for i := 6; i < totalMessages; i++ { + key = &Key{ + WavmModuleRoot: root, + MessageHeight: uint64(i), + StepHeights: []uint64{0}, + } + items, err := cache.Get(key, 3) + if err != nil { + t.Error(err) + } + if len(items) != 3 { + t.Fatalf("Wrong number of hashes. Expected %d, got %d", 3, len(items)) + } + } + } + }) +} + func TestReadWriteStatehashes(t *testing.T) { t.Run("read up to, but had empty reader", func(t *testing.T) { b := bytes.NewBuffer([]byte{}) @@ -255,7 +394,7 @@ func Test_determineFilePath(t *testing.T) { StepHeights: []uint64{50}, }, }, - want: "wavm-module-root-0x0000000000000000000000000000000000000000000000000000000000000000/rollup-block-hash-0x0000000000000000000000000000000000000000000000000000000000000000-message-num-100/subchallenge-level-1-big-step-50/hashes.bin", + want: "wavm-module-root-0x0000000000000000000000000000000000000000000000000000000000000000/message-num-100-rollup-block-hash-0x0000000000000000000000000000000000000000000000000000000000000000/subchallenge-level-1-big-step-50/hashes.bin", wantErr: false, }, } @@ -282,20 +421,20 @@ func Test_determineFilePath(t *testing.T) { } func BenchmarkCache_Read_32Mb(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() b.StopTimer() basePath := os.TempDir() if err := os.MkdirAll(basePath, os.ModePerm); err != nil { b.Fatal(err) } - b.Cleanup(func() { - if err := os.RemoveAll(basePath); err != nil { - b.Fatal(err) - } - }) cache, err := New(basePath) if err != nil { b.Fatal(err) } + if err = cache.Init(ctx); err != nil { + b.Fatal(err) + } key := &Key{ WavmModuleRoot: common.BytesToHash([]byte("foo")), MessageHeight: 0,