Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -590,13 +590,18 @@ public void ensurePersistedUpto(long seqNum) throws IOException{
* @throws IOException if an IO error occurs
*/
public synchronized Batch nonBlockReadBatch(int limit) throws IOException {
final SerializedBatchHolder serializedBatchHolder;
lock.lock();
try {
Page p = nextReadPage();
return (isHeadPage(p) && p.isFullyRead()) ? null : readPageBatch(p, limit, 0L);
if (isHeadPage(p) && p.isFullyRead()) {
return null;
}
serializedBatchHolder = readPageBatch(p, limit, 0L);
} finally {
lock.unlock();
}
return serializedBatchHolder.deserialize();
}

/**
Expand All @@ -607,7 +612,11 @@ public synchronized Batch nonBlockReadBatch(int limit) throws IOException {
* @throws QueueRuntimeException if queue is closed
* @throws IOException if an IO error occurs
*/
public synchronized Batch readBatch(int limit, long timeout) throws IOException {
public Batch readBatch(int limit, long timeout) throws IOException {
return readSerializedBatch(limit, timeout).deserialize();
}

private synchronized SerializedBatchHolder readSerializedBatch(int limit, long timeout) throws IOException {
lock.lock();

try {
Expand All @@ -618,15 +627,15 @@ public synchronized Batch readBatch(int limit, long timeout) throws IOException
}

/**
* read a {@link Batch} from the given {@link Page}. If the page is a head page, try to maximize the
* read a {@link SerializedBatchHolder} from the given {@link Page}. If the page is a head page, try to maximize the
* batch size by waiting for writes.
* @param p the {@link Page} to read from.
* @param limit size limit of the batch to read.
* @param timeout the maximum time to wait in milliseconds on write operations.
* @return {@link Batch} with read elements or null if nothing was read
* @throws IOException if an IO error occurs
*/
private Batch readPageBatch(Page p, int limit, long timeout) throws IOException {
private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) throws IOException {
int left = limit;
final List<byte[]> elements = new ArrayList<>(limit);

Expand Down Expand Up @@ -678,7 +687,7 @@ private Batch readPageBatch(Page p, int limit, long timeout) throws IOException
removeUnreadPage(p);
}

return new Batch(elements, firstSeqNum, this);
return new SerializedBatchHolder(elements, firstSeqNum);
}

/**
Expand Down Expand Up @@ -894,4 +903,18 @@ private static boolean containsSeq(final Page page, final long seqNum) {
final long pMaxSeq = pMinSeq + (long) page.getElementCount();
return seqNum >= pMinSeq && seqNum < pMaxSeq;
}

class SerializedBatchHolder {
private final List<byte[]> elements;
private final long firstSeqNum;

private SerializedBatchHolder(List<byte[]> elements, long firstSeqNum) {
this.elements = elements;
this.firstSeqNum = firstSeqNum;
}

private Batch deserialize() {
return new Batch(elements, firstSeqNum, Queue.this);
}
}
}