Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

doc(samples): rewrite PendingStream Write API sample to match best practices#1603

Merged
stephaniewang526 merged 2 commits intogoogleapis:mainfrom
gnanda:samples
Apr 11, 2022
Merged

doc(samples): rewrite PendingStream Write API sample to match best practices#1603
stephaniewang526 merged 2 commits intogoogleapis:mainfrom
gnanda:samples

Conversation

@gnanda
Copy link
Copy Markdown
Contributor

@gnanda gnanda commented Mar 31, 2022

  • split samples into initialization, append, cleanup
  • Change appends to async

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

  • Make sure to open an issue as a bug/issue before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea
  • Ensure the tests and linter pass
  • Code coverage does not decrease (if any source code was changed)
  • Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️

If you write sample code, please follow the samples format.

@product-auto-label product-auto-label bot added api: bigquerystorage Issues related to the googleapis/java-bigquerystorage API. samples Issues that are directly related to samples. labels Mar 31, 2022
@stephaniewang526 stephaniewang526 changed the title feat: Rewrite Write API samples to match best practices doc(samples): rewrite Write API samples to match best practices Apr 1, 2022
@stephaniewang526 stephaniewang526 added the owlbot:run Add this label to trigger the Owlbot post processor. label Apr 1, 2022
@gcf-owl-bot gcf-owl-bot bot removed the owlbot:run Add this label to trigger the Owlbot post processor. label Apr 1, 2022
@stephaniewang526 stephaniewang526 self-assigned this Apr 1, 2022
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(parentTable.toString(), tableSchema).build()) {
try {
// Write two batches to the stream, each with 10 JSON records. A writer should be used for as
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some explanation here when to use "batches"? My question reading this sample is why not just write 20 JSON records using 1 batch (why 2 batches)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.


public static WriteToDefaultStream create(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, IOException, InterruptedException {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here to explain why we are bringing in the BQ client and around schema conversion?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do in followup; I changed this to just be PendingStream which encompasses the other cases, so we can finalize on a design in one place and replicate to the others.


public class WriteToDefaultStream {

private final Phaser inflightRequestPhaser = new Phaser(1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't prefer having anything out here like this, they should go inside either the driver method (runWriteToDefaultStream) or the body of the sample (writeToDefaultStream). Also, we need comment (explanation) around why we are using inflightRequestPhaser.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment about the Phaser (which we need in order to wait for all responses before closing the stream)

Discussed the variables offline: the Write API is a special case where we need a stateful client and want to make explicit that users should keep the writer around for as long as possible.

throws DescriptorValidationException, IOException, ExecutionException {
ApiFuture<AppendRowsResponse> future = this.streamWriter.append(jsonArr);
this.inflightRequestPhaser.register();
ApiFutures.addCallback(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here to explain this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

import org.json.JSONArray;
import org.json.JSONObject;

public class WriteCommittedStream {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name is a little confusing, maybe name it to WriteCommittedStreamSample? I am not sure the impact of changing the sample's name.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think most of our samples don't include "Sample" in the name. e.g., createTable.

Maybe WriteToCommittedStream?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. Currently it sounded like a Stream. Will we just create a new file, switch the pointer and delete the old file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm I'll do this in a followup


public void cleanup() {
// Wait for all in-flight requests to complete.
this.inflightRequestPhaser.arriveAndAwaitAdvance();
Copy link
Copy Markdown
Contributor

@yirutang yirutang Apr 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Close the JsonWriter here? Since you are doing a Finalize, with close, maybe we don't need to wait for inflight ourselves?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the close, but as we discussed offline, we still need to wait for inflight requests to finish


public static void writeCommittedStream(String projectId, String datasetName, String tableName)
throws DescriptorValidationException, InterruptedException, IOException {
// One time initialization.
Copy link
Copy Markdown
Contributor

@VeronicaWasson VeronicaWasson Apr 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put the initialization code into a separate writer.initialize() method? I really like the structure of #1585 from a docs perspective.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it back. Not sure how to do it in a way that both exemplifies the structure and is "Good Java"

if (response.hasError()) {
System.out.format("Error: %s\n", response.getError());
} else {
System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For pending stream, show user a way to set stream_status_ and append to check on the status to fail the append?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thought I'm not great at Java, so maybe there's a better way?


// Once all streams are done, commit all of them in one request. This example only has the one
// stream.
BatchCommitWriteStreamsRequest commitRequest =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't commit for failed writes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an exception throw in cleanup and a comment about only committing successful streams.


public void onSuccess(AppendRowsResponse response) {
if (response.hasError()) {
System.out.format("Error: %s\n", response.getError());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

show a simple retry here?

}

public void onSuccess(AppendRowsResponse response) {
if (response.hasError()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what to do here...Maybe add a comment of how this can be retried? If we don't have a good retry case, should we have this sample?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My $0.02: I think it's better to show how to check the AppendRowsResponse error status, rather than not show anything here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC StreamWriter already handles this field which will invoke onFailure instead

I updated that code to show a little error handling; not sure if there's something more specific to show there

Start with just Pending stream which is the most complex to finalize
design. Will update the other two in followup once consensus is reached.
@gnanda gnanda changed the title doc(samples): rewrite Write API samples to match best practices doc(samples): rewrite PendingStream Write API sample to match best practices Apr 5, 2022
@gnanda
Copy link
Copy Markdown
Contributor Author

gnanda commented Apr 5, 2022

Updated based on comments, and changed it to be only WritePendingStream for now since it encompasses the other cases. Once we finalize on a plan, I'll update the other 2 to match the same structure

@gnanda gnanda requested a review from stephaniewang526 April 8, 2022 20:43
@stephaniewang526 stephaniewang526 added the owlbot:run Add this label to trigger the Owlbot post processor. label Apr 9, 2022
@gcf-owl-bot gcf-owl-bot bot removed the owlbot:run Add this label to trigger the Owlbot post processor. label Apr 9, 2022
@stephaniewang526 stephaniewang526 added automerge Merge the pull request once unit tests and other checks pass. owlbot:run Add this label to trigger the Owlbot post processor. labels Apr 9, 2022
@gcf-owl-bot gcf-owl-bot bot removed the owlbot:run Add this label to trigger the Owlbot post processor. label Apr 9, 2022
@gnanda gnanda requested a review from a team April 9, 2022 21:34
@gnanda gnanda requested a review from a team April 9, 2022 21:34
@product-auto-label product-auto-label bot added the size: m Pull request size is medium. label Apr 9, 2022
@gcf-merge-on-green
Copy link
Copy Markdown

Merge-on-green attempted to merge your PR for 6 hours, but it was not mergeable because either one of your required status checks failed, one of your required reviews was not approved, or there is a do not merge label. Learn more about your required status checks here: https://help.github.com/en/github/administering-a-repository/enabling-required-status-checks. You can remove and reapply the label to re-run the bot.

@gcf-merge-on-green gcf-merge-on-green bot removed the automerge Merge the pull request once unit tests and other checks pass. label Apr 10, 2022
@stephaniewang526 stephaniewang526 merged commit 4145ae0 into googleapis:main Apr 11, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

api: bigquerystorage Issues related to the googleapis/java-bigquerystorage API. samples Issues that are directly related to samples. size: m Pull request size is medium.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants