Contributor

@cbornet cbornet commented May 19, 2025

Motivation

AVRO represents values of BYTES schema fields as ByteBuffer, not byte[].
In the Kinesis sink, converting an AVRO record to JSON therefore fails with:

2025-05-16T19:16:09.142923400Z java.lang.ClassCastException: class java.nio.HeapByteBuffer cannot be cast to class [B (java.nio.HeapByteBuffer and [B are in module java.base of loader 'bootstrap')
2025-05-16T19:16:09.142932060Z 	at org.apache.pulsar.io.kinesis.json.JsonConverter.toJson(JsonConverter.java:80) ~[?:?]
2025-05-16T19:16:09.142933540Z 	at org.apache.pulsar.io.kinesis.json.JsonConverter.toJson(JsonConverter.java:119) ~[?:?]
2025-05-16T19:16:09.142934623Z 	at org.apache.pulsar.io.kinesis.json.JsonConverter.toJson(JsonConverter.java:54) ~[?:?]
2025-05-16T19:16:09.142935763Z 	at org.apache.pulsar.io.kinesis.json.JsonConverter.toJson(JsonConverter.java:113) ~[?:?]
2025-05-16T19:16:09.142936847Z 	at org.apache.pulsar.io.kinesis.json.JsonConverter.toJson(JsonConverter.java:119) ~[?:?]
2025-05-16T19:16:09.142938214Z 	at org.apache.pulsar.io.kinesis.json.JsonConverter.toJson(JsonConverter.java:105) ~[?:?]
2025-05-16T19:16:09.142939866Z 	at org.apache.pulsar.io.kinesis.json.JsonConverter.toJson(JsonConverter.java:119) ~[?:?]
2025-05-16T19:16:09.142941409Z 	at org.apache.pulsar.io.kinesis.json.JsonConverter.toJson(JsonConverter.java:54) ~[?:?]
2025-05-16T19:16:09.142956643Z 	at org.apache.pulsar.io.kinesis.Utils.toJsonSerializable(Utils.java:265) ~[?:?]
2025-05-16T19:16:09.142958964Z 	at org.apache.pulsar.io.kinesis.Utils.toJsonSerializable(Utils.java:260) ~[?:?]
2025-05-16T19:16:09.142961420Z 	at org.apache.pulsar.io.kinesis.Utils.serializeRecordToJsonExpandingValue(Utils.java:225) ~[?:?]
2025-05-16T19:16:09.142962967Z 	at org.apache.pulsar.io.kinesis.KinesisSink.createKinesisMessage(KinesisSink.java:308) ~[?:?]
2025-05-16T19:16:09.142964593Z 	at org.apache.pulsar.io.kinesis.KinesisSink.write(KinesisSink.java:134) ~[?:?]
2025-05-16T19:16:09.142966767Z 	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.sendOutputMessage(JavaInstanceRunnable.java:447) ~[com.datastax.oss-pulsar-functions-instance-3.1.4.8.jar:3.1.4.8]
2025-05-16T19:16:09.142968398Z 	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.handleResult(JavaInstanceRunnable.java:409) ~[com.datastax.oss-pulsar-functions-instance-3.1.4.8.jar:3.1.4.8]
2025-05-16T19:16:09.142970014Z 	at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:349) ~[com.datastax.oss-pulsar-functions-instance-3.1.4.8.jar:3.1.4.8]
2025-05-16T19:16:09.142972314Z 	at java.lang.Thread.run(Thread.java:840) ~[?:?]

The cause is the BYTES branch of JsonConverter::toJson, which casts value to byte[]:

            case BYTES:
                // Workaround for https://github.com/wnameless/json-flattener/issues/91
                if (convertBytesToString) {
                    return jsonNodeFactory.textNode(Base64.getEncoder().encodeToString((byte[]) value));
                }
                return jsonNodeFactory.binaryNode((byte[]) value);

while value is actually a ByteBuffer.
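
The failure can be reproduced without the sink. The following is a minimal sketch that assumes only the JDK; the class `BytesCastRepro` and its helper method are hypothetical names used for illustration and are not part of the connector. The cast inside the helper mirrors the one in the BYTES branch.

```java
import java.nio.ByteBuffer;

// Hypothetical helper class, for illustration only (not part of the Kinesis connector).
final class BytesCastRepro {

    // Performs the same unchecked cast as the BYTES branch and reports whether it throws.
    static boolean castToByteArrayFails(Object value) {
        try {
            byte[] ignored = (byte[]) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // ByteBuffer.wrap returns a heap buffer, the same runtime class named in the stack trace.
        Object fromAvro = ByteBuffer.wrap(new byte[] {1, 2, 3});
        System.out.println(castToByteArrayFails(fromAvro));             // prints "true"
        System.out.println(castToByteArrayFails(new byte[] {1, 2, 3})); // prints "false"
    }
}
```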

Modifications

  • Handle the value as a ByteBuffer in JsonConverter::toJson
  • In unit tests, ensure that the record is correct for the schema codec.
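
One way to accept both representations could look like the sketch below. It is not the actual patch from this PR: `BytesValues`, `toByteArray` and `toBase64` are hypothetical names, and only JDK classes are used. The buffer is duplicated before reading so that the caller's position and limit are left untouched.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical helper class, for illustration only (not the code merged in this PR).
final class BytesValues {

    private BytesValues() {
    }

    // Returns a byte[] as-is, or copies the readable bytes of a ByteBuffer into a new array.
    static byte[] toByteArray(Object value) {
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        if (value instanceof ByteBuffer) {
            // duplicate() shares the content but has its own position and limit,
            // so reading from the view does not consume the original buffer.
            ByteBuffer view = ((ByteBuffer) value).duplicate();
            byte[] out = new byte[view.remaining()];
            view.get(out);
            return out;
        }
        throw new IllegalArgumentException("Unsupported BYTES value: "
                + (value == null ? "null" : value.getClass().getName()));
    }

    // Base64 text form, as used when bytes are converted to strings.
    static String toBase64(Object value) {
        return Base64.getEncoder().encodeToString(toByteArray(value));
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
        System.out.println(toBase64(buffer));                 // prints "AQID"
        System.out.println(toBase64(new byte[] {1, 2, 3}));   // prints "AQID"
        System.out.println(buffer.position());                // prints "0"
    }
}
```

Rejecting unknown types with an explicit exception, rather than casting, keeps the error message descriptive if another runtime representation shows up later.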

Verifying this change

  • Make sure that the change passes the CI checks.

This change is already covered by existing tests, such as (please describe tests).

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: cbornet#19

@github-actions github-actions bot added the doc-not-needed label (Your PR changes do not impact docs) May 19, 2025
The AVRO codec uses ByteBuffer for the BYTES schema and not byte[]
@cbornet cbornet force-pushed the fix-kinesis-avro-bytes branch from 8ec9941 to 84760f9 Compare May 19, 2025 14:08
Contributor

@eolivelli eolivelli left a comment

LGTM

Member

@lhotari lhotari left a comment

LGTM

Member

lhotari commented May 19, 2025

/pulsarbot rerun-failure-checks

Member

@lhotari lhotari left a comment

Based on the unit test failures, it looks like unit tests are making some assumptions. Please ensure that the integration test covers the original issue.

@Override
public void produceMessage(int numMessages, PulsarClient client,
                           String inputTopicName, LinkedHashMap<String, String> kvs) throws Exception {
    if (withSchema) {
        Schema<KeyValue<SimplePojo, SimplePojo>> kvSchema =
                Schema.KeyValue(Schema.JSON(SimplePojo.class),
                        Schema.AVRO(SimplePojo.class), KeyValueEncodingType.SEPARATED);
        @Cleanup
        Producer<KeyValue<SimplePojo, SimplePojo>> producer = client.newProducer(kvSchema)
                .topic(inputTopicName)
                .create();
        for (int i = 0; i < numMessages; i++) {
            String key = String.valueOf(i);
            kvs.put(key, key);
            final SimplePojo keyPojo = new SimplePojo(
                    "f1_" + i,
                    "f2_" + i,
                    Arrays.asList(i, i + 1),
                    new HashSet<>(Arrays.asList((long) i)),
                    ImmutableMap.of("map1_k_" + i, "map1_kv_" + i));
            final SimplePojo valuePojo = new SimplePojo(
                    String.valueOf(i),
                    "v2_" + i,
                    Arrays.asList(i, i + 1),
                    new HashSet<>(Arrays.asList((long) i)),
                    ImmutableMap.of("map1_v_" + i, "map1_vv_" + i));
            producer.newMessage()
                    .value(new KeyValue<>(keyPojo, valuePojo))
                    .send();
        }
    } else {
        @Cleanup
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(inputTopicName)
                .create();
        for (int i = 0; i < numMessages; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            kvs.put(key, value);
            producer.newMessage()
                    .key(key)
                    .value(value)
                    .send();
        }
    }
}

Contributor Author

cbornet commented May 20, 2025

> Based on the unit test failures, it looks like unit tests are making some assumptions. Please ensure that the integration test covers the original issue.

Yes. My intent was to improve the integration test in a follow-up PR.
This PR already improves things by serializing/deserializing the record before handing it to the sink.


codecov-commenter commented May 20, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 74.27%. Comparing base (bbc6224) to head (d7fb9a7).
Report is 1107 commits behind head on master.

Additional details and impacted files

@@             Coverage Diff              @@
##             master   #24316      +/-   ##
============================================
+ Coverage     73.57%   74.27%   +0.70%     
+ Complexity    32624    32185     -439     
============================================
  Files          1877     1866      -11     
  Lines        139502   145104    +5602     
  Branches      15299    16591    +1292     
============================================
+ Hits         102638   107781    +5143     
+ Misses        28908    28795     -113     
- Partials       7956     8528     +572     
Flag        Coverage Δ
inttests    26.80% <ø> (+2.22%) ⬆️
systests    23.24% <0.00%> (-1.08%) ⬇️
unittests   73.75% <100.00%> (+0.91%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines                                  Coverage Δ
...g/apache/pulsar/io/kinesis/json/JsonConverter.java    67.96% <100.00%> (-0.09%) ⬇️

... and 1084 files with indirect coverage changes

Member

lhotari commented May 20, 2025

> > Based on the unit test failures, it looks like unit tests are making some assumptions. Please ensure that the integration test covers the original issue.
>
> Yes. My intent was to improve the integration test in a follow-up PR. This PR already improves things by serializing/deserializing the record before handing it to the sink.

I think it's better to add the integration test directly to this PR.

Member

lhotari commented May 20, 2025

/pulsarbot rerun-failure-checks

Contributor Author

cbornet commented May 20, 2025

> I think it's better to add the integration test directly to this PR.

Done.
See https://github.com/cbornet/pulsar/actions/runs/15135460802/job/42548192869#step:12:5815 for a test failure without the fix (the ClassCastException is visible in the function worker logs).

Contributor Author

cbornet commented May 20, 2025

/pulsarbot rerun-failure-checks

Member

@lhotari lhotari left a comment

Good work @cbornet

@cbornet cbornet merged commit 54ade7e into apache:master May 20, 2025
60 checks passed
lhotari pushed a commit that referenced this pull request Jun 2, 2025
lhotari pushed a commit that referenced this pull request Jun 2, 2025
lhotari pushed a commit that referenced this pull request Jun 2, 2025
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 4, 2025
(cherry picked from commit 54ade7e)
(cherry picked from commit 45fd99f)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 4, 2025
(cherry picked from commit 54ade7e)
(cherry picked from commit e5de8ea)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 4, 2025
(cherry picked from commit 54ade7e)
(cherry picked from commit 45fd99f)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 4, 2025
(cherry picked from commit 54ade7e)
(cherry picked from commit 45fd99f)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 5, 2025
(cherry picked from commit 54ade7e)
(cherry picked from commit e5de8ea)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 5, 2025
(cherry picked from commit 54ade7e)
(cherry picked from commit 45fd99f)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jun 10, 2025
(cherry picked from commit 54ade7e)
(cherry picked from commit 45fd99f)
nodece pushed a commit to nodece/pulsar that referenced this pull request Jun 18, 2025
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025