pss: Improve pressure backstop queue handling #1680
kortatu wants to merge 9 commits into ethersphere:master
Conversation
…to ensure channel capacity for pending messages
zelig
left a comment
So the big-picture motivation is to be able to requeue a msg for retry even if a new message could not be enqueued.
I believe the cleanest way to do this is:
outbox := make([]msg, queueCapacity)
slots := make(chan int, queueCapacity)
process := make(chan int)

func enqueue(m msg) {
    ...
    select {
    case i := <-slots:
        outbox[i] = m
        select {
        case process <- i:
        case <-quit:
        }
    default:
        // queue contention error
    }
}
...
// each worker can just reinsert async
for i := range process {
    // send and forward
    // if it fails to send, reinsert:
    select {
    case process <- i:
    case <-quit:
    }
    // if it succeeds, give back the slot
    slots <- i // never blocks
}
…ss struct. New tests for enqueueing
…l for a reenqued message
I see you chose the solution with the mutexes. Which is fine; I already said feel free to choose between the alternatives.
I did have some second thoughts over the weekend though. I think the solution we should pick is the one that consumes the least resources, especially since this code can get called a lot. Would you mind benchmarking this solution against a channel-based one akin to what @zelig proposed, please?
pss/pss.go
Outdated
p.outboxMutex.Lock()
defer p.outboxMutex.Unlock()
pendingSize := p.getPending()
// Only allow defaultOutboxCapacity messages at most processed (both enqueued or being forwarded)
defaultOutboxCapacity -> capacity of outbox
It sounds good to me.
The only thing is that we only implemented the mechanism to ensure that a re-enqueued message always has a slot booked in the outbox; we didn't implement parallel processing of messages.
I think that in order to test both solutions we should compare them with parallel processing in both cases.
Anyway, I will implement @zelig's proposal (with the augmented buffer to avoid deadlocks) in a different branch to compare them.
Great. Should we change the label to in progress again maybe?
Indeed. Until we can compare performance with the other solution it should be in progress.
metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
if pending {
    log.Crit("unexpected outbox full for pending message!")
log.Crit actually panics on its own
}
reEnqueue := func(iteration int) {
    time.Sleep(1 * time.Millisecond)
Please let's not use Sleeps in tests if we can help it.
pss/pss_test.go
Outdated
topic := [4]byte{}
data := []byte{0x66, 0x6f, 0x6f}

msg := testMessage(messageAddr, topic, data)
Maybe we can just have testRandomMessage and conceal the addr, topic, data within?
Changed testMessage to testRandomMessage and conceal addr, topic and data inside
Added benchmark test for outbox message processing
Added parallelization of message forwarding. Now the main routine spawns a new goroutine for each message extracted from the outbox channel.
Added b.N loop in benchmark tests. Now results are more stable:
$ go test -v -bench=BenchmarkMessageProcessing -run=^$
goos: linux
goarch: amd64
pkg: github.com/ethersphere/swarm/pss
BenchmarkMessageProcessing/0.00_-4 1 1426398612 ns/op 171242400 B/op 1792064 allocs/op
BenchmarkMessageProcessing/0.01_-4 1 1073405959 ns/op 80046600 B/op 1528512 allocs/op
BenchmarkMessageProcessing/0.05_-4 2 878509165 ns/op 67466820 B/op 1391229 allocs/op
PASS
ok github.com/ethersphere/swarm/pss 5.414s
obsoleted by #1695
We wanted pending forward messages to always have a slot booked in the outbox queue. Although we kept the channel to implement that queue, we first check whether there is space to add a new message, taking pending messages into account.
When a message is enqueued for the first time, the forwardPending counter is incremented; when that message is sent or discarded, forwardPending is decremented.
There is a new pending flag in the enqueue function. When that flag is true, the message already has a slot booked in the channel, so the capacity check is skipped. When the flag is false (the common case) it is a new message, and before adding it to the queue we need to check that there is actually space left (forwardPending < defaultOutboxCapacity).
This way the number of messages in the channel will always be <= forwardPending.
Both the update of the forwardPending counter and the enqueue of the message are protected with a mutex, so they stay consistent under concurrent access.
References issue #1654