diff --git a/bumble/pandora/l2cap.py b/bumble/pandora/l2cap.py index f88c4341..d7915297 100644 --- a/bumble/pandora/l2cap.py +++ b/bumble/pandora/l2cap.py @@ -16,7 +16,6 @@ import grpc import json import logging -import threading from . import utils from .config import Config @@ -61,6 +60,12 @@ def __init__(self, device: Device, config: Config) -> None: self.config = config self.sdu_queue: asyncio.Queue = asyncio.Queue() + def on_channel_sdu(self, sdu): + async def handle_sdu(): + await self.sdu_queue.put_nowait(sdu) + + asyncio.create_task(handle_sdu()) + @utils.rpc async def WaitConnection( self, request: WaitConnectionRequest, context: grpc.ServicerContext @@ -113,6 +118,7 @@ def on_l2cap_channel( l2cap_channel: Union[ClassicChannel, LeCreditBasedChannel] ): try: + l2cap_channel.sink = self.on_channel_sdu channel_future.set_result(l2cap_channel) self.log.debug( f'Channel future set successfully with channel= {l2cap_channel}' @@ -155,7 +161,7 @@ def on_close(): closed_event.set() l2cap_channel.on('close', on_close) - await closed_event.wait() + _ = await closed_event.wait() return WaitDisconnectionResponse(success=empty_pb2.Empty()) except Exception as e: self.log.exception(f'WaitDisonnection failed: {e}') @@ -173,17 +179,13 @@ async def Receive( if not isinstance(channel, Channel): raise NotImplementedError(f'TODO: {type(channel)} not currently supported.') - def on_channel_sdu(sdu): - async def handle_sdu(): - await self.sdu_queue.put(sdu) - - asyncio.create_task(handle_sdu()) - l2cap_channel = self.get_l2cap_channel(channel) if l2cap_channel is None: raise ValueError('The channel in the request is not valid.') - l2cap_channel.sink = on_channel_sdu + if not l2cap_channel.sink: + l2cap_channel.sink = self.on_channel_sdu + while sdu := await self.sdu_queue.get(): # Retrieve the next SDU from the queue self.log.debug(f'Receive: Received {len(sdu)} bytes -> {sdu.decode()}') @@ -299,6 +301,7 @@ def get_l2cap_channel( l2cap_channel = self.device.l2cap_channel_manager.find_le_coc_channel( connection_handle, destination_cid ) + self.log.debug(f'get_l2cap_channel: l2cap_channel={l2cap_channel}') return l2cap_channel def channel_to_proto(