[fix][io] Fix kinesis avro bytes handling #24316
Conversation
The AVRO codec uses ByteBuffer for the BYTES schema and not byte[]
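For context only (not part of this PR's diff), here is a minimal sketch using Apache Avro's generic API showing that a decoded BYTES field comes back as a java.nio.ByteBuffer rather than a byte[]; the "Example" schema and class names are made up for illustration:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroBytesRoundTrip {
    public static void main(String[] args) throws Exception {
        // Hypothetical record schema with a single "bytes"-typed field.
        Schema schema = SchemaBuilder.record("Example").fields()
                .name("payload").type().bytesType().noDefault()
                .endRecord();

        GenericRecord record = new GenericData.Record(schema);
        record.put("payload", ByteBuffer.wrap(new byte[]{1, 2, 3}));

        // Serialize the record with the generic writer.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        // Read it back with the generic reader.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);

        // The decoded BYTES field is a java.nio.ByteBuffer, not a byte[].
        System.out.println(decoded.get("payload") instanceof ByteBuffer); // prints: true
    }
}
```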
Force-pushed from 8ec9941 to 84760f9
eolivelli left a comment:
LGTM
lhotari left a comment:
LGTM
/pulsarbot rerun-failure-checks
lhotari left a comment:
Based on the unit test failures, it looks like unit tests are making some assumptions. Please ensure that the integration test covers the original issue.
Lines 138 to 186 in 906d10e:

```java
@Override
public void produceMessage(int numMessages, PulsarClient client,
                           String inputTopicName, LinkedHashMap<String, String> kvs) throws Exception {
    if (withSchema) {
        Schema<KeyValue<SimplePojo, SimplePojo>> kvSchema =
                Schema.KeyValue(Schema.JSON(SimplePojo.class),
                        Schema.AVRO(SimplePojo.class), KeyValueEncodingType.SEPARATED);
        @Cleanup
        Producer<KeyValue<SimplePojo, SimplePojo>> producer = client.newProducer(kvSchema)
                .topic(inputTopicName)
                .create();
        for (int i = 0; i < numMessages; i++) {
            String key = String.valueOf(i);
            kvs.put(key, key);
            final SimplePojo keyPojo = new SimplePojo(
                    "f1_" + i,
                    "f2_" + i,
                    Arrays.asList(i, i + 1),
                    new HashSet<>(Arrays.asList((long) i)),
                    ImmutableMap.of("map1_k_" + i, "map1_kv_" + i));
            final SimplePojo valuePojo = new SimplePojo(
                    String.valueOf(i),
                    "v2_" + i,
                    Arrays.asList(i, i + 1),
                    new HashSet<>(Arrays.asList((long) i)),
                    ImmutableMap.of("map1_v_" + i, "map1_vv_" + i));
            producer.newMessage()
                    .value(new KeyValue<>(keyPojo, valuePojo))
                    .send();
        }
    } else {
        @Cleanup
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic(inputTopicName)
                .create();
        for (int i = 0; i < numMessages; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            kvs.put(key, value);
            producer.newMessage()
                    .key(key)
                    .value(value)
                    .send();
        }
    }
}
```
Yes. My intent was to improve the integration test in a follow-up PR.
Codecov Report
All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

```
@@             Coverage Diff              @@
##             master   #24316      +/-   ##
============================================
+ Coverage     73.57%   74.27%   +0.70%
+ Complexity    32624    32185     -439
============================================
  Files          1877     1866      -11
  Lines        139502   145104    +5602
  Branches      15299    16591    +1292
============================================
+ Hits         102638   107781    +5143
+ Misses        28908    28795     -113
- Partials       7956     8528     +572
```

Flags with carried forward coverage won't be shown.
I think it's better to add the integration test directly to this PR.
/pulsarbot rerun-failure-checks
Done.
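The integration test added for this is not quoted in the thread; purely as an illustration, a produce step exercising the AVRO BYTES path might look roughly like the sketch below. PojoWithBytes and produceBytesMessages are hypothetical names, not the actual test code in this PR:

```java
import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class AvroBytesProduceStep {

    /** Hypothetical POJO whose byte[] field maps to the Avro BYTES type. */
    public static class PojoWithBytes {
        public byte[] payload;

        public PojoWithBytes() {
        }

        public PojoWithBytes(byte[] payload) {
            this.payload = payload;
        }
    }

    /** Produce AVRO messages whose generic decoding yields a ByteBuffer for "payload". */
    static void produceBytesMessages(PulsarClient client, String inputTopicName, int numMessages)
            throws Exception {
        try (Producer<PojoWithBytes> producer = client.newProducer(Schema.AVRO(PojoWithBytes.class))
                .topic(inputTopicName)
                .create()) {
            for (int i = 0; i < numMessages; i++) {
                producer.newMessage()
                        .value(new PojoWithBytes(("value-" + i).getBytes(StandardCharsets.UTF_8)))
                        .send();
            }
        }
    }
}
```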
/pulsarbot rerun-failure-checks |
lhotari left a comment:
Good work @cbornet
(cherry picked from commit 54ade7e)
Motivation

AVRO uses ByteBuffer for the BYTES schema. In the Kinesis sink, converting an AVRO record to JSON fails because JsonConverter::toJson expects a byte[] for BYTES values, while the value is a ByteBuffer.

Modifications

Handle ByteBuffer in JsonConverter::toJson.
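A minimal sketch of this kind of handling (a hypothetical helper, not the actual Pulsar JsonConverter code), assuming Jackson's JsonNodeFactory is used to build the JSON tree:

```java
import java.nio.ByteBuffer;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;

public class BytesFieldToJson {

    private static final JsonNodeFactory JSON_NODE_FACTORY = JsonNodeFactory.instance;

    // Hypothetical helper: convert a BYTES-schema value to a JSON binary node,
    // accepting both representations Avro may provide.
    static JsonNode toJsonBytes(Object value) {
        if (value instanceof ByteBuffer) {
            ByteBuffer buffer = (ByteBuffer) value;
            // Copy the remaining bytes without changing the buffer's position.
            byte[] bytes = new byte[buffer.remaining()];
            buffer.duplicate().get(bytes);
            return JSON_NODE_FACTORY.binaryNode(bytes);
        }
        // Previous behavior: a plain byte[] is used directly.
        return JSON_NODE_FACTORY.binaryNode((byte[]) value);
    }
}
```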
Verifying this change

This change is already covered by existing tests, such as (please describe tests).
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation

doc
doc-required
doc-not-needed
doc-complete

Matching PR in forked repository
PR in forked repository: cbornet#19