Netty swallows listeners attached to writeAndFlush promises when event loop shutdown #7423

@Tim-Brooks

Description

Apologies if this is already tracked somewhere; I searched the issues and Google and could not find a matching report. It is also possible that this is a known shortcoming of Netty. If so, I want to state that, from my perspective, the behavior is unexpected. I am opening this issue in relation to the write codepath, but I imagine it affects other codepaths too, as I think it is just an artifact of how Netty deals with listeners.

Expected behavior

Generally, when I call writeAndFlush with a write object and a promise, I would expect the promise to always be completed at some point in the future, and any listeners attached to it to be called. If not, I would expect to receive an exception letting me know that the operation will not complete. In Elasticsearch we use these listeners to release resources, so it is a problem if they are never called.

elastic/elasticsearch#27422
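
To make the expectation concrete, here is a minimal sketch using only `java.util.concurrent`. It does not use Netty, and it is not a claim about how Netty should be changed: `ListenerFallbackSketch` and `notifyOrRunInline` are hypothetical names, and a plain single-thread executor stands in for the event loop. It only illustrates the contract I would expect, namely that a listener still runs (here, on the calling thread) when the executor rejects the notification task.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative model only; none of these names exist in Netty.
class ListenerFallbackSketch {

    // Hypothetical helper: try the executor first; if it rejects the task,
    // run the listener on the calling thread so it is never silently dropped.
    static void notifyOrRunInline(Executor executor, Runnable listener) {
        try {
            executor.execute(listener);
        } catch (RejectedExecutionException e) {
            listener.run();
        }
    }

    // Returns how many times the "release resources" listener ran
    // when the executor was already shut down.
    static int releasedAfterShutdown() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        loop.shutdown(); // any later execute() call is rejected

        AtomicInteger released = new AtomicInteger();
        notifyOrRunInline(loop, released::incrementAndGet);
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println("release listener ran " + releasedAfterShutdown() + " time(s)");
    }
}
```

Running the listener inline changes which thread it executes on, so this is a trade-off rather than an obviously correct answer; the point is only that the resource-releasing callback gets a chance to run.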

Actual behavior

It looks to me that if the event loop is shut down, Netty swallows the listeners attached to the promise and does not call them (although it does log the failure).

Specifically:

In SingleThreadEventExecutor we attempt to schedule the write task (when we are not on the event loop thread). reject() will throw new RejectedExecutionException("event executor terminated").

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

This exception is caught in AbstractChannelHandlerContext, and the context attempts to fail the promise. Great!

    private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

In DefaultPromise, Netty attempts to notify the listeners when the promise is failed.

    @Override
    public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this, cause);
    }

If we are not on the event loop thread, Netty attempts to dispatch the listener execution to the executor in DefaultPromise.

    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

But this fails again because the event loop is shut down, and the failure is only logged. The listeners are never notified.

    private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }
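
The chain above can be modeled without Netty. The following is a simplified sketch using only `java.util.concurrent`: `SwallowedListenerModel` and `MiniPromise` are hypothetical stand-ins, not Netty classes, and a single-thread executor plays the role of the event loop. It assumes, as in the snippets above, that listener notification is always dispatched to the executor and that a rejected dispatch is only logged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for the pattern described above; NOT Netty's code.
class SwallowedListenerModel {

    // Hypothetical miniature promise: listeners are only ever run via the executor.
    static final class MiniPromise {
        private final ExecutorService executor;
        private final List<Runnable> listeners = new ArrayList<>();
        private Throwable cause;

        MiniPromise(ExecutorService executor) {
            this.executor = executor;
        }

        synchronized void addListener(Runnable listener) {
            listeners.add(listener);
        }

        synchronized Throwable cause() {
            return cause;
        }

        void setFailure(Throwable failure) {
            final List<Runnable> toNotify;
            synchronized (this) {
                cause = failure;
                toNotify = new ArrayList<>(listeners);
            }
            try {
                executor.execute(() -> toNotify.forEach(Runnable::run));
            } catch (RejectedExecutionException e) {
                // Mirrors the log-only handling: the listeners are dropped here.
                System.err.println("Failed to submit a listener notification task: " + e);
            }
        }
    }

    // Returns true only if the listener was invoked after the executor terminated.
    static boolean listenerRanAfterShutdown() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        loop.shutdown();
        try {
            loop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        AtomicBoolean called = new AtomicBoolean(false);
        MiniPromise promise = new MiniPromise(loop);
        promise.addListener(() -> called.set(true));

        try {
            loop.execute(() -> { }); // stands in for scheduling the write task
        } catch (RejectedExecutionException e) {
            promise.setFailure(e); // the promise is failed, as in safeExecute above
        }
        return called.get();
    }

    public static void main(String[] args) {
        System.out.println("listener called: " + listenerRanAfterShutdown());
    }
}
```

In this model the promise is marked as failed, but the listener never runs, which is the behavior I am seeing.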

Steps to reproduce

  1. Create a bootstrap and start it.
  2. Stop the event loop and wait until the shutdown completes.
  3. Submit a write operation.
  4. Attach a listener to the write promise. The listener will not be called.

Netty version

The current version 4.1 master branch.

JVM version (e.g. java -version)

Should not matter.

OS version (e.g. uname -a)

Should not matter.
