BUG: RefCount with disconnect delay and minObservers disconnects prematurely if observer count drops to zero and then goes back up #2215

@idg10

If we use the RefCount overload that takes a disconnectDelay argument, with a minObservers of 2 or more, it does not correctly handle the case in which the active observer count drops to 0 but then goes back up to 1 (without reaching minObservers) before the disconnectDelay elapses. When the delay expires, RefCount calls Dispose on the connection even though the connection should remain active.

This is not entirely obvious, because the new subscriber remains subscribed to the underlying source, even though RefCount has disposed the connection. To observe this, you need to watch when the connection is disposed, for which we can use this helper:

```csharp
// Wraps a connectable observable and records every connection it hands out,
// so that we can check later whether a given connection has been disposed.
public class LoggingConnectableObservable<T>(IConnectableObservable<T> source) : IConnectableObservable<T>
{
    public List<Connection> Connections { get; } = new();

    public IDisposable Connect()
    {
        var c = new Connection(source.Connect());
        Connections.Add(c);
        return c;
    }

    // Subscriptions pass straight through to the wrapped source.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return source.Subscribe(observer);
    }

    public class Connection(IDisposable underlying) : IDisposable
    {
        public bool Disposed { get; private set; }

        public void Dispose()
        {
            Disposed = true;
            underlying.Dispose();
        }
    }
}
```

With that in hand, we can now write this:

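```csharp
var source = new Subject<int>();
var co = new LoggingConnectableObservable<int>(source.Publish());
var rco = co.RefCount(2, TimeSpan.FromSeconds(1)); // minObservers: 2, disconnectDelay: 1 second

var s1 = rco.Subscribe();
var s2 = rco.Subscribe(); // Observer count reaches minObservers, so RefCount connects
s1.Dispose();
s2.Dispose(); // Observer count drops to 0

Thread.Sleep(500); // Not long enough for the disconnect delay to elapse

var s3 = rco.Subscribe(); // Observer count goes back up to 1

Thread.Sleep(1100); // Long enough for the disconnect delay to elapse

if (co.Connections[0].Disposed)
{
    Console.WriteLine("Connection terminated while one subscriber still active");
}
```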

The third subscription starts before the disconnect delay elapses, bringing the subscription count back up to 1, which should mean we don't disconnect.

(Reasonable people might expect RefCount to disconnect here because the number of subscriptions is below minObservers. However, that would be a misunderstanding, because minObservers only specifies the criterion for connection, not for disconnection. It states only the threshold that must be reached for the Connect to occur. The observer count is allowed to drop below minObservers without triggering a disconnect. Historically, RefCount has only disconnected once the number of observers drops to zero. For example, if we make a small modification to the example here by moving s2.Dispose(); so that it occurs after var s3 = rco.Subscribe();, this example will not disconnect, because the subscription count never drops to zero. The end state is the same: we end up with a single observer, one fewer than minObservers, but when doing things in that order, we don't get a disconnect, and that is the correct behaviour.)
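For comparison, the reordered variant described above might look like the following sketch. It assumes the System.Reactive package and a C# 12 compiler (for primary constructors), and it repeats the LoggingConnectableObservable helper from this issue so that it compiles on its own. The only change from the original repro is that s2.Dispose() now comes after s3 subscribes. The second output message is added here for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

var source = new Subject<int>();
var co = new LoggingConnectableObservable<int>(source.Publish());
var rco = co.RefCount(2, TimeSpan.FromSeconds(1));

var s1 = rco.Subscribe();
var s2 = rco.Subscribe(); // Observer count reaches minObservers (2), so RefCount connects
s1.Dispose();             // Observer count: 1

Thread.Sleep(500);

var s3 = rco.Subscribe(); // Observer count: 2
s2.Dispose();             // Observer count: 1 (it never reached 0)

Thread.Sleep(1100);       // Longer than the 1 second disconnect delay

Console.WriteLine(co.Connections[0].Disposed
    ? "Connection terminated while one subscriber still active"
    : "Connection still active");

// Same helper as earlier in this issue, repeated so the sketch is self-contained.
public class LoggingConnectableObservable<T>(IConnectableObservable<T> source) : IConnectableObservable<T>
{
    public List<Connection> Connections { get; } = new();

    public IDisposable Connect()
    {
        var c = new Connection(source.Connect());
        Connections.Add(c);
        return c;
    }

    public IDisposable Subscribe(IObserver<T> observer) => source.Subscribe(observer);

    public class Connection(IDisposable underlying) : IDisposable
    {
        public bool Disposed { get; private set; }

        public void Dispose()
        {
            Disposed = true;
            underlying.Dispose();
        }
    }
}
```

According to the description above, this ordering should leave the connection active, whereas the original ordering ends with the same single observer but a disposed connection.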
