Add FD-based polling mode for fiber scheduler compatibility#780
Merged
Add FD-based polling mode for fiber scheduler compatibility#780
Conversation
Introduce an alternative polling mode that uses file descriptors and IO.select instead of librdkafka's native background threads. This enables proper integration with Ruby's fiber scheduler by yielding to the scheduler during IO waits. Key changes: - Add WaterDrop::Polling::Poller singleton for centralized FD polling - Add WaterDrop::Polling::State to track per-producer polling state - Add config.polling.mode (:thread or :fd) configuration option - Add fd_polling? helper method to Producer - Register/unregister producers with global poller in FD mode - Add integration tests for FD polling and OAuth token refresh - Add docker-compose.oauth.yml for OAuth integration testing
- Fix Style/RescueModifier in poller_spec.rb - Exclude oauth_token_refresh integration test from CI (requires separate Keycloak + OAuth Kafka infrastructure)
- Add separate CI job for OAuth integration testing with Keycloak + Kafka SASL/OAUTHBEARER infrastructure - Lower minimum coverage to 95% since thread mode and FD polling mode exercise different code paths - OAuth test runs in both thread and fiber polling modes
The Confluent Kafka image requires KAFKA_OPTS to be set when SASL is enabled. This adds the required JAAS configuration file and mounts it into the Kafka container.
To diagnose the OAuth token refresh failures in CI.
The listener-specific settings weren't being applied correctly. Using global SASL OAUTHBEARER settings instead.
1. Fix token lifetime_ms to be absolute epoch time, not duration 2. Allow HTTP URLs for JWKS endpoint (Kafka 3.0+ security feature)
The allowed URL must match the full JWKS endpoint URL exactly.
Kafka needs Keycloak to be fully ready before it can fetch JWKS. Using healthcheck with condition: service_healthy.
Keycloak takes longer to start on CI. Increased retries to 60 and start_period to 60s.
The Keycloak container may not have curl installed. Using bash's built-in /dev/tcp for HTTP checks.
Tests that statistics events write to the queue FD and wake up the poller, rather than relying on the 1s IO.select timeout. Uses 100ms statistics interval and verifies we get ~30 callbacks in 3 seconds (not just ~3 from timeout alone).
State already stores the monitor reference. Added attr_reader :monitor to State and access monitors through states instead of separate cache. Also adds integration test for statistics FD wakeup.
When the last producer is unregistered, set @shutdown = true to stop the polling thread immediately. This prevents resource leakage and aligns with how users are instructed to close producers. The thread will restart automatically when a new producer registers.
Tests that: 1. Thread starts when first producer registers 2. Thread continues with multiple producers 3. Thread continues when one producer closes (others remain) 4. Thread stops when last producer closes (no leakage) 5. New producer works after all were closed (restart scenario)
Contributor
There was a problem hiding this comment.
Pull request overview
This PR adds an FD-based polling mode to WaterDrop producers, replacing per-producer librdkafka polling threads with a single Ruby poller thread using IO.select, aimed at better fiber scheduler compatibility and throughput.
Changes:
- Introduce FD polling infrastructure (
Polling::Poller,State,QueuePipe,Latch) and wire it into the producer lifecycle (register/unregister). - Add config/contract support for
config.polling.mode = :fdand FD polling tuning (polling.fd.max_time,polling.fd.periodic_poll_interval). - Expand CI/spec/integration coverage to run in both thread and FD polling modes, plus OAuth and lifecycle integrations.
Reviewed changes
Copilot reviewed 39 out of 41 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| waterdrop.gemspec | Bumps karafka-rdkafka minimum version for FD polling support. |
| Gemfile.lock | Updates locked karafka-rdkafka RC version/platform entries. |
| CHANGELOG.md | Documents the new polling mode and related changes. |
| lib/waterdrop.rb | Requires singleton and eagerly initializes the poller singleton. |
| lib/waterdrop/config.rb | Adds polling config namespace with mode + fd settings. |
| lib/waterdrop/contracts/config.rb | Validates polling config (mode, fd.max_time, fd.periodic_poll_interval). |
| config/locales/errors.yml | Adds validation messages for polling config (partial). |
| lib/waterdrop/errors.rb | Adds PollerError base error type. |
| lib/waterdrop/clients/rdkafka.rb | Disables native polling thread in FD mode and registers producers in the global poller. |
| lib/waterdrop/producer.rb | Unregisters FD-mode producers from poller on disconnect/close/reload; adds fd_polling?. |
| lib/waterdrop/polling/poller.rb | New global poller implementation using IO.select. |
| lib/waterdrop/polling/state.rb | New per-producer poll state and queue/close signaling. |
| lib/waterdrop/polling/queue_pipe.rb | New IO pipe abstraction used for IO.select wakeups. |
| lib/waterdrop/polling/latch.rb | New latch used to synchronize poller-driven shutdown. |
| lib/waterdrop/instrumentation/notifications.rb | Adds poller-related events to supported notifications list. |
| lib/waterdrop/connection_pool.rb | Adds YARD attribute documentation for default_pool. |
| spec/spec_helper.rb | Lowers minimum coverage and adds poller singleton cleanup hook for FD mode. |
| spec/support/factories/producer.rb | Enables FD polling for specs via FD_POLLING=true. |
| spec/lib/waterdrop/contracts/config_spec.rb | Adds contract specs for polling config validity. |
| spec/lib/waterdrop/config_spec.rb | Adds config specs for polling mode and fd.max_time. |
| spec/lib/waterdrop/errors_spec.rb | Adds spec for new PollerError. |
| spec/lib/waterdrop/polling/state_spec.rb | New unit specs for Polling::State. |
| spec/lib/waterdrop/polling/queue_pipe_spec.rb | New unit specs for Polling::QueuePipe. |
| spec/lib/waterdrop/polling/poller_spec.rb | New unit specs for Polling::Poller. |
| spec/lib/waterdrop/polling/latch_spec.rb | New unit specs for Polling::Latch. |
| spec/lib/waterdrop/instrumentation/vendors/datadog/metrics_listener_spec.rb | Makes error-callback assertion timing more robust. |
| spec/integrations/fd_polling_non_blocking/* | New integration to validate FD polling doesn’t block produce calls. |
| spec/integrations/fd_polling_statistics/* | New integration verifying stats callbacks fire in FD mode. |
| spec/integrations/fd_polling_statistics_wakeup/* | New integration verifying stats wake the FD poller promptly. |
| spec/integrations/fd_polling_thread_lifecycle/* | New integration verifying poller thread lifecycle and restart behavior. |
| spec/integrations/oauth_token_refresh/* | New OAuth/Keycloak integration fixture and test runner support. |
| docker-compose.oauth.yml | Adds Keycloak + OAuth-enabled Kafka compose setup for CI integration test. |
| .github/workflows/ci.yml | Runs CI in both polling modes and adds OAuth integration job. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…rror callbacks - fd_polling_transactional: Tests transactional producer with FD mode - fd_polling_idempotent: Tests idempotent producer with FD mode - fd_polling_error_callbacks: Tests error callback mechanism with FD mode
- Fix FD_POLLING env check to use == "true" instead of truthy check - Add missing polling.fd.periodic_poll_interval_format error message - Remove unused poller.started/stopped events from notifications - Add producer_id to broadcast_error for consistent error payloads - Fix potential deadlock when unregister called from poller thread - Update outdated comment about singleton instantiation in producer.rb - Fix CHANGELOG benchmark numbers (44-64% -> 39-54%)
mensfeld
added a commit
to karafka/wiki
that referenced
this pull request
Feb 17, 2026
Adds an admonition warning users not to close WaterDrop producers from within their own notification callbacks (e.g., message.acknowledged, statistics.emitted). When using FD polling mode, these callbacks execute on the poller thread, and closing the producer from within can cause synchronization issues or deadlocks. Related to: karafka/waterdrop#780
1 task
mensfeld
added a commit
to karafka/wiki
that referenced
this pull request
Feb 17, 2026
…#399) Adds an admonition warning users not to close WaterDrop producers from within their own notification callbacks (e.g., message.acknowledged, statistics.emitted). When using FD polling mode, these callbacks execute on the poller thread, and closing the producer from within can cause synchronization issues or deadlocks. Related to: karafka/waterdrop#780
a538d3f to
81a6e15
Compare
81a6e15 to
7943de0
Compare
…e instance_variable usage from tests
macOS on GitHub Actions has less precise timing than Linux. Increased sleep durations and timing thresholds to prevent flaky test failures: - connection_pool_spec: 0.01s -> 0.05s for thread sync - idle_disconnector_listener_spec: 0.1s -> 0.2s for async disconnect - state_spec: 20ms -> 100ms interval with proportionally adjusted sleeps - producer_spec: statistics ts_d upper bound 200000 -> 300000
The 0.05s sleep wasn't giving thread1 enough time to grab the connection on macOS before the main thread tried to acquire it, causing the timeout test to fail.
Replace flaky sleep-based synchronization with a Queue to ensure thread1 has actually grabbed the connection before the main thread tries to acquire it. This eliminates the race condition that was causing failures on macOS.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Performance Results
FD mode is 39-54% faster than thread mode across all tested scenarios:
Resource Savings
Why FD Mode is Faster
poll_nb(0)avoids GVL release/reacquire (~1.6x faster)Configuration
Test plan
Architecture
Notes
enable_queue_io_eventssupport