[SPARK-49821][SS][PYTHON] Implement MapState and TTL support for TransformWithStateInPandas #48290

Closed
bogao007 wants to merge 25 commits into apache:master from bogao007:map-state

@bogao007 bogao007 commented Sep 28, 2024

What changes were proposed in this pull request?

  • Implement MapState and TTL support for TransformWithStateInPandas
  • Fixed an issue so that resources are properly closed and cleaned up after Arrow batch writes complete in TransformWithStateInPandasStateServer. Because listState and mapState share the same Arrow batch write logic, this fix also applies to listState.

Why are the changes needed?

To bring the Python API to parity with Scala on supported state variables.

Does this PR introduce any user-facing change?

Yes

How was this patch tested?

Added new unit test.

Was this patch authored or co-authored using generative AI tooling?

No

@bogao007 bogao007 changed the title [SPARK-49821] Implement MapState for TransformWithStateInPandas [SPARK-49821][SS][PYTHON] Implement MapState and TTL support for TransformWithStateInPandas Oct 2, 2024
iterator = self._stateful_processor_api_client._read_arrow_state()
batch = next(iterator)
pandas_df = batch.to_pandas()
data_batch = None
Contributor

I'm a bit confused here: is this code snippet meant to handle multiple batches but keep only the data from the last batch?

Contributor Author

The previous code would hang forever after we added the Arrow resource cleanup logic (I think this is because the previous logic did not exhaust the iterator, even though that iterator contained only a single batch). Hence we now consume the Arrow batches in the recommended way, which is:

for batch in iterator:
    ......

The logic is the same as before; we only need to consume a single batch here.
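
For readers following the thread, here is a minimal sketch of why draining the iterator can matter even when only one batch is expected. It is not the Spark code: `read_batches`, `cleanup_log`, and `first_batch_via_for_loop` are hypothetical names, and a plain Python generator stands in for the Arrow stream reader. The assumption is that the reader releases its resources only once the stream has been fully consumed.

```python
from typing import Dict, Iterator, List, Optional

# Records when the hypothetical reader releases its resources.
cleanup_log: List[str] = []


def read_batches(batches: List[Dict[str, int]]) -> Iterator[Dict[str, int]]:
    """Hypothetical stand-in for a batch reader (not Spark's reader).

    The finally block runs once the stream is drained or the generator is closed.
    """
    try:
        for batch in batches:
            yield batch
    finally:
        cleanup_log.append("closed")


def first_batch_via_for_loop(batches: List[Dict[str, int]]) -> Optional[Dict[str, int]]:
    data_batch = None
    for batch in read_batches(batches):
        # Keep the first batch, but keep looping so the reader reaches end of stream.
        if data_batch is None:
            data_batch = batch
    return data_batch


# next() alone leaves the generator suspended at its yield, so no cleanup has run yet.
pending = read_batches([{"count": 1}])
first = next(pending)
closed_after_next = list(cleanup_log)

# The for loop drains the reader, so its finally block has run by the time we return.
result = first_batch_via_for_loop([{"count": 1}])
closed_after_loop = list(cleanup_log)
```

In this sketch the single `next()` call returns the same batch as the loop does, but only the loop lets the reader reach end of stream, which matches the explanation above that the old code did not exhaust the iterator.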

Contributor

Can we add a short comment here explaining why we iterate even though we do not expect multiple batches?

Contributor

So we iterate over (consume) all batches if there is more than one, but only take the first batch? Please leave a code comment as @jingz-db suggested, since anyone reading the code may mistake this for a bug.

Also, when would the iterator have multiple batches, and why is it safe to ignore the rest and take only the first one?

Contributor

I'd prefer to see this fix in a separate PR, with a relevant test that fails on the master branch and passes with the fix. Let's scope the PR properly: this PR aims to add MapState with TTL.

Contributor

It's OK to keep the fix here, as it applies to the MapState implementation as well.

@jingz-db jingz-db left a comment

One small nit, otherwise LGTM!

@HeartSaVioR HeartSaVioR left a comment

First pass. Mostly minor comments and nits. This will probably need some work to reconcile with #47878 once that is merged.

Overall, we will probably need to set aside time in the near future to revisit and refactor the code. I now see a non-trivial amount of redundant code across the files in the TWS Python implementation.

}
}

private def sendIteratorAsArrowBatches[T](
Contributor

I see this is also added in #47878, but with a different implementation. Do you plan to rebase this PR and apply the new implementation to the timer as well once #47878 is merged?

Contributor Author

Yeah, if the timer PR is merged first, I can rebase and apply the update. Otherwise, let's keep the current implementation and do the rebase in the timer PR.

for s in iter:
    ttl_list_state_count += s[0]
if self.ttl_map_state.exists():
    ttl_map_state_count = self.ttl_map_state.get_value(key)[0]
Contributor

Just to double-check: the key here does not necessarily need to be the same as the grouping key, right? It's just to simplify the test.

Contributor Author

Yep, correct.
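
To make the point confirmed above concrete, here is a toy in-memory sketch of a TTL map state in which the map's user key is independent of the grouping key. This is an illustration only, not Spark's implementation: the class `ToyTtlMapState`, the methods `set_grouping_key` and `update_value`, the injected `now_ms` clock, and the rule that an entry expires once its expiration time is reached are all assumptions made for the sketch. Only the method names `exists` and `get_value` are taken from the test snippet under review.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class ToyTtlMapState:
    """Toy in-memory model for illustration; not Spark's implementation."""

    def __init__(self, ttl_ms: int, now_ms: Callable[[], int]) -> None:
        self._ttl_ms = ttl_ms
        self._now_ms = now_ms  # injected clock so expiry can be tested deterministically
        self._grouping_key: Any = None
        # grouping key -> {user key -> (value, expiration time in ms)}
        self._store: Dict[Any, Dict[Any, Tuple[Any, int]]] = {}

    def set_grouping_key(self, grouping_key: Any) -> None:
        # Hypothetical helper: selects which group's map subsequent calls operate on.
        self._grouping_key = grouping_key

    def _live_entries(self) -> Dict[Any, Tuple[Any, int]]:
        # Assumed rule: an entry is visible only while its expiration time is in the future.
        entries = self._store.get(self._grouping_key, {})
        now = self._now_ms()
        return {k: v for k, v in entries.items() if v[1] > now}

    def exists(self) -> bool:
        return bool(self._live_entries())

    def update_value(self, user_key: Any, value: Any) -> None:
        expires_at = self._now_ms() + self._ttl_ms
        self._store.setdefault(self._grouping_key, {})[user_key] = (value, expires_at)

    def get_value(self, user_key: Any) -> Optional[Any]:
        entry = self._live_entries().get(user_key)
        return None if entry is None else entry[0]


clock = {"ms": 0}
state = ToyTtlMapState(ttl_ms=10_000, now_ms=lambda: clock["ms"])

# One grouping key can hold several user keys that differ from the grouping key.
state.set_grouping_key("user-1")
state.update_value("clicks", (3,))
state.update_value("views", (7,))
clicks_before_expiry = state.get_value("clicks")[0]

# Advance the fake clock past the TTL: the entries are no longer visible.
clock["ms"] = 10_001
exists_after_expiry = state.exists()
```

The test in this PR uses the grouping key as the map key only for simplicity; in the sketch, `"clicks"` and `"views"` play the role of arbitrary user keys stored under the grouping key `"user-1"`.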

@HeartSaVioR HeartSaVioR left a comment

+1 pending CI

Let's rebase #47878 on top of this one, since the review comments here were addressed first.

@HeartSaVioR

https://github.com/bogao007/spark/actions/runs/11433439704/job/31805443260
It only failed with org.apache.spark.sql.SparkSessionE2ESuite.

@HeartSaVioR

Thanks! Merging to master.

ericm-db pushed a commit to ericm-db/spark that referenced this pull request Oct 22, 2024
…sformWithStateInPandas

Closes apache#48290 from bogao007/map-state.

Authored-by: bogao007 <bo.gao@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>