From 21aee6829c50d8d1c91c0ade11384aa07d2aeaca Mon Sep 17 00:00:00 2001
From: Florin Chirica
Date: Wed, 4 Dec 2024 03:32:13 +0200
Subject: [PATCH 1/6] Optimize add node hashes.

---
 .../_tests/core/data_layer/test_data_store.py | 24 +++++++++++++++++++
 chia/data_layer/data_store.py                 | 22 ++++++++++++++---
 2 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/chia/_tests/core/data_layer/test_data_store.py b/chia/_tests/core/data_layer/test_data_store.py
index 9c5abe5cda7b..8767dc30f624 100644
--- a/chia/_tests/core/data_layer/test_data_store.py
+++ b/chia/_tests/core/data_layer/test_data_store.py
@@ -1982,3 +1982,27 @@ async def test_migration(
     assert await data_store.get_keys_values(store_id=store_id) == []
     await data_store.migrate_db(tmp_path)
     assert await data_store.get_keys_values(store_id=store_id) == kv_before
+
+
+@pytest.mark.anyio
+@pytest.mark.parametrize("num_keys", [10, 1000])
+async def test_get_existing_hashes(
+    data_store: DataStore,
+    store_id: bytes32,
+    num_keys: int,
+) -> None:
+    changelist: list[dict[str, Any]] = []
+    for i in range(num_keys):
+        key = i.to_bytes(4, byteorder="big")
+        value = (2 * i).to_bytes(4, byteorder="big")
+        changelist.append({"action": "insert", "key": key, "value": value})
+    await data_store.insert_batch(store_id, changelist, status=Status.COMMITTED)
+    await data_store.add_node_hashes(store_id)
+
+    root = await data_store.get_tree_root(store_id=store_id)
+    merkle_blob = await data_store.get_merkle_blob(root_hash=root.node_hash)
+    hash_to_index = merkle_blob.get_hashes_indexes()
+    existing_hashes = list(hash_to_index.keys())
+    not_existing_hashes = [bytes32(i.to_bytes(32, byteorder="big")) for i in range(num_keys)]
+    result = await data_store.get_existing_hashes(existing_hashes + not_existing_hashes, store_id)
+    assert result == set(existing_hashes)
diff --git a/chia/data_layer/data_store.py b/chia/data_layer/data_store.py
index 4a81e7ae54ed..0f943bba6204 100644
--- a/chia/data_layer/data_store.py
+++ b/chia/data_layer/data_store.py
@@ -54,7 +54,7 @@
 )
 from chia.data_layer.util.merkle_blob import NodeType as NodeTypeMerkleBlob
 from chia.types.blockchain_format.sized_bytes import bytes32
-from chia.util.db_wrapper import DBWrapper2
+from chia.util.db_wrapper import SQLITE_MAX_VARIABLE_NUMBER, DBWrapper2
 from chia.util.lru_cache import LRUCache
 
 log = logging.getLogger(__name__)
@@ -484,6 +484,21 @@ async def get_first_generation(self, node_hash: bytes32, store_id: bytes32) -> O
 
             return int(row[0])
 
+    async def get_existing_hashes(self, node_hashes: List[bytes32], store_id: bytes32) -> set[bytes32]:
+        result: set[bytes32] = set()
+        batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)
+
+        for i in range(0, len(node_hashes), batch_size):
+            chunk = node_hashes[i:i + batch_size]
+            placeholders = ",".join(["?"] * len(chunk))
+            query = f"SELECT hash FROM nodes WHERE store_id = ? AND hash IN ({placeholders})"
+            async with self.db_wrapper.reader() as reader:
+                cursor = await reader.execute(query, (store_id, *chunk))
+                async for row in cursor:
+                    result.add(row[0])
+
+        return result
+
     async def add_node_hash(
         self, store_id: bytes32, hash: bytes32, root_hash: bytes32, generation: int, index: int
     ) -> None:
@@ -503,9 +518,10 @@ async def add_node_hashes(self, store_id: bytes32) -> None:
 
         merkle_blob = await self.get_merkle_blob(root_hash=root.node_hash)
         hash_to_index = merkle_blob.get_hashes_indexes()
+
+        existing_hashes = await self.get_existing_hashes(list(hash_to_index.keys()), store_id)
         for hash, index in hash_to_index.items():
-            existing_generation = await self.get_first_generation(hash, store_id)
-            if existing_generation is None:
+            if hash not in existing_hashes:
                 await self.add_node_hash(store_id, hash, root.node_hash, root.generation, index)
 
     async def build_blob_from_nodes(

From 4a3ba52d2659e8a4b2cad9e0beb27ad757baef77 Mon Sep 17 00:00:00 2001
From: Florin Chirica
Date: Wed, 4 Dec 2024 03:39:12 +0200
Subject: [PATCH 2/6] Lint.

---
 chia/data_layer/data_store.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/chia/data_layer/data_store.py b/chia/data_layer/data_store.py
index 0f943bba6204..53c1efa5afc7 100644
--- a/chia/data_layer/data_store.py
+++ b/chia/data_layer/data_store.py
@@ -484,7 +484,7 @@ async def get_first_generation(self, node_hash: bytes32, store_id: bytes32) -> O
 
             return int(row[0])
 
-    async def get_existing_hashes(self, node_hashes: List[bytes32], store_id: bytes32) -> set[bytes32]:
+    async def get_existing_hashes(self, node_hashes: list[bytes32], store_id: bytes32) -> set[bytes32]:
         result: set[bytes32] = set()
         batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)
 

From 7572044a1d42b97d5219e24f4f512b9e17def79b Mon Sep 17 00:00:00 2001
From: Florin Chirica
Date: Wed, 4 Dec 2024 03:41:02 +0200
Subject: [PATCH 3/6] Lint.

---
 chia/data_layer/data_store.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/chia/data_layer/data_store.py b/chia/data_layer/data_store.py
index 53c1efa5afc7..969b2ac30a81 100644
--- a/chia/data_layer/data_store.py
+++ b/chia/data_layer/data_store.py
@@ -489,7 +489,7 @@ async def get_existing_hashes(self, node_hashes: list[bytes32], store_id: bytes3
         batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)
 
         for i in range(0, len(node_hashes), batch_size):
-            chunk = node_hashes[i:i + batch_size]
+            chunk = node_hashes[i : i + batch_size]
             placeholders = ",".join(["?"] * len(chunk))
             query = f"SELECT hash FROM nodes WHERE store_id = ? AND hash IN ({placeholders})"
             async with self.db_wrapper.reader() as reader:

From e52d27ece4b2fef3a76012965d897f2a1d2a1d13 Mon Sep 17 00:00:00 2001
From: Florin Chirica
Date: Thu, 5 Dec 2024 01:38:51 +0200
Subject: [PATCH 4/6] Apply suggestions from code review

Co-authored-by: Kyle Altendorf
---
 chia/data_layer/data_store.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/chia/data_layer/data_store.py b/chia/data_layer/data_store.py
index 969b2ac30a81..176770e7d326 100644
--- a/chia/data_layer/data_store.py
+++ b/chia/data_layer/data_store.py
@@ -491,11 +491,11 @@ async def get_existing_hashes(self, node_hashes: list[bytes32], store_id: bytes3
         for i in range(0, len(node_hashes), batch_size):
             chunk = node_hashes[i : i + batch_size]
             placeholders = ",".join(["?"] * len(chunk))
-            query = f"SELECT hash FROM nodes WHERE store_id = ? AND hash IN ({placeholders})"
+            query = f"SELECT hash FROM nodes WHERE store_id = ? AND hash IN ({placeholders}) LIMIT {len(chunk)}"
             async with self.db_wrapper.reader() as reader:
-                cursor = await reader.execute(query, (store_id, *chunk))
-                async for row in cursor:
-                    result.add(row[0])
+                async with reader.execute(query, (store_id, *chunk)) as cursor:
+                    rows = await cursor.fetchall()
+                    result.update(row["hash"] for row in rows)
 
         return result
 

From fa3b2f3adb400c27bfe07d244d35a1f8e6272d1c Mon Sep 17 00:00:00 2001
From: Florin Chirica
Date: Thu, 5 Dec 2024 01:40:03 +0200
Subject: [PATCH 5/6] Update data_store.py

---
 chia/data_layer/data_store.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/chia/data_layer/data_store.py b/chia/data_layer/data_store.py
index 176770e7d326..247842bedcbf 100644
--- a/chia/data_layer/data_store.py
+++ b/chia/data_layer/data_store.py
@@ -492,10 +492,11 @@ async def get_existing_hashes(self, node_hashes: list[bytes32], store_id: bytes3
             chunk = node_hashes[i : i + batch_size]
             placeholders = ",".join(["?"] * len(chunk))
             query = f"SELECT hash FROM nodes WHERE store_id = ? AND hash IN ({placeholders}) LIMIT {len(chunk)}"
+
             async with self.db_wrapper.reader() as reader:
                 async with reader.execute(query, (store_id, *chunk)) as cursor:
-                    rows = await cursor.fetchall()
-                    result.update(row["hash"] for row in rows)
+                rows = await cursor.fetchall()
+                result.update(row["hash"] for row in rows)
 
         return result

From 6c3285f55fd3e0e5f0de02b43acb2d31e8f312ab Mon Sep 17 00:00:00 2001
From: Florin Chirica
Date: Thu, 5 Dec 2024 03:09:44 +0200
Subject: [PATCH 6/6] Indent properly.

---
 chia/data_layer/data_store.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/chia/data_layer/data_store.py b/chia/data_layer/data_store.py
index 247842bedcbf..d1273e949ca5 100644
--- a/chia/data_layer/data_store.py
+++ b/chia/data_layer/data_store.py
@@ -495,8 +495,8 @@ async def get_existing_hashes(self, node_hashes: list[bytes32], store_id: bytes3
 
             async with self.db_wrapper.reader() as reader:
                 async with reader.execute(query, (store_id, *chunk)) as cursor:
-                rows = await cursor.fetchall()
-                result.update(row["hash"] for row in rows)
+                    rows = await cursor.fetchall()
+                    result.update(row["hash"] for row in rows)
 
         return result
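
For readers skimming the series: the net effect of these commits is that add_node_hashes no longer issues one get_first_generation query per hash. Instead, get_existing_hashes finds the hashes already stored with one SELECT ... WHERE hash IN (...) per chunk of at most min(500, SQLITE_MAX_VARIABLE_NUMBER - 10) hashes, with LIMIT {len(chunk)} capping each result set. Below is a minimal standalone sketch of that chunked IN-query pattern, not the PR's code: it uses the synchronous sqlite3 module rather than the async DBWrapper2, and the nodes_demo table, the hard-coded variable limit, and the helper name are illustrative assumptions only.

    # Standalone sketch of the batched hash lookup pattern (assumed names, not the PR's API).
    import sqlite3

    SQLITE_MAX_VARIABLE_NUMBER = 999  # assumed conservative limit for older SQLite builds


    def get_existing_hashes(conn: sqlite3.Connection, hashes: list[bytes]) -> set[bytes]:
        result: set[bytes] = set()
        batch_size = min(500, SQLITE_MAX_VARIABLE_NUMBER - 10)
        for i in range(0, len(hashes), batch_size):
            chunk = hashes[i : i + batch_size]
            placeholders = ",".join(["?"] * len(chunk))
            # One IN (...) query per chunk instead of one query per hash.
            query = f"SELECT hash FROM nodes_demo WHERE hash IN ({placeholders})"
            result.update(row[0] for row in conn.execute(query, chunk))
        return result


    if __name__ == "__main__":
        conn = sqlite3.connect(":memory:")
        conn.execute("CREATE TABLE nodes_demo (hash BLOB PRIMARY KEY)")
        stored = [i.to_bytes(32, byteorder="big") for i in range(1000)]
        conn.executemany("INSERT INTO nodes_demo VALUES (?)", [(h,) for h in stored])
        missing = [i.to_bytes(32, byteorder="big") for i in range(1000, 1100)]
        # Only the hashes that are actually in the table come back.
        assert get_existing_hashes(conn, stored + missing) == set(stored)

Chunking keeps every statement under SQLite's bound-parameter limit while cutting round trips from one per hash to one per batch of up to 500 hashes.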