@@ -426,6 +426,9 @@ struct grpc_tcp {
426426 on errors anymore */
427427 TcpZerocopySendCtx tcp_zerocopy_send_ctx;
428428 TcpZerocopySendRecord* current_zerocopy_send = nullptr ;
429+
430+ bool curr_read_completed;
431+ int curr_min_read_chunk_size;
429432};
430433
431434struct backup_poller {
@@ -776,10 +779,12 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
776779 /* NB: After calling call_read_cb a parallel call of the read handler may
777780 * be running. */
778781 if (errno == EAGAIN) {
782+ tcp->curr_read_completed = true ;
779783 finish_estimate (tcp);
780784 tcp->inq = 0 ;
781785 return false ;
782786 } else {
787+ tcp->curr_read_completed = false ;
783788 grpc_slice_buffer_reset_and_unref_internal (tcp->incoming_buffer );
784789 *error = tcp_annotate_error (GRPC_OS_ERROR (errno, " recvmsg" ), tcp);
785790 return true ;
@@ -791,6 +796,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
791796 * We may have read something, i.e., total_read_bytes > 0, but
792797 * since the connection is closed we will drop the data here, because we
793798 * can't call the callback multiple times. */
799+ tcp->curr_read_completed = true ;
794800 grpc_slice_buffer_reset_and_unref_internal (tcp->incoming_buffer );
795801 *error = tcp_annotate_error (
796802 GRPC_ERROR_CREATE_FROM_STATIC_STRING (" Socket closed" ), tcp);
@@ -847,6 +853,8 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
847853 finish_estimate (tcp);
848854 }
849855
856+ // There may be more data to be read because recvmsg did not return EAGAIN.
857+ tcp->curr_read_completed = false ;
850858 GPR_DEBUG_ASSERT (total_read_bytes > 0 );
851859 if (total_read_bytes < tcp->incoming_buffer ->length ) {
852860 grpc_slice_buffer_trim_end (tcp->incoming_buffer ,
@@ -871,11 +879,21 @@ static void maybe_make_read_slices(grpc_tcp* tcp)
871879 int target_length = static_cast <int >(tcp->target_length );
872880 int extra_wanted =
873881 target_length - static_cast <int >(tcp->incoming_buffer ->length );
882+ if (tcp->curr_read_completed ) {
883+ // Set it to false again to start the next block of reads
884+ tcp->curr_read_completed = false ;
885+ // Reset curr_min_read_chunk_size for the next block of reads
886+ tcp->curr_min_read_chunk_size = tcp->min_read_chunk_size ;
887+ } else {
888+ // Last read is not completed yet. Double the last min read chunk size.
889+ tcp->curr_min_read_chunk_size =
890+ std::min (2 * tcp->curr_min_read_chunk_size , tcp->max_read_chunk_size );
891+ }
874892 grpc_slice_buffer_add_indexed (
875893 tcp->incoming_buffer ,
876894 tcp->memory_owner .MakeSlice (grpc_core::MemoryRequest (
877- tcp->min_read_chunk_size ,
878- grpc_core::Clamp (extra_wanted, tcp->min_read_chunk_size ,
895+ tcp->curr_min_read_chunk_size ,
896+ grpc_core::Clamp (extra_wanted, tcp->curr_min_read_chunk_size ,
879897 tcp->max_read_chunk_size ))));
880898 maybe_post_reclaimer (tcp);
881899 }
@@ -1790,6 +1808,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
17901808 tcp->socket_ts_enabled = false ;
17911809 tcp->ts_capable = true ;
17921810 tcp->outgoing_buffer_arg = nullptr ;
1811+ tcp->curr_read_completed = true ;
1812+ tcp->curr_min_read_chunk_size = tcp->min_read_chunk_size ;
17931813 if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx .memory_limited ()) {
17941814#ifdef GRPC_LINUX_ERRQUEUE
17951815 const int enable = 1 ;
0 commit comments