Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Registering to touch stream data #16

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions nanoleafapi/nanoleaf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, List, Dict, Tuple, Union, Callable, Optional
from sseclient import SSEClient
import requests
import socket

# Preset colours
RED = (255, 0, 0)
Expand Down Expand Up @@ -594,7 +595,8 @@ def get_layout(self) -> Dict[str, Any]:
#######################################################

def register_event(self, func : Callable[[Dict[str, Any]], Any],
event_types : List[int]) -> None:
event_types : List[int],
touch_port : int =None, touch_func : Callable[[Dict[str, Any]], Any] =None) -> None:
"""Starts a thread to register and listen for events

Creates an event listener. This method can only be called once per
Expand All @@ -609,6 +611,12 @@ def register_event(self, func : Callable[[Dict[str, Any]], Any],
2 = layout,
3 = effects,
4 = touch (Canvas only)
:param touch_port: Optional, can be set along with touch_func in order
to register to touch stream data. This is the port that the UDP
socket will be opened on.
:param touch_func: The function to run when a touch event is recieved
from the touch stream data. This function will recieve the event
as a dictionary.
"""

if self.already_registered:
Expand All @@ -620,22 +628,82 @@ def register_event(self, func : Callable[[Dict[str, Any]], Any],
for event in event_types:
if event < 1 or event > 4:
raise Exception("Valid event types must be between 1-4")
if touch_port != None and touch_func == None:
raise Exception("No callback function was given for the touch stream data")
if touch_port == None and touch_func != None:
raise Exception("No port was given for the touch stream data")
if touch_port != None and 4 not in event_types:
raise Exception("The touch event must be registered in order recieve touch stream data")
self.already_registered = True
thread = Thread(target=self.__event_listener, args=(func, set(event_types)))
if touch_port != None:
touch_thread = Thread(target=self.__touch_stream_listener, args=(touch_port, touch_func))
touch_thread.daemon = True
touch_thread.start()
args = (func, set(event_types))
if touch_port != None:
args += (touch_port,)
thread = Thread(target=self.__event_listener, args=args)
thread.daemon = True
thread.start()

def __event_listener(self, func : Callable[[Dict[str, Any]], Any],
event_types : List[int]) -> None:
event_types : List[int], touch_port : int =None) -> None:
"""Listens for events and passes event data to the user-defined
function."""
url = self.url + "/events?id="
for event in event_types:
url += str(event) + ","
client = SSEClient(url[:-1])
headers = {}
if touch_port != None:
headers["TouchEventsPort"] = str(touch_port)
client = SSEClient(url[:-1], headers=headers)
for event in client:
func(json.loads(str(event)))

def __touch_stream_listener(self, port : int, func : Callable[[Dict[str, Any]], Any]) -> None:
"""Listens for touch events and passes formatted data to the user-defined
function."""
#There seems to be a value 5 for touch types that isn't in the documentation
#From testing it appears to be sent whenever a Tap event is sent in the SSE client
touch_types_map = {0:"Hover",1:"Down",2:"Hold",3:"Up",4:"Swipe",5:"Tap"}
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(("", port))
data = b""
panels_coming = 0
while True:
current_data, addr = server_socket.recvfrom(1024)
data += current_data
if not data:
break

while True:
if panels_coming > 0:
if len(data) >= 5:
raw_panel_data = data[0:5]
data = data[5:]
panels_coming -= 1

panel_data = {}
panel_data["panelId"] = int.from_bytes(raw_panel_data[0:2], "big")
touch_data = raw_panel_data[2]
panel_data["touchType"] = (
touch_types_map[touch_data >> 4]
if touch_data >> 4 in touch_types_map else
"Unknown"
)
panel_data["touchStrength"] = touch_data & 15
if panel_data["touchType"] == "Swipe":
panel_data["panelIdFrom"] = int.from_bytes(raw_panel_data[3:5], "big")
func(panel_data)
else:
break
else:
if len(data) >= 2:
panels_coming = int.from_bytes(data[0:2], "big")
data = data[2:]
else:
break


#######################################################
#### ERRORS ####
Expand Down