KAFKA-3522: Interactive Queries must return timestamped stores#6661
Conversation
|
Call for review @guozhangwang @bbejeck @vvcephei @ableegoldman @cadonna @abbccdda |
|
Minor: s/KAFAK-3522/KAFKA-3522 |
|
Java11 failed with env issue. Java8 passed. Waiting for a review before retriggering the tests. |
bbejeck
left a comment
There was a problem hiding this comment.
I've made a pass I just have a couple of minor comments otherwise this LGTM
| if (!store.isOpen()) { | ||
| throw new InvalidStateStoreException("the state store, " + storeName + ", is not open."); | ||
| } | ||
| if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { |
There was a problem hiding this comment.
nit: Since TimestampedKeyValueStore is always present in the if statement, I'm wondering if
if (store instanceof TimestampedKeyValueStore ) {
if( store instanceof QueryableStoreTypes.KeyValueStoreType) {
return ....
}
if( store instanceof QueryableStoreTypes.WindowStoreType) {
return ....
}
}Would be easier to read. But this is a subjective comment, so feel free to accept or not.
There was a problem hiding this comment.
The second check is actually using TimestampedWindowStore, not TimestampedKeyValueStore. Or do I miss-understand your comment?
| " because the store is not open. The state store may have migrated to another instances."); | ||
| } | ||
| stores.add((T) store); | ||
| if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { |
There was a problem hiding this comment.
TimestampedKeyValueStore vs TimestampedWindowStore -- or please elaborate.
Or do you refer to missing tests? Those seems to be there actually:
shouldFindTimestampedKeyValueStoresAsKeyValueStores()shouldFindTimestampedWindowStoresAsWindowStore()
There was a problem hiding this comment.
referring to my previous comments about the if/else but again I read to quickly
| } | ||
| if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { | ||
| return (List<T>) Collections.singletonList(new ReadOnlyKeyValueStoreFacade((TimestampedKeyValueStore<Object, Object>) store)); | ||
| } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) { |
There was a problem hiding this comment.
super nit: while I don't believe in 100% test coverage, this line isn't covered by the tests
There was a problem hiding this comment.
Good catch! This slipped. Will add a test (I do believe ;) )
|
Updated this. |
|
Java11 failed. Test results not available any longer. Java8 passed. Retest this please. |
| if (!store.isOpen()) { | ||
| throw new InvalidStateStoreException("the state store, " + storeName + ", is not open."); | ||
| } | ||
| if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { |
| " because the store is not open. The state store may have migrated to another instances."); | ||
| } | ||
| stores.add((T) store); | ||
| if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { |
There was a problem hiding this comment.
referring to my previous comments about the if/else but again I read to quickly
|
Merged #6661 into trunk. |
…s-hashcode * apache-github/trunk: KAFKA-8158: Add EntityType for Kafka RPC fields (apache#6503) MINOR: correctly parse version OffsetCommitResponse version < 3 KAFKA-8284: enable static membership on KStream (apache#6673) KAFKA-8304: Fix registration of Connect REST extensions (apache#6651) KAFKA-8275; Take throttling into account when choosing least loaded node (apache#6619) KAFKA-3522: Interactive Queries must return timestamped stores (apache#6661) MINOR: MetricsIntegrationTest should set StreamsConfig.STATE_DIR_CONFIG (apache#6687) MINOR: Remove unused field in `ListenerConnectionQuota` KAFKA-8131; Move --version implementation into CommandLineUtils (apache#6481) KAFKA-8056; Use automatic RPC generation for FindCoordinator (apache#6408) MINOR: Remove workarounds for lz4-java bug affecting byte buffers (apache#6679) KAFKA-7455: Support JmxTool to connect to a secured RMI port. (apache#5968) MINOR: Document improvement (apache#6682) MINOR: Fix ThrottledReplicaListValidator doc error. (apache#6537) KAFKA-8306; Initialize log end offset accurately when start offset is non-zero (apache#6652)
…e#6661) Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)