88
99package org .opensearch .plugin .kinesis ;
1010
11+ import software .amazon .awssdk .services .kinesis .model .DescribeStreamRequest ;
12+ import software .amazon .awssdk .services .kinesis .model .DescribeStreamResponse ;
13+ import software .amazon .awssdk .services .kinesis .model .GetRecordsRequest ;
14+ import software .amazon .awssdk .services .kinesis .model .GetRecordsResponse ;
15+ import software .amazon .awssdk .services .kinesis .model .GetShardIteratorRequest ;
16+ import software .amazon .awssdk .services .kinesis .model .GetShardIteratorResponse ;
17+ import software .amazon .awssdk .services .kinesis .model .Record ;
18+ import software .amazon .awssdk .services .kinesis .model .ShardIteratorType ;
19+
1120import org .opensearch .action .admin .cluster .node .info .NodeInfo ;
1221import org .opensearch .action .admin .cluster .node .info .NodesInfoRequest ;
1322import org .opensearch .action .admin .cluster .node .info .NodesInfoResponse ;
2837import java .util .stream .Stream ;
2938
3039import org .testcontainers .containers .localstack .LocalStackContainer ;
31- import software .amazon .awssdk .services .kinesis .model .DescribeStreamRequest ;
32- import software .amazon .awssdk .services .kinesis .model .DescribeStreamResponse ;
33- import software .amazon .awssdk .services .kinesis .model .GetRecordsRequest ;
34- import software .amazon .awssdk .services .kinesis .model .GetRecordsResponse ;
35- import software .amazon .awssdk .services .kinesis .model .GetShardIteratorRequest ;
36- import software .amazon .awssdk .services .kinesis .model .GetShardIteratorResponse ;
37- import software .amazon .awssdk .services .kinesis .model .Record ;
38- import software .amazon .awssdk .services .kinesis .model .ShardIteratorType ;
3940
4041import static org .hamcrest .Matchers .is ;
4142import static org .awaitility .Awaitility .await ;
@@ -107,10 +108,7 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
107108 logger .info ("Produced message with sequence number: {}" , sequenceNumber );
108109 produceData ("4" , "name4" , "21" );
109110
110- await ()
111- .atMost (5 , TimeUnit .SECONDS )
112- .until (() -> isRewinded (sequenceNumber ));
113-
111+ await ().atMost (5 , TimeUnit .SECONDS ).until (() -> isRewinded (sequenceNumber ));
114112
115113 // create an index with ingestion source from kinesis
116114 createIndex (
@@ -142,8 +140,9 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
142140 }
143141
144142 private boolean isRewinded (String sequenceNumber ) {
145- DescribeStreamResponse describeStreamResponse =
146- kinesisClient .describeStream (DescribeStreamRequest .builder ().streamName (streamName ).build ());
143+ DescribeStreamResponse describeStreamResponse = kinesisClient .describeStream (
144+ DescribeStreamRequest .builder ().streamName (streamName ).build ()
145+ );
147146
148147 String shardId = describeStreamResponse .streamDescription ().shards ().get (0 ).shardId ();
149148
@@ -158,7 +157,9 @@ private boolean isRewinded(String sequenceNumber) {
158157 String shardIterator = iteratorResponse .shardIterator ();
159158
160159 // Use the iterator to read the record
161- GetRecordsRequest recordsRequest = GetRecordsRequest .builder ().shardIterator (shardIterator ).limit (1 ) // Adjust as needed
160+ GetRecordsRequest recordsRequest = GetRecordsRequest .builder ()
161+ .shardIterator (shardIterator )
162+ .limit (1 ) // Adjust as needed
162163 .build ();
163164
164165 GetRecordsResponse recordsResponse = kinesisClient .getRecords (recordsRequest );
0 commit comments