Skip to content

Commit

Permalink
Merge branch 'main' into task_pit
Browse files Browse the repository at this point in the history
  • Loading branch information
altendky committed Dec 11, 2024
2 parents b3c016d + 3308c85 commit 9c350ab
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 98 deletions.
25 changes: 15 additions & 10 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class FullNode:
log: logging.Logger
db_path: Path
wallet_sync_queue: asyncio.Queue[WalletUpdate]
_segment_task: Optional[asyncio.Task[None]] = None
_segment_task_list: list[asyncio.Task[None]] = dataclasses.field(default_factory=list)
initialized: bool = False
_server: Optional[ChiaServer] = None
_shut_down: bool = False
Expand Down Expand Up @@ -370,7 +370,8 @@ async def manage(self) -> AsyncIterator[None]:
for one_sync_task in self._sync_task_list:
if not one_sync_task.done():
cancel_task_safe(task=one_sync_task, log=self.log)

for segment_task in self._segment_task_list:
cancel_task_safe(segment_task, self.log)
for task_id, task in list(self.full_node_store.tx_fetch_tasks.items()):
cancel_task_safe(task, self.log)
if self._init_weight_proof is not None:
Expand All @@ -389,6 +390,7 @@ async def manage(self) -> AsyncIterator[None]:
with contextlib.suppress(asyncio.CancelledError):
self.log.info(f"Awaiting long sync task {one_sync_task.get_name()}")
await one_sync_task
await asyncio.gather(*self._segment_task_list, return_exceptions=True)

@property
def block_store(self) -> BlockStore:
Expand Down Expand Up @@ -599,12 +601,11 @@ async def short_sync_batch(self, peer: WSChiaConnection, start_height: uint32, t
return False

batch_size = self.constants.MAX_BLOCK_COUNT_PER_REQUESTS
if self._segment_task is not None and (not self._segment_task.done()):
try:
self._segment_task.cancel()
except Exception as e:
self.log.warning(f"failed to cancel segment task {e}")
self._segment_task = None
for task in self._segment_task_list[:]:
if task.done():
self._segment_task_list.remove(task)
else:
cancel_task_safe(task=task, log=self.log)

try:
peer_info = peer.get_peer_logging()
Expand Down Expand Up @@ -2242,8 +2243,12 @@ async def add_block(

record = self.blockchain.block_record(block.header_hash)
if self.weight_proof_handler is not None and record.sub_epoch_summary_included is not None:
if self._segment_task is None or self._segment_task.done():
self._segment_task = create_referenced_task(self.weight_proof_handler.create_prev_sub_epoch_segments())
self._segment_task_list.append(
create_referenced_task(self.weight_proof_handler.create_prev_sub_epoch_segments())
)
for task in self._segment_task_list[:]:
if task.done():
self._segment_task_list.remove(task)
return None

async def add_unfinished_block(
Expand Down
Loading

0 comments on commit 9c350ab

Please sign in to comment.