This repository was archived by the owner on Feb 24, 2026. It is now read-only.
java.lang.RuntimeException: ManagedChannel allocation site #643
Closed
Labels
api: bigquerystorage (Issues related to the googleapis/java-bigquerystorage API.)
Description
Environment details
- Specify the API at the beginning of the title (for example, "BigQuery: ..."). General, Core, and Other are also allowed as types.
- OS type and version:
- Java version: jdk-11.0.7
- bigquerystorage version(s): 1.5.6
- opts: -Xmx2048m -Xms2048m -server -XX:+UseG1GC -verbose:gc -Xlog:gc:stdout -XX:+UseStringDeduplication
Steps to reproduce
Load test our application with JMeter. The errors below appear in the log, and they occur intermittently rather than on every request.
- ?
- ?
Code example
public BigQueryReadClient getBigQueryReadClient() throws Exception
{
    final BigQueryReadSettings bigQueryReadSettings = BigQueryReadSettings.newBuilder()
        .setCredentialsProvider(
            FixedCredentialsProvider.create(credentials)
        )
        // .setTransportChannelProvider(channelProvider)
        // .setExecutorProvider(executorProvider)
        .build();
    final BigQueryReadClient readClient = BigQueryReadClient.create(bigQueryReadSettings);
    return readClient;
}

@NotNull
private Flux<T> getTableRecords(long sysSkip, TableId destinationTable, Long sysTakeLong) throws Exception
{
    final BigQueryReadClient readClient = bigQueryConfig.getBigQueryReadClient();
    final String table =
        String.format(
            "projects/%s/datasets/%s/tables/%s",
            bigQueryConfig.getProjectId(), destinationTable.getDataset(), destinationTable.getTable());
    final String parent = String.format("projects/%s", bigQueryConfig.getProjectId());

    // Prepare the read options
    final ReadSession.TableReadOptions tableReadOptions = api.getFields().keySet().stream()
        .reduce(
            ReadSession.TableReadOptions.newBuilder(),
            ReadSession.TableReadOptions.Builder::addSelectedFields,
            (b1, b2) -> b1
        )
        .setRowRestriction(sysTakeLong == null ?
            "row_number > " + sysSkip :
            "row_number BETWEEN " + sysSkip + " AND " + (sysSkip + sysTakeLong))
        .build();
    final ReadSession.Builder readSessionBuilder =
        ReadSession.newBuilder()
            .setTable(table)
            .setDataFormat(DataFormat.ARROW)
            .setReadOptions(tableReadOptions);
    final CreateReadSessionRequest createReadSessionRequest = CreateReadSessionRequest.newBuilder()
        .setParent(parent)
        .setReadSession(readSessionBuilder)
        .setMaxStreamCount(1)
        .build();

    // Init the read session
    final ReadSession readSession = readClient.createReadSession(createReadSessionRequest);
    if (readSession.getStreamsCount() == 0)
    {
        return Flux.fromIterable(new ArrayList<>());
    }

    // Get the streams
    final ReadStream readStream = readSession.getStreams(0);
    final ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder()
        .setReadStream(readStream.getName())
        .build();
    final ServerStream<ReadRowsResponse> readRowsResponseStream = readClient
        .readRowsCallable()
        .call(readRowsRequest);

    // Allocate the limit
    final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

    // Prepare the wrapper vectors
    final ArrowSchema arrowSchema = readSession.getArrowSchema();
    final Schema schema = MessageSerializer.deserializeSchema(
        new ReadChannel(new ByteArrayReadableSeekableByteChannel(arrowSchema.getSerializedSchema().toByteArray()))
    );
    final List<FieldVector> vectors = new ArrayList<>();
    for (Field field : schema.getFields())
    {
        vectors.add(field.createVector(allocator));
    }
    final VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(vectors);
    final VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);
    final Iterable<ReadRowsResponse> iterable = new Iterable<ReadRowsResponse>()
    {
        Iterator<ReadRowsResponse> iterator = readRowsResponseStream.iterator();

        @NotNull
        @Override
        public Iterator<ReadRowsResponse> iterator()
        {
            return iterator;
        }
    };

    return Flux
        .fromIterable(iterable)
        .flatMapIterable(Unchecked.function(readRowsResponse -> {
            final ArrowRecordBatch batch = readRowsResponse.getArrowRecordBatch();
            final org.apache.arrow.vector.ipc.message.ArrowRecordBatch arrowRecordBatch = MessageSerializer
                .deserializeRecordBatch(
                    new ReadChannel(
                        new ByteArrayReadableSeekableByteChannel(
                            batch.getSerializedRecordBatch().toByteArray())),
                    allocator);
            vectorLoader.load(arrowRecordBatch);
            // Release buffers from batch (they are still held in the vectors in root).
            arrowRecordBatch.close();
            final List<FieldVector> sortedFieldVectors = vectorSchemaRoot.getFieldVectors()
                .stream()
                .sorted(Comparator.comparingLong((fv) -> api.getFieldNames().indexOf(fv.getName())))
                .collect(Collectors.toList());
            final int rowCount = vectorSchemaRoot.getRowCount();
            final ArrayList<T> records = new ArrayList<>(1024);
            final ArrayList<Object> row = new ArrayList<>(api.getFieldNames().size());
            for (int i = 0; i < rowCount; i++)
            {
                final ArrayList<Object> rowArrayList = api.getRowArrayList(row, sortedFieldVectors, i);
                final T record = api.fromList(rowArrayList);
                records.add(record);
            }
            // Release buffers from vectors in root.
            vectorSchemaRoot.clear();
            return records;
        }))
        .doOnCancel(() -> {
            logger.info("Cancel.");
        })
        .doFinally(Unchecked.consumer((signalType) -> {
            logger.info("signalType=[{}], iterator=[{}].", signalType, iterable.iterator());
            vectorSchemaRoot.close();
            allocator.close();
            readClient.shutdown();
            final boolean awaitTermination = readClient.awaitTermination(1, TimeUnit.MINUTES);
            logger.info(
                "signalType=[{}], iterator=[{}], awaitTermination=[{}].",
                signalType, iterable.iterator(), awaitTermination
            );
            readClient.close();
        }));
}
Stack trace
2020-10-29 11:19:42.763 ERROR 57248 --- [ elastic-20] i.g.i.ManagedChannelOrphanWrapper : *~*~*~ Channel ManagedChannelImpl{logId=97, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~*~*~*
Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:596)
at io.grpc.ForwardingChannelBuilder.build(ForwardingChannelBuilder.java:255)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:314)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600(InstantiatingGrpcChannelProvider.java:71)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:210)
at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:217)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:200)
at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
at com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStub.create(EnhancedBigQueryReadStub.java:89)
at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.<init>(BigQueryReadClient.java:129)
at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.create(BigQueryReadClient.java:110)
at hk.com.mycomp.BigQueryConfig.getBigQueryReadClient(BigQueryConfig.java:96)
at hk.com.mycomp.ApiAdapter.getTableRecords(ApiAdapter.java:200)
at hk.com.mycomp.ApiAdapter.lambda$getItems$7(ApiAdapter.java:191)
at org.jooq.lambda.Unchecked.lambda$function$21(Unchecked.java:878)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:378)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:249)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
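The log message asks for shutdown()/shutdownNow() followed by awaitTermination() returning true. In the code example, getTableRecords obtains a new client on every call, and the only shutdown lives in doFinally, which runs once the returned Flux has been subscribed and terminates. The early return when getStreamsCount() is 0, an exception thrown before the Flux is returned, or an exception from the close() calls that precede readClient.shutdown() would all skip it. Whether one of these paths is the actual cause of this report is not confirmed here. The following is a minimal sketch of shutting a resource down on every path. Terminable, FakeChannel, and ClientLifecycle are hypothetical stand-ins that only mirror the shutdown/awaitTermination contract so the example runs with the JDK alone; they are not part of gRPC or the BigQuery Storage client.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class ClientLifecycle {
    /** Graceful shutdown, escalating to shutdownNow() on timeout. Returns true once terminated. */
    static boolean shutdownAndAwait(Terminable resource, long timeout, TimeUnit unit) {
        resource.shutdown();
        try {
            if (resource.awaitTermination(timeout, unit)) {
                return true;
            }
            resource.shutdownNow();
            return resource.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            resource.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Runs the work and always shuts the resource down, on normal return, early return, or exception. */
    static <R extends Terminable, T> T withResource(R resource, Function<R, T> work) {
        try {
            return work.apply(resource);
        } finally {
            shutdownAndAwait(resource, 1, TimeUnit.MINUTES);
        }
    }

    public static void main(String[] args) {
        FakeChannel channel = new FakeChannel();
        // An early return from the work function still reaches the finally block.
        String result = withResource(channel, c -> "no streams");
        System.out.println(result + ", terminated=" + channel.isTerminated());
    }
}

/** Hypothetical stand-in for anything exposing the shutdown/awaitTermination contract. */
interface Terminable {
    void shutdown();
    void shutdownNow();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
}

/** Hypothetical in-memory resource that terminates as soon as shutdown is requested. */
final class FakeChannel implements Terminable {
    private volatile boolean terminated;

    @Override public void shutdown() { terminated = true; }
    @Override public void shutdownNow() { terminated = true; }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return terminated; }

    boolean isTerminated() { return terminated; }
}
```

With the real client, the same shape would be a try/finally (or try-with-resources, since BigQueryReadClient is AutoCloseable) around the work that happens before the Flux is returned. Creating one client and sharing it across requests is another option that would avoid allocating a channel per call, assuming the application can manage a single long-lived client.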
External references such as API reference guides
- ?
Any additional information below
Following these steps guarantees the quickest resolution possible.
Thanks!