Skip to content

Commit b60c01b

Browse files
committed
style
Signed-off-by: Yupeng Fu <yupeng@uber.com>
1 parent c855963 commit b60c01b

1 file changed

Lines changed: 16 additions & 15 deletions

File tree

plugins/ingestion-kinesis/src/internalClusterTest/java/org/opensearch/plugin/kinesis/IngestFromKinesisIT.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@
88

99
package org.opensearch.plugin.kinesis;
1010

11+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
12+
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
13+
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
14+
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
15+
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
16+
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
17+
import software.amazon.awssdk.services.kinesis.model.Record;
18+
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
19+
1120
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
1221
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
1322
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
@@ -28,14 +37,6 @@
2837
import java.util.stream.Stream;
2938

3039
import org.testcontainers.containers.localstack.LocalStackContainer;
31-
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
32-
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
33-
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
34-
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
35-
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
36-
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
37-
import software.amazon.awssdk.services.kinesis.model.Record;
38-
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
3940

4041
import static org.hamcrest.Matchers.is;
4142
import static org.awaitility.Awaitility.await;
@@ -107,10 +108,7 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
107108
logger.info("Produced message with sequence number: {}", sequenceNumber);
108109
produceData("4", "name4", "21");
109110

110-
await()
111-
.atMost(5, TimeUnit.SECONDS)
112-
.until(() -> isRewinded(sequenceNumber));
113-
111+
await().atMost(5, TimeUnit.SECONDS).until(() -> isRewinded(sequenceNumber));
114112

115113
// create an index with ingestion source from kinesis
116114
createIndex(
@@ -142,8 +140,9 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
142140
}
143141

144142
private boolean isRewinded(String sequenceNumber) {
145-
DescribeStreamResponse describeStreamResponse =
146-
kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(streamName).build());
143+
DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(
144+
DescribeStreamRequest.builder().streamName(streamName).build()
145+
);
147146

148147
String shardId = describeStreamResponse.streamDescription().shards().get(0).shardId();
149148

@@ -158,7 +157,9 @@ private boolean isRewinded(String sequenceNumber) {
158157
String shardIterator = iteratorResponse.shardIterator();
159158

160159
// Use the iterator to read the record
161-
GetRecordsRequest recordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).limit(1) // Adjust as needed
160+
GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
161+
.shardIterator(shardIterator)
162+
.limit(1) // Adjust as needed
162163
.build();
163164

164165
GetRecordsResponse recordsResponse = kinesisClient.getRecords(recordsRequest);

0 commit comments

Comments
 (0)