Skip to content

Commit a169e6a

Browse files
committed
Re-add Iceberg bounded source; test splitting
1 parent 36f3228 commit a169e6a

9 files changed

Lines changed: 1176 additions & 89 deletions

File tree

sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@
1717
*/
1818
package org.apache.beam.io.iceberg;
1919

20+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
21+
22+
import org.apache.beam.sdk.io.Read;
2023
import org.apache.beam.sdk.transforms.PTransform;
24+
import org.apache.beam.sdk.values.PBegin;
2125
import org.apache.beam.sdk.values.PCollection;
2226
import org.apache.beam.sdk.values.Row;
27+
import org.apache.iceberg.Table;
28+
import org.apache.iceberg.catalog.TableIdentifier;
29+
import org.checkerframework.checker.nullness.qual.Nullable;
2330

2431
public class IcebergIO {
2532

@@ -28,6 +35,10 @@ public static WriteRows writeToDynamicDestinations(
2835
return new WriteRows(catalog, dynamicDestinations);
2936
}
3037

38+
public static ReadTable readTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
39+
return new ReadTable(catalogConfig, tableId);
40+
}
41+
3142
static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
3243

3344
private final IcebergCatalogConfig catalog;
@@ -47,4 +58,36 @@ public IcebergWriteResult expand(PCollection<Row> input) {
4758
"Write Rows to Destinations", new WriteToDestinations(catalog, dynamicDestinations));
4859
}
4960
}
61+
62+
public static class ReadTable extends PTransform<PBegin, PCollection<Row>> {
63+
64+
private final IcebergCatalogConfig catalogConfig;
65+
private final transient @Nullable TableIdentifier tableId;
66+
67+
private TableIdentifier getTableId() {
68+
return checkStateNotNull(
69+
tableId, "Transient field tableId null; it should not be accessed after serialization");
70+
}
71+
72+
private ReadTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
73+
this.catalogConfig = catalogConfig;
74+
this.tableId = tableId;
75+
}
76+
77+
@Override
78+
public PCollection<Row> expand(PBegin input) {
79+
80+
Table table = catalogConfig.catalog().loadTable(getTableId());
81+
82+
return input.apply(
83+
Read.from(
84+
new ScanSource(
85+
IcebergScanConfig.builder()
86+
.setCatalogConfig(catalogConfig)
87+
.setScanType(IcebergScanConfig.ScanType.TABLE)
88+
.setTableIdentifier(getTableId())
89+
.setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema()))
90+
.build())));
91+
}
92+
}
5093
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.io.iceberg;
19+
20+
import com.google.auto.value.AutoValue;
21+
import java.io.Serializable;
22+
import org.apache.beam.sdk.schemas.Schema;
23+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
24+
import org.apache.iceberg.Table;
25+
import org.apache.iceberg.catalog.TableIdentifier;
26+
import org.apache.iceberg.expressions.Expression;
27+
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
28+
import org.checkerframework.checker.nullness.qual.Nullable;
29+
import org.checkerframework.dataflow.qual.Pure;
30+
31+
@AutoValue
32+
public abstract class IcebergScanConfig implements Serializable {
33+
34+
private transient @MonotonicNonNull Table cachedTable;
35+
36+
public enum ScanType {
37+
TABLE,
38+
BATCH
39+
}
40+
41+
@Pure
42+
public abstract ScanType getScanType();
43+
44+
@Pure
45+
public abstract IcebergCatalogConfig getCatalogConfig();
46+
47+
@Pure
48+
public abstract String getTableIdentifier();
49+
50+
@Pure
51+
public Table getTable() {
52+
if (cachedTable == null) {
53+
cachedTable =
54+
getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
55+
}
56+
return cachedTable;
57+
}
58+
59+
@Pure
60+
public abstract Schema getSchema();
61+
62+
@Pure
63+
public abstract @Nullable Expression getFilter();
64+
65+
@Pure
66+
public abstract @Nullable Boolean getCaseSensitive();
67+
68+
@Pure
69+
public abstract ImmutableMap<String, String> getOptions();
70+
71+
@Pure
72+
public abstract @Nullable Long getSnapshot();
73+
74+
@Pure
75+
public abstract @Nullable Long getTimestamp();
76+
77+
@Pure
78+
public abstract @Nullable Long getFromSnapshotInclusive();
79+
80+
@Pure
81+
public abstract @Nullable String getFromSnapshotRefInclusive();
82+
83+
@Pure
84+
public abstract @Nullable Long getFromSnapshotExclusive();
85+
86+
@Pure
87+
public abstract @Nullable String getFromSnapshotRefExclusive();
88+
89+
@Pure
90+
public abstract @Nullable Long getToSnapshot();
91+
92+
@Pure
93+
public abstract @Nullable String getToSnapshotRef();
94+
95+
@Pure
96+
public abstract @Nullable String getTag();
97+
98+
@Pure
99+
public abstract @Nullable String getBranch();
100+
101+
@Pure
102+
public static Builder builder() {
103+
return new AutoValue_IcebergScanConfig.Builder()
104+
.setScanType(ScanType.TABLE)
105+
.setFilter(null)
106+
.setCaseSensitive(null)
107+
.setOptions(ImmutableMap.of())
108+
.setSnapshot(null)
109+
.setTimestamp(null)
110+
.setFromSnapshotInclusive(null)
111+
.setFromSnapshotRefInclusive(null)
112+
.setFromSnapshotExclusive(null)
113+
.setFromSnapshotRefExclusive(null)
114+
.setToSnapshot(null)
115+
.setToSnapshotRef(null)
116+
.setTag(null)
117+
.setBranch(null);
118+
}
119+
120+
@AutoValue.Builder
121+
public abstract static class Builder {
122+
public abstract Builder setScanType(ScanType type);
123+
124+
public abstract Builder setCatalogConfig(IcebergCatalogConfig catalog);
125+
126+
public abstract Builder setTableIdentifier(String tableIdentifier);
127+
128+
public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
129+
return this.setTableIdentifier(tableIdentifier.toString());
130+
}
131+
132+
public Builder setTableIdentifier(String... names) {
133+
return setTableIdentifier(TableIdentifier.of(names));
134+
}
135+
136+
public abstract Builder setSchema(Schema schema);
137+
138+
public abstract Builder setFilter(@Nullable Expression filter);
139+
140+
public abstract Builder setCaseSensitive(@Nullable Boolean caseSensitive);
141+
142+
public abstract Builder setOptions(ImmutableMap<String, String> options);
143+
144+
public abstract Builder setSnapshot(@Nullable Long snapshot);
145+
146+
public abstract Builder setTimestamp(@Nullable Long timestamp);
147+
148+
public abstract Builder setFromSnapshotInclusive(@Nullable Long fromInclusive);
149+
150+
public abstract Builder setFromSnapshotRefInclusive(@Nullable String ref);
151+
152+
public abstract Builder setFromSnapshotExclusive(@Nullable Long fromExclusive);
153+
154+
public abstract Builder setFromSnapshotRefExclusive(@Nullable String ref);
155+
156+
public abstract Builder setToSnapshot(@Nullable Long snapshot);
157+
158+
public abstract Builder setToSnapshotRef(@Nullable String ref);
159+
160+
public abstract Builder setTag(@Nullable String tag);
161+
162+
public abstract Builder setBranch(@Nullable String branch);
163+
164+
public abstract IcebergScanConfig build();
165+
}
166+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.io.iceberg;
19+
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import org.apache.beam.sdk.coders.Coder;
24+
import org.apache.beam.sdk.coders.RowCoder;
25+
import org.apache.beam.sdk.io.BoundedSource;
26+
import org.apache.beam.sdk.options.PipelineOptions;
27+
import org.apache.beam.sdk.transforms.display.DisplayData;
28+
import org.apache.beam.sdk.values.Row;
29+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
30+
import org.apache.iceberg.BaseCombinedScanTask;
31+
import org.apache.iceberg.CombinedScanTask;
32+
import org.apache.iceberg.TableProperties;
33+
import org.apache.iceberg.TableScan;
34+
import org.apache.iceberg.io.CloseableIterable;
35+
36+
/**
37+
* Source that reads all the data in a table described by an IcebergScanConfig. Supports only
38+
* initial spliting.
39+
*/
40+
class ScanSource extends BoundedSource<Row> {
41+
42+
private IcebergScanConfig scanConfig;
43+
44+
public ScanSource(IcebergScanConfig scanConfig) {
45+
this.scanConfig = scanConfig;
46+
}
47+
48+
private TableScan getTableScan() {
49+
TableScan tableScan =
50+
scanConfig
51+
.getTable()
52+
.newScan()
53+
.project(SchemaAndRowConversions.beamSchemaToIcebergSchema(scanConfig.getSchema()));
54+
55+
if (scanConfig.getFilter() != null) {
56+
tableScan = tableScan.filter(scanConfig.getFilter());
57+
}
58+
if (scanConfig.getCaseSensitive() != null) {
59+
tableScan = tableScan.caseSensitive(scanConfig.getCaseSensitive());
60+
}
61+
if (scanConfig.getSnapshot() != null) {
62+
tableScan = tableScan.useSnapshot(scanConfig.getSnapshot());
63+
}
64+
if (scanConfig.getBranch() != null) {
65+
tableScan = tableScan.useRef(scanConfig.getBranch());
66+
} else if (scanConfig.getTag() != null) {
67+
tableScan = tableScan.useRef(scanConfig.getTag());
68+
}
69+
70+
return tableScan;
71+
}
72+
73+
private CombinedScanTask wholeTableReadTask() {
74+
// Always project to our destination schema
75+
return new BaseCombinedScanTask(ImmutableList.copyOf(getTableScan().planFiles()));
76+
}
77+
78+
@Override
79+
public List<? extends BoundedSource<Row>> split(
80+
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
81+
ArrayList<ScanTaskSource> splits = new ArrayList<>();
82+
83+
switch (scanConfig.getScanType()) {
84+
case TABLE:
85+
TableScan tableScan = getTableScan();
86+
if (desiredBundleSizeBytes > 0) {
87+
tableScan =
88+
tableScan.option(TableProperties.SPLIT_SIZE, Long.toString(desiredBundleSizeBytes));
89+
}
90+
91+
try (CloseableIterable<CombinedScanTask> tasks = tableScan.planTasks()) {
92+
for (CombinedScanTask combinedScanTask : tasks) {
93+
splits.add(new ScanTaskSource(scanConfig, combinedScanTask));
94+
}
95+
} catch (IOException e) {
96+
throw new RuntimeException(e);
97+
}
98+
break;
99+
case BATCH:
100+
throw new UnsupportedOperationException("BATCH scan not supported");
101+
default:
102+
throw new UnsupportedOperationException("Unknown scan type: " + scanConfig.getScanType());
103+
}
104+
105+
return splits;
106+
}
107+
108+
@Override
109+
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
110+
return wholeTableReadTask().sizeBytes();
111+
}
112+
113+
@Override
114+
public void populateDisplayData(DisplayData.Builder builder) {
115+
super.populateDisplayData(builder);
116+
}
117+
118+
@Override
119+
public Coder<Row> getOutputCoder() {
120+
return RowCoder.of(scanConfig.getSchema());
121+
}
122+
123+
@Override
124+
public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
125+
return new ScanTaskReader(new ScanTaskSource(scanConfig, wholeTableReadTask()));
126+
}
127+
}

0 commit comments

Comments
 (0)