
2 phase indexing #14617

Closed
BaurzhanSakhariev wants to merge 42 commits into drop-column from oid-in-source

Conversation


@BaurzhanSakhariev BaurzhanSakhariev commented Aug 23, 2023

This is a pre-requisite for using column OID-s instead of names in the _source.

The way we index documents is not changed, only split into 2 phases.

Phase 1: Collect new columns and do a schema update. Don't create a Lucene document with source at this point.
Phase 2: Index. Target references might still have an unassigned OID at this point, since this PR is only a preparation step.

In a follow-up, addColumnAction will return OIDs in its response and the corresponding target references will be updated. Indexing (Phase 2 after merging this PR) can then be done with OIDs, as all columns will have OIDs assigned.
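
For readers who want to see the split in code, here is a minimal sketch of the two phases. It uses plain maps as a stand-in for the schema and the Lucene document; the class and method names are hypothetical and are not the PR's actual classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a table schema plus indexer; not CrateDB's actual classes.
class TwoPhaseIndexingSketch {

    // Known columns mapped to their OIDs.
    private final Map<String, Long> schema = new LinkedHashMap<>();
    private long nextOid = 1;

    // Phase 1: find columns that are not yet part of the schema. No document is built here.
    Set<String> collectSchemaUpdates(List<Map<String, Object>> rows) {
        Set<String> newColumns = new LinkedHashSet<>();
        for (Map<String, Object> row : rows) {
            for (String column : row.keySet()) {
                if (!schema.containsKey(column)) {
                    newColumns.add(column);
                }
            }
        }
        return newColumns;
    }

    // Schema update between the phases: every new column gets an OID.
    void addColumns(Set<String> newColumns) {
        for (String column : newColumns) {
            schema.putIfAbsent(column, nextOid++);
        }
    }

    // Phase 2: build the "documents". Every column has an OID by now,
    // so the source can be keyed by OID instead of by name.
    List<Map<Long, Object>> index(List<Map<String, Object>> rows) {
        List<Map<Long, Object>> docs = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Map<Long, Object> doc = new LinkedHashMap<>();
            row.forEach((column, value) -> doc.put(schema.get(column), value));
            docs.add(doc);
        }
        return docs;
    }

    List<Map<Long, Object>> indexInTwoPhases(List<Map<String, Object>> rows) {
        addColumns(collectSchemaUpdates(rows));
        return index(rows);
    }
}
```

Calling `indexInTwoPhases` with rows that mention an unknown column first registers that column and only then builds the documents, which is the ordering this PR introduces.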

TODO

  • deal with synthetics in the Indexer and in the ObjectIndexer: generated expressions might add new columns (only when computed and inserted for the first time, and in general an unlikely use case, but it needs to be handled/tested anyway)
  • fix failing tests

@BaurzhanSakhariev BaurzhanSakhariev force-pushed the oid-in-source branch 5 times, most recently from e326373 to fe75b6b Compare August 23, 2023 11:53
@BaurzhanSakhariev BaurzhanSakhariev changed the title WIP: use oid in source instead of column name WIP: 2 phase indexing Aug 23, 2023
@BaurzhanSakhariev BaurzhanSakhariev force-pushed the oid-in-source branch 9 times, most recently from 1778e5f to e21a27c Compare August 24, 2023 11:53
@BaurzhanSakhariev BaurzhanSakhariev force-pushed the oid-in-source branch 3 times, most recently from 4b970c2 to 019340d Compare August 25, 2023 07:42
Comment on lines +161 to +164
if (indexer == null) {
    // Reuse indexer if phase 1 already created one.
    // Phase 1 mutates indexer.innerTypes on new columns creation.
    // Phase 2 must be aware of all mapping updates.
Contributor Author

@BaurzhanSakhariev BaurzhanSakhariev Aug 25, 2023

More context on this:
After the first phase (collecting schema updates), the Indexer's target columns can change in 2 ways:

  1. New columns are added to the schema, i.e. some target columns get OIDs. We will receive them in the AddColumnAction's response and use them in a follow-up.
  2. Existing object columns can get new sub-columns. Reusing the instance is just an easier way to make phase 1's parent object column aware of the new sub-columns. Otherwise we would need a way to replace ALL target refs after performing addColumnAction, which might be tricky.

In the object indexer we already cache innerIndexer in a map on the master branch, so both phases share the same indexers.
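
A minimal sketch of why sharing one indexer instance helps, assuming a simplified indexer that only tracks sub-column names. `ObjectColumnIndexer` and `IndexerCache` are hypothetical names for illustration, not the real ObjectIndexer.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified indexer for one object column; not CrateDB's ObjectIndexer.
class ObjectColumnIndexer {
    final Set<String> knownSubColumns = new HashSet<>();

    // Phase 1: remember sub-columns seen for the first time.
    void collectSchemaUpdates(Map<String, Object> value) {
        knownSubColumns.addAll(value.keySet());
    }

    // Phase 2: can only index sub-columns it knows about.
    boolean canIndex(Map<String, Object> value) {
        return knownSubColumns.containsAll(value.keySet());
    }
}

class IndexerCache {
    private final Map<String, ObjectColumnIndexer> indexers = new HashMap<>();

    // Both phases go through this lookup, so they share one instance per column.
    ObjectColumnIndexer indexerFor(String column) {
        return indexers.computeIfAbsent(column, ignored -> new ObjectColumnIndexer());
    }
}
```

A fresh `ObjectColumnIndexer` created for phase 2 would not know the sub-columns found in phase 1, whereas the cached instance does.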

@BaurzhanSakhariev BaurzhanSakhariev marked this pull request as ready for review August 25, 2023 08:44
@BaurzhanSakhariev BaurzhanSakhariev changed the title WIP: 2 phase indexing 2 phase indexing Aug 25, 2023

BaurzhanSakhariev commented Aug 25, 2023

Hi @seut, could you please take a look at this prerequisite for using OIDs in the source?

Problem this PR tries to solve:

We cannot just use the reference OID instead of the name in the Indexer, since there can be dynamically added columns that don't have an OID yet. On master we do the mapping update after creating a Lucene doc with the source.

As discussed internally, one way to tackle this is to split indexing into 2 phases: first collect schema updates and update the mapping, and only then create a Lucene document.

Brief description of the implementation:

  1. Extract the gathering of schema updates from the ValueIndexers' indexValue. onDynamicColumn was unused in most of the ValueIndexers anyway, so I removed it.
  2. Create a dedicated method in the ValueIndexer to collect updates: a no-op by default, type-specific for Dynamic/Array/Object columns.
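
The second point can be pictured with a default interface method. This is only a sketch: the interface name, the method signature and the string-based column names are assumptions, and the real ValueIndexer looks different.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical interface shape; the real ValueIndexer has a different signature.
interface SketchValueIndexer<T> {
    // No-op by default: most types can never introduce new columns.
    default void collectSchemaUpdates(T value, List<String> newColumns) {
    }
}

class SketchIntIndexer implements SketchValueIndexer<Integer> {
    // Inherits the no-op default.
}

class SketchObjectIndexer implements SketchValueIndexer<Map<String, Object>> {
    private final String name;
    private final Set<String> knownChildren;

    SketchObjectIndexer(String name, Set<String> knownChildren) {
        this.name = name;
        this.knownChildren = knownChildren;
    }

    // Type-specific: report every key that is not a known sub-column.
    @Override
    public void collectSchemaUpdates(Map<String, Object> value, List<String> newColumns) {
        for (String key : value.keySet()) {
            if (!knownChildren.contains(key)) {
                newColumns.add(name + "." + key);
            }
        }
    }
}
```

With this shape only the indexers that can actually produce schema changes need to override anything.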

Had some tricky failures. For example, generated columns apparently can update the schema and we don't have tests for that on master, so I added such a case and handling of synthetics.

Member

@mfussenegger mfussenegger left a comment

Left a couple of suggestions, mostly around docs.

Overall looks good to me - although I think having the Reference updating would be important to get the full picture

@mfussenegger
Member

Could you also share numbers of a ./compare_run.py with insert specs? Just to make sure this split into two phases doesn't affect performance too much

@BaurzhanSakhariev BaurzhanSakhariev removed the request for review from seut August 28, 2023 08:40

BaurzhanSakhariev commented Aug 29, 2023

Update:

./compare_run.py --v1 branch:master --v2 branch:oid-in-source --spec specs/insert_bulk.toml --forks 2

Got different results on 2 forks:

run 1: things got worse for concurrent inserts (second and fourth queries)
 Q: insert into id_int_value_str ("id", "value") values ($1, $2)
C: 1
| Version |         Mean ±    Stdev |        Min |     Median |         Q3 |        Max |
|   V1    |      157.723 ±   26.565 |     86.875 |    148.869 |    166.865 |    434.178 |
|   V2    |      157.295 ±   25.413 |     89.655 |    148.812 |    167.214 |    476.131 |
├---------┴-------------------------┴------------┴------------┴------------┴------------┘
|               -   0.27%                           -   0.04%   
There is a 28.72% probability that the observed difference is not random, and the best estimate of that difference is 0.27%
The test has no statistical significance

Q: insert into id_int_value_str ("id", "value") values ($1, $2)
C: 10
| Version |         Mean ±    Stdev |        Min |     Median |         Q3 |        Max |
|   V1    |        8.715 ±   15.082 |      3.227 |      5.398 |      7.556 |    169.858 |
|   V2    |       11.213 ±   13.762 |      3.214 |      9.242 |     11.867 |    176.412 |
├---------┴-------------------------┴------------┴------------┴------------┴------------┘
|               +  25.07%                           +  52.50%   
There is a 99.99% probability that the observed difference is not random, and the best estimate of that difference is 25.07%
The test has statistical significance

Q: insert into tbl ("id", "value") values ($1, $2)
C: 1
| Version |         Mean ±    Stdev |        Min |     Median |         Q3 |        Max |
|   V1    |      156.404 ±   21.198 |     91.013 |    148.382 |    166.355 |    300.072 |
|   V2    |      155.127 ±   18.860 |     69.183 |    148.330 |    166.532 |    255.122 |
├---------┴-------------------------┴------------┴------------┴------------┴------------┘
|               -   0.82%                           -   0.04%   
There is a 84.52% probability that the observed difference is not random, and the best estimate of that difference is 0.82%
The test has no statistical significance

Q: insert into tbl ("id", "value") values ($1, $2)
C: 10
| Version |         Mean ±    Stdev |        Min |     Median |         Q3 |        Max |
|   V1    |      303.343 ±  103.426 |     99.050 |    280.321 |    332.401 |   2041.807 |
|   V2    |      323.250 ±  128.766 |     99.725 |    296.265 |    375.285 |   2569.434 |
├---------┴-------------------------┴------------┴------------┴------------┴------------┘
|               +   6.35%                           +   5.53%   
There is a 99.99% probability that the observed difference is not random, and the best estimate of that difference is 6.35%
The test has statistical significance

Q: insert into uservisits ("adRevenue", "destinationURL", "searchWord", "UserAgent", "duration", "visitDate", "sourceIP", "lCode", "cCode") values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
C: 10
| Version |         Mean ±    Stdev |        Min |     Median |         Q3 |        Max |
|   V1    |      438.758 ±  197.274 |    146.655 |    406.567 |    491.498 |   2706.805 |
|   V2    |      479.165 ±  347.943 |    110.757 |    420.559 |    508.148 |   3214.849 |
├---------┴-------------------------┴------------┴------------┴------------┴------------┘
|               +   8.80%                           +   3.38%   
There is a 99.46% probability that the observed difference is not random, and the best estimate of that difference is 8.80%
The test has statistical significance


System/JVM Metrics (durations in ms, byte-values in MB)
    |    YOUNG GC            |       OLD GC           |      HEAP         |     ALLOC     
    |  cnt      avg      max |  cnt      avg      max |  initial     used |     rate      total
 V1 |  228    17.31    19.69 |   50   339.03   455.70 |      268      773 |   102.44      42589
 V2 |  238    17.75    30.06 |   48   320.71   401.86 |      268      762 |   102.36      42884
run 2: other way around, things got better for concurrent inserts (second and fourth queries)
 Q: insert into id_int_value_str ("id", "value") values ($1, $2)
C: 1
| Version |         Mean ±    Stdev |        Min |     Median |         Q3 |        Max |
|   V1    |      157.917 ±   25.482 |     84.217 |    149.310 |    167.111 |    446.864 |
|   V2    |      156.837 ±   24.714 |     86.941 |    148.569 |    166.587 |    463.690 |
├---------┴-------------------------┴------------┴------------┴------------┴------------┘
|               -   0.69%                           -   0.50%   
There is a 66.39% probability that the observed difference is not random, and the best estimate of that difference is 0.69%
The test has no statistical significance

Q: insert into id_int_value_str ("id", "value") values ($1, $2)
C: 10
| Version |         Mean ±    Stdev |        Min |     Median |         Q3 |        Max |
|   V1    |       11.759 ±   14.606 |      3.395 |      9.026 |     11.380 |    167.872 |
|   V2    |       10.158 ±   13.574 |      3.138 |      8.250 |     10.483 |    173.096 |
├---------┴-------------------------┴------------┴------------┴------------┴------------┘
|               -  14.61%                           -   8.99%   
There is a 98.88% probability that the observed difference is not random, and the best estimate of that difference is 14.61%
The test has statistical significance

Q: insert into tbl ("id", "value") values ($1, $2)
C: 1
| Version |         Mean ±    Stdev |        Min |     Median |         Q3 |        Max |
|   V1    |      153.464 ±   20.663 |     84.216 |    147.138 |    165.366 |    259.259 |
|   V2    |      155.958 ±   21.678 |     83.732 |    148.714 |    166.396 |    272.287 |
├---------┴-------------------------┴------------┴------------┴------------┴------------┘
|               +   1.61%                           +   1.07%   
There is a 99.15% probability that the observed difference is not random, and the best estimate of that difference is 1.61%
The test has statistical significance

Q: insert into tbl ("id", "value") values ($1, $2)
C: 10
| Version |         Mean ±    Stdev |        Min |     Median |         Q3 |        Max |
|   V1    |      360.075 ±  136.169 |     91.648 |    333.290 |    442.543 |   2346.938 |
|   V2    |      344.799 ±  117.869 |     99.186 |    319.133 |    408.873 |   1163.604 |
├---------┴-------------------------┴------------┴------------┴------------┴------------┘
|               -   4.33%                           -   4.34%   
There is a 99.26% probability that the observed difference is not random, and the best estimate of that difference is 4.33%
The test has statistical significance

Q: insert into uservisits ("adRevenue", "destinationURL", "searchWord", "UserAgent", "duration", "visitDate", "sourceIP", "lCode", "cCode") values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
C: 10
| Version |         Mean ±    Stdev |        Min |     Median |         Q3 |        Max |
|   V1    |      447.148 ±  180.602 |    127.154 |    417.175 |    518.933 |   2535.173 |
|   V2    |      451.075 ±  170.007 |    110.655 |    425.310 |    520.708 |   2206.340 |
├---------┴-------------------------┴------------┴------------┴------------┴------------┘
|               +   0.87%                           +   1.93%   
There is a 33.73% probability that the observed difference is not random, and the best estimate of that difference is 0.87%
The test has no statistical significance


System/JVM Metrics (durations in ms, byte-values in MB)
    |    YOUNG GC            |       OLD GC           |      HEAP         |     ALLOC     
    |  cnt      avg      max |  cnt      avg      max |  initial     used |     rate      total
 V1 |  231    19.08    20.10 |   47   333.77   506.67 |      268      788 |   101.67      42645
 V2 |  221    17.81    22.87 |   51   333.10   461.67 |      268      717 |   101.65      42659

Not sure how to interpret this: the two runs show opposite results for concurrent inserts, while single-threaded inserts look almost the same.


BaurzhanSakhariev commented Aug 29, 2023

Results of running modified IndexerBenchmark:

Benchmark                           Mode  Cnt     Score    Error  Units
IndexerBenchmark.measure_index      avgt   10  1119,294 ± 39,155  us/op
IndexerBenchmark.measure_index_old  avgt   10  1043,911 ±  0,501  us/op

Looks like there is an overhead (~7%). Will try to do some micro-optimizations as a follow-up.
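
The overhead figure follows from the two scores in the table above (the benchmark output uses a decimal comma). A small sketch of the arithmetic, with a hypothetical helper name:

```java
class OverheadSketch {
    // Relative change of `candidate` against `baseline`, in percent.
    static double relativeChangePercent(double baseline, double candidate) {
        return (candidate - baseline) / baseline * 100.0;
    }

    public static void main(String[] args) {
        // Scores from the table above, written with a decimal point.
        double oldScore = 1043.911; // measure_index_old, us/op
        double newScore = 1119.294; // measure_index, us/op
        System.out.printf("overhead: %.1f%%%n", relativeChangePercent(oldScore, newScore));
    }
}
```

Note that the reported error of the new measurement (± 39,155 us/op) is roughly half of the measured difference, so the estimate is fairly noisy.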

Member

@mfussenegger mfussenegger left a comment

Changing it to actually use the oid when writing the source and in the fields will be a follow up, right?

Comment on lines +242 to +244
/**
* @return **only** new columns (with assigned OID-s).
*/
Member

Suggested change
-/**
- * @return **only** new columns (with assigned OID-s).
- */
+/**
+ * @return columns added by the last {@link #execute} call. Columns part of the {@link AddColumnRequest} which already existed are not included.
+ */

And add an addedColumns.clear() at the start of .execute to make sure it can be called multiple times.
Currently not necessary given how it's used, but it matches the docs. It's easier to explain this way than to say that it can only be used once.
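
A minimal sketch of the suggested contract, assuming columns are plain strings and the cluster state is a set of column names. `AddColumnTaskSketch` is a hypothetical class and does not mirror the real task's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical task; the names follow the discussion but not the real AddColumnTask API.
class AddColumnTaskSketch {
    private final List<String> addedColumns = new ArrayList<>();

    // Returns the updated set of columns; `existing` is left untouched.
    Set<String> execute(Set<String> existing, List<String> requested) {
        addedColumns.clear(); // forget the result of any previous call
        Set<String> updated = new TreeSet<>(existing);
        for (String column : requested) {
            if (updated.add(column)) {
                addedColumns.add(column);
            }
        }
        return updated;
    }

    // Columns added by the last execute call; already existing ones are not included.
    List<String> addedColumns() {
        return List.copyOf(addedColumns);
    }
}
```

Without the `clear()` call, a second `execute` would also report the columns added by the first one.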

Contributor Author

Added clear() in 022a78d

Comment on lines +87 to +91
// We cannot use normalizedColumns for building response since it may contain existing columns:
// parents of a sub-column or already existing reference, taken from the cluster state.
List<ColumnIdent> newColumnIdents = new ArrayList<>();
Consumer<Reference> addNewColumn = (ref) -> newColumnIdents.add(ref.column());

Member

Couldn't we do something like addedColumns = request.references().stream().filter(ref -> currentTable.getReference(ref.column()) == null).toList(); to avoid having to change the createMapping function?

Contributor Author

Thanks for the idea, reverted the createMapping changes.

One remark: we cannot gather the columns here as they don't have OIDs yet. I modified the suggestion to gather idents, which are then used at the end to get the columns with OIDs. 5260e9e
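
The two steps described in this reply can be sketched as follows. `Ref` is a hypothetical stand-in for Reference, and the tables are modelled as maps from column name to reference; neither is the actual CrateDB type.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

class AddedColumnsSketch {
    // Hypothetical stand-in for a Reference: a column name plus an optional OID.
    record Ref(String column, Long oid) {}

    // Step 1: names of requested columns that the current table does not have yet.
    static List<String> newColumnIdents(List<Ref> requested, Map<String, Ref> currentTable) {
        return requested.stream()
            .map(Ref::column)
            .filter(column -> !currentTable.containsKey(column))
            .toList();
    }

    // Step 2: after the schema update, look the idents up again to get refs with OIDs.
    static List<Ref> resolve(List<String> idents, Map<String, Ref> updatedTable) {
        return idents.stream()
            .map(updatedTable::get)
            .filter(Objects::nonNull)
            .toList();
    }
}
```

Gathering only the idents first sidesteps the problem that the requested references carry no OID before the update.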

Comment on lines +77 to +85
public void init(Version minNodeVersion) {
    if (minNodeVersion.onOrAfter(Version.V_5_5_0)) {
        reader = AddColumnResponse::new;
        ackedResponseFunction = AddColumnResponse::new;
    } else {
        reader = AcknowledgedResponse::new;
        ackedResponseFunction = (acked, ignored) -> new AcknowledgedResponse(acked);
    }
}
Member

Suggested change
-public void init(Version minNodeVersion) {
-    if (minNodeVersion.onOrAfter(Version.V_5_5_0)) {
-        reader = AddColumnResponse::new;
-        ackedResponseFunction = AddColumnResponse::new;
-    } else {
-        reader = AcknowledgedResponse::new;
-        ackedResponseFunction = (acked, ignored) -> new AcknowledgedResponse(acked);
-    }
-}

Could check the version on use. E.g.:

     @Override
     protected AcknowledgedResponse read(StreamInput in) throws IOException {
-        return reader.read(in);
+        return clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_5_5_0)
+            ? new AddColumnResponse(in)
+            : new AcknowledgedResponse(in);
     }

And:

         clusterService.submitStateUpdateTask("add-column",
             new AckedClusterStateUpdateTask<>(Priority.HIGH, request, listener) {

-                private List<Reference> addedColumns = new ArrayList<>();
-
                 @Override
                 public ClusterState execute(ClusterState currentState) throws Exception {
-                    ClusterState updatedState = addColumnTask.execute(currentState, request);
-                    addedColumns = addColumnTask.addedColumns();
-                    return updatedState;
+                    return addColumnTask.execute(currentState, request);
                 }

                 @Override
                 protected AcknowledgedResponse newResponse(boolean acknowledged) {
-                    return ackedResponseFunction.apply(acknowledged, addedColumns);
+                    return clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_5_5_0)
+                        ? new AddColumnResponse(acknowledged, addColumnTask.addedColumns())
+                        : new AcknowledgedResponse(acknowledged);
                 }
             });
     }

Avoids the special handling in Node

Contributor Author

Thanks, looks simpler indeed. Applied in 00b4d20

this.columns = targetColumns;
this.synthetics = new HashMap<>();
this.stream = new BytesStreamOutput();
this.targetsHaveOids = table.versionCreated().onOrAfter(Version.V_5_5_0) && minNodeVersion.onOrAfter(Version.V_5_5_0);
Member

I don't think it's safe to use this condition to decide if oid indexing should be used.

A mixed cluster is temporary, so you can end up with a table that has some documents indexed with oids and some without.

I think to avoid the streaming dependency we could look up the oids in the local state? The cluster state should have updated locally too if we got a response from the add-column action.

Contributor Author

@BaurzhanSakhariev BaurzhanSakhariev Aug 30, 2023

As discussed internally in https://crate.slack.com/archives/CBPLN6WJK/p1693396898974569,
we can only rely on Version.created.

Done in b45283b.

Also, in a follow-up PR where I will actually be using OIDs, we will have a kind of Function<Reference, String> sourceKeyWriter, which will be either ref -> ref.column().leafName() or ref -> Long.toString(ref.oid()), depending on version.created.
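
A minimal sketch of what such a sourceKeyWriter could look like. `Ref` is a hypothetical stand-in for Reference, and the boolean flag stands in for the check on the table's created version; both are assumptions made for illustration.

```java
import java.util.function.Function;

class SourceKeyWriterSketch {
    // Hypothetical stand-in for a Reference.
    record Ref(String leafName, long oid) {}

    // Pick the key format once per table, based on whether the table was created
    // on a version that assigns OIDs (passed in as a plain flag here).
    static Function<Ref, String> sourceKeyWriter(boolean tableCreatedWithOids) {
        return tableCreatedWithOids
            ? ref -> Long.toString(ref.oid())
            : Ref::leafName;
    }
}
```

Because the choice depends only on the table, all documents of one table end up with the same key format, which is the concern raised in the review comment above.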

cc @seut

Comment on lines +581 to +591
/**
 * Indicates whether inserts should use OID in source.
 * @return true if:
 * <ol>
 * <li>Table is created on Version on or after 5.5, ie we are not inserting into an old table, restored from a snapshot.</li>
 * <li>All cluster nodes support updating schema with getting OIDs in response.</li>
 * </ol>
 */
public boolean targetsHaveOids() {
    return targetsHaveOids;
}
Member

Suggested change
-/**
- * Indicates whether inserts should use OID in source.
- * @return true if:
- * <ol>
- * <li>Table is created on Version on or after 5.5, ie we are not inserting into an old table, restored from a snapshot.</li>
- * <li>All cluster nodes support updating schema with getting OIDs in response.</li>
- * </ol>
- */
-public boolean targetsHaveOids() {
-    return targetsHaveOids;
-}

Could have something like this instead in the TransportShardUpsertAction:

        if (response instanceof AddColumnResponse addColumnResponse) {
            indexer.updateTargets(addColumnResponse.addedColumns());
        }

"User needs to call updateTargets after columns from collectSchemaUpdates were added to the cluster state" is a simpler contract than making it aware of the details.

But see the comment further above about the targetsHaveOids condition. I think we'll have to change the approach anyway.
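
A minimal sketch of the proposed updateTargets contract. `Ref` and `Indexer` are hypothetical stand-ins, and using 0 as the "unassigned" OID is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

class UpdateTargetsSketch {
    // Hypothetical stand-in for a Reference; 0 as "unassigned" is an assumption.
    record Ref(String column, long oid) {
        static final long UNASSIGNED = 0;
    }

    // Hypothetical stand-in for the Indexer.
    static class Indexer {
        private final List<Ref> targets;

        Indexer(List<Ref> targets) {
            this.targets = new ArrayList<>(targets);
        }

        // Swap in the references that came back from the schema update.
        // Targets that were not part of the update stay as they are.
        void updateTargets(List<Ref> addedColumns) {
            for (Ref added : addedColumns) {
                targets.replaceAll(t -> t.column().equals(added.column()) ? added : t);
            }
        }

        List<Ref> targets() {
            return List.copyOf(targets);
        }
    }
}
```

The caller only has to pass on what the add-column response returned; the indexer does not need to know about versions or cluster state.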

Contributor Author

Thanks, applied in 287daad

Comment on lines +125 to +132
public SymbolAssert isReference(ColumnIdent expectedColumn,
                                RelationName expectedRelName,
                                DataType<?> expectedType,
                                long oid) {
    assertThat(((Reference) actual).oid()).isEqualTo(oid);
    return isReference(expectedColumn, expectedRelName, expectedType);
}

Member

I think instead of adding a ton of different overloads we should make it more composable.

E.g. we could have something like:

        .isReference() // returns `ReferenceAssert`
        .hasOid(oid)
        .hasType(...)
        .hasName(...)
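
A minimal sketch of such a composable assert, written in plain Java without any assertion library so it stays self-contained. `ReferenceAssertSketch` and its `Ref` record are hypothetical; the actual helper added later may look different.

```java
class ReferenceAssertSketch {
    // Hypothetical stand-in for a Reference.
    record Ref(String name, String type, long oid) {}

    private final Ref actual;

    ReferenceAssertSketch(Ref actual) {
        this.actual = actual;
    }

    ReferenceAssertSketch hasName(String expected) {
        return check(actual.name().equals(expected), "name", expected, actual.name());
    }

    ReferenceAssertSketch hasType(String expected) {
        return check(actual.type().equals(expected), "type", expected, actual.type());
    }

    ReferenceAssertSketch hasOid(long expected) {
        return check(actual.oid() == expected, "oid", expected, actual.oid());
    }

    // Each check returns `this`, so callers chain only the checks they need.
    private ReferenceAssertSketch check(boolean ok, String what, Object expected, Object found) {
        if (!ok) {
            throw new AssertionError(what + ": expected " + expected + " but was " + found);
        }
        return this;
    }
}
```

A test that only cares about the OID can then write `new ReferenceAssertSketch(ref).hasOid(3L)` without needing a dedicated overload.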

Contributor Author

@BaurzhanSakhariev BaurzhanSakhariev Aug 30, 2023

Merged #14632 to master and added hasOid to drop-column branch in #14634


BaurzhanSakhariev commented Aug 30, 2023

> Changing it to actually use the oid when writing the source and in the fields will be a follow up, right?

Yep, started in #14629.


BaurzhanSakhariev commented Aug 30, 2023

SQLTypeMappingTest.test_dynamic_null_array_overridden_to_integer_becomes_null fails. Converting to draft

UPD: Probably was a temporary hiccup; I tried 300 iterations with the same seed and it passes.

The drop-column branch was broken after merging #14632.

I fixed the branch, but this one got stale/messy, so I created #14635, which squashes the already reviewed changes and doesn't have conflicts with the feature branch.

@BaurzhanSakhariev BaurzhanSakhariev force-pushed the drop-column branch 4 times, most recently from cd6e29f to 432338c Compare August 30, 2023 18:29
@BaurzhanSakhariev BaurzhanSakhariev deleted the oid-in-source branch August 31, 2023 07:36
