Search before asking
Version
2.10+
Minimal reproduce step
In any existing sink attached to a partitioned topic, set a debugger breakpoint in the write() method.
Inspect the contents of the record.
sink.write(record) receives a record whose API includes:
- Optional<Integer> getPartitionIndex()
- Optional<String> getTopicName()
The Record/SinkRecord API assumes that getTopicName() returns the partitioned topic name ("topic"), with the partition index supplied separately by getPartitionIndex().
In practice, a sink created for a partitioned topic receives per-partition names such as "topic-partition-0", "topic-partition-1", "topic-partition-2" (i.e. the complete topic name) from getTopicName(), and Optional.empty() from getPartitionIndex().
If the sink uses the topic name as a destination (e.g. as a table name for Snowflake, BigQuery, and others), it routes the data to multiple tables instead of one.
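To illustrate the routing problem described above, here is a minimal sketch of a sink-side workaround, assuming the only difference between the per-partition name and the partitioned topic name is the "-partition-N" suffix. The class and its helpers (PartitionedTopicNames, baseTopic, partitionIndex, destinations) are hypothetical names for illustration and are not part of the Pulsar API.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Pulsar: splits "topic-partition-N" into its two parts.
public class PartitionedTopicNames {

    // Matches names ending in "-partition-<digits>"; group 1 is the base name, group 2 the index.
    private static final Pattern PARTITION_SUFFIX =
            Pattern.compile("^(.*)-partition-(\\d{1,9})$");

    // Returns the name without the "-partition-N" suffix, or the input unchanged if there is none.
    static String baseTopic(String fullTopicName) {
        Matcher m = PARTITION_SUFFIX.matcher(fullTopicName);
        return m.matches() ? m.group(1) : fullTopicName;
    }

    // Returns the partition index encoded in the name, or empty for a name without the suffix.
    static Optional<Integer> partitionIndex(String fullTopicName) {
        Matcher m = PARTITION_SUFFIX.matcher(fullTopicName);
        return m.matches() ? Optional.of(Integer.parseInt(m.group(2))) : Optional.empty();
    }

    // Collects the distinct destinations (e.g. table names) a sink would write to.
    static Set<String> destinations(List<String> topicNames, boolean normalize) {
        Set<String> result = new LinkedHashSet<>();
        for (String name : topicNames) {
            result.add(normalize ? baseTopic(name) : name);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> seen = List.of(
                "orders-partition-0", "orders-partition-1", "orders-partition-2");

        // Using the name as delivered: one destination per partition.
        System.out.println(destinations(seen, false));
        // prints [orders-partition-0, orders-partition-1, orders-partition-2]

        // After stripping the suffix: a single destination.
        System.out.println(destinations(seen, true));
        // prints [orders]
    }
}
```

A connector could apply something like baseTopic() before choosing its destination, but this only works around the symptom in one sink; the record would still report Optional.empty() for the partition index.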
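AFAICT, this never worked as intended, e.g. in pulsar/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java (lines 131 to 134 in a903733):
    return PulsarRecord.<T>builder()
            .message(message)
            .schema(schema)
            .topicName(message.getTopicName())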
What did you expect to see?
getTopicName() should return the partitioned topic name ("topic"), and getPartitionIndex() should return the actual partition index.
What did you see instead?
See above.
Anything else?
It is not that hard to fix, but it is hard to estimate what the fix will break for existing connectors.
Are you willing to submit a PR?