Description
In the job below, a low-volume Pub/Sub Lite topic (for testing purposes) is consumed and is meant to be written to a GCS bucket as windowed files. All steps appear to run, but the watermark never advances and the windows never close, so the output files are never written.
Environment details
- OS type and version: Windows (local), with the job running on Dataflow (GCP)
- Java version: 1.8
- pubsublite version(s):
```xml
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-beam-io</artifactId>
  <version>0.11.1</version>
</dependency>
```
Steps to reproduce
- Create a Pub/Sub Lite subscriber in a Dataflow job
- Add windowing, a trigger, etc.
- Convert each SequencedMessage to a string
- Attempt to write the output to Cloud Storage
Code example
```java
public static void main(String[] args) {
    /******* some other setup, etc. *************/
    Pipeline pipeline = Pipeline.create(options);

    SubscriptionPath subPath = SubscriptionPath.newBuilder()
        .setLocation(CloudZone.parse(options.getTargetProjectZone()))
        .setProject(ProjectNumber.of(options.getTargetProjectNumber()))
        .setName(SubscriptionName.of(options.getTargetPubsubSubscription()))
        .build();

    /* unused */
    FlowControlSettings flowControlSettings = FlowControlSettings.builder()
        // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB)
        .setBytesOutstanding(10 * 1024 * 1024L)
        // 1,000 outstanding messages. Must be > 0
        .setMessagesOutstanding(1000L)
        .build();

    SubscriberOptions subOptions = SubscriberOptions.newBuilder()
        .setSubscriptionPath(subPath)
        //.setFlowControlSettings(flowControlSettings)
        .build();

    // 1-minute fixed windows: on-time pane at the watermark, early panes every
    // 60 s of processing time, and 1 minute of allowed lateness.
    Window<SequencedMessage> window = Window.<SequencedMessage>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .alignedTo(Duration.standardSeconds(60)))))
        .withAllowedLateness(Duration.standardMinutes(1), Window.ClosingBehavior.FIRE_ALWAYS)
        .discardingFiredPanes();

    pipeline
        .apply("Read messages", PubsubLiteIO.read(subOptions))
        .apply(window)
        .apply(ParDo.of(new ConvertToStrings()))
        .apply(TextIO.write().to(options.getOutputDirectory()).withWindowedWrites().withNumShards(10));

    pipeline.run();
}

static class ConvertToStrings extends DoFn<SequencedMessage, String> {
    private static final long serialVersionUID = -7457075599298968462L;

    @ProcessElement
    public void processElement(@Element SequencedMessage message, OutputReceiver<String> out) {
        String messageString = message.getMessage().getData().toStringUtf8();
        LOG.info(messageString);
        // Use OutputReceiver.output to emit the output element.
        out.output(messageString);
    }
}
```
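The watermark-driven part of the windowing strategy above can be reasoned about with a small model. The sketch below is a simplified illustration using only `java.time` (note: `java.time.Duration`, not the Joda `Duration` used in the pipeline); `WindowLifecycle` and its methods are hypothetical names, not Beam API, and it assumes zero-offset fixed windows. The processing-time early firings are deliberately left out. It shows that the on-time pane of a 1-minute window can fire only once the watermark reaches the window's end, and the window is dropped only once the watermark also passes the 1 minute of allowed lateness; if the watermark never moves, neither happens.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, simplified model of one fixed event-time window under the
// settings above (1-minute windows, 1 minute allowed lateness).
// This is NOT Beam's implementation; it only illustrates the timing rules.
final class WindowLifecycle {
    static final Duration WINDOW_SIZE = Duration.ofMinutes(1);
    static final Duration ALLOWED_LATENESS = Duration.ofMinutes(1);

    /** Inclusive start of the zero-offset fixed window containing the timestamp. */
    static Instant windowStart(Instant eventTime) {
        long sizeMillis = WINDOW_SIZE.toMillis();
        long ts = eventTime.toEpochMilli();
        // floorMod keeps the result correct for pre-epoch timestamps as well.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    /** Exclusive end of the window containing the timestamp. */
    static Instant windowEnd(Instant eventTime) {
        return windowStart(eventTime).plus(WINDOW_SIZE);
    }

    /** The on-time pane may fire once the watermark has reached the window end. */
    static boolean onTimePaneCanFire(Instant eventTime, Instant watermark) {
        return !watermark.isBefore(windowEnd(eventTime));
    }

    /** Window state is discarded once the watermark reaches end + allowed lateness. */
    static boolean windowExpired(Instant eventTime, Instant watermark) {
        return !watermark.isBefore(windowEnd(eventTime).plus(ALLOWED_LATENESS));
    }
}
```

For an element at 00:00:30 the window is [00:00, 00:01); a watermark stuck at 00:00:59 keeps that window open indefinitely, so its on-time pane is never emitted.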
Stack trace
No errors are written to the logs at all, other than the following gRPC warning and its attached exception:

```
*~*~*~ Channel ManagedChannelImpl{logId=1017, target=us-east1-pubsublite.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
    at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
    at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:518)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:314)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600(InstantiatingGrpcChannelProvider.java:71)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:210)
    at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:217)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:200)
    at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
    at com.google.cloud.pubsublite.v1.stub.GrpcTopicStatsServiceStub.create(GrpcTopicStatsServiceStub.java:80)
    at com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStubSettings.createStub(TopicStatsServiceStubSettings.java:108)
    at com.google.cloud.pubsublite.v1.TopicStatsServiceClient.<init>(TopicStatsServiceClient.java:136)
    at com.google.cloud.pubsublite.v1.TopicStatsServiceClient.create(TopicStatsServiceClient.java:117)
    at com.google.cloud.pubsublite.internal.TopicStatsClientSettings.instantiate(TopicStatsClientSettings.java:60)
    at com.google.cloud.pubsublite.internal.TopicStatsClient.create(TopicStatsClient.java:29)
    at com.google.cloud.pubsublite.beam.TopicBacklogReaderSettings.instantiate(TopicBacklogReaderSettings.java:85)
    at com.google.cloud.pubsublite.beam.SubscriberOptions.getBacklogReader(SubscriberOptions.java:164)
    at com.google.cloud.pubsublite.beam.SubscribeTransform.newRestrictionTracker(SubscribeTransform.java:91)
    at com.google.cloud.pubsublite.beam.PerSubscriptionPartitionSdf.newTracker(PerSubscriptionPartitionSdf.java:108)
    at com.google.cloud.pubsublite.beam.PerSubscriptionPartitionSdf$DoFnInvoker.invokeNewTracker(Unknown Source)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:482)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
    at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
    at org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:63)
    at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:339)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
    at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1426)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:163)
    at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1105)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
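The warning asks for a two-phase shutdown: call `shutdown()`, wait with `awaitTermination()`, and fall back to `shutdownNow()` if the wait times out. According to the trace, the channel is allocated inside the library (`SubscriberOptions.getBacklogReader` calls `TopicStatsClient.create`), so the pipeline code above has no handle on it; the sketch below only illustrates the protocol the warning describes. `java.util.concurrent.ExecutorService` stands in for `io.grpc.ManagedChannel` because it exposes the same `shutdown()`/`shutdownNow()`/`awaitTermination(long, TimeUnit)` methods and needs only the JDK; `ShutdownSketch.closeGracefully` is a hypothetical helper.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: ExecutorService stands in for io.grpc.ManagedChannel,
// which offers the same shutdown()/shutdownNow()/awaitTermination() methods.
final class ShutdownSketch {
    /** Returns true if the resource terminated within the allotted waits. */
    static boolean closeGracefully(ExecutorService resource, long timeoutSeconds) {
        resource.shutdown(); // stop accepting new work; in-flight work may finish
        try {
            if (resource.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            resource.shutdownNow(); // cancel whatever is still running
            return resource.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            resource.shutdownNow(); // force shutdown, then preserve the interrupt flag
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```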
External references such as API reference guides
- We've looked at a lot of documentation and tried many variations. The code above is the simplest variant we could reduce the problem to.
- This is reminiscent of the discussion in https://stackoverflow.com/questions/58221372/what-actually-manages-watermarks-in-beam
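The linked question concerns how watermarks are computed. As a general illustration (an assumption for the sake of the example, not a description of how `PubsubLiteIO` or Dataflow computes its watermark), a source that reads several partitions reports the minimum of its per-partition watermarks, so a single partition that receives little or no traffic can hold back the watermark for the whole pipeline. In the hypothetical sketch below, each partition's watermark is taken to be the latest event time observed on it.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.Map;

// Hypothetical model, not the Pub/Sub Lite or Dataflow implementation: the
// combined watermark of a partitioned source is the minimum of the
// per-partition watermarks (here, the latest event time seen per partition).
final class PartitionWatermarks {
    /** Returns the combined watermark, or Instant.MIN when no partition has reported yet. */
    static Instant combined(Map<Integer, Instant> latestEventTimePerPartition) {
        return latestEventTimePerPartition.values().stream()
            .min(Comparator.naturalOrder())
            .orElse(Instant.MIN);
    }
}
```

With partitions 0 and 1 at 00:10 and 00:09 but partition 2 last seen at 00:00:05, the combined watermark stays at 00:00:05, so not even the first 1-minute window, [00:00, 00:01), can close.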
Any additional information below
For context, we currently process hundreds of billions of messages per month through Pub/Sub with a fundamentally similar Dataflow job. This was our attempt at a proof of concept for Pub/Sub Lite. The watermark and window-closing behavior appears to be incorrect.
The following image shows the "WriteShardedBundlesToTempFiles" step, which is clearly receiving elements and data, but the follow-on steps never fire and the GCS file is never written.
