Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.

Commit 64a7d65

Browse files
stephaniewang526 and Praful Makani authored
feat: sql fast path impl (#509)
* feat: sql fast path impl add QueryJobConfig to QueryRequest logic high level mode reset private methods refactor: modified code update logic add test refactor: update code and test case add integration tests code format add clir ignore and remove pom file feat: add more assert nit update * add logic for DML and DDL queries enable requestId add integration tests for fast path multipages query, DML, and DDL queries fix requestId logic update QueryRequestInfo and add mock test add mock test cases for SQL, DML, and DDL clean up code fix IT add schema test * update ITs to check table content correctness, update fastquery logic nit nit * add test for bogus query * add check for idempotent requestId * update QueryRequestInfo and error handling logic * add mock test for query JobException * update mock test * fix unit tests, nit update * update exception handling from JobException to BigQueryException * update based on comments * nit * update based on comments * add maxResult support optimization changes * update code * add test coverage to address: google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java#L69-L71 Added lines #L69 - L71 were not covered by tests * lint fix * feat: add more code cov * set method back * feat: code cove * add codecov Co-authored-by: Praful Makani <praful@qlogic.io>
1 parent f2ecf15 commit 64a7d65

12 files changed

Lines changed: 314367 additions & 12 deletions

File tree

google-cloud-bigquery/clirr-ignored-differences.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,9 @@
3232
<className>com/google/cloud/bigquery/spi/v2/BigQueryRpc</className>
3333
<method>com.google.api.services.bigquery.model.TestIamPermissionsResponse testIamPermissions(java.lang.String, java.util.List, java.util.Map)</method>
3434
</difference>
35+
<difference>
36+
<className>com/google/cloud/bigquery/spi/v2/BigQueryRpc</className>
37+
<method>com.google.api.services.bigquery.model.QueryResponse queryRpc(java.lang.String, com.google.api.services.bigquery.model.QueryRequest)</method>
38+
<differenceType>7012</differenceType>
39+
</difference>
3540
</differences>

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryError.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public String getLocation() {
8484
return location;
8585
}
8686

87-
String getDebugInfo() {
87+
public String getDebugInfo() {
8888
return debugInfo;
8989
}
9090

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.google.cloud.http.BaseHttpServiceException;
2222
import com.google.common.collect.ImmutableSet;
2323
import java.io.IOException;
24+
import java.util.Arrays;
25+
import java.util.List;
2426
import java.util.Objects;
2527
import java.util.Set;
2628
import java.util.concurrent.ExecutionException;
@@ -39,37 +41,52 @@ public final class BigQueryException extends BaseHttpServiceException {
3941
new Error(500, null), new Error(502, null), new Error(503, null), new Error(504, null));
4042
private static final long serialVersionUID = -5006625989225438209L;
4143

42-
private final BigQueryError error;
44+
private final List<BigQueryError> errors;
4345

4446
public BigQueryException(int code, String message) {
4547
this(code, message, (Throwable) null);
4648
}
4749

4850
public BigQueryException(int code, String message, Throwable cause) {
4951
super(code, message, null, true, RETRYABLE_ERRORS, cause);
50-
this.error = null;
52+
this.errors = null;
5153
}
5254

5355
public BigQueryException(int code, String message, BigQueryError error) {
5456
super(code, message, error != null ? error.getReason() : null, true, RETRYABLE_ERRORS);
55-
this.error = error;
57+
this.errors = Arrays.asList(error);
58+
}
59+
60+
public BigQueryException(List<BigQueryError> errors) {
61+
super(0, null, null, false, RETRYABLE_ERRORS, null);
62+
this.errors = errors;
5663
}
5764

5865
public BigQueryException(IOException exception) {
5966
super(exception, true, RETRYABLE_ERRORS);
60-
BigQueryError error = null;
67+
List<BigQueryError> errors = null;
6168
if (getReason() != null) {
62-
error = new BigQueryError(getReason(), getLocation(), getMessage(), getDebugInfo());
69+
errors =
70+
Arrays.asList(
71+
new BigQueryError(getReason(), getLocation(), getMessage(), getDebugInfo()));
6372
}
64-
this.error = error;
73+
this.errors = errors;
6574
}
6675

6776
/**
6877
* Returns the {@link BigQueryError} that caused this exception. Returns {@code null} if none
6978
* exists.
7079
*/
7180
public BigQueryError getError() {
72-
return error;
81+
return errors == null || errors.isEmpty() || errors.size() == 0 ? null : errors.get(0);
82+
}
83+
84+
/**
85+
* Returns a list of {@link BigQueryError}s that caused this exception. Returns {@code null} if
86+
* none exists.
87+
*/
88+
public List<BigQueryError> getErrors() {
89+
return errors;
7390
}
7491

7592
@Override
@@ -81,12 +98,12 @@ public boolean equals(Object obj) {
8198
return false;
8299
}
83100
BigQueryException other = (BigQueryException) obj;
84-
return super.equals(other) && Objects.equals(error, other.error);
101+
return super.equals(other) && Objects.equals(errors, other.errors);
85102
}
86103

87104
@Override
88105
public int hashCode() {
89-
return Objects.hash(super.hashCode(), error);
106+
return Objects.hash(super.hashCode(), errors);
90107
}
91108

92109
/**

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.api.gax.paging.Page;
2727
import com.google.api.services.bigquery.model.ErrorProto;
2828
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
29+
import com.google.api.services.bigquery.model.QueryRequest;
2930
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
3031
import com.google.api.services.bigquery.model.TableDataInsertAllRequest.Rows;
3132
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
@@ -48,6 +49,7 @@
4849
import com.google.common.collect.FluentIterable;
4950
import com.google.common.collect.ImmutableList;
5051
import com.google.common.collect.Iterables;
52+
import com.google.common.collect.Lists;
5153
import com.google.common.collect.Maps;
5254
import java.util.ArrayList;
5355
import java.util.List;
@@ -198,6 +200,43 @@ public Page<FieldValueList> getNextPage() {
198200
}
199201
}
200202

203+
private class QueryPageFetcher extends Thread implements NextPageFetcher<FieldValueList> {
204+
205+
private static final long serialVersionUID = -8501991114794410114L;
206+
private final Map<BigQueryRpc.Option, ?> requestOptions;
207+
private final BigQueryOptions serviceOptions;
208+
private Job job;
209+
private final TableId table;
210+
private final Schema schema;
211+
212+
QueryPageFetcher(
213+
JobId jobId,
214+
Schema schema,
215+
BigQueryOptions serviceOptions,
216+
String cursor,
217+
Map<BigQueryRpc.Option, ?> optionMap) {
218+
this.requestOptions =
219+
PageImpl.nextRequestOptions(BigQueryRpc.Option.PAGE_TOKEN, cursor, optionMap);
220+
this.serviceOptions = serviceOptions;
221+
this.job = getJob(jobId);
222+
this.table = ((QueryJobConfiguration) job.getConfiguration()).getDestinationTable();
223+
this.schema = schema;
224+
}
225+
226+
@Override
227+
public Page<FieldValueList> getNextPage() {
228+
while (!JobStatus.State.DONE.equals(job.getStatus().getState())) {
229+
try {
230+
sleep(5000);
231+
} catch (InterruptedException ex) {
232+
throw new RuntimeException(ex.getMessage());
233+
}
234+
job = job.reload();
235+
}
236+
return listTableData(table, schema, serviceOptions, requestOptions).x();
237+
}
238+
}
239+
201240
private final BigQueryRpc bigQueryRpc;
202241

203242
BigQueryImpl(BigQueryOptions options) {
@@ -1184,9 +1223,79 @@ public Boolean call() {
11841223
public TableResult query(QueryJobConfiguration configuration, JobOption... options)
11851224
throws InterruptedException, JobException {
11861225
Job.checkNotDryRun(configuration, "query");
1226+
1227+
// If all parameters passed in configuration are supported by the query() method on the backend,
1228+
// put on fast path
1229+
QueryRequestInfo requestInfo = new QueryRequestInfo(configuration);
1230+
if (requestInfo.isFastQuerySupported()) {
1231+
String projectId = getOptions().getProjectId();
1232+
QueryRequest content = requestInfo.toPb();
1233+
return queryRpc(projectId, content, options);
1234+
}
1235+
// Otherwise, fall back to the existing create query job logic
11871236
return create(JobInfo.of(configuration), options).getQueryResults();
11881237
}
11891238

1239+
private TableResult queryRpc(
1240+
final String projectId, final QueryRequest content, JobOption... options) {
1241+
com.google.api.services.bigquery.model.QueryResponse results;
1242+
try {
1243+
results =
1244+
runWithRetries(
1245+
new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
1246+
@Override
1247+
public com.google.api.services.bigquery.model.QueryResponse call() {
1248+
return bigQueryRpc.queryRpc(projectId, content);
1249+
}
1250+
},
1251+
getOptions().getRetrySettings(),
1252+
EXCEPTION_HANDLER,
1253+
getOptions().getClock());
1254+
} catch (RetryHelperException e) {
1255+
throw BigQueryException.translateAndThrow(e);
1256+
}
1257+
1258+
if (results.getErrors() != null) {
1259+
List<BigQueryError> bigQueryErrors =
1260+
Lists.transform(results.getErrors(), BigQueryError.FROM_PB_FUNCTION);
1261+
// Throwing BigQueryException since there may be no JobId and we want to stay consistent
1262+
// with the case where there there is a HTTP error
1263+
throw new BigQueryException(bigQueryErrors);
1264+
}
1265+
1266+
Schema schema = results.getSchema() == null ? null : Schema.fromPb(results.getSchema());
1267+
Long numRows;
1268+
if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) {
1269+
numRows = 0L;
1270+
} else if (results.getNumDmlAffectedRows() != null) {
1271+
numRows = results.getNumDmlAffectedRows();
1272+
} else {
1273+
numRows = results.getTotalRows().longValue();
1274+
}
1275+
1276+
if (results.getPageToken() != null) {
1277+
JobId jobId = JobId.fromPb(results.getJobReference());
1278+
String cursor = results.getPageToken();
1279+
return new TableResult(
1280+
schema,
1281+
numRows,
1282+
new PageImpl<>(
1283+
// fetch next pages of results
1284+
new QueryPageFetcher(jobId, schema, getOptions(), cursor, optionMap(options)),
1285+
cursor,
1286+
// cache first page of result
1287+
transformTableData(results.getRows(), schema)));
1288+
}
1289+
// only 1 page of result
1290+
return new TableResult(
1291+
schema,
1292+
numRows,
1293+
new PageImpl<>(
1294+
new TableDataPageFetcher(null, schema, getOptions(), null, optionMap(options)),
1295+
null,
1296+
transformTableData(results.getRows(), schema)));
1297+
}
1298+
11901299
@Override
11911300
public TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
11921301
throws InterruptedException, JobException {

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/QueryJobConfiguration.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public final class QueryJobConfiguration extends JobConfiguration {
6969
private final Map<String, String> labels;
7070
private final RangePartitioning rangePartitioning;
7171
private final List<ConnectionProperty> connectionProperties;
72+
// maxResults is only used for fast query path
73+
private final Long maxResults;
7274

7375
/**
7476
* Priority levels for a query. If not specified the priority is assumed to be {@link
@@ -118,6 +120,7 @@ public static final class Builder
118120
private Map<String, String> labels;
119121
private RangePartitioning rangePartitioning;
120122
private List<ConnectionProperty> connectionProperties;
123+
private Long maxResults;
121124

122125
private Builder() {
123126
super(Type.QUERY);
@@ -150,6 +153,7 @@ private Builder(QueryJobConfiguration jobConfiguration) {
150153
this.labels = jobConfiguration.labels;
151154
this.rangePartitioning = jobConfiguration.rangePartitioning;
152155
this.connectionProperties = jobConfiguration.connectionProperties;
156+
this.maxResults = jobConfiguration.maxResults;
153157
}
154158

155159
private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
@@ -603,6 +607,20 @@ public Builder setConnectionProperties(List<ConnectionProperty> connectionProper
603607
return this;
604608
}
605609

610+
/**
611+
* This is only supported in the fast query path [Optional] The maximum number of rows of data
612+
* to return per page of results. Setting this flag to a small value such as 1000 and then
613+
* paging through results might improve reliability when the query result set is large. In
614+
* addition to this limit, responses are also limited to 10 MB. By default, there is no maximum
615+
* row count, and only the byte limit applies.
616+
*
617+
* @param maxResults maxResults or {@code null} for none
618+
*/
619+
public Builder setMaxResults(Long maxResults) {
620+
this.maxResults = maxResults;
621+
return this;
622+
}
623+
606624
public QueryJobConfiguration build() {
607625
return new QueryJobConfiguration(this);
608626
}
@@ -644,6 +662,7 @@ private QueryJobConfiguration(Builder builder) {
644662
this.labels = builder.labels;
645663
this.rangePartitioning = builder.rangePartitioning;
646664
this.connectionProperties = builder.connectionProperties;
665+
this.maxResults = builder.maxResults;
647666
}
648667

649668
/**
@@ -833,6 +852,19 @@ public List<ConnectionProperty> getConnectionProperties() {
833852
return connectionProperties;
834853
}
835854

855+
/**
856+
* This is only supported in the fast query path [Optional] The maximum number of rows of data to
857+
* return per page of results. Setting this flag to a small value such as 1000 and then paging
858+
* through results might improve reliability when the query result set is large. In addition to
859+
* this limit, responses are also limited to 10 MB. By default, there is no maximum row count, and
860+
* only the byte limit applies.
861+
*
862+
* @return value or {@code null} for none
863+
*/
864+
public Long getMaxResults() {
865+
return maxResults;
866+
}
867+
836868
@Override
837869
public Builder toBuilder() {
838870
return new Builder(this);
@@ -851,7 +883,7 @@ ToStringHelper toStringHelper() {
851883
.add("flattenResults", flattenResults)
852884
.add("priority", priority)
853885
.add("tableDefinitions", tableDefinitions)
854-
.add("userQueryCache", useQueryCache)
886+
.add("useQueryCache", useQueryCache)
855887
.add("userDefinedFunctions", userDefinedFunctions)
856888
.add("createDisposition", createDisposition)
857889
.add("writeDisposition", writeDisposition)

0 commit comments

Comments
 (0)