2121import org .elasticsearch .common .bytes .ReleasableBytesReference ;
2222import org .elasticsearch .http .HttpBody ;
2323import org .elasticsearch .test .ESTestCase ;
24- import org .junit .Before ;
2524
2625import java .util .ArrayList ;
2726import java .util .concurrent .atomic .AtomicBoolean ;
@@ -33,20 +32,26 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase {
3332 Netty4HttpRequestBodyStream stream ;
3433 static HttpBody .ChunkHandler discardHandler = (chunk , isLast ) -> chunk .close ();
3534
36- @ Before
37- public void createStream () {
35+ @ Override
36+ public void setUp () throws Exception {
37+ super .setUp ();
3838 channel = new EmbeddedChannel ();
3939 stream = new Netty4HttpRequestBodyStream (channel );
4040 stream .setHandler (discardHandler ); // set default handler, each test might override one
41- channel .pipeline ().addLast (new SimpleChannelInboundHandler <HttpContent >() {
41+ channel .pipeline ().addLast (new SimpleChannelInboundHandler <HttpContent >(false ) {
4242 @ Override
4343 protected void channelRead0 (ChannelHandlerContext ctx , HttpContent msg ) {
44- msg .retain ();
4544 stream .handleNettyContent (msg );
4645 }
4746 });
4847 }
4948
49+ @ Override
50+ public void tearDown () throws Exception {
51+ super .tearDown ();
52+ stream .close ();
53+ }
54+
5055 // ensures that no chunks are sent downstream without request
5156 public void testEnqueueChunksBeforeRequest () {
5257 var totalChunks = randomIntBetween (1 , 100 );
@@ -63,6 +68,7 @@ public void testFlushAllReceivedChunks() {
6368 stream .setHandler ((chunk , isLast ) -> {
6469 chunks .add (chunk );
6570 totalBytes .addAndGet (chunk .length ());
71+ chunk .close ();
6672 });
6773
6874 var chunkSize = 1024 ;
@@ -84,6 +90,7 @@ public void testReadFromChannel() {
8490 stream .setHandler ((chunk , isLast ) -> {
8591 gotChunks .add (chunk );
8692 gotLast .set (isLast );
93+ chunk .close ();
8794 });
8895 channel .pipeline ().addFirst (new FlowControlHandler ()); // block all incoming messages, need explicit channel.read()
8996 var chunkSize = 1024 ;
0 commit comments