
[Bug]: PulsarIO.read() is failing with java.lang.RuntimeException: Could not find a way to create AutoValue class class com.idfy.beam.pulsar.PulsarSourceDescriptor #30688

@rthneha

Description


What happened?

I have configured the PulsarIO connector in Beam to read messages from Pulsar as follows:

PCollection<PulsarMessage> pCollectionAll = p.apply("ReadPulsarMessage", PulsarIO
                .read()
                .withAdminUrl(options.getPulsarAdminURL())
                .withClientUrl(options.getPulsarClientURL())
                .withTopic(options.getPulsarTopic())); 

I can see that PulsarSourceDescriptor has three mandatory fields (admin URL, client URL and topic), so I set all of them. However, no messages are read, and the job fails with the error below:


Error message from worker: java.io.IOException: Failed to start reading from source: org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@53cad662
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:821)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: Could not find a way to create AutoValue class class com.idfy.beam.pulsar.PulsarSourceDescriptor
        org.apache.beam.sdk.schemas.AutoValueSchema.schemaTypeCreator(AutoValueSchema.java:133)
        org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
        org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:92)
        org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:63)
        org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:43)
        org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:126)
        org.apache.beam.sdk.coders.Coder.decode(Coder.java:154)
        org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:142)
        org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:102)
        org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:96)
        org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:560)
        org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:542)
        org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:474)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:452)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:304)
        org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:297)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:816)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:381)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:211)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1433)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$800(StreamingDataflowWorker.java:155)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$4.run(StreamingDataflowWorker.java:1056)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:163)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        java.lang.Thread.run(Thread.java:750)
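As far as I can tell from the Beam source, `AutoValueSchema` looks for a way to build the class in this order: a static method annotated `@SchemaCreate`, an AutoValue-generated builder, and finally a constructor of the generated `AutoValue_PulsarSourceDescriptor` class whose parameters match the inferred schema fields. The exception above is thrown when none of these is found. Because `PulsarSourceDescriptor` here lives in a copied package (`com.idfy.beam.pulsar`), one thing worth checking is what the generated class actually looks like on the worker classpath. The sketch below is a plain-reflection diagnostic, not part of Beam; the class name `AutoValueCreatorCheck` and the default generated-class name are assumptions.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

public class AutoValueCreatorCheck {

  /** Lists every declared constructor of the named class as "Name(Type name, ...)". */
  public static List<String> describeConstructors(String className) throws ClassNotFoundException {
    // Load without running static initializers.
    Class<?> clazz = Class.forName(className, false, AutoValueCreatorCheck.class.getClassLoader());
    List<String> result = new ArrayList<>();
    for (Constructor<?> ctor : clazz.getDeclaredConstructors()) {
      StringJoiner params = new StringJoiner(", ", clazz.getSimpleName() + "(", ")");
      for (Parameter p : ctor.getParameters()) {
        // Without javac's -parameters flag, names are reported as arg0, arg1, ...
        params.add(p.getType().getSimpleName() + " " + p.getName());
      }
      result.add(params.toString());
    }
    return result;
  }

  public static void main(String[] args) {
    // Assumed default: the class AutoValue would generate for the copied descriptor.
    String name = args.length > 0 ? args[0] : "com.idfy.beam.pulsar.AutoValue_PulsarSourceDescriptor";
    try {
      describeConstructors(name).forEach(System.out::println);
    } catch (ClassNotFoundException e) {
      System.out.println("Not on the classpath: " + name);
    }
  }
}
```

Run it with the same classpath as the pipeline (for example the shaded job jar) and compare the printed parameter types and order with the descriptor's getters.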

I also found another way of consuming messages that uses PulsarClient directly and authenticates with a JWT token:


import org.apache.pulsar.client.api.*;

public class SNConsumer {

    public static void main(String[] args) throws Exception
    {
        // Client that authenticates to the broker with a JWT token
        PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsarClientUrl")
            .authentication(
                AuthenticationFactory.token("<JWT Token>")
            )
            .build();

        // ... create a consumer and receive messages here ...

        client.close();
    }
}

I have this JWT token, but I cannot set it on the PulsarClient used by PulsarIO, because PulsarIO.Read takes a SerializableFunction<String, PulsarClient> instead of a client. This is what I tried; can someone help?


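import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;

 // Attempted change inside PulsarIO.Read. This does not compile: the method is declared
 // to return Read but returns a PulsarClient, and build() throws the checked
 // PulsarClientException.
 public Read withPulsarClient(SerializableFunction<String, PulsarClient> pulsarClientFn) {
      //return builder().setPulsarClient(pulsarClientFn).build();

      PulsarClient client = PulsarClient.builder()
          .serviceUrl("")
          .authentication(
              AuthenticationFactory.token("<JWT Token>")
          )
          .build();
      return client;
    }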
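Rather than changing `withPulsarClient` itself, a likely fit for its `SerializableFunction<String, PulsarClient>` parameter is a small serializable factory that builds the authenticated client when it is called. PulsarClient holds live connections and is not serializable, so only the function (and the token string it captures) travels with the pipeline, and the client is created on the worker. This is a sketch under assumptions: `JwtPulsarClientFn` is a hypothetical name, and it assumes PulsarIO calls the function with the client (service) URL. If you are using a copied PulsarIO, adjust the imports to your package.

```java
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

/**
 * Hypothetical helper: builds a JWT-authenticated PulsarClient from the URL passed in.
 * Only the token string is serialized; the client is created when apply() runs.
 */
public class JwtPulsarClientFn implements SerializableFunction<String, PulsarClient> {

  private final String jwtToken;

  public JwtPulsarClientFn(String jwtToken) {
    this.jwtToken = jwtToken;
  }

  @Override
  public PulsarClient apply(String serviceUrl) {
    try {
      return PulsarClient.builder()
          .serviceUrl(serviceUrl)
          .authentication(AuthenticationFactory.token(jwtToken))
          .build();
    } catch (PulsarClientException e) {
      // SerializableFunction.apply cannot throw checked exceptions, so wrap it.
      throw new RuntimeException("Could not create Pulsar client for " + serviceUrl, e);
    }
  }
}
```

It would be passed as `.withPulsarClient(new JwtPulsarClientFn(token))` on the `PulsarIO.read()` chain shown at the top of this issue. Note that the token becomes part of the serialized pipeline; loading it from a secret store inside `apply()` avoids that. This only covers the PulsarClient; whether the admin connection also needs the token depends on the cluster setup.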

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

