Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pandora: Fix l2cap receive rpc #541

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions bumble/pandora/l2cap.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import grpc
import json
import logging
import threading

from . import utils
from .config import Config
Expand Down Expand Up @@ -61,6 +60,12 @@ def __init__(self, device: Device, config: Config) -> None:
self.config = config
self.sdu_queue: asyncio.Queue = asyncio.Queue()

def on_channel_sdu(self, sdu):
async def handle_sdu():
await self.sdu_queue.put_nowait(sdu)

asyncio.create_task(handle_sdu())

@utils.rpc
async def WaitConnection(
self, request: WaitConnectionRequest, context: grpc.ServicerContext
Expand Down Expand Up @@ -113,6 +118,7 @@ def on_l2cap_channel(
l2cap_channel: Union[ClassicChannel, LeCreditBasedChannel]
):
try:
l2cap_channel.sink = self.on_channel_sdu
channel_future.set_result(l2cap_channel)
self.log.debug(
f'Channel future set successfully with channel= {l2cap_channel}'
Expand Down Expand Up @@ -155,7 +161,7 @@ def on_close():
closed_event.set()

l2cap_channel.on('close', on_close)
await closed_event.wait()
_ = await closed_event.wait()
return WaitDisconnectionResponse(success=empty_pb2.Empty())
except Exception as e:
self.log.exception(f'WaitDisonnection failed: {e}')
Expand All @@ -173,17 +179,13 @@ async def Receive(
if not isinstance(channel, Channel):
raise NotImplementedError(f'TODO: {type(channel)} not currently supported.')

def on_channel_sdu(sdu):
async def handle_sdu():
await self.sdu_queue.put(sdu)

asyncio.create_task(handle_sdu())

l2cap_channel = self.get_l2cap_channel(channel)
if l2cap_channel is None:
raise ValueError('The channel in the request is not valid.')

l2cap_channel.sink = on_channel_sdu
if not l2cap_channel.sink:
l2cap_channel.sink = self.on_channel_sdu

while sdu := await self.sdu_queue.get():
# Retrieve the next SDU from the queue
self.log.debug(f'Receive: Received {len(sdu)} bytes -> {sdu.decode()}')
Expand Down Expand Up @@ -299,6 +301,7 @@ def get_l2cap_channel(
l2cap_channel = self.device.l2cap_channel_manager.find_le_coc_channel(
connection_handle, destination_cid
)
self.log.debug(f'get_l2cap_channel: l2cap_channel={l2cap_channel}')
return l2cap_channel

def channel_to_proto(
Expand Down