Send file chunks asynchronously in peer recovery #39769

dnhatn wants to merge 7 commits into elastic:master
Conversation
Pinging @elastic/es-distributed
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
original-brownbear left a comment:

LGTM, just a few comments/questions :)
s1monw left a comment:

I left some comments and questions.
```java
if (error.get() == null) {
    cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo()));
```

```java
synchronized FileChunk readChunk(final byte[] buffer) throws Exception {
```
I don't understand why we parallelize the reading on top of a single file and then synchronize all of it. This doesn't make sense to me. I think we should build the model on top of the file and chunk ahead of time, i.e. if we want to read with N threads in parallel, then chunk the file up into N pieces and send them all in parallel. That means we must write them at the correct places on the other side as well, but blocking on the read side here does not make much sense to me.
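The pre-chunking idea could look roughly like the sketch below — not the PR's code, just an illustration of splitting a file into N contiguous ranges and reading them in parallel; all names (`ParallelFileReader`, `Range`) are hypothetical:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of "chunk ahead of time": split the file into N contiguous ranges and
// read each range on its own thread. Each range carries its file offset, so the
// target side could write it at the correct position.
public class ParallelFileReader {

    // One contiguous piece of the file plus the offset it belongs at.
    public record Range(long offset, byte[] data) {}

    public static List<Range> readInParallel(Path file, int threads)
            throws IOException, InterruptedException, ExecutionException {
        long length = Files.size(file);
        long per = (length + threads - 1) / threads; // ceiling division
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Range>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                final long offset = (long) i * per;
                if (offset >= length) break;
                final int size = (int) Math.min(per, length - offset);
                futures.add(pool.submit(() -> {
                    // Each thread opens its own handle, so reads never contend.
                    try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
                        byte[] buf = new byte[size];
                        raf.seek(offset);
                        raf.readFully(buf);
                        return new Range(offset, buf);
                    }
                }));
            }
            List<Range> ranges = new ArrayList<>();
            for (Future<Range> f : futures) {
                ranges.add(f.get());
            }
            return ranges;
        } finally {
            pool.shutdown();
        }
    }
}
```

The trade-off this makes explicit: each reader needs its own file handle, and the receiving side must support positional writes rather than appending chunks in arrival order.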
Another option is a multiplexer, if you want to exploit the parallelism between sending and reading. We need some kind of thread pool and a task queue for that: once I am done reading a chunk, I put it on a queue and read the next chunk; another worker can then pick it up and send it. If the queue fills up, we add more threads until we saturate. Or we do reading and sending in the same thread but notify others that another chunk can be read. But there is so much blocking going on here that I feel we didn't make the right design decisions.
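The multiplexer variant suggested here is the classic bounded producer/consumer queue. A minimal sketch, with all names (`ChunkMultiplexer`, `Chunk`) hypothetical and a poison pill for shutdown:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Sketch: one reader thread fills a bounded queue with chunks while sender
// workers drain it. The bounded capacity provides back pressure: if senders
// fall behind, the reader blocks on put(); if the reader falls behind, senders
// block on take().
public class ChunkMultiplexer {

    public record Chunk(long seqId, byte[] data) {}

    // Sentinel that tells workers no more chunks will arrive.
    private static final Chunk POISON = new Chunk(-1, new byte[0]);

    private final BlockingQueue<Chunk> queue;

    public ChunkMultiplexer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Called by the reader thread after each chunk is read from disk.
    public void offerChunk(Chunk c) throws InterruptedException {
        queue.put(c); // blocks once `capacity` chunks are already buffered
    }

    // Called by the reader when the file is exhausted.
    public void finish() throws InterruptedException {
        queue.put(POISON);
    }

    // Run by each sender worker: drain chunks until the poison pill arrives.
    public void drain(Consumer<Chunk> sender) throws InterruptedException {
        while (true) {
            Chunk c = queue.take();
            if (c == POISON) {
                queue.put(POISON); // re-offer so other workers also terminate
                return;
            }
            sender.accept(c);
        }
    }
}
```

Note this is exactly the blocking design the PR author argues against below: both `put` and `take` park threads, which costs dedicated worker threads that a non-blocking continuation style avoids.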
@s1monw Thanks for reviewing. We introduced a seqId on FileChunk in #36981. The idea was to send up to N consecutive file chunks without waiting for replies from the recovery target. We used seqIds instead of a Semaphore to make sure that the recovery target won't buffer more than N chunks in memory in any situation. That change reduced the recovery time significantly without using any extra thread. This PR makes #36981 non-blocking while keeping these properties: (1) it maintains the recovery time, (2) it does not require extra threads, and (3) it never blocks any thread. Here we use a semaphore to ensure that only one thread can read file chunks; other threads can quickly check this condition and exit without being blocked. I hope this clarifies the approach. As I said in the PR description, I am open to suggestions.
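The "single reader, never block" pattern described here can be sketched as follows — this is an illustration of the idea, not the PR's actual code; the class, the in-flight counter standing in for the seq-no tracker, and the `Consumer` transport are all assumptions:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Sketch: a one-permit semaphore guards the read loop. A thread that fails
// tryAcquire() simply returns instead of parking; whichever thread completes a
// send retries the loop, so no work is lost and no thread ever blocks. The
// in-flight counter caps how many unacknowledged chunks the target may have to
// buffer, standing in for the seq-no tracker from #36981.
public class SingleReaderSender {

    private final Semaphore readerPermit = new Semaphore(1);
    private final AtomicInteger inFlight = new AtomicInteger();
    private final int maxInFlight;
    private final ConcurrentLinkedQueue<byte[]> pending;

    public SingleReaderSender(int maxInFlight, ConcurrentLinkedQueue<byte[]> chunks) {
        this.maxInFlight = maxInFlight;
        this.pending = chunks;
    }

    // May be called concurrently from any thread (e.g. a reply handler).
    public void sendChunks(Consumer<byte[]> transport) {
        while (inFlight.get() < maxInFlight && !pending.isEmpty()) {
            if (!readerPermit.tryAcquire()) {
                return; // another thread is reading; exit without blocking
            }
            try {
                byte[] chunk = pending.poll();
                if (chunk == null) {
                    return; // drained by a racing thread
                }
                inFlight.incrementAndGet();
                transport.accept(chunk);
            } finally {
                readerPermit.release();
            }
        }
    }

    // Called when the target acknowledges a chunk: the budget just freed up and
    // the reading thread may already have exited, so retry the loop here.
    public void onChunkAcked(Consumer<byte[]> transport) {
        inFlight.decrementAndGet();
        sendChunks(transport);
    }
}
```

The key property is that `tryAcquire()` never parks the caller: contention costs a failed compare-and-set, not a blocked thread.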
I looked at the non-blocking version and it's more intuitive here. I would still like to have a comment explaining what we are trying to do with the seqIds etc. What confuses me is the partial synchronization.
```java
r -> {
    recycledBuffers.addFirst(buffer);
    requestSeqIdTracker.markSeqNoAsCompleted(chunk.seqId);
    sendFileChunks(listener);
```
Can we make sure we always fork here somehow? I am a bit worried that we could end up with a stack overflow. For instance, we could assert that sendFileChunks does not appear in the stack trace.
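The assertion being suggested — that the current method is not re-entered on the same thread — can be sketched like this (a hypothetical helper, not Elasticsearch code):

```java
// Sketch: detect same-thread recursion by counting how often a method name
// appears on the current call stack. A reply handler that re-enters its send
// loop directly (rather than forking to a thread pool) would make the count
// grow without bound and eventually overflow the stack.
public class RecursionGuard {

    // Returns true if `methodName` occurs more than once on the current stack,
    // i.e. the method has recursively re-entered itself.
    public static boolean isReentrant(String methodName) {
        int count = 0;
        for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
            if (frame.getMethodName().equals(methodName)) {
                count++;
            }
        }
        return count > 1;
    }
}
```

An `assert RecursionGuard.isReentrant("sendFileChunks") == false` at the top of the handler would catch the recursive path in tests; forking the continuation to an executor removes the recursion entirely.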
I am closing this PR and will open a new one. @original-brownbear @s1monw Thanks for looking.
With this change, peer recovery sends file chunks asynchronously and concurrently. Recovery with compression enabled should be faster with this implementation. I will run a recovery benchmark once we agree on the approach.
Relates #36981