Race Condition: Concurrent setOptions() During Async Connection Can Cause Inconsistent ClientOptions #3591
Description
Bug Report
Calling setOptions() on RedisClient while an async connection is being established can lead to inconsistent ClientOptions being used across different components of the same connection. This is because connectStandaloneAsync(), connectPubSubAsync(), and connectSentinelAsync() make multiple calls to getOptions() at different points during the async connection setup, and these calls may observe different ClientOptions values if setOptions() is called concurrently.
Current Behavior
The AbstractRedisClient.clientOptions field is declared as volatile:
private volatile ClientOptions clientOptions = ClientOptions.create();
This guarantees that each individual read sees the latest value, but it does not provide atomicity for the entire connection setup process. During async connection establishment, multiple calls to getOptions() occur at different stages. If setOptions() is called by another thread during this sequence, different components may be initialized with different ClientOptions values.
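The difference between re-reading a volatile field at each step and reading it once can be shown without Lettuce at all. The following is a minimal, single-threaded sketch: `OptionsHolder`, `setupReadingEachTime`, and `setupWithSnapshot` are hypothetical names, the options are plain strings, and the `betweenSteps` callback stands in for a concurrent setOptions() call so that the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a client that holds its options in a volatile field; not Lettuce code.
class OptionsHolder {

    private volatile String options = "initial";

    String getOptions() {
        return options;
    }

    void setOptions(String options) {
        this.options = options;
    }

    // Reads the field again at every step, so an update between steps becomes visible.
    List<String> setupReadingEachTime(Runnable betweenSteps) {
        List<String> seen = new ArrayList<>();
        seen.add(getOptions()); // step 1, e.g. endpoint creation
        betweenSteps.run();     // stands in for a concurrent setOptions() call
        seen.add(getOptions()); // step 2, e.g. a handler created later from a lambda
        return seen;
    }

    // Reads the field once and reuses the local snapshot for every step.
    List<String> setupWithSnapshot(Runnable betweenSteps) {
        String snapshot = getOptions();
        List<String> seen = new ArrayList<>();
        seen.add(snapshot);
        betweenSteps.run();
        seen.add(snapshot);
        return seen;
    }
}

class VolatileSnapshotDemo {

    public static void main(String[] args) {
        OptionsHolder a = new OptionsHolder();
        System.out.println(a.setupReadingEachTime(() -> a.setOptions("updated"))); // [initial, updated]

        OptionsHolder b = new OptionsHolder();
        System.out.println(b.setupWithSnapshot(() -> b.setOptions("updated"))); // [initial, initial]
    }
}
```

In the first call the two steps end up with different values even though every read is individually consistent, which is the kind of inconsistency reported here.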
Examples from current code base
From `RedisClient.connectStandaloneAsync()`:
private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(
RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) {
// Call #1: Endpoint creation
DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
RedisChannelWriter writer = endpoint;
// Call #2: Check if CommandExpiryWriter is supported
if (CommandExpiryWriter.isSupported(getOptions())) {
// Call #3: Create CommandExpiryWriter
writer = CommandExpiryWriter.buildCommandExpiryWriter(writer, getOptions(), getResources());
}
// ...
// Call #4: Create CommandHandler (via lambda, executed later on Netty thread)
ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(
connection, endpoint, redisURI,
() -> new CommandHandler(getOptions(), getResources(), endpoint), false);
// ...
}
Reproducible Example
package io.lettuce.core;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.test.settings.TestSettings;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Demonstrates race condition when calling setOptions() during async connection establishment.
* <p>
* The test shows that different components of the same connection can be initialized with different ClientOptions if
* setOptions() is called while connectAsync() is in progress.
*/
class ClientOptionsRaceConditionTest extends TestSupport {
@Test
void demonstrateRaceCondition() throws Exception {
RedisURI redisURI = RedisURI.create(TestSettings.host(), TestSettings.port());
// Create a test client that captures getOptions() calls
TestRedisClient client = new TestRedisClient(redisURI);
try {
// Initial options with 10 second timeout
ClientOptions initialOptions = ClientOptions.builder()
.socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(10)).build()).build();
client.setOptions(initialOptions);
// Start async connection
ConnectionFuture<StatefulRedisConnection<String, String>> connectionFuture = client.connectAsync(StringCodec.UTF8,
redisURI);
// Immediately change options (race condition!)
ClientOptions newOptions = ClientOptions.builder()
.socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(60)).build()).build();
client.setOptions(newOptions);
// Wait for connection to complete
StatefulRedisConnection<String, String> connection = connectionFuture.get(10, TimeUnit.SECONDS);
connection.close();
// Verify the race condition: different components saw different options
List<ClientOptions> capturedOptions = client.getCapturedOptions();
// Multiple getOptions() calls should have been made during connection
assertThat(capturedOptions).hasSizeGreaterThan(1);
// Check if different options were observed
boolean hasInitialOptions = capturedOptions.stream()
.anyMatch(opts -> opts.getSocketOptions().getConnectTimeout().equals(Duration.ofSeconds(10)));
boolean hasNewOptions = capturedOptions.stream()
.anyMatch(opts -> opts.getSocketOptions().getConnectTimeout().equals(Duration.ofSeconds(60)));
// Race condition: some components saw initial options, others saw new options
assertThat(hasInitialOptions && hasNewOptions)
.as("Race condition detected: different components saw different ClientOptions").isTrue();
} finally {
client.shutdown();
}
}
/**
* Test RedisClient that captures all getOptions() calls during connection establishment.
*/
static class TestRedisClient extends RedisClient {
private final List<ClientOptions> capturedOptions = new CopyOnWriteArrayList<>();
private volatile boolean connectionStarted = false;
TestRedisClient(RedisURI redisURI) {
super(null, redisURI);
}
@Override
public ClientOptions getOptions() {
ClientOptions options = super.getOptions();
// Only intercept getOptions() calls that happen during connection
if (connectionStarted) {
capturedOptions.add(options);
}
return options;
}
@Override
public <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectAsync(io.lettuce.core.codec.RedisCodec<K, V> codec,
RedisURI redisURI) {
// Mark that connection has started
connectionStarted = true;
return super.connectAsync(codec, redisURI);
}
List<ClientOptions> getCapturedOptions() {
return capturedOptions;
}
}
}
Expected behavior/code
Environment
- Lettuce version(s): [7.2.0.RELEASE,l]
- Redis version: [any]
Possible Solution
Solution 1: Capture Options at Entry Point
Capture ClientOptions once at the beginning of the async operation and pass it through:
public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {
checkForRedisURI();
return getConnection(connectStandaloneAsync(codec, this.redisURI, this.redisURI.getTimeout(), getOptions()));
}
Special handling is needed for:
protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter,
PushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout)
The suggestion is to introduce a new method that accepts the ClientOptions, deprecate the existing one, and use a thread-local to propagate the options until the old method is removed completely.
@Deprecated
protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter,
PushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout) {
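ClientOptions clientOptions = clientOptionsThreadLocal.get();
// No options were propagated (e.g. this deprecated method was called directly): fall back to the current options
if (clientOptions == null) {
clientOptions = getOptions();
}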
return new StatefulRedisConnectionImpl<>(channelWriter, pushHandler, codec, timeout, clientOptions.getJsonParser());
}
protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter,
PushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout, ClientOptions clientOptions) {
try {
clientOptionsThreadLocal.set(clientOptions);
return newStatefulRedisConnection(channelWriter, pushHandler, codec, timeout);
} finally {
clientOptionsThreadLocal.remove();
}
}
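The reason for routing the new overload through the deprecated one is that subclasses overriding the old method keep working while still receiving the captured options. A minimal, self-contained sketch of that bridge follows. `BaseFactory`, `LegacySubclass`, and the string-based `newConnection` methods are hypothetical stand-ins, not Lettuce classes, and the fallback to the current options when nothing was propagated is an assumption of this sketch.

```java
// Hypothetical, simplified stand-ins that mirror the shape of the proposal; not Lettuce code.
class BaseFactory {

    // Carries the captured options from the new overload to the deprecated one.
    private static final ThreadLocal<String> OPTIONS = new ThreadLocal<>();

    private volatile String currentOptions = "current";

    void setOptions(String options) {
        this.currentOptions = options;
    }

    // Legacy extension point without an options parameter; subclasses may override it.
    @Deprecated
    protected String newConnection(String codec) {
        String options = OPTIONS.get();
        if (options == null) {
            // Assumption: a direct legacy call falls back to whatever is current.
            options = currentOptions;
        }
        return codec + ":" + options;
    }

    // New entry point: publishes the captured options for the duration of the legacy call.
    protected String newConnection(String codec, String options) {
        try {
            OPTIONS.set(options);
            return newConnection(codec);
        } finally {
            OPTIONS.remove(); // never leak the value to later work on this thread
        }
    }
}

// A subclass written against the old signature only.
class LegacySubclass extends BaseFactory {

    @Override
    @Deprecated
    protected String newConnection(String codec) {
        return "custom(" + super.newConnection(codec) + ")";
    }
}

class ThreadLocalBridgeDemo {

    public static void main(String[] args) {
        LegacySubclass factory = new LegacySubclass();
        String captured = "captured";        // snapshot taken at the entry point
        factory.setOptions("changed-later"); // simulated concurrent update

        System.out.println(factory.newConnection("utf8", captured)); // custom(utf8:captured)
        System.out.println(factory.newConnection("utf8"));           // custom(utf8:changed-later)
    }
}
```

This only works because the deprecated method runs synchronously on the thread that set the thread-local. If the call were handed off to another thread, such as a Netty event loop, the value would not be visible there.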