88
99package org .opensearch .rest ;
1010
11- import org .apache .hc .core5 .http .ConnectionClosedException ;
1211import org .opensearch .client .Request ;
1312import org .opensearch .client .Response ;
1413import org .opensearch .client .StreamingRequest ;
1514import org .opensearch .client .StreamingResponse ;
1615import org .opensearch .test .rest .OpenSearchRestTestCase ;
1716import org .junit .After ;
1817
18+ import java .io .IOException ;
1919import java .io .InterruptedIOException ;
20+ import java .io .UncheckedIOException ;
2021import java .nio .ByteBuffer ;
2122import java .nio .charset .StandardCharsets ;
2223import java .time .Duration ;
23- import java .util .concurrent .Executors ;
24- import java .util .concurrent .ScheduledExecutorService ;
25- import java .util .concurrent .TimeUnit ;
2624import java .util .concurrent .atomic .AtomicInteger ;
2725import java .util .stream .Stream ;
2826
2927import reactor .core .publisher .Flux ;
30- import reactor .test .subscriber .TestSubscriber ;
28+ import reactor .test .StepVerifier ;
29+ import reactor .test .scheduler .VirtualTimeScheduler ;
3130
32- import static org .hamcrest .CoreMatchers .anyOf ;
3331import static org .hamcrest .CoreMatchers .equalTo ;
34- import static org .hamcrest .CoreMatchers .instanceOf ;
35- import static org .hamcrest .CoreMatchers .not ;
36- import static org .hamcrest .collection .IsEmptyCollection .empty ;
3732
3833public class ReactorNetty4StreamingStressIT extends OpenSearchRestTestCase {
3934 @ After
@@ -49,6 +44,8 @@ public void tearDown() throws Exception {
4944 }
5045
5146 public void testCloseClientStreamingRequest () throws Exception {
47+ final VirtualTimeScheduler scheduler = VirtualTimeScheduler .create (true );
48+
5249 final AtomicInteger id = new AtomicInteger (0 );
5350 final Stream <String > stream = Stream .generate (
5451 () -> "{ \" index\" : { \" _index\" : \" test-stress-streaming\" , \" _id\" : \" "
@@ -57,39 +54,28 @@ public void testCloseClientStreamingRequest() throws Exception {
5754 + "{ \" name\" : \" josh\" }\n "
5855 );
5956
57+ final Duration delay = Duration .ofMillis (1 );
6058 final StreamingRequest <ByteBuffer > streamingRequest = new StreamingRequest <>(
6159 "POST" ,
6260 "/_bulk/stream" ,
63- Flux .fromStream (stream ).delayElements (Duration . ofMillis ( 500 ) ).map (s -> ByteBuffer .wrap (s .getBytes (StandardCharsets .UTF_8 )))
61+ Flux .fromStream (stream ).delayElements (delay , scheduler ).map (s -> ByteBuffer .wrap (s .getBytes (StandardCharsets .UTF_8 )))
6462 );
6563 streamingRequest .addParameter ("refresh" , "true" );
6664
6765 final StreamingResponse <ByteBuffer > streamingResponse = client ().streamRequest (streamingRequest );
68- TestSubscriber <ByteBuffer > subscriber = TestSubscriber .create ();
69- streamingResponse .getBody ().subscribe (subscriber );
70-
71- final ScheduledExecutorService executor = Executors .newSingleThreadScheduledExecutor ();
72- try {
73- // Await for subscriber to receive at least one chunk
74- assertBusy (() -> assertThat (subscriber .getReceivedOnNext (), not (empty ())));
75-
76- // Close client forceably
77- executor .schedule (() -> {
78- client ().close ();
79- return null ;
80- }, 2 , TimeUnit .SECONDS );
66+ scheduler .advanceTimeBy (delay ); /* emit first element */
8167
82- // Await for subscriber to terminate
83- subscriber . block ( Duration . ofSeconds ( 10 ));
84- assertThat (
85- subscriber . expectTerminalError (),
86- anyOf ( instanceOf ( InterruptedIOException . class ), instanceOf ( ConnectionClosedException . class ))
87- );
88- } finally {
89- executor . shutdown ();
90- if ( executor . awaitTermination ( 1 , TimeUnit . SECONDS ) == false ) {
91- executor . shutdownNow ();
92- }
93- }
68+ StepVerifier . create ( Flux . from ( streamingResponse . getBody ()). map ( b -> new String ( b . array (), StandardCharsets . UTF_8 )))
69+ . expectNextMatches ( s -> s . contains ( " \" result \" : \" created \" " ) && s . contains ( " \" _id \" : \" 1 \" " ))
70+ . then (() -> {
71+ try {
72+ client (). close ();
73+ } catch ( final IOException ex ) {
74+ throw new UncheckedIOException ( ex );
75+ }
76+ })
77+ . then (() -> scheduler . advanceTimeBy ( delay ))
78+ . expectErrorMatches ( t -> t instanceof InterruptedIOException )
79+ . verify ();
9480 }
9581}
0 commit comments