Add Windmill support for MultimapState #23492
Codecov Report
@@ Coverage Diff @@
## master #23492 +/- ##
==========================================
+ Coverage 72.82% 73.32% +0.50%
==========================================
Files 775 719 -56
Lines 102928 95799 -7129
==========================================
- Hits 74958 70249 -4709
+ Misses 26515 24239 -2276
+ Partials 1455 1311 -144
Flags with carried forward coverage won't be shown. See 274 files with indirect coverage changes.
bd6791a to 81bdf90
81bdf90 to f48eeab
f48eeab to add1c3f
add1c3f to 14a2006
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
This isn't a paginated read, can we set valuesCached?
Or should the comment be updated to say that there may be additional entries for the key?
I think this is back to loading all the pages into memory before iterating.
Could you instead make it lazy by returning the non-cached windmill data first (filtering out what is cached, or adding it to the cache) and then returning the rest?
Iterables.concat(Iterables.transform( logic in entries.forEach),
some iterable that lazily calls mergedCacheEntries and iterates);
Maybe you can make some test for this by having the test have some iterable that would return GBs of data total which should be streamed through and not cause test to OOM?
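A minimal sketch of the lazy concatenation suggested above, using only the JDK rather than Guava's Iterables. The class name LazyConcat and the Supplier-based signature are hypothetical and are not taken from the PR; the point is only that the second part is not computed until the first part has been fully consumed.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/** Hypothetical helper, not part of Beam: streams `first`, then a lazily supplied rest. */
final class LazyConcat {
  private LazyConcat() {}

  static <T> Iterable<T> concat(Iterable<T> first, Supplier<Iterable<T>> rest) {
    return () ->
        new Iterator<T>() {
          private Iterator<T> current = first.iterator();
          private boolean restStarted = false;

          @Override
          public boolean hasNext() {
            // Only ask the supplier for the rest once the first part is exhausted.
            while (!current.hasNext()) {
              if (restStarted) {
                return false;
              }
              restStarted = true;
              current = rest.get().iterator();
            }
            return true;
          }

          @Override
          public T next() {
            if (!hasNext()) {
              throw new NoSuchElementException();
            }
            return current.next();
          }
        };
  }
}
```

In the setting discussed here, `first` would play the role of the entries streamed from windmill and `rest` the role of the merged cache entries, so neither side needs to be materialized in memory up front.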
56a57e7 to 230a03f
230a03f to 8fa3d3f
...ava/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternals.java
}

for (Windmill.TagMultimapFetchResponse tagMultimap : response.getTagMultimapsList()) {
  // First check if it's keys()/entries()
Can always optimize it later
private enum KeyExistence {
  // this key is known to exist, it has at least 1 value in either localAdditions or windmill
  KNOWN_EXIST,
  // this key is known to be nonexistent, it has 0 value in both localAdditions and windmill

if (allKeysKnown) {
  keyState = keyStateMap.get(structuralKey);
  if (keyState == null || keyState.existence == KeyExistence.UNKNOWN_EXISTENCE) {
    if (keyState != null) keyStateMap.remove(structuralKey);
I think it would be simpler to not remove in this read if we're not inserting.
It seems like we shouldn't have UNKNOWN_EXISTENCE in the map except when we construct new entries. But if that's not the case for some reason it seems simpler to just leave it as is here.
UNKNOWN_EXISTENCE is needed when a key isn't in the map and we have to check windmill to find out whether it exists. Until the windmill read returns a result, we need to mark the key as UNKNOWN_EXISTENCE.
ByteString encodedKey = keyStream.toByteStringAndReset();
Windmill.TagMultimapEntry.Builder entryBuilder = builder.addUpdatesBuilder();
entryBuilder.setEntryName(encodedKey);
entryBuilder.setDeleteAll(keyState.removedLocally);
I meant leave the proto field at its default of false when the flag is false:
if (keyState.removedLocally) entryBuilder.setDeleteAll(true);
  keyState.valuesSize = 0;
  keyState.existence = KeyExistence.KNOWN_NONEXISTENT;
} else {
  // no data in windmill, deleting from local cache is sufficient.
Thanks, I think that inversion is easier to read, can you use it and swap the cases?
But shouldn't such a key be KNOWN_NONEXISTENT and handled above?
Maybe the special handling can just be removed and can assert that it is in the expected KNOWN state?
keyState.existence = KeyExistence.KNOWN_EXIST;
keyState.values.extendWith(entry.getValue());
keyState.valuesSize += Iterables.size(entry.getValue());
// We can't set keyState.valuesCached to true here, because there may be more
That is the values within the bag page. But the iterable which you are examining here is either
- those values and thus weighted if a single page
- or a PagingIterable (which is not weighted)
  entry.getValue().forEach(expectedMap.get(key)::add);
}
for (Map.Entry<ByteString, List<Integer>> entry : actual) {
  assertThat(
Add an explicit check that expectedMap contains the key?
That avoids the odd possibility that a null returned from remove happens to match some unexpected actual state.
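One way the explicit check could look, as a rough JDK-only sketch. The class EntryAssertions and the method assertSameEntries are hypothetical names and are not the test code in this PR, which uses assertThat matchers. Values are compared without regard to order, which is an assumption about what the test intends.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical test helper: compares observed entries against an expected map. */
final class EntryAssertions {
  private EntryAssertions() {}

  static <K, V> void assertSameEntries(
      Map<K, List<V>> expected, Iterable<Map.Entry<K, List<V>>> actual) {
    Map<K, List<V>> remaining = new HashMap<>(expected);
    for (Map.Entry<K, List<V>> entry : actual) {
      K key = entry.getKey();
      // Explicit membership check: without it, remove() returning null for an
      // unexpected key could be mistaken for "no values expected".
      if (!remaining.containsKey(key)) {
        throw new AssertionError("unexpected or duplicate key: " + key);
      }
      List<V> want = new ArrayList<>(remaining.remove(key));
      for (V value : entry.getValue()) {
        if (!want.remove(value)) {
          throw new AssertionError("unexpected value " + value + " for key " + key);
        }
      }
      if (!want.isEmpty()) {
        throw new AssertionError("missing values " + want + " for key " + key);
      }
    }
    if (!remaining.isEmpty()) {
      throw new AssertionError("missing keys: " + remaining.keySet());
    }
  }
}
```

Removing each key as it is seen also catches a key that is reported twice, since the second occurrence fails the membership check.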
Mockito.verify(mockWindmill).getStateData(COMPUTATION, expectedRequest2.build());
Mockito.verify(mockWindmill).getStateData(COMPUTATION, expectedRequest3.build());
Mockito.verifyNoMoreInteractions(mockWindmill);
// NOTE: The future will still contain a reference to the underlying reader.

  assertEquals(0, commitBuilder.getValueUpdatesCount());
}

private static <T> ByteString encodeWithCoder(T key, Coder<T> coder) throws IOException {
just rethrow as RuntimeException to avoid having to add IOException to everything when not expected
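The rethrow pattern suggested above might look roughly like this. To keep the sketch self-contained, SimpleCoder is a hypothetical stand-in for Beam's Coder and the helper returns byte[] instead of ByteString; neither is the PR's actual code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class EncodeHelper {
  private EncodeHelper() {}

  /** Hypothetical stand-in for a Beam Coder; only the encode side is modeled. */
  interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;
  }

  /** Encodes `key`, wrapping the checked IOException so callers need no throws clause. */
  static <T> byte[] encodeWithCoder(T key, SimpleCoder<T> coder) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      coder.encode(key, out);
    } catch (IOException e) {
      // Not expected when writing to an in-memory stream, so surface it unchecked.
      throw new RuntimeException(e);
    }
    return out.toByteArray();
  }
}
```

With this shape, test methods that call the helper no longer need `throws IOException` solely because of encoding.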
}

@Test
public void testMultimapCachedPartialEntryCannotCachePolled() throws IOException {
test name is a little unclear
how about a comment explaining the case
Renamed the test.
      ByteArrayCoder.of().decode(entryUpdate.getEntryName().newInput(), Context.OUTER);
  assertTrue(Arrays.equals(key1, decodedKey));
  assertTrue(entryUpdate.getDeleteAll());
}
these look good but with such a complex class it is hard to cover everything. A fuzz test might help ensure that we're not missing something. You could do something like:
have two maps and in a loop
- randomly make a modification to both maps (add value to 100 random keys, remove 100 random keys, clear map)
- perform operations like persist, read that shouldn't affect viewed entries on one of the maps randomly
And then at the end or periodically verify that both maps have the same observed state by iterating over both. Would be a nice sanity check that things are always consistent and we covered everything.
Added a fuzz test to perform different modifications and verify the state.
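A rough sketch of the model-based fuzz test described in this thread, not the test that was added to the PR. BufferedMultimap is a toy, hypothetical stand-in for the real state class: it buffers writes and flushes them on persist(), so the loop can check that persisting never changes what a reader observes. The key range, operation mix, and seed are arbitrary assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

final class MultimapFuzzSketch {
  /** Toy multimap with pending writes over a backing store (hypothetical). */
  static final class BufferedMultimap {
    private final Map<Integer, List<Integer>> store = new HashMap<>();
    private final Map<Integer, List<Integer>> pendingAdds = new HashMap<>();
    private final Set<Integer> pendingRemoves = new HashSet<>();
    private boolean pendingClear = false;

    void put(int k, int v) {
      pendingAdds.computeIfAbsent(k, x -> new ArrayList<>()).add(v);
    }

    void remove(int k) {
      pendingAdds.remove(k);
      pendingRemoves.add(k);
    }

    void clear() {
      pendingAdds.clear();
      pendingRemoves.clear();
      pendingClear = true;
    }

    /** Flushes pending writes; must not change the observed state. */
    void persist() {
      if (pendingClear) store.clear();
      store.keySet().removeAll(pendingRemoves);
      pendingAdds.forEach((k, vs) -> store.computeIfAbsent(k, x -> new ArrayList<>()).addAll(vs));
      pendingAdds.clear();
      pendingRemoves.clear();
      pendingClear = false;
    }

    /** Returns the state a reader would observe: store merged with pending writes. */
    Map<Integer, List<Integer>> snapshot() {
      Map<Integer, List<Integer>> out = new HashMap<>();
      if (!pendingClear) {
        store.forEach((k, vs) -> {
          if (!pendingRemoves.contains(k)) out.put(k, new ArrayList<>(vs));
        });
      }
      pendingAdds.forEach((k, vs) -> out.computeIfAbsent(k, x -> new ArrayList<>()).addAll(vs));
      return out;
    }
  }

  /** Applies random operations to a plain-map model and the toy multimap, comparing each round. */
  static boolean run(long seed, int rounds) {
    Random random = new Random(seed);
    Map<Integer, List<Integer>> model = new HashMap<>();
    BufferedMultimap underTest = new BufferedMultimap();
    for (int i = 0; i < rounds; i++) {
      int key = random.nextInt(20);
      int op = random.nextInt(4);
      if (op == 0) {
        int value = random.nextInt();
        model.computeIfAbsent(key, x -> new ArrayList<>()).add(value);
        underTest.put(key, value);
      } else if (op == 1) {
        model.remove(key);
        underTest.remove(key);
      } else if (op == 2) {
        if (random.nextInt(10) == 0) {
          model.clear();
          underTest.clear();
        }
      } else {
        underTest.persist(); // affects only the implementation, never the model
      }
      if (!model.equals(underTest.snapshot())) return false;
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(run(42L, 5000) ? "consistent" : "diverged");
  }
}
```

Using a fixed seed keeps a failure reproducible; printing the seed on failure is a common refinement when the seed is randomized.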
@zhengbuqian Any updates on this PR?
}
Windmill.WorkItemCommitRequest.Builder commitBuilder =
    Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);
can you also recreate the multimap after some rounds so that it has to initialize from persisted state as necessary? As is it is fully cached all the time and thus is likely not merging in-memory or on-disk state.
Done. Now it clears the cache after 100 rounds and runs another 100 rounds. In between, it calls keys() and get() to rebuild the cache.
scwhittle left a comment
Thanks Buqian for all your work on this!
I'm not a committer so adding
R: @reuvenlax
Run Java PreCommit
precommit failure looked unrelated so rerunning
Run Java PreCommit
@reuvenlax can you merge?
thanks @scwhittle! If more changes are needed pls let me know, I'll work on fixes!
This PR adds windmill legacy worker support for MultimapState.