This repository was archived by the owner on Feb 24, 2026. It is now read-only.

java.lang.RuntimeException: ManagedChannel allocation site #643

@hktv-mkchoi

Description

Environment details

  1. API: BigQuery Storage
  2. OS type and version:
  3. Java version: jdk-11.0.7
  4. bigquerystorage version(s): 1.5.6
  5. opts: -Xmx2048m -Xms2048m -server -XX:+UseG1GC -verbose:gc -Xlog:gc:stdout -XX:+UseStringDeduplication

Steps to reproduce

We use JMeter to load-test our application, and the following error appears at random points during the test.


Code example

	public BigQueryReadClient getBigQueryReadClient() throws Exception
	{
		final BigQueryReadSettings bigQueryReadSettings = BigQueryReadSettings.newBuilder()
				.setCredentialsProvider(
						FixedCredentialsProvider.create(credentials)
				)
//				.setTransportChannelProvider(channelProvider)
//				.setExecutorProvider(executorProvider)
				.build();

		final BigQueryReadClient readClient = BigQueryReadClient.create(bigQueryReadSettings);

		return readClient;
	}

	@NotNull
	private Flux<T> getTableRecords(long sysSkip, TableId destinationTable, Long sysTakeLong) throws Exception
	{
		final BigQueryReadClient readClient = bigQueryConfig.getBigQueryReadClient();

		final String table =
				String.format(
						"projects/%s/datasets/%s/tables/%s",
						bigQueryConfig.getProjectId(), destinationTable.getDataset(), destinationTable.getTable());
		final String parent = String.format("projects/%s", bigQueryConfig.getProjectId());

		// Prepare the read options
		final ReadSession.TableReadOptions tableReadOptions = api.getFields().keySet().stream()
				.reduce(
						ReadSession.TableReadOptions.newBuilder(),
						ReadSession.TableReadOptions.Builder::addSelectedFields,
						(b1, b2) -> b1
				)
				.setRowRestriction(sysTakeLong == null ?
						"row_number > " + sysSkip :
						"row_number BETWEEN " + sysSkip + " AND " + (sysSkip + sysTakeLong))
				.build();
		final ReadSession.Builder readSessionBuilder =
				ReadSession.newBuilder()
						.setTable(table)
						.setDataFormat(DataFormat.ARROW)
						.setReadOptions(tableReadOptions);
		final CreateReadSessionRequest createReadSessionRequest = CreateReadSessionRequest.newBuilder()
				.setParent(parent)
				.setReadSession(readSessionBuilder)
				.setMaxStreamCount(1)
				.build();

		// Init the read session
		final ReadSession readSession = readClient.createReadSession(createReadSessionRequest);
		if (readSession.getStreamsCount() == 0)
		{
			return Flux.fromIterable(new ArrayList<>());
		}

		// Get the streams
		final ReadStream readStream = readSession.getStreams(0);
		final ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder()
				.setReadStream(readStream.getName())
				.build();
		final ServerStream<ReadRowsResponse> readRowsResponseStream = readClient
				.readRowsCallable()
				.call(readRowsRequest);

		// Allocate the limit
		final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

		// Prepare the wrapper vectors
		final ArrowSchema arrowSchema = readSession.getArrowSchema();
		final Schema schema = MessageSerializer.deserializeSchema(
				new ReadChannel(new ByteArrayReadableSeekableByteChannel(arrowSchema.getSerializedSchema().toByteArray()))
		);
		final List<FieldVector> vectors = new ArrayList<>();
		for (Field field : schema.getFields())
		{
			vectors.add(field.createVector(allocator));
		}

		final VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(vectors);
		final VectorLoader vectorLoader = new VectorLoader(vectorSchemaRoot);

		final Iterable<ReadRowsResponse> iterable = new Iterable<ReadRowsResponse>()
		{
			Iterator<ReadRowsResponse> iterator = readRowsResponseStream.iterator();

			@NotNull
			@Override
			public Iterator<ReadRowsResponse> iterator()
			{
				return iterator;
			}
		};

		return Flux
				.fromIterable(iterable)
				.flatMapIterable(Unchecked.function(readRowsResponse -> {
					final ArrowRecordBatch batch = readRowsResponse.getArrowRecordBatch();
					final org.apache.arrow.vector.ipc.message.ArrowRecordBatch arrowRecordBatch = MessageSerializer
							.deserializeRecordBatch(
									new ReadChannel(
											new ByteArrayReadableSeekableByteChannel(
													batch.getSerializedRecordBatch().toByteArray())),
									allocator);
					vectorLoader.load(arrowRecordBatch);

					// Release buffers from batch (they are still held in the vectors in root).
					arrowRecordBatch.close();

					final List<FieldVector> sortedFieldVectors = vectorSchemaRoot.getFieldVectors()
							.stream()
							.sorted(Comparator.comparingLong((fv) -> api.getFieldNames().indexOf(fv.getName())))
							.collect(Collectors.toList());
					final int rowCount = vectorSchemaRoot.getRowCount();

					final ArrayList<T> records = new ArrayList<>(1024);
					final ArrayList<Object> row = new ArrayList<>(api.getFieldNames().size());
					for (int i = 0; i < rowCount; i++)
					{
						final ArrayList<Object> rowArrayList = api.getRowArrayList(row, sortedFieldVectors, i);
						final T record = api.fromList(rowArrayList);

						records.add(record);
					}

					// Release buffers from vectors in root.
					vectorSchemaRoot.clear();

					return records;
				}))
				.doOnCancel(() -> {
					logger.info("Cancel.");
				})
				.doFinally(Unchecked.consumer((signalType) -> {
					logger.info("signalType=[{}], iterator=[{}].", signalType, iterable.iterator());

					vectorSchemaRoot.close();
					allocator.close();

					readClient.shutdown();
					final boolean awaitTermination = readClient.awaitTermination(1, TimeUnit.MINUTES);
					logger.info(
							"signalType=[{}], iterator=[{}], awaitTermination=[{}].",
							signalType, iterable.iterator(), awaitTermination
					);
					readClient.close();
				}));
	}
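
One thing we noticed while preparing this report: when `getStreamsCount() == 0`, the method returns before the Flux carrying the `doFinally` cleanup is ever built, so that path never shuts the client down; the same would happen if `createReadSession` threw. A minimal sketch of the guard we are considering (illustration only, not a confirmed fix; the `buildRecordFlux` helper is hypothetical and stands in for the Flux-building code above):

	final BigQueryReadClient readClient = bigQueryConfig.getBigQueryReadClient();
	try
	{
		final ReadSession readSession = readClient.createReadSession(createReadSessionRequest);
		if (readSession.getStreamsCount() == 0)
		{
			// The early return otherwise skips the doFinally block and orphans the channel.
			readClient.close();
			return Flux.fromIterable(new ArrayList<>());
		}
		// Hypothetical helper wrapping the Flux/Arrow code shown above; its doFinally still closes the client.
		return buildRecordFlux(readClient, readSession);
	}
	catch (Exception e)
	{
		// If createReadSession fails, no Flux (and no doFinally) exists yet, so close here as well.
		readClient.close();
		throw e;
	}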

Stack trace

2020-10-29 11:19:42.763 ERROR 57248 --- [     elastic-20] i.g.i.ManagedChannelOrphanWrapper        : *~*~*~ Channel ManagedChannelImpl{logId=97, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.

java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
	at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:596)
	at io.grpc.ForwardingChannelBuilder.build(ForwardingChannelBuilder.java:255)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:314)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600(InstantiatingGrpcChannelProvider.java:71)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:210)
	at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:217)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:200)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
	at com.google.cloud.bigquery.storage.v1.stub.EnhancedBigQueryReadStub.create(EnhancedBigQueryReadStub.java:89)
	at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.<init>(BigQueryReadClient.java:129)
	at com.google.cloud.bigquery.storage.v1.BigQueryReadClient.create(BigQueryReadClient.java:110)
	at hk.com.mycomp.BigQueryConfig.getBigQueryReadClient(BigQueryConfig.java:96)
	at hk.com.mycomp.ApiAdapter.getTableRecords(ApiAdapter.java:200)
	at hk.com.mycomp.ApiAdapter.lambda$getItems$7(ApiAdapter.java:191)
	at org.jooq.lambda.Unchecked.lambda$function$21(Unchecked.java:878)
	at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:378)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:249)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
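
Because getTableRecords builds a brand-new BigQueryReadClient (and with it a new gRPC channel) for every request, the warnings may simply be per-request channels that were never shut down piling up under the JMeter load. We are also considering keeping one long-lived client and reusing it across requests. A rough sketch of that idea follows; the @PostConstruct/@PreDestroy wiring and the sharedReadClient/bigQueryReadSettings fields are our own naming, and it assumes a single client instance can safely be shared across request threads:

	private BigQueryReadClient sharedReadClient;

	@PostConstruct
	public void initReadClient() throws Exception
	{
		// One client (and one channel) for the whole application instead of one per request.
		sharedReadClient = BigQueryReadClient.create(bigQueryReadSettings);
	}

	public BigQueryReadClient getBigQueryReadClient()
	{
		return sharedReadClient;
	}

	@PreDestroy
	public void closeReadClient() throws Exception
	{
		// Shut the channel down exactly once, when the application stops.
		sharedReadClient.shutdown();
		sharedReadClient.awaitTermination(1, TimeUnit.MINUTES);
		sharedReadClient.close();
	}

With that in place, getTableRecords would drop its own shutdown/awaitTermination/close calls from doFinally and only release the Arrow allocator and vectors there.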


Labels

api: bigquerystorage (Issues related to the googleapis/java-bigquerystorage API.)