Skip to content

Commit b64ac3f

Browse files
Merge b210ad7 into f6844c3
2 parents f6844c3 + b210ad7 commit b64ac3f

3 files changed

Lines changed: 593 additions & 450 deletions

File tree

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
# A part of NonVisual Desktop Access (NVDA)
2+
# Copyright (C) 2025 NV Access Limited, Zoomax
3+
# This file is covered by the GNU General Public License.
4+
# See the file COPYING for more details.
5+
# NLS eReader Zoomax driver for NVDA.
6+
7+
import bdDetect
8+
import braille
9+
import brailleInput
10+
import hwIo
11+
import inputCore
12+
import serial
13+
from dataclasses import dataclass
14+
from enum import Enum
15+
from logHandler import log
16+
17+
TIMEOUT_SEC = 0.2
18+
BAUD_RATE = 19200
19+
CONNECT_RETRIES = 5
20+
21+
COMMUNICATION_ESCAPE_BYTE = b"\x1b"
22+
""" The device communication protocol is escape ASCII character based.
23+
So each command starts with the escape character.
24+
When part of the command payload, the escape character is duplicated. """
25+
26+
27+
class DeviceCommand(bytes, Enum):
28+
DISPLAY_DATA = b"\x01"
29+
REQUEST_INFO = b"\x02"
30+
REQUEST_VERSION = b"\x05"
31+
REPEAT_ALL = b"\x08"
32+
PROTOCOL_ONOFF = b"\x15"
33+
ROUTING_KEYS = b"\x22"
34+
DISPLAY_KEYS = b"\x24"
35+
BRAILLE_KEYS = b"\x33"
36+
JOYSTICK_KEYS = b"\x34"
37+
DEVICE_ID = b"\x84"
38+
SERIAL_NUMBER = b"\x8a"
39+
40+
41+
@dataclass(frozen=True)
42+
class DeviceResponseInfo:
43+
length: int
44+
keys: tuple[str, ...] | None = None
45+
46+
47+
COMMAND_RESPONSE_INFO: dict[DeviceCommand, DeviceResponseInfo] = {
48+
DeviceCommand.DISPLAY_DATA: DeviceResponseInfo(1),
49+
DeviceCommand.DISPLAY_KEYS: DeviceResponseInfo(
50+
1,
51+
("d1", "d2", "d3", "d4", "d5", "d6"),
52+
),
53+
DeviceCommand.BRAILLE_KEYS: DeviceResponseInfo(
54+
2,
55+
(
56+
"bl",
57+
"br",
58+
"bs",
59+
None,
60+
"s1",
61+
"s2",
62+
"s3",
63+
"s4", # byte 1
64+
"b1",
65+
"b2",
66+
"b3",
67+
"b4",
68+
"b5",
69+
"b6",
70+
"b7",
71+
"b8", # byte 2
72+
),
73+
),
74+
DeviceCommand.JOYSTICK_KEYS: DeviceResponseInfo(
75+
1,
76+
("up", "left", "down", "right", "select"),
77+
),
78+
DeviceCommand.ROUTING_KEYS: DeviceResponseInfo(5),
79+
DeviceCommand.DEVICE_ID: DeviceResponseInfo(16),
80+
DeviceCommand.SERIAL_NUMBER: DeviceResponseInfo(8),
81+
}
82+
83+
84+
class BrailleDisplayDriver(braille.BrailleDisplayDriver):
85+
_dev: hwIo.IoBase
86+
name = "nlseReaderZoomax"
87+
# Translators: Names of braille displays.
88+
description = _("NLS eReader Zoomax")
89+
isThreadSafe = True
90+
supportsAutomaticDetection = True
91+
92+
@classmethod
93+
def registerAutomaticDetection(cls, driverRegistrar: bdDetect.DriverRegistrar):
94+
driverRegistrar.addUsbDevices(
95+
bdDetect.DeviceType.SERIAL,
96+
{
97+
"VID_1A86&PID_7523", # CH340 USB to serial chip
98+
},
99+
)
100+
101+
driverRegistrar.addBluetoothDevices(lambda m: m.id.startswith("NLS eReader Z"))
102+
103+
def _connect(self, port: str) -> bool:
104+
for portType, portId, port, portInfo in self._getTryPorts(port):
105+
try:
106+
self._dev = hwIo.Serial(
107+
port,
108+
baudrate=BAUD_RATE,
109+
bytesize=serial.EIGHTBITS,
110+
parity=serial.PARITY_NONE,
111+
stopbits=serial.STOPBITS_ONE,
112+
timeout=TIMEOUT_SEC,
113+
writeTimeout=TIMEOUT_SEC,
114+
onReceive=self._onReceive,
115+
)
116+
except EnvironmentError:
117+
log.debugWarning("Port not yet available.", exc_info=True)
118+
continue
119+
120+
self._sendRequest(DeviceCommand.PROTOCOL_ONOFF.value, False)
121+
self._sendRequest(DeviceCommand.PROTOCOL_ONOFF.value, True)
122+
self._sendRequest(DeviceCommand.REPEAT_ALL.value)
123+
124+
for i in range(CONNECT_RETRIES):
125+
self._dev.waitForRead(TIMEOUT_SEC)
126+
if self.numCells:
127+
break
128+
129+
if self.numCells:
130+
log.info(f"Device connected via {portType} ({port})")
131+
return True
132+
log.info("Device arrival timeout")
133+
self._dev.close()
134+
return False
135+
136+
def __init__(self, port: str = "auto"):
137+
log.info("Initializing nlseReaderZoomax driver")
138+
super().__init__()
139+
self.numCells = 0
140+
self._dev = None
141+
142+
if not self._connect(port):
143+
raise RuntimeError("Could not connect to device")
144+
145+
self._keysDown: dict[bytes, bytes] = {}
146+
self._ignoreKeyReleases = False
147+
148+
def terminate(self):
149+
try:
150+
super().terminate()
151+
self._sendRequest(DeviceCommand.PROTOCOL_ONOFF, False)
152+
except EnvironmentError:
153+
pass
154+
finally:
155+
self._dev.close()
156+
157+
def _sendRequest(self, command: bytes, arg: bytes | bool | int = b""):
158+
typeErrorString = "Expected param '{}' to be of type '{}', got '{}'"
159+
if not isinstance(arg, bytes):
160+
if isinstance(arg, bool):
161+
arg = hwIo.boolToByte(arg)
162+
elif isinstance(arg, int):
163+
arg = hwIo.intToByte(arg)
164+
else:
165+
raise TypeError(typeErrorString.format("arg", "bytes, bool, or int", type(arg).__name__))
166+
167+
if not isinstance(command, bytes):
168+
raise TypeError(typeErrorString.format("command", "bytes", type(command).__name__))
169+
170+
# doubling the escape characters in the data (arg) part
171+
# as required by the device communication protocol
172+
arg = arg.replace(COMMUNICATION_ESCAPE_BYTE, COMMUNICATION_ESCAPE_BYTE * 2)
173+
174+
data = b"".join(
175+
[
176+
COMMUNICATION_ESCAPE_BYTE,
177+
command,
178+
arg,
179+
],
180+
)
181+
self._dev.write(data)
182+
183+
def _onReceive(self, data: bytes):
184+
if data != COMMUNICATION_ESCAPE_BYTE:
185+
log.debugWarning(f"Ignoring byte before escape: {data!r}")
186+
return
187+
# data only contained the escape. Read the rest from the device.
188+
stream = self._dev
189+
command = stream.read(1)
190+
length = COMMAND_RESPONSE_INFO.get(command, DeviceResponseInfo(0)).length
191+
arg = stream.read(length)
192+
self._handleResponse(command, arg)
193+
194+
def _handleResponse(self, command: bytes, arg: bytes):
195+
if command == DeviceCommand.DISPLAY_DATA:
196+
self.numCells = ord(arg)
197+
elif command in COMMAND_RESPONSE_INFO:
198+
arg = int.from_bytes(reversed(arg))
199+
if arg < self._keysDown.get(command, 0):
200+
# Release.
201+
if not self._ignoreKeyReleases:
202+
# The first key released executes the key combination.
203+
try:
204+
inputCore.manager.executeGesture(InputGesture(self._keysDown))
205+
except inputCore.NoInputGestureAction:
206+
pass
207+
# Any further releases are just the rest of the keys in the combination being released,
208+
# so they should be ignored.
209+
self._ignoreKeyReleases = True
210+
else:
211+
# Press.
212+
# This begins a new key combination.
213+
self._ignoreKeyReleases = False
214+
if arg > 0:
215+
self._keysDown[command] = arg
216+
elif command in self._keysDown:
217+
# All keys in this group have been released.
218+
# Remove this group so it doesn't count as a group with keys down.
219+
del self._keysDown[command]
220+
else:
221+
log.debugWarning(f"Unknown command {command!r}, arg {arg!r}")
222+
223+
def display(self, cells: list[int]):
224+
# cells will already be padded up to numCells.
225+
arg = bytes(cells)
226+
self._sendRequest(DeviceCommand.DISPLAY_DATA, arg)
227+
228+
gestureMap = inputCore.GlobalGestureMap(
229+
{
230+
"globalCommands.GlobalCommands": {
231+
"braille_scrollBack": ("br(nlseReaderZoomax):d2",),
232+
"braille_scrollForward": ("br(nlseReaderZoomax):d5",),
233+
"braille_previousLine": ("br(nlseReaderZoomax):d1",),
234+
"braille_nextLine": ("br(nlseReaderZoomax):d3",),
235+
"braille_routeTo": ("br(nlseReaderZoomax):routing",),
236+
"kb:upArrow": ("br(nlseReaderZoomax):up",),
237+
"kb:downArrow": ("br(nlseReaderZoomax):down",),
238+
"kb:leftArrow": ("br(nlseReaderZoomax):left",),
239+
"kb:rightArrow": ("br(nlseReaderZoomax):right",),
240+
"kb:enter": ("br(nlseReaderZoomax):select",),
241+
},
242+
},
243+
)
244+
245+
246+
class InputGesture(braille.BrailleDisplayGesture, brailleInput.BrailleInputGesture):
247+
source = BrailleDisplayDriver.name
248+
249+
def __init__(self, keysDown: dict[bytes, bytes]):
250+
super().__init__()
251+
self.keysDown = keysDown
252+
253+
SYSTEM_KEYS_MASK = 0xF8
254+
SPACEBAR_KEYS_MASK = 0x07
255+
256+
self.keyNames = names = []
257+
for group, groupKeysDown in keysDown.items():
258+
if (
259+
group == DeviceCommand.BRAILLE_KEYS
260+
and len(keysDown) == 1
261+
and not groupKeysDown & SYSTEM_KEYS_MASK
262+
):
263+
self.dots = groupKeysDown >> 8
264+
self.space = groupKeysDown & SPACEBAR_KEYS_MASK
265+
if group == DeviceCommand.ROUTING_KEYS:
266+
for index in range(braille.handler.display.numCells):
267+
if groupKeysDown & (1 << index):
268+
self.routingIndex = index
269+
names.append("routing")
270+
break
271+
else:
272+
for index, name in enumerate(COMMAND_RESPONSE_INFO.get(group).keys):
273+
if groupKeysDown & (1 << index):
274+
names.append(name)
275+
276+
self.id = "+".join(names)

0 commit comments

Comments
 (0)