Skip to content

Commit 6cb188b

Browse files
committed
Support SQL MERGE in the Iceberg connector
1 parent 435d100 commit 6cb188b

13 files changed

Lines changed: 858 additions & 76 deletions

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,13 @@ public class IcebergColumnHandle
3636
implements ColumnHandle
3737
{
3838
// Iceberg reserved field ids begin at Integer.MAX_VALUE and count down. Starting with Integer.MIN_VALUE here to avoid conflicts.
39-
public static final int TRINO_UPDATE_ROW_ID_COLUMN_ID = Integer.MIN_VALUE;
40-
public static final String TRINO_UPDATE_ROW_ID_COLUMN_NAME = "$row_id";
39+
public static final int TRINO_UPDATE_ROW_ID = Integer.MIN_VALUE;
40+
public static final int TRINO_MERGE_ROW_ID = Integer.MIN_VALUE + 1;
41+
public static final String TRINO_ROW_ID_NAME = "$row_id";
42+
43+
public static final int TRINO_MERGE_FILE_RECORD_COUNT = Integer.MIN_VALUE + 2;
44+
public static final int TRINO_MERGE_PARTITION_SPEC_ID = Integer.MIN_VALUE + 3;
45+
public static final int TRINO_MERGE_PARTITION_DATA = Integer.MIN_VALUE + 4;
4146

4247
private final ColumnIdentity baseColumnIdentity;
4348
private final Type baseType;
@@ -157,7 +162,13 @@ public boolean isRowPositionColumn()
157162
/**
 * Reports whether this column handle is the Trino update row ID column.
 * <p>
 * The check is a plain comparison of this handle's {@code id} with the update row ID constant.
 * This hunk shows both sides of the diff: the removed line compares against the old constant
 * name {@code TRINO_UPDATE_ROW_ID_COLUMN_ID}, the added line against its new name
 * {@code TRINO_UPDATE_ROW_ID}.
 * <p>
 * Marked {@code @JsonIgnore} so this derived value is not serialized as a JSON property.
 *
 * @return {@code true} if {@code id} equals the update row ID constant
 */
@JsonIgnore
158163
public boolean isUpdateRowIdColumn()
159164
{
160-
return id == TRINO_UPDATE_ROW_ID_COLUMN_ID;
165+
return id == TRINO_UPDATE_ROW_ID;
166+
}
167+
168+
/**
 * Reports whether this column handle is the Trino merge row ID column.
 * <p>
 * The check is a plain comparison of this handle's {@code id} with {@code TRINO_MERGE_ROW_ID}.
 * Marked {@code @JsonIgnore} so this derived value is not serialized as a JSON property.
 *
 * @return {@code true} if {@code id} equals {@code TRINO_MERGE_ROW_ID}
 */
@JsonIgnore
169+
public boolean isMergeRowIdColumn()
170+
{
171+
return id == TRINO_MERGE_ROW_ID;
161172
}
162173

163174
/**
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.iceberg;
15+
16+
import com.google.common.collect.ImmutableList;
17+
import com.google.common.collect.ImmutableMap;
18+
import io.airlift.json.JsonCodec;
19+
import io.airlift.slice.Slice;
20+
import io.trino.plugin.hive.HdfsEnvironment;
21+
import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
22+
import io.trino.plugin.iceberg.delete.IcebergPositionDeletePageSink;
23+
import io.trino.spi.Page;
24+
import io.trino.spi.PageBuilder;
25+
import io.trino.spi.block.ColumnarRow;
26+
import io.trino.spi.connector.ConnectorMergeSink;
27+
import io.trino.spi.connector.ConnectorPageSink;
28+
import io.trino.spi.connector.ConnectorSession;
29+
import io.trino.spi.connector.MergePage;
30+
import io.trino.spi.type.VarcharType;
31+
import org.apache.iceberg.PartitionSpec;
32+
import org.apache.iceberg.Schema;
33+
import org.apache.iceberg.io.LocationProvider;
34+
import org.apache.iceberg.types.Type;
35+
import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
36+
import org.roaringbitmap.longlong.LongBitmapDataProvider;
37+
import org.roaringbitmap.longlong.Roaring64Bitmap;
38+
39+
import java.util.ArrayList;
40+
import java.util.Collection;
41+
import java.util.HashMap;
42+
import java.util.List;
43+
import java.util.Map;
44+
import java.util.Optional;
45+
import java.util.concurrent.CompletableFuture;
46+
47+
import static io.trino.plugin.base.util.Closables.closeAllSuppress;
48+
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
49+
import static io.trino.spi.connector.MergePage.createDeleteAndInsertPages;
50+
import static io.trino.spi.type.BigintType.BIGINT;
51+
import static io.trino.spi.type.IntegerType.INTEGER;
52+
import static java.lang.Math.toIntExact;
53+
import static java.util.Objects.requireNonNull;
54+
import static java.util.concurrent.CompletableFuture.completedFuture;
55+
56+
public class IcebergMergeSink
57+
implements ConnectorMergeSink
58+
{
59+
private final LocationProvider locationProvider;
60+
private final IcebergFileWriterFactory fileWriterFactory;
61+
private final HdfsEnvironment hdfsEnvironment;
62+
private final FileIoProvider fileIoProvider;
63+
private final JsonCodec<CommitTaskData> jsonCodec;
64+
private final ConnectorSession session;
65+
private final IcebergFileFormat fileFormat;
66+
private final Map<String, String> storageProperties;
67+
private final Schema schema;
68+
private final Map<Integer, PartitionSpec> partitionsSpecs;
69+
private final ConnectorPageSink insertPageSink;
70+
private final int columnCount;
71+
private final Map<Slice, FileDeletion> fileDeletions = new HashMap<>();
72+
73+
public IcebergMergeSink(
74+
LocationProvider locationProvider,
75+
IcebergFileWriterFactory fileWriterFactory,
76+
HdfsEnvironment hdfsEnvironment,
77+
FileIoProvider fileIoProvider,
78+
JsonCodec<CommitTaskData> jsonCodec,
79+
ConnectorSession session,
80+
IcebergFileFormat fileFormat,
81+
Map<String, String> storageProperties,
82+
Schema schema,
83+
Map<Integer, PartitionSpec> partitionsSpecs,
84+
ConnectorPageSink insertPageSink,
85+
int columnCount)
86+
{
87+
this.locationProvider = requireNonNull(locationProvider, "locationProvider is null");
88+
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
89+
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
90+
this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null");
91+
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
92+
this.session = requireNonNull(session, "session is null");
93+
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
94+
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
95+
this.schema = requireNonNull(schema, "schema is null");
96+
this.partitionsSpecs = ImmutableMap.copyOf(requireNonNull(partitionsSpecs, "partitionsSpecs is null"));
97+
this.insertPageSink = requireNonNull(insertPageSink, "insertPageSink is null");
98+
this.columnCount = columnCount;
99+
}
100+
101+
@Override
102+
public void storeMergedRows(Page page)
103+
{
104+
MergePage mergePage = createDeleteAndInsertPages(page, columnCount);
105+
106+
mergePage.getInsertionsPage().ifPresent(insertPageSink::appendPage);
107+
108+
mergePage.getDeletionsPage().ifPresent(deletions -> {
109+
ColumnarRow rowIdRow = toColumnarRow(deletions.getBlock(deletions.getChannelCount() - 1));
110+
111+
for (int position = 0; position < rowIdRow.getPositionCount(); position++) {
112+
Slice filePath = VarcharType.VARCHAR.getSlice(rowIdRow.getField(0), position);
113+
long rowPosition = BIGINT.getLong(rowIdRow.getField(1), position);
114+
115+
int index = position;
116+
FileDeletion deletion = fileDeletions.computeIfAbsent(filePath, ignored -> {
117+
long fileRecordCount = BIGINT.getLong(rowIdRow.getField(2), index);
118+
int partitionSpecId = toIntExact(INTEGER.getLong(rowIdRow.getField(3), index));
119+
String partitionData = VarcharType.VARCHAR.getSlice(rowIdRow.getField(4), index).toStringUtf8();
120+
return new FileDeletion(partitionSpecId, partitionData, fileRecordCount);
121+
});
122+
123+
deletion.rowsToDelete().addLong(rowPosition);
124+
}
125+
});
126+
}
127+
128+
@Override
129+
public CompletableFuture<Collection<Slice>> finish()
130+
{
131+
List<Slice> fragments = new ArrayList<>(insertPageSink.finish().join());
132+
133+
fileDeletions.forEach((dataFilePath, deletion) -> {
134+
ConnectorPageSink sink = createPositionDeletePageSink(
135+
dataFilePath.toStringUtf8(),
136+
partitionsSpecs.get(deletion.partitionSpecId()),
137+
deletion.partitionDataJson(),
138+
deletion.fileRecordCount());
139+
140+
fragments.addAll(writePositionDeletes(sink, deletion.rowsToDelete()));
141+
});
142+
143+
return completedFuture(fragments);
144+
}
145+
146+
@Override
147+
public void abort()
148+
{
149+
insertPageSink.abort();
150+
}
151+
152+
private ConnectorPageSink createPositionDeletePageSink(String dataFilePath, PartitionSpec partitionSpec, String partitionDataJson, long fileRecordCount)
153+
{
154+
Optional<PartitionData> partitionData = Optional.empty();
155+
if (partitionSpec.isPartitioned()) {
156+
Type[] columnTypes = partitionSpec.fields().stream()
157+
.map(field -> field.transform().getResultType(schema.findType(field.sourceId())))
158+
.toArray(Type[]::new);
159+
partitionData = Optional.of(PartitionData.fromJson(partitionDataJson, columnTypes));
160+
}
161+
162+
return new IcebergPositionDeletePageSink(
163+
dataFilePath,
164+
partitionSpec,
165+
partitionData,
166+
locationProvider,
167+
fileWriterFactory,
168+
hdfsEnvironment,
169+
new HdfsContext(session),
170+
fileIoProvider,
171+
jsonCodec,
172+
session,
173+
fileFormat,
174+
storageProperties,
175+
fileRecordCount);
176+
}
177+
178+
private static Collection<Slice> writePositionDeletes(ConnectorPageSink sink, ImmutableLongBitmapDataProvider rowsToDelete)
179+
{
180+
try {
181+
return doWritePositionDeletes(sink, rowsToDelete);
182+
}
183+
catch (Throwable t) {
184+
closeAllSuppress(t, sink::abort);
185+
throw t;
186+
}
187+
}
188+
189+
private static Collection<Slice> doWritePositionDeletes(ConnectorPageSink sink, ImmutableLongBitmapDataProvider rowsToDelete)
190+
{
191+
PageBuilder pageBuilder = new PageBuilder(ImmutableList.of(BIGINT));
192+
193+
rowsToDelete.forEach(rowPosition -> {
194+
BIGINT.writeLong(pageBuilder.getBlockBuilder(0), rowPosition);
195+
pageBuilder.declarePosition();
196+
if (pageBuilder.isFull()) {
197+
sink.appendPage(pageBuilder.build());
198+
pageBuilder.reset();
199+
}
200+
});
201+
202+
if (!pageBuilder.isEmpty()) {
203+
sink.appendPage(pageBuilder.build());
204+
}
205+
206+
return sink.finish().join();
207+
}
208+
209+
private static class FileDeletion
210+
{
211+
private final int partitionSpecId;
212+
private final String partitionDataJson;
213+
private final long fileRecordCount;
214+
private final LongBitmapDataProvider rowsToDelete = new Roaring64Bitmap();
215+
216+
public FileDeletion(int partitionSpecId, String partitionDataJson, long fileRecordCount)
217+
{
218+
this.partitionSpecId = partitionSpecId;
219+
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
220+
this.fileRecordCount = fileRecordCount;
221+
}
222+
223+
public int partitionSpecId()
224+
{
225+
return partitionSpecId;
226+
}
227+
228+
public String partitionDataJson()
229+
{
230+
return partitionDataJson;
231+
}
232+
233+
public long fileRecordCount()
234+
{
235+
return fileRecordCount;
236+
}
237+
238+
public LongBitmapDataProvider rowsToDelete()
239+
{
240+
return rowsToDelete;
241+
}
242+
}
243+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.iceberg;
15+
16+
import com.fasterxml.jackson.annotation.JsonCreator;
17+
import com.fasterxml.jackson.annotation.JsonProperty;
18+
import io.trino.spi.connector.ConnectorMergeTableHandle;
19+
20+
import static java.util.Objects.requireNonNull;
21+
22+
public class IcebergMergeTableHandle
23+
implements ConnectorMergeTableHandle
24+
{
25+
private final IcebergTableHandle tableHandle;
26+
private final IcebergWritableTableHandle insertTableHandle;
27+
28+
@JsonCreator
29+
public IcebergMergeTableHandle(IcebergTableHandle tableHandle, IcebergWritableTableHandle insertTableHandle)
30+
{
31+
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
32+
this.insertTableHandle = requireNonNull(insertTableHandle, "insertTableHandle is null");
33+
}
34+
35+
@Override
36+
@JsonProperty
37+
public IcebergTableHandle getTableHandle()
38+
{
39+
return tableHandle;
40+
}
41+
42+
@JsonProperty
43+
public IcebergWritableTableHandle getInsertTableHandle()
44+
{
45+
return insertTableHandle;
46+
}
47+
}

0 commit comments

Comments
 (0)