This repository was archived by the owner on Aug 2, 2021. It is now read-only.

pss: Improve pressure backstop queue handling #1680

Closed
kortatu wants to merge 9 commits into ethersphere:master from epiclabs-io:issue-1654

Conversation

@kortatu
Contributor

@kortatu kortatu commented Aug 20, 2019

We want forward-pending messages to always have a slot booked in the outbox queue. Although we kept the channel to implement that queue, we first check whether there is space to add a new message, taking pending messages into account.
When a message is enqueued for the first time, the forwardPending variable is incremented; when that message is sent or discarded, forwardPending is decremented.
There is a new pending flag in the enqueue function. When that flag is true, the message already has a slot booked in the channel, so the capacity check is skipped. When the flag is false (the common case), it is a new message, and before adding it to the queue we need to check whether there is space left (forwardPending < defaultOutboxCapacity).
This way the number of messages in the channel is always <= forwardPending.
Both the update of the forwardPending variable and the enqueue of the message are protected with a mutex to allow asynchronous processing.
References issue #1654

@kortatu kortatu changed the title pss: Improve pressure backstop queue handling #1654 pss: Improve pressure backstop queue handling Aug 20, 2019
@nolash nolash self-requested a review August 20, 2019 12:42
Member

@zelig zelig left a comment


So, if the big-picture motivation is to be able to requeue a message for retry even when a new message could not be enqueued, I believe the cleanest way to do this is:

outbox := make([]msg, queueCapacity)
slots := make(chan int, queueCapacity)
process := make(chan int)

// slots must start full: every index is initially a free slot
for i := 0; i < queueCapacity; i++ {
    slots <- i
}

func enqueue(m msg) {
    ...
    select {
    case i := <-slots:
        outbox[i] = m
        select {
        case process <- i:
        case <-quit:
        }
    default:
        // queue contention error
    }
}

...

// each worker can just reinsert async
for i := range process {
    // send and forward
    // if it fails to send, requeue:
    select {
    case process <- i:
    case <-quit:
    }
    // if it succeeds, give back the slot:
    slots <- i // never blocks
}
@kortatu kortatu requested a review from nolash August 23, 2019 08:49
Contributor

@nolash nolash left a comment


I see you chose the solution with the mutexes, which is fine; I already said feel free to choose between the alternatives.

I did have some second thoughts over the weekend, though. I think the solution we should pick is the one that consumes the least resources, especially since this code can get called a lot. Would you mind benchmarking this solution against a channel-based one akin to what @zelig proposed, please?

pss/pss.go Outdated
p.outboxMutex.Lock()
defer p.outboxMutex.Unlock()
pendingSize := p.getPending()
// Only allow defaultOutboxCapacity messages at most processed (both enqueued or being forwarded)
Contributor


defaultOutboxCapacity -> capacity of outbox

Contributor Author


Changed in commit f73157b

Contributor Author

@kortatu kortatu Aug 26, 2019


It sounds good to me.
The only thing is that we only implemented the mechanism to ensure that a re-enqueued message always has a slot booked in the outbox; we didn't implement parallel processing of messages.
I think that in order to test both solutions we should compare them with parallel processing in both cases.
Anyway, I will implement @zelig's proposal (with the augmented buffer to avoid deadlocks) in a different branch to compare them.

Contributor


Great. Should we change the label to in progress again maybe?

Contributor Author


Indeed. Until we can compare performance with the other solution, it should stay in progress.


metrics.GetOrRegisterCounter("pss.enqueue.outbox.full", nil).Inc(1)
if pending {
log.Crit("unexpected outbox full for pending message!")
Contributor


log.Crit actually panics on its own

Contributor Author


Fixed in commit 4e2aff6

}

reEnqueue := func(iteration int) {
time.Sleep(1 * time.Millisecond)
Contributor


Please let's not use Sleeps in tests if we can help it.

pss/pss_test.go Outdated
topic := [4]byte{}
data := []byte{0x66, 0x6f, 0x6f}

msg := testMessage(messageAddr, topic, data)
Contributor


Maybe we can just have testRandomMessage and conceal the addr, topic, data within?

Contributor Author


Good idea. Fixed in commit 4e2aff6

Changed testMessage to testRandomMessage and conceal addr, topic and data inside
Added benchmark test for outbox message processing
@kortatu
Contributor Author

kortatu commented Aug 27, 2019

Added parallelization of message forwarding. Now the main routine spawns a new goroutine for each message extracted from the outbox channel.
Also added a benchmark test for processing 200,000 incoming messages.

@kortatu
Contributor Author

kortatu commented Aug 28, 2019

Added b.N loop in benchmark tests. Now results are more stable:

$ go test -v -bench=BenchmarkMessageProcessing -run=^$
goos: linux
goarch: amd64
pkg: github.com/ethersphere/swarm/pss
BenchmarkMessageProcessing/0.00_-4                     1        1426398612 ns/op        171242400 B/op   1792064 allocs/op
BenchmarkMessageProcessing/0.01_-4                     1        1073405959 ns/op        80046600 B/op    1528512 allocs/op
BenchmarkMessageProcessing/0.05_-4                     2         878509165 ns/op        67466820 B/op    1391229 allocs/op
PASS
ok      github.com/ethersphere/swarm/pss        5.414s

@kortatu kortatu requested a review from nolash August 28, 2019 10:26
@nolash
Copy link
Copy Markdown
Contributor

nolash commented Aug 30, 2019

obsoleted by #1695

@nolash nolash closed this Aug 30, 2019
@kortatu kortatu deleted the issue-1654 branch September 2, 2019 10:21
