-
Notifications
You must be signed in to change notification settings - Fork 21
pubsub: make message tags available to subscribers #143
Description
tl;dr replace out chan<-interface{} with out chan<-messageAndTags
Problem
Consider the following interfaces replicated from the pubsub Sever:
type Subscribable interface {
Subscribe(ctx context.Context, subscriber string, query pubsub.Query, out chan<- interface{}) error
Unsubscribe(ctx context.Context, subscriber string, query pubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
type Publisher interface {
Publish(ctx context.Context, message interface{}, tags map[string]interface{}) error
}Subscribers receive messages (of type interface{}) passed on a channel to them based on their query. We can tell that the tags the original Publisher opted for match our query, but our query might be broad, and we have no way of telling which tags matched. There's nothing that guarantees we can reproduce the tags by looking at the message (we might if we know something about its type). You could argue that tags are not part of the message and shouldn't change its meaning (or something like that), but I think that's an odd position (particularly given the usage in Tendermint - where in contains the domain level type/ID).
This is especially bad for my use case, where I want to forward all messages published on one pubsub to another with:
func PublishAll(subscribable Subscribable, ctx context.Context, subscriber string, query pubsub.Query,
publisher Publisher, extraTags map[string]interface{}) error {
return SubscribeCallback(subscribable, ctx, subscriber, query, func(message interface{}) {
tags := make(map[string]interface{})
// this is where I'd merge in the tags originally used with the message but I have no access to them
for k,v := range extraTags {
tags[k] = v
}
// Help! I can't tell which tags the original publisher used - so I can't forward them
publisher.Publish(ctx, message, tags)
})
}To bulk forward I would use:
PublishAll(pubsubIn, ctx, "forwarder", query.empty{}, pubsubOut)Where the situation is worst; the interface for reading messages means the tags are erased.
Possible solutions
The obvious solutions would be to provide a message type like:
type Message struct {
Body interface{}
Tags map[string]interface{}
}
// and Subscribe becomes:
func Subscribe(ctx context.Context, subscriber string, query pubsub.Query, out chan<- Message) error {
...
} A solution I like better is to drop the message/tag distinction and make a message a key-value map, as if everything were a tag:
type Publisher interface {
Publish(ctx context.Context, keyvals map[string]interface{}) error
}You can still have a canonical key that is used for messages if required. This has the added bonus that when the message is a type supported by your query language you can also filter on message rather than only on distinguished tags. This starts to look quite a lot like your logging interface (probably because it is, and logs <-> events), which suggests:
type Publisher interface {
Publish(ctx context.Context, keyvals... interface{}) error
}
// and Subscribe becomes:
func Subscribe(ctx context.Context, subscriber string, query pubsub.Query, out chan<- []interface) error {
...
} This also means you can start to share output sinks and middlewares from you logging system with the event system (certainly works for me... I have plenty of things built for go-kit log's interface: https://github.com/hyperledger/burrow/blob/develop/logging/config/sinks.go). This is kind of a pleasing interface for callers too.
Another thing this interface gives you is an easier way to de-duplicate events because you can have a guaranteed order of tags/structured values if you wanted them - and hash them deterministically which is not immediately possible with a map.