Skip to content

Commit d6e0b0c

Browse files
authored
Add support for Iceberg table identifiers with special characters (#33293)
* Add support for Iceberg table identifiers with special characters * Add used undeclared dependencies * Fix style * Trigger iceberg integration tests
1 parent bbccf52 commit d6e0b0c

11 files changed

Lines changed: 94 additions & 18 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 2
3+
"modification": 3
44
}

sdks/java/io/iceberg/build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,10 @@ dependencies {
5555
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
5656
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
5757
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
58-
implementation library.java.hadoop_common
5958
runtimeOnly "org.apache.iceberg:iceberg-gcp:$iceberg_version"
59+
implementation library.java.hadoop_common
60+
implementation library.java.jackson_core
61+
implementation library.java.jackson_databind
6062

6163
testImplementation project(":sdks:java:managed")
6264
testImplementation library.java.hadoop_client

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.iceberg.Snapshot;
4848
import org.apache.iceberg.Table;
4949
import org.apache.iceberg.catalog.Catalog;
50-
import org.apache.iceberg.catalog.TableIdentifier;
5150
import org.apache.iceberg.io.FileIO;
5251
import org.apache.iceberg.io.OutputFile;
5352
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -134,7 +133,7 @@ public void processElement(
134133
return;
135134
}
136135

137-
Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
136+
Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey()));
138137

139138
// vast majority of the time, we will simply append data files.
140139
// in the rare case we get a batch that contains multiple partition specs, we will group

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iceberg.DataFile;
2626
import org.apache.iceberg.PartitionSpec;
2727
import org.apache.iceberg.catalog.TableIdentifier;
28+
import org.apache.iceberg.catalog.TableIdentifierParser;
2829
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
2930

3031
@AutoValue
@@ -41,7 +42,7 @@ abstract class FileWriteResult {
4142
@SchemaIgnore
4243
public TableIdentifier getTableIdentifier() {
4344
if (cachedTableIdentifier == null) {
44-
cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString());
45+
cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString());
4546
}
4647
return cachedTableIdentifier;
4748
}
@@ -67,7 +68,7 @@ abstract static class Builder {
6768

6869
@SchemaIgnore
6970
public Builder setTableIdentifier(TableIdentifier tableId) {
70-
return setTableIdentifierString(tableId.toString());
71+
return setTableIdentifierString(TableIdentifierParser.toJson(tableId));
7172
}
7273

7374
public abstract FileWriteResult build();

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.beam.sdk.values.PCollection;
3232
import org.apache.beam.sdk.values.PCollectionRowTuple;
3333
import org.apache.beam.sdk.values.Row;
34-
import org.apache.iceberg.catalog.TableIdentifier;
3534

3635
/**
3736
* SchemaTransform implementation for {@link IcebergIO#readRows}. Reads records from Iceberg and
@@ -86,7 +85,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
8685
.getPipeline()
8786
.apply(
8887
IcebergIO.readRows(configuration.getIcebergCatalog())
89-
.from(TableIdentifier.parse(configuration.getTable())));
88+
.from(IcebergUtils.parseTableIdentifier(configuration.getTable())));
9089

9190
return PCollectionRowTuple.of(OUTPUT_TAG, output);
9291
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
2424
import org.apache.iceberg.Table;
2525
import org.apache.iceberg.catalog.TableIdentifier;
26+
import org.apache.iceberg.catalog.TableIdentifierParser;
2627
import org.apache.iceberg.expressions.Expression;
2728
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
2829
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -51,7 +52,9 @@ public enum ScanType {
5152
public Table getTable() {
5253
if (cachedTable == null) {
5354
cachedTable =
54-
getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
55+
getCatalogConfig()
56+
.catalog()
57+
.loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier()));
5558
}
5659
return cachedTable;
5760
}
@@ -126,7 +129,7 @@ public abstract static class Builder {
126129
public abstract Builder setTableIdentifier(String tableIdentifier);
127130

128131
public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
129-
return this.setTableIdentifier(tableIdentifier.toString());
132+
return this.setTableIdentifier(TableIdentifierParser.toJson(tableIdentifier));
130133
}
131134

132135
public Builder setTableIdentifier(String... names) {

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
2121

22+
import com.fasterxml.jackson.core.JsonProcessingException;
23+
import com.fasterxml.jackson.databind.JsonNode;
24+
import com.fasterxml.jackson.databind.ObjectMapper;
2225
import java.nio.ByteBuffer;
2326
import java.time.LocalDate;
2427
import java.time.LocalDateTime;
@@ -36,6 +39,8 @@
3639
import org.apache.beam.sdk.values.Row;
3740
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
3841
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
42+
import org.apache.iceberg.catalog.TableIdentifier;
43+
import org.apache.iceberg.catalog.TableIdentifierParser;
3944
import org.apache.iceberg.data.GenericRecord;
4045
import org.apache.iceberg.data.Record;
4146
import org.apache.iceberg.types.Type;
@@ -47,6 +52,9 @@
4752

4853
/** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */
4954
public class IcebergUtils {
55+
56+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
57+
5058
private IcebergUtils() {}
5159

5260
private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
@@ -506,4 +514,13 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
506514
// LocalDateTime, LocalDate, LocalTime
507515
return icebergValue;
508516
}
517+
518+
public static TableIdentifier parseTableIdentifier(String table) {
519+
try {
520+
JsonNode jsonNode = OBJECT_MAPPER.readTree(table);
521+
return TableIdentifierParser.fromJson(jsonNode);
522+
} catch (JsonProcessingException e) {
523+
return TableIdentifier.parse(table);
524+
}
525+
}
509526
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable
4141
@VisibleForTesting
4242
TableIdentifier getTableIdentifier() {
4343
if (tableId == null) {
44-
tableId = TableIdentifier.parse(checkStateNotNull(tableIdString));
44+
tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString));
4545
}
4646
return tableId;
4747
}
@@ -86,6 +86,6 @@ public void writeExternal(ObjectOutput out) throws IOException {
8686
@Override
8787
public void readExternal(ObjectInput in) throws IOException {
8888
tableIdString = in.readUTF();
89-
tableId = TableIdentifier.parse(tableIdString);
89+
tableId = IcebergUtils.parseTableIdentifier(tableIdString);
9090
}
9191
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.beam.sdk.values.Row;
2525
import org.apache.beam.sdk.values.ValueInSingleWindow;
2626
import org.apache.iceberg.FileFormat;
27-
import org.apache.iceberg.catalog.TableIdentifier;
2827
import org.checkerframework.checker.nullness.qual.Nullable;
2928

3029
class PortableIcebergDestinations implements DynamicDestinations {
@@ -73,7 +72,7 @@ public String getTableStringIdentifier(ValueInSingleWindow<Row> element) {
7372
@Override
7473
public IcebergDestination instantiateDestination(String dest) {
7574
return IcebergDestination.builder()
76-
.setTableIdentifier(TableIdentifier.parse(dest))
75+
.setTableIdentifier(IcebergUtils.parseTableIdentifier(dest))
7776
.setTableCreateConfig(null)
7877
.setFileFormat(FileFormat.fromString(fileFormat))
7978
.build();

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.hamcrest.MatcherAssert.assertThat;
2222
import static org.hamcrest.Matchers.containsInAnyOrder;
2323

24+
import java.util.Arrays;
25+
import java.util.Collection;
2426
import java.util.List;
2527
import java.util.Map;
2628
import java.util.UUID;
@@ -46,11 +48,11 @@
4648
import org.junit.Test;
4749
import org.junit.rules.TemporaryFolder;
4850
import org.junit.runner.RunWith;
49-
import org.junit.runners.JUnit4;
51+
import org.junit.runners.Parameterized;
5052
import org.slf4j.Logger;
5153
import org.slf4j.LoggerFactory;
5254

53-
@RunWith(JUnit4.class)
55+
@RunWith(Parameterized.class)
5456
public class IcebergIOReadTest {
5557

5658
private static final Logger LOG = LoggerFactory.getLogger(IcebergIOReadTest.class);
@@ -61,6 +63,21 @@ public class IcebergIOReadTest {
6163

6264
@Rule public TestPipeline testPipeline = TestPipeline.create();
6365

66+
@Parameterized.Parameters
67+
public static Collection<Object[]> data() {
68+
return Arrays.asList(
69+
new Object[][] {
70+
{String.format("{\"namespace\": [\"default\"], \"name\": \"%s\"}", tableId())},
71+
{String.format("default.%s", tableId())},
72+
});
73+
}
74+
75+
public static String tableId() {
76+
return "table" + Long.toString(UUID.randomUUID().hashCode(), 16);
77+
}
78+
79+
@Parameterized.Parameter public String tableStringIdentifier;
80+
6481
static class PrintRow extends DoFn<Row, Row> {
6582

6683
@ProcessElement
@@ -72,8 +89,7 @@ public void process(@Element Row row, OutputReceiver<Row> output) throws Excepti
7289

7390
@Test
7491
public void testSimpleScan() throws Exception {
75-
TableIdentifier tableId =
76-
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
92+
TableIdentifier tableId = IcebergUtils.parseTableIdentifier(tableStringIdentifier);
7793
Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
7894
final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
7995

0 commit comments

Comments
 (0)