Skip to content

Commit 88a706e

Browse files
authored
Adjusting tcp min read chunk size growth to reduce wastage of incoming buffer size (#29334)
* Adjusting tcp min read chunk size growth to reduce wastage of incoming buffer space * syncing tcp-posix to head * fix sanity checks * fix typo * remove un-necessary log * minor fix to handle tcp read error case * renaming stuff * changing to std::atomic * fix sanity checks
1 parent d763974 commit 88a706e

2 files changed

Lines changed: 32 additions & 3 deletions

File tree

src/core/lib/iomgr/tcp_posix.cc

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,9 @@ struct grpc_tcp {
426426
on errors anymore */
427427
TcpZerocopySendCtx tcp_zerocopy_send_ctx;
428428
TcpZerocopySendRecord* current_zerocopy_send = nullptr;
429+
430+
bool curr_read_completed;
431+
int curr_min_read_chunk_size;
429432
};
430433

431434
struct backup_poller {
@@ -776,10 +779,12 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
776779
/* NB: After calling call_read_cb a parallel call of the read handler may
777780
* be running. */
778781
if (errno == EAGAIN) {
782+
tcp->curr_read_completed = true;
779783
finish_estimate(tcp);
780784
tcp->inq = 0;
781785
return false;
782786
} else {
787+
tcp->curr_read_completed = false;
783788
grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
784789
*error = tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp);
785790
return true;
@@ -791,6 +796,7 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
791796
* We may have read something, i.e., total_read_bytes > 0, but
792797
* since the connection is closed we will drop the data here, because we
793798
* can't call the callback multiple times. */
799+
tcp->curr_read_completed = true;
794800
grpc_slice_buffer_reset_and_unref_internal(tcp->incoming_buffer);
795801
*error = tcp_annotate_error(
796802
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), tcp);
@@ -847,6 +853,8 @@ static bool tcp_do_read(grpc_tcp* tcp, grpc_error_handle* error)
847853
finish_estimate(tcp);
848854
}
849855

856+
// There may be more data to be read because recvmsg did not return EAGAIN.
857+
tcp->curr_read_completed = false;
850858
GPR_DEBUG_ASSERT(total_read_bytes > 0);
851859
if (total_read_bytes < tcp->incoming_buffer->length) {
852860
grpc_slice_buffer_trim_end(tcp->incoming_buffer,
@@ -871,11 +879,21 @@ static void maybe_make_read_slices(grpc_tcp* tcp)
871879
int target_length = static_cast<int>(tcp->target_length);
872880
int extra_wanted =
873881
target_length - static_cast<int>(tcp->incoming_buffer->length);
882+
if (tcp->curr_read_completed) {
883+
// Set it to false again to start the next block of reads
884+
tcp->curr_read_completed = false;
885+
// Reset curr_min_read_chunk_size for the next block of reads
886+
tcp->curr_min_read_chunk_size = tcp->min_read_chunk_size;
887+
} else {
888+
// Last read is not completed yet. Double the last min read chunk size.
889+
tcp->curr_min_read_chunk_size =
890+
std::min(2 * tcp->curr_min_read_chunk_size, tcp->max_read_chunk_size);
891+
}
874892
grpc_slice_buffer_add_indexed(
875893
tcp->incoming_buffer,
876894
tcp->memory_owner.MakeSlice(grpc_core::MemoryRequest(
877-
tcp->min_read_chunk_size,
878-
grpc_core::Clamp(extra_wanted, tcp->min_read_chunk_size,
895+
tcp->curr_min_read_chunk_size,
896+
grpc_core::Clamp(extra_wanted, tcp->curr_min_read_chunk_size,
879897
tcp->max_read_chunk_size))));
880898
maybe_post_reclaimer(tcp);
881899
}
@@ -1790,6 +1808,8 @@ grpc_endpoint* grpc_tcp_create(grpc_fd* em_fd,
17901808
tcp->socket_ts_enabled = false;
17911809
tcp->ts_capable = true;
17921810
tcp->outgoing_buffer_arg = nullptr;
1811+
tcp->curr_read_completed = true;
1812+
tcp->curr_min_read_chunk_size = tcp->min_read_chunk_size;
17931813
if (tcp_tx_zerocopy_enabled && !tcp->tcp_zerocopy_send_ctx.memory_limited()) {
17941814
#ifdef GRPC_LINUX_ERRQUEUE
17951815
const int enable = 1;

src/core/lib/iomgr/tcp_server_posix.cc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include <errno.h>
3131
#include <fcntl.h>
32+
#include <inttypes.h>
3233
#include <netinet/in.h>
3334
#include <netinet/tcp.h>
3435
#include <string.h>
@@ -61,6 +62,8 @@
6162
#include "src/core/lib/iomgr/unix_sockets_posix.h"
6263
#include "src/core/lib/resource_quota/api.h"
6364

65+
static std::atomic<int64_t> num_dropped_connections{0};
66+
6467
static grpc_error_handle tcp_server_create(grpc_closure* shutdown_complete,
6568
const grpc_channel_args* args,
6669
grpc_tcp_server** server) {
@@ -221,7 +224,13 @@ static void on_read(void* arg, grpc_error_handle err) {
221224
}
222225

223226
if (sp->server->memory_quota->IsMemoryPressureHigh()) {
224-
gpr_log(GPR_INFO, "Drop incoming connection: high memory pressure");
227+
int64_t dropped_connections_count = ++num_dropped_connections;
228+
if (dropped_connections_count % 1000 == 0) {
229+
gpr_log(GPR_INFO,
230+
"Dropped >= %" PRId64
231+
" new connection attempts due to high memory pressure",
232+
dropped_connections_count);
233+
}
225234
close(fd);
226235
continue;
227236
}

0 commit comments

Comments
 (0)