Skip to content

Commit 2b9f69a

Browse files
authored
Epoll: Avoid redundant EPOLL_CTL_MOD calls (#9397) (#9583)
Motivation: Currently an epoll_ctl syscall is made every time there is a change to the event interest flags (EPOLLIN, EPOLLOUT, etc.) of a channel. These changes are only made in the event loop, so they can be aggregated into 0 or 1 such calls per channel prior to the next call to epoll_wait. Modifications: I think further streamlining/simplification is possible, but for now I've tried to minimize structural changes and added the aggregation beneath the existing flag manipulation logic. A new AbstractEpollChannel#activeFlags field records the flags last set on the epoll fd for that channel. Calls to setFlag/clearFlag update the flags field as before, but instead of calling epoll_ctl immediately, they just set or clear a bit for the channel in a new bitset in the associated EpollEventLoop to reflect whether there is any change from the last set value. Prior to calling epoll_wait, the event loop makes the appropriate epoll_ctl(EPOLL_CTL_MOD) call once for each channel whose bit is set. Result: Fewer syscalls, particularly in some auto-read=false cases. Simplified error handling from centralization of these calls.
1 parent 338e1a9 commit 2b9f69a

3 files changed

Lines changed: 76 additions & 61 deletions

File tree

transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java

Lines changed: 40 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
6969
private volatile SocketAddress local;
7070
private volatile SocketAddress remote;
7171

72-
protected int flags = Native.EPOLLET;
72+
protected int flags = Native.EPOLLET | Native.EPOLLIN;
73+
protected int activeFlags;
7374
boolean inputClosedSeenErrorOnRead;
7475
boolean epollInReadyRunnablePending;
7576

@@ -109,17 +110,23 @@ static boolean isSoErrorZero(Socket fd) {
109110
}
110111
}
111112

112-
void setFlag(int flag) throws IOException {
113+
void setFlag(int flag) {
113114
if (!isFlagSet(flag)) {
114115
flags |= flag;
115-
modifyEvents();
116+
updatePendingFlagsSet();
116117
}
117118
}
118119

119-
void clearFlag(int flag) throws IOException {
120+
void clearFlag(int flag) {
120121
if (isFlagSet(flag)) {
121122
flags &= ~flag;
122-
modifyEvents();
123+
updatePendingFlagsSet();
124+
}
125+
}
126+
127+
private void updatePendingFlagsSet() {
128+
if (isRegistered()) {
129+
((EpollEventLoop) eventLoop()).updatePendingFlagsSet(this);
123130
}
124131
}
125132

@@ -246,33 +253,33 @@ private static boolean isAllowHalfClosure(ChannelConfig config) {
246253
((SocketChannelConfig) config).isAllowHalfClosure();
247254
}
248255

256+
private Runnable clearEpollInTask;
257+
249258
final void clearEpollIn() {
250259
// Only clear if registered with an EventLoop as otherwise
251-
if (isRegistered()) {
252-
final EventLoop loop = eventLoop();
253-
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
254-
if (loop.inEventLoop()) {
255-
unsafe.clearEpollIn0();
256-
} else {
257-
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
258-
loop.execute(new Runnable() {
259-
@Override
260-
public void run() {
261-
if (!unsafe.readPending && !config().isAutoRead()) {
262-
// Still no read triggered so clear it now
263-
unsafe.clearEpollIn0();
264-
}
260+
final EventLoop loop = isRegistered() ? eventLoop() : null;
261+
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
262+
if (loop == null || loop.inEventLoop()) {
263+
unsafe.clearEpollIn0();
264+
return;
265+
}
266+
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
267+
Runnable clearFlagTask = clearEpollInTask;
268+
if (clearFlagTask == null) {
269+
clearEpollInTask = clearFlagTask = new Runnable() {
270+
@Override
271+
public void run() {
272+
if (!unsafe.readPending && !config().isAutoRead()) {
273+
// Still no read triggered so clear it now
274+
unsafe.clearEpollIn0();
265275
}
266-
});
267-
}
268-
} else {
269-
// The EventLoop is not registered atm so just update the flags so the correct value
270-
// will be used once the channel is registered
271-
flags &= ~Native.EPOLLIN;
276+
}
277+
};
272278
}
279+
loop.execute(clearFlagTask);
273280
}
274281

275-
private void modifyEvents() throws IOException {
282+
void modifyEvents() throws IOException {
276283
if (isOpen() && isRegistered()) {
277284
((EpollEventLoop) eventLoop()).modify(this);
278285
}
@@ -416,7 +423,7 @@ final void epollInFinally(ChannelConfig config) {
416423
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
417424
//
418425
// See https://github.com/netty/netty/issues/2254
419-
clearEpollIn();
426+
clearEpollIn0();
420427
}
421428
}
422429

@@ -446,19 +453,7 @@ final void epollRdHupReady() {
446453
}
447454

448455
// Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
449-
clearEpollRdHup();
450-
}
451-
452-
/**
453-
* Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
454-
*/
455-
private void clearEpollRdHup() {
456-
try {
457-
clearFlag(Native.EPOLLRDHUP);
458-
} catch (IOException e) {
459-
pipeline().fireExceptionCaught(e);
460-
close(voidPromise());
461-
}
456+
clearFlag(Native.EPOLLRDHUP);
462457
}
463458

464459
/**
@@ -478,7 +473,7 @@ void shutdownInput(boolean rdHup) {
478473
// We attempted to shutdown and failed, which means the input has already effectively been
479474
// shutdown.
480475
}
481-
clearEpollIn();
476+
clearEpollIn0();
482477
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
483478
} else {
484479
close(voidPromise());
@@ -534,16 +529,9 @@ final void epollOutReady() {
534529
}
535530

536531
protected final void clearEpollIn0() {
537-
assert eventLoop().inEventLoop();
538-
try {
539-
readPending = false;
540-
clearFlag(Native.EPOLLIN);
541-
} catch (IOException e) {
542-
// When this happens there is something completely wrong with either the filedescriptor or epoll,
543-
// so fire the exception through the pipeline and close the Channel.
544-
pipeline().fireExceptionCaught(e);
545-
unsafe().close(unsafe().voidPromise());
546-
}
532+
assert !isRegistered() || eventLoop().inEventLoop();
533+
readPending = false;
534+
clearFlag(Native.EPOLLIN);
547535
}
548536

549537
@Override
@@ -668,7 +656,7 @@ private void finishConnect() {
668656
/**
669657
* Finish the connect
670658
*/
671-
private boolean doFinishConnect() throws Exception {
659+
private boolean doFinishConnect() throws IOException {
672660
if (socket.finishConnect()) {
673661
clearFlag(Native.EPOLLOUT);
674662
if (requestedRemoteAddress instanceof InetSocketAddress) {

transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,7 @@ public EpollChannelConfig setEpollMode(EpollMode mode) {
150150
if (mode == null) {
151151
throw new NullPointerException("mode");
152152
}
153-
try {
154-
switch (mode) {
153+
switch (mode) {
155154
case EDGE_TRIGGERED:
156155
checkChannelNotRegistered();
157156
((AbstractEpollChannel) channel).setFlag(Native.EPOLLET);
@@ -162,9 +161,6 @@ public EpollChannelConfig setEpollMode(EpollMode mode) {
162161
break;
163162
default:
164163
throw new Error();
165-
}
166-
} catch (IOException e) {
167-
throw new ChannelException(e);
168164
}
169165
return this;
170166
}

transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.netty.util.internal.logging.InternalLoggerFactory;
3434

3535
import java.io.IOException;
36+
import java.util.BitSet;
3637
import java.util.Queue;
3738
import java.util.concurrent.Executor;
3839
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -59,6 +60,8 @@ class EpollEventLoop extends SingleThreadEventLoop {
5960
private final FileDescriptor eventFd;
6061
private final FileDescriptor timerFd;
6162
private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
63+
private final BitSet pendingFlagChannels = new BitSet();
64+
6265
private final boolean allowGrowing;
6366
private final EpollEventArray events;
6467

@@ -190,6 +193,7 @@ void add(AbstractEpollChannel ch) throws IOException {
190193
assert inEventLoop();
191194
int fd = ch.socket.intValue();
192195
Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
196+
ch.activeFlags = ch.flags;
193197
AbstractEpollChannel old = channels.put(fd, ch);
194198

195199
// We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
@@ -203,6 +207,28 @@ void add(AbstractEpollChannel ch) throws IOException {
203207
void modify(AbstractEpollChannel ch) throws IOException {
204208
assert inEventLoop();
205209
Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
210+
ch.activeFlags = ch.flags;
211+
}
212+
213+
void updatePendingFlagsSet(AbstractEpollChannel ch) {
214+
pendingFlagChannels.set(ch.socket.intValue(), ch.flags != ch.activeFlags);
215+
}
216+
217+
private void processPendingChannelFlags() {
218+
// Call epollCtlMod for any channels that require event interest changes before epollWaiting
219+
if (!pendingFlagChannels.isEmpty()) {
220+
for (int fd = 0; (fd = pendingFlagChannels.nextSetBit(fd)) >= 0; pendingFlagChannels.clear(fd)) {
221+
AbstractEpollChannel ch = channels.get(fd);
222+
if (ch != null) {
223+
try {
224+
ch.modifyEvents();
225+
} catch (IOException e) {
226+
ch.pipeline().fireExceptionCaught(e);
227+
ch.close();
228+
}
229+
}
230+
}
231+
}
206232
}
207233

208234
/**
@@ -219,10 +245,14 @@ void remove(AbstractEpollChannel ch) throws IOException {
219245

220246
// If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
221247
assert !ch.isOpen();
222-
} else if (ch.isOpen()) {
223-
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
224-
// removed once the file-descriptor is closed.
225-
Native.epollCtlDel(epollFd.intValue(), fd);
248+
} else {
249+
ch.activeFlags = 0;
250+
pendingFlagChannels.clear(fd);
251+
if (ch.isOpen()) {
252+
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
253+
// removed once the file-descriptor is closed.
254+
Native.epollCtlDel(epollFd.intValue(), fd);
255+
}
226256
}
227257
}
228258

@@ -288,6 +318,7 @@ private int epollBusyWait() throws IOException {
288318
protected void run() {
289319
for (;;) {
290320
try {
321+
processPendingChannelFlags();
291322
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
292323
switch (strategy) {
293324
case SelectStrategy.CONTINUE:

0 commit comments

Comments
 (0)