Skip to content

Commit

Permalink
Merge branch 'main' into cli/restore-preamble
Browse files Browse the repository at this point in the history
  • Loading branch information
tsachiherman authored Aug 25, 2023
2 parents 9af4cd6 + 00fa12e commit e347082
Show file tree
Hide file tree
Showing 17 changed files with 446 additions and 209 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/soroban-rpc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defaults:

on:
push:
branches: [master]
branches: [main, release/**]
pull_request:

jobs:
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "Apache-2.0"
readme = "README.md"
version = "0.9.4"
edition = "2021"
rust-version = "1.71"
rust-version = "1.72"
autobins = false
default-run = "soroban"

Expand Down
50 changes: 36 additions & 14 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,15 @@ type WriteTx interface {
Rollback() error
}

type dbCache struct {
latestLedgerSeq uint32
ledgerEntries transactionalCache // Just like the DB: compress-encoded ledger key -> ledger entry XDR
sync.RWMutex
}

type DB struct {
db.SessionInterface
ledgerEntryCache transactionalCache // Just like the DB: compress-encoded ledger key -> ledger entry XDR
ledgerEntryCacheMutex sync.RWMutex
cache dbCache
}

func openSQLiteDB(dbFilePath string) (*db.Session, error) {
Expand All @@ -70,7 +75,9 @@ func OpenSQLiteDBWithPrometheusMetrics(dbFilePath string, namespace string, sub
}
result := DB{
SessionInterface: db.RegisterMetrics(session, namespace, sub, registry),
ledgerEntryCache: newTransactionalCache(),
cache: dbCache{
ledgerEntries: newTransactionalCache(),
},
}
return &result, nil
}
Expand All @@ -82,12 +89,14 @@ func OpenSQLiteDB(dbFilePath string) (*DB, error) {
}
result := DB{
SessionInterface: session,
ledgerEntryCache: newTransactionalCache(),
cache: dbCache{
ledgerEntries: newTransactionalCache(),
},
}
return &result, nil
}

func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface) (uint32, error) {
func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *dbCache) (uint32, error) {
sql := sq.Select("value").From(metaTableName).Where(sq.Eq{"key": latestLedgerSequenceMetaKey})
var results []string
if err := q.Select(ctx, &results, sql); err != nil {
Expand All @@ -106,7 +115,19 @@ func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface) (uint32
if err != nil {
return 0, err
}
return uint32(latestLedger), nil
result := uint32(latestLedger)

// Add missing ledger sequence to the top cache.
// Otherwise, the write-through cache won't get updated until the first ingestion commit
cache.Lock()
if cache.latestLedgerSeq == 0 {
// Only update the cache if value is missing (0), otherwise
// we may end up overwriting the entry with an older version
cache.latestLedgerSeq = result
}
cache.Unlock()

return result, nil
}

type readWriter struct {
Expand All @@ -128,7 +149,7 @@ func NewReadWriter(db *DB, maxBatchSize int, ledgerRetentionWindow uint32) ReadW
}

func (rw *readWriter) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return getLatestLedgerSequence(ctx, rw.db)
return getLatestLedgerSequence(ctx, rw.db, &rw.db.cache)
}

func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
Expand All @@ -139,7 +160,7 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
stmtCache := sq.NewStmtCache(txSession.GetTx())
db := rw.db
return writeTx{
db: db,
globalCache: &db.cache,
postCommit: func() error {
_, err := db.ExecRaw(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
return err
Expand All @@ -151,15 +172,15 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
stmtCache: stmtCache,
buffer: xdr.NewEncodingBuffer(),
keyToEntryBatch: make(map[string]*xdr.LedgerEntry, rw.maxBatchSize),
ledgerEntryCacheWriteTx: db.ledgerEntryCache.newWriteTx(rw.maxBatchSize),
ledgerEntryCacheWriteTx: db.cache.ledgerEntries.newWriteTx(rw.maxBatchSize),
maxBatchSize: rw.maxBatchSize,
},
ledgerRetentionWindow: rw.ledgerRetentionWindow,
}, nil
}

type writeTx struct {
db *DB
globalCache *dbCache
postCommit func() error
tx db.SessionInterface
stmtCache *sq.StmtCache
Expand Down Expand Up @@ -194,16 +215,17 @@ func (w writeTx) Commit(ledgerSeq uint32) error {
// We need to make the cache update atomic with the transaction commit.
// Otherwise, the cache can be made inconsistent if a write transaction finishes
// in between, updating the cache in the wrong order.
commit := func() error {
w.db.ledgerEntryCacheMutex.Lock()
defer w.db.ledgerEntryCacheMutex.Unlock()
commitAndUpdateCache := func() error {
w.globalCache.Lock()
defer w.globalCache.Unlock()
if err = w.tx.Commit(); err != nil {
return err
}
w.globalCache.latestLedgerSeq = ledgerSeq
w.ledgerEntryWriter.ledgerEntryCacheWriteTx.commit()
return nil
}
if err := commit(); err != nil {
if err := commitAndUpdateCache(); err != nil {
return err
}

Expand Down
Loading

0 comments on commit e347082

Please sign in to comment.