Skip to content

Commit c76a342

Browse files
committed
use long as timestamp type
Signed-off-by: xuxiong1 <xiongxug@outlook.com>
1 parent a8e95f6 commit c76a342

5 files changed

Lines changed: 7 additions & 8 deletions

File tree

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,8 @@ public IngestionShardPointer latestPointer() {
150150
}
151151

152152
@Override
153-
public IngestionShardPointer pointerFromTimestampMillis(String timestampMillisStr) {
153+
public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) {
154154
long offset = AccessController.doPrivileged((PrivilegedAction<Long>) () -> {
155-
long timestampMillis = Long.parseLong(timestampMillisStr);
156155
Map<TopicPartition, OffsetAndTimestamp> position = consumer.offsetsForTimes(
157156
Collections.singletonMap(topicPartition, timestampMillis)
158157
);
@@ -166,7 +165,7 @@ public IngestionShardPointer pointerFromTimestampMillis(String timestampMillisSt
166165
return offsetAndTimestamp.offset();
167166
});
168167
if (offset < 0) {
169-
logger.warn("No message found for timestamp {}, fall back to auto.offset.reset policy", timestampMillisStr);
168+
logger.warn("No message found for timestamp {}, fall back to auto.offset.reset policy", timestampMillis);
170169
String autoOffsetResetConfig = config.getAutoOffsetResetConfig();
171170
if (OffsetResetStrategy.EARLIEST.toString().equals(autoOffsetResetConfig)) {
172171
logger.warn("The auto.offset.reset is set to earliest, seek to earliest pointer");
@@ -175,7 +174,7 @@ public IngestionShardPointer pointerFromTimestampMillis(String timestampMillisSt
175174
logger.warn("The auto.offset.reset is set to latest, seek to latest pointer");
176175
return latestPointer();
177176
} else {
178-
throw new IllegalArgumentException("No message found for timestamp " + timestampMillisStr);
177+
throw new IllegalArgumentException("No message found for timestamp " + timestampMillis);
179178
}
180179
}
181180
return new KafkaOffset(offset);

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void testPointerFromTimestampMillis() {
9696
Collections.singletonMap(topicPartition, new org.apache.kafka.clients.consumer.OffsetAndTimestamp(5L, 1000L))
9797
);
9898

99-
KafkaOffset offset = (KafkaOffset) consumer.pointerFromTimestampMillis("1000");
99+
KafkaOffset offset = (KafkaOffset) consumer.pointerFromTimestampMillis(1000);
100100

101101
assertEquals(5L, offset.getOffset());
102102
}

server/src/main/java/org/opensearch/index/IngestionShardConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public M getMessage() {
8888
* @param timestampMillis the timestamp in milliseconds
8989
* @return the ingestion shard pointer corresponding to the given timestamp
9090
*/
91-
IngestionShardPointer pointerFromTimestampMillis(String timestampMillis);
91+
IngestionShardPointer pointerFromTimestampMillis(long timestampMillis);
9292

9393
/**
9494
* Returns an ingestion shard pointer based on the provided offset.

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ protected void startPoll() {
161161
logger.info("Resetting offset by seeking to offset {}", batchStartPointer.asString());
162162
break;
163163
case REWIND_BY_TIMESTAMP:
164-
batchStartPointer = consumer.pointerFromTimestampMillis(resetValue);
164+
batchStartPointer = consumer.pointerFromTimestampMillis(Long.parseLong(resetValue));
165165
logger.info(
166166
"Resetting offset by seeking to timestamp {}, corresponding offset {}",
167167
resetValue,

server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public FakeIngestionShardPointer latestPointer() {
9494
}
9595

9696
@Override
97-
public IngestionShardPointer pointerFromTimestampMillis(String timestampMillis) {
97+
public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) {
9898
throw new UnsupportedOperationException("Not implemented yet.");
9999
}
100100

0 commit comments

Comments
 (0)