PubSubLite Not Advancing Watermarks in Dataflow/Beam Job #542

@GLStephen

Description

The job below consumes a relatively low-volume PubSubLite topic (set up for testing) and is intended to write the messages to a GCS bucket as windowed files. Every step appears to run, but the watermark never advances and the windows never close, so the files are never written.

Environment details

  1. OS type and version: Windows -> Dataflow GCP
  2. Java version: 1.8
  3. pubsublite version(s):
     <dependency>
       <groupId>com.google.cloud</groupId>
       <artifactId>pubsublite-beam-io</artifactId>
       <version>0.11.1</version>
     </dependency>

Steps to reproduce

  1. Create a PubSubLite subscriber in a Dataflow job
  2. Add windowing, a trigger, etc.
  3. Convert each SequencedMessage to a String
  4. Attempt to write the output to Cloud Storage

Code example

public static void main(String[] args) {
    /******* some other setup, etc. *************/
    Pipeline pipeline = Pipeline.create(options);

    SubscriptionPath subPath = SubscriptionPath.newBuilder()
        .setLocation(CloudZone.parse(options.getTargetProjectZone()))
        .setProject(ProjectNumber.of(options.getTargetProjectNumber()))
        .setName(SubscriptionName.of(options.getTargetPubsubSubscription()))
        .build();

    /* unused */
    FlowControlSettings flowControlSettings = FlowControlSettings.builder()
        // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB)
        .setBytesOutstanding(10 * 1024 * 1024L)
        // 1,000 outstanding messages. Must be >0
        .setMessagesOutstanding(1000L)
        .build();

    SubscriberOptions subOptions = SubscriberOptions.newBuilder()
        .setSubscriptionPath(subPath)
        //.setFlowControlSettings(flowControlSettings)
        .build();

    Window<SequencedMessage> window = Window.<SequencedMessage>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .alignedTo(Duration.standardSeconds(60)))))
        .withAllowedLateness(Duration.standardMinutes(1), Window.ClosingBehavior.FIRE_ALWAYS)
        .discardingFiredPanes();

    pipeline
        .apply("Read messages", PubsubLiteIO.read(subOptions))
        .apply(window)
        .apply(ParDo.of(new ConvertToStrings()))
        .apply(TextIO.write().to(options.getOutputDirectory()).withWindowedWrites().withNumShards(10));

    pipeline.run();
}

static class ConvertToStrings extends DoFn<SequencedMessage, String> {
    private static final long serialVersionUID = -7457075599298968462L;

    @ProcessElement
    public void processElement(@Element SequencedMessage word, OutputReceiver<String> out) {
        String messageString = word.getMessage().getData().toStringUtf8();
        LOG.info(messageString);
        // Use OutputReceiver.output to emit the output element.
        out.output(messageString);
    }
}
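
To make the expected timing concrete, here is a minimal sketch of how I understand the window settings above (1-minute fixed windows, 1 minute of allowed lateness) to relate to the watermark. It is a simplified model in plain Java, not Beam code: the class name FixedWindowModel, its method names, and the sample timestamps are all hypothetical, and it ignores the early firings and the trigger entirely. It only shows that the on-time pane and the window expiry both depend on the watermark moving past the end of the window.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model of fixed-window timing. Hypothetical helper, not Beam's implementation. */
class FixedWindowModel {
    // Values copied from the Window configuration in the pipeline above.
    static final Duration WINDOW_SIZE = Duration.ofMinutes(1);
    static final Duration ALLOWED_LATENESS = Duration.ofMinutes(1);

    /** Start of the fixed window containing eventTime (windows aligned to the epoch). */
    static Instant windowStart(Instant eventTime) {
        long size = WINDOW_SIZE.toMillis();
        long millis = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, size));
    }

    /** Exclusive end of the fixed window containing eventTime. */
    static Instant windowEnd(Instant eventTime) {
        return windowStart(eventTime).plus(WINDOW_SIZE);
    }

    /** Assumption: the on-time pane can fire once the watermark reaches the window end. */
    static boolean onTimePaneCanFire(Instant eventTime, Instant watermark) {
        return !watermark.isBefore(windowEnd(eventTime));
    }

    /** Assumption: the window expires once the watermark reaches end + allowed lateness. */
    static boolean windowExpired(Instant eventTime, Instant watermark) {
        return !watermark.isBefore(windowEnd(eventTime).plus(ALLOWED_LATENESS));
    }

    public static void main(String[] args) {
        // Hypothetical message timestamp and watermark positions.
        Instant eventTime = Instant.parse("2021-03-01T12:00:42Z");
        Instant stuckWatermark = Instant.parse("2021-03-01T12:00:00Z");
        Instant advancedWatermark = Instant.parse("2021-03-01T12:02:00Z");

        System.out.println("window: [" + windowStart(eventTime) + ", " + windowEnd(eventTime) + ")");
        System.out.println("stuck watermark, on-time pane can fire: "
            + onTimePaneCanFire(eventTime, stuckWatermark));
        System.out.println("advanced watermark, on-time pane can fire: "
            + onTimePaneCanFire(eventTime, advancedWatermark));
        System.out.println("advanced watermark, window expired: "
            + windowExpired(eventTime, advancedWatermark));
    }
}
```

In this model, a watermark that stays at or before the start of a window never satisfies either check, no matter how many elements arrive, which matches the symptom of elements flowing through the job without any window closing.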

Stack trace

No errors are written to the logs at all, other than the following:

"*~*~*~ Channel ManagedChannelImpl{logId=1017, target=us-east1-pubsublite.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true."
exception: "java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:518)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:314)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600(InstantiatingGrpcChannelProvider.java:71)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:210)
	at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:217)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:200)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
	at com.google.cloud.pubsublite.v1.stub.GrpcTopicStatsServiceStub.create(GrpcTopicStatsServiceStub.java:80)
	at com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStubSettings.createStub(TopicStatsServiceStubSettings.java:108)
	at com.google.cloud.pubsublite.v1.TopicStatsServiceClient.<init>(TopicStatsServiceClient.java:136)
	at com.google.cloud.pubsublite.v1.TopicStatsServiceClient.create(TopicStatsServiceClient.java:117)
	at com.google.cloud.pubsublite.internal.TopicStatsClientSettings.instantiate(TopicStatsClientSettings.java:60)
	at com.google.cloud.pubsublite.internal.TopicStatsClient.create(TopicStatsClient.java:29)
	at com.google.cloud.pubsublite.beam.TopicBacklogReaderSettings.instantiate(TopicBacklogReaderSettings.java:85)
	at com.google.cloud.pubsublite.beam.SubscriberOptions.getBacklogReader(SubscriberOptions.java:164)
	at com.google.cloud.pubsublite.beam.SubscribeTransform.newRestrictionTracker(SubscribeTransform.java:91)
	at com.google.cloud.pubsublite.beam.PerSubscriptionPartitionSdf.newTracker(PerSubscriptionPartitionSdf.java:108)
	at com.google.cloud.pubsublite.beam.PerSubscriptionPartitionSdf$DoFnInvoker.invokeNewTracker(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:482)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
	at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
	at org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:63)
	at org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:339)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:212)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
	at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1426)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:163)
	at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1105)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Any additional information below

For context, we currently process hundreds of billions of messages per month via PubSub with a fundamentally similar Dataflow job. This was our attempt at a POC for PubSubLite. The watermark and window-closing behavior appears to be incorrect.

The following image shows the "WriteShardedBundlesToTempFiles" step, which is clearly receiving elements and data, but the follow-on steps never fire and the GCS file is never written.

image

Labels: api: pubsublite, priority: p2, type: bug
