forked from rustdesk/rustdesk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pynput_service.py
124 lines (108 loc) · 3.85 KB
/
pynput_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from pynput.keyboard import Key, Controller
from pynput.keyboard._xorg import KeyCode
from pynput._util.xorg import display_manager
import Xlib
import os
import sys
import socket
KeyCode._from_symbol("\0") # test
class MyController(Controller):
def _handle(self, key, is_press):
"""Resolves a key identifier and sends a keyboard event.
:param event: The *X* keyboard event.
:param int keysym: The keysym to handle.
"""
event = Xlib.display.event.KeyPress if is_press \
else Xlib.display.event.KeyRelease
keysym = self._keysym(key)
# Make sure to verify that the key was resolved
if keysym is None:
raise self.InvalidKeyException(key)
# If the key has a virtual key code, use that immediately with
# fake_input; fake input,being an X server extension, has access to
# more internal state that we do
if key.vk is not None:
with display_manager(self._display) as dm:
Xlib.ext.xtest.fake_input(
dm,
Xlib.X.KeyPress if is_press else Xlib.X.KeyRelease,
dm.keysym_to_keycode(key.vk))
# Otherwise use XSendEvent; we need to use this in the general case to
# work around problems with keyboard layouts
else:
try:
keycode, shift_state = self.keyboard_mapping[keysym]
with self.modifiers as modifiers:
alt_gr = Key.alt_gr in modifiers
if alt_gr:
self._send_key(event, keycode, shift_state)
else:
with display_manager(self._display) as dm:
Xlib.ext.xtest.fake_input(
dm,
Xlib.X.KeyPress if is_press else Xlib.X.KeyRelease,
keycode)
except KeyError:
with self._borrow_lock:
keycode, index, count = self._borrows[keysym]
self._send_key(
event,
keycode,
index_to_shift(self._display, index))
count += 1 if is_press else -1
self._borrows[keysym] = (keycode, index, count)
# Notify any running listeners
self._emit('_on_fake_event', key, is_press)
keyboard = MyController()
server_address = sys.argv[1]
if not os.path.exists(os.path.dirname(server_address)):
os.makedirs(os.path.dirname(server_address))
try:
os.unlink(server_address)
except OSError:
if os.path.exists(server_address):
raise
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(server_address)
server.listen(1)
clientsocket, address = server.accept()
os.system('chmod a+rw %s'%server_address)
print("Got pynput connection")
def loop():
global keyboard
buf = []
while True:
data = clientsocket.recv(1024)
if not data:
print("Connection broken")
break
buf.extend(data)
while buf:
n = buf[0]
n = n + 1
if len(buf) < n:
break
msg = bytearray(buf[1:n]).decode("utf-8")
buf = buf[n:]
if len(msg) < 2:
continue
if msg[1] == "\0":
keyboard = MyController()
print("Keyboard reset")
continue
if len(msg) == 2:
name = msg[1]
else:
name = KeyCode._from_symbol(msg[1:])
if str(name) == "<0>":
continue
try:
if msg[0] == "p":
keyboard.press(name)
else:
keyboard.release(name)
except Exception as e:
print(e)
loop()
clientsocket.close()
server.close()