55
66"""Unit tests for the orderedWinEventLimiter module.
77"""
8-
8+ import inspect
9+ import re
910import unittest
11+ from typing import List , Iterator , Callable
1012import winUser
13+ from IAccessibleHandler import orderedWinEventLimiter
1114from IAccessibleHandler .orderedWinEventLimiter import OrderedWinEventLimiter
1215
16+
17+ def softAssert (errorList : List [AssertionError ], method : Callable , * args , ** kwargs ):
18+ try :
19+ method (* args , ** kwargs )
20+ except AssertionError as e :
21+ errorList .append (e )
22+
23+
1324specialCaseEvents = [
1425 winUser .EVENT_SYSTEM_FOREGROUND ,
1526 winUser .EVENT_OBJECT_FOCUS ,
2233]
2334
2435
36+ def _getNonSpecialCaseEvents () -> Iterator [int ]:
37+ objectOrSystemEvent = re .compile ("^EVENT_(OBJECT|SYSTEM)_" )
38+ for name , value in inspect .getmembers (winUser ):
39+ if value not in specialCaseEvents and objectOrSystemEvent .match (name ):
40+ yield value
41+
42+
43+ nonSpecialCaseEvents : List [int ] = list (_getNonSpecialCaseEvents ())
44+
45+
2546class TestOrderedWinEventLimiter (unittest .TestCase ):
2647
48+ def test_nonSpecialCaseEvents (self ):
49+ """Test that the list of events without special cases matches expectations
50+ """
51+ self .assertEqual (39 , len (nonSpecialCaseEvents ))
52+
2753 def test_maxFocusEvents (self ):
2854 limiter = OrderedWinEventLimiter (maxFocusItems = 4 )
2955 for n in range (0 , 5 ):
@@ -151,6 +177,162 @@ def test_hideNoOverridesShow_whenDetailsDontMatch(self):
151177 ]
152178 self .assertEqual (expectedEvents , actualEvents )
153179
180+ def test_alwaysAllowedObjects_specialCaseEvents (self ):
181+ # We have events from two unique objects:
182+ # Window, objectID, childID
183+ allowedSource = (1 , 1 , 1 )
184+
185+ limiter = OrderedWinEventLimiter (maxFocusItems = 4 )
186+ for n in range (2000 ): # send many events, to saturate all limits.
187+ eventId = specialCaseEvents [n % len (specialCaseEvents )]
188+ limiter .addEvent (eventId , * allowedSource , threadID = 0 )
189+ events = limiter .flushEvents (alwaysAllowedObjects = [allowedSource , ])
190+
191+ expected = [
192+ # Two Foreground events, because they are added to multiple queues.
193+ # Added to both _focusEventCache and _genericEventCache
194+ # See also test_limitEventsPerThread
195+ (winUser .EVENT_SYSTEM_FOREGROUND , * allowedSource ),
196+ (winUser .EVENT_SYSTEM_FOREGROUND , * allowedSource ),
197+ (winUser .EVENT_OBJECT_FOCUS , * allowedSource ),
198+ (winUser .EVENT_OBJECT_HIDE , * allowedSource ), # latest out of show / hide is kept
199+ (winUser .EVENT_SYSTEM_MENUPOPUPEND , * allowedSource ), # Only one menu event is allowed
200+ ]
201+ self .assertEqual (expected , events )
202+
203+ def test_alwaysAllowedObjects_onlyLatestEventKept (self ):
204+ # We have events from two unique objects:
205+ # Window, objectID, childID
206+ allowedSource = (1 , 1 , 1 )
207+ otherSource = (2 , 2 , 2 ,)
208+
209+ limiter = OrderedWinEventLimiter (maxFocusItems = 4 )
210+ for n in range (50 ): # send many value changed events
211+ limiter .addEvent (winUser .EVENT_OBJECT_VALUECHANGE , * allowedSource , threadID = 0 )
212+ limiter .addEvent (winUser .EVENT_OBJECT_VALUECHANGE , * otherSource , threadID = 0 )
213+ events = limiter .flushEvents (alwaysAllowedObjects = [allowedSource , ])
214+ # only the most recent event of each object is kept, all previous duplicates are discarded
215+ self .assertEqual (2 , len (events ))
216+
217+ def test_threadLimit_singleObject (self ):
218+ """Test that only the latest events are kept when the thread limit is exceeded
219+ """
220+ # We have events from two unique objects:
221+ # Window, objectID, childID
222+ source = (2 , 2 , 2 ,)
223+
224+ limiter = OrderedWinEventLimiter (maxFocusItems = 4 )
225+
226+ for n in range (500 ): # exceed the limit for a single thread
227+ eventId = nonSpecialCaseEvents [n % len (nonSpecialCaseEvents )]
228+ # same thread, different object. Use a second object to aid tracking.
229+ limiter .addEvent (eventId , * source , threadID = 0 )
230+
231+ events = limiter .flushEvents ()
232+ errors = []
233+ expectedEventCount = orderedWinEventLimiter .MAX_WINEVENTS_PER_THREAD
234+ self .assertEqual (expectedEventCount , len (events ))
235+
236+ def test_threadLimit_noCanary (self ):
237+ """Test that only the latest events are kept when the thread limit is exceeded
238+ """
239+ limiter = OrderedWinEventLimiter (maxFocusItems = 4 )
240+
241+ for n in range (500 ): # exceed the limit for a single thread
242+ eventId = nonSpecialCaseEvents [n % len (nonSpecialCaseEvents )]
243+ # same thread, different object. Ensure there are no duplicates
244+ # Window, objectID, childID
245+ source = (2 , 2 , n ,)
246+ limiter .addEvent (eventId , * source , threadID = 0 )
247+
248+ events = limiter .flushEvents ()
249+
250+ errors = []
251+ expectedEventCount = orderedWinEventLimiter .MAX_WINEVENTS_PER_THREAD
252+ softAssert (errors , self .assertEqual , expectedEventCount , len (events )) # Fails with 11 actual events
253+ self .assertListEqual ([], errors )
254+
255+ def test_threadLimit_withCanaryAtStart (self ):
256+ """Test that only the latest events are kept when the thread limit is exceeded
257+ """
258+ limiter = OrderedWinEventLimiter (maxFocusItems = 4 )
259+
260+ # Window, objectID, childID
261+ canaryObject = (1 , 1 , 1 )
262+ eventStartCanary = (winUser .EVENT_OBJECT_VALUECHANGE , * canaryObject )
263+ limiter .addEvent (* eventStartCanary , threadID = 0 )
264+
265+ for n in range (500 ): # exceed the limit for a single thread
266+ eventId = nonSpecialCaseEvents [n % len (nonSpecialCaseEvents )]
267+ # same thread, different object. Ensure there are no duplicates
268+ # Window, objectID, childID
269+ source = (2 , 2 , n ,)
270+ limiter .addEvent (eventId , * source , threadID = 0 )
271+
272+ events = limiter .flushEvents ()
273+
274+ errors = []
275+ expectedEventCount = orderedWinEventLimiter .MAX_WINEVENTS_PER_THREAD
276+ softAssert (errors , self .assertEqual , expectedEventCount , len (events )) # Fails with 11 actual events
277+ softAssert (errors , self .assertNotIn , eventStartCanary , events )
278+ self .assertListEqual ([], errors )
279+
280+ def test_threadLimit_canaryStartAndEnd (self ):
281+ """Test that only the latest events are kept when the thread limit is exceeded
282+ """
283+ limiter = OrderedWinEventLimiter (maxFocusItems = 4 )
284+
285+ # Window, objectID, childID
286+ canaryObject = (1 , 1 , 1 )
287+ eventStartCanary = (winUser .EVENT_OBJECT_VALUECHANGE , * canaryObject )
288+ limiter .addEvent (* eventStartCanary , threadID = 0 )
289+
290+ for n in range (500 ): # exceed the limit for a single thread
291+ eventId = nonSpecialCaseEvents [n % len (nonSpecialCaseEvents )]
292+ # same thread, different object. Ensure there are no duplicates
293+ # Window, objectID, childID
294+ source = (2 , 2 , n ,)
295+ limiter .addEvent (eventId , * source , threadID = 0 )
296+
297+ # Note event type must differ from start canary to ensure they are not duplicates
298+ eventEndCanary = (winUser .EVENT_OBJECT_NAMECHANGE , * canaryObject )
299+ limiter .addEvent (* eventEndCanary , threadID = 0 )
300+
301+ events = limiter .flushEvents ()
302+ errors = []
303+ expectedEventCount = orderedWinEventLimiter .MAX_WINEVENTS_PER_THREAD
304+ softAssert (errors , self .assertEqual , expectedEventCount , len (events )) # Fails with 11 actual events
305+ softAssert (errors , self .assertIn , eventEndCanary , events )
306+ softAssert (errors , self .assertNotIn , eventStartCanary , events )
307+ self .assertListEqual ([], errors )
308+
309+ def test_alwaysAllowedObjects (self ):
310+ """Matches test_threadLimit_canaryStartAndEnd, but allows events from the first object
311+ """
312+ limiter = OrderedWinEventLimiter (maxFocusItems = 4 )
313+
314+ # Window, objectID, childID
315+ canaryObject = (1 , 1 , 1 )
316+ eventStartCanary = (winUser .EVENT_OBJECT_VALUECHANGE , * canaryObject )
317+ limiter .addEvent (* eventStartCanary , threadID = 0 )
318+
319+ for n in range (orderedWinEventLimiter .MAX_WINEVENTS_PER_THREAD ): # exceed the limit for a single thread
320+ eventId = nonSpecialCaseEvents [n % len (nonSpecialCaseEvents )]
321+ # same thread, different object. Ensure there are no duplicates
322+ # Window, objectID, childID
323+ source = (2 , 2 , n ,)
324+ limiter .addEvent (eventId , * source , threadID = 0 )
325+
326+ eventEndCanary = (winUser .EVENT_OBJECT_NAMECHANGE , * canaryObject )
327+ limiter .addEvent (* eventEndCanary , threadID = 0 )
328+
329+ events = limiter .flushEvents (alwaysAllowedObjects = [canaryObject , ])
330+ # only the most recent event of each object is kept, all previous duplicates are discarded
331+ self .assertEqual (11 , len (events ))
332+ self .assertIn (eventStartCanary , events )
333+ self .assertEqual (eventStartCanary , events [0 ])
334+ self .assertIn (eventEndCanary , events )
335+
154336 def test_limitEventsPerThread (self ):
155337 limiter = OrderedWinEventLimiter (maxFocusItems = 4 )
156338 for n in reversed (range (2000 )): # send many events, to saturate all limits.
@@ -165,10 +347,10 @@ def test_limitEventsPerThread(self):
165347 for e in events
166348 ]
167349 # TODO: Note: repeated Id's (0 and 8) are EVENT_SYSTEM_FOREGROUND see test_maxFocusEvents
168- expectedIds = [26 , 24 , 19 , 18 , 16 , 11 , 10 , 9 , 8 , 8 , 4 , 3 , 2 , 1 , 0 , 0 ]
350+ expectedIds = [24 , 19 , 18 , 16 , 11 , 10 , 9 , 8 , 8 , 4 , 3 , 2 , 1 , 0 , 0 ]
169351 self .assertEqual (expectedIds , windowIds )
170- # TODO:
171- # Why isn't this equal to MAX_WINEVENTS_PER_THREAD=10
172- # There are also 4 focus events .
173- # But the total is 16 not 10+4=14?
174- self .assertEqual (len (windowIds ), 16 )
352+ # equal to MAX_WINEVENTS_PER_THREAD=10
353+ # Plus 4 focus events,
354+ # Plus the last menu event .
355+ # All totalling 15.
356+ self .assertEqual (len (windowIds ), 15 )
0 commit comments