Skip to content

poll_shutdown is called in a busy loop when returning Poll::Pending #831

@bdbai

Description

@bdbai

Code to reproduce

use h2::client;

use http::{Method, Request};
use std::{
    future::Future,
    pin::Pin,
    task::{ready, Context, Poll},
    time::Duration,
};
use tokio::{
    io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf},
    net::TcpStream,
    time::sleep,
};
use tokio_native_tls::native_tls::TlsConnector;

type Result<T> = std::io::Result<T>;

/// Stream wrapper used by this reproduction: reads, writes and flushes are
/// forwarded to `inner` unchanged, while `poll_shutdown` stays pending until
/// a delay future has completed.
struct ShutdownDelayedStream<T> {
    /// Number of `poll_shutdown` calls observed so far (wraps on overflow);
    /// printed on every call.
    shutdown_poll_count: u32,
    /// Delay that must complete before shutdown is forwarded to `inner`.
    /// Reset to `None` once it has finished.
    shutdown_delay_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    /// The wrapped stream that all I/O is delegated to.
    inner: T,
}

impl<T> ShutdownDelayedStream<T> {
    /// Wraps `inner` with a zeroed poll counter and a pending 100 ms delay
    /// that gates the first successful shutdown.
    fn new(inner: T) -> Self {
        // The async block is lazy: `sleep` is not called until the future is
        // first polled, i.e. on the first `poll_shutdown`.
        let delay = async {
            sleep(Duration::from_millis(100)).await;
        };

        Self {
            shutdown_poll_count: 0,
            shutdown_delay_fut: Some(Box::pin(delay)),
            inner,
        }
    }
}

impl<T: AsyncRead + Unpin> AsyncRead for ShutdownDelayedStream<T> {
    /// Reads are passed straight through to the wrapped stream.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let me = self.get_mut();
        let inner = Pin::new(&mut me.inner);
        inner.poll_read(cx, buf)
    }
}

impl<T: AsyncWrite + Unpin> AsyncWrite for ShutdownDelayedStream<T> {
    /// Writes are passed straight through to the wrapped stream.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        let me = self.get_mut();
        Pin::new(&mut me.inner).poll_write(cx, buf)
    }

    /// Flushes are passed straight through to the wrapped stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let me = self.get_mut();
        Pin::new(&mut me.inner).poll_flush(cx)
    }

    /// Counts and logs every call, waits for the delay future to finish, and
    /// only then forwards the shutdown to the wrapped stream.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let me = self.get_mut();

        // Bump the call counter first so that every poll shows up in the log,
        // including the ones that end up returning `Poll::Pending`.
        me.shutdown_poll_count = me.shutdown_poll_count.wrapping_add(1);
        let polls = me.shutdown_poll_count;
        println!("poll_shutdown: shutdown_poll_count={}", polls);

        // While the delay is outstanding, `ready!` returns `Poll::Pending`
        // from here. Once it completes, clear it so it is never polled again.
        if let Some(timer) = me.shutdown_delay_fut.as_mut() {
            ready!(timer.as_mut().poll(cx));
            me.shutdown_delay_fut = None;
        }

        println!("poll_shutdown: calling inner poll_shutdown");
        let inner = Pin::new(&mut me.inner);
        inner.poll_shutdown(cx)
    }
}

/// Entry point: runs the h2 variant of the reproduction on the tokio runtime.
#[tokio::main]
pub async fn main() -> Result<()> {
    // To see the plain-TCP baseline instead, call `_main_tcp().await` here.
    main_h2().await
}
/// Plain-TCP baseline: connects, wraps the socket in the delayed-shutdown
/// stream, and shuts it down directly, logging each step.
pub async fn _main_tcp() -> Result<()> {
    let tcp = TcpStream::connect("1.1.1.1:443").await?;
    println!("Connected");

    let mut delayed = ShutdownDelayedStream::new(tcp);
    delayed.shutdown().await?;
    println!("Shutdown done");

    Ok(())
}

/// h2 variant of the reproduction: performs one GET over an HTTP/2 connection
/// whose transport is the delayed-shutdown stream, then drops the request
/// handle and waits for the connection task to finish.
pub async fn main_h2() -> Result<()> {
    // Connection setup is scoped so that only the request handle and the
    // spawned connection task outlive it.
    let (mut sender, driver) = {
        // Open the TCP connection to the server.
        let tcp = TcpStream::connect("1.1.1.1:443").await?;

        // Negotiate TLS, requesting "h2" via ALPN.
        let native_connector = TlsConnector::builder()
            .request_alpns(&["h2"])
            .build()
            .unwrap();
        let tls = tokio_native_tls::TlsConnector::from(native_connector);
        let tls_stream = tls.connect("1.1.1.1", tcp).await.unwrap();

        // Run the HTTP/2 handshake over the wrapped stream.
        let delayed = ShutdownDelayedStream::new(tls_stream);
        let (sender, connection) = client::handshake(delayed).await.expect("handshake");

        // Drive the connection on its own task.
        let driver = tokio::spawn(async move {
            connection.await.unwrap();
        });
        let sender = sender.ready().await.expect("h2 ready");
        (sender, driver)
    };
    {
        // Build the HTTP request to send to the server.
        let request = Request::builder()
            .method(Method::GET)
            .uri("https://1.1.1.1/")
            .body(())
            .unwrap();

        // Send it with end-of-stream set. The second tuple item would let the
        // caller stream a request body; it is held until the end of this scope.
        let (response, _body_tx) = sender.send_request(request, true).unwrap();

        let response = response.await.expect("response");
        let (head, _) = response.into_parts();

        println!("Received response: {:?}", head);
    }
    // Release the request handle before waiting on the connection task
    // (presumably this is what lets the connection wind down; see h2 docs).
    drop(sender);
    println!("Waiting for connection task to finish");
    driver.await?;
    println!("Connection task finished");
    Ok(())
}

Actual result

poll_shutdown is called many times, as if in a busy loop, until the timer expires.

(...omitted 2k lines of logs...)
poll_shutdown: shutdown_poll_count=2269
poll_shutdown: shutdown_poll_count=2270
poll_shutdown: shutdown_poll_count=2271
poll_shutdown: shutdown_poll_count=2272
poll_shutdown: shutdown_poll_count=2273
poll_shutdown: calling inner poll_shutdown
Connection task finished

Expected result

When poll_shutdown returns Poll::Pending, it should not be woken up immediately. Calling _main_tcp produces the following output:

Connected
poll_shutdown: shutdown_poll_count=1
poll_shutdown: shutdown_poll_count=2
poll_shutdown: calling inner poll_shutdown
Shutdown done

I expect to see similar output when using main_h2.

Reference

compio-rs/cyper#25

This was first discovered when I saw CPU usage spikes on a server running cyper on the compio runtime, which uses io_uring under the hood. In compio, submitted I/O requests normally do not complete immediately, which is what causes the issue here.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions