From 2f6c2d4567c07f826dcf303e76539a60b828ab51 Mon Sep 17 00:00:00 2001 From: jfechete <60266095+jfechete@users.noreply.github.com> Date: Thu, 29 Dec 2022 23:57:44 -0800 Subject: [PATCH 1/2] Added touch stream data listener --- nanoleafapi/nanoleaf.py | 51 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/nanoleafapi/nanoleaf.py b/nanoleafapi/nanoleaf.py index 0c67c7d..21ed943 100644 --- a/nanoleafapi/nanoleaf.py +++ b/nanoleafapi/nanoleaf.py @@ -11,6 +11,7 @@ from typing import Any, List, Dict, Tuple, Union, Callable, Optional from sseclient import SSEClient import requests +import socket # Preset colours RED = (255, 0, 0) @@ -594,7 +595,8 @@ def get_layout(self) -> Dict[str, Any]: ####################################################### def register_event(self, func : Callable[[Dict[str, Any]], Any], - event_types : List[int]) -> None: + event_types : List[int], + touch_port : int =None, touch_func : Callable[[Dict[str, Any]], Any] =None) -> None: """Starts a thread to register and listen for events Creates an event listener. This method can only be called once per @@ -620,22 +622,63 @@ def register_event(self, func : Callable[[Dict[str, Any]], Any], for event in event_types: if event < 1 or event > 4: raise Exception("Valid event types must be between 1-4") + if touch_port != None and touch_func == None: + raise Exception("No callback function was given for the touch stream data") + if touch_port == None and touch_func != None: + raise Exception("No port was given for the touch stream data") self.already_registered = True - thread = Thread(target=self.__event_listener, args=(func, set(event_types))) + if touch_port != None: + touch_thread = Thread(target=self.__touch_stream_listener, args=(touch_port, touch_func)) + touch_thread.daemon = True + touch_thread.start() + args = (func, set(event_types)) + if touch_port != None: + args += (touch_port,) + thread = Thread(target=self.__event_listener, args=args) thread.daemon = True thread.start() def __event_listener(self, func : Callable[[Dict[str, Any]], Any], - event_types : List[int]) -> None: + event_types : List[int], touch_port : int =None) -> None: """Listens for events and passes event data to the user-defined function.""" url = self.url + "/events?id=" for event in event_types: url += str(event) + "," - client = SSEClient(url[:-1]) + headers = {} + if touch_port != None: + headers["TouchEventsPort"] = str(touch_port) + client = SSEClient(url[:-1], headers=headers) for event in client: func(json.loads(str(event))) + def __touch_stream_listener(self, port : int, func : Callable[[Dict[str, Any]], Any]) -> None: + server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + server_socket.bind(("", port)) + data = b"" + panels_coming = 0 + while True: + current_data, addr = server_socket.recvfrom(1024) + data += current_data + if not data: + break + + while True: + if panels_coming > 0: + if len(data) >= 5: + panel_data = data[0:5] + data = data[5:] + panels_coming -= 1 + func(panel_data) + else: + break + else: + if len(data) >= 2: + panels_coming = int.from_bytes(data[0:2], "big") + data = data[2:] + else: + break + ####################################################### #### ERRORS #### From 2a9dafe75aa7ade3cda8d85e87ca682cdaa53f7a Mon Sep 17 00:00:00 2001 From: jfechete <60266095+jfechete@users.noreply.github.com> Date: Mon, 2 Jan 2023 15:14:06 -0800 Subject: [PATCH 2/2] Changed panel data to a dictionary --- nanoleafapi/nanoleaf.py | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/nanoleafapi/nanoleaf.py b/nanoleafapi/nanoleaf.py index 21ed943..fbeeeb8 100644 --- a/nanoleafapi/nanoleaf.py +++ b/nanoleafapi/nanoleaf.py @@ -611,6 +611,12 @@ def register_event(self, func : Callable[[Dict[str, Any]], Any], 2 = layout, 3 = effects, 4 = touch (Canvas only) + :param touch_port: Optional, can be set along with touch_func in order + to register to touch stream data. This is the port that the UDP + socket will be opened on. + :param touch_func: The function to run when a touch event is recieved + from the touch stream data. This function will recieve the event + as a dictionary. """ if self.already_registered: @@ -626,6 +632,8 @@ def register_event(self, func : Callable[[Dict[str, Any]], Any], raise Exception("No callback function was given for the touch stream data") if touch_port == None and touch_func != None: raise Exception("No port was given for the touch stream data") + if touch_port != None and 4 not in event_types: + raise Exception("The touch event must be registered in order recieve touch stream data") self.already_registered = True if touch_port != None: touch_thread = Thread(target=self.__touch_stream_listener, args=(touch_port, touch_func)) @@ -653,6 +661,11 @@ def __event_listener(self, func : Callable[[Dict[str, Any]], Any], func(json.loads(str(event))) def __touch_stream_listener(self, port : int, func : Callable[[Dict[str, Any]], Any]) -> None: + """Listens for touch events and passes formatted data to the user-defined + function.""" + #There seems to be a value 5 for touch types that isn't in the documentation + #From testing it appears to be sent whenever a Tap event is sent in the SSE client + touch_types_map = {0:"Hover",1:"Down",2:"Hold",3:"Up",4:"Swipe",5:"Tap"} server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) server_socket.bind(("", port)) data = b"" @@ -666,9 +679,21 @@ def __touch_stream_listener(self, port : int, func : Callable[[Dict[str, Any]], while True: if panels_coming > 0: if len(data) >= 5: - panel_data = data[0:5] + raw_panel_data = data[0:5] data = data[5:] panels_coming -= 1 + + panel_data = {} + panel_data["panelId"] = int.from_bytes(raw_panel_data[0:2], "big") + touch_data = raw_panel_data[2] + panel_data["touchType"] = ( + touch_types_map[touch_data >> 4] + if touch_data >> 4 in touch_types_map else + "Unknown" + ) + panel_data["touchStrength"] = touch_data & 15 + if panel_data["touchType"] == "Swipe": + panel_data["panelIdFrom"] = int.from_bytes(raw_panel_data[3:5], "big") func(panel_data) else: break