New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-13553: add PAPI KV store tests for IQv2 #11624
Conversation
| // Randomizing the test cases in case some orderings interfere with each other. | ||
| // If you wish to reproduce a randomized order, copy the logged SEED and substitute | ||
| // it for the constant at the top of the file. This will cause exactly the same sequence | ||
| // of pseudorandom values to be generated. | ||
| Collections.shuffle(values, RANDOM); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this because it occurred to me that there's a small chance that the tests only pass in the order we generate them in. By shuffling the cases, we ensure that every possible ordering will eventually be run. The price we pay is that if there's an ordering that causes the test to fail, it will appear to be flaky. For that reason, we initialize the Random with a seed and log it. As the comment states, you can re-run the exact same ordering by setting SEED to the one that was logged.
| } else if (Objects.equals(kind, "DSL") && supplier instanceof WindowBytesStoreSupplier) { | ||
| setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, builder); | ||
| } else if (Objects.equals(kind, "PAPI") && supplier instanceof WindowBytesStoreSupplier) { | ||
| throw new AssumptionViolatedException("Case not implemented yet"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be filled in once there are window tests. cc @patrickstuedi
| } else if (Objects.equals(kind, "DSL") && supplier instanceof SessionBytesStoreSupplier) { | ||
| setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, builder); | ||
| } else if (Objects.equals(kind, "PAPI") && supplier instanceof SessionBytesStoreSupplier) { | ||
| throw new AssumptionViolatedException("Case not implemented yet"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be filled in once there are session tests. cc @patrickstuedi
| final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized = | ||
| Materialized.as((KeyValueBytesStoreSupplier) supplier); | ||
| if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) { | ||
| setUpKeyValueDSLTopology((KeyValueBytesStoreSupplier) supplier, builder); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method was already too long to comfortably read. Doubling its length by adding PAPI was out of the question, so I extracted some methods.
|
Looks good to me, thanks @vvcephei ! |
| } else { | ||
| throw new AssertionError("Store supplier is an unrecognized type."); | ||
| materialized.withCachingDisabled(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
withLoggingDisabled?
| if (log) { | ||
| materialized.withLoggingEnabled(Collections.emptyMap()); | ||
| } else { | ||
| materialized.withCachingDisabled(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
withLoggingDisabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops! Thanks for the catch!
| if (log) { | ||
| materialized.withLoggingEnabled(Collections.emptyMap()); | ||
| } else { | ||
| materialized.withCachingDisabled(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
| if (log) { | ||
| keyValueStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap()); | ||
| } else { | ||
| keyValueStoreStoreBuilder.withCachingDisabled(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
|
Hey @vvcephei , please feel free to merge after addressed the comments and resolved conflicts. |
|
Thanks, @guozhangwang ! |
|
Unrelated test failure: |
During some recent reviews, @mjsax pointed out that StateStore layers
are constructed differently the stores are added via the PAPI vs. the DSL.
This PR adds PAPI construction to the IQv2StoreIntegrationTest so that
we can ensure IQv2 works on every possible state store.
Committer Checklist (excluded from commit message)
The text was updated successfully, but these errors were encountered: