Skip to content

Commit cfd67c5

Browse files
Merge ebda4ac into 8c9efe8
2 parents 8c9efe8 + ebda4ac commit cfd67c5

3 files changed

Lines changed: 612 additions & 512 deletions

File tree

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
# A part of NonVisual Desktop Access (NVDA)
2+
# Copyright (C) 2025 NV Access Limited, Zoomax
3+
# This file is covered by the GNU General Public License.
4+
# See the file COPYING for more details.
5+
# NLS eReader Zoomax driver for NVDA.
6+
7+
import bdDetect
8+
import braille
9+
import brailleInput
10+
import hwIo
11+
import inputCore
12+
import serial
13+
import time
14+
from dataclasses import dataclass
15+
from enum import Enum
16+
from logHandler import log
17+
from typing import Optional, Tuple, Dict
18+
19+
TIMEOUT_SEC = 0.2
20+
BAUD_RATE = 19200
21+
CONNECT_RETRIES = 5
22+
TIMEOUT_BETWEEN_RETRIES_SEC = 2
23+
24+
COMMUNICATION_ESCAPE_BYTE = b"\x1b"
25+
26+
27+
class DeviceCommand(bytes, Enum):
28+
DISPLAY_DATA = b"\x01"
29+
REQUEST_INFO = b"\x02"
30+
REQUEST_VERSION = b"\x05"
31+
REPEAT_ALL = b"\x08"
32+
PROTOCOL_ONOFF = b"\x15"
33+
ROUTING_KEYS = b"\x22"
34+
DISPLAY_KEYS = b"\x24"
35+
BRAILLE_KEYS = b"\x33"
36+
JOYSTICK_KEYS = b"\x34"
37+
DEVICE_ID = b"\x84"
38+
SERIAL_NUMBER = b"\x8a"
39+
40+
41+
@dataclass(frozen=True)
42+
class DeviceResponseInfo:
43+
length: int
44+
keys: Optional[Tuple[str, ...]] = None
45+
46+
47+
COMMAND_RESPONSE_INFO: Dict[DeviceCommand, DeviceResponseInfo] = {
48+
DeviceCommand.DISPLAY_DATA: DeviceResponseInfo(1),
49+
DeviceCommand.DISPLAY_KEYS: DeviceResponseInfo(
50+
1,
51+
("d1", "d2", "d3", "d4", "d5", "d6"),
52+
),
53+
DeviceCommand.BRAILLE_KEYS: DeviceResponseInfo(
54+
2,
55+
(
56+
"bl",
57+
"br",
58+
"bs",
59+
None,
60+
"s1",
61+
"s2",
62+
"s3",
63+
"s4", # byte 1
64+
"b1",
65+
"b2",
66+
"b3",
67+
"b4",
68+
"b5",
69+
"b6",
70+
"b7",
71+
"b8", # byte 2
72+
),
73+
),
74+
DeviceCommand.JOYSTICK_KEYS: DeviceResponseInfo(
75+
1,
76+
("up", "left", "down", "right", "select"),
77+
),
78+
DeviceCommand.ROUTING_KEYS: DeviceResponseInfo(5),
79+
DeviceCommand.DEVICE_ID: DeviceResponseInfo(16),
80+
DeviceCommand.SERIAL_NUMBER: DeviceResponseInfo(8),
81+
}
82+
83+
84+
class BrailleDisplayDriver(braille.BrailleDisplayDriver):
85+
_dev: hwIo.IoBase
86+
name = "nlseReaderZoomax"
87+
description = _("NLS eReader Zoomax")
88+
isThreadSafe = True
89+
supportsAutomaticDetection = True
90+
91+
@classmethod
92+
def registerAutomaticDetection(cls, driverRegistrar: bdDetect.DriverRegistrar):
93+
driverRegistrar.addUsbDevices(
94+
bdDetect.DeviceType.SERIAL,
95+
{
96+
"VID_1A86&PID_7523", # CH340 USB to serial chip
97+
},
98+
)
99+
100+
driverRegistrar.addBluetoothDevices(lambda m: m.id.startswith("NLS eReader Z"))
101+
102+
@classmethod
103+
def getManualPorts(cls):
104+
return braille.getSerialPorts()
105+
106+
def _connect(self, port: str) -> None:
107+
for portType, portId, port, portInfo in self._getTryPorts(port):
108+
try:
109+
self._dev = hwIo.Serial(
110+
port,
111+
baudrate=BAUD_RATE,
112+
bytesize=serial.EIGHTBITS,
113+
parity=serial.PARITY_NONE,
114+
stopbits=serial.STOPBITS_ONE,
115+
timeout=TIMEOUT_SEC,
116+
writeTimeout=TIMEOUT_SEC,
117+
onReceive=self._onReceive,
118+
)
119+
except EnvironmentError:
120+
log.info("Port not yet available.")
121+
log.debugWarning("", exc_info=True)
122+
if self._dev:
123+
self._dev.close()
124+
continue
125+
126+
self._sendRequest(DeviceCommand.PROTOCOL_ONOFF.value, False)
127+
self._sendRequest(DeviceCommand.PROTOCOL_ONOFF.value, True)
128+
self._sendRequest(DeviceCommand.REPEAT_ALL.value)
129+
130+
for i in range(CONNECT_RETRIES):
131+
self._dev.waitForRead(TIMEOUT_SEC)
132+
if self.numCells:
133+
break
134+
135+
if self.numCells:
136+
log.info(f"Device connected via {portType} ({port})")
137+
return True
138+
log.info("Device arrival timeout")
139+
self._dev.close()
140+
return False
141+
142+
def __init__(self, port="auto"):
143+
log.info("Initializing nlseReaderZoomax driver")
144+
super().__init__()
145+
self.numCells = 0
146+
self._deviceID: str | None = None
147+
self._dev = None
148+
149+
for i in range(CONNECT_RETRIES):
150+
if self._connect(port):
151+
break
152+
else:
153+
time.sleep(TIMEOUT_BETWEEN_RETRIES_SEC)
154+
155+
self._keysDown = {}
156+
self._ignoreKeyReleases = False
157+
158+
def terminate(self):
159+
try:
160+
super().terminate()
161+
self._sendRequest(DeviceCommand.PROTOCOL_ONOFF, False)
162+
except EnvironmentError:
163+
pass
164+
finally:
165+
self._dev.close()
166+
167+
def _sendRequest(self, command: bytes, arg: bytes | bool | int = b""):
168+
typeErrorString = "Expected param '{}' to be of type '{}', got '{}'"
169+
if not isinstance(arg, bytes):
170+
if isinstance(arg, bool):
171+
arg = hwIo.boolToByte(arg)
172+
elif isinstance(arg, int):
173+
arg = hwIo.intToByte(arg)
174+
else:
175+
raise TypeError(typeErrorString.format("arg", "bytes, bool, or int", type(arg).__name__))
176+
177+
if not isinstance(command, bytes):
178+
raise TypeError(typeErrorString.format("command", "bytes", type(command).__name__))
179+
180+
# doubling the escape characters in the data (arg) part
181+
# as requried by the device communication protocol
182+
arg = arg.replace(COMMUNICATION_ESCAPE_BYTE, COMMUNICATION_ESCAPE_BYTE * 2)
183+
184+
data = b"".join(
185+
[
186+
COMMUNICATION_ESCAPE_BYTE,
187+
command,
188+
arg,
189+
],
190+
)
191+
self._dev.write(data)
192+
193+
def _onReceive(self, data: bytes):
194+
if data != COMMUNICATION_ESCAPE_BYTE:
195+
log.debugWarning(f"Ignoring byte before escape: {data!r}")
196+
return
197+
# data only contained the escape. Read the rest from the device.
198+
stream = self._dev
199+
command = stream.read(1)
200+
length = COMMAND_RESPONSE_INFO.get(command, DeviceResponseInfo(0)).length
201+
arg = stream.read(length)
202+
self._handleResponse(command, arg)
203+
204+
def _handleResponse(self, command: bytes, arg: bytes):
205+
if command == DeviceCommand.DISPLAY_DATA:
206+
self.numCells = ord(arg)
207+
elif command == DeviceCommand.DEVICE_ID:
208+
# Short ids can be padded with either nulls or spaces.
209+
arg = arg.rstrip(b"\0 ")
210+
# Assumption: all device IDs can be decoded with latin-1.
211+
# If not, we wish to know about it, allow decode to raise.
212+
self._deviceID = arg.decode("latin-1", errors="strict")
213+
elif command in COMMAND_RESPONSE_INFO:
214+
arg = int.from_bytes(reversed(arg))
215+
if arg < self._keysDown.get(command, 0):
216+
# Release.
217+
if not self._ignoreKeyReleases:
218+
# The first key released executes the key combination.
219+
try:
220+
inputCore.manager.executeGesture(InputGesture(self._deviceID, self._keysDown))
221+
except inputCore.NoInputGestureAction:
222+
pass
223+
# Any further releases are just the rest of the keys in the combination being released,
224+
# so they should be ignored.
225+
self._ignoreKeyReleases = True
226+
else:
227+
# Press.
228+
# This begins a new key combination.
229+
self._ignoreKeyReleases = False
230+
if arg > 0:
231+
self._keysDown[command] = arg
232+
elif command in self._keysDown:
233+
# All keys in this group have been released.
234+
# #3541: Remove this group so it doesn't count as a group with keys down.
235+
del self._keysDown[command]
236+
else:
237+
log.debugWarning(f"Unknown command {command!r}, arg {arg!r}")
238+
239+
def display(self, cells: list[int]):
240+
# cells will already be padded up to numCells.
241+
arg = bytes(cells)
242+
self._sendRequest(DeviceCommand.DISPLAY_DATA, arg)
243+
244+
gestureMap = inputCore.GlobalGestureMap(
245+
{
246+
"globalCommands.GlobalCommands": {
247+
"braille_scrollBack": ("br(nlseReaderZoomax):d2",),
248+
"braille_scrollForward": ("br(nlseReaderZoomax):d5",),
249+
"braille_previousLine": ("br(nlseReaderZoomax):d1",),
250+
"braille_nextLine": ("br(nlseReaderZoomax):d3",),
251+
"braille_routeTo": ("br(nlseReaderZoomax):routing",),
252+
"kb:upArrow": ("br(nlseReaderZoomax):up",),
253+
"kb:downArrow": ("br(nlseReaderZoomax):down",),
254+
"kb:leftArrow": ("br(nlseReaderZoomax):left",),
255+
"kb:rightArrow": ("br(nlseReaderZoomax):right",),
256+
"kb:enter": ("br(nlseReaderZoomax):select",),
257+
},
258+
},
259+
)
260+
261+
262+
class InputGesture(braille.BrailleDisplayGesture, brailleInput.BrailleInputGesture):
263+
source = BrailleDisplayDriver.name
264+
265+
def __init__(self, model, keysDown):
266+
super(InputGesture, self).__init__()
267+
# Model identifiers should not contain spaces.
268+
if model:
269+
self.model = model.replace(" ", "")
270+
assert self.model.isalnum()
271+
self.keysDown = dict(keysDown)
272+
273+
SYSTEM_KEYS_MASK = 0xF8
274+
SPACEBAR_KEYS_MASK = 0x07
275+
276+
self.keyNames = names = []
277+
for group, groupKeysDown in keysDown.items():
278+
if (
279+
group == DeviceCommand.BRAILLE_KEYS
280+
and len(keysDown) == 1
281+
and not groupKeysDown & SYSTEM_KEYS_MASK
282+
):
283+
self.dots = groupKeysDown >> 8
284+
self.space = groupKeysDown & SPACEBAR_KEYS_MASK
285+
if group == DeviceCommand.ROUTING_KEYS:
286+
for index in range(braille.handler.display.numCells):
287+
if groupKeysDown & (1 << index):
288+
self.routingIndex = index
289+
names.append("routing")
290+
break
291+
else:
292+
for index, name in enumerate(COMMAND_RESPONSE_INFO.get(group).keys):
293+
if groupKeysDown & (1 << index):
294+
names.append(name)
295+
296+
self.id = "+".join(names)

0 commit comments

Comments
 (0)