[Bug]: PulsarIO.read() is failing with java.lang.RuntimeException: Could not find a way to create AutoValue class class com.idfy.beam.pulsar.PulsarSourceDescriptor #30688
Closed
Description
What happened?
I have configured the PulsarIO connector in Beam to read messages from Pulsar as shown below:
    PCollection<PulsarMessage> pCollectionAll =
        p.apply(
            "ReadPulsarMessage",
            PulsarIO.read()
                .withAdminUrl(options.getPulsarAdminURL())
                .withClientUrl(options.getPulsarClientURL())
                .withTopic(options.getPulsarTopic()));
I can see that PulsarSourceDescriptor has three mandatory properties, so I set all of them. However, I am unable to read messages and get the error below:
Error message from worker: java.io.IOException: Failed to start reading from source: org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@53cad662
org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:821)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Could not find a way to create AutoValue class class com.idfy.beam.pulsar.PulsarSourceDescriptor
org.apache.beam.sdk.schemas.AutoValueSchema.schemaTypeCreator(AutoValueSchema.java:133)
org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:63)
org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:43)
org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:126)
org.apache.beam.sdk.coders.Coder.decode(Coder.java:154)
org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:142)
org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:102)
org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:96)
org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:560)
org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:542)
org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:474)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:452)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:304)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:297)
org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:816)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:750)
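One thing worth checking, offered as an assumption rather than a confirmed diagnosis: the failing class is in `com.idfy.beam.pulsar`, so it looks like a copy of the descriptor living outside the Beam package. AutoValue generates a concrete subclass named `AutoValue_<SimpleName>` in the same package, and the message suggests Beam could not find a constructor or builder for it on the worker. The sketch below only checks whether that generated class is loadable; the helper class and method names are hypothetical and not part of Beam.

```java
// Diagnostic sketch (hypothetical helper, not a Beam API): checks whether the
// class AutoValue would generate for a top-level @AutoValue class is loadable.
final class AutoValueGeneratedCheck {

    /** Derives the generated name: same package, simple name prefixed with "AutoValue_". */
    static String generatedClassName(String fullyQualifiedName) {
        int lastDot = fullyQualifiedName.lastIndexOf('.');
        String packagePrefix = fullyQualifiedName.substring(0, lastDot + 1);
        String simpleName = fullyQualifiedName.substring(lastDot + 1);
        return packagePrefix + "AutoValue_" + simpleName;
    }

    /** Returns true if the named class can be loaded, without initializing it. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, AutoValueGeneratedCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace above.
        String descriptor =
            args.length > 0 ? args[0] : "com.idfy.beam.pulsar.PulsarSourceDescriptor";
        String generated = generatedClassName(descriptor);
        System.out.println(generated + " on classpath: " + isOnClasspath(generated));
    }
}
```

If this reports that the generated class is missing from the jar staged to the workers, the AutoValue annotation processor may not have run for the copied class.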
I found another way of consuming messages, using PulsarClient directly with a JWT token:
    import java.net.URL;
    import org.apache.pulsar.client.api.*;

    public class SNConsumer {
        public static void main(String[] args) throws Exception {
            // Build a client that authenticates with a JWT token.
            PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsarClientUrl")
                .authentication(
                    AuthenticationFactory.token("<JWT Token>")
                )
                .build();
            // (rest of the consumer is not shown in this snippet)
        }
    }
I have this JWT token, but I cannot work out how to pass it to the PulsarClient used by PulsarIO, because withPulsarClient takes a SerializableFunction. My attempt is below. Can someone help?
    import org.apache.pulsar.client.api.AuthenticationFactory;

    public Read withPulsarClient(SerializableFunction<String, PulsarClient> pulsarClientFn) {
        //return builder().setPulsarClient(pulsarClientFn).build();
        PulsarClient client = PulsarClient.builder()
            .serviceUrl("")
            .authentication(
                AuthenticationFactory.token("<JWT Token>")
            )
            .build();
        // Does not compile: the method is declared to return Read but returns a PulsarClient.
        return client;
    }
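A possible direction, sketched under assumptions: going by the `withPulsarClient` signature quoted above, the connector seems to expect a serializable factory that maps a client URL to a client, not a ready-made client. The function has to be serializable, the client does not, so a lambda that captures only the token string should be enough. The example below shows that pattern with plain Java serialization. `SerializableFunction` and `FakeClient` here are local stand-ins, not the Beam or Pulsar types, so that the block runs without either library.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializableClientFactorySketch {

    /** Local stand-in for a serializable function type (assumption: mirrors the one in the signature above). */
    interface SerializableFunction<InputT, OutputT> extends Serializable {
        OutputT apply(InputT input);
    }

    /** Hypothetical stand-in for PulsarClient; deliberately not Serializable. */
    static final class FakeClient {
        final String serviceUrl;
        final String token;

        FakeClient(String serviceUrl, String token) {
            this.serviceUrl = serviceUrl;
            this.token = token;
        }
    }

    /** Returns a factory that captures only the token string, so the lambda itself can be serialized. */
    static SerializableFunction<String, FakeClient> clientFactory(String token) {
        return serviceUrl -> new FakeClient(serviceUrl, token);
    }

    /** Serializes and deserializes a value, similar to shipping a function to a worker. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Value did not survive serialization", e);
        }
    }

    public static void main(String[] args) {
        // The client is created only after the factory has been deserialized.
        SerializableFunction<String, FakeClient> factory = roundTrip(clientFactory("<JWT Token>"));
        FakeClient client = factory.apply("pulsarClientUrl");
        System.out.println(client.serviceUrl + " / " + client.token);
    }
}
```

With the real types, the lambda body would presumably build the client as in the SNConsumer snippet (`PulsarClient.builder()` with `AuthenticationFactory.token(...)`) and wrap the checked `PulsarClientException` from `build()` in an unchecked exception. The lambda would then be passed to `PulsarIO.read().withPulsarClient(...)` from pipeline code, without modifying the `Read` class itself.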
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner