Skip to content

Commit

Permalink
Unconfirmed tx cache - CHIA-785 (#18422)
Browse files Browse the repository at this point in the history
* unconfirmed_tx_cache

* fix types in test

* fix already included check
  • Loading branch information
almogdepaz authored Dec 10, 2024
1 parent 9063e29 commit 35c8375
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 34 deletions.
12 changes: 7 additions & 5 deletions chia/_tests/environments/wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from chia.simulator.full_node_simulator import FullNodeSimulator
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint32
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.transaction_record import LightTransactionRecord
from chia.wallet.util.transaction_type import CLAWBACK_INCOMING_TRANSACTION_TYPES
from chia.wallet.util.tx_config import DEFAULT_TX_CONFIG, TXConfig
from chia.wallet.wallet import Wallet
Expand Down Expand Up @@ -246,9 +246,9 @@ async def change_balances(self, update_dictionary: dict[Union[int, str], dict[st

async def wait_for_transactions_to_settle(
self, full_node_api: FullNodeSimulator, _exclude_from_mempool_check: list[bytes32] = []
) -> list[TransactionRecord]:
) -> list[LightTransactionRecord]:
# Gather all pending transactions
pending_txs: list[TransactionRecord] = await self.wallet_state_manager.tx_store.get_all_unconfirmed()
pending_txs: list[LightTransactionRecord] = await self.wallet_state_manager.tx_store.get_all_unconfirmed()
# Filter clawback txs
pending_txs = [
tx
Expand Down Expand Up @@ -318,7 +318,7 @@ async def process_pending_states(
ph_indexes[wallet_id] = await env.wallet_state_manager.puzzle_store.get_unused_count(wallet_id)
puzzle_hash_indexes.append(ph_indexes)

pending_txs: list[list[TransactionRecord]] = []
pending_txs: list[list[LightTransactionRecord]] = []
peak = self.full_node.full_node.blockchain.get_peak_height()
assert peak is not None
# Check balances prior to block
Expand Down Expand Up @@ -374,7 +374,9 @@ async def process_pending_states(
try:
await self.full_node.check_transactions_confirmed(env.wallet_state_manager, txs)
except TimeoutError: # pragma: no cover
unconfirmed: list[TransactionRecord] = await env.wallet_state_manager.tx_store.get_all_unconfirmed()
unconfirmed: list[
LightTransactionRecord
] = await env.wallet_state_manager.tx_store.get_all_unconfirmed()
raise TimeoutError(
f"ENV-{i} TXs not confirmed: {[tx.to_json_dict() for tx in unconfirmed if tx in txs]}"
)
Expand Down
7 changes: 6 additions & 1 deletion chia/_tests/wallet/dao_wallet/test_dao_wallets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2694,7 +2694,12 @@ async def test_dao_cat_exits(
dao_id_0 = dao_wallet_res_0.wallet_id
cat_wallet_0 = wallet_node_0.wallet_state_manager.wallets[dao_wallet_res_0.cat_wallet_id]
dao_cat_wallet_0 = wallet_node_0.wallet_state_manager.wallets[dao_wallet_res_0.dao_cat_wallet_id]
txs = await wallet_0.wallet_state_manager.tx_store.get_all_unconfirmed()
ltxs = await wallet_0.wallet_state_manager.tx_store.get_all_unconfirmed()
txs: list[TransactionRecord] = []
for ltx in ltxs:
tx = await wallet_0.wallet_state_manager.tx_store.get_transaction_record(ltx.name)
assert tx is not None
txs.append(tx)
await full_node_api.wait_transaction_records_entered_mempool(records=txs, timeout=60)
await full_node_api.process_transaction_records(records=txs, timeout=60)
await full_node_api.process_all_wallet_transactions(wallet_0, 60)
Expand Down
10 changes: 7 additions & 3 deletions chia/_tests/wallet/test_transaction_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
from chia.wallet.transaction_record import TransactionRecord, TransactionRecordOld, minimum_send_attempts
from chia.wallet.util.query_filter import TransactionTypeFilter
from chia.wallet.util.transaction_type import TransactionType
from chia.wallet.wallet_transaction_store import WalletTransactionStore, filter_ok_mempool_status
from chia.wallet.wallet_transaction_store import (
WalletTransactionStore,
filter_ok_mempool_status,
get_light_transaction_record,
)

module_seeded_random = random.Random()
module_seeded_random.seed(a=0, version=2)
Expand Down Expand Up @@ -252,8 +256,8 @@ async def test_get_all_unconfirmed(seeded_random: random.Random) -> None:
)
await store.add_transaction_record(tr1)
await store.add_transaction_record(tr2)

assert await store.get_all_unconfirmed() == [tr1]
all_unconfirmed = await store.get_all_unconfirmed()
assert all_unconfirmed == [get_light_transaction_record(tr1)]


@pytest.mark.anyio
Expand Down
6 changes: 3 additions & 3 deletions chia/simulator/full_node_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from chia.util.ints import uint8, uint32, uint64, uint128
from chia.util.timing import adjusted_timeout, backoff_times
from chia.wallet.payment import Payment
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.transaction_record import LightTransactionRecord, TransactionRecord
from chia.wallet.util.tx_config import DEFAULT_TX_CONFIG
from chia.wallet.wallet import Wallet
from chia.wallet.wallet_node import WalletNode
Expand Down Expand Up @@ -463,7 +463,7 @@ async def farm_rewards_to_wallet(

async def wait_transaction_records_entered_mempool(
self,
records: Collection[TransactionRecord],
records: Collection[Union[TransactionRecord, LightTransactionRecord]],
timeout: Union[float, None] = 5,
) -> None:
"""Wait until the transaction records have entered the mempool. Transaction
Expand Down Expand Up @@ -643,7 +643,7 @@ async def process_all_wallet_transactions(self, wallet: Wallet, timeout: Optiona
async def check_transactions_confirmed(
self,
wallet_state_manager: WalletStateManager,
transactions: list[TransactionRecord],
transactions: Union[list[TransactionRecord], list[LightTransactionRecord]],
timeout: Optional[float] = 5,
) -> None:
transactions_left: set[bytes32] = {tx.name for tx in transactions}
Expand Down
12 changes: 12 additions & 0 deletions chia/wallet/transaction_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from dataclasses import dataclass
from typing import Any, Generic, Optional, TypeVar

from chia_rs import SpendBundle

from chia.consensus.coinbase import farmer_parent_id, pool_parent_id
from chia.types.blockchain_format.coin import Coin
from chia.types.blockchain_format.sized_bytes import bytes32
Expand Down Expand Up @@ -144,3 +146,13 @@ def hint_dict(self) -> dict[bytes32, bytes32]:
@dataclass(frozen=True)
class TransactionRecord(TransactionRecordOld):
valid_times: ConditionValidTimes


@streamable
@dataclass(frozen=True)
class LightTransactionRecord(Streamable):
name: bytes32
type: uint32
additions: list[Coin]
removals: list[Coin]
spend_bundle: Optional[SpendBundle]
40 changes: 21 additions & 19 deletions chia/wallet/wallet_state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
from chia.wallet.trade_manager import TradeManager
from chia.wallet.trading.offer import Offer
from chia.wallet.trading.trade_status import TradeStatus
from chia.wallet.transaction_record import TransactionRecord
from chia.wallet.transaction_record import LightTransactionRecord, TransactionRecord
from chia.wallet.uncurried_puzzle import uncurry_puzzle
from chia.wallet.util.address_type import AddressType
from chia.wallet.util.compute_hints import compute_spend_hints_and_additions
Expand Down Expand Up @@ -1698,7 +1698,7 @@ async def _add_coin_states(
curr_h = last_change_height

trade_removals = await self.trade_manager.get_coins_of_interest()
all_unconfirmed: list[TransactionRecord] = await self.tx_store.get_all_unconfirmed()
all_unconfirmed: list[LightTransactionRecord] = await self.tx_store.get_all_unconfirmed()
used_up_to = -1
ph_to_index_cache: LRUCache[bytes32, uint32] = LRUCache(100)

Expand Down Expand Up @@ -1753,14 +1753,16 @@ async def _add_coin_states(
# Confirm tx records for txs which we submitted for coins which aren't in our wallet
if coin_state.created_height is not None and coin_state.spent_height is not None:
all_unconfirmed = await self.tx_store.get_all_unconfirmed()
tx_records_to_confirm: list[TransactionRecord] = []
tx_records_to_confirm: list[LightTransactionRecord] = []
for out_tx_record in all_unconfirmed:
if coin_state.coin in out_tx_record.removals:
tx_records_to_confirm.append(out_tx_record)

if len(tx_records_to_confirm) > 0:
for tx_record in tx_records_to_confirm:
await self.tx_store.set_confirmed(tx_record.name, uint32(coin_state.spent_height))
for light_tx_record in tx_records_to_confirm:
await self.tx_store.set_confirmed(
light_tx_record.name, uint32(coin_state.spent_height)
)
self.log.debug(f"No wallet for coin state: {coin_state}")
continue

Expand Down Expand Up @@ -1902,16 +1904,16 @@ async def _add_coin_states(

# Reorg rollback adds reorged transactions so it's possible there is tx_record already
# Even though we are just adding coin record to the db (after reorg)
tx_records: list[TransactionRecord] = []
tx_records: list[LightTransactionRecord] = []
for out_tx_record in all_unconfirmed:
for rem_coin in out_tx_record.removals:
if rem_coin == coin_state.coin:
tx_records.append(out_tx_record)

if len(tx_records) > 0:
for tx_record in tx_records:
for light_record in tx_records:
await self.tx_store.set_confirmed(
tx_record.name, uint32(coin_state.spent_height)
light_record.name, uint32(coin_state.spent_height)
)
else:
tx_name = bytes(coin_state.coin.name())
Expand Down Expand Up @@ -1945,20 +1947,20 @@ async def _add_coin_states(
await self.coin_store.set_spent(coin_name, uint32(coin_state.spent_height))
if record.coin_type == CoinType.CLAWBACK:
await self.interested_store.remove_interested_coin_id(coin_state.coin.name())
confirmed_tx_records: list[TransactionRecord] = []
confirmed_tx_records: list[LightTransactionRecord] = []

for tx_record in all_unconfirmed:
if tx_record.type in CLAWBACK_INCOMING_TRANSACTION_TYPES:
for add_coin in tx_record.additions:
for light_record in all_unconfirmed:
if light_record.type in CLAWBACK_INCOMING_TRANSACTION_TYPES:
for add_coin in light_record.additions:
if add_coin == coin_state.coin:
confirmed_tx_records.append(tx_record)
confirmed_tx_records.append(light_record)
else:
for rem_coin in tx_record.removals:
for rem_coin in light_record.removals:
if rem_coin == coin_state.coin:
confirmed_tx_records.append(tx_record)
confirmed_tx_records.append(light_record)

for tx_record in confirmed_tx_records:
await self.tx_store.set_confirmed(tx_record.name, uint32(coin_state.spent_height))
for light_record in confirmed_tx_records:
await self.tx_store.set_confirmed(light_record.name, uint32(coin_state.spent_height))
for unconfirmed_record in all_unconfirmed:
for rem_coin in unconfirmed_record.removals:
if rem_coin == coin_state.coin:
Expand Down Expand Up @@ -2201,7 +2203,7 @@ async def coin_added(
self,
coin: Coin,
height: uint32,
all_unconfirmed_transaction_records: list[TransactionRecord],
all_unconfirmed_transaction_records: list[LightTransactionRecord],
wallet_id: uint32,
wallet_type: WalletType,
peer: WSChiaConnection,
Expand Down Expand Up @@ -2231,7 +2233,7 @@ async def coin_added(
coin_confirmed_transaction = False
if not coinbase:
for record in all_unconfirmed_transaction_records:
if coin in record.additions and not record.confirmed:
if coin in record.additions:
await self.tx_store.set_confirmed(record.name, height)
coin_confirmed_transaction = True
break
Expand Down
30 changes: 27 additions & 3 deletions chia/wallet/wallet_transaction_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
from chia.util.errors import Err
from chia.util.ints import uint8, uint32
from chia.wallet.conditions import ConditionValidTimes
from chia.wallet.transaction_record import TransactionRecord, TransactionRecordOld, minimum_send_attempts
from chia.wallet.transaction_record import (
LightTransactionRecord,
TransactionRecord,
TransactionRecordOld,
minimum_send_attempts,
)
from chia.wallet.transaction_sorting import SortKey
from chia.wallet.util.query_filter import FilterMode, TransactionTypeFilter
from chia.wallet.util.transaction_type import TransactionType
Expand All @@ -37,6 +42,7 @@ class WalletTransactionStore:

db_wrapper: DBWrapper2
tx_submitted: dict[bytes32, tuple[int, int]] # tx_id: [time submitted: count]
unconfirmed_txs: list[LightTransactionRecord] # tx_id: [time submitted: count]
last_wallet_tx_resend_time: int # Epoch time in seconds

@classmethod
Expand Down Expand Up @@ -93,6 +99,7 @@ async def create(cls, db_wrapper: DBWrapper2):

self.tx_submitted = {}
self.last_wallet_tx_resend_time = int(time.time())
await self.load_unconfirmed()
return self

async def add_transaction_record(self, record: TransactionRecord) -> None:
Expand Down Expand Up @@ -138,6 +145,9 @@ async def add_transaction_record(self, record: TransactionRecord) -> None:
await conn.execute_insert(
"INSERT OR REPLACE INTO tx_times VALUES (?, ?)", (record.name, bytes(record.valid_times))
)
ltx = get_light_transaction_record(record)
if record.confirmed is False and ltx not in self.unconfirmed_txs:
self.unconfirmed_txs.append(ltx)

async def delete_transaction_record(self, tx_id: bytes32) -> None:
async with self.db_wrapper.writer_maybe_transaction() as conn:
Expand All @@ -154,6 +164,7 @@ async def set_confirmed(self, tx_id: bytes32, height: uint32):
return
tx: TransactionRecord = dataclasses.replace(current, confirmed_at_height=height, confirmed=True)
await self.add_transaction_record(tx)
self.unconfirmed_txs.remove(get_light_transaction_record(current))

async def increment_sent(
self,
Expand Down Expand Up @@ -269,13 +280,20 @@ async def get_farming_rewards(self) -> list[TransactionRecord]:
)
return await self._get_new_tx_records_from_old([TransactionRecordOld.from_bytes(row[0]) for row in rows])

async def get_all_unconfirmed(self) -> list[TransactionRecord]:
async def get_all_unconfirmed(self) -> list[LightTransactionRecord]:
"""
Returns the list of all transaction that have not yet been confirmed.
"""
return self.unconfirmed_txs

async def load_unconfirmed(self) -> None:
"""
loads the list of all transaction that have not yet been confirmed into the cache.
"""
async with self.db_wrapper.reader_no_transaction() as conn:
rows = await conn.execute_fetchall("SELECT transaction_record from transaction_record WHERE confirmed=0")
return await self._get_new_tx_records_from_old([TransactionRecordOld.from_bytes(row[0]) for row in rows])
records = [TransactionRecordOld.from_bytes(row[0]) for row in rows]
self.unconfirmed_txs = [get_light_transaction_record(rec) for rec in records]

async def get_unconfirmed_for_wallet(self, wallet_id: int) -> list[TransactionRecord]:
"""
Expand Down Expand Up @@ -470,3 +488,9 @@ async def _get_new_tx_records_from_old(self, old_records: list[TransactionRecord
)
for record in old_records
]


def get_light_transaction_record(rec: TransactionRecordOld) -> LightTransactionRecord:
return LightTransactionRecord(
name=rec.name, additions=rec.additions, removals=rec.removals, type=rec.type, spend_bundle=rec.spend_bundle
)

0 comments on commit 35c8375

Please sign in to comment.