Optimize add node hashes.
fchirica committed Dec 4, 2024
1 parent 9e6f75d commit 21aee68
Showing 2 changed files with 43 additions and 3 deletions.
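In outline: add_node_hashes previously ran one get_first_generation query per node hash, and this commit replaces those per-hash round trips with a single batched membership check, get_existing_hashes, which chunks its IN (...) queries to stay under SQLite's bound-parameter limit. Below is a minimal, self-contained sketch of the chunking pattern — it uses an in-memory sqlite3 database and a simplified nodes table rather than the real DataLayer schema, and assumes a SQLITE_MAX_VARIABLE_NUMBER of 999 (the real constant lives in chia.util.db_wrapper):

import sqlite3

# Stand-in for chia.util.db_wrapper.SQLITE_MAX_VARIABLE_NUMBER (assumed 999 here).
SQLITE_MAX_VARIABLE_NUMBER = 999


def get_existing_hashes(conn: sqlite3.Connection, hashes: list[bytes]) -> set[bytes]:
    # SQLite caps bound parameters per statement, so check membership in
    # chunks instead of issuing one query per hash.
    result: set[bytes] = set()
    batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)
    for i in range(0, len(hashes), batch_size):
        chunk = hashes[i : i + batch_size]
        placeholders = ",".join("?" * len(chunk))
        rows = conn.execute(f"SELECT hash FROM nodes WHERE hash IN ({placeholders})", chunk)
        result.update(row[0] for row in rows)
    return result


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE nodes (hash BLOB PRIMARY KEY)")
known = [i.to_bytes(32, byteorder="big") for i in range(1000)]
conn.executemany("INSERT INTO nodes VALUES (?)", [(h,) for h in known])
unknown = [i.to_bytes(32, byteorder="big") for i in range(1000, 1100)]
assert get_existing_hashes(conn, known + unknown) == set(known)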
24 changes: 24 additions & 0 deletions chia/_tests/core/data_layer/test_data_store.py
@@ -1982,3 +1982,27 @@ async def test_migration(
    assert await data_store.get_keys_values(store_id=store_id) == []
    await data_store.migrate_db(tmp_path)
    assert await data_store.get_keys_values(store_id=store_id) == kv_before


@pytest.mark.anyio
@pytest.mark.parametrize("num_keys", [10, 1000])
async def test_get_existing_hashes(
    data_store: DataStore,
    store_id: bytes32,
    num_keys: int,
) -> None:
    changelist: list[dict[str, Any]] = []
    for i in range(num_keys):
        key = i.to_bytes(4, byteorder="big")
        value = (2 * i).to_bytes(4, byteorder="big")
        changelist.append({"action": "insert", "key": key, "value": value})
    await data_store.insert_batch(store_id, changelist, status=Status.COMMITTED)
    await data_store.add_node_hashes(store_id)

    root = await data_store.get_tree_root(store_id=store_id)
    merkle_blob = await data_store.get_merkle_blob(root_hash=root.node_hash)
    hash_to_index = merkle_blob.get_hashes_indexes()
    existing_hashes = list(hash_to_index.keys())
    not_existing_hashes = [bytes32(i.to_bytes(32, byteorder="big")) for i in range(num_keys)]
    result = await data_store.get_existing_hashes(existing_hashes + not_existing_hashes, store_id)
    assert result == set(existing_hashes)
22 changes: 19 additions & 3 deletions chia/data_layer/data_store.py
Expand Up @@ -54,7 +54,7 @@
)
from chia.data_layer.util.merkle_blob import NodeType as NodeTypeMerkleBlob
from chia.types.blockchain_format.sized_bytes import bytes32
-from chia.util.db_wrapper import DBWrapper2
+from chia.util.db_wrapper import SQLITE_MAX_VARIABLE_NUMBER, DBWrapper2
from chia.util.lru_cache import LRUCache

log = logging.getLogger(__name__)
@@ -484,6 +484,21 @@ async def get_first_generation(self, node_hash: bytes32, store_id: bytes32) -> Optional[int]:

        return int(row[0])

    async def get_existing_hashes(self, node_hashes: list[bytes32], store_id: bytes32) -> set[bytes32]:
        result: set[bytes32] = set()
        # Keep each IN (...) list under SQLite's bound-parameter limit,
        # with headroom for the other bindings (store_id).
        batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)

        for i in range(0, len(node_hashes), batch_size):
            chunk = node_hashes[i : i + batch_size]
            placeholders = ",".join(["?"] * len(chunk))
            query = f"SELECT hash FROM nodes WHERE store_id = ? AND hash IN ({placeholders})"
            async with self.db_wrapper.reader() as reader:
                cursor = await reader.execute(query, (store_id, *chunk))
                async for row in cursor:
                    result.add(bytes32(row[0]))

        return result

    async def add_node_hash(
        self, store_id: bytes32, hash: bytes32, root_hash: bytes32, generation: int, index: int
    ) -> None:
@@ -503,9 +518,10 @@ async def add_node_hashes(self, store_id: bytes32) -> None:

        merkle_blob = await self.get_merkle_blob(root_hash=root.node_hash)
        hash_to_index = merkle_blob.get_hashes_indexes()

+        existing_hashes = await self.get_existing_hashes(list(hash_to_index.keys()), store_id)
        for hash, index in hash_to_index.items():
-            existing_generation = await self.get_first_generation(hash, store_id)
-            if existing_generation is None:
+            if hash not in existing_hashes:
                await self.add_node_hash(store_id, hash, root.node_hash, root.generation, index)

    async def build_blob_from_nodes(

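Net effect: with the 500-hash batch size, populating the nodes table for a tree with N hashes now issues about N / 500 batched reads (plus the per-row inserts) instead of N sequential get_first_generation queries, one per hash.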