Skip to content

Commit

Permalink
websocket tweaks
Browse files Browse the repository at this point in the history
simplifies websocket implementation
  • Loading branch information
pablobuenaposada committed Nov 6, 2024
1 parent ab92d93 commit 1a72a93
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
class Backend:
def __init__(self):
self._load_user_preferences()
self.logger = Logger(self.setup_file.json.get("hddlg").get("autostart"))
self._init_resources()
self._init_websocket()
self.logger = Logger(self.setup_file.json.get("hddlg").get("autostart"))

def stop(self):
self.websocket.stop()
Expand Down
39 changes: 10 additions & 29 deletions src/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,25 @@


class Websocket:
clients_connected = set()
backend = None

def __init__(self, backend):
self.backend = backend
self.websocket = websockets.serve(
self._websocket_handler, WEBSOCKET_HOST, WEBSOCKET_PORT
self.connection_handler, WEBSOCKET_HOST, WEBSOCKET_PORT
)
self.loop = asyncio.get_event_loop()
self.loop.run_until_complete(self.websocket)
threading.Thread(target=self.loop.run_forever).start()

async def _websocket_handler(self, websocket, path):
await self._register(websocket) # register this client to keep tracking of it
consumer_task = asyncio.ensure_future(self._consumer_handler(websocket))
producer_task = asyncio.ensure_future(self._producer_handler(websocket))
done, pending = await asyncio.wait(
[consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED
async def connection_handler(self, websocket):
await asyncio.gather(
self.broadcast_data(websocket), self._consumer_handler(websocket)
)
for task in pending:
task.cancel()

async def broadcast_data(self, websocket):
"""Keeps sending updated ecu data forever"""
while websocket.open:
await websocket.send(json.dumps({"data": self.backend.update()}))
await asyncio.sleep(0.01)

async def _consumer_handler(self, websocket):
"""Here is where messages from clients are processed"""
Expand Down Expand Up @@ -93,23 +91,6 @@ async def _consumer_handler(self, websocket):
)
)

async def _producer_handler(self, websocket):
"""Keeps sending updated ecu data forever"""
while True:
data = self.backend.update()
await websocket.send(json.dumps({"data": data}))
self.backend.logger.log(data)
await asyncio.sleep(0.1)

async def _send_all_clients(self, message):
"""Broadcast to all the clients"""
if self.clients_connected: # asyncio.wait doesn't accept an empty list
await asyncio.wait([user.send(message) for user in self.clients_connected])

async def _register(self, websocket):
"""Appends a new client to the connected clients list"""
self.clients_connected.add(websocket)

def stop(self):
self.websocket.ws_server.close()
self.loop.call_soon_threadsafe(self.loop.stop)

0 comments on commit 1a72a93

Please sign in to comment.