Skip to content

Commit 380ed7b

Browse files
authored
fix: add topic existing validation (#32465)
* feat: add topic existing validation * feat: add validation to write * docs: add changes * docs: change docs * refactor: change validate to primitive type * test: change test more clearly * style: follow lint * docs: fix CHANGES * docs: follow changes
1 parent 01c7caf commit 380ed7b

9 files changed

Lines changed: 370 additions & 1 deletion

File tree

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@
5959

6060
* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
6161

62+
## I/Os
63+
64+
* PubsubIO will validate that the Pub/Sub topic exists before running the Read/Write pipeline (Java) ([#32465](https://github.com/apache/beam/pull/32465))
65+
6266
## New Features / Improvements
6367

6468
* Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)).

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,9 @@ public abstract void modifyAckDeadline(
507507
/** Return a list of topics for {@code project}. */
508508
public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;
509509

510+
/** Return {@literal true} if {@code topic} exists. */
511+
public abstract boolean isTopicExists(TopicPath topic) throws IOException;
512+
510513
/** Create {@code subscription} to {@code topic}. */
511514
public abstract void createSubscription(
512515
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import io.grpc.Channel;
5555
import io.grpc.ClientInterceptors;
5656
import io.grpc.ManagedChannel;
57+
import io.grpc.StatusRuntimeException;
5758
import io.grpc.auth.ClientAuthInterceptor;
5859
import io.grpc.netty.GrpcSslContexts;
5960
import io.grpc.netty.NegotiationType;
@@ -372,6 +373,21 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
372373
return topics;
373374
}
374375

376+
@Override
377+
public boolean isTopicExists(TopicPath topic) throws IOException {
378+
GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topic.getPath()).build();
379+
try {
380+
publisherStub().getTopic(request);
381+
return true;
382+
} catch (StatusRuntimeException e) {
383+
if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) {
384+
return false;
385+
}
386+
387+
throw e;
388+
}
389+
}
390+
375391
@Override
376392
public void createSubscription(
377393
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
5151
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
5252
import org.apache.beam.sdk.metrics.Lineage;
53+
import org.apache.beam.sdk.options.PipelineOptions;
5354
import org.apache.beam.sdk.options.ValueProvider;
5455
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
5556
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -860,6 +861,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
860861

861862
abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
862863

864+
abstract boolean getValidate();
865+
863866
abstract Builder<T> toBuilder();
864867

865868
static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn) {
@@ -871,6 +874,7 @@ static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn)
871874
builder.setNeedsOrderingKey(false);
872875
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
873876
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
877+
builder.setValidate(true);
874878
return builder;
875879
}
876880

@@ -918,6 +922,8 @@ abstract static class Builder<T> {
918922
abstract Builder<T> setBadRecordErrorHandler(
919923
ErrorHandler<BadRecord, ?> badRecordErrorHandler);
920924

925+
abstract Builder<T> setValidate(boolean validation);
926+
921927
abstract Read<T> build();
922928
}
923929

@@ -1097,6 +1103,11 @@ public Read<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler
10971103
.build();
10981104
}
10991105

1106+
/** Disable validation of the existence of the topic. */
1107+
public Read<T> withoutValidation() {
1108+
return toBuilder().setValidate(false).build();
1109+
}
1110+
11001111
@VisibleForTesting
11011112
/**
11021113
* Set's the internal Clock.
@@ -1262,6 +1273,35 @@ public T apply(PubsubMessage input) {
12621273
return read.setCoder(getCoder());
12631274
}
12641275

1276+
@Override
1277+
public void validate(PipelineOptions options) {
1278+
if (!getValidate()) {
1279+
return;
1280+
}
1281+
1282+
PubsubOptions psOptions = options.as(PubsubOptions.class);
1283+
1284+
// Validate the existence of the topic.
1285+
if (getTopicProvider() != null) {
1286+
PubsubTopic topic = getTopicProvider().get();
1287+
boolean topicExists = true;
1288+
try (PubsubClient pubsubClient =
1289+
getPubsubClientFactory()
1290+
.newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
1291+
topicExists =
1292+
pubsubClient.isTopicExists(
1293+
PubsubClient.topicPathFromName(topic.project, topic.topic));
1294+
} catch (Exception e) {
1295+
throw new RuntimeException(e);
1296+
}
1297+
1298+
if (!topicExists) {
1299+
throw new IllegalArgumentException(
1300+
String.format("Pubsub topic '%s' does not exist.", topic));
1301+
}
1302+
}
1303+
}
1304+
12651305
@Override
12661306
public void populateDisplayData(DisplayData.Builder builder) {
12671307
super.populateDisplayData(builder);
@@ -1341,6 +1381,8 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>
13411381

13421382
abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
13431383

1384+
abstract boolean getValidate();
1385+
13441386
abstract Builder<T> toBuilder();
13451387

13461388
static <T> Builder<T> newBuilder(
@@ -1350,6 +1392,7 @@ static <T> Builder<T> newBuilder(
13501392
builder.setFormatFn(formatFn);
13511393
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
13521394
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
1395+
builder.setValidate(true);
13531396
return builder;
13541397
}
13551398

@@ -1386,6 +1429,8 @@ abstract Builder<T> setFormatFn(
13861429
abstract Builder<T> setBadRecordErrorHandler(
13871430
ErrorHandler<BadRecord, ?> badRecordErrorHandler);
13881431

1432+
abstract Builder<T> setValidate(boolean validation);
1433+
13891434
abstract Write<T> build();
13901435
}
13911436

@@ -1396,11 +1441,14 @@ abstract Builder<T> setBadRecordErrorHandler(
13961441
* {@code topic} string.
13971442
*/
13981443
public Write<T> to(String topic) {
1399-
return to(StaticValueProvider.of(topic));
1444+
ValueProvider<String> topicProvider = StaticValueProvider.of(topic);
1445+
validateTopic(topicProvider);
1446+
return to(topicProvider);
14001447
}
14011448

14021449
/** Like {@code topic()} but with a {@link ValueProvider}. */
14031450
public Write<T> to(ValueProvider<String> topic) {
1451+
validateTopic(topic);
14041452
return toBuilder()
14051453
.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath))
14061454
.setTopicFunction(null)
@@ -1421,6 +1469,13 @@ public Write<T> to(SerializableFunction<ValueInSingleWindow<T>, String> topicFun
14211469
.build();
14221470
}
14231471

1472+
/** Handles validation of {@code topic}. */
1473+
private static void validateTopic(ValueProvider<String> topic) {
1474+
if (topic.isAccessible()) {
1475+
PubsubTopic.fromPath(topic.get());
1476+
}
1477+
}
1478+
14241479
/**
14251480
* The default client to write to Pub/Sub is the {@link PubsubJsonClient}, created by the {@link
14261481
* PubsubJsonClient.PubsubJsonClientFactory}. This function allows to change the Pub/Sub client
@@ -1497,6 +1552,14 @@ public Write<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandle
14971552
.build();
14981553
}
14991554

1555+
/**
1556+
* Disable validation of the existence of the topic. Validation of the topic works only if the
1557+
* topic is set statically and not dynamically.
1558+
*/
1559+
public Write<T> withoutValidation() {
1560+
return toBuilder().setValidate(false).build();
1561+
}
1562+
15001563
@Override
15011564
public PDone expand(PCollection<T> input) {
15021565
if (getTopicProvider() == null && !getDynamicDestinations()) {
@@ -1573,6 +1636,35 @@ public void populateDisplayData(DisplayData.Builder builder) {
15731636
builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
15741637
}
15751638

1639+
@Override
1640+
public void validate(PipelineOptions options) {
1641+
if (!getValidate()) {
1642+
return;
1643+
}
1644+
1645+
PubsubOptions psOptions = options.as(PubsubOptions.class);
1646+
1647+
// Validate the existence of the topic.
1648+
if (getTopicProvider() != null) {
1649+
PubsubTopic topic = getTopicProvider().get();
1650+
boolean topicExists = true;
1651+
try (PubsubClient pubsubClient =
1652+
getPubsubClientFactory()
1653+
.newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
1654+
topicExists =
1655+
pubsubClient.isTopicExists(
1656+
PubsubClient.topicPathFromName(topic.project, topic.topic));
1657+
} catch (Exception e) {
1658+
throw new RuntimeException(e);
1659+
}
1660+
1661+
if (!topicExists) {
1662+
throw new IllegalArgumentException(
1663+
String.format("Pubsub topic '%s' does not exist.", topic));
1664+
}
1665+
}
1666+
}
1667+
15761668
/**
15771669
* Writer to Pubsub which batches messages from bounded collections.
15781670
*

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2121

22+
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
2223
import com.google.api.client.http.HttpRequestInitializer;
2324
import com.google.api.services.pubsub.Pubsub;
2425
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions;
@@ -310,6 +311,19 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
310311
return topics;
311312
}
312313

314+
@Override
315+
public boolean isTopicExists(TopicPath topic) throws IOException {
316+
try {
317+
pubsub.projects().topics().get(topic.getPath()).execute();
318+
return true;
319+
} catch (GoogleJsonResponseException e) {
320+
if (e.getStatusCode() == 404) {
321+
return false;
322+
}
323+
throw e;
324+
}
325+
}
326+
313327
@Override
314328
public void createSubscription(
315329
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,12 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
605605
throw new UnsupportedOperationException();
606606
}
607607

608+
@Override
609+
public boolean isTopicExists(TopicPath topic) throws IOException {
610+
// Always return true for testing purposes.
611+
return true;
612+
}
613+
608614
@Override
609615
public void createSubscription(
610616
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.google.pubsub.v1.Topic;
4141
import io.grpc.ManagedChannel;
4242
import io.grpc.Server;
43+
import io.grpc.Status;
4344
import io.grpc.StatusRuntimeException;
4445
import io.grpc.inprocess.InProcessChannelBuilder;
4546
import io.grpc.inprocess.InProcessServerBuilder;
@@ -432,4 +433,43 @@ public void getSchema(GetSchemaRequest request, StreamObserver<Schema> responseO
432433
server.shutdownNow();
433434
}
434435
}
436+
437+
@Test
438+
public void isTopicExists() throws IOException {
439+
initializeClient(null, null);
440+
TopicPath topicDoesNotExist =
441+
PubsubClient.topicPathFromPath("projects/testProject/topics/dontexist");
442+
TopicPath topicExists = PubsubClient.topicPathFromPath("projects/testProject/topics/exist");
443+
444+
PublisherImplBase publisherImplBase =
445+
new PublisherImplBase() {
446+
@Override
447+
public void getTopic(GetTopicRequest request, StreamObserver<Topic> responseObserver) {
448+
String topicPath = request.getTopic();
449+
if (topicPath.equals(topicDoesNotExist.getPath())) {
450+
responseObserver.onError(
451+
new StatusRuntimeException(Status.fromCode(Status.Code.NOT_FOUND)));
452+
}
453+
if (topicPath.equals(topicExists.getPath())) {
454+
responseObserver.onNext(
455+
Topic.newBuilder()
456+
.setName(topicPath)
457+
.setSchemaSettings(
458+
SchemaSettings.newBuilder().setSchema(SCHEMA.getPath()).build())
459+
.build());
460+
responseObserver.onCompleted();
461+
}
462+
}
463+
};
464+
Server server =
465+
InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start();
466+
try {
467+
assertEquals(false, client.isTopicExists(topicDoesNotExist));
468+
469+
assertEquals(true, client.isTopicExists(topicExists));
470+
471+
} finally {
472+
server.shutdownNow();
473+
}
474+
}
435475
}

0 commit comments

Comments
 (0)