What version of gRPC and what language are you using?
- gRPC version: 1.76.0 (and likely all previous versions with `grpc.aio`)
- Language: Python
- Platform: All platforms
What operating system (Linux, Windows,...) and version?
Reproduced on macOS, but affects all platforms.
What did you do?
Created a gRPC async server with `maximum_concurrent_rpcs=1` and sent multiple concurrent requests in waves.
On our production servers we saw Prometheus metrics showing some of our gRPC processes handling more than 18 concurrent active requests, even though we had set `maximum_concurrent_rpcs` to 2. This makes every request to the service extremely slow and likely to time out. For now we have deployed a workaround: a manual check using a stdlib semaphore that returns RESOURCE_EXHAUSTED from the Python handler.
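For reference, a minimal sketch of that workaround, assuming a unary handler like the `TestService.Test` used in the reproduction script below (the servicer name and limit value are illustrative, not our exact production code):

```python
import asyncio
import grpc
import test_pb2          # generated from the proto in the repro script below
import test_pb2_grpc

# Illustrative sketch of our workaround: a stdlib asyncio.Semaphore sized to
# the intended limit guards the handler body.
_limit = asyncio.Semaphore(2)

class GuardedServicer(test_pb2_grpc.TestServiceServicer):
    async def Test(self, request, context):
        if _limit.locked():
            # Limit already reached: fail fast instead of queueing, which is
            # what maximum_concurrent_rpcs is documented to do.
            await context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED,
                                "Concurrent RPC limit exceeded")
        async with _limit:
            # ... real handler body ...
            return test_pb2.TestResponse(result=f"ok-{request.data}")
```

This fails fast the way the built-in limiter is supposed to, but it has to live in application code until the limiter is fixed.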
What did you expect to see?
Only 1 RPC should be processed concurrently at any time. Requests exceeding the limit should receive RESOURCE_EXHAUSTED status.
What did you see instead?
After the first wave of requests (where some are correctly rejected with RESOURCE_EXHAUSTED), subsequent waves of requests bypass the limit entirely, allowing unlimited concurrent RPCs.
Reproduction output:
```
WAVE 1: Sending 3 concurrent requests (limit=1)
[HANDLER] START request 'w1r0', concurrent=1, max_seen=1
[CLIENT] Wave1/req2 failed: RESOURCE_EXHAUSTED - Concurrent RPC limit exceeded!
[CLIENT] Wave1/req1 failed: RESOURCE_EXHAUSTED - Concurrent RPC limit exceeded!
[HANDLER] END request 'w1r0', remaining=0
[CLIENT] Wave1/req0 succeeded

WAVE 2: Sending 3 more concurrent requests
[CLIENT] Wave2/req0 failed: RESOURCE_EXHAUSTED
[HANDLER] START request 'w2r1', concurrent=1, max_seen=1
[HANDLER] START request 'w2r2', concurrent=2, max_seen=2  <-- BUG: 2 concurrent with limit=1!
[CLIENT] Wave2/req1 succeeded
[CLIENT] Wave2/req2 succeeded

RESULTS:
  Wave 1: 1 success, 2 resource_exhausted
  Wave 2: 2 success, 1 resource_exhausted  <-- BUG: should be 1 success, 2 exhausted
  Max concurrent seen in handler: 2        <-- BUG: should never exceed 1
```
Root Cause
The `_ConcurrentRpcLimiter` class in `src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi` has asymmetric increment/decrement logic:

- `check_before_request_call()` (lines 879-883) only increments `_active_rpcs` when the limit is NOT exceeded:

  ```python
  def check_before_request_call(self):
      if self._active_rpcs >= self._maximum_concurrent_rpcs:
          self.limiter_concurrency_exceeded = True
      else:
          self._active_rpcs += 1  # Only incremented when NOT exceeded
  ```

- `_server_main_loop()` (lines 1008-1009) always calls `decrease_once_finished()`, regardless of whether the request was accepted or rejected:

  ```python
  if self._limiter is not None:
      self._limiter.decrease_once_finished(rpc_task)  # Always called!
  ```

This means rejected requests still decrement the counter when they finish, driving it negative. With a negative counter, the condition `_active_rpcs >= _maximum_concurrent_rpcs` is never true, so every incoming request is admitted, effectively disabling the limit.
Example with `maximum_concurrent_rpcs=1`:

| Event | `_active_rpcs` | Notes |
| --- | --- | --- |
| Request A accepted | 0 → 1 | Counter incremented |
| Request B rejected | 1 (unchanged) | Counter NOT incremented |
| Request C rejected | 1 (unchanged) | Counter NOT incremented |
| Request B finishes | 1 → 0 | Counter decremented (WRONG!) |
| Request C finishes | 0 → -1 | Counter goes negative! |
| Request A finishes | -1 → -2 | Counter still negative |
| Request D arrives | -2 → -1 | Passes check (-2 < 1), accepted |
| Request E arrives | -1 → 0 | Passes check (-1 < 1), accepted |
| ... | ... | Unlimited requests accepted! |
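To make the drift concrete, here is a tiny, dependency-free model of the limiter's counter logic (my own sketch, not the gRPC source); the `accepted` flag stands in for the per-task acceptance tracking a real fix inside `_ConcurrentRpcLimiter` would need:

```python
class LimiterModel:
    """Toy model of the _ConcurrentRpcLimiter counter logic (not gRPC source)."""

    def __init__(self, maximum_concurrent_rpcs):
        self._maximum_concurrent_rpcs = maximum_concurrent_rpcs
        self._active_rpcs = 0

    def check_before_request_call(self):
        """Mirrors the asymmetric check: the RPC is counted only if admitted."""
        if self._active_rpcs >= self._maximum_concurrent_rpcs:
            return False  # rejected, counter untouched
        self._active_rpcs += 1
        return True

    def decrease_once_finished_buggy(self):
        self._active_rpcs -= 1  # current behavior: decrement unconditionally

    def decrease_once_finished_fixed(self, accepted):
        if accepted:  # symmetric fix: only decrement RPCs that were counted
            self._active_rpcs -= 1

limiter = LimiterModel(1)
admitted = [limiter.check_before_request_call() for _ in range(3)]
print(admitted)              # [True, False, False]: one accepted, two rejected
for _ in admitted:
    limiter.decrease_once_finished_buggy()
print(limiter._active_rpcs)  # -2: the limit check can never trip again
```

With the `accepted`-aware variant the counter returns to 0 after every wave and the limit holds. Another option would be to increment the counter unconditionally in `check_before_request_call()`, so the existing unconditional decrement becomes correct.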
Reproduction Script
```python
#!/usr/bin/env python3
"""
Reproduction script for _ConcurrentRpcLimiter bug in gRPC Python aio server.

Usage:
    pip install grpcio grpcio-tools
    python repro.py
"""
import asyncio
import logging
import sys
import os
import tempfile

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)
logging.getLogger('grpc').setLevel(logging.WARNING)

PROTO_CONTENT = '''
syntax = "proto3";
package test;

service TestService {
  rpc Test (TestRequest) returns (TestResponse) {}
}

message TestRequest { string data = 1; }
message TestResponse { string result = 1; }
'''


def setup_proto():
    """Generate test_pb2/test_pb2_grpc in a temp dir and add it to sys.path."""
    import subprocess
    proto_dir = tempfile.mkdtemp()
    proto_file = os.path.join(proto_dir, "test.proto")
    with open(proto_file, "w") as f:
        f.write(PROTO_CONTENT)
    subprocess.run([
        sys.executable, "-m", "grpc_tools.protoc",
        f"-I{proto_dir}", f"--python_out={proto_dir}",
        f"--grpc_python_out={proto_dir}", proto_file
    ], check=True)
    sys.path.insert(0, proto_dir)
    return proto_dir


async def main():
    import grpc
    from grpc import aio

    proto_dir = setup_proto()
    import test_pb2, test_pb2_grpc

    concurrent_count = 0
    max_concurrent_seen = 0
    lock = asyncio.Lock()

    class TestServicer(test_pb2_grpc.TestServiceServicer):
        async def Test(self, request, context):
            nonlocal concurrent_count, max_concurrent_seen
            async with lock:
                concurrent_count += 1
                if concurrent_count > max_concurrent_seen:
                    max_concurrent_seen = concurrent_count
                logger.info(f"[HANDLER] START {request.data}, concurrent={concurrent_count}")
            await asyncio.sleep(1.0)
            async with lock:
                concurrent_count -= 1
                logger.info(f"[HANDLER] END {request.data}")
            return test_pb2.TestResponse(result=f"ok-{request.data}")

    server = aio.server(maximum_concurrent_rpcs=1)
    test_pb2_grpc.add_TestServiceServicer_to_server(TestServicer(), server)
    port = server.add_insecure_port("127.0.0.1:0")
    await server.start()

    channel = aio.insecure_channel(f"127.0.0.1:{port}")
    stub = test_pb2_grpc.TestServiceStub(channel)
    await asyncio.sleep(0.5)

    async def send(id, wave):
        try:
            await stub.Test(test_pb2.TestRequest(data=f"w{wave}r{id}"), timeout=30)
            return "SUCCESS"
        except grpc.aio.AioRpcError as e:
            return e.code().name

    # Wave 1
    logger.info("WAVE 1: 3 concurrent requests")
    w1 = await asyncio.gather(*[send(i, 1) for i in range(3)])
    await asyncio.sleep(2)

    # Wave 2
    logger.info("WAVE 2: 3 more concurrent requests")
    w2 = await asyncio.gather(*[send(i, 2) for i in range(3)])
    await asyncio.sleep(2)

    logger.info(f"Wave 1: {w1.count('SUCCESS')} success, {w1.count('RESOURCE_EXHAUSTED')} exhausted")
    logger.info(f"Wave 2: {w2.count('SUCCESS')} success, {w2.count('RESOURCE_EXHAUSTED')} exhausted")
    logger.info(f"Max concurrent seen: {max_concurrent_seen}")
    if max_concurrent_seen > 1:
        logger.error("BUG CONFIRMED: max concurrent exceeded limit!")

    await server.stop(0)
    import shutil
    shutil.rmtree(proto_dir, ignore_errors=True)


if __name__ == "__main__":
    asyncio.run(main())
```