[SPARK-49821][SS][PYTHON] Implement MapState and TTL support for TransformWithStateInPandas #48290
bogao007 wants to merge 25 commits into apache:master
Conversation
… interfere with each other
    iterator = self._stateful_processor_api_client._read_arrow_state()
    batch = next(iterator)
    pandas_df = batch.to_pandas()
    data_batch = None
A bit confused here: this code snippet is trying to deal with multiple batches but only keeps the data from the last batch?
The previous code would get stuck forever after we added the Arrow resource cleanup logic (I think it might be related to the previous logic not exhausting the iterator, even though that iterator only contained a single batch), hence I'm using the recommended way to consume the Arrow batches, which is:

    for batch in iterator:
        ...

The logic is the same as the previous one; we only need to consume a single batch here.
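The consumption pattern under discussion can be sketched without any Arrow dependency (the function name is hypothetical, and plain lists stand in for Arrow record batches): exhaust the iterator so that any cleanup tied to iterator exhaustion runs, but keep only the first batch.

```python
def read_single_batch_exhausting(iterator):
    """Keep only the first batch, but iterate to the end so any
    producer-side cleanup triggered by exhaustion still runs."""
    first = None
    for batch in iterator:
        if first is None:
            first = batch
    return first
```

In the PR itself the iterator is expected to contain exactly one batch, so which batch is kept makes no practical difference; exhausting the iterator is what matters for the cleanup.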
Can we have a short comment here to explain the iteration, even though we are not expecting multiple batches?
So we do iterate over (consume) all batches if there are more than one, but only take the first batch? Please leave a code comment as @jingz-db suggested, since it confuses people into thinking it might be a bug when looking at the code.
Also, when the iterator has multiple batches, how is it safe to ignore the remaining ones and take only the first?
I'd say I want to see this fix in a separate PR, with a relevant test which fails on the master branch and passes with the fix. Let's scope the PR properly; this PR is aiming to add MapState with TTL.
It's OK to have the fix in here, as the fix applies to the MapState impl as well.
sql/core/src/main/java/org/apache/spark/sql/execution/streaming/StateMessage.proto
jingz-db left a comment
One small nit and LGTM!
First pass. Mostly minor comments and nits. Probably needs some work to sort out/incorporate with #47878 once it is merged.
Overall, we'll probably need to set aside some time in the near future to revisit and refactor the code. I see a non-trivial amount of redundant code in the files in the TWS Python implementation now.
...main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala
        }
      }

    private def sendIteratorAsArrowBatches[T](
Yeah, if the timer PR is merged first, I can rebase and apply the update. Otherwise, let's just keep the current implementation; we can do the rebase in the timer PR.
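As a rough illustration of what a helper like `sendIteratorAsArrowBatches` does, here is a hedged sketch in Python rather than the server's Scala (the function name, chunk size, and sender callback are hypothetical, not the actual server API): drain a state iterator in fixed-size chunks, where each chunk models one Arrow record batch handed to the transport layer.

```python
from itertools import islice

def send_iterator_as_batches(rows, batch_size, send_batch):
    # Ensure we have a single shared iterator, then drain it in
    # fixed-size chunks; each chunk models one Arrow record batch.
    rows = iter(rows)
    while True:
        chunk = list(islice(rows, batch_size))
        if not chunk:
            break
        send_batch(chunk)
```

The point the reviewers return to later is that the consumer must drain this stream fully so the producer side can release its resources.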
    for s in iter:
        ttl_list_state_count += s[0]
    if self.ttl_map_state.exists():
        ttl_map_state_count = self.ttl_map_state.get_value(key)[0]
Just to double confirm: here the key does not necessarily need to be the same as the grouping key, right? It's just to simplify the test.
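To make that distinction concrete, here is a tiny dict-backed stand-in for a per-grouping-key map state (the class and its methods are purely illustrative, not the real PySpark MapState API): each grouping key owns an independent map, and the user-supplied map key need not equal the grouping key.

```python
class FakeMapState:
    """Illustrative stand-in: one independent map per grouping key;
    the map's user key is unrelated to the grouping key."""

    def __init__(self):
        self._per_group = {}

    def _map_for(self, grouping_key):
        return self._per_group.setdefault(grouping_key, {})

    def exists(self, grouping_key):
        # True once any entry has been written for this grouping key.
        return bool(self._per_group.get(grouping_key))

    def update_value(self, grouping_key, user_key, value):
        self._map_for(grouping_key)[user_key] = value

    def get_value(self, grouping_key, user_key):
        return self._map_for(grouping_key)[user_key]
```

In the test above, `key` plays the role of the arbitrary user key within whatever grouping key the processor is currently handling.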
HeartSaVioR left a comment
+1 pending CI
Let's rebase #47878, as we addressed the review comments here earlier.
https://github.com/bogao007/spark/actions/runs/11433439704/job/31805443260

Thanks! Merging to master.
[SPARK-49821][SS][PYTHON] Implement MapState and TTL support for TransformWithStateInPandas

### What changes were proposed in this pull request?
- Implement MapState and TTL support for TransformWithStateInPandas
- Fixed an issue to properly close/clean up resources after Arrow batch writes are completed in `TransformWithStateInPandasStateServer`. Since we use the same Arrow batch write logic for both listState and mapState, this fix also applies to listState.

### Why are the changes needed?
Bring parity to Scala on supported state variables.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Added new unit test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#48290 from bogao007/map-state.

Authored-by: bogao007 <bo.gao@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>