Skip to content

Commit 93a66bb

Browse files
committed
Added support for Flow publishers coming back from the subscription field data fetcher
1 parent 1318a24 commit 93a66bb

File tree

5 files changed

+133
-51
lines changed

5 files changed

+133
-51
lines changed

src/main/java/graphql/execution/SubscriptionExecutionStrategy.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package graphql.execution;
22

3+
import graphql.Assert;
34
import graphql.ExecutionResult;
45
import graphql.ExecutionResultImpl;
56
import graphql.GraphQLContext;
@@ -14,13 +15,14 @@
1415
import graphql.language.Field;
1516
import graphql.schema.GraphQLFieldDefinition;
1617
import graphql.schema.GraphQLObjectType;
18+
import org.reactivestreams.FlowAdapters;
1719
import org.reactivestreams.Publisher;
1820

1921
import java.util.concurrent.CompletableFuture;
2022
import java.util.concurrent.CompletionStage;
23+
import java.util.concurrent.Flow;
2124
import java.util.function.Function;
2225

23-
import static graphql.Assert.assertTrue;
2426
import static graphql.execution.instrumentation.SimpleInstrumentationContext.nonNullCtx;
2527
import static java.util.Collections.singletonMap;
2628

@@ -70,14 +72,12 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
7072
CompletableFuture<ExecutionResult> overallResult = sourceEventStream.thenApply((publisher) ->
7173
{
7274
if (publisher == null) {
73-
ExecutionResultImpl executionResult = new ExecutionResultImpl(null, executionContext.getErrors());
74-
return executionResult;
75+
return new ExecutionResultImpl(null, executionContext.getErrors());
7576
}
7677
Function<Object, CompletionStage<ExecutionResult>> mapperFunction = eventPayload -> executeSubscriptionEvent(executionContext, parameters, eventPayload);
7778
boolean keepOrdered = keepOrdered(executionContext.getGraphQLContext());
7879
SubscriptionPublisher mapSourceToResponse = new SubscriptionPublisher(publisher, mapperFunction, keepOrdered);
79-
ExecutionResultImpl executionResult = new ExecutionResultImpl(mapSourceToResponse, executionContext.getErrors());
80-
return executionResult;
80+
return new ExecutionResultImpl(mapSourceToResponse, executionContext.getErrors());
8181
});
8282

8383
// dispatched the subscription query
@@ -111,14 +111,33 @@ private CompletableFuture<Publisher<Object>> createSourceEventStream(ExecutionCo
111111
CompletableFuture<FetchedValue> fieldFetched = Async.toCompletableFuture(fetchField(executionContext, newParameters));
112112
return fieldFetched.thenApply(fetchedValue -> {
113113
Object publisher = fetchedValue.getFetchedValue();
114-
if (publisher != null) {
115-
assertTrue(publisher instanceof Publisher, () -> "Your data fetcher must return a Publisher of events when using graphql subscriptions");
116-
}
117-
//noinspection unchecked,DataFlowIssue
118-
return (Publisher<Object>) publisher;
114+
return mkReactivePublisher(publisher);
119115
});
120116
}
121117

118+
/**
119+
* The user code can return either a reactive stream {@link Publisher} or a JDK {@link Flow.Publisher}
120+
* and we adapt it to a reactive streams one since we use reactive streams in our implementation.
121+
*
122+
* @param publisherObj - the object returned from the data fetcher as the source of events
123+
*
124+
* @return a reactive streams {@link Publisher} always
125+
*/
126+
@SuppressWarnings("unchecked")
127+
private static Publisher<Object> mkReactivePublisher(Object publisherObj) {
128+
if (publisherObj != null) {
129+
if (publisherObj instanceof Publisher) {
130+
return (Publisher<Object>) publisherObj;
131+
} else if (publisherObj instanceof Flow.Publisher) {
132+
Flow.Publisher<Object> flowPublisher = (Flow.Publisher<Object>) publisherObj;
133+
return FlowAdapters.toPublisher(flowPublisher);
134+
} else {
135+
return Assert.assertShouldNeverHappen("Your data fetcher must return a Publisher of events when using graphql subscriptions");
136+
}
137+
}
138+
return null; // null is valid - we return null data in this case
139+
}
140+
122141
/*
123142
ExecuteSubscriptionEvent(subscription, schema, variableValues, initialValue):
124143

src/test/groovy/graphql/execution/SubscriptionExecutionStrategyTest.groovy

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package graphql.execution
22

3+
import graphql.AssertException
34
import graphql.ErrorType
45
import graphql.ExecutionInput
56
import graphql.ExecutionResult
@@ -12,6 +13,7 @@ import graphql.execution.instrumentation.InstrumentationState
1213
import graphql.execution.instrumentation.LegacyTestingInstrumentation
1314
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
1415
import graphql.execution.pubsub.CapturingSubscriber
16+
import graphql.execution.pubsub.FlowMessagePublisher
1517
import graphql.execution.pubsub.Message
1618
import graphql.execution.pubsub.ReactiveStreamsMessagePublisher
1719
import graphql.execution.pubsub.ReactiveStreamsObjectPublisher
@@ -138,7 +140,7 @@ class SubscriptionExecutionStrategyTest extends Specification {
138140
def "subscription query sends out a stream of events using the '#why' implementation"() {
139141
140142
given:
141-
Publisher<Object> publisher = eventStreamPublisher
143+
Object publisher = eventStreamPublisher
142144
143145
DataFetcher newMessageDF = new DataFetcher() {
144146
@Override
@@ -181,14 +183,15 @@ class SubscriptionExecutionStrategyTest extends Specification {
181183
why | eventStreamPublisher
182184
'reactive streams stream' | new ReactiveStreamsMessagePublisher(10)
183185
'rxjava stream' | new RxJavaMessagePublisher(10)
186+
'flow stream' | new FlowMessagePublisher(10)
184187
185188
}
186189
187190
@Unroll
188191
def "subscription alias is correctly used in response messages using '#why' implementation"() {
189192
190193
given:
191-
Publisher<Object> publisher = eventStreamPublisher
194+
Object publisher = eventStreamPublisher
192195
193196
DataFetcher newMessageDF = new DataFetcher() {
194197
@Override
@@ -227,6 +230,7 @@ class SubscriptionExecutionStrategyTest extends Specification {
227230
why | eventStreamPublisher
228231
'reactive streams stream' | new ReactiveStreamsMessagePublisher(1)
229232
'rxjava stream' | new RxJavaMessagePublisher(1)
233+
'flow stream' | new FlowMessagePublisher(1)
230234
}
231235
232236
@@ -238,7 +242,7 @@ class SubscriptionExecutionStrategyTest extends Specification {
238242
// capability and it costs us little to support it, lets have a test for it.
239243
//
240244
given:
241-
Publisher<Object> publisher = eventStreamPublisher
245+
Object publisher = eventStreamPublisher
242246
243247
DataFetcher newMessageDF = new DataFetcher() {
244248
@Override
@@ -279,7 +283,7 @@ class SubscriptionExecutionStrategyTest extends Specification {
279283
why | eventStreamPublisher
280284
'reactive streams stream' | new ReactiveStreamsMessagePublisher(10)
281285
'rxjava stream' | new RxJavaMessagePublisher(10)
282-
286+
'flow stream' | new FlowMessagePublisher(10)
283287
}
284288
285289
@@ -312,6 +316,33 @@ class SubscriptionExecutionStrategyTest extends Specification {
312316
executionResult.errors.size() == 1
313317
}
314318
319+
def "if you dont return a Publisher we will assert"() {
320+
321+
DataFetcher newMessageDF = new DataFetcher() {
322+
@Override
323+
Object get(DataFetchingEnvironment environment) {
324+
return "Not a Publisher"
325+
}
326+
}
327+
328+
GraphQL graphQL = buildSubscriptionQL(newMessageDF)
329+
330+
def executionInput = ExecutionInput.newExecutionInput().query("""
331+
subscription NewMessages {
332+
newMessage(roomId: 123) {
333+
sender
334+
text
335+
}
336+
}
337+
""").build()
338+
339+
when:
340+
graphQL.execute(executionInput)
341+
342+
then:
343+
thrown(AssertException)
344+
}
345+
315346
def "subscription query will surface event stream exceptions"() {
316347
317348
DataFetcher newMessageDF = new DataFetcher() {
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package graphql.execution.pubsub;
2+
3+
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
4+
5+
import java.util.Iterator;
6+
import java.util.concurrent.ForkJoinPool;
7+
import java.util.function.Function;
8+
9+
class CommonMessagePublisher {
10+
11+
protected final AsyncIterablePublisher<Message> iterablePublisher;
12+
13+
protected CommonMessagePublisher(final int count) {
14+
Iterable<Message> iterable = mkIterable(count, at -> {
15+
Message message = new Message("sender" + at, "text" + at);
16+
return examineMessage(message, at);
17+
});
18+
iterablePublisher = new AsyncIterablePublisher<>(iterable, ForkJoinPool.commonPool());
19+
}
20+
21+
@SuppressWarnings("unused")
22+
protected Message examineMessage(Message message, Integer at) {
23+
return message;
24+
}
25+
26+
private static Iterable<Message> mkIterable(int count, Function<Integer, Message> msgMaker) {
27+
return () -> new Iterator<>() {
28+
private int at = 0;
29+
30+
@Override
31+
public boolean hasNext() {
32+
return at < count;
33+
}
34+
35+
@Override
36+
public Message next() {
37+
Message message = msgMaker.apply(at);
38+
at++;
39+
return message;
40+
}
41+
};
42+
}
43+
44+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package graphql.execution.pubsub;
2+
3+
import org.reactivestreams.FlowAdapters;
4+
5+
import java.util.concurrent.Flow;
6+
7+
/**
8+
* This example publisher will create count "messages" and then terminate. It
9+
* uses the reactive streams TCK as its implementation but presents itself
10+
* as a {@link Flow.Publisher}
11+
*/
12+
public class FlowMessagePublisher extends CommonMessagePublisher implements Flow.Publisher<Message> {
13+
14+
public FlowMessagePublisher(int count) {
15+
super(count);
16+
}
17+
18+
@Override
19+
public void subscribe(Flow.Subscriber<? super Message> s) {
20+
iterablePublisher.subscribe(FlowAdapters.toSubscriber(s));
21+
}
22+
}

src/test/groovy/graphql/execution/pubsub/ReactiveStreamsMessagePublisher.java

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,54 +2,20 @@
22

33
import org.reactivestreams.Publisher;
44
import org.reactivestreams.Subscriber;
5-
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
6-
7-
import java.util.Iterator;
8-
import java.util.concurrent.ForkJoinPool;
9-
import java.util.function.Function;
105

116
/**
127
* This example publisher will create count "messages" and then terminate. It
138
* uses the reactive streams TCK as its implementation
149
*/
15-
public class ReactiveStreamsMessagePublisher implements Publisher<Message> {
16-
17-
private final AsyncIterablePublisher<Message> iterablePublisher;
10+
public class ReactiveStreamsMessagePublisher extends CommonMessagePublisher implements Publisher<Message> {
1811

1912
public ReactiveStreamsMessagePublisher(final int count) {
20-
Iterable<Message> iterable = mkIterable(count, at -> {
21-
Message message = new Message("sender" + at, "text" + at);
22-
return examineMessage(message, at);
23-
});
24-
iterablePublisher = new AsyncIterablePublisher<>(iterable, ForkJoinPool.commonPool());
13+
super(count);
2514
}
2615

2716
@Override
2817
public void subscribe(Subscriber<? super Message> s) {
2918
iterablePublisher.subscribe(s);
3019
}
31-
32-
@SuppressWarnings("unused")
33-
protected Message examineMessage(Message message, Integer at) {
34-
return message;
35-
}
36-
37-
private static Iterable<Message> mkIterable(int count, Function<Integer, Message> msgMaker) {
38-
return () -> new Iterator<Message>() {
39-
private int at = 0;
40-
41-
@Override
42-
public boolean hasNext() {
43-
return at < count;
44-
}
45-
46-
@Override
47-
public Message next() {
48-
Message message = msgMaker.apply(at);
49-
at++;
50-
return message;
51-
}
52-
};
53-
}
54-
5520
}
21+

0 commit comments

Comments
 (0)