[Python aio] maximum_concurrent_rpcs limit bypassed due to underflow in _ConcurrentRpcLimiter #41531

@robinvd

Description

What version of gRPC and what language are you using?

  • gRPC version: 1.76.0 (and likely all previous versions with grpc.aio)
  • Language: Python
  • Platform: All platforms

What operating system (Linux, Windows,...) and version?

Reproduced on macOS, but affects all platforms.

What did you do?

Created a gRPC async server with maximum_concurrent_rpcs=1 and sent multiple concurrent requests in waves.

On our production servers, Prometheus metrics showed some of our gRPC processes handling >18 concurrent active requests, even though maximum_concurrent_rpcs was set to 2. This made every request to the service extremely slow and likely to time out. As a workaround, we have deployed a manual check using a stdlib semaphore that returns RESOURCE_EXHAUSTED from the Python handler.
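For illustration, here is a minimal sketch of the semaphore workaround described above (names like `guarded` and `MAX_CONCURRENT` are hypothetical, and in a real servicer the rejection would be `context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, ...)` rather than raising):

```python
import asyncio

# Gate the handler body on an asyncio.Semaphore and reject immediately
# when all slots are taken, instead of trusting maximum_concurrent_rpcs.
MAX_CONCURRENT = 2
_slots = asyncio.Semaphore(MAX_CONCURRENT)

async def guarded(handler_body):
    # In a real servicer this would be
    # await context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, ...).
    if _slots.locked():
        raise RuntimeError("RESOURCE_EXHAUSTED: concurrent RPC limit exceeded")
    async with _slots:
        return await handler_body()

async def demo():
    async def slow():
        await asyncio.sleep(0.1)
        return "ok"
    # Four concurrent "RPCs" against two slots: two succeed, two are rejected.
    return await asyncio.gather(
        *[guarded(slow) for _ in range(4)], return_exceptions=True
    )

results = asyncio.run(demo())
print(sum(r == "ok" for r in results))  # 2 slots -> 2 successes
```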

What did you expect to see?

Only 1 RPC should be processed concurrently at any time. Requests exceeding the limit should receive RESOURCE_EXHAUSTED status.

What did you see instead?

After the first wave of requests (where some are correctly rejected with RESOURCE_EXHAUSTED), subsequent waves of requests bypass the limit entirely, allowing unlimited concurrent RPCs.

Reproduction output:

WAVE 1: Sending 3 concurrent requests (limit=1)
[HANDLER] START request 'w1r0', concurrent=1, max_seen=1
[CLIENT] Wave1/req2 failed: RESOURCE_EXHAUSTED - Concurrent RPC limit exceeded!
[CLIENT] Wave1/req1 failed: RESOURCE_EXHAUSTED - Concurrent RPC limit exceeded!
[HANDLER] END request 'w1r0', remaining=0
[CLIENT] Wave1/req0 succeeded

WAVE 2: Sending 3 more concurrent requests
[CLIENT] Wave2/req0 failed: RESOURCE_EXHAUSTED
[HANDLER] START request 'w2r1', concurrent=1, max_seen=1
[HANDLER] START request 'w2r2', concurrent=2, max_seen=2  <-- BUG: 2 concurrent with limit=1!
[CLIENT] Wave2/req1 succeeded
[CLIENT] Wave2/req2 succeeded

RESULTS:
  Wave 1: 1 success, 2 resource_exhausted
  Wave 2: 2 success, 1 resource_exhausted  <-- BUG: should be 1 success, 2 exhausted
  Max concurrent seen in handler: 2  <-- BUG: should never exceed 1

Root Cause

The _ConcurrentRpcLimiter class in src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi has asymmetric increment/decrement logic:

  1. check_before_request_call() (line 879-883): Only increments _active_rpcs when the limit is NOT exceeded:

    def check_before_request_call(self):
        if self._active_rpcs >= self._maximum_concurrent_rpcs:
            self.limiter_concurrency_exceeded = True
        else:
            self._active_rpcs += 1  # Only incremented when NOT exceeded
  2. _server_main_loop() (line 1008-1009): Always calls decrease_once_finished(), regardless of whether the request was accepted or rejected:

    if self._limiter is not None:
        self._limiter.decrease_once_finished(rpc_task)  # Always called!

This means rejected requests still decrement the counter when they finish, driving it negative. Once the counter is negative, the check _active_rpcs >= _maximum_concurrent_rpcs can never trigger, so every subsequent request is accepted and the limit is effectively disabled.

Example with maximum_concurrent_rpcs=1:

| Event | `_active_rpcs` | Notes |
|---|---|---|
| Request A accepted | 0 → 1 | Counter incremented |
| Request B rejected | 1 (unchanged) | Counter NOT incremented |
| Request C rejected | 1 (unchanged) | Counter NOT incremented |
| Request B finishes | 1 → 0 | Counter decremented (WRONG!) |
| Request C finishes | 0 → -1 | Counter goes negative! |
| Request A finishes | -1 → -2 | Counter still negative |
| Request D arrives | -2 → -1 | Passes check (-2 < 1), accepted |
| Request E arrives | -1 → 0 | Passes check (-1 < 1), accepted |
| ... | ... | Unlimited requests accepted! |
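The table above can be reproduced with a minimal pure-Python model of the counter logic (the class below mirrors the asymmetry described in the root cause; it is an illustration, not the actual Cython code):

```python
# Model of _ConcurrentRpcLimiter's counter logic: check_before_request_call
# only increments when under the limit, but decrease_once_finished is
# always called, even for rejected RPCs.
class LimiterModel:
    def __init__(self, maximum_concurrent_rpcs):
        self._max = maximum_concurrent_rpcs
        self._active = 0

    def check_before_request_call(self):
        # Returns True if the RPC is accepted.
        if self._active >= self._max:
            return False        # rejected, counter NOT incremented
        self._active += 1
        return True

    def decrease_once_finished(self):
        self._active -= 1       # always called, even for rejected RPCs

limiter = LimiterModel(maximum_concurrent_rpcs=1)

# Wave 1: three concurrent requests; A accepted, B and C rejected.
wave1 = [limiter.check_before_request_call() for _ in range(3)]
# All three "finish", and all three decrement the counter.
for _ in range(3):
    limiter.decrease_once_finished()

print(limiter._active)          # -2: underflow

# Wave 2: the negative counter lets every request through.
wave2 = [limiter.check_before_request_call() for _ in range(3)]
print(wave2)                    # [True, True, True] despite limit=1
```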

Reproduction Script

#!/usr/bin/env python3
"""
Reproduction script for _ConcurrentRpcLimiter bug in gRPC Python aio server.

Usage:
    pip install grpcio grpcio-tools
    python repro.py
"""

import asyncio
import logging
import sys
import os
import tempfile

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)
logging.getLogger('grpc').setLevel(logging.WARNING)

PROTO_CONTENT = '''
syntax = "proto3";
package test;
service TestService {
  rpc Test (TestRequest) returns (TestResponse) {}
}
message TestRequest { string data = 1; }
message TestResponse { string result = 1; }
'''

def setup_proto():
    import subprocess
    proto_dir = tempfile.mkdtemp()
    proto_file = os.path.join(proto_dir, "test.proto")
    with open(proto_file, "w") as f:
        f.write(PROTO_CONTENT)
    subprocess.run([
        sys.executable, "-m", "grpc_tools.protoc",
        f"-I{proto_dir}", f"--python_out={proto_dir}",
        f"--grpc_python_out={proto_dir}", proto_file
    ], check=True)
    sys.path.insert(0, proto_dir)
    return proto_dir

async def main():
    import grpc
    from grpc import aio

    proto_dir = setup_proto()
    import test_pb2, test_pb2_grpc

    concurrent_count = 0
    max_concurrent_seen = 0
    lock = asyncio.Lock()

    class TestServicer(test_pb2_grpc.TestServiceServicer):
        async def Test(self, request, context):
            nonlocal concurrent_count, max_concurrent_seen
            async with lock:
                concurrent_count += 1
                if concurrent_count > max_concurrent_seen:
                    max_concurrent_seen = concurrent_count
            logger.info(f"[HANDLER] START {request.data}, concurrent={concurrent_count}")
            await asyncio.sleep(1.0)
            async with lock:
                concurrent_count -= 1
            logger.info(f"[HANDLER] END {request.data}")
            return test_pb2.TestResponse(result=f"ok-{request.data}")

    server = aio.server(maximum_concurrent_rpcs=1)
    test_pb2_grpc.add_TestServiceServicer_to_server(TestServicer(), server)
    port = server.add_insecure_port("127.0.0.1:0")
    await server.start()

    channel = aio.insecure_channel(f"127.0.0.1:{port}")
    stub = test_pb2_grpc.TestServiceStub(channel)
    await asyncio.sleep(0.5)

    async def send(id, wave):
        try:
            await stub.Test(test_pb2.TestRequest(data=f"w{wave}r{id}"), timeout=30)
            return "SUCCESS"
        except grpc.aio.AioRpcError as e:
            return e.code().name

    # Wave 1
    logger.info("WAVE 1: 3 concurrent requests")
    w1 = await asyncio.gather(*[send(i, 1) for i in range(3)])
    await asyncio.sleep(2)

    # Wave 2
    logger.info("WAVE 2: 3 more concurrent requests")
    w2 = await asyncio.gather(*[send(i, 2) for i in range(3)])
    await asyncio.sleep(2)

    logger.info(f"Wave 1: {w1.count('SUCCESS')} success, {w1.count('RESOURCE_EXHAUSTED')} exhausted")
    logger.info(f"Wave 2: {w2.count('SUCCESS')} success, {w2.count('RESOURCE_EXHAUSTED')} exhausted")
    logger.info(f"Max concurrent seen: {max_concurrent_seen}")

    if max_concurrent_seen > 1:
        logger.error("BUG CONFIRMED: max concurrent exceeded limit!")

    await server.stop(0)
    import shutil
    shutil.rmtree(proto_dir, ignore_errors=True)

if __name__ == "__main__":
    asyncio.run(main())
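Possible Fix

One possible direction (a sketch against the model above, not a patch for the actual server.pyx.pxi) is to make the increment/decrement symmetric: record whether each RPC was actually counted, and decrement only for accepted RPCs.

```python
# Sketch of a symmetric limiter: decrease_once_finished takes the result
# of check_before_request_call, so rejected RPCs never decrement.
class FixedLimiterModel:
    def __init__(self, maximum_concurrent_rpcs):
        self._max = maximum_concurrent_rpcs
        self._active = 0

    def check_before_request_call(self):
        # Returns True (and counts the RPC) only when under the limit.
        if self._active >= self._max:
            return False
        self._active += 1
        return True

    def decrease_once_finished(self, was_accepted):
        # Decrement only for RPCs that were actually counted.
        if was_accepted:
            self._active -= 1

fixed = FixedLimiterModel(maximum_concurrent_rpcs=1)

# Wave 1: one accepted, two rejected; only the accepted one decrements.
fwave1 = [fixed.check_before_request_call() for _ in range(3)]
for accepted in fwave1:
    fixed.decrease_once_finished(accepted)

# Wave 2: the counter is back at 0, so the limit still holds.
fwave2 = [fixed.check_before_request_call() for _ in range(3)]
print(fwave2)                   # [True, False, False]
```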
