This repository was archived by the owner on Jul 15, 2018. It is now read-only.

pubsub: use non-blocking send when dispatching messages #153

@silasdavis

Description

tl;dr: use a non-blocking send and let subscribers miss messages if they are too slow to drain, or forget to drain, their channel, rather than punishing all subscribers by blocking.

Currently Publish can avoid blocking on the input side by virtue of the cmds buffer and by setting a timeout with the context. However, if a subscriber fails to read from one of its out channels, it can block all other subscribers from receiving any messages, because a blocking send is used in send:

func (state *state) send(msg interface{}, tags map[string]interface{}) {
	for q, clientToChannelMap := range state.queries {
		if q.Matches(tags) {
			for _, ch := range clientToChannelMap {
				ch <- msg
			}
		}
	}
}

Instead I think we should use a non-blocking send to a buffered channel:

	for _, ch := range clientToChannelMap {
		select {
		case ch <- msg:
		default:
			// implement retry logic, or probably just do nothing,
			// drop the message for this subscriber on this query
			// it seems fair if it couldn't keep up with its buffer
		}
	}

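This only really works if ch is buffered by default: with an unbuffered channel, a non-blocking send succeeds only when the subscriber is already waiting in a receive, so a message arriving at any other moment would be dropped.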
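To make the buffering point concrete, here is a minimal sketch of the non-blocking send in isolation. The helper name trySend is hypothetical and not part of the pubsub package; it just wraps the select/default pattern above and reports whether the message was accepted.

```go
package main

import "fmt"

// trySend is a hypothetical helper (not part of pubsub): it attempts a
// non-blocking send and reports whether msg was accepted by the channel.
func trySend(ch chan<- interface{}, msg interface{}) bool {
	select {
	case ch <- msg:
		return true
	default:
		// Nobody is receiving and there is no free buffer slot: drop msg.
		return false
	}
}

func main() {
	buffered := make(chan interface{}, 2)
	unbuffered := make(chan interface{})

	fmt.Println(trySend(buffered, "a"))   // true: first buffer slot
	fmt.Println(trySend(buffered, "b"))   // true: second buffer slot
	fmt.Println(trySend(buffered, "c"))   // false: buffer is full
	fmt.Println(trySend(unbuffered, "a")) // false: no receiver is waiting
}
```

With the buffered channel a subscriber only loses messages once it has fallen a full buffer behind, whereas the unbuffered channel drops anything that does not coincide with a pending receive.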

I also think we should change the signature of Subscribe from:

Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error

To one of:

// Returns a buffered channel that it is the Server's responsibility to close.
// The channel is receive-only so that the subscriber can read from it.
Subscribe(ctx context.Context, clientID string, query Query) (<-chan interface{}, error)

There are two reasons for this:

  1. So that we can rely on the non-blocking send delivering when subscribers are draining their channels quickly enough, and not allow one subscriber to starve another which it may know nothing about.

  2. Since pubsub.Server takes responsibility for calling close on the output channels, it would be more idiomatic if it also created them. Currently it is not clear whose responsibility it is to close the channel. Since I created it, I might think I need to both call Unsubscribe and close the channel, but this would cause a panic (calling close on a closed channel). It makes sense for the server to close the channel, but it should also be the one that creates it. (I assume it is the way it is to allow the buffer to be set by the caller, but we can handle this with an argument to subscribe as above.)
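The ownership argument can be sketched as follows. This is a stripped-down, hypothetical stand-in for pubsub.Server, not its actual implementation: it ignores queries and contexts, uses a mutex instead of the cmds loop, and the capacity parameter is an assumption. The point is only that the server creates the channel, hands back the receive-only end, and is the single place that closes it.

```go
package main

import (
	"fmt"
	"sync"
)

// Server is a hypothetical, simplified stand-in for pubsub.Server.
type Server struct {
	mu   sync.Mutex
	subs map[string]chan interface{}
}

func NewServer() *Server {
	return &Server{subs: make(map[string]chan interface{})}
}

// Subscribe creates the buffered channel itself and returns only the
// receive end. Calling close on a <-chan is a compile error, so the
// subscriber cannot double-close it by mistake.
func (s *Server) Subscribe(clientID string, capacity int) <-chan interface{} {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch := make(chan interface{}, capacity)
	s.subs[clientID] = ch
	return ch
}

// Unsubscribe closes the channel exactly once; repeated calls are no-ops.
func (s *Server) Unsubscribe(clientID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if ch, ok := s.subs[clientID]; ok {
		close(ch)
		delete(s.subs, clientID)
	}
}

// Publish uses a non-blocking send, so a slow subscriber only loses its
// own messages and never stalls the others.
func (s *Server) Publish(msg interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, ch := range s.subs {
		select {
		case ch <- msg:
		default: // subscriber's buffer is full: drop msg for it
		}
	}
}

func main() {
	s := NewServer()
	out := s.Subscribe("client-1", 1)
	s.Publish("hello")
	s.Publish("dropped") // buffer of 1 is already full
	s.Unsubscribe("client-1")
	for msg := range out { // loop ends because the server closed the channel
		fmt.Println(msg)
	}
}
```

Because the caller never holds a bidirectional channel, the question of who closes it no longer arises: the subscriber simply ranges over the channel until the server closes it.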

We can implement an Option, just like BufferCapacity, that sets the size of the buffer on the output channels. If we want the same flexibility in setting the output channel buffer size per subscription, we can add a SubscribeWithBuffer function or similar that takes an int for the buffer size, but I'm not sure it's necessary.
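A rough sketch of what that could look like, using the functional-options pattern. The names OutputCapacity and outCapacity, the default of 1, and the simplified Subscribe signature are all assumptions made for illustration; only BufferCapacity and SubscribeWithBuffer are names taken from the discussion above.

```go
package main

import "fmt"

// Server is a hypothetical, simplified stand-in for pubsub.Server.
type Server struct {
	outCapacity int // buffer size for every output channel (assumed field)
}

// Option configures a Server at construction time.
type Option func(*Server)

// OutputCapacity is a hypothetical option, analogous to BufferCapacity,
// that sets the default buffer size of subscriber output channels.
func OutputCapacity(n int) Option {
	return func(s *Server) { s.outCapacity = n }
}

func NewServer(options ...Option) *Server {
	s := &Server{outCapacity: 1} // assumed default
	for _, opt := range options {
		opt(s)
	}
	return s
}

// Subscribe uses the server-wide default capacity.
func (s *Server) Subscribe() <-chan interface{} {
	return make(chan interface{}, s.outCapacity)
}

// SubscribeWithBuffer overrides the capacity for a single subscription.
func (s *Server) SubscribeWithBuffer(n int) <-chan interface{} {
	return make(chan interface{}, n)
}

func main() {
	s := NewServer(OutputCapacity(16))
	fmt.Println(cap(s.Subscribe()))            // 16
	fmt.Println(cap(s.SubscribeWithBuffer(4))) // 4
}
```

The option covers the common case of one buffer size for all subscribers, while the per-call variant stays available if a particular subscriber is known to be bursty.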
