Skip to content

Commit a16cdee

Browse files
committed
http: fixing a resumption bug in pipelining
Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
1 parent 7d8e9de commit a16cdee

3 files changed

Lines changed: 94 additions & 8 deletions

File tree

source/common/network/connection_impl.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -509,9 +509,9 @@ void ConnectionImpl::onReadReady() {
509509
}
510510

511511
read_end_stream_ |= result.end_stream_read_;
512-
if (result.bytes_processed_ != 0 || result.end_stream_read_) {
513-
// Skip onRead if no bytes were processed. For instance, if the connection was closed without
514-
// producing more data.
512+
if (result.bytes_processed_ != 0 || result.end_stream_read_ || read_buffer_.length() != 0) {
513+
// Skip onRead if no bytes were processed and no bytes are buffered. For instance, if the
514+
// connection was closed without producing more data.
515515
onRead(new_buffer_size);
516516
}
517517

test/common/network/connection_impl_test.cc

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,41 @@ TEST_P(ConnectionImplTest, ReadDisable) {
509509
disconnect(false);
510510
}
511511

512+
// The HTTP/1 codec handles pipelined connecitons by relying on readDisable(false) resulting in the
513+
// subsquent request being dispatched. Regression test this behavior.
514+
TEST_P(ConnectionImplTest, ReadEnableDispatches) {
515+
setUpBasicConnection();
516+
connect();
517+
518+
std::shared_ptr<MockReadFilter> client_read_filter(new NiceMock<MockReadFilter>());
519+
client_connection_->addReadFilter(client_read_filter);
520+
521+
{
522+
Buffer::OwnedImpl buffer("data");
523+
server_connection_->write(buffer, false);
524+
EXPECT_CALL(*client_read_filter, onData(BufferStringEqual("data"), false))
525+
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
526+
dispatcher_->exit();
527+
return FilterStatus::StopIteration;
528+
}));
529+
dispatcher_->run(Event::Dispatcher::RunType::Block);
530+
}
531+
532+
{
533+
client_connection_->readDisable(true);
534+
EXPECT_CALL(*client_read_filter, onData(BufferStringEqual("data"), false))
535+
.WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> FilterStatus {
536+
buffer.drain(buffer.length());
537+
dispatcher_->exit();
538+
return FilterStatus::StopIteration;
539+
}));
540+
client_connection_->readDisable(false);
541+
dispatcher_->run(Event::Dispatcher::RunType::Block);
542+
}
543+
544+
disconnect(true);
545+
}
546+
512547
// Regression test for (at least one failure mode of)
513548
// https://github.com/envoyproxy/envoy/issues/3639 where readDisable on a close
514549
// connection caused a crash.
@@ -944,8 +979,30 @@ TEST_P(ConnectionImplTest, EmptyReadOnCloseTest) {
944979
EXPECT_CALL(*read_filter_, onNewConnection());
945980
EXPECT_CALL(*read_filter_, onData(_, _))
946981
.Times(1)
947-
.WillOnce(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus {
948-
EXPECT_EQ(buffer_size, data.length());
982+
.WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> FilterStatus {
983+
EXPECT_EQ(buffer_size, buffer.length());
984+
buffer.drain(buffer.length());
985+
dispatcher_->exit();
986+
return FilterStatus::StopIteration;
987+
}));
988+
client_connection_->write(data, false);
989+
dispatcher_->run(Event::Dispatcher::RunType::Block);
990+
991+
disconnect(true);
992+
}
993+
994+
// Unlike EmptyReadOnCloseTest above, the read on close will result in an onData
995+
// call if there was buffered data.
996+
TEST_P(ConnectionImplTest, ReadEventIfBufferedDataOnClose) {
997+
setUpBasicConnection();
998+
connect();
999+
1000+
const int buffer_size = 32;
1001+
Buffer::OwnedImpl data(std::string(buffer_size, 'a'));
1002+
EXPECT_CALL(*read_filter_, onNewConnection());
1003+
EXPECT_CALL(*read_filter_, onData(_, _))
1004+
.Times(2)
1005+
.WillOnce(Invoke([&](Buffer::Instance&, bool) -> FilterStatus {
9491006
dispatcher_->exit();
9501007
return FilterStatus::StopIteration;
9511008
}));
@@ -1020,10 +1077,11 @@ TEST_P(ConnectionImplTest, FlushWriteAndDelayCloseTest) {
10201077

10211078
EXPECT_CALL(*client_read_filter, onData(BufferStringEqual("Connection: Close"), false))
10221079
.Times(1)
1023-
.WillOnce(InvokeWithoutArgs([&]() -> FilterStatus {
1080+
.WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> FilterStatus {
10241081
// Advance time by 50ms; delayed close timer should _not_ trigger.
10251082
time_system_.setMonotonicTime(std::chrono::milliseconds(50));
10261083
client_connection_->close(ConnectionCloseType::NoFlush);
1084+
buffer.drain(buffer.length());
10271085
return FilterStatus::StopIteration;
10281086
}));
10291087

@@ -1063,7 +1121,8 @@ TEST_P(ConnectionImplTest, FlushWriteAndDelayCloseTimerTriggerTest) {
10631121
// on the server connection.
10641122
EXPECT_CALL(*client_read_filter, onData(BufferStringEqual("Connection: Close"), false))
10651123
.Times(1)
1066-
.WillOnce(InvokeWithoutArgs([&]() -> FilterStatus {
1124+
.WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> FilterStatus {
1125+
buffer.drain(buffer.length());
10671126
time_system_.setMonotonicTime(std::chrono::milliseconds(100));
10681127
return FilterStatus::StopIteration;
10691128
}));
@@ -1142,7 +1201,8 @@ TEST_P(ConnectionImplTest, FlushWriteAfterFlushWriteAndDelayWithoutPendingWrite)
11421201
server_connection_->close(ConnectionCloseType::FlushWriteAndDelay);
11431202
EXPECT_CALL(*client_read_filter, onData(BufferStringEqual("Connection: Close"), false))
11441203
.Times(1)
1145-
.WillOnce(InvokeWithoutArgs([&]() -> FilterStatus {
1204+
.WillOnce(Invoke([&](Buffer::Instance& buffer, bool) -> FilterStatus {
1205+
buffer.drain(buffer.length());
11461206
dispatcher_->exit();
11471207
return FilterStatus::StopIteration;
11481208
}));

test/integration/integration_test.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,32 @@ TEST_P(IntegrationTest, Http10WithHostandKeepAlive) {
400400
EXPECT_EQ(upstream_headers->Host()->value(), "foo.com");
401401
}
402402

403+
TEST_P(IntegrationTest, Pipeline) {
404+
autonomous_upstream_ = true;
405+
initialize();
406+
std::string response;
407+
408+
Buffer::OwnedImpl buffer("GET / HTTP/1.1\r\nHost: host\r\n\r\nGET / HTTP/1.1\r\n\r\n");
409+
RawConnectionDriver connection(
410+
lookupPort("http"), buffer,
411+
[&](Network::ClientConnection&, const Buffer::Instance& data) -> void {
412+
response.append(data.toString());
413+
},
414+
version_);
415+
// First response should be success.
416+
while (response.find("200") == std::string::npos) {
417+
connection.run(Event::Dispatcher::RunType::NonBlock);
418+
}
419+
EXPECT_THAT(response, HasSubstr("HTTP/1.1 200 OK\r\n"));
420+
421+
// Second response should be 400 (no host)
422+
while (response.find("400") == std::string::npos) {
423+
connection.run(Event::Dispatcher::RunType::NonBlock);
424+
}
425+
EXPECT_THAT(response, HasSubstr("HTTP/1.1 400 Bad Request\r\n"));
426+
connection.close();
427+
}
428+
403429
TEST_P(IntegrationTest, NoHost) {
404430
initialize();
405431
codec_client_ = makeHttpConnection(lookupPort("http"));

0 commit comments

Comments
 (0)