Description
Code to reproduce
use h2::client;
use http::{Method, Request};
use std::{
    future::Future,
    pin::Pin,
    task::{ready, Context, Poll},
    time::Duration,
};
use tokio::{
    io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf},
    net::TcpStream,
    time::sleep,
};
use tokio_native_tls::native_tls::TlsConnector;

type Result<T> = std::io::Result<T>;

/// Wraps an IO stream and delays completion of poll_shutdown by 100 ms,
/// counting how many times poll_shutdown is polled in the meantime.
struct ShutdownDelayedStream<T> {
    shutdown_poll_count: u32,
    shutdown_delay_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
    inner: T,
}

impl<T> ShutdownDelayedStream<T> {
    fn new(inner: T) -> Self {
        Self {
            shutdown_poll_count: 0,
            shutdown_delay_fut: Some(Box::pin(async {
                sleep(Duration::from_millis(100)).await;
            })),
            inner,
        }
    }
}

impl<T: AsyncRead + Unpin> AsyncRead for ShutdownDelayedStream<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
    }
}

impl<T: AsyncWrite + Unpin> AsyncWrite for ShutdownDelayedStream<T> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        Pin::new(&mut self.get_mut().inner).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let this = self.get_mut();
        this.shutdown_poll_count = this.shutdown_poll_count.wrapping_add(1);
        println!(
            "poll_shutdown: shutdown_poll_count={}",
            this.shutdown_poll_count
        );
        // Stay Pending until the 100 ms delay has elapsed, then forward the
        // shutdown to the inner stream.
        if let Some(delay_fut) = this.shutdown_delay_fut.as_mut() {
            ready!(delay_fut.as_mut().poll(cx));
            this.shutdown_delay_fut = None;
        }
        println!("poll_shutdown: calling inner poll_shutdown");
        Pin::new(&mut this.inner).poll_shutdown(cx)
    }
}

#[tokio::main]
pub async fn main() -> Result<()> {
    // _main_tcp().await
    main_h2().await
}

pub async fn _main_tcp() -> Result<()> {
    let stream = TcpStream::connect("1.1.1.1:443").await?;
    println!("Connected");
    let mut stream = ShutdownDelayedStream::new(stream);
    stream.shutdown().await?;
    println!("Shutdown done");
    Ok(())
}

pub async fn main_h2() -> Result<()> {
    let (mut h2, conn_task) = {
        // Establish a TCP connection and negotiate TLS (ALPN h2) with the server.
        let stream = TcpStream::connect("1.1.1.1:443").await?;
        let builder = tokio_native_tls::TlsConnector::from(
            TlsConnector::builder()
                .request_alpns(&["h2"])
                .build()
                .unwrap(),
        );
        let stream = builder.connect("1.1.1.1", stream).await.unwrap();
        let stream = ShutdownDelayedStream::new(stream);
        let (h2, connection) = client::handshake(stream).await.expect("handshake");
        let conn_task = tokio::spawn(async move {
            connection.await.unwrap();
        });
        let h2 = h2.ready().await.expect("h2 ready");
        (h2, conn_task)
    };
    {
        // Prepare the HTTP request to send to the server.
        let request = Request::builder()
            .method(Method::GET)
            .uri("https://1.1.1.1/")
            .body(())
            .unwrap();
        // Send the request. The second tuple item allows the caller
        // to stream a request body.
        let (response, _send) = h2.send_request(request, true).unwrap();
        let (head, _) = response.await.expect("response").into_parts();
        println!("Received response: {:?}", head);
    }
    drop(h2);
    println!("Waiting for connection task to finish");
    conn_task.await?;
    println!("Connection task finished");
    Ok(())
}

Actual result
poll_shutdown is called a very large number of times, as if in a busy loop, until the timer fires.
(...omitted 2k lines of logs...)
poll_shutdown: shutdown_poll_count=2269
poll_shutdown: shutdown_poll_count=2270
poll_shutdown: shutdown_poll_count=2271
poll_shutdown: shutdown_poll_count=2272
poll_shutdown: shutdown_poll_count=2273
poll_shutdown: calling inner poll_shutdown
Connection task finished
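For illustration only (this is not a claim about how h2 drives shutdown internally): a driver that wakes its own task on every Pending return reproduces this kind of output, because the executor re-polls immediately instead of waiting for the 100 ms timer. A minimal self-contained sketch; the helper name is mine:

use std::{future::poll_fn, pin::Pin, task::Poll};
use tokio::io::AsyncWrite;

// Hypothetical busy-polling driver, shown only to illustrate the pattern
// that produces the log above.
async fn busy_shutdown<T: AsyncWrite + Unpin>(stream: &mut T) -> std::io::Result<()> {
    poll_fn(|cx| match Pin::new(&mut *stream).poll_shutdown(cx) {
        Poll::Ready(res) => Poll::Ready(res),
        Poll::Pending => {
            // Self-wake: the task is rescheduled right away, so poll_shutdown
            // runs thousands of times before the 100 ms delay elapses.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await
}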
Expected result
When poll_shutdown returns Poll::Pending, the task should not be woken up again immediately. Calling _main_tcp produces the following output:
Connected
poll_shutdown: shutdown_poll_count=1
poll_shutdown: shutdown_poll_count=2
poll_shutdown: calling inner poll_shutdown
Shutdown done
I expect to see similar output from main_h2.
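The contract _main_tcp relies on can be reduced to a small driver around poll_shutdown, roughly what tokio's AsyncWriteExt::shutdown does. A minimal sketch (the helper name is mine, not from tokio or h2): after a Pending return, poll_shutdown should only run again once the waker registered through cx (here, the 100 ms sleep timer) fires.

use std::{future::poll_fn, pin::Pin};
use tokio::io::AsyncWrite;

// Waker-driven shutdown: each Pending suspends the task until the registered
// waker fires, so only a couple of polls are expected, as in the TCP output.
async fn drive_shutdown<T: AsyncWrite + Unpin>(stream: &mut T) -> std::io::Result<()> {
    poll_fn(|cx| Pin::new(&mut *stream).poll_shutdown(cx)).await
}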
Reference
This was first discovered when I saw CPU usage spikes in a server running cyper on the compio runtime, which uses io_uring under the hood. In compio, submitted IO requests normally do not complete immediately, which is what triggers this busy loop.