Skip to content

Commit 82ff9da

Browse files
committed
std.io.poll initial windows implementation
1 parent d8c3738 commit 82ff9da

File tree

1 file changed

+144
-12
lines changed

1 file changed

+144
-12
lines changed

lib/std/io.zig

Lines changed: 144 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -175,33 +175,76 @@ pub fn poll(
175175
) Poller(StreamEnum) {
176176
const enum_fields = @typeInfo(StreamEnum).Enum.fields;
177177
var result: Poller(StreamEnum) = undefined;
178+
179+
if (builtin.os.tag == .windows) result.windows = .{
180+
.first_read_done = false,
181+
.overlapped = [1]os.windows.OVERLAPPED {
182+
mem.zeroes(os.windows.OVERLAPPED),
183+
} ** enum_fields.len,
184+
.active = .{
185+
.count = 0,
186+
.handles_buf = undefined,
187+
.stream_map = undefined,
188+
},
189+
};
190+
178191
inline for (0..enum_fields.len) |i| {
179192
result.fifos[i] = .{
180193
.allocator = allocator,
181194
.buf = &.{},
182195
.head = 0,
183196
.count = 0,
184197
};
185-
result.poll_fds[i] = .{
186-
.fd = @field(files, enum_fields[i].name).handle,
187-
.events = os.POLL.IN,
188-
.revents = undefined,
189-
};
198+
if (builtin.os.tag == .windows) {
199+
result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle;
200+
} else {
201+
result.poll_fds[i] = .{
202+
.fd = @field(files, enum_fields[i].name).handle,
203+
.events = os.POLL.IN,
204+
.revents = undefined,
205+
};
206+
}
190207
}
191208
return result;
192209
}
193210

211+
pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic);
212+
194213
pub fn Poller(comptime StreamEnum: type) type {
195214
return struct {
196215
const enum_fields = @typeInfo(StreamEnum).Enum.fields;
197-
const Fifo = std.fifo.LinearFifo(u8, .Dynamic);
198-
199-
fifos: [enum_fields.len]Fifo,
200-
poll_fds: [enum_fields.len]std.os.pollfd,
216+
const PollFd = if (builtin.os.tag == .windows) void else std.os.pollfd;
217+
218+
fifos: [enum_fields.len]PollFifo,
219+
poll_fds: [enum_fields.len]PollFd,
220+
windows: if (builtin.os.tag == .windows) struct {
221+
first_read_done: bool,
222+
overlapped: [enum_fields.len]os.windows.OVERLAPPED,
223+
active: struct {
224+
count: math.IntFittingRange(0, enum_fields.len),
225+
handles_buf: [enum_fields.len]os.windows.HANDLE,
226+
stream_map: [enum_fields.len]StreamEnum,
227+
228+
pub fn removeAt(self: *@This(), index: u32) void {
229+
std.debug.assert(index < self.count);
230+
for (index + 1 .. self.count) |i| {
231+
self.handles_buf[i - 1] = self.handles_buf[i];
232+
self.stream_map[i - 1] = self.stream_map[i];
233+
}
234+
self.count -= 1;
235+
}
236+
},
237+
} else void,
201238

202239
const Self = @This();
203240

204241
pub fn deinit(self: *Self) void {
242+
if (builtin.os.tag == .windows) {
243+
// cancel any pending IO to prevent clobbering OVERLAPPED value
244+
for (self.windows.active.handles_buf[0 .. self.windows.active.count]) |h| {
245+
_ = os.windows.kernel32.CancelIo(h);
246+
}
247+
}
205248
inline for (&self.fifos) |*q| q.deinit();
206249
self.* = undefined;
207250
}
@@ -214,19 +257,89 @@ pub fn Poller(comptime StreamEnum: type) type {
214257
}
215258
}
216259

217-
pub inline fn fifo(self: *Self, comptime which: StreamEnum) *Fifo {
260+
pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo {
218261
return &self.fifos[@enumToInt(which)];
219262
}
220263

221264
pub fn done(self: Self) bool {
265+
if (builtin.os.tag == .windows)
266+
return self.windows.first_read_done and self.windows.active.count == 0;
267+
222268
for (self.poll_fds) |poll_fd| {
223269
if (poll_fd.fd != -1) return false;
224270
} else return true;
225271
}
226272

227273
fn pollWindows(self: *Self) !void {
228-
_ = self;
229-
@compileError("TODO");
274+
const bump_amt = 512;
275+
276+
if (!self.windows.first_read_done) {
277+
// Windows Async IO requires an initial call to ReadFile before waiting on the handle
278+
for (0..enum_fields.len) |i| {
279+
const handle = self.windows.active.handles_buf[i];
280+
switch (try windowsAsyncRead(
281+
handle,
282+
&self.windows.overlapped[i],
283+
&self.fifos[i],
284+
bump_amt,
285+
)) {
286+
.pending => {
287+
self.windows.active.handles_buf[self.windows.active.count] = handle;
288+
self.windows.active.stream_map[self.windows.active.count] = @intToEnum(StreamEnum, i);
289+
self.windows.active.count += 1;
290+
},
291+
.closed => {}, // don't add to the wait_objects list
292+
}
293+
}
294+
self.windows.first_read_done = true;
295+
}
296+
297+
while (true) {
298+
if (self.windows.active.count == 0) return;
299+
300+
const status = os.windows.kernel32.WaitForMultipleObjects(
301+
self.windows.active.count,
302+
&self.windows.active.handles_buf,
303+
0,
304+
os.windows.INFINITE,
305+
);
306+
if (status == os.windows.WAIT_FAILED)
307+
return os.windows.unexpectedError(os.windows.kernel32.GetLastError());
308+
309+
if (status < os.windows.WAIT_OBJECT_0 or status > os.windows.WAIT_OBJECT_0 + enum_fields.len - 1)
310+
unreachable;
311+
312+
const active_idx = status - os.windows.WAIT_OBJECT_0;
313+
314+
const handle = self.windows.active.handles_buf[active_idx];
315+
const stream_idx = @enumToInt(self.windows.active.stream_map[active_idx]);
316+
var read_bytes: u32 = undefined;
317+
if (0 == os.windows.kernel32.GetOverlappedResult(
318+
handle,
319+
&self.windows.overlapped[stream_idx],
320+
&read_bytes,
321+
0,
322+
)) switch (os.windows.kernel32.GetLastError()) {
323+
.BROKEN_PIPE => {
324+
self.windows.active.removeAt(active_idx);
325+
continue;
326+
},
327+
else => |err| return os.windows.unexpectedError(err),
328+
};
329+
330+
self.fifos[stream_idx].update(read_bytes);
331+
332+
switch (try windowsAsyncRead(
333+
handle,
334+
&self.windows.overlapped[stream_idx],
335+
&self.fifos[stream_idx],
336+
bump_amt,
337+
)) {
338+
.pending => {},
339+
.closed => self.windows.active.removeAt(active_idx),
340+
}
341+
return;
342+
}
230343
}
231344

232345
fn pollPosix(self: *Self) !void {
@@ -263,6 +376,25 @@ pub fn Poller(comptime StreamEnum: type) type {
263376
};
264377
}
265378

379+
fn windowsAsyncRead(
380+
handle: os.windows.HANDLE,
381+
overlapped: *os.windows.OVERLAPPED,
382+
fifo: *PollFifo,
383+
bump_amt: usize,
384+
) !enum{ pending, closed } {
385+
while (true) {
386+
const buf = try fifo.writableWithSize(bump_amt);
387+
var read_bytes: u32 = undefined;
388+
const read_result = os.windows.kernel32.ReadFile(handle, buf.ptr, math.cast(u32, buf.len) orelse math.maxInt(u32), &read_bytes, overlapped);
389+
if (read_result == 0) return switch (os.windows.kernel32.GetLastError()) {
390+
.IO_PENDING => .pending,
391+
.BROKEN_PIPE => .closed,
392+
else => |err| os.windows.unexpectedError(err),
393+
};
394+
fifo.update(read_bytes);
395+
}
396+
}
397+
266398
/// Given an enum, returns a struct with fields of that enum, each field
267399
/// representing an I/O stream for polling.
268400
pub fn PollFiles(comptime StreamEnum: type) type {

0 commit comments

Comments
 (0)