Skip to content

Commit 8eef4a3

Browse files
authored
[Managed Iceberg] add GiB autosharding (#32612) (#32663)
* [Managed Iceberg] add GiB autosharding * trigger iceberg integration tests * fix test * add to CHANGES.md * increase GiB limits * increase GiB limits * data file size distribution metric; max file size 512mb
1 parent cca83d2 commit 8eef4a3

9 files changed

Lines changed: 149 additions & 178 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959

6060
* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
6161
* [Managed Iceberg] Added support for streaming writes ([#32451](https://github.com/apache/beam/pull/32451))
62+
* [Managed Iceberg] Added auto-sharding for streaming writes ([#32612](https://github.com/apache/beam/pull/32612))
6263
* [Managed Iceberg] Added support for writing to dynamic destinations ([#32565](https://github.com/apache/beam/pull/32565))
6364

6465
## New Features / Improvements

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void processElement(
105105
}
106106
update.commit();
107107
Snapshot snapshot = table.currentSnapshot();
108-
LOG.info("Created new snapshot for table '{}': {}.", element.getKey(), snapshot);
108+
LOG.info("Created new snapshot for table '{}': {}", element.getKey(), snapshot);
109109
snapshotsCreated.inc();
110110
out.outputWithTimestamp(
111111
KV.of(element.getKey(), SnapshotInfo.fromSnapshot(snapshot)), window.maxTimestamp());

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinations.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@
1717
*/
1818
package org.apache.beam.sdk.io.iceberg;
1919

20-
import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DATA;
21-
import static org.apache.beam.sdk.io.iceberg.WriteToDestinations.DEST;
22-
23-
import org.apache.beam.sdk.schemas.Schema;
20+
import org.apache.beam.sdk.coders.KvCoder;
21+
import org.apache.beam.sdk.coders.RowCoder;
22+
import org.apache.beam.sdk.coders.StringUtf8Coder;
2423
import org.apache.beam.sdk.transforms.DoFn;
2524
import org.apache.beam.sdk.transforms.PTransform;
2625
import org.apache.beam.sdk.transforms.ParDo;
2726
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2827
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
28+
import org.apache.beam.sdk.values.KV;
2929
import org.apache.beam.sdk.values.PCollection;
3030
import org.apache.beam.sdk.values.Row;
3131
import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -37,7 +37,7 @@
3737
* <p>The output record will have the format { dest: ..., data: ...} where the dest field has the
3838
* assigned metadata and the data field has the original row.
3939
*/
40-
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<Row>> {
40+
class AssignDestinations extends PTransform<PCollection<Row>, PCollection<KV<String, Row>>> {
4141

4242
private final DynamicDestinations dynamicDestinations;
4343

@@ -46,34 +46,27 @@ public AssignDestinations(DynamicDestinations dynamicDestinations) {
4646
}
4747

4848
@Override
49-
public PCollection<Row> expand(PCollection<Row> input) {
50-
51-
final Schema outputSchema =
52-
Schema.builder()
53-
.addStringField(DEST)
54-
.addRowField(DATA, dynamicDestinations.getDataSchema())
55-
.build();
56-
49+
public PCollection<KV<String, Row>> expand(PCollection<Row> input) {
5750
return input
5851
.apply(
5952
ParDo.of(
60-
new DoFn<Row, Row>() {
53+
new DoFn<Row, KV<String, Row>>() {
6154
@ProcessElement
6255
public void processElement(
6356
@Element Row element,
6457
BoundedWindow window,
6558
PaneInfo paneInfo,
6659
@Timestamp Instant timestamp,
67-
OutputReceiver<Row> out) {
60+
OutputReceiver<KV<String, Row>> out) {
6861
String tableIdentifier =
6962
dynamicDestinations.getTableStringIdentifier(
7063
ValueInSingleWindow.of(element, timestamp, window, paneInfo));
7164
Row data = dynamicDestinations.getData(element);
7265

73-
out.output(
74-
Row.withSchema(outputSchema).addValues(tableIdentifier, data).build());
66+
out.output(KV.of(tableIdentifier, data));
7567
}
7668
}))
77-
.setRowSchema(outputSchema);
69+
.setCoder(
70+
KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
7871
}
7972
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.beam.sdk.io.iceberg;
1919

20-
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
2120
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2221

2322
import com.google.auto.value.AutoValue;
@@ -28,12 +27,6 @@
2827
import org.apache.beam.sdk.managed.Managed;
2928
import org.apache.beam.sdk.schemas.Schema;
3029
import org.apache.beam.sdk.transforms.PTransform;
31-
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
32-
import org.apache.beam.sdk.transforms.windowing.AfterPane;
33-
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
34-
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
35-
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
36-
import org.apache.beam.sdk.transforms.windowing.Window;
3730
import org.apache.beam.sdk.values.PBegin;
3831
import org.apache.beam.sdk.values.PCollection;
3932
import org.apache.beam.sdk.values.Row;
@@ -288,7 +281,6 @@ public static WriteRows writeRows(IcebergCatalogConfig catalog) {
288281

289282
@AutoValue
290283
public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
291-
private static final int TRIGGERING_RECORD_COUNT = 50_000;
292284

293285
abstract IcebergCatalogConfig getCatalogConfig();
294286

@@ -322,12 +314,14 @@ public WriteRows to(DynamicDestinations destinations) {
322314
}
323315

324316
/**
325-
* Sets the frequency at which data is committed and a new {@link org.apache.iceberg.Snapshot}
326-
* is produced.
317+
* Sets the frequency at which data is written to files and a new {@link
318+
* org.apache.iceberg.Snapshot} is produced.
327319
*
328-
* <p>Roughly every triggeringFrequency duration, this connector will try to accumulate all
329-
* {@link org.apache.iceberg.ManifestFile}s and commit them to the table as appended files. Each
330-
* commit results in a new table {@link org.apache.iceberg.Snapshot}.
320+
* <p>Roughly every triggeringFrequency duration, records are written to data files and appended
321+
* to the respective table. Each append operation creates a new table snapshot.
322+
*
323+
* <p>Generally speaking, increasing this duration will result in fewer, larger data files and
324+
* fewer snapshots.
331325
*
332326
* <p>This is only applicable when writing an unbounded {@link PCollection} (i.e. a streaming
333327
* pipeline).
@@ -350,34 +344,13 @@ public IcebergWriteResult expand(PCollection<Row> input) {
350344
Preconditions.checkNotNull(getTableIdentifier()), input.getSchema());
351345
}
352346

353-
// Assign destinations before re-windowing to global because
347+
// Assign destinations before re-windowing to global in WriteToDestinations because
354348
// user's dynamic destination may depend on windowing properties
355-
PCollection<Row> assignedRows =
356-
input.apply("Set Destination Metadata", new AssignDestinations(destinations));
357-
358-
if (assignedRows.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
359-
Duration triggeringFrequency = getTriggeringFrequency();
360-
checkArgumentNotNull(
361-
triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
362-
assignedRows =
363-
assignedRows.apply(
364-
"WindowIntoGlobal",
365-
Window.<Row>into(new GlobalWindows())
366-
.triggering(
367-
Repeatedly.forever(
368-
AfterFirst.of(
369-
AfterProcessingTime.pastFirstElementInPane()
370-
.plusDelayOf(triggeringFrequency),
371-
AfterPane.elementCountAtLeast(TRIGGERING_RECORD_COUNT))))
372-
.discardingFiredPanes());
373-
} else {
374-
Preconditions.checkArgument(
375-
getTriggeringFrequency() == null,
376-
"Triggering frequency is only applicable for streaming pipelines.");
377-
}
378-
return assignedRows.apply(
379-
"Write Rows to Destinations",
380-
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
349+
return input
350+
.apply("Assign Table Destinations", new AssignDestinations(destinations))
351+
.apply(
352+
"Write Rows to Destinations",
353+
new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
381354
}
382355
}
383356

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import org.apache.beam.sdk.metrics.Counter;
22+
import org.apache.beam.sdk.metrics.Distribution;
2223
import org.apache.beam.sdk.metrics.Metrics;
2324
import org.apache.iceberg.DataFile;
2425
import org.apache.iceberg.FileFormat;
@@ -38,6 +39,8 @@ class RecordWriter {
3839
private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
3940
private final Counter activeIcebergWriters =
4041
Metrics.counter(RecordWriterManager.class, "activeIcebergWriters");
42+
private final Distribution dataFileByteSize =
43+
Metrics.distribution(RecordWriter.class, "dataFileByteSize");
4144
private final DataWriter<Record> icebergDataWriter;
4245
private final Table table;
4346
private final String absoluteFilename;
@@ -95,7 +98,7 @@ class RecordWriter {
9598
}
9699
activeIcebergWriters.inc();
97100
LOG.info(
98-
"Opened {} writer for table {}, partition {}. Writing to path: {}",
101+
"Opened {} writer for table '{}', partition {}. Writing to path: {}",
99102
fileFormat,
100103
table.name(),
101104
partitionKey,
@@ -117,7 +120,15 @@ public void close() throws IOException {
117120
e);
118121
}
119122
activeIcebergWriters.dec();
120-
LOG.info("Closed {} writer for table {}, path: {}", fileFormat, table.name(), absoluteFilename);
123+
DataFile dataFile = icebergDataWriter.toDataFile();
124+
LOG.info(
125+
"Closed {} writer for table '{}' ({} records, {} bytes), path: {}",
126+
fileFormat,
127+
table.name(),
128+
dataFile.recordCount(),
129+
dataFile.fileSizeInBytes(),
130+
absoluteFilename);
131+
dataFileByteSize.update(dataFile.fileSizeInBytes());
121132
}
122133

123134
public long bytesWritten() {

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import org.apache.beam.sdk.transforms.ParDo;
2525
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2626
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
27+
import org.apache.beam.sdk.util.ShardedKey;
2728
import org.apache.beam.sdk.util.WindowedValue;
2829
import org.apache.beam.sdk.values.KV;
2930
import org.apache.beam.sdk.values.PCollection;
3031
import org.apache.beam.sdk.values.Row;
31-
import org.apache.beam.sdk.values.ShardedKey;
3232
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
3333
import org.apache.iceberg.ManifestFile;
3434
import org.apache.iceberg.catalog.Catalog;
@@ -38,7 +38,7 @@ class WriteGroupedRowsToFiles
3838
extends PTransform<
3939
PCollection<KV<ShardedKey<String>, Iterable<Row>>>, PCollection<FileWriteResult>> {
4040

41-
static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 40); // 1TB
41+
private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
4242

4343
private final DynamicDestinations dynamicDestinations;
4444
private final IcebergCatalogConfig catalogConfig;

0 commit comments

Comments
 (0)