Merge branch 'fix/e2e-tests-big-block' into feat/proxy-stats
lklimek committed Mar 5, 2024
2 parents deaceac + 8415ac8 commit 55705ab
Showing 18 changed files with 296 additions and 188 deletions.
2 changes: 1 addition & 1 deletion DOCKER/Dockerfile
@@ -93,7 +93,7 @@ COPY --from=compile /src/tenderdash/build/tenderdash /src/tenderdash/build/abcid
 # You can overwrite these before the first run to influence
 # config.json and genesis.json. Additionally, you can override
 # CMD to add parameters to `tenderdash node`.
-ENV PROXY_APP=kvstore MONIKER=dockernode CHAIN_ID=dockerchain
+ENV PROXY_APP=kvstore MONIKER=dockernode CHAIN_ID=dockerchain ABCI=""
 
 COPY ./DOCKER/docker-entrypoint.sh /usr/local/bin/
8 changes: 7 additions & 1 deletion DOCKER/docker-entrypoint.sh
@@ -15,8 +15,14 @@ if [ ! -d "$TMHOME/config" ]; then
 		-e 's/^prometheus\s*=.*/prometheus = true/' \
 		"$TMHOME/config/config.toml"
 
+	if [ -n "$ABCI" ]; then
+		sed -i \
+			-e "s/^abci\s*=.*/abci = \"$ABCI\"/" \
+			"$TMHOME/config/config.toml"
+	fi
+
 	jq ".chain_id = \"$CHAIN_ID\" | .consensus_params.block.time_iota_ms = \"500\"" \
-		"$TMHOME/config/genesis.json" > "$TMHOME/config/genesis.json.new"
+		"$TMHOME/config/genesis.json" >"$TMHOME/config/genesis.json.new"
 	mv "$TMHOME/config/genesis.json.new" "$TMHOME/config/genesis.json"
 fi
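Taken together, the two changes above make the ABCI transport configurable at container start: when the `ABCI` environment variable is non-empty, the entrypoint rewrites the `abci = ...` key in the generated `config.toml`; with the new Dockerfile default `ABCI=""` the config is left untouched. For example, `docker run -e ABCI=grpc <image>` should switch the node to the gRPC ABCI client, assuming `abci` is present as a key in the generated config (as in stock Tendermint-style configs); `<image>` here is a placeholder.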
25 changes: 16 additions & 9 deletions abci/example/kvstore/kvstore.go
@@ -3,9 +3,9 @@ package kvstore
 import (
 	"bytes"
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"path"
 	"strconv"
 	"time"
@@ -228,7 +228,12 @@ func newApplication(stateStore StoreFactory, opts ...OptFunc) (*Application, err
 	defer in.Close()
 
 	if err := app.LastCommittedState.Load(in); err != nil {
-		return nil, fmt.Errorf("load state: %w", err)
+		// EOF means we most likely don't have any state yet
+		if !errors.Is(err, io.EOF) {
+			return nil, fmt.Errorf("load state: %w", err)
+		} else {
+			app.logger.Debug("no state found, using initial state")
+		}
 	}
 
 	app.snapshots, err = NewSnapshotStore(path.Join(app.cfg.Dir, "snapshots"))
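The new branch relies on `errors.Is` to detect `io.EOF` through any wrapping the error may have picked up. A minimal, self-contained sketch of the same pattern; `loadState` is a hypothetical stand-in for `LastCommittedState.Load`, not code from this repository:

package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// loadState mimics a decoder that returns a wrapped io.EOF when the
// state file is empty.
func loadState(r io.Reader) error {
	var b [1]byte
	if _, err := r.Read(b[:]); err != nil {
		return fmt.Errorf("load state: %w", err)
	}
	return nil
}

func main() {
	err := loadState(strings.NewReader("")) // empty state file
	if err != nil && !errors.Is(err, io.EOF) {
		panic(err) // a real failure, not just missing state
	}
	fmt.Println("no state found, using initial state")
}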
@@ -264,9 +269,9 @@ func (app *Application) InitChain(_ context.Context, req *abci.RequestInitChain)
 	}
 
 	// Overwrite state based on AppStateBytes
+	// Note this is not optimal from memory perspective; use chunked state sync instead
 	if len(req.AppStateBytes) > 0 {
-		err := json.Unmarshal(req.AppStateBytes, &app.LastCommittedState)
-		if err != nil {
+		if err := app.LastCommittedState.Load(bytes.NewBuffer(req.AppStateBytes)); err != nil {
 			return &abci.ResponseInitChain{}, err
 		}
 	}
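A note on the added comment: `req.AppStateBytes` arrives as a single protobuf bytes field, so the entire genesis app state is already buffered in memory before `Load` runs; `bytes.NewBuffer` only wraps that existing slice. Chunked state sync (see `ApplySnapshotChunk` below) spreads the transfer over many smaller messages instead.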
@@ -398,7 +403,8 @@ func (app *Application) FinalizeBlock(_ context.Context, req *abci.RequestFinali
 	appHash := tmbytes.HexBytes(req.Block.Header.AppHash)
 	roundState, ok := app.roundStates[roundKey(appHash, req.Height, req.Round)]
 	if !ok {
-		return &abci.ResponseFinalizeBlock{}, fmt.Errorf("state with apphash %s not found", appHash)
+		return &abci.ResponseFinalizeBlock{}, fmt.Errorf("state with apphash %s at height %d round %d not found",
+			appHash, req.Height, req.Round)
 	}
 	if roundState.GetHeight() != req.Height {
 		return &abci.ResponseFinalizeBlock{},
@@ -530,14 +536,15 @@ func (app *Application) ApplySnapshotChunk(_ context.Context, req *abci.RequestA
 	}
 
 	if app.offerSnapshot.isFull() {
-		chunks := app.offerSnapshot.bytes()
-		err := json.Unmarshal(chunks, &app.LastCommittedState)
-		if err != nil {
+		chunks := app.offerSnapshot.reader()
+		defer chunks.Close()
+
+		if err := app.LastCommittedState.Load(chunks); err != nil {
 			return &abci.ResponseApplySnapshotChunk{}, fmt.Errorf("cannot unmarshal state: %w", err)
 		}
 
 		app.logger.Info("restored state snapshot",
 			"height", app.LastCommittedState.GetHeight(),
-			"json", string(chunks),
 			"apphash", app.LastCommittedState.GetAppHash(),
 			"snapshot_height", app.offerSnapshot.snapshot.Height,
 			"snapshot_apphash", app.offerSnapshot.appHash,
3 changes: 2 additions & 1 deletion abci/example/kvstore/kvstore_test.go
@@ -136,7 +136,8 @@ func TestPersistentKVStoreKV(t *testing.T) {
 
 	data, err := os.ReadFile(path.Join(dir, "state.json"))
 	require.NoError(t, err)
-	assert.Contains(t, string(data), fmt.Sprintf(`"%s":"%s"`, key, value))
+
+	assert.Contains(t, string(data), fmt.Sprintf(`"key":"%s","value":"%s"`, key, value))
 }
 
 func TestPersistentKVStoreInfo(t *testing.T) {
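The updated assertion reflects a change in the on-disk state format: key-value pairs are no longer serialized as entries of a flat JSON map (`"<key>":"<value>"`) but as records with explicit `"key"` and `"value"` fields, matching the streaming `Save`/`Load` paths introduced in this commit. The exact envelope around each record is not visible in this diff.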
66 changes: 58 additions & 8 deletions abci/example/kvstore/snapshots.go
@@ -1,12 +1,13 @@
+//nolint:gosec
 package kvstore
 
 import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"

@@ -97,20 +98,31 @@ func (s *SnapshotStore) Create(state State) (abci.Snapshot, error) {
 	s.Lock()
 	defer s.Unlock()
 
-	bz, err := json.Marshal(state)
+	height := state.GetHeight()
+
+	filename := filepath.Join(s.dir, fmt.Sprintf("%v.json", height))
+	f, err := os.Create(filename)
 	if err != nil {
 		return abci.Snapshot{}, err
 	}
-	height := state.GetHeight()
+	defer f.Close()
+
+	hasher := sha256.New()
+	writer := io.MultiWriter(f, hasher)
+
+	if err := state.Save(writer); err != nil {
+		f.Close()
+		// Cleanup incomplete file; ignore errors during cleanup
+		_ = os.Remove(filename)
+		return abci.Snapshot{}, err
+	}
+
 	snapshot := abci.Snapshot{
 		Height:  uint64(height),
 		Version: 1,
-		Hash:    crypto.Checksum(bz),
-	}
-	err = os.WriteFile(filepath.Join(s.dir, fmt.Sprintf("%v.json", height)), bz, 0644)
-	if err != nil {
-		return abci.Snapshot{}, err
+		Hash:    hasher.Sum(nil),
 	}
+
 	s.metadata = append(s.metadata, snapshot)
 	err = s.saveMetadata()
 	if err != nil {
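The rewritten `Create` streams the state to disk and hashes it in the same pass, instead of materializing the whole `json.Marshal` output in memory first. A minimal standalone illustration of the `io.MultiWriter` technique; the file name and payload are placeholders:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.Create("snapshot.json") // placeholder path
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Every write is duplicated to the file and the hasher, so the
	// checksum comes out of the same single pass over the data.
	hasher := sha256.New()
	w := io.MultiWriter(f, hasher)

	if _, err := io.WriteString(w, `{"height":1}`); err != nil {
		panic(err)
	}
	fmt.Println("sha256:", hex.EncodeToString(hasher.Sum(nil)))
}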
@@ -216,6 +228,44 @@ func (s *offerSnapshot) bytes() []byte {
 	return buf.Bytes()
 }
 
+// reader returns a reader for the snapshot data.
+func (s *offerSnapshot) reader() io.ReadCloser {
+	chunks := s.chunks.Values()
+	reader := &chunkedReader{chunks: chunks}
+
+	return reader
+}
+
+type chunkedReader struct {
+	chunks [][]byte
+	index  int
+	offset int
+}
+
+func (r *chunkedReader) Read(p []byte) (n int, err error) {
+	if r.chunks == nil {
+		return 0, io.EOF
+	}
+	for n < len(p) && r.index < len(r.chunks) {
+		copyCount := copy(p[n:], r.chunks[r.index][r.offset:])
+		n += copyCount
+		r.offset += copyCount
+		if r.offset >= len(r.chunks[r.index]) {
+			r.index++
+			r.offset = 0
+		}
+	}
+	if r.index >= len(r.chunks) {
+		err = io.EOF
+	}
+	return
+}
+
+func (r *chunkedReader) Close() error {
+	r.chunks = nil
+	return nil
+}
+
 // makeChunkItem returns the chunk at a given index from the full byte slice.
 func makeChunkItem(chunks *ds.OrderedMap[string, []byte], chunkID []byte) chunkItem {
 	chunkIDStr := hex.EncodeToString(chunkID)
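A quick sanity check of the chunk-stitching behavior, written as a hypothetical test that assumes the unexported `chunkedReader` above is in scope (plus `io` and `stretchr/testify/require` imports). Note that `Read` may return `n > 0` together with `io.EOF` on the final call; the `io.Reader` contract allows this and `io.ReadAll` handles it:

func TestChunkedReader(t *testing.T) {
	r := &chunkedReader{chunks: [][]byte{[]byte("hel"), []byte("lo "), []byte("world")}}
	defer r.Close()

	// io.ReadAll treats io.EOF as a normal end of stream.
	data, err := io.ReadAll(r)
	require.NoError(t, err)
	require.Equal(t, "hello world", string(data))
}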
[Diffs for the remaining 13 changed files are not shown.]
