Description
Some resources (Subscriber.subStub and StreamingSubscriberConnection.messageDispatcher) do not seem to be cleaned up when a Subscriber fails while trying to subscribe to a non-existent subscription and a shared ScheduledThreadPoolExecutor is in use. As a result, the executor's work queue grows over time, consuming memory and CPU.
The code fragment attached below reproduces the issue. Scenario:
- Share 2 instances of ScheduledThreadPoolExecutor (the system executor provider and the executor provider) between subscribers
- Create 200 Subscribers every 70 seconds, each subscribing to a non-existent subscription (you can decrease 70s to something smaller to see the effect faster; I used 70s because I noticed that some threads are cleaned up 60s after a Subscriber starts/fails)
- Watch memory and CPU usage grow over time.
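Besides memory and CPU, the growth can also be observed directly on the executor's work queue. The following is a minimal JDK-only sketch, not the Pub/Sub client itself: the function name `queueSizesAfterRounds`, the no-op tasks, and the one-hour period are all assumptions chosen for illustration. Each simulated round schedules periodic tasks on a shared ScheduledThreadPoolExecutor and never cancels them, standing in for tasks left behind by failed subscribers.

```kotlin
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Hypothetical helper: schedules `perRound` never-cancelled periodic tasks on a
// shared executor for each round and records the work queue size afterwards.
fun queueSizesAfterRounds(rounds: Int, perRound: Int): List<Int> {
    // Stand-in for the shared executor from the report.
    val shared = ScheduledThreadPoolExecutor(2)
    val sizes = mutableListOf<Int>()
    try {
        repeat(rounds) {
            repeat(perRound) {
                // The long delay keeps every task parked in the queue, like a
                // periodic task that nobody ever cancels.
                shared.scheduleAtFixedRate({ /* no-op */ }, 1, 1, TimeUnit.HOURS)
            }
            sizes.add(shared.queue.size)
        }
    } finally {
        // Drop the queued tasks so the JVM can exit.
        shared.shutdownNow()
    }
    return sizes
}

fun main() {
    queueSizesAfterRounds(3, 200).forEachIndexed { index, size ->
        println("round ${index + 1}: queued tasks = $size")
    }
}
```

In the real reproduction, the same `queue.size` reading could be logged periodically for the two shared ScheduledThreadPoolExecutor instances to confirm that the queue keeps growing between rounds.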
I analysed the leak a bit further and noticed that Watchdog and StreamingSubscriberConnection.messageDispatcher tasks keep being queued in the ScheduledThreadPoolExecutor. Here is why:
- When a Subscriber fails, this code is executed. It calls stopAllStreamingConnections() and shutdownBackgroundResources().
- shutdownBackgroundResources() does nothing, as there are no background resources.
- stopAllStreamingConnections() delegates to stopConnections(), which also does nothing: stopAsync() delegates to AbstractService.stopAsync(), where the state is already FAILED, so the if statement there is not executed. As a result, StreamingSubscriberConnection.doStop() is never called, and neither StreamingSubscriberConnection.messageDispatcher nor Subscriber.subStub is stopped.
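The stopAsync() behaviour described in the last step can be illustrated in isolation. This is a minimal sketch, assuming the AbstractService in question is Guava's `com.google.common.util.concurrent.AbstractService` (Guava must be on the classpath); `FailingService` is a hypothetical class written for this illustration and is not part of the Pub/Sub client. It fails during startup, then shows that a later stopAsync() leaves the service in FAILED without ever invoking doStop().

```kotlin
import com.google.common.util.concurrent.AbstractService
import com.google.common.util.concurrent.Service

// Hypothetical service that fails while starting, similar in spirit to a
// connection whose subscription does not exist.
class FailingService : AbstractService() {
    var doStopCalled = false
        private set

    override fun doStart() {
        // Simulated startup failure; moves the service from STARTING to FAILED.
        notifyFailed(IllegalStateException("simulated NOT_FOUND"))
    }

    override fun doStop() {
        // Cleanup would live here; the flag records whether it ever ran.
        doStopCalled = true
        notifyStopped()
    }
}

fun main() {
    val service = FailingService()
    service.startAsync()
    // The service is already FAILED, so stopAsync() skips its stop logic.
    service.stopAsync()
    println("state = ${service.state()}, doStop called = ${service.doStopCalled}")
}
```

If this matches what the client does, any cleanup that lives only in doStop() is unreachable once the service has failed, which is consistent with the queued tasks never being cancelled.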
This results in
How to fix it?
I have prepared a sample commit with a fix that does the cleanup, but I guess it may break some internal stuff, so I am sharing it to start a discussion and find the correct solution :)
Environment details
- PubSub Subscriber
- OS type and version: Ubuntu 18.04
- Java version:
openjdk version "11.0.5" 2019-10-15
OpenJDK Runtime Environment 18.9 (build 11.0.5+10)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.5+10, mixed mode)
- pubsub version(s): pubsub-java 1.108.2-SNAPSHOT (latest master, also present in 1.108.0)
Steps to reproduce
- Create lots of subscribers for non-existent subscriptions
- Each subscriber fails with
io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=nonExistingSubscriptionName)
- Watch memory and CPU usage grow over time
Code example (Kotlin)
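import com.google.api.gax.batching.FlowControlSettings
import com.google.api.gax.batching.FlowController
import com.google.api.gax.core.FixedExecutorProvider
import com.google.cloud.pubsub.v1.Subscriber
import com.google.pubsub.v1.ProjectSubscriptionName
import java.util.concurrent.ScheduledThreadPoolExecutor

// ThreadFactoryFactory, MessageReceiverExample and credentialsProvider() are
// helpers from the reporter's project and are not shown here.

// Both executors are shared by every subscriber created below.
val executorProvider = FixedExecutorProvider.create(
    ScheduledThreadPoolExecutor(
        10 /*corePoolSize*/,
        ThreadFactoryFactory.executorThreadFactory("PubSub-Sub-Executor")
    )
)

val systemExecutorProvider = FixedExecutorProvider.create(
    ScheduledThreadPoolExecutor(
        10 /*corePoolSize*/,
        ThreadFactoryFactory.executorThreadFactory("PubSub-Sub-SystemExecutor")
    )
)

fun main() {
    // wait forever until killed
    while (true) {
        // create and start 200 subscribers; each one is expected to fail with NOT_FOUND
        (1..200)
            .map { createSubscriber(it) }
            .onEach { it.startAsync() }
        Thread.sleep(70_000)
    }
}

private fun createSubscriber(subscriberNumber: Int): Subscriber {
    val subscriber = Subscriber
        // use non existing subscription name
        .newBuilder(
            ProjectSubscriptionName.of("yourProjectId", "nonExistingSubscriptionName"),
            MessageReceiverExample()
        )
        .setCredentialsProvider(credentialsProvider())
        .setSystemExecutorProvider(systemExecutorProvider)
        .setExecutorProvider(executorProvider)
        .setFlowControlSettings(
            FlowControlSettings
                .newBuilder()
                .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
                .setMaxOutstandingElementCount(1000L)
                .setMaxOutstandingRequestBytes(100L * 1024L * 1024L) // 100MB
                .build()
        )
        .build()!!
    return subscriber
}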