if (indexer == null) {
    // Reuse indexer if phase 1 already created one.
    // Phase 1 mutates indexer.innerTypes on new columns creation.
    // Phase 2 must be aware of all mapping updates.
More context on this:
After the first phase (collecting schema updates), the Indexer's target columns can be updated in 2 ways:
- New columns are added to the schema, i.e. some target columns get OID-s. We will receive them in the AddColumnAction's response and use them in a follow-up.
- Existing object columns can get new sub-columns. Reusing the instance is just an easier way to make phase 1's parent object column aware of the new sub-columns. Otherwise we would need to find a way to replace ALL target refs after performing addColumnAction, which might be tricky.
In the object indexer we already cache innerIndexer in a map on the master branch, so both phases share the same indexers.
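To illustrate why reusing the instance helps, here is a minimal sketch of the caching idea. SketchObjectIndexer, IndexerReuseSketch and their methods are hypothetical stand-ins, not the actual CrateDB classes; the only point is that phase 2 sees phase 1's mutations because both phases get the same cached object.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an object-column indexer; not CrateDB's actual class.
final class SketchObjectIndexer {
    // Sub-columns known so far; phase 1 appends to this list.
    final List<String> innerColumns = new ArrayList<>();
}

final class IndexerReuseSketch {
    // One indexer per object column, shared by both phases.
    private final Map<String, SketchObjectIndexer> indexers = new HashMap<>();

    // Returns the cached indexer for a column, creating it only on first use.
    SketchObjectIndexer indexerFor(String column) {
        return indexers.computeIfAbsent(column, ignored -> new SketchObjectIndexer());
    }

    // Phase 1: record a newly seen sub-column on the parent's indexer.
    void collectSchemaUpdates(String parent, String subColumn) {
        List<String> inner = indexerFor(parent).innerColumns;
        if (!inner.contains(subColumn)) {
            inner.add(subColumn);
        }
    }

    // Phase 2: sees what phase 1 recorded, because the same instance is returned.
    List<String> knownSubColumns(String parent) {
        return List.copyOf(indexerFor(parent).innerColumns);
    }

    public static void main(String[] args) {
        IndexerReuseSketch sketch = new IndexerReuseSketch();
        sketch.collectSchemaUpdates("obj", "a");
        System.out.println(sketch.knownSubColumns("obj"));
    }
}
```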
Hi @seut, could you please take a look at a prerequisite for using OID-s in the source? The problem this PR tries to solve: we cannot just use the reference OID instead of the name in the Indexer, since there can be dynamically added columns which don't have an OID yet. On master we do a mapping update after creating a Lucene doc with the source. As discussed internally, one way to tackle this is to split indexing into 2 phases: first collect schema updates and update the mapping, and only after that create a Lucene document. Brief description of the implementation:
Had some tricky failures; for example, generated columns apparently can update the schema, and we don't have tests on master for that. Added such a case and added handling of synthetics.
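The two-phase split described above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the PR's implementation: TwoPhaseIndexingSketch, its in-memory schema map and the sequential OID counter are all hypothetical, and the "document" is just a map standing in for a Lucene document.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TwoPhaseIndexingSketch {
    // Hypothetical schema: column name -> OID. Stands in for the table mapping.
    private final Map<String, Long> schema = new LinkedHashMap<>();
    private long nextOid = 1;

    // Phase 1: find columns in the row that the schema does not know yet.
    List<String> collectSchemaUpdates(Map<String, Object> row) {
        List<String> newColumns = new ArrayList<>();
        for (String column : row.keySet()) {
            if (!schema.containsKey(column)) {
                newColumns.add(column);
            }
        }
        return newColumns;
    }

    // Schema update between the phases: every new column gets an OID.
    void addColumns(List<String> newColumns) {
        for (String column : newColumns) {
            if (!schema.containsKey(column)) {
                schema.put(column, nextOid++);
            }
        }
    }

    Long oidOf(String column) {
        return schema.get(column);
    }

    // Phase 2: build the document only now, when every column is in the schema.
    Map<String, Object> index(Map<String, Object> row) {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            if (!schema.containsKey(entry.getKey())) {
                throw new IllegalStateException("Column missing from schema: " + entry.getKey());
            }
            doc.put(entry.getKey(), entry.getValue());
        }
        return doc;
    }

    public static void main(String[] args) {
        TwoPhaseIndexingSketch table = new TwoPhaseIndexingSketch();
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("dynamic_col", "value");
        table.addColumns(table.collectSchemaUpdates(row)); // phase 1 + schema update
        System.out.println(table.index(row));              // phase 2
    }
}
```

In this sketch the document keys are still column names, matching the statement that the way documents are indexed does not change yet.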
mfussenegger
left a comment
Left a couple of suggestions, mostly around docs.
Overall looks good to me - although I think having the Reference updating would be important to get the full picture
server/src/main/java/io/crate/execution/dml/DynamicIndexer.java
server/src/main/java/io/crate/execution/dml/upsert/TransportShardUpsertAction.java
server/src/main/java/io/crate/execution/dml/upsert/TransportShardUpsertAction.java
Could you also share numbers of a |
Update:
Got different results on 2 forks:
- run 1: things got worse for concurrent inserts (second and fourth queries)
- run 2: the other way around, things got better for concurrent inserts (second and fourth queries)
Not sure how to interpret this: opposite results for concurrent inserts, while single-threaded insert looks almost the same.
Results of running the modified IndexerBenchmark: looks like there is an overhead (7%). Will try to do some micro-optimizations as a follow-up.
mfussenegger
left a comment
Changing it to actually use the oid when writing the source and in the fields will be a follow up, right?
/**
 * @return **only** new columns (with assigned OID-s).
 */
Suggested change:
- /**
-  * @return **only** new columns (with assigned OID-s).
-  */
+ /**
+  * @return columns added by the last {@link #execute} call. Columns part of the {@link AddColumnRequest} which already existed are not included.
+  */
And add an addedColumns.clear() at the start of .execute to make sure it can be called multiple times.
Currently not necessary given how it's used, but it matches the docs. It's easier to explain this way than to say that it can only be used once.
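A minimal sketch of that contract, assuming a hypothetical AddColumnTaskSketch that works on plain column names instead of cluster state and references. Clearing at the start of execute is what makes addedColumns() reflect only the last call.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simplification of an add-column task; column names stand in for references.
final class AddColumnTaskSketch {
    private final List<String> addedColumns = new ArrayList<>();

    // Returns the updated column set; records only columns that did not exist before.
    Set<String> execute(Set<String> currentColumns, List<String> requestedColumns) {
        addedColumns.clear(); // forget the result of any earlier call
        Set<String> updated = new LinkedHashSet<>(currentColumns);
        for (String column : requestedColumns) {
            if (updated.add(column)) {
                addedColumns.add(column);
            }
        }
        return updated;
    }

    // Columns added by the last execute call; already existing columns are not included.
    List<String> addedColumns() {
        return List.copyOf(addedColumns);
    }
}
```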
// We cannot use normalizedColumns for building response since it may contain existing columns:
// parents of a sub-column or already existing reference, taken from the cluster state.
List<ColumnIdent> newColumnIdents = new ArrayList<>();
Consumer<Reference> addNewColumn = (ref) -> newColumnIdents.add(ref.column());
Couldn't we do something like addedColumns = request.references().stream().filter(ref -> currentTable.getReference(ref.column()) == null).toList(); to avoid having to change the createMapping function?
Thanks for the idea, reverted the createMapping changes.
One remark: we cannot gather the columns here, as they don't have oids yet. I modified the suggestion to gather idents, and those idents are used at the end to get the columns with oids. 5260e9e
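The modified suggestion can be sketched like this. Ref, the map-based tables and the OID values are hypothetical simplifications; the point is only the order of operations: collect the idents of new columns before the update, then look up the references (now carrying oids) in the updated table.

```java
import java.util.List;
import java.util.Map;

final class NewColumnIdentsSketch {
    // Hypothetical reference: a column ident plus an OID (0 meaning "not assigned yet").
    record Ref(String column, long oid) {}

    // Before the schema update: keep only idents that the current table does not contain.
    static List<String> newIdents(List<Ref> requested, Map<String, Ref> currentTable) {
        return requested.stream()
            .map(Ref::column)
            .filter(column -> !currentTable.containsKey(column))
            .toList();
    }

    // After the schema update: resolve the idents against the updated table to get oids.
    static List<Ref> resolve(List<String> idents, Map<String, Ref> updatedTable) {
        return idents.stream().map(updatedTable::get).toList();
    }
}
```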
public void init(Version minNodeVersion) {
    if (minNodeVersion.onOrAfter(Version.V_5_5_0)) {
        reader = AddColumnResponse::new;
        ackedResponseFunction = AddColumnResponse::new;
    } else {
        reader = AcknowledgedResponse::new;
        ackedResponseFunction = (acked, ignored) -> new AcknowledgedResponse(acked);
    }
}
Could check the version on use. E.g.:
@Override
protected AcknowledgedResponse read(StreamInput in) throws IOException {
- return reader.read(in);
+ return clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_5_5_0)
+ ? new AddColumnResponse(in)
+ : new AcknowledgedResponse(in);
}
And:
clusterService.submitStateUpdateTask("add-column",
new AckedClusterStateUpdateTask<>(Priority.HIGH, request, listener) {
- private List<Reference> addedColumns = new ArrayList<>();
-
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
- ClusterState updatedState = addColumnTask.execute(currentState, request);
- addedColumns = addColumnTask.addedColumns();
- return updatedState;
+ return addColumnTask.execute(currentState, request);
}
@Override
protected AcknowledgedResponse newResponse(boolean acknowledged) {
- return ackedResponseFunction.apply(acknowledged, addedColumns);
+ return clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_5_5_0)
+ ? new AddColumnResponse(acknowledged, addColumnTask.addedColumns())
+ : new AcknowledgedResponse(acknowledged);
}
});
}
Avoids the special handling in Node.
Thanks, looks simpler indeed. Applied in 00b4d20.
this.columns = targetColumns;
this.synthetics = new HashMap<>();
this.stream = new BytesStreamOutput();
this.targetsHaveOids = table.versionCreated().onOrAfter(Version.V_5_5_0) && minNodeVersion.onOrAfter(Version.V_5_5_0);
I don't think it's safe to use this condition to decide if oid indexing should be used.
A mixed cluster is temporary, so you can end up with a table that has some documents indexed with oids and some without.
I think to avoid the streaming dependency we could look up the oids in the local state? The cluster state should have updated locally too if we got a response from the add-column action.
As discussed internally in https://crate.slack.com/archives/CBPLN6WJK/p1693396898974569,
we can only rely on Version.created.
Done in b45283b.
Also, in a follow-up PR, where I will actually be using OIDs, we will have a kind of Function<Reference, String> sourceKeyWriter, which would be either ref -> ref.column.leafName or ref -> Long.toString(ref.oid()), depending on version.created.
cc @seut
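A rough sketch of what such a sourceKeyWriter could look like. Ref and the boolean flag are hypothetical; in the real code the decision would come from the table's version.created, and the follow-up PR may well look different.

```java
import java.util.function.Function;

final class SourceKeyWriterSketch {
    // Hypothetical reference with just the two properties the key writer needs.
    record Ref(String leafName, long oid) {}

    // tableCreatedWithOids stands in for a check on the table's version.created.
    static Function<Ref, String> sourceKeyWriter(boolean tableCreatedWithOids) {
        return tableCreatedWithOids
            ? ref -> Long.toString(ref.oid()) // new tables: key by OID
            : Ref::leafName;                  // old tables: key by column name
    }
}
```

Choosing the function once per table keeps the per-document write path free of version checks.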
/**
 * Indicates whether inserts should use OID in source.
 * @return true if:
 * <ol>
 *     <li>Table is created on Version on or after 5.5, ie we are not inserting into an old table, restored from a snapshot.</li>
 *     <li>All cluster nodes support updating schema with getting OIDs in response.</li>
 * </ol>
 */
public boolean targetsHaveOids() {
    return targetsHaveOids;
}
Could have something like this instead in the TransportShardUpsertAction:
if (response instanceof AddColumnResponse addColumnResponse) {
indexer.updateTargets(addColumnResponse.addedColumns());
}
"User needs to call updateTargets after columns from collectSchemaUpdates were added to the cluster state" is a simpler contract than making it aware of the details.
But see the comment further above about the targetsHaveOids condition. I think we'll have to change the approach anyway.
public SymbolAssert isReference(ColumnIdent expectedColumn,
                                RelationName expectedRelName,
                                DataType<?> expectedType,
                                long oid) {
    assertThat(((Reference) actual).oid()).isEqualTo(oid);
    return isReference(expectedColumn, expectedRelName, expectedType);
}
I think instead of adding a ton of different overloads we should make it more composable.
E.g. we could have something like:
.isReference() // returns `ReferenceAssert`
.hasOid(oid)
.hasType(...)
.hasName(...)
Yep, started in #14629.
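For illustration, a composable assert along those lines could look roughly like the sketch below. It is plain Java without AssertJ, and Ref, ReferenceAssertSketch and assertThatReference are hypothetical names; the actual work in #14629 may be structured differently.

```java
final class ReferenceAssertSketch {
    // Hypothetical reference with the properties being asserted on.
    record Ref(String name, String type, long oid) {}

    private final Ref actual;

    private ReferenceAssertSketch(Ref actual) {
        this.actual = actual;
    }

    // Entry point: fails if the symbol is not a reference, otherwise allows chaining.
    static ReferenceAssertSketch assertThatReference(Object symbol) {
        if (!(symbol instanceof Ref ref)) {
            throw new AssertionError("Expected a reference but got: " + symbol);
        }
        return new ReferenceAssertSketch(ref);
    }

    ReferenceAssertSketch hasOid(long expected) {
        if (actual.oid() != expected) {
            throw new AssertionError("Expected oid " + expected + " but was " + actual.oid());
        }
        return this;
    }

    ReferenceAssertSketch hasType(String expected) {
        if (!actual.type().equals(expected)) {
            throw new AssertionError("Expected type " + expected + " but was " + actual.type());
        }
        return this;
    }

    ReferenceAssertSketch hasName(String expected) {
        if (!actual.name().equals(expected)) {
            throw new AssertionError("Expected name " + expected + " but was " + actual.name());
        }
        return this;
    }
}
```

Each check returns the assert object itself, so a test picks only the properties it cares about instead of needing a dedicated overload per combination.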
Co-authored-by: Mathias Fußenegger <mfussenegger@users.noreply.github.com>
…sk.java Co-authored-by: Mathias Fußenegger <mfussenegger@users.noreply.github.com>
…dColumnAction.java Co-authored-by: Mathias Fußenegger <mfussenegger@users.noreply.github.com>
Co-authored-by: Mathias Fußenegger <mfussenegger@users.noreply.github.com>
UPD: It was probably a temporary hiccup; I tried adding 300 iterations and running with the same seed, but it passes. The drop-column branch was broken after merging #14632. I fixed the branch, but this one got stale/messy, so I created #14635, which squashed already reviewed comments and doesn't have conflicts with the feature branch.
This is a pre-requisite for using column OID-s instead of names in the _source.
The way we index documents is not changed, only split into 2 phases.
Phase 1: Collect new columns and do a schema update. Don't create a Lucene document with source at this point.
Phase 2: Index. Currently target references still might have unassigned OID since this is only a preparation step.
In a follow-up, addColumnAction will return OIDs in its response, the corresponding target references will be updated, and then indexing (Phase 2 after merging this PR) can be done with OID-s, as all columns will have OID-s assigned.
TODO