
[SPARK-49513][SS] Add Support for timer in transformWithStateInPandas API #47878

Closed
jingz-db wants to merge 17 commits into apache:master from jingz-db:python-timer-impl

Conversation

@jingz-db (Contributor) commented Aug 26, 2024

What changes were proposed in this pull request?

Adds support for timers in the TransformWithStateInPandas Python API.

Why are the changes needed?

To achieve parity with the Scala API, TransformWithStateInPandas should also support processing-time/event-time timers for arbitrary state.

Does this PR introduce any user-facing change?

Yes. Users can now interact with timers from handleInputRows via two additional parameters:

def handleInputRows(
        self,
        key: Any,
        rows: Iterator["PandasDataFrameLike"],
        timer_values: TimerValues,
        expired_timer_info: ExpiredTimerInfo)

Users can also interact with the newly introduced TimerValues class to get the processing/event time for the current batch:

class TimerValues:
    def get_current_processing_time_in_ms(self) -> int

    def get_current_watermark_in_ms(self) -> int

Users can also interact with expired_timer_info to get the timestamp for expired timers:

class ExpiredTimerInfo:
    def is_valid(self) -> bool

    def get_expiry_time_in_ms(self) -> int
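
As a sketch of how a user-defined handler might use these two parameters, the example below branches on whether it was invoked for an expired timer. The minimal TimerValues/ExpiredTimerInfo stand-ins here are illustrative only, not the actual PySpark implementations:

```python
from typing import Any, Iterator

# Minimal stand-ins for illustration; the real classes live in PySpark.
class TimerValues:
    def __init__(self, processing_time_ms: int, watermark_ms: int) -> None:
        self._processing_time_ms = processing_time_ms
        self._watermark_ms = watermark_ms

    def get_current_processing_time_in_ms(self) -> int:
        return self._processing_time_ms

    def get_current_watermark_in_ms(self) -> int:
        return self._watermark_ms

class ExpiredTimerInfo:
    def __init__(self, is_valid: bool, expiry_time_ms: int = -1) -> None:
        self._is_valid = is_valid
        self._expiry_time_ms = expiry_time_ms

    def is_valid(self) -> bool:
        return self._is_valid

    def get_expiry_time_in_ms(self) -> int:
        return self._expiry_time_ms

def handle_input_rows(key, rows, timer_values, expired_timer_info):
    # Timer callback path: invoked with an empty rows iterator when a
    # registered timer has expired for this key.
    if expired_timer_info.is_valid():
        yield ("expired", key, expired_timer_info.get_expiry_time_in_ms())
    else:
        # Normal data path: process rows using the current batch times.
        yield ("data", key, timer_values.get_current_processing_time_in_ms())
```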

How was this patch tested?

Unit tests in TransformWithStateInPandasStateServerSuite and integration tests in test_pandas_transform_with_state.py.

Was this patch authored or co-authored using generative AI tooling?

No.

@jingz-db jingz-db changed the title compiling [SS] Add Support for timer in transformWithStateInPandas API Aug 28, 2024
@jingz-db jingz-db changed the title [SS] Add Support for timer in transformWithStateInPandas API [SPARK-49513][SS] Add Support for timer in transformWithStateInPandas API Sep 4, 2024
@jingz-db jingz-db marked this pull request as ready for review September 4, 2024 17:25
Comment on lines +506 to +507
batch_timestamp = statefulProcessorApiClient.get_batch_timestamp()
watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp()
Contributor:

Can we move these 2 API calls inside the if-else clause below and only make them with a supported time mode?

Contributor Author (@jingz-db, Sep 16, 2024):

We will need some values to initialize the TimerValues in handleInputRows. On the Scala side, we always pass the real timestamp into TimerValues even if no timer is defined: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala#L250

It is unlikely that users will call TimerValues if they do not have a timer registered, but my original intention was to align the behavior with the Scala side. I guess we need to decide between saving a call and aligning with the Scala side. I don't have a strong opinion on which is better. Which approach do you prefer?

Contributor:

IIUC these 2 values are only used when the time mode is not none. I meant that for the none time mode we don't need these 2 extra API calls, since the values aren't needed anyway.

if timeMode == "processingtime" and expiry_timestamp < batch_timestamp:
    result_iter_list.append(statefulProcessor.handleInputRows(
        (key_obj,), iter([]),
        TimerValues(batch_timestamp, watermark_timestamp),
Contributor:

Is watermark_timestamp needed for the processingTime time mode, and vice versa?

Contributor Author:

Same as above - this is to match the behavior on the Scala side.

ExpiredTimerInfo(True, expiry_timestamp)))

# TODO(SPARK-49603) set the handle state in the lazily initialized iterator
"""
Contributor:

If we have a TODO here, we can remove the commented code.

Contributor Author:

Done!

if len(response_message[2]) == 0:
    return -1
# TODO: can we simply parse from utf8 string here?
timestamp = int(response_message[2])
Contributor:

Just curious: would this return the correct value?

Contributor Author:

Passing a row schema and using CPickleSerializer seems a bit heavyweight. Modified this to pass a byte buffer of exactly 8 bytes and read exactly 8 bytes on the Python client.
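
A sketch of how such a fixed 8-byte buffer can be decoded on the Python side, assuming the JVM writes the long in big-endian order (as java.io.DataOutputStream does); the function name is illustrative, not the PR's actual helper:

```python
import struct

def read_long_from_bytes(buf: bytes) -> int:
    # The JVM writes a long as exactly 8 big-endian bytes, so ">q"
    # (big-endian signed 64-bit) recovers the original value.
    if len(buf) != 8:
        raise ValueError(f"expected exactly 8 bytes, got {len(buf)}")
    return struct.unpack(">q", buf)[0]
```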

    return []
elif status == 0:
    iterator = self._read_arrow_state()
    batch = next(iterator)
Contributor:

Do we expect all the timers can be stored within a single arrow batch? If not, should we handle it properly here?

Contributor Author:

We don't. It now returns an iterator of lists. Does this API make sense to you?
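
The iterator-of-lists shape can be flattened for callers with a one-liner; this is a hedged sketch of the consumption pattern, not the PR's actual code:

```python
from itertools import chain
from typing import Iterator, List

def iter_expired_timers(batches: Iterator[List[int]]) -> Iterator[int]:
    # Each Arrow batch decodes to a list of timer timestamps; chaining the
    # per-batch lists gives callers one flat stream, regardless of how the
    # server split the timers across batches.
    return chain.from_iterable(batches)
```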

while (iter.hasNext) {
  val timestamp = iter.next()
  val internalRow = InternalRow(timestamp)
  arrowStreamWriter.writeRow(internalRow)
Contributor:

Same as the Python-side comment: here we don't limit how many Arrow batches we construct for timers. If the user sets a fairly low value for arrowTransformWithStateInPandasMaxRecordsPerBatch, we would send multiple Arrow batches, and the client side needs to handle this properly as well.

Question: should we have a lower limit on how many records we send through a single batch (e.g. the default value 10000)? IIUC, each timer record is very small and should not consume a lot of memory. The user also doesn't care how many records each batch contains, since they always get a single list from this API.

Contributor Author:

I guess I'll rebase on your ListState PR change, and this arrowTransformWithStateInPandasMaxRecordsPerBatch will be passed as the new config you'll add here: https://github.com/apache/spark/pull/47933/files#diff-0b0aaf91850194b6980b75d47bc166148566cbdc1b17b3da16faff1f0740e0f4R107.
But your concern above still holds. Should we use a different default value for transmitting the list[Int] here? If so, should we add a new config, or just assign a fixed value?

val allocator = ArrowUtils.rootAllocator.newChildAllocator(
  s"stdout writer for transformWithStateInPandas state socket", 0, Long.MaxValue)
val root = VectorSchemaRoot.create(arrowSchema, allocator)
new BaseStreamingArrowWriter(root, new ArrowStreamWriter(root, null, outputStream),
Contributor:

Does it make sense to abstract this logic out since it's being used in multiple places?

Contributor Author:

Done. Created a util object and put all Python-related writer functions into it.

batch = next(iterator)
result_list = []
key_fields = [field.name for field in self.key_schema.fields]
# TODO any better way to restore a grouping object from a batch?
Contributor Author:

@bogao007 Is there any common practice for deserializing data from a batch object to the Python object for grouping key?

Contributor:

Maybe take a look at how load_stream is implemented in ApplyInPandasWithStateSerializer and TransformWithStateInPandasSerializer in pyspark/sql/pandas/serializers.py. (and maybe some other customize serializers in the same file)

Contributor:

Maybe try something like below?

df.itertuples(index=False, name=None)

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.itertuples.html
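
The suggestion above can be sketched as follows; the DataFrame contents are made up for illustration, but `itertuples(index=False, name=None)` yielding plain tuples per row is standard pandas behavior:

```python
import pandas as pd

# Rebuild plain Python tuples (e.g. grouping keys) from a DataFrame:
# with index=False and name=None, itertuples yields a bare tuple per row
# instead of a namedtuple, which converts cheaply to grouping-key tuples.
df = pd.DataFrame({"user": ["a", "b"], "bucket": [1, 2]})
keys = [row for row in df.itertuples(index=False, name=None)]
```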

Contributor:

Btw, what if multiple batches are being sent from JVM, are we handling it correctly?

Contributor Author:

Discussed with Bo offline: the JVM will return a Row type to Python, and we can convert it directly into a tuple.

@jingz-db jingz-db requested a review from bogao007 September 17, 2024 21:42
outputStream.write(responseMessageBytes)
}

def serializeLongToByteString(longValue: Long): ByteString = {
Contributor:

I think it may bring some extra complexity to do serde between long and ByteString. Since this is only used in TimerValueRequests, maybe we could add a dedicated response message for it which returns a long value? That way we can just use read_long on python side.

Contributor Author:

Added a new type of StateResponse to transmit the Long type directly in the proto message.

Contributor @bogao007 left a comment:

LGTM overall; left one minor comment regarding Arrow resource cleanup. Thanks for making the changes!

  arrowStreamWriter.writeRow(internalRow)
}
arrowStreamWriter.finalizeCurrentArrowBatch()
writer.end()
Contributor:

Minor: We might need to do something similar to what PythonArrowInput does to ensure we don't see unexpected errors

protected def close(): Unit = {
  Utils.tryWithSafeFinally {
    // end writes footer to the output stream and doesn't clean any resources.
    // It could throw exception if the output stream is closed, so it should be
    // in the try block.
    writer.end()
  } {
    root.close()
    allocator.close()
  }
}

Contributor Author:

Done!

Contributor @HeartSaVioR left a comment:

Looks OK overall. There are some small corrections, but mostly minors and nits.

Contributor:

I might have lost track of how TWS (for PySpark) works, but given that we get the iterator of expired timers based on the timestamp, isn't this if statement already covered by the API call? In other words, shouldn't the API cover this?

Please let me know if there is a specific reason - no need to change the code if there is one. I just wanted to understand and possibly refresh my memory.

Contributor Author:

You are correct about this. Thanks for noticing the redundant check. Removed.

Contributor:

nit: is this indentation correct? It looks a bit odd compared to others - params usually start at the same indentation as the first param.

Contributor:

Shall we let GitHub resolve the review comments rather than manually marking them as resolved? I don't see any new commit addressing these style comments. I guess you've addressed them but missed pushing a commit; it's easier to track if we resolve the comment as "outdated".

Contributor:

(I tend to use reaction to distinguish comments I agree to address, especially style comments.)

Contributor:

nit: ditto

Contributor:

nit: a method doc in Python is placed "after" the definition of the method.

Contributor:

nit: ditto

Contributor:

For other functionality we pass a spying instance as a param to test it. Do we test this in e2e instead? I'm OK with that; just wanted to check.

Contributor Author:

Same as above. This is also tested in the e2e suites via assertions on the output rows from handling expired timers.

Contributor:

nit: indentation?

Contributor:

nit: any specific reason to implement here separately rather than calling _prepare_input_data?

Contributor Author:

Refactored to call _prepare_input_data.

Contributor:

This is not true - the watermark for eviction is 5, but the watermark for late records is 0, hence ("a", 4) is not dropped. This is exactly the reason you still see an event for "a". Otherwise you shouldn't have ("a", 20).

Contributor:

You might wonder how this works differently from the Scala tests - AddData() & CheckNewAnswer() will trigger a no-data batch, hence executing two batches.

Contributor Author:

Thanks for leaving the comments! Reading them, I realized I had not quite understood the difference between the watermark for eviction and the watermark for late records before.
The test case should still be fine; I just deleted the comments. Dropping late records will be tested more thoroughly in the operator-chaining PR.

@HeartSaVioR:

@jingz-db
Could you please rebase and apply the iterator-handling approach from #48290? Thanks!

Contributor @HeartSaVioR left a comment:

Looks good to me except nits.

Contributor:

This seems to be missed.

Contributor:

Thanks. Would you like to leave getExpiredTimers() as it is? Then let's leave a code comment noting that getExpiredTimers requests won't be interleaved, hence this is safe.

@HeartSaVioR commented Oct 26, 2024

The test failure seems to be unrelated, but lint seems to be either related or simply broken.

https://github.com/jingz-db/spark/actions/runs/11526119046/job/32090806651

@HyukjinKwon Do you happen to know the reason for the failure? I guess the generated .py file should be excluded from the linter, and I thought we did that, as I didn't see linter failures in prior PRs. Was anything changed around the PySpark linter?

@HyukjinKwon (Member):

@jingz-db mind updating your master branch to the latest, rebasing this branch against it, and pushing?

Contributor @HeartSaVioR left a comment:

+1 pending CI

@HeartSaVioR:

@jingz-db
Looks like the linter is still failing - have you ensured that the master branch of your repo is up to date with the master branch of the Apache repo?

@jingz-db (Contributor Author) commented Oct 28, 2024

@jingz-db Looks like linter is still failing - have you ensured that the master branch for your repo is up-to-date with the master branch for Apache repo?

I did; I rebased on the latest master branch a few hours ago (screenshot attached). My local lint command also passed (screenshot attached).

Let me add type-checking imports and see if it passes.

@jingz-db:

Hey @HyukjinKwon, do we have any way to manually skip the Python style check for certain files? Currently the linter check only fails on the auto-generated file created by protoc. I've rebased my latest branch on master and run dev/reformat-python, but it is still failing on the python/pyspark/sql/streaming/StateMessage_pb2.py file.

Contributor:

Let's add # noqa: E501 back to ignore the length check.

@HeartSaVioR:

Could you please try modifying the mypy.ini file to ignore errors on proto-generated Python files? You'll need to move the generated file to a proto directory (create a new one) and add the exclusion.

spark/python/mypy.ini

Lines 183 to 185 in 413242b

; Ignore errors for proto generated code
[mypy-pyspark.sql.connect.proto.*, pyspark.sql.connect.proto]
ignore_errors = True

@HeartSaVioR:

Also, please rebase to incorporate the removal of the generated Java code.

@jingz-db:

Could you please try modifying mypy.ini file to ignore errors on proto generated python files? You'll need to move the generated file to proto directory (create a new directory) and add the exclusion.

spark/python/mypy.ini

Lines 183 to 185 in 413242b

; Ignore errors for proto generated code
[mypy-pyspark.sql.connect.proto.*, pyspark.sql.connect.proto]
ignore_errors = True

Thanks for the pointer! Moved the proto-generated .py file under the sql/streaming/proto directory and added the entry to the mypy.ini file.

@HeartSaVioR:

Thanks! Merging to master.
