Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track weight proof tasks #18896

Merged
merged 5 commits into from
Dec 11, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class FullNode:
log: logging.Logger
db_path: Path
wallet_sync_queue: asyncio.Queue[WalletUpdate]
_segment_task: Optional[asyncio.Task[None]] = None
_segment_task_list: list[asyncio.Task[None]] = dataclasses.field(default_factory=list)
initialized: bool = False
_server: Optional[ChiaServer] = None
_shut_down: bool = False
Expand Down Expand Up @@ -370,7 +370,8 @@ async def manage(self) -> AsyncIterator[None]:
for one_sync_task in self._sync_task_list:
if not one_sync_task.done():
cancel_task_safe(task=one_sync_task, log=self.log)

for segment_task in self._segment_task_list:
cancel_task_safe(segment_task, self.log)
for task_id, task in list(self.full_node_store.tx_fetch_tasks.items()):
cancel_task_safe(task, self.log)
if self._init_weight_proof is not None:
Expand All @@ -389,6 +390,10 @@ async def manage(self) -> AsyncIterator[None]:
with contextlib.suppress(asyncio.CancelledError):
self.log.info(f"Awaiting long sync task {one_sync_task.get_name()}")
await one_sync_task
for segment_task in self._segment_task_list:
with contextlib.suppress(asyncio.CancelledError):
self.log.info(f"Awaitin segment task {segment_task.get_name()}")
await segment_task
almogdepaz marked this conversation as resolved.
Show resolved Hide resolved

@property
def block_store(self) -> BlockStore:
Expand Down Expand Up @@ -599,12 +604,11 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t
return False

batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS
if self._segment_task is not None and (not self._segment_task.done()):
try:
self._segment_task.cancel()
except Exception as e:
self.log.warning(f"failed to cancel segment task {e}")
self._segment_task = None
for task in self._segment_task_list[:]:
if task.done():
self._segment_task_list.remove(task)
else:
cancel_task_safe(task=task, log=self.log)
altendky marked this conversation as resolved.
Show resolved Hide resolved

try:
peer_info = peer.get_peer_logging()
Expand Down Expand Up @@ -2223,8 +2227,12 @@ async def add_block(

record = self.blockchain.block_record(block.header_hash)
if self.weight_proof_handler is not None and record.sub_epoch_summary_included is not None:
if self._segment_task is None or self._segment_task.done():
self._segment_task = asyncio.create_task(self.weight_proof_handler.create_prev_sub_epoch_segments())
self._segment_task_list.append(
asyncio.create_task(self.weight_proof_handler.create_prev_sub_epoch_segments())
)
for task in self._segment_task_list[:]:
if task.done():
self._segment_task_list.remove(task)
altendky marked this conversation as resolved.
Show resolved Hide resolved
return None

async def add_unfinished_block(
Expand Down
Loading