Skip to content

redis-stream-adapter#78

Merged
sanjomo merged 1 commit intomainfrom
redis-stream-adapter
Dec 14, 2025
Merged

redis-stream-adapter#78
sanjomo merged 1 commit intomainfrom
redis-stream-adapter

Conversation

@sanjomo
Copy link
Member

@sanjomo sanjomo commented Dec 14, 2025

Description

Brief description of the changes in this PR.

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Code refactoring
  • Test improvements
  • Build/tooling changes

Related Issue

Closes #(issue number)

Changes Made

Testing

  • All existing tests pass
  • New tests added for new functionality
  • Tests pass locally with mvn test
  • Integration tests pass (if applicable)

Checklist

  • Code follows project coding standards
  • Self-review completed
  • Code is commented where necessary
  • Documentation updated (if needed)
  • Commit messages follow conventional format
  • No merge conflicts
  • All CI checks pass

Additional Notes

Any additional information, screenshots, or context that reviewers should know.

Summary by CodeRabbit

  • New Features
    • Redis Streams event store supporting single and multi-channel modes with configurable stream trimming, asynchronous polling, and built-in retry mechanisms for distributed Socket.IO deployments.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 14, 2025

Walkthrough

This PR introduces a new RedisStreamEventStore implementation for Redis Streams-based event publishing and subscribing, updates HazelcastEventStore's topic naming logic to consistently prefix topics, adds module exports, and includes multiple integration tests for distributed scenarios with both reliable and stream-based event stores in SINGLE_CHANNEL and MULTI_CHANNEL modes.

Changes

Cohort / File(s) Summary
Event Store Implementation
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_stream/RedisStreamEventStore.java
New EventStore implementation using Redis Streams with separate pub/sub clients, stream management per EventType, async polling with offset tracking, message deduplication, listener dispatch, retry scheduling, and configurable trimming. Includes fluent builder API.
Event Store Logic Fix
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/hazelcast/HazelcastEventStore.java
Topic name generation now consistently prefixes both SINGLE_CHANNEL and other event types with topicPrefix, previously SINGLE_CHANNEL used bare ALL_SINGLE_CHANNEL name.
Module Manifest
netty-socketio-core/src/main/java/module-info.java
Added export for new com.socketio4j.socketio.store.redis_stream package.
Reliable Store Integration Tests
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonReliable*ChannelTest.java
New integration test classes for SINGLE_CHANNEL and MULTI_CHANNEL modes using RedissonReliableEventStore with distributed two-node setup, dynamic port allocation, and Redis container lifecycle management.
Stream Store Integration Tests
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStream*ChannelTest.java
Updated integration tests to use RedisStreamEventStore.Builder instead of RedissonReliableEventStore.Builder for both SINGLE_CHANNEL and MULTI_CHANNEL configurations.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • RedisStreamEventStore implementation: Complex with async polling loop, offset management, deduplication logic, listener dispatch, retry scheduling, and stream trimming—each subsystem may warrant detailed review
  • Topic naming change in HazelcastEventStore: Verify impact on existing topic lookups and subscription paths
  • Test configuration consistency: Ensure all four new/updated integration test classes properly initialize Redis clients, stream modes, and lifecycle management
  • Builder pattern correctness: Validate fluent builder API, null-safety, and defaults in RedisStreamEventStore.Builder

Possibly related PRs

Suggested labels

enhancement

Suggested reviewers

  • NeatGuyCoding

Poem

🐰 A stream flows where channels once lay,
With Redisson clients guiding the way—
Topics now branded with prefix so true,
Redis flows dancing, both old paths and new! 🌊

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 2 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 42.50% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Title check ❓ Inconclusive The title 'redis-stream-adapter' is related to the main change: implementing a Redis Stream-based event store adapter, but it is vague and does not clearly describe what was added or changed. Clarify the title to be more descriptive, such as 'Add Redis Stream event store adapter' or 'Implement RedisStreamEventStore for Redis Streams support'.
Description check ❓ Inconclusive The description follows the template structure but the 'Brief description' section and 'Changes Made' section lack substantive content, making it difficult for reviewers to understand the specific modifications. Add a brief description explaining the Redis Stream adapter implementation and populate 'Changes Made' with specific details about added features, modified files, and key components.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch redis-stream-adapter

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions
Copy link

Qodana for JVM

45 new problems were found

Inspection name Severity Problems
Vulnerable declared dependency 🔶 Warning 8
Pointless arithmetic expression 🔶 Warning 7
Comparison of 'short' and 'char' values 🔶 Warning 2
Result of method call ignored 🔶 Warning 2
Busy wait 🔶 Warning 1
Injection point with ambiguous dependencies 🔶 Warning 1
Constant values 🔶 Warning 1
Number of placeholders does not match number of arguments in logging call 🔶 Warning 1
Unnecessary 'null' check before method call 🔶 Warning 1
Wrapper type may be primitive 🔶 Warning 1
Non-distinguishable logging calls ◽️ Notice 11
Vulnerable declared dependency ◽️ Notice 9

☁️ View the detailed Qodana report

Detected 123 dependencies

Third-party software list

This page lists the third-party software dependencies used in project

Dependency Version Licenses
aesh 2.8.2 Apache-2.0
annotations 26.0.2-1 Apache-2.0
arc-processor 3.30.1 Apache-2.0
arc 3.30.1 Apache-2.0
asm-analysis 9.9 BSD-3-Clause
asm-commons 9.9 BSD-3-Clause
asm-tree 9.9 BSD-3-Clause
asm-util 9.9 BSD-3-Clause
asm 9.9 BSD-3-Clause
byte-buddy 1.17.7 Apache-2.0
cache-api 1.1.1 Apache-2.0
commons-codec 1.20.0 Apache-2.0
commons-compress 1.28.0 Apache-2.0
commons-io 2.21.0 Apache-2.0
commons-logging-jboss-logging 1.0.0.final Apache-2.0
commons-logging 1.3.5 Apache-2.0
crac 1.5.0 BSD-2-Clause
gizmo 1.9.0 Apache-2.0
gizmo2 2.0.0.beta10 Apache-2.0
hazelcast 5.2.5 MIT
jackson-annotations 2.20 Apache-2.0
jackson-core 2.20.1 Apache-2.0
jackson-databind 2.20.1 Apache-2.0
jackson-dataformat-yaml 2.20.0 AML
jackson-datatype-jsr310 2.20.0 Apache-2.0
jakarta.annotation-api 2.1.1 Classpath-exception-2.0
EPL-2.0
GPL-2.0-only
jakarta.annotation-api 3.0.0 Classpath-exception-2.0
EPL-2.0
GPL-2.0-only
jakarta.el-api 6.0.1 Classpath-exception-2.0
EPL-2.0
GPL-2.0-only
jakarta.enterprise.cdi-api 4.1.0 Apache-2.0
jakarta.enterprise.lang-model 4.1.0 Apache-2.0
jakarta.inject-api 2.0.1 Apache-2.0
jakarta.interceptor-api 2.2.0 Classpath-exception-2.0
EPL-2.0
GPL-2.0-only
jakarta.json-api 2.1.3 Classpath-exception-2.0
EPL-2.0
GPL-2.0-only
jakarta.transaction-api 2.0.1 Classpath-exception-2.0
EPL-2.0
GPL-2.0-only
jandex-gizmo2 3.5.2 Apache-2.0
jandex 3.5.2 Apache-2.0
jansi 2.4.0 Apache-2.0
jboss-logging 3.6.1.final Apache-2.0
jboss-logmanager 3.1.2.final Apache-2.0
jboss-threads 3.9.1 Apache-2.0
jctools-core 4.0.5 Apache-2.0
jdk-classfile-backport 25.1 GPL-2.0-only
jodd-util 6.3.0 BSD-2-Clause
jspecify 1.0.0 Apache-2.0
jul-to-slf4j 2.0.17 MIT
kryo 5.6.2 BSD-3-Clause
log4j-api 2.25.2 Apache-2.0
log4j-to-slf4j 2.25.2 Apache-2.0
micrometer-commons 1.16.0 Apache-2.0
micrometer-observation 1.16.0 Apache-2.0
micronaut-aop 4.10.2 Apache-2.0
micronaut-context-propagation 4.10.2 Apache-2.0
micronaut-context 4.10.2 Apache-2.0
micronaut-core-reactive 4.10.2 Apache-2.0
micronaut-core 4.10.2 Apache-2.0
micronaut-discovery-core 4.10.2 Apache-2.0
micronaut-http-server 4.10.2 Apache-2.0
micronaut-http 4.10.2 Apache-2.0
micronaut-inject 4.10.2 Apache-2.0
micronaut-retry 4.10.2 Apache-2.0
micronaut-router 4.10.2 Apache-2.0
micronaut-runtime 4.10.2 Apache-2.0
microprofile-config-api 3.1 Apache-2.0
microprofile-context-propagation-api 1.3 Apache-2.0
minlog 1.3.1 BSD-3-Clause
mutiny 3.0.1 Apache-2.0
nativeimage 23.1.2 UPL-1.0
netty-common 4.2.7.final Apache-2.0
parsson 1.1.7 Classpath-exception-2.0
EPL-2.0
GPL-2.0-only
quarkus-arc-deployment 3.30.1 Apache-2.0
quarkus-arc-dev 3.30.1 Apache-2.0
quarkus-arc 3.30.1 Apache-2.0
quarkus-bootstrap-app-model 3.30.1 Apache-2.0
quarkus-bootstrap-core 3.30.1 Apache-2.0
quarkus-bootstrap-runner 3.30.1 Apache-2.0
quarkus-builder 3.30.1 Apache-2.0
quarkus-class-change-agent 3.30.1 Apache-2.0
quarkus-classloader-commons 3.30.1 Apache-2.0
quarkus-core-deployment 3.30.1 Apache-2.0
quarkus-core 3.30.1 Apache-2.0
quarkus-development-mode-spi 3.30.1 Apache-2.0
quarkus-devui-deployment-spi 3.30.1 Apache-2.0
quarkus-fs-util 1.2.0 Apache-2.0
quarkus-hibernate-validator-spi 3.30.1 Apache-2.0
quarkus-ide-launcher 3.30.1 Apache-2.0
quarkus-smallrye-context-propagation-spi 3.30.1 Apache-2.0
reactive-streams 1.0.4 MIT-0
reactor-core 3.6.2 Apache-2.0
reactor-core 3.7.9 Apache-2.0
readline 2.6 Apache-2.0
redisson 3.52.0 Apache-2.0
reflectasm 1.11.9 BSD-3-Clause
rxjava 3.1.8 Apache-2.0
slf4j-api 2.0.17 MIT
slf4j-jboss-logmanager 2.0.2.final Apache-2.0
smallrye-common-annotation 2.14.0 Apache-2.0
smallrye-common-classloader 2.14.0 Apache-2.0
smallrye-common-constraint 2.14.0 Apache-2.0
smallrye-common-cpu 2.14.0 Apache-2.0
smallrye-common-expression 2.14.0 Apache-2.0
smallrye-common-function 2.14.0 Apache-2.0
smallrye-common-io 2.14.0 Apache-2.0
smallrye-common-net 2.14.0 Apache-2.0
smallrye-common-os 2.14.0 Apache-2.0
smallrye-common-process 2.14.0 Apache-2.0
smallrye-common-ref 2.14.0 Apache-2.0
smallrye-common-resource 2.14.0 Apache-2.0
smallrye-config-common 3.14.1 Apache-2.0
smallrye-config-core 3.14.1 Apache-2.0
smallrye-config 3.14.1 Apache-2.0
snakeyaml 2.4 Apache-2.0
snakeyaml 2.5 Apache-2.0
spring-aop 7.0.1 Apache-2.0
spring-beans 7.0.1 Apache-2.0
spring-boot-autoconfigure 4.0.0 Apache-2.0
spring-boot-starter-logging 4.0.0 Apache-2.0
spring-boot-starter 4.0.0 Apache-2.0
spring-boot 4.0.0 Apache-2.0
spring-context 7.0.1 Apache-2.0
spring-core 7.0.1 Apache-2.0
spring-expression 7.0.1 Apache-2.0
wildfly-common 2.0.1 Apache-2.0
word 23.1.2 UPL-1.0
Contact Qodana team

Contact us at qodana-support@jetbrains.com

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamMultiChannelTest.java (1)

32-35: Check if RedissonEventStore import is used.

Line 32 imports RedissonEventStore but it doesn't appear to be referenced in this test. Consider removing unused imports.

#!/bin/bash
# Verify if RedissonEventStore is used in this file
rg -n 'RedissonEventStore(?!\.Builder)' netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamMultiChannelTest.java
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamSingleChannelTest.java (1)

32-35: Check if RedissonEventStore import is used.

Line 32 imports RedissonEventStore but it doesn't appear to be referenced in this test. Consider removing unused imports.

#!/bin/bash
# Verify if RedissonEventStore is used in this file
rg -n 'RedissonEventStore(?!\.Builder)' netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamSingleChannelTest.java
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b9a5c7c and 57972ab.

📒 Files selected for processing (7)
  • netty-socketio-core/src/main/java/com/socketio4j/socketio/store/hazelcast/HazelcastEventStore.java (1 hunks)
  • netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_stream/RedisStreamEventStore.java (1 hunks)
  • netty-socketio-core/src/main/java/module-info.java (1 hunks)
  • netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonReliableMultiChannelTest.java (1 hunks)
  • netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonReliableSingleChannelTest.java (1 hunks)
  • netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamMultiChannelTest.java (3 hunks)
  • netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamSingleChannelTest.java (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonReliableMultiChannelTest.java (2)
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_pubsub/RedissonStoreFactory.java (1)
  • RedissonStoreFactory (33-75)
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_reliable/RedissonReliableEventStore.java (1)
  • RedissonReliableEventStore (47-336)
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_stream/RedisStreamEventStore.java (2)
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/event/EventMessage.java (1)
  • EventMessage (21-44)
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/event/ListenerRegistration.java (1)
  • ListenerRegistration (25-43)
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamMultiChannelTest.java (1)
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_stream/RedisStreamEventStore.java (1)
  • RedisStreamEventStore (51-506)
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonReliableSingleChannelTest.java (5)
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonReliableMultiChannelTest.java (1)
  • TestInstance (36-134)
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamMultiChannelTest.java (1)
  • TestInstance (38-136)
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamSingleChannelTest.java (1)
  • TestInstance (38-133)
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_pubsub/RedissonStoreFactory.java (1)
  • RedissonStoreFactory (33-75)
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_reliable/RedissonReliableEventStore.java (1)
  • RedissonReliableEventStore (47-336)
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamSingleChannelTest.java (2)
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_pubsub/RedissonStoreFactory.java (1)
  • RedissonStoreFactory (33-75)
netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_stream/RedisStreamEventStore.java (1)
  • RedisStreamEventStore (51-506)
🪛 ast-grep (0.40.0)
netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonReliableMultiChannelTest.java

[info] 45-45: "Detected use of a Java socket that is not encrypted. As a result, the
traffic could be read by an attacker intercepting the network traffic. Use
an SSLSocket created by 'SSLSocketFactory' or 'SSLServerSocketFactory'
instead."
Context: new ServerSocket(0)
Note: [CWE-319] Cleartext Transmission of Sensitive Information [REFERENCES]
- https://owasp.org/Top10/A02_2021-Cryptographic_Failures

(unencrypted-socket-java)

netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonReliableSingleChannelTest.java

[info] 46-46: "Detected use of a Java socket that is not encrypted. As a result, the
traffic could be read by an attacker intercepting the network traffic. Use
an SSLSocket created by 'SSLSocketFactory' or 'SSLServerSocketFactory'
instead."
Context: new ServerSocket(0)
Note: [CWE-319] Cleartext Transmission of Sensitive Information [REFERENCES]
- https://owasp.org/Top10/A02_2021-Cryptographic_Failures

(unencrypted-socket-java)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: build (21) / build
  • GitHub Check: build (17) / build
  • GitHub Check: build (25) / build
  • GitHub Check: qodana
  • GitHub Check: Analyze (java-kotlin)
  • GitHub Check: Analyze (java-kotlin)
🔇 Additional comments (16)
netty-socketio-core/src/main/java/module-info.java (1)

20-20: LGTM!

The module export for the new redis_stream package follows the established pattern and correctly exposes the RedisStreamEventStore public API.

netty-socketio-core/src/main/java/com/socketio4j/socketio/store/redis_stream/RedisStreamEventStore.java (6)

106-182: LGTM!

The constructor and initialization logic properly validates required parameters, applies sensible defaults with warning logs, and sets up streams and trimming appropriately based on the EventStoreMode.


202-213: LGTM!

The publish logic correctly attaches the nodeId for deduplication and uses computeIfAbsent for lazy stream initialization.


264-306: LGTM!

The polling loop correctly handles async reads with timeout, processes records while skipping messages from the same node, and reschedules itself for continuous polling. The empty map check at line 285 protects the subsequent value access.


308-330: LGTM!

The dispatch logic correctly uses runtime type checking with isInstance before performing the cast, ensuring type safety when invoking listeners.


366-391: LGTM!

The shutdown logic comprehensively cleans up all resources, including pollers, executors, and internal maps, using shutdownNow() for immediate termination.


423-504: LGTM!

The Builder class follows best practices with proper validation in setters (empty prefix check, positive streamMaxLen validation) and a fluent API for configuration.

netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonReliableMultiChannelTest.java (3)

45-49: LGTM!

The findAvailablePort() utility correctly uses ServerSocket(0) to allocate a free port for testing. The static analysis warning about unencrypted sockets is a false positive—this is only for dynamic port allocation, not for actual data transmission.


55-106: LGTM!

The test setup properly initializes Redis container, creates Redisson clients, configures two SocketIOServer nodes with RedissonReliableEventStore in MULTI_CHANNEL mode, and registers appropriate event handlers.


114-132: LGTM!

The teardown logic correctly cleans up all resources in the proper order: nodes, Redis clients, and the Redis container.

netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamMultiChannelTest.java (1)

70-70: LGTM!

The test now correctly uses RedisStreamEventStore.Builder for MULTI_CHANNEL mode, testing the new Redis Streams-based event store implementation.

Also applies to: 92-92

netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonStreamSingleChannelTest.java (1)

69-69: LGTM!

The test now correctly uses RedisStreamEventStore.Builder with SINGLE_CHANNEL mode, properly testing the new Redis Streams-based event store for single-channel scenarios.

Also applies to: 90-90

netty-socketio-core/src/test/java/com/socketio4j/socketio/integration/DistributedRedissonReliableSingleChannelTest.java (3)

46-50: LGTM!

The findAvailablePort() utility correctly uses ServerSocket(0) for dynamic port allocation in tests. The static analysis warning is a false positive.


55-105: LGTM!

The test setup properly configures two SocketIOServer nodes with RedissonReliableEventStore in SINGLE_CHANNEL mode, with appropriate event handlers for distributed room management testing.


113-131: LGTM!

The teardown logic ensures proper cleanup of all resources in the correct order.

netty-socketio-core/src/main/java/com/socketio4j/socketio/store/hazelcast/HazelcastEventStore.java (1)

96-101: Remove this comment - no breaking change detected.

The topic naming is not a new behavioral change. The codebase consistently applies topicPrefix across all event store implementations, with a default prefix "SOCKETIO4J:" provided. This is intentional design with builder configuration support (topicNamePrefix() method). The changelog contains no breaking change notice for topic naming, and all tests use this default behavior without issue.

Likely an incorrect or invalid review comment.

@sanjomo sanjomo merged commit b6c7b1b into main Dec 14, 2025
10 checks passed
@sanjomo sanjomo deleted the redis-stream-adapter branch December 14, 2025 09:06
@coderabbitai coderabbitai bot mentioned this pull request Jan 1, 2026
18 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant