@@ -707,7 +707,36 @@ private async Task<bool> WaitAllIgnoreErrorsAsync(Task[] tasks, int timeoutMilli
707707#endif
708708 return false ;
709709 }
710-
710+
/// <summary>
/// Races the supplied tasks and returns the first one that ran to completion with a
/// usable result, ignoring tasks that faulted or were cancelled.
/// </summary>
/// <typeparam name="T">Result type of the tasks being raced.</typeparam>
/// <param name="tasks">The tasks to observe; null entries are skipped.</param>
/// <returns>
/// The first task that completed successfully and, when <typeparamref name="T"/> can hold
/// null (reference types and <c>Nullable&lt;&gt;</c>), whose result is not null. Returns
/// null when <paramref name="tasks"/> is empty or no task qualifies.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="tasks"/> is null (surfaced through the returned task).</exception>
private static async Task<Task<T>> WaitFirstNonNullIgnoreErrorsAsync<T>(Task<T>[] tasks)
{
    if (tasks == null) throw new ArgumentNullException(nameof(tasks));
    if (tasks.Length == 0) return null;

    // True for reference types and Nullable<>, false for plain value types; only in the
    // former case is there a null result to filter out.
    bool resultCanBeNull = default(T) == null;

    // Null entries would make WhenAny throw; dropping them keeps this helper best-effort.
    var pending = tasks.Where(t => t != null).ToList();

    while (pending.Count > 0)
    {
#if NET40
        var finished = await TaskEx.WhenAny(pending).ObserveErrors().ForAwait();
#else
        var finished = await Task.WhenAny(pending).ObserveErrors().ForAwait();
#endif
        pending.Remove(finished);

        // Faulted and cancelled tasks have no result to offer; reading Result on them
        // would throw, so move on to the next task to finish.
        if (finished.Status != TaskStatus.RanToCompletion) continue;

        if (!resultCanBeNull || finished.Result != null)
            return finished;
    }

    return null;
}
739+
711740
712741 /// <summary>
713742 /// Raised when a hash-slot has been relocated
@@ -834,7 +863,7 @@ static ConnectionMultiplexer CreateMultiplexer(object configuration)
834863 {
835864 throw new ArgumentException ( "configuration" ) ;
836865 }
837- if ( config . EndPoints . Count == 0 ) throw new ArgumentException ( "No endpoints specified" , nameof ( configuration ) ) ;
866+ if ( config . EndPoints . Count == 0 ) throw new ArgumentException ( "No endpoints specified" , nameof ( configuration ) ) ;
838867 config . SetDefaultPorts ( ) ;
839868 return new ConnectionMultiplexer ( config ) ;
840869 }
@@ -874,6 +903,12 @@ private static ConnectionMultiplexer ConnectImpl(Func<ConnectionMultiplexer> mul
874903 }
875904 if ( ! task . Result ) throw ExceptionFactory . UnableToConnect ( muxer . failureMessage ) ;
876905 killMe = null ;
906+
907+ if ( muxer . ServerSelectionStrategy . ServerType == ServerType . Sentinel )
908+ {
909+ // Initialize the Sentinel handlers
910+ muxer . InitializeSentinel ( log ) ;
911+ }
877912 return muxer ;
878913 }
879914 finally
@@ -984,6 +1019,21 @@ internal long LastHeartbeatSecondsAgo {
9841019 internal static long LastGlobalHeartbeatSecondsAgo => unchecked ( Environment . TickCount - VolatileWrapper . Read ( ref lastGlobalHeartbeatTicks ) ) / 1000 ;
9851020
9861021 internal CompletionManager UnprocessableCompletionManager => unprocessableCompletionManager ;
/// <summary>
/// Asks every sentinel in the current server snapshot for the master address of the
/// configured service and returns the first usable answer.
/// </summary>
/// <param name="timeoutmillis">
/// Maximum time to wait, in milliseconds, passed straight to <c>Task.Wait</c>;
/// the default of -1 waits indefinitely.
/// </param>
/// <returns>The master endpoint reported by a sentinel; never null.</returns>
/// <exception cref="TimeoutException">No answer arrived within <paramref name="timeoutmillis"/>.</exception>
/// <exception cref="Exception">No sentinel produced a master address.</exception>
internal EndPoint GetConfiguredMasterForService(int timeoutmillis = -1)
{
    Task<EndPoint>[] sentinelMasters = this.serverSnapshot
        .Where(s => s.ServerType == ServerType.Sentinel)
        .Select(s => this.GetServer(s.EndPoint).SentinelGetMasterAddressByNameAsync(RawConfig.ServiceName))
        .ToArray();

    Task<Task<EndPoint>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinelMasters);
    if (!firstCompleteRequest.Wait(timeoutmillis))
        throw new TimeoutException("Timeout resolving master for service");

    // The helper yields a null task when there are no sentinels or every query failed,
    // so the winning task itself must be null-checked before its result is read.
    EndPoint master = firstCompleteRequest.Result?.Result;
    if (master == null)
        throw new Exception("Unable to determine master");

    return master;
}
9871037
9881038 /// <summary>
9891039 /// Obtain a pub/sub subscriber connection to the specified server
@@ -1140,8 +1190,8 @@ internal int SyncConnectTimeout(bool forConnect)
11401190 if ( timeout >= int . MaxValue / retryCount ) return int . MaxValue ;
11411191
11421192 timeout *= retryCount ;
1143- if ( timeout >= int . MaxValue - 500 ) return int . MaxValue ;
1144- return timeout + Math . Min ( 500 , timeout ) ;
1193+ if ( timeout >= int . MaxValue - 500 ) return int . MaxValue ;
1194+ return timeout + Math . Min ( 500 , timeout ) ;
11451195 }
11461196 /// <summary>
11471197 /// Provides a text overview of the status of all connections
@@ -1192,8 +1242,8 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, Text
11921242 return false ;
11931243 }
11941244 Trace ( "Starting reconfiguration..." ) ;
1195- Trace ( blame != null , "Blaming: " + Format . ToString ( blame ) ) ;
1196-
1245+ Trace ( blame != null , "Blaming: " + Format . ToString ( blame ) ) ;
1246+
11971247 LogLocked ( log , Configuration ) ;
11981248 LogLocked ( log , "" ) ;
11991249
@@ -1820,6 +1870,218 @@ public bool IsConnected
18201870
18211871 internal ServerSelectionStrategy ServerSelectionStrategy => serverSelectionStrategy ;
18221872
// Multiplexer connected to the master that the sentinels report; stays null until
// InitializeSentinel creates it.
internal ConnectionMultiplexer sentinelMasterConnection;

/// <summary>
/// Contains a ConnectionMultiplexer connected to the current master as dictated by
/// Sentinel config
/// </summary>
public ConnectionMultiplexer SentinelMasterConnection => sentinelMasterConnection;

// Master endpoint most recently resolved by SwitchMaster.
internal EndPoint currentSentinelMasterEndPoint;

// Timer that periodically retries SwitchMaster after the master connection fails;
// null while no retry loop is active.
internal System.Timers.Timer sentinelMasterReconnectTimer;
1884+
/// <summary>
/// Initializes the sentinel connection and creates the
/// ConnectionMultiplexer for the master
/// </summary>
/// <param name="log">Optional writer for connection log output; defaults to <c>TextWriter.Null</c>.</param>
/// <exception cref="ArgumentException">
/// The configuration has no ServiceName, or no SentinelMasterConfigurationOptions.
/// </exception>
/// <remarks>
/// Does nothing when this multiplexer is not connected to sentinels, or when the master
/// connection has already been created.
/// </remarks>
internal void InitializeSentinel(TextWriter log = null)
{
    if (log == null) log = TextWriter.Null;

    // Are we a sentinel setup?
    if (ServerSelectionStrategy.ServerType != ServerType.Sentinel) return;

    if (String.IsNullOrEmpty(configuration.ServiceName))
    {
        Trace("Sentinel configuration missing ServiceName");
        throw new ArgumentException("No ServiceName configured for Sentinel connection.");
    }

    Trace(String.Format("Configuring Sentinel connection with ServiceName '{0}'", configuration.ServiceName));

    // Already initialized; the handlers below must only be attached once.
    if (this.sentinelMasterConnection != null) return;

    if (configuration.SentinelMasterConfigurationOptions == null)
    {
        Trace("Missing config for Sentinel Master.");
        throw new ArgumentException("Missing configuration options for Sentinel Master");
    }

    // Clear out the configuration
    // NOTE(review): this mutates the options object supplied by the caller in place.
    ConfigurationOptions sconfig = configuration.SentinelMasterConfigurationOptions;
    sconfig.ServiceName = null;
    sconfig.EndPoints.Clear();
    sconfig.AbortOnConnectFail = false;

    // Get an initial endpoint
    EndPoint initialMasterEndPoint = null;

    do
    {
        initialMasterEndPoint = GetConfiguredMasterForService();
    } while (initialMasterEndPoint == null);

    sconfig.EndPoints.Add(initialMasterEndPoint);

    this.sentinelMasterConnection = ConnectionMultiplexer.Connect(sconfig, log);

    // Subscribe to sentinel change events
    ISubscriber sub = GetSubscriber();
    if (sub.SubscribedEndpoint("+switch-master") == null)
    {
        sub.Subscribe("+switch-master", (channel, message) =>
        {
            // Elements 1 and 2 of the space-separated payload are used as the address
            // and port to blame for the switch. A short or empty payload must not throw
            // inside the subscription callback, so fall back to a null blame.
            string payload = (string)message;
            string[] messageParts = (payload ?? string.Empty).Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
            EndPoint switchBlame = messageParts.Length >= 3
                ? Format.TryParseEndPoint(string.Format("{0}:{1}", messageParts[1], messageParts[2]))
                : null;
            SwitchMaster(switchBlame);
        });
    }

    // If we lose connection to a sentinel server,
    // we need to reconfigure to make sure we still have
    // a subscription to the +switch-master channel.
    this.ConnectionFailed += (sender, e) =>
    {
        // Reconfigure to get subscriptions back online
        ReconfigureAsync(false, true, log, e.EndPoint, "Lost sentinel connection", false).Wait();
    };

    // Subscribe to new sentinels being added
    if (sub.SubscribedEndpoint("+sentinel") == null)
    {
        sub.Subscribe("+sentinel", (channel, message) =>
        {
            UpdateSentinelAddressList();
        });
    }

    // Attach to reconnect event to ensure proper connection to the new master
    sentinelMasterConnection.ConnectionRestored += (sender, e) =>
    {
        ConnectionMultiplexer connection = (ConnectionMultiplexer)sender;

        if (sentinelMasterReconnectTimer != null)
        {
            // Stop the retry loop and release the timer's resources.
            sentinelMasterReconnectTimer.Stop();
            sentinelMasterReconnectTimer.Dispose();
            sentinelMasterReconnectTimer = null;
        }

        // Run a switch to make sure we have up-to-date
        // information about which master we should connect to
        SwitchMaster(e.EndPoint);

        try
        {
            // Verify that the reconnected endpoint is a master,
            // and the correct one; otherwise we should reconnect.
            // Equals is used because != on EndPoint compares references, which would
            // treat two instances describing the same address as different.
            if (connection.GetServer(e.EndPoint).IsSlave || !Equals(e.EndPoint, currentSentinelMasterEndPoint))
            {
                // Wait for things to smooth out
                Thread.Sleep(200);

                // This isn't a master, so try connecting again
                SwitchMaster(e.EndPoint);
            }
        }
        catch (Exception)
        {
            // If we get here it means that we tried to reconnect to a server that is no longer
            // considered a master by Sentinel and was removed from the list of endpoints.

            // Wait for things to smooth out
            Thread.Sleep(200);

            // If we caught an exception, we may have gotten a stale endpoint
            // we are not aware of, so retry
            SwitchMaster(e.EndPoint);
        }
    };

    // If we lost the connection, run a switch to at least try and get updated info about the master
    sentinelMasterConnection.ConnectionFailed += (sender, e) =>
    {
        // Periodically check to see if we can reconnect to the proper master.
        // This is here in case we lost our subscription to a good sentinel instance
        // or if we miss the published master change
        if (sentinelMasterReconnectTimer == null)
        {
            sentinelMasterReconnectTimer = new System.Timers.Timer(1000);
            sentinelMasterReconnectTimer.AutoReset = true;

            sentinelMasterReconnectTimer.Elapsed += (o, t) =>
            {
                SwitchMaster(e.EndPoint);
            };

            sentinelMasterReconnectTimer.Start();
        }
    };

    // Perform the initial switchover
    SwitchMaster(configuration.EndPoints[0], log);
}
2022+
/// <summary>
/// Switches the SentinelMasterConnection over to a new master.
/// </summary>
/// <param name="switchBlame">Endpoint passed to the reconfigure as the one to blame for the switch.</param>
/// <param name="log">Optional writer for reconfiguration output; defaults to <c>TextWriter.Null</c>.</param>
internal void SwitchMaster(EndPoint switchBlame, TextWriter log = null)
{
    log = log ?? TextWriter.Null;

    ConnectionMultiplexer masterMuxer = sentinelMasterConnection;

    // Keep asking the sentinels until one of them names a master.
    EndPoint resolvedMaster = GetConfiguredMasterForService();
    while (resolvedMaster == null)
    {
        resolvedMaster = GetConfiguredMasterForService();
    }

    currentSentinelMasterEndPoint = resolvedMaster;

    bool alreadyTracked = masterMuxer.servers.Contains(resolvedMaster);
    if (!alreadyTracked)
    {
        // Drop everything the master multiplexer knows about the old master, then
        // point it at the new one before reconfiguring.
        masterMuxer.configuration.EndPoints.Clear();
        masterMuxer.servers.Clear();
        masterMuxer.serverSnapshot = new ServerEndPoint[0];
        masterMuxer.configuration.EndPoints.Add(resolvedMaster);
        Trace(string.Format("Switching master to {0}", resolvedMaster));

        // Trigger a reconfigure and block until it has finished.
        Task reconfigure = masterMuxer.ReconfigureAsync(false, false, log, switchBlame, "master switch", false, CommandFlags.PreferMaster);
        reconfigure.Wait();
    }

    // Refresh the list of known sentinels on every call, whether or not the master changed.
    UpdateSentinelAddressList();
}
2055+
/// <summary>
/// Asks the connected sentinels for the addresses of the other sentinels monitoring the
/// configured service, adds any that are not yet configured, and reconfigures this
/// multiplexer when new ones were found. Best-effort: gives up silently when no answer
/// is available.
/// </summary>
/// <param name="timeoutmillis">Maximum time to wait for a sentinel to answer, in milliseconds.</param>
internal void UpdateSentinelAddressList(int timeoutmillis = 500)
{
    Task<EndPoint[]>[] sentinels = this.serverSnapshot
        .Where(s => s.ServerType == ServerType.Sentinel)
        .Select(s => this.GetServer(s.EndPoint).SentinelGetSentinelAddresses(RawConfig.ServiceName))
        .ToArray();

    Task<Task<EndPoint[]>> firstCompleteRequest = WaitFirstNonNullIgnoreErrorsAsync(sentinels);

    // Ignore errors, as having an updated sentinel list is
    // not essential
    if (!firstCompleteRequest.Wait(timeoutmillis))
        return;

    // The helper yields a null task when there are no sentinels or every query failed;
    // check the task itself before reading its result.
    EndPoint[] reportedSentinels = firstCompleteRequest.Result?.Result;
    if (reportedSentinels == null)
        return;

    bool hasNew = false;
    foreach (EndPoint newSentinel in reportedSentinels.Where(x => !configuration.EndPoints.Contains(x)))
    {
        hasNew = true;
        configuration.EndPoints.Add(newSentinel);
    }

    if (hasNew)
    {
        // Reconfigure the sentinel multiplexer if we added new endpoints
        ReconfigureAsync(false, true, null, configuration.EndPoints[0], "Updating Sentinel List", false).Wait();
    }
}
18232085
18242086 /// <summary>
18252087 /// Close all connections and release all resources associated with this object
0 commit comments