
Add FD-based polling mode for fiber scheduler compatibility#780

Merged
mensfeld merged 34 commits into master from fiber-poller on Feb 17, 2026
Conversation

mensfeld (Member) commented on Feb 16, 2026

Summary

  • Introduce an alternative polling mode using file descriptors and IO.select instead of librdkafka's native background threads
  • Single polling thread handles all producers via efficient IO.select multiplexing

Performance Results

FD mode is 39-54% faster than thread mode across all tested scenarios:

| Producers | Thread Mode  | FD Mode      | Improvement |
| --------- | ------------ | ------------ | ----------- |
| 1         | 27,300 msg/s | 41,900 msg/s | +54%        |
| 2         | 29,260 msg/s | 40,740 msg/s | +39%        |
| 5         | 27,850 msg/s | 40,080 msg/s | +44%        |
| 10        | 26,170 msg/s | 39,590 msg/s | +51%        |
| 25        | 24,140 msg/s | 36,110 msg/s | +50%        |

Resource Savings

| Producers | Thread Mode         | FD Mode           |
| --------- | ------------------- | ----------------- |
| 10        | 10 threads          | 1 thread          |
| 25        | 25 threads, +6.2 MB | 1 thread, +4.0 MB |

Why FD Mode is Faster

  1. Immediate event notification: librdkafka signals via FD when delivery reports arrive
  2. No thread overhead: Single poller thread vs N librdkafka threads
  3. Efficient polling: poll_nb(0) avoids GVL release/reacquire (~1.6x faster)
  4. Better memory locality: Single thread handles all producers

Configuration

producer = WaterDrop::Producer.new do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  config.polling.mode = :fd  # or :thread (default)
  config.polling.fd.max_time = 100  # max ms per poll cycle
  config.polling.fd.periodic_poll_interval = 1000  # ensures OAuth/stats fire
end
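The spec suite in this PR switches between the two modes via an FD_POLLING environment variable, checked with a strict string comparison. A minimal sketch of that selection pattern (illustrative; the variable name comes from the spec factories in this PR, the rest is generic Ruby):

```ruby
# Pick the polling mode from an environment variable. Note the strict
# == "true" comparison: ENV values are always strings, so a plain
# truthy check would also enable FD mode for FD_POLLING=false.
mode = ENV["FD_POLLING"] == "true" ? :fd : :thread
```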

Test plan

  • Unit tests for Poller, State, QueuePipe, and Latch classes
  • Full spec suite passes in both thread and FD modes
  • Integration tests for FD polling non-blocking behavior
  • Integration tests for statistics callbacks in FD mode
  • OAuth token refresh integration test with Keycloak

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     Poller (Singleton)                          │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │              Polling Thread (1 thread total)             │   │
│  │                                                          │   │
│  │  IO.select(producer_ios, nil, nil, 1)                   │   │
│  │       │                                                  │   │
│  │       ├── FD ready → drain pipe → poll_drain_nb          │   │
│  │       │              └── if more events: signal_continue │   │
│  │       │                                                  │   │
│  │       └── timeout → poll all (ensures OAuth/stats fire)  │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
         │              │              │
    ┌────┴────┐    ┌────┴────┐    ┌────┴────┐
    │Producer1│    │Producer2│    │Producer3│
    │ State   │    │ State   │    │ State   │
    │  - io   │    │  - io   │    │  - io   │
    │  - pipe │    │  - pipe │    │  - pipe │
    └─────────┘    └─────────┘    └─────────┘
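The select-then-drain cycle in the diagram can be reduced to plain Ruby IO primitives. The following is an illustrative sketch, not the actual `WaterDrop::Polling::Poller`: an `IO.pipe` stands in for librdkafka's per-producer event FD, and a counter stands in for `poll_drain_nb`.

```ruby
# A pipe stands in for the event FD that librdkafka writes to when
# delivery reports (or stats/OAuth events) land on the queue.
reader, writer = IO.pipe
events_drained = 0

writer.write("\0") # librdkafka signaling "events pending"

# Block until an FD is readable or the 1-second periodic timeout fires
ready, = IO.select([reader], nil, nil, 1)

if ready
  ready.each do |io|
    io.read_nonblock(64)  # drain the pipe so select won't re-fire
    events_drained += 1   # stands in for poll_drain_nb on that producer
  end
else
  # Timeout path: poll every producer so OAuth refresh and statistics
  # callbacks still fire even when no delivery reports arrive.
end
```

The timeout branch is what `polling.fd.periodic_poll_interval` governs: even an idle producer gets polled periodically so time-driven events are not starved.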

Notes

  • Requires karafka-rdkafka >= 0.24.0 for enable_queue_io_events support
  • Thread mode remains the default for backwards compatibility

Introduce an alternative polling mode that uses file descriptors and
IO.select instead of librdkafka's native background threads. This enables
proper integration with Ruby's fiber scheduler by yielding to the scheduler
during IO waits.

Key changes:
- Add WaterDrop::Polling::Poller singleton for centralized FD polling
- Add WaterDrop::Polling::State to track per-producer polling state
- Add config.polling.mode (:thread or :fd) configuration option
- Add fd_polling? helper method to Producer
- Register/unregister producers with global poller in FD mode
- Add integration tests for FD polling and OAuth token refresh
- Add docker-compose.oauth.yml for OAuth integration testing
- Fix Style/RescueModifier in poller_spec.rb
- Exclude oauth_token_refresh integration test from CI (requires
  separate Keycloak + OAuth Kafka infrastructure)
- Add separate CI job for OAuth integration testing with Keycloak +
  Kafka SASL/OAUTHBEARER infrastructure
- Lower minimum coverage to 95% since thread mode and FD polling mode
  exercise different code paths
- OAuth test runs in both thread and FD polling modes
The Confluent Kafka image requires KAFKA_OPTS to be set when SASL is
enabled. This adds the required JAAS configuration file and mounts it
into the Kafka container.

To diagnose the OAuth token refresh failures in CI.

The listener-specific settings weren't being applied correctly. Using
global SASL OAUTHBEARER settings instead.

1. Fix token lifetime_ms to be absolute epoch time, not a duration
2. Allow HTTP URLs for the JWKS endpoint (a Kafka 3.0+ security feature)

The allowed URL must match the full JWKS endpoint URL exactly.

Kafka needs Keycloak to be fully ready before it can fetch JWKS. Using a
healthcheck with condition: service_healthy.

Keycloak takes longer to start on CI. Increased retries to 60 and
start_period to 60s.

The Keycloak container may not have curl installed. Using bash's
built-in /dev/tcp for HTTP checks.

Tests that statistics events write to the queue FD and wake up the
poller, rather than relying on the 1s IO.select timeout.

Uses a 100 ms statistics interval and verifies we get ~30 callbacks in
3 seconds (not just ~3 from the timeout alone).
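The wakeup behavior under test can be demonstrated with stdlib IO alone: a write to the watched FD returns `IO.select` well before its timeout. This is a generic sketch of the mechanism, not the integration spec itself; the 50 ms delay is arbitrary.

```ruby
reader, writer = IO.pipe

start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# A background thread plays librdkafka emitting a statistics event
# 50ms in: it writes a byte to the watched FD.
signaler = Thread.new do
  sleep 0.05
  writer.write("\0")
end

# Despite the 1-second timeout, select returns as soon as the byte
# arrives, so the poller reacts in ~50ms instead of up to 1s later.
IO.select([reader], nil, nil, 1)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
signaler.join
```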
State already stores the monitor reference. Added attr_reader :monitor
to State; monitors are now accessed through states instead of a
separate cache.

Also adds an integration test for the statistics FD wakeup.

When the last producer is unregistered, set @shutdown = true to stop
the polling thread immediately. This prevents resource leakage and
aligns with how users are instructed to close producers.

The thread restarts automatically when a new producer registers.

Tests that:
1. The thread starts when the first producer registers
2. The thread continues with multiple producers
3. The thread continues when one producer closes (others remain)
4. The thread stops when the last producer closes (no leakage)
5. A new producer works after all were closed (restart scenario)
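The lifecycle above can be modeled with a toy singleton. `MiniPoller` is hypothetical and far simpler than the real `WaterDrop::Polling::Poller`, but it exhibits the same contract: the thread starts on first registration, stops when the last producer unregisters, and restarts on the next registration.

```ruby
require "singleton"

# Toy lifecycle model (hypothetical MiniPoller, not WaterDrop code).
class MiniPoller
  include Singleton

  def initialize
    @producers = {}
    @mutex = Mutex.new
  end

  def register(id)
    @mutex.synchronize do
      @producers[id] = true
      start_thread unless @thread&.alive?
    end
  end

  def unregister(id)
    @mutex.synchronize do
      @producers.delete(id)
      # Stop the thread immediately once the last producer leaves,
      # so no idle thread leaks after all producers are closed.
      @shutdown = true if @producers.empty?
    end
  end

  def alive?
    !!@thread&.alive?
  end

  private

  def start_thread
    @shutdown = false
    # Stand-in for the IO.select loop: spin until asked to shut down.
    @thread = Thread.new { sleep 0.005 until @shutdown }
  end
end
```

After the last `unregister` the thread exits within one loop tick; a subsequent `register` spins it back up, matching the restart scenario in the test list.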
@mensfeld mensfeld self-assigned this Feb 17, 2026
@mensfeld mensfeld requested a review from Copilot February 17, 2026 11:42
@mensfeld mensfeld marked this pull request as ready for review February 17, 2026 11:45
Copilot AI (Contributor) left a comment

Pull request overview

This PR adds an FD-based polling mode to WaterDrop producers, replacing per-producer librdkafka polling threads with a single Ruby poller thread using IO.select, aimed at better fiber scheduler compatibility and throughput.

Changes:

  • Introduce FD polling infrastructure (Polling::Poller, State, QueuePipe, Latch) and wire it into the producer lifecycle (register/unregister).
  • Add config/contract support for config.polling.mode = :fd and FD polling tuning (polling.fd.max_time, polling.fd.periodic_poll_interval).
  • Expand CI/spec/integration coverage to run in both thread and FD polling modes, plus OAuth and lifecycle integrations.

Reviewed changes

Copilot reviewed 39 out of 41 changed files in this pull request and generated 10 comments.

| File | Description |
| --- | --- |
| waterdrop.gemspec | Bumps karafka-rdkafka minimum version for FD polling support. |
| Gemfile.lock | Updates locked karafka-rdkafka RC version/platform entries. |
| CHANGELOG.md | Documents the new polling mode and related changes. |
| lib/waterdrop.rb | Requires singleton and eagerly initializes the poller singleton. |
| lib/waterdrop/config.rb | Adds polling config namespace with mode + fd settings. |
| lib/waterdrop/contracts/config.rb | Validates polling config (mode, fd.max_time, fd.periodic_poll_interval). |
| config/locales/errors.yml | Adds validation messages for polling config (partial). |
| lib/waterdrop/errors.rb | Adds PollerError base error type. |
| lib/waterdrop/clients/rdkafka.rb | Disables native polling thread in FD mode and registers producers in the global poller. |
| lib/waterdrop/producer.rb | Unregisters FD-mode producers from poller on disconnect/close/reload; adds fd_polling?. |
| lib/waterdrop/polling/poller.rb | New global poller implementation using IO.select. |
| lib/waterdrop/polling/state.rb | New per-producer poll state and queue/close signaling. |
| lib/waterdrop/polling/queue_pipe.rb | New IO pipe abstraction used for IO.select wakeups. |
| lib/waterdrop/polling/latch.rb | New latch used to synchronize poller-driven shutdown. |
| lib/waterdrop/instrumentation/notifications.rb | Adds poller-related events to supported notifications list. |
| lib/waterdrop/connection_pool.rb | Adds YARD attribute documentation for default_pool. |
| spec/spec_helper.rb | Lowers minimum coverage and adds poller singleton cleanup hook for FD mode. |
| spec/support/factories/producer.rb | Enables FD polling for specs via FD_POLLING=true. |
| spec/lib/waterdrop/contracts/config_spec.rb | Adds contract specs for polling config validity. |
| spec/lib/waterdrop/config_spec.rb | Adds config specs for polling mode and fd.max_time. |
| spec/lib/waterdrop/errors_spec.rb | Adds spec for new PollerError. |
| spec/lib/waterdrop/polling/state_spec.rb | New unit specs for Polling::State. |
| spec/lib/waterdrop/polling/queue_pipe_spec.rb | New unit specs for Polling::QueuePipe. |
| spec/lib/waterdrop/polling/poller_spec.rb | New unit specs for Polling::Poller. |
| spec/lib/waterdrop/polling/latch_spec.rb | New unit specs for Polling::Latch. |
| spec/lib/waterdrop/instrumentation/vendors/datadog/metrics_listener_spec.rb | Makes error-callback assertion timing more robust. |
| spec/integrations/fd_polling_non_blocking/* | New integration to validate FD polling doesn't block produce calls. |
| spec/integrations/fd_polling_statistics/* | New integration verifying stats callbacks fire in FD mode. |
| spec/integrations/fd_polling_statistics_wakeup/* | New integration verifying stats wake the FD poller promptly. |
| spec/integrations/fd_polling_thread_lifecycle/* | New integration verifying poller thread lifecycle and restart behavior. |
| spec/integrations/oauth_token_refresh/* | New OAuth/Keycloak integration fixture and test runner support. |
| docker-compose.oauth.yml | Adds Keycloak + OAuth-enabled Kafka compose setup for CI integration test. |
| .github/workflows/ci.yml | Runs CI in both polling modes and adds OAuth integration job. |


…rror callbacks

- fd_polling_transactional: Tests transactional producer with FD mode
- fd_polling_idempotent: Tests idempotent producer with FD mode
- fd_polling_error_callbacks: Tests error callback mechanism with FD mode
- Fix FD_POLLING env check to use == "true" instead of truthy check
- Add missing polling.fd.periodic_poll_interval_format error message
- Remove unused poller.started/stopped events from notifications
- Add producer_id to broadcast_error for consistent error payloads
- Fix potential deadlock when unregister called from poller thread
- Update outdated comment about singleton instantiation in producer.rb
- Fix CHANGELOG benchmark numbers (44-64% -> 39-54%)
mensfeld added a commit to karafka/wiki that referenced this pull request Feb 17, 2026
Adds an admonition warning users not to close WaterDrop producers from
within their own notification callbacks (e.g., message.acknowledged,
statistics.emitted). When using FD polling mode, these callbacks execute
on the poller thread, and closing the producer from within can cause
synchronization issues or deadlocks.

Related to: karafka/waterdrop#780
macOS on GitHub Actions has less precise timing than Linux. Increased
sleep durations and timing thresholds to prevent flaky test failures:
- connection_pool_spec: 0.01s -> 0.05s for thread sync
- idle_disconnector_listener_spec: 0.1s -> 0.2s for async disconnect
- state_spec: 20ms -> 100ms interval with proportionally adjusted sleeps
- producer_spec: statistics ts_d upper bound 200000 -> 300000

The 0.05s sleep wasn't giving thread1 enough time to grab the
connection on macOS before the main thread tried to acquire it, causing
the timeout test to fail.

Replace flaky sleep-based synchronization with a Queue to ensure
thread1 has actually grabbed the connection before the main thread
tries to acquire it. This eliminates the race condition that was
causing failures on macOS.
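The Queue-based handoff that replaced the sleep can be sketched as follows. This is the generic pattern, not the actual spec code; the names are illustrative.

```ruby
# Deterministic handoff: the main thread proceeds only after thread1
# has actually grabbed the resource, instead of hoping a fixed sleep
# was long enough on a slow CI runner.
grabbed = Queue.new
holder = nil

thread1 = Thread.new do
  holder = :thread1 # "grab the connection"
  grabbed << :ok    # signal that the grab really happened
  sleep 0.05        # keep holding it for a while
end

grabbed.pop # blocks until thread1 owns the resource; no race, no sleep tuning
raise "expected thread1 to hold the resource" unless holder == :thread1
thread1.join
```

Queue#pop blocks until a value is pushed, so the ordering guarantee holds regardless of scheduler timing, which is exactly what the sleep-based version could not promise.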
@mensfeld mensfeld merged commit cb7b656 into master Feb 17, 2026
17 checks passed
@mensfeld mensfeld deleted the fiber-poller branch February 17, 2026 14:55