@@ -175,33 +175,76 @@ pub fn poll(
175175) Poller (StreamEnum ) {
176176 const enum_fields = @typeInfo (StreamEnum ).Enum .fields ;
177177 var result : Poller (StreamEnum ) = undefined ;
178+
179+ if (builtin .os .tag == .windows ) result .windows = .{
180+ .first_read_done = false ,
181+ .overlapped = [1 ]os.windows.OVERLAPPED {
182+ mem .zeroes (os .windows .OVERLAPPED ),
183+ } ** enum_fields .len ,
184+ .active = .{
185+ .count = 0 ,
186+ .handles_buf = undefined ,
187+ .stream_map = undefined ,
188+ },
189+ };
190+
178191 inline for (0.. enum_fields .len ) | i | {
179192 result .fifos [i ] = .{
180193 .allocator = allocator ,
181194 .buf = &.{},
182195 .head = 0 ,
183196 .count = 0 ,
184197 };
185- result .poll_fds [i ] = .{
186- .fd = @field (files , enum_fields [i ].name ).handle ,
187- .events = os .POLL .IN ,
188- .revents = undefined ,
189- };
198+ if (builtin .os .tag == .windows ) {
199+ result .windows .active .handles_buf [i ] = @field (files , enum_fields [i ].name ).handle ;
200+ } else {
201+ result .poll_fds [i ] = .{
202+ .fd = @field (files , enum_fields [i ].name ).handle ,
203+ .events = os .POLL .IN ,
204+ .revents = undefined ,
205+ };
206+ }
190207 }
191208 return result ;
192209}
193210
211+ pub const PollFifo = std .fifo .LinearFifo (u8 , .Dynamic );
212+
194213pub fn Poller (comptime StreamEnum : type ) type {
195214 return struct {
196215 const enum_fields = @typeInfo (StreamEnum ).Enum .fields ;
197- const Fifo = std .fifo .LinearFifo (u8 , .Dynamic );
198-
199- fifos : [enum_fields .len ]Fifo ,
200- poll_fds : [enum_fields .len ]std.os.pollfd ,
216+ const PollFd = if (builtin .os .tag == .windows ) void else std .os .pollfd ;
217+
218+ fifos : [enum_fields .len ]PollFifo ,
219+ poll_fds : [enum_fields .len ]PollFd ,
220+ windows : if (builtin.os.tag == .windows ) struct {
221+ first_read_done : bool ,
222+ overlapped : [enum_fields .len ]os.windows.OVERLAPPED ,
223+ active : struct {
224+ count : math .IntFittingRange (0 , enum_fields .len ),
225+ handles_buf : [enum_fields .len ]os.windows.HANDLE ,
226+ stream_map : [enum_fields .len ]StreamEnum ,
227+
228+ pub fn removeAt (self : * @This (), index : u32 ) void {
229+ std .debug .assert (index < self .count );
230+ for (index + 1 .. self .count ) | i | {
231+ self .handles_buf [i - 1 ] = self .handles_buf [i ];
232+ self .stream_map [i - 1 ] = self .stream_map [i ];
233+ }
234+ self .count -= 1 ;
235+ }
236+ },
237+ } else void ,
201238
202239 const Self = @This ();
203240
204241 pub fn deinit (self : * Self ) void {
242+ if (builtin .os .tag == .windows ) {
243+ // cancel any pending IO to prevent clobbering OVERLAPPED value
244+ for (self .windows .active .handles_buf [0 .. self .windows .active .count ]) | h | {
245+ _ = os .windows .kernel32 .CancelIo (h );
246+ }
247+ }
205248 inline for (& self .fifos ) | * q | q .deinit ();
206249 self .* = undefined ;
207250 }
@@ -214,19 +257,89 @@ pub fn Poller(comptime StreamEnum: type) type {
214257 }
215258 }
216259
217- pub inline fn fifo (self : * Self , comptime which : StreamEnum ) * Fifo {
260+ pub inline fn fifo (self : * Self , comptime which : StreamEnum ) * PollFifo {
218261 return & self .fifos [@enumToInt (which )];
219262 }
220263
221264 pub fn done (self : Self ) bool {
265+ if (builtin .os .tag == .windows )
266+ return self .windows .first_read_done and self .windows .active .count == 0 ;
267+
222268 for (self .poll_fds ) | poll_fd | {
223269 if (poll_fd .fd != -1 ) return false ;
224270 } else return true ;
225271 }
226272
227273 fn pollWindows (self : * Self ) ! void {
228- _ = self ;
229- @compileError ("TODO" );
274+ const bump_amt = 512 ;
275+
276+ if (! self .windows .first_read_done ) {
277+ // Windows Async IO requires an initial call to ReadFile before waiting on the handle
278+ for (0.. enum_fields .len ) | i | {
279+ const handle = self .windows .active .handles_buf [i ];
280+ switch (try windowsAsyncRead (
281+ handle ,
282+ & self .windows .overlapped [i ],
283+ & self .fifos [i ],
284+ bump_amt ,
285+ )) {
286+ .pending = > {
287+ self .windows .active .handles_buf [self .windows .active .count ] = handle ;
288+ self .windows .active .stream_map [self .windows .active .count ] = @intToEnum (StreamEnum , i );
289+ self .windows .active .count += 1 ;
290+ },
291+ .closed = > {}, // don't add to the wait_objects list
292+ }
293+ }
294+ self .windows .first_read_done = true ;
295+ }
296+
297+ while (true ) {
298+ if (self .windows .active .count == 0 ) return ;
299+
300+ const status = os .windows .kernel32 .WaitForMultipleObjects (
301+ self .windows .active .count ,
302+ & self .windows .active .handles_buf ,
303+ 0 ,
304+ os .windows .INFINITE ,
305+ );
306+ if (status == os .windows .WAIT_FAILED )
307+ return os .windows .unexpectedError (os .windows .kernel32 .GetLastError ());
308+
309+ if (status < os .windows .WAIT_OBJECT_0 or status > os .windows .WAIT_OBJECT_0 + enum_fields .len - 1 )
310+ unreachable ;
311+
312+ const active_idx = status - os .windows .WAIT_OBJECT_0 ;
313+
314+ const handle = self .windows .active .handles_buf [active_idx ];
315+ const stream_idx = @enumToInt (self .windows .active .stream_map [active_idx ]);
316+ var read_bytes : u32 = undefined ;
317+ if (0 == os .windows .kernel32 .GetOverlappedResult (
318+ handle ,
319+ & self .windows .overlapped [stream_idx ],
320+ & read_bytes ,
321+ 0 ,
322+ )) switch (os .windows .kernel32 .GetLastError ()) {
323+ .BROKEN_PIPE = > {
324+ self .windows .active .removeAt (active_idx );
325+ continue ;
326+ },
327+ else = > | err | return os .windows .unexpectedError (err ),
328+ };
329+
330+ self .fifos [stream_idx ].update (read_bytes );
331+
332+ switch (try windowsAsyncRead (
333+ handle ,
334+ & self .windows .overlapped [stream_idx ],
335+ & self .fifos [stream_idx ],
336+ bump_amt ,
337+ )) {
338+ .pending = > {},
339+ .closed = > self .windows .active .removeAt (active_idx ),
340+ }
341+ return ;
342+ }
230343 }
231344
232345 fn pollPosix (self : * Self ) ! void {
@@ -263,6 +376,25 @@ pub fn Poller(comptime StreamEnum: type) type {
263376 };
264377}
265378
379+ fn windowsAsyncRead (
380+ handle : os.windows.HANDLE ,
381+ overlapped : * os.windows.OVERLAPPED ,
382+ fifo : * PollFifo ,
383+ bump_amt : usize ,
384+ ) ! enum { pending , closed } {
385+ while (true ) {
386+ const buf = try fifo .writableWithSize (bump_amt );
387+ var read_bytes : u32 = undefined ;
388+ const read_result = os .windows .kernel32 .ReadFile (handle , buf .ptr , math .cast (u32 , buf .len ) orelse math .maxInt (u32 ), & read_bytes , overlapped );
389+ if (read_result == 0 ) return switch (os .windows .kernel32 .GetLastError ()) {
390+ .IO_PENDING = > .pending ,
391+ .BROKEN_PIPE = > .closed ,
392+ else = > | err | os .windows .unexpectedError (err ),
393+ };
394+ fifo .update (read_bytes );
395+ }
396+ }
397+
266398/// Given an enum, returns a struct with fields of that enum, each field
267399/// representing an I/O stream for polling.
268400pub fn PollFiles (comptime StreamEnum : type ) type {
0 commit comments