Search before asking
Read release policy
Version
any released version
Minimal reproduce step
What did you expect to see?
I'd expect that the configured compression type would be used.
Compression type is handled in the other location where Pulsar Sources create producers (pulsar/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java, lines 123 to 127 in 5fc0eaf):

```java
if (producerConfig.getCompressionType() != null) {
    builder.compressionType(producerConfig.getCompressionType());
} else {
    builder.compressionType(CompressionType.LZ4);
}
```
Crypto config is handled for Pulsar Sources (pulsar/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java, lines 134 to 140 in 5fc0eaf):

```java
if (producerConfig.getCryptoConfig() != null) {
    builder.cryptoKeyReader(crypto.keyReader);
    builder.cryptoFailureAction(crypto.failureAction);
    for (String encryptionKeyName : crypto.getEncryptionKeys()) {
        builder.addEncryptionKey(encryptionKeyName);
    }
}
```
What did you see instead?
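Compression type is hard-coded to LZ4 (pulsar/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java, line 565 in 5fc0eaf):

```java
.compressionType(CompressionType.LZ4)
```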
The crypto config is not applied there at all.
Anything else?
Code to create producers shouldn't be duplicated in Pulsar Functions since it leads to inconsistencies.
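
One way to remove the duplication would be a single helper that both code paths call. The following is only a minimal sketch of that idea, not the actual Pulsar code: `ProducerBuilderLike`, `Compression`, `CryptoSettings`, `ProducerSettings`, `SharedProducerSetup` and `RecordingBuilder` are hypothetical stand-ins (requiring Java 16+ for records) so that the example compiles without the Pulsar client on the classpath. Only the method names on the builder mirror the calls shown in the excerpts above.

```java
// Hypothetical stand-in for the subset of builder calls used in the excerpts above.
interface ProducerBuilderLike {
    ProducerBuilderLike compressionType(Compression type);
    ProducerBuilderLike cryptoKeyReader(String keyReader);
    ProducerBuilderLike cryptoFailureAction(String failureAction);
    ProducerBuilderLike addEncryptionKey(String keyName);
}

// Hypothetical stand-in enum; not the Pulsar CompressionType class.
enum Compression { NONE, LZ4, ZLIB, ZSTD, SNAPPY }

// Hypothetical crypto settings; strings replace the real reader and action types.
record CryptoSettings(String keyReader, String failureAction,
                      java.util.List<String> encryptionKeys) {}

// Hypothetical producer config; either field may be null when not configured.
record ProducerSettings(Compression compressionType, CryptoSettings crypto) {}

final class SharedProducerSetup {
    private SharedProducerSetup() {}

    // Applies compression and crypto settings in one place, so every call
    // site that creates a producer behaves the same way.
    static void apply(ProducerBuilderLike builder, ProducerSettings config) {
        Compression configured = config == null ? null : config.compressionType();
        // Fall back to LZ4 only when nothing is configured.
        builder.compressionType(configured != null ? configured : Compression.LZ4);

        CryptoSettings crypto = config == null ? null : config.crypto();
        if (crypto != null) {
            builder.cryptoKeyReader(crypto.keyReader());
            builder.cryptoFailureAction(crypto.failureAction());
            for (String keyName : crypto.encryptionKeys()) {
                builder.addEncryptionKey(keyName);
            }
        }
    }
}

// Records each builder call so the effect of apply() can be inspected.
record RecordingBuilder(java.util.List<String> calls) implements ProducerBuilderLike {
    public ProducerBuilderLike compressionType(Compression type) {
        calls.add("compressionType=" + type);
        return this;
    }
    public ProducerBuilderLike cryptoKeyReader(String keyReader) {
        calls.add("cryptoKeyReader=" + keyReader);
        return this;
    }
    public ProducerBuilderLike cryptoFailureAction(String failureAction) {
        calls.add("cryptoFailureAction=" + failureAction);
        return this;
    }
    public ProducerBuilderLike addEncryptionKey(String keyName) {
        calls.add("addEncryptionKey=" + keyName);
        return this;
    }
}

final class SharedProducerSetupDemo {
    public static void main(String[] args) {
        RecordingBuilder builder = new RecordingBuilder(new java.util.ArrayList<>());
        ProducerSettings settings = new ProducerSettings(
                Compression.ZSTD,
                new CryptoSettings("reader", "FAIL", java.util.List.of("key-a")));
        SharedProducerSetup.apply(builder, settings);
        System.out.println(builder.calls());
    }
}
```

With a helper along these lines, the call site that currently hard-codes LZ4 would pick up both the configured compression type and the crypto settings without a second copy of the logic.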
Are you willing to submit a PR?