Skip to content

Commit 5a7bd80

Browse files
tgrohdhalperi
authored andcommitted
Add CountingInput as a PTransform
This transform produces an unbounded PCollection containing longs based on a CountingSource. Deprecate methods producing a Source in CountingSource.
1 parent 3650298 commit 5a7bd80

3 files changed

Lines changed: 334 additions & 10 deletions

File tree

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright (C) 2016 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package com.google.cloud.dataflow.sdk.io;
18+
19+
import static com.google.common.base.Preconditions.checkArgument;
20+
import static com.google.common.base.Preconditions.checkNotNull;
21+
22+
import com.google.cloud.dataflow.sdk.io.CountingSource.NowTimestampFn;
23+
import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
24+
import com.google.cloud.dataflow.sdk.transforms.PTransform;
25+
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
26+
import com.google.cloud.dataflow.sdk.values.PBegin;
27+
import com.google.cloud.dataflow.sdk.values.PCollection;
28+
import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
29+
import com.google.common.base.Optional;
30+
31+
import org.joda.time.Duration;
32+
import org.joda.time.Instant;
33+
34+
/**
35+
* A {@link PTransform} that produces longs. When used to produce a
36+
* {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0}
37+
* and counts up to a specified maximum. When used to produce an
38+
* {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE}
39+
* and then never produces more output. (In practice, this limit should never be reached.)
40+
*
41+
* <p>The bounded {@link CountingInput} is implemented based on {@link OffsetBasedSource} and
42+
* {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
43+
* supports dynamic work rebalancing.
44+
*
45+
* <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}:
46+
*
47+
* <pre>{@code
48+
* Pipeline p = ...
49+
* PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
50+
* PCollection<Long> bounded = p.apply(producer);
51+
* }</pre>
52+
*
53+
* <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
54+
* calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
55+
* with timestamps other than {@link Instant#now}.
56+
*
57+
* <pre>{@code
58+
* Pipeline p = ...
59+
*
60+
* // To create an unbounded producer that uses processing time as the element timestamp.
61+
* PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
62+
* // Or, to create an unbounded source that uses a provided function to set the element timestamp.
63+
* PCollection<Long> unboundedWithTimestamps =
64+
* p.apply(CountingInput.unbounded().withTimestampFn(someFn));
65+
* }</pre>
66+
*/
67+
public class CountingInput {
68+
/**
69+
* Creates a {@link BoundedCountingInput} that will produce the specified number of elements,
70+
* from {@code 0} to {@code numElements - 1}.
71+
*/
72+
public static BoundedCountingInput upTo(long numElements) {
73+
checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
74+
return new BoundedCountingInput(numElements);
75+
}
76+
77+
/**
78+
* Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up
79+
* to {@link Long#MAX_VALUE}.
80+
*
81+
* <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
82+
* limit should never be reached.)
83+
*
84+
* <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
85+
* timestamps corresponding to processing time at element generation, provided by
86+
* {@link Instant#now}. Use the transform returned by
87+
* {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
88+
* timestamps.
89+
*/
90+
public static UnboundedCountingInput unbounded() {
91+
return new UnboundedCountingInput(
92+
new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent());
93+
}
94+
95+
/**
96+
* A {@link PTransform} that will produce a specified number of {@link Long Longs} starting from
97+
* 0.
98+
*/
99+
public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
100+
private final long numElements;
101+
102+
private BoundedCountingInput(long numElements) {
103+
this.numElements = numElements;
104+
}
105+
106+
@SuppressWarnings("deprecation")
107+
@Override
108+
public PCollection<Long> apply(PBegin begin) {
109+
return begin.apply(Read.from(CountingSource.upTo(numElements)));
110+
}
111+
}
112+
113+
/**
114+
* A {@link PTransform} that will produce numbers starting from {@code 0} up to
115+
* {@link Long#MAX_VALUE}.
116+
*
117+
* <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
118+
* limit should never be reached.)
119+
*
120+
* <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
121+
* timestamps corresponding to processing time at element generation, provided by
122+
* {@link Instant#now}. Use the transform returned by
123+
* {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
124+
* timestamps.
125+
*/
126+
public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
127+
private final SerializableFunction<Long, Instant> timestampFn;
128+
private final Optional<Long> maxNumRecords;
129+
private final Optional<Duration> maxReadTime;
130+
131+
private UnboundedCountingInput(
132+
SerializableFunction<Long, Instant> timestampFn,
133+
Optional<Long> maxNumRecords,
134+
Optional<Duration> maxReadTime) {
135+
this.timestampFn = timestampFn;
136+
this.maxNumRecords = maxNumRecords;
137+
this.maxReadTime = maxReadTime;
138+
}
139+
140+
/**
141+
* Returns an {@link UnboundedCountingInput} like this one, but where output elements have the
142+
* timestamp specified by the timestampFn.
143+
*
144+
* <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
145+
*/
146+
public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
147+
return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime);
148+
}
149+
150+
/**
151+
* Returns an {@link UnboundedCountingInput} like this one, but that will read at most the
152+
* specified number of elements.
153+
*
154+
* <p>A bounded amount of elements will be produced by the result transform, and the result
155+
* {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
156+
*/
157+
public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
158+
checkArgument(
159+
maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords);
160+
return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime);
161+
}
162+
163+
/**
164+
* Returns an {@link UnboundedCountingInput} like this one, but that will read for at most the
165+
* specified amount of time.
166+
*
167+
* <p>A bounded amount of elements will be produced by the result transform, and the result
168+
* {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
169+
*/
170+
public UnboundedCountingInput withMaxReadTime(Duration readTime) {
171+
checkNotNull(readTime, "ReadTime cannot be null");
172+
return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime));
173+
}
174+
175+
@SuppressWarnings("deprecation")
176+
@Override
177+
public PCollection<Long> apply(PBegin begin) {
178+
Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn));
179+
if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
180+
return begin.apply(read);
181+
} else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
182+
return begin.apply(read.withMaxNumRecords(maxNumRecords.get()));
183+
} else if (!maxNumRecords.isPresent() && maxReadTime.isPresent()) {
184+
return begin.apply(read.withMaxReadTime(maxReadTime.get()));
185+
} else {
186+
return begin.apply(
187+
read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
188+
}
189+
}
190+
}
191+
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.dataflow.sdk.coders.Coder;
2323
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
2424
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
25+
import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
2526
import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
2627
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
2728
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
@@ -48,29 +49,33 @@
4849
*
4950
* <pre>{@code
5051
* Pipeline p = ...
51-
* BoundedSource<Long> source = CountingSource.upTo(1000);
52-
* PCollection<Long> bounded = p.apply(Read.from(source));
52+
* PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
53+
* PCollection<Long> bounded = p.apply(producer);
5354
* }</pre>
5455
*
55-
* <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingSource#unbounded} or
56-
* {@link CountingSource#unboundedWithTimestampFn}:
56+
* <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
57+
* calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
58+
* with timestamps other than {@link Instant#now}.
5759
*
5860
* <pre>{@code
5961
* Pipeline p = ...
6062
*
61-
* // To create an unbounded source that uses processing time as the element timestamp.
62-
* UnboundedSource<Long, CounterMark> source = CountingSource.unbounded();
63+
* // To create an unbounded PCollection that uses processing time as the element timestamp.
64+
* PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
6365
* // Or, to create an unbounded source that uses a provided function to set the element timestamp.
64-
* UnboundedSource<Long, CounterMark> source = CountingSource.unboundedWithTimestampFn(someFn);
66+
* PCollection<Long> unboundedWithTimestamps =
67+
* p.apply(CountingInput.unbounded().withTimestampFn(someFn));
6568
*
66-
* PCollection<Long> unbounded = p.apply(Read.from(source));
6769
* }</pre>
6870
*/
6971
public class CountingSource {
7072
/**
7173
* Creates a {@link BoundedSource} that will produce the specified number of elements,
7274
* from {@code 0} to {@code numElements - 1}.
75+
*
76+
* @deprecated use {@link CountingInput#upTo(long)} instead
7377
*/
78+
@Deprecated
7479
public static BoundedSource<Long> upTo(long numElements) {
7580
checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
7681
return new BoundedCountingSource(0, numElements);
@@ -85,7 +90,10 @@ public static BoundedSource<Long> upTo(long numElements) {
8590
*
8691
* <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will have timestamps
8792
* corresponding to processing time at element generation, provided by {@link Instant#now}.
93+
*
94+
* @deprecated use {@link CountingInput#unbounded()} instead
8895
*/
96+
@Deprecated
8997
public static UnboundedSource<Long, CounterMark> unbounded() {
9098
return unboundedWithTimestampFn(new NowTimestampFn());
9199
}
@@ -98,7 +106,11 @@ public static UnboundedSource<Long, CounterMark> unbounded() {
98106
* limit should never be reached.)
99107
*
100108
* <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
109+
*
110+
* @deprecated use {@link CountingInput#unbounded()} and call
111+
* {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} instead
101112
*/
113+
@Deprecated
102114
public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
103115
SerializableFunction<Long, Instant> timestampFn) {
104116
return new UnboundedCountingSource(0, 1, timestampFn);
@@ -109,11 +121,10 @@ public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
109121
/** Prevent instantiation. */
110122
private CountingSource() {}
111123

112-
113124
/**
114125
* A function that returns {@link Instant#now} as the timestamp for each generated element.
115126
*/
116-
private static class NowTimestampFn implements SerializableFunction<Long, Instant> {
127+
static class NowTimestampFn implements SerializableFunction<Long, Instant> {
117128
@Override
118129
public Instant apply(Long input) {
119130
return Instant.now();

0 commit comments

Comments
 (0)