Skip to content

Commit abe6af2

Browse files
committed
Sentinel Support
Initial pass at adding Sentinel support. Adds ConnectionMultiplexer that is managed by a set of Sentinel servers. Usage: ```C# ConfigurationOptions sentinelConfig = new ConfigurationOptions(); sentinelConfig.ServiceName = "ServiceNameValue"; sentinelConfig.EndPoints.Add("127.0.0.1", 26379); sentinelConfig.EndPoints.Add("127.0.0.2", 26379); sentinelConfig.EndPoints.Add("127.0.0.4", 26379); sentinelConfig.TieBreaker = ""; sentinelConfig.DefaultVersion = new Version(3, 0); // Add the configuration for the masters that we will connect to ConfigurationOptions masterConfig = new ConfigurationOptions(); sentinelConfig.SentinelMasterConfignfigurationOptions = mo; // Connect! ConnectionMultiplexer sentinelConnection = ConnectionMultiplexer.Connect(sentinelConfig); // When we need to perform a command against the masters, use the master connection sentinelConnection.SentinelMasterConnection.GetDatabase().MyCommand(); ```
1 parent e116def commit abe6af2

6 files changed

Lines changed: 370 additions & 14 deletions

File tree

StackExchange.Redis/StackExchange/Redis/ConfigurationOptions.cs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,17 @@ public static string TryNormalize(string value)
105105

106106
private bool? allowAdmin, abortOnConnectFail, highPrioritySocketThreads, resolveDns, ssl;
107107

108-
private string clientName, serviceName, password, tieBreaker, sslHost, configChannel;
108+
private string clientName, serviceName, password, tieBreaker, sslHost, configChannel;
109109

110110
private CommandMap commandMap;
111111

112112
private Version defaultVersion;
113113

114114
private int? keepAlive, syncTimeout, connectTimeout, responseTimeout, writeBuffer, connectRetry, configCheckSeconds, defaultDatabase;
115115

116-
private Proxy? proxy;
116+
private Proxy? proxy;
117+
118+
private ConfigurationOptions sentinelMasterConfigurationOptions;
117119

118120
/// <summary>
119121
/// A LocalCertificateSelectionCallback delegate responsible for selecting the certificate used for authentication; note
@@ -242,7 +244,12 @@ public int ConnectTimeout {
242244
/// If enabled the ConnectionMultiplexer will not re-resolve DNS
243245
/// when attempting to re-connect after a connection failure.
244246
/// </summary>
245-
public bool ResolveDns { get { return resolveDns.GetValueOrDefault(); } set { resolveDns = value; } }
247+
public bool ResolveDns { get { return resolveDns.GetValueOrDefault(); } set { resolveDns = value; } }
248+
249+
/// <summary>
250+
/// Configuration options for the master that Sentinel connects to
251+
/// </summary>
252+
public ConfigurationOptions SentinelMasterConfigurationOptions { get { return sentinelMasterConfigurationOptions; } set { sentinelMasterConfigurationOptions = value; } }
246253

247254
/// <summary>
248255
/// The service name used to resolve a service via sentinel
@@ -347,7 +354,8 @@ public ConfigurationOptions Clone()
347354
ChannelPrefix = ChannelPrefix.Clone(),
348355
SocketManager = SocketManager,
349356
connectRetry = connectRetry,
350-
configCheckSeconds = configCheckSeconds,
357+
configCheckSeconds = configCheckSeconds,
358+
sentinelMasterConfigurationOptions = sentinelMasterConfigurationOptions,
351359
responseTimeout = responseTimeout,
352360
defaultDatabase = defaultDatabase,
353361
};

StackExchange.Redis/StackExchange/Redis/ConnectionMultiplexer.cs

Lines changed: 268 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,36 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
707707
#endif
708708
return false;
709709
}
710-
710+
711+
private static async Task<Task<T>> WaitFirstNonNullIgnoreErrorsAsync<T>(Task<T>[] tasks)
712+
{
713+
if (tasks == null) throw new ArgumentNullException("tasks");
714+
if (tasks.Length == 0) return null;
715+
var typeNullable = (Nullable.GetUnderlyingType(typeof(T)) != null);
716+
var taskList = tasks.Cast<Task>().ToList();
717+
718+
try
719+
{
720+
while (taskList.Count() > 0)
721+
{
722+
#if NET40
723+
var allTasksAwaitingAny = TaskEx.WhenAny(taskList).ObserveErrors();
724+
#else
725+
var allTasksAwaitingAny = Task.WhenAny(taskList).ObserveErrors();
726+
#endif
727+
var result = await allTasksAwaitingAny.ForAwait();
728+
taskList.Remove((Task<T>)result);
729+
if (((Task<T>)result).IsFaulted) continue;
730+
if ((!typeNullable) || ((Task<T>)result).Result != null)
731+
return (Task<T>)result;
732+
}
733+
}
734+
catch
735+
{ }
736+
737+
return null;
738+
}
739+
711740

712741
/// <summary>
713742
/// Raised when a hash-slot has been relocated
@@ -834,7 +863,7 @@ static ConnectionMultiplexer CreateMultiplexer(object configuration)
834863
{
835864
throw new ArgumentException("configuration");
836865
}
837-
if (config.EndPoints.Count == 0) throw new ArgumentException("No endpoints specified", nameof(configuration));
866+
if (config.EndPoints.Count == 0) throw new ArgumentException("No endpoints specified", nameof(configuration));
838867
config.SetDefaultPorts();
839868
return new ConnectionMultiplexer(config);
840869
}
@@ -874,6 +903,12 @@ private static ConnectionMultiplexer ConnectImpl(Func<ConnectionMultiplexer> mul
874903
}
875904
if (!task.Result) throw ExceptionFactory.UnableToConnect(muxer.failureMessage);
876905
killMe = null;
906+
907+
if(muxer.ServerSelectionStrategy.ServerType == ServerType.Sentinel)
908+
{
909+
// Initialize the Sentinel handlers
910+
muxer.InitializeSentinel(log);
911+
}
877912
return muxer;
878913
}
879914
finally
@@ -984,6 +1019,21 @@ internal long LastHeartbeatSecondsAgo {
9841019
internal static long LastGlobalHeartbeatSecondsAgo => unchecked(Environment.TickCount - VolatileWrapper.Read(ref lastGlobalHeartbeatTicks)) / 1000;
9851020

9861021
internal CompletionManager UnprocessableCompletionManager => unprocessableCompletionManager;
1022+
internal EndPoint GetConfiguredMasterForService(int timeoutmillis = -1)
1023+
{
1024+
Task<EndPoint>[] sentinelMasters = this.serverSnapshot
1025+
.Where(s => s.ServerType == ServerType.Sentinel)
1026+
.Select(s => this.GetServer(s.EndPoint).SentinelGetMasterAddressByNameAsync(RawConfig.ServiceName))
1027+
.ToArray();
1028+
1029+
Task<Task<EndPoint>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinelMasters);
1030+
if (!firstCompleteRequest.Wait(timeoutmillis))
1031+
throw new TimeoutException("Timeout resolving master for service");
1032+
if (firstCompleteRequest.Result.Result == null)
1033+
throw new Exception("Unable to determine master");
1034+
1035+
return firstCompleteRequest.Result.Result;
1036+
}
9871037

9881038
/// <summary>
9891039
/// Obtain a pub/sub subscriber connection to the specified server
@@ -1140,8 +1190,8 @@ internal int SyncConnectTimeout(bool forConnect)
11401190
if (timeout >= int.MaxValue / retryCount) return int.MaxValue;
11411191

11421192
timeout *= retryCount;
1143-
if (timeout >= int.MaxValue - 500) return int.MaxValue;
1144-
return timeout + Math.Min(500, timeout);
1193+
if (timeout >= int.MaxValue - 500) return int.MaxValue;
1194+
return timeout + Math.Min(500, timeout);
11451195
}
11461196
/// <summary>
11471197
/// Provides a text overview of the status of all connections
@@ -1192,8 +1242,8 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
11921242
return false;
11931243
}
11941244
Trace("Starting reconfiguration...");
1195-
Trace(blame != null, "Blaming: " + Format.ToString(blame));
1196-
1245+
Trace(blame != null, "Blaming: " + Format.ToString(blame));
1246+
11971247
LogLocked(log, Configuration);
11981248
LogLocked(log, "");
11991249

@@ -1820,6 +1870,218 @@ public bool IsConnected
18201870

18211871
internal ServerSelectionStrategy ServerSelectionStrategy => serverSelectionStrategy;
18221872

1873+
internal ConnectionMultiplexer sentinelMasterConnection;
1874+
1875+
/// <summary>
1876+
/// Contains a ConnectionMultiplexer connected to the current master as dictated by
1877+
/// Sentinel config
1878+
/// </summary>
1879+
public ConnectionMultiplexer SentinelMasterConnection { get { return sentinelMasterConnection; } }
1880+
1881+
internal EndPoint currentSentinelMasterEndPoint = null;
1882+
1883+
internal System.Timers.Timer sentinelMasterReconnectTimer = null;
1884+
1885+
/// <summary>
1886+
/// Initializes the sentinel connection and creates the
1887+
/// ConnectionMultiplexer for the master
1888+
/// </summary>
1889+
internal void InitializeSentinel(TextWriter log = null)
1890+
{
1891+
if(log == null) log = TextWriter.Null;
1892+
1893+
// Are we a sentinel setup?
1894+
if(ServerSelectionStrategy.ServerType == ServerType.Sentinel)
1895+
{
1896+
if(String.IsNullOrEmpty(configuration.ServiceName))
1897+
{
1898+
Trace("Sentinel configuration missing ServiceName");
1899+
throw new ArgumentException("No ServiceName configured for Sentinel connection.");
1900+
}
1901+
1902+
Trace(String.Format("Configuring Sentinel connection with ServiceName '{0}'", configuration.ServiceName));
1903+
1904+
if(this.sentinelMasterConnection == null)
1905+
{
1906+
if(configuration.SentinelMasterConfigurationOptions == null)
1907+
{
1908+
Trace("Missing config for Sentinel Master.");
1909+
throw new ArgumentException("Missing configuration options for Sentinel Master");
1910+
}
1911+
1912+
// Clear out the configuration
1913+
ConfigurationOptions sconfig = configuration.SentinelMasterConfigurationOptions;
1914+
sconfig.ServiceName = null;
1915+
sconfig.EndPoints.Clear();
1916+
sconfig.AbortOnConnectFail = false;
1917+
1918+
// Get an initial endpoint
1919+
EndPoint initialMasterEndPoint = null;
1920+
1921+
do
1922+
{
1923+
initialMasterEndPoint = GetConfiguredMasterForService();
1924+
} while(initialMasterEndPoint == null);
1925+
1926+
sconfig.EndPoints.Add(initialMasterEndPoint);
1927+
1928+
this.sentinelMasterConnection = ConnectionMultiplexer.Connect(sconfig, log);
1929+
1930+
// Subscribe to sentinel change events
1931+
ISubscriber sub = GetSubscriber();
1932+
if(sub.SubscribedEndpoint("+switch-master") == null)
1933+
{
1934+
sub.Subscribe("+switch-master", (channel, message) => {
1935+
string[] messageParts = ((string)message).Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
1936+
EndPoint switchBlame = Format.TryParseEndPoint(string.Format("{0}:{1}", messageParts[1], messageParts[2]));
1937+
SwitchMaster(switchBlame);
1938+
});
1939+
1940+
}
1941+
1942+
// If we lose connection to a sentinel server,
1943+
// We need to reconfigure to make sure we still have
1944+
// a subscription to the +switch-master channel.
1945+
this.ConnectionFailed += (sender, e) => {
1946+
// Reconfigure to get subscriptions back online
1947+
ReconfigureAsync(false, true, log, e.EndPoint, "Lost sentinel connection", false).Wait();
1948+
};
1949+
1950+
// Subscribe to new sentinels being added
1951+
if(sub.SubscribedEndpoint("+sentinel") == null)
1952+
{
1953+
sub.Subscribe("+sentinel", (channel, message) => {
1954+
UpdateSentinelAddressList();
1955+
});
1956+
}
1957+
1958+
// Attach to reconnect event to ensure proper connection to the new master
1959+
sentinelMasterConnection.ConnectionRestored += (sender, e) => {
1960+
ConnectionMultiplexer connection = (ConnectionMultiplexer)sender;
1961+
1962+
if(sentinelMasterReconnectTimer != null)
1963+
{
1964+
sentinelMasterReconnectTimer.Stop();
1965+
sentinelMasterReconnectTimer = null;
1966+
}
1967+
1968+
// Run a swithc to make sure we have update-to-date
1969+
// information about which master we should connect to
1970+
SwitchMaster(e.EndPoint);
1971+
1972+
try
1973+
{
1974+
// Verify that the reconnected endpoint is a master,
1975+
// and the correct one otherwise we should reconnect
1976+
if(connection.GetServer(e.EndPoint).IsSlave || e.EndPoint != currentSentinelMasterEndPoint)
1977+
{
1978+
// Wait for things to smooth out
1979+
Thread.Sleep(200);
1980+
1981+
// This isn't a master, so try connecting again
1982+
SwitchMaster(e.EndPoint);
1983+
}
1984+
}
1985+
catch(Exception)
1986+
{
1987+
// If we get here it means that we tried to reconnect to a server that is no longer
1988+
// considered a master by Sentinel and was removed from the list of endpoints.
1989+
1990+
// Wait for things to smooth out
1991+
Thread.Sleep(200);
1992+
1993+
// If we caught an exception, we may have gotten a stale endpoint
1994+
// we are not aware of, so retry
1995+
SwitchMaster(e.EndPoint);
1996+
}
1997+
};
1998+
1999+
// If we lost the connection, run a switch to a least try and get updated info about the master
2000+
sentinelMasterConnection.ConnectionFailed += (sender, e) => {
2001+
// Periodically check to see if we can reconnect to the proper master.
2002+
// This is here in case we lost our subscription to a good sentinel instance
2003+
// or if we miss the published master change
2004+
if(sentinelMasterReconnectTimer == null)
2005+
{
2006+
sentinelMasterReconnectTimer = new System.Timers.Timer(1000);
2007+
sentinelMasterReconnectTimer.AutoReset = true;
2008+
2009+
sentinelMasterReconnectTimer.Elapsed += (o, t) => {
2010+
SwitchMaster(e.EndPoint);
2011+
};
2012+
2013+
sentinelMasterReconnectTimer.Start();
2014+
}
2015+
};
2016+
2017+
// Perform the initial switchover
2018+
SwitchMaster(configuration.EndPoints[0], log);
2019+
}
2020+
}
2021+
}
2022+
2023+
/// <summary>
2024+
/// Switches the SentinelMasterConnection over to a new master.
2025+
/// </summary>
2026+
/// <param name="switchBlame"></param>
2027+
internal void SwitchMaster(EndPoint switchBlame, TextWriter log = null)
2028+
{
2029+
if(log == null) log = TextWriter.Null;
2030+
2031+
ConnectionMultiplexer sconnection = sentinelMasterConnection;
2032+
// Get new master
2033+
EndPoint masterEndPoint = null;
2034+
2035+
do
2036+
{
2037+
masterEndPoint = GetConfiguredMasterForService();
2038+
} while(masterEndPoint == null);
2039+
2040+
currentSentinelMasterEndPoint = masterEndPoint;
2041+
2042+
if (!sconnection.servers.Contains(masterEndPoint))
2043+
{
2044+
sconnection.configuration.EndPoints.Clear();
2045+
sconnection.servers.Clear();
2046+
sconnection.serverSnapshot = new ServerEndPoint[0];
2047+
sconnection.configuration.EndPoints.Add(masterEndPoint);
2048+
Trace(string.Format("Switching master to {0}", masterEndPoint));
2049+
// Trigger a reconfigure
2050+
sconnection.ReconfigureAsync(false, false, log, switchBlame, "master switch", false, CommandFlags.PreferMaster).Wait();
2051+
}
2052+
2053+
UpdateSentinelAddressList();
2054+
}
2055+
2056+
internal void UpdateSentinelAddressList(int timeoutmillis = 500)
2057+
{
2058+
Task<EndPoint[]>[] sentinels = this.serverSnapshot
2059+
.Where(s => s.ServerType == ServerType.Sentinel)
2060+
.Select(s => this.GetServer(s.EndPoint).SentinelGetSentinelAddresses(RawConfig.ServiceName))
2061+
.ToArray();
2062+
2063+
Task<Task<EndPoint[]>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinels);
2064+
2065+
// Ignore errors, as having an updated sentinel list is
2066+
// not essential
2067+
if (!firstCompleteRequest.Wait(timeoutmillis))
2068+
return;
2069+
if (firstCompleteRequest.Result.Result == null)
2070+
return;
2071+
2072+
bool hasNew = false;
2073+
foreach(EndPoint newSentinel in firstCompleteRequest.Result.Result.Where(x => !configuration.EndPoints.Contains(x)))
2074+
{
2075+
hasNew = true;
2076+
configuration.EndPoints.Add(newSentinel);
2077+
}
2078+
2079+
if(hasNew)
2080+
{
2081+
// Reconfigure the sentinel multiplexer if we added new endpoints
2082+
ReconfigureAsync(false, true, null, configuration.EndPoints[0], "Updating Sentinel List", false).Wait();
2083+
}
2084+
}
18232085

18242086
/// <summary>
18252087
/// Close all connections and release all resources associated with this object

StackExchange.Redis/StackExchange/Redis/IServer.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,16 @@ public partial interface IServer : IRedis
437437
/// <param name="flags"></param>
438438
/// <returns>the master ip and port</returns>
439439
/// <remarks>http://redis.io/topics/sentinel</remarks>
440-
Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, CommandFlags flags = CommandFlags.None);
440+
Task<EndPoint> SentinelGetMasterAddressByNameAsync(string serviceName, CommandFlags flags = CommandFlags.None);
441+
442+
/// <summary>
443+
/// Returns the ip and port numbers of all known Sentinels
444+
/// for the given service name.
445+
/// </summary>
446+
/// <param name="serviveName">the sentinel service name</param>
447+
/// <param name="flags"></param>
448+
/// <returns>a list of the sentinel ips and ports</returns>
449+
Task<EndPoint[]> SentinelGetSentinelAddresses(string serviveName, CommandFlags flags = CommandFlags.None);
441450

442451
/// <summary>
443452
/// Show the state and info of the specified master.

StackExchange.Redis/StackExchange/Redis/RedisLiterals.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public static readonly RedisValue
6868
GETMASTERADDRBYNAME = "GET-MASTER-ADDR-BY-NAME",
6969
// RESET = "RESET",
7070
FAILOVER = "FAILOVER",
71+
SENTINELS = "SENTINELS",
7172

7273
// Sentinel Literals as of 2.8.4
7374
MONITOR = "MONITOR",

0 commit comments

Comments
 (0)