[Bug] getTopicName in SinkRecord returns complete topic name for partitioned topic + empty partition index #19922

@dlg99

Description

Search before asking

  • I searched in the issues and found nothing similar.

Version

2.10+

Minimal reproduce step

In any existing sink attached to a partitioned topic, set a debugger breakpoint in the write() method and inspect the contents of the record.

sink.write(record) receives a record whose API includes:

  • Optional<Integer> getPartitionIndex()
  • Optional<String> getTopicName()

The Record/SinkRecord API assumes that getTopicName() returns the partitioned topic name ("topic"), with the partition index supplied separately by getPartitionIndex().

A sink created for a partitioned topic instead receives complete per-partition topic names such as "topic-partition-0", "topic-partition-1", "topic-partition-2", ... from getTopicName(), together with a partition index of Optional.empty().
If the sink uses the topic name as a destination (e.g. a table name for Snowflake, BigQuery, and others), it will route the data to multiple tables, one per partition.
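
To illustrate the consequence, here is a minimal sketch of a sink that derives its destination table from the topic name. The class TableRoutingDemo and its naming rule (replacing '-' with '_') are hypothetical and not taken from any real connector; only the topic names come from the report above.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a sink that names its destination table after the topic.
final class TableRoutingDemo {

    // Derives a table name from whatever getTopicName() returned (illustrative rule only).
    static String destinationTable(Optional<String> topicName) {
        String name = topicName.orElse("unknown");
        // Make the name identifier-friendly; real connectors have their own rules.
        return name.replace('-', '_');
    }

    // Collects the distinct tables a sink would write to for the given topic names.
    static Set<String> tablesFor(List<String> topicNames) {
        Set<String> tables = new LinkedHashSet<>();
        for (String topic : topicNames) {
            tables.add(destinationTable(Optional.of(topic)));
        }
        return tables;
    }

    public static void main(String[] args) {
        // Topic names the sink currently observes for a 3-partition topic named "topic".
        List<String> observed = List.of(
                "topic-partition-0", "topic-partition-1", "topic-partition-2");
        // Three distinct tables instead of the single table "topic".
        System.out.println(tablesFor(observed));
    }
}
```

With the per-partition names the sketch yields three separate tables, whereas a record carrying "topic" for every partition would yield one.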

AFAICT, this never worked as intended; e.g. the record is built from the message's complete topic name:

return PulsarRecord.<T>builder()
.message(message)
.schema(schema)
.topicName(message.getTopicName())

What did you expect to see?

getTopicName() returns the partitioned topic name, and getPartitionIndex() returns the actual partition index.
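
The expected split can be sketched as a small helper that separates a complete topic name into the partitioned topic name and the partition index. This is an illustration only, assuming the "-partition-N" suffix convention shown in this report; the class PartitionedTopicNames and its methods are hypothetical and are not part of the Pulsar API. Pulsar's own topic-name utilities may already offer equivalent parsing, which has not been checked here.

```java
import java.util.Optional;

// Hypothetical helper; assumes partitions are named "<topic>-partition-<N>".
final class PartitionedTopicNames {
    private static final String PARTITION_SUFFIX = "-partition-";

    // Returns the partition index if the name ends in "-partition-<digits>".
    static Optional<Integer> partitionIndex(String completeName) {
        int idx = completeName.lastIndexOf(PARTITION_SUFFIX);
        if (idx < 0) {
            return Optional.empty();
        }
        String digits = completeName.substring(idx + PARTITION_SUFFIX.length());
        if (digits.isEmpty()) {
            return Optional.empty();
        }
        for (int i = 0; i < digits.length(); i++) {
            char c = digits.charAt(i);
            if (c < '0' || c > '9') {
                return Optional.empty();
            }
        }
        try {
            return Optional.of(Integer.parseInt(digits));
        } catch (NumberFormatException e) {
            // Digit run too long for an int: treat the name as non-partitioned.
            return Optional.empty();
        }
    }

    // Returns the name without the partition suffix, or the input unchanged.
    static String partitionedTopicName(String completeName) {
        if (!partitionIndex(completeName).isPresent()) {
            return completeName;
        }
        return completeName.substring(0, completeName.lastIndexOf(PARTITION_SUFFIX));
    }

    public static void main(String[] args) {
        String complete = "topic-partition-2";
        System.out.println(partitionedTopicName(complete)); // topic
        System.out.println(partitionIndex(complete));       // Optional[2]
    }
}
```

A sink could apply such a helper as a workaround today, though a non-partitioned topic whose name happens to end in "-partition-N" would be misread, which is one reason the split is better supplied by the record itself.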

What did you see instead?

see above

Anything else?

The fix itself is not that hard, but it is hard to estimate what it will break for existing connectors.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

    Labels

    Stale, type/bug
