Subscriber not cleaning resources when subscribing to non-existing subscription (Subscriber.subStub and StreamingSubscriberConnection.messageDispatcher) #315

@rgrebski

Description

Some resources (Subscriber.subStub and StreamingSubscriberConnection.messageDispatcher) do not seem to be cleaned up when a Subscriber fails while trying to subscribe to a non-existing subscription and a shared ScheduledThreadPoolExecutor is in use. As a result, the executor's work queue grows over time, consuming memory and CPU.

The code fragment attached below reproduces the issue. Scenario:

  1. Share 2 instances of ScheduledThreadPoolExecutor (system executor provider and executor provider) between subscribers.
  2. Create 200 Subscribers every 70 seconds, each subscribing to a non-existing subscription (you can decrease 70s to something smaller to see the effect sooner; I used 70s because I noticed that some threads are cleaned up 60s after a Subscriber starts/fails).
  3. Watch memory and CPU usage grow over time.
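
A minimal sketch of how the growth in step 3 could be observed directly on a shared executor, rather than only through process-level memory and CPU. It uses only the JDK; `ExecutorStats` and `statsOf` are hypothetical helper names, and the no-op periodic tasks are merely a stand-in for whatever the failed subscribers leave behind, not the actual Pub/Sub tasks.

```kotlin
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical helper type (not part of the Pub/Sub client): a snapshot of executor load.
data class ExecutorStats(val queued: Int, val active: Int, val completed: Long)

// Reads the current work-queue size and task counters from the executor.
fun statsOf(executor: ScheduledThreadPoolExecutor): ExecutorStats =
    ExecutorStats(executor.queue.size, executor.activeCount, executor.completedTaskCount)

fun main() {
    val executor = ScheduledThreadPoolExecutor(2)

    // Stand-in for leaked work: periodic tasks that nobody ever cancels.
    repeat(200) {
        executor.scheduleAtFixedRate(Runnable { }, 1, 1, TimeUnit.HOURS)
    }

    // The queued count stays high for as long as the tasks remain scheduled.
    println(statsOf(executor))

    executor.shutdownNow()
}
```

Logging `statsOf(...)` for both shared executors after each batch of subscribers in the reproduction should make it visible whether the queues keep growing between batches.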

I analysed the leak a bit further and I noticed that Watchdog and StreamingSubscriberConnection.messageDispatcher tasks are being queued in ScheduledThreadPoolExecutor. Here is why:

How to fix it?
I have prepared a sample commit with a fix that does the cleanup, but I suspect it may break some internals, so I am sharing it to start a discussion and find the correct solution :)
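
For discussion, here is a generic JDK-level illustration of what "cleanup" means for tasks sitting in a ScheduledThreadPoolExecutor. This is not the sample commit mentioned above and does not touch the Pub/Sub client; `queuedAfterCancel` is a hypothetical helper name. It shows that cancelling a scheduled task only removes it from the work queue immediately when the executor's remove-on-cancel policy is enabled; by default (policy off), cancelled tasks stay queued until their delay elapses.

```kotlin
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Schedules 100 far-future periodic tasks, cancels them all, and returns
// how many entries are still sitting in the executor's work queue.
fun queuedAfterCancel(removeOnCancel: Boolean): Int {
    val executor = ScheduledThreadPoolExecutor(1)
    executor.removeOnCancelPolicy = removeOnCancel

    val futures = List(100) {
        executor.scheduleAtFixedRate(Runnable { }, 1, 1, TimeUnit.HOURS)
    }
    // Cancel without interrupting; the tasks have not started running yet.
    futures.forEach { it.cancel(false) }

    val queued = executor.queue.size
    executor.shutdownNow()
    return queued
}

fun main() {
    println("policy off: ${queuedAfterCancel(false)} tasks still queued")
    println("policy on:  ${queuedAfterCancel(true)} tasks still queued")
}
```

Whether the leaked Watchdog and messageDispatcher tasks are ever cancelled at all on the failure path is a separate question that this sketch does not answer.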

Environment details

  1. PubSub Subscriber
  2. OS type and version: Ubuntu 18.04
  3. Java version:
    openjdk version "11.0.5" 2019-10-15
    OpenJDK Runtime Environment 18.9 (build 11.0.5+10)
    OpenJDK 64-Bit Server VM 18.9 (build 11.0.5+10, mixed mode)
  4. pubsub version(s): pubsub-java 1.108.2-SNAPSHOT (latest master, also present in 1.108.0)

Steps to reproduce

  1. Create lots of subscribers for non-existing subscriptions.
  2. Each subscriber fails with io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=nonExistingSubscriptionName).
  3. Watch memory and CPU usage grow over time.

Code example (Kotlin)

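import com.google.api.gax.batching.FlowControlSettings
import com.google.api.gax.batching.FlowController
import com.google.api.gax.core.FixedExecutorProvider
import com.google.cloud.pubsub.v1.Subscriber
import com.google.pubsub.v1.ProjectSubscriptionName
import java.util.concurrent.ScheduledThreadPoolExecutor

// ThreadFactoryFactory, MessageReceiverExample and credentialsProvider() are not shown in this fragment.
val executorProvider = FixedExecutorProvider.create(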
    ScheduledThreadPoolExecutor(
        10 /*corePoolSize*/,
        ThreadFactoryFactory.executorThreadFactory("PubSub-Sub-Executor")
    )
)
val systemExecutorProvider = FixedExecutorProvider.create(
    ScheduledThreadPoolExecutor(
        10 /*corePoolSize*/,
        ThreadFactoryFactory.executorThreadFactory("PubSub-Sub-SystemExecutor")
    )
)

fun main() {
    // Wait forever until killed
    while (true) {
        (1..200)
            .map { createSubscriber(it) }
            .onEach { it.startAsync() }

        Thread.sleep(70_000)
    }
}

private fun createSubscriber(subscriberNumber: Int): Subscriber {
    val subscriber = Subscriber
        //use non existing subscription name
        .newBuilder(
            ProjectSubscriptionName.of("yourProjectId", "nonExistingSubscriptionName"),
            MessageReceiverExample()
        )
        .setCredentialsProvider(credentialsProvider())
        .setSystemExecutorProvider(systemExecutorProvider)
        .setExecutorProvider(executorProvider)
        .setFlowControlSettings(
            FlowControlSettings
                .newBuilder()
                .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
                .setMaxOutstandingElementCount(1000L)
                .setMaxOutstandingRequestBytes(100L * 1024L * 1024L) // 100MB
                .build()
        )
        .build()!!

    return subscriber
}

Metadata

Labels

🚨 This issue needs some love.
api: pubsub (Issues related to the googleapis/java-pubsub API.)
priority: p2 (Moderately-important priority. Fix may not be included in next release.)
type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)
