This repository was archived by the owner on Jul 15, 2018. It is now read-only.

pubsub: make message tags available to subscribers #143

@silasdavis

tl;dr replace out chan<-interface{} with out chan<-messageAndTags

Problem

Consider the following interfaces replicated from the pubsub Server:

type Subscribable interface {
	Subscribe(ctx context.Context, subscriber string, query pubsub.Query, out chan<- interface{}) error
	Unsubscribe(ctx context.Context, subscriber string, query pubsub.Query) error
	UnsubscribeAll(ctx context.Context, subscriber string) error
}

type Publisher interface {
	Publish(ctx context.Context, message interface{}, tags map[string]interface{}) error
}

Subscribers receive messages (of type interface{}) passed to them on a channel based on their query. We can tell that the tags the original Publisher chose match our query, but our query might be broad, and we have no way of telling which tags matched. Nothing guarantees we can reproduce the tags by looking at the message (we might be able to if we know something about its type). You could argue that tags are not part of the message and shouldn't change its meaning (or something like that), but I think that's an odd position (particularly given the usage in Tendermint - where it contains the domain level type/ID).

This is especially bad for my use case, where I want to forward all messages published on one pubsub to another with:

func PublishAll(subscribable Subscribable, ctx context.Context, subscriber string, query pubsub.Query,
	publisher Publisher, extraTags map[string]interface{}) error {

	return SubscribeCallback(subscribable, ctx, subscriber, query, func(message interface{}) {
		tags := make(map[string]interface{})
		// this is where I'd merge in the tags originally used with the message but I have no access to them
		for k, v := range extraTags {
			tags[k] = v
		}
		// Help! I can't tell which tags the original publisher used - so I can't forward them
		publisher.Publish(ctx, message, tags)
	})
}

To bulk forward I would use:

PublishAll(pubsubIn, ctx, "forwarder", query.Empty{}, pubsubOut, nil)

This is where the situation is worst: the interface for reading messages means the tags are erased.

Possible solutions

The obvious solution would be to provide a message type like:

type Message struct {
	Body interface{}
	Tags map[string]interface{}
}

// and Subscribe becomes:
func Subscribe(ctx context.Context, subscriber string, query pubsub.Query, out chan<- Message) error {
	...
} 
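
With an envelope like the Message type above, the forwarding use case from the Problem section becomes straightforward. The following is only a minimal sketch under assumptions: `mergeTags` and `forward` are hypothetical helper names, the `publish` callback stands in for `Publisher.Publish` (without the context), and letting extra tags win on key collisions is an arbitrary choice made here, not something this issue specifies.

```go
package main

import "fmt"

// Message is the envelope proposed above: the body plus the tags it was published with.
type Message struct {
	Body interface{}
	Tags map[string]interface{}
}

// mergeTags (hypothetical helper) returns a new map holding the original tags
// overlaid with the extra tags. Extra tags win on key collisions; that
// precedence is an assumption of this sketch.
func mergeTags(original, extra map[string]interface{}) map[string]interface{} {
	merged := make(map[string]interface{}, len(original)+len(extra))
	for k, v := range original {
		merged[k] = v
	}
	for k, v := range extra {
		merged[k] = v
	}
	return merged
}

// forward (hypothetical helper) drains in and hands each message to publish
// together with its merged tags. It stops at the first publish error.
func forward(in <-chan Message, extra map[string]interface{},
	publish func(body interface{}, tags map[string]interface{}) error) error {

	for msg := range in {
		if err := publish(msg.Body, mergeTags(msg.Tags, extra)); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	in := make(chan Message, 1)
	in <- Message{Body: "block", Tags: map[string]interface{}{"type": "NewBlock"}}
	close(in)

	extra := map[string]interface{}{"source": "pubsubIn"}
	err := forward(in, extra, func(body interface{}, tags map[string]interface{}) error {
		// The original "type" tag survives alongside the extra "source" tag.
		fmt.Println(body, tags["type"], tags["source"])
		return nil
	})
	if err != nil {
		fmt.Println("forward failed:", err)
	}
}
```

The point of the sketch is that the subscriber no longer needs to know anything about the body's type to recover the tags.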

A solution I like better is to drop the message/tag distinction and make a message a key-value map, as if everything were a tag:

type Publisher interface {
	Publish(ctx context.Context, keyvals map[string]interface{}) error
}

You can still have a canonical key that is used for messages if required. This has the added bonus that when the message is a type supported by your query language you can also filter on message rather than only on distinguished tags. This starts to look quite a lot like your logging interface (probably because it is, and logs <-> events), which suggests:

type Publisher interface {
	Publish(ctx context.Context, keyvals ...interface{}) error
}

// and Subscribe becomes:
func Subscribe(ctx context.Context, subscriber string, query pubsub.Query, out chan<- []interface{}) error {
	...
} 
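
To make the keyvals shape concrete, here is a rough sketch of how a subscriber might read from and extend a received `[]interface{}`. Everything named here is an assumption for illustration: `messageKey` is a hypothetical canonical key for the payload, and `lookup` and `withExtra` are hypothetical helpers, not part of any existing pubsub API.

```go
package main

import "fmt"

// messageKey is a hypothetical canonical key for the payload; the proposal
// above does not fix a name for it.
const messageKey = "message"

// lookup scans alternating key/value pairs and returns the value of the first
// string key that equals key. A trailing key with no value is ignored.
func lookup(keyvals []interface{}, key string) (interface{}, bool) {
	for i := 0; i+1 < len(keyvals); i += 2 {
		if k, ok := keyvals[i].(string); ok && k == key {
			return keyvals[i+1], true
		}
	}
	return nil, false
}

// withExtra returns a copy of keyvals with extra pairs appended, leaving the
// received slice untouched. Forwarding with extra tags reduces to this.
func withExtra(keyvals []interface{}, extra ...interface{}) []interface{} {
	out := make([]interface{}, 0, len(keyvals)+len(extra))
	out = append(out, keyvals...)
	return append(out, extra...)
}

func main() {
	event := []interface{}{"type", "NewBlock", "height", 42, messageKey, "block bytes"}
	forwarded := withExtra(event, "source", "pubsubIn")

	body, _ := lookup(forwarded, messageKey)
	source, _ := lookup(forwarded, "source")
	fmt.Println(body, source)
}
```

Because the original pairs travel with the event, the forwarder from the Problem section no longer loses anything: it appends its own pairs and republishes.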

This also means you can start to share output sinks and middlewares from your logging system with the event system (certainly works for me... I have plenty of things built for go-kit log's interface: https://github.com/hyperledger/burrow/blob/develop/logging/config/sinks.go). This is kind of a pleasing interface for callers too.

Another thing this interface gives you is an easier way to de-duplicate events: a slice can carry a guaranteed order of tags/structured values if you want one, so you can hash them deterministically, which is not immediately possible with a map because Go randomizes map iteration order.
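
As a rough illustration of the de-duplication idea, the sketch below hashes keyvals in slice order. `hashKeyvals` is a hypothetical helper, and the encoding (each element rendered with `%v` and length-prefixed, then fed to SHA-256) is an assumption chosen for brevity. It is only stable for values whose `%v` rendering is itself stable, so pointers and similar values would need a proper canonical encoding.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashKeyvals (hypothetical helper) hashes keyvals in the order given.
// Each element is rendered with %v and written with a length prefix so that
// adjacent elements cannot run together ("ab","c" vs "a","bc").
func hashKeyvals(keyvals ...interface{}) string {
	h := sha256.New()
	for _, kv := range keyvals {
		s := fmt.Sprintf("%v", kv)
		fmt.Fprintf(h, "%d:%s", len(s), s)
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := hashKeyvals("type", "NewBlock", "height", 42)
	b := hashKeyvals("type", "NewBlock", "height", 42)
	c := hashKeyvals("height", 42, "type", "NewBlock")

	fmt.Println("same order, same hash:", a == b)
	fmt.Println("different order, same hash:", a == c)
}
```

Order is significant in this sketch, so publishers that want two events to de-duplicate would have to agree on a key order (or sort before hashing).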
