Skip to content

Commit b37a41a

Browse files
committed
Allow to get the number of bytes queued in PendingWriteQueue
Motivation: For some use-cases it would be useful to know the number of bytes queued in the PendingWriteQueue without the need to dequeue them. Modifications: Add PendingWriteQueue.bytes(). Result: Be able to get the number of bytes queued.
1 parent ce95c50 commit b37a41a

2 files changed

Lines changed: 17 additions & 2 deletions

File tree

transport/src/main/java/io/netty/channel/PendingWriteQueue.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public final class PendingWriteQueue {
3737
private PendingWrite head;
3838
private PendingWrite tail;
3939
private int size;
40+
private long bytes;
4041

4142
public PendingWriteQueue(ChannelHandlerContext ctx) {
4243
if (ctx == null) {
@@ -63,6 +64,15 @@ public int size() {
6364
return size;
6465
}
6566

67+
/**
68+
* Returns the total number of bytes that are pending because of pending messages. This is only an estimate so
69+
* it should only be treated as a hint.
70+
*/
71+
public long bytes() {
72+
assert ctx.executor().inEventLoop();
73+
return bytes;
74+
}
75+
6676
/**
6777
* Add the given {@code msg} and {@link ChannelPromise}.
6878
*/
@@ -90,6 +100,7 @@ public void add(Object msg, ChannelPromise promise) {
90100
tail = write;
91101
}
92102
size ++;
103+
bytes += messageSize;
93104
// We need to guard against null as channel.unsafe().outboundBuffer() may returned null
94105
// if the channel was already closed when constructing the PendingWriteQueue.
95106
// See https://github.com/netty/netty/issues/3967
@@ -112,7 +123,7 @@ public void removeAndFailAll(Throwable cause) {
112123
for (PendingWrite write = head; write != null; write = head) {
113124
head = tail = null;
114125
size = 0;
115-
126+
bytes = 0;
116127
while (write != null) {
117128
PendingWrite next = write.next;
118129
ReferenceCountUtil.safeRelease(write.msg);
@@ -168,6 +179,7 @@ public ChannelFuture removeAndWriteAll() {
168179
// Guard against re-entrance by directly reset
169180
head = tail = null;
170181
size = 0;
182+
bytes = 0;
171183

172184
ChannelPromise p = ctx.newPromise();
173185
PromiseCombiner combiner = new PromiseCombiner();
@@ -252,10 +264,12 @@ private void recycle(PendingWrite write, boolean update) {
252264
// Guard against re-entrance by directly reset
253265
head = tail = null;
254266
size = 0;
267+
bytes = 0;
255268
} else {
256269
head = next;
257270
size --;
258-
assert size > 0;
271+
bytes -= writeSize;
272+
assert size > 0 && bytes >= 0;
259273
}
260274
}
261275

transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ private static void assertBuffer(EmbeddedChannel channel, ByteBuf buffer) {
172172
private static void assertQueueEmpty(PendingWriteQueue queue) {
173173
assertTrue(queue.isEmpty());
174174
assertEquals(0, queue.size());
175+
assertEquals(0, queue.bytes());
175176
assertNull(queue.current());
176177
assertNull(queue.removeAndWrite());
177178
assertNull(queue.removeAndWriteAll());

0 commit comments

Comments
 (0)