
[POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry #8780

Closed
keeratsingh wants to merge 4 commits into apache:master from keeratsingh:10671-bearer-token-refresh-retry

Conversation

@keeratsingh
Contributor

  • This is a POC for the proposed design (Link)
  • This POC only adds the retry capability to ListFlights for now; once we have agreement on the design, the other Flight APIs will be changed to support retry as well.

@keeratsingh keeratsingh marked this pull request as draft November 26, 2020 19:38
@github-actions

Thanks for opening a pull request!

Could you open an issue for this pull request on JIRA?
https://issues.apache.org/jira/browse/ARROW

Then could you also rename the pull request title in the following format?

ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}


@keeratsingh keeratsingh force-pushed the 10671-bearer-token-refresh-retry branch from f524874 to 93f13f1 Compare November 26, 2020 20:54
private final MethodDescriptor<ArrowMessage, Flight.PutResult> doPutDescriptor;
private final MethodDescriptor<ArrowMessage, ArrowMessage> doExchangeDescriptor;
private final List<FlightClientMiddleware.Factory> middleware;
private boolean retryFlightCall = false;
Member

Let's name this something more specific (retryOnUnauthorized perhaps)

return itr.hasNext();
} catch (StatusRuntimeException e) {
if (enableRetry && e.getStatus().getCode() == Status.Code.UNAUTHENTICATED) {
// TODO: Retry Iterator from the last known position.
Member

I believe you don't have to worry about this since you will either get the first result or UNAUTHENTICATED. (Or else, you'd have a very odd server that explicitly sends an UNAUTHENTICATED after giving you results.)

// TODO: Retry Iterator from the last known position.
// TODO: Log the retry explicitly stating the token authentication failed and retrying
// the failed operation.
return supplier.get().hasNext();
Member

This should update this.itr so that subsequent calls use the new call.

*/
public class CredentialCallOption implements CallOptions.GrpcCallOption {
private final Consumer<CallHeaders> credentialWriter;
public Consumer<CallHeaders> getCredentialWriter() {
Member

nit: can we keep getters after fields/constructor?

* Wrapper class to wrap the iterator and handle auth failures and retry.
* @param <T> The type of iterator.
*/
public static class WrappedFlightIterator<T> implements Iterator<T> {
Member

A general comment in the interests of code reuse; you may be better off implementing a generic StreamObserver which can wrap any gRPC call. You'll have to change FlightClient to always use the async gRPC stub instead of the blocking stub, but then you can handle retries for any gRPC call without having to write special-purpose wrappers for each call.
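The reviewer's suggestion could be sketched roughly like this. This is an illustrative sketch only, not the PR's code: the `StreamObserver` interface below is a minimal local stand-in for `io.grpc.stub.StreamObserver` (so the sketch is self-contained), and `SecurityException` stands in for a `StatusRuntimeException` carrying `Status.Code.UNAUTHENTICATED`.

```java
import java.util.function.Consumer;

// Minimal local stand-in for io.grpc.stub.StreamObserver (assumption, defined
// here only to keep the sketch self-contained).
interface StreamObserver<T> {
  void onNext(T value);
  void onError(Throwable t);
  void onCompleted();
}

// One generic observer that can wrap any server-streaming call and retry it
// once on an auth failure, instead of a special-purpose wrapper per RPC.
class RetryingObserver<T> implements StreamObserver<T> {
  private final StreamObserver<T> delegate;
  private final Runnable refreshCredentials;       // e.g. redo basic auth
  private final Consumer<StreamObserver<T>> call;  // re-issues the RPC
  private boolean retried = false;

  RetryingObserver(StreamObserver<T> delegate, Runnable refreshCredentials,
                   Consumer<StreamObserver<T>> call) {
    this.delegate = delegate;
    this.refreshCredentials = refreshCredentials;
    this.call = call;
  }

  @Override public void onNext(T value) { delegate.onNext(value); }
  @Override public void onCompleted() { delegate.onCompleted(); }

  @Override public void onError(Throwable t) {
    if (!retried && t instanceof SecurityException) {
      retried = true;            // retry at most once
      refreshCredentials.run();  // refresh the token before re-issuing
      call.accept(this);         // re-run the call with this same observer
    } else {
      delegate.onError(t);       // any other error is passed through
    }
  }
}
```

Because the wrapper only sees `onNext`/`onError`/`onCompleted`, the same class works for any server-streaming RPC once the client is written against the generic gRPC client.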

Contributor Author

@lidavidm Thank you for your feedback

I have pushed a PS with the suggested changes.

I do currently have a blocking call [link] in the API calls that return an Iterable, like listFlights and listActions, as we need the iterable populated before we return it to the FlightClient; otherwise we are back to the same approach using a wrapped iterator.

While in the latest PS doClientCall only caters to API calls that return an iterable/iterator, the next step is to make it generic for calls like getInfo and getSchema.
Before I go ahead and do that, I wanted to confirm whether this is what you had in mind with your suggestion above regarding a generic StreamObserver.

Another option I had in mind was to pass functional interfaces to the ApiCallObserver and have it retry the API call onError; in that case we would not need to block doClientCall until the ApiCallObserver completes, as the API call would be retried asynchronously when the observer encounters an error.

Member

Sorry - let me clarify. What I mean is that gRPC offers 3 types of clients in Java: a blocking client, async client, and a "generic" client. This last client offers a unified interface for all types of calls: it accepts a StreamObserver which gets called with data from the server and it is given a StreamObserver to send data to the server. So if you were to rewrite all gRPC calls in terms of this last type of client, then you could implement a single StreamObserver wrapper that handles retries for all calls.

Not a requirement - just offering a suggestion to avoid having to implement lots of single-purpose retries. It would take a decent amount of refactoring.

Member

So I see you have switched to using the generic client; my comment is then more about the level of abstraction appropriate for implementing retry. In particular, keeping the logic inside a wrapper StreamObserver would, IMO, make it easier to preserve things like the streaming nature of calls without having to buffer results.

try {
final Iterator<T> iterator = doClientCall(apiCall).iterator();
return () -> result.apply(iterator);
} catch (FlightRuntimeException ex) {
Member

This should only retry on the appropriate status right?

Member

Also, doesn't this need to somehow reset the middleware so that it sends the basic auth credentials and not the bearer token?

final ApiCallObserver<T> observer = new ApiCallObserver<>();
try {
apiCall.accept(observer);
while (!observer.completed.get()) {/* Wait for results */}
Member

Can we use a condition variable instead of busy-waiting?
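One way to act on this suggestion is to park the caller on a latch that the observer opens in `onCompleted()`/`onError()`. A minimal sketch, with illustrative names (`LatchObserver`, `awaitCompletion`) that are not from the PR:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: the calling thread blocks on a latch instead of spinning on a flag.
class LatchObserver<T> {
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile Throwable error;

  void onNext(T value) { /* buffer or forward the value */ }

  void onError(Throwable t) { error = t; done.countDown(); }

  void onCompleted() { done.countDown(); }

  // Blocks without busy-waiting until the call finishes or times out.
  void awaitCompletion(long timeout, TimeUnit unit) throws Exception {
    if (!done.await(timeout, unit)) throw new Exception("call timed out");
    if (error != null) throw new Exception("call failed", error);
  }
}
```

Alternatively, since `completed` in the PR is already a `CompletableFuture`, simply calling `observer.completed.get()` would also park the thread rather than spin.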

*/
static class ApiCallObserver<T> implements StreamObserver<T> {
private final CompletableFuture<Boolean> completed;
private final List<T> result = new ArrayList<>();
Member

This means we now build up the entire result set in memory instead of being able to stream it.
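One way to keep the call streaming instead of buffering everything would be to back the iterator with a bounded blocking queue that the observer feeds, so elements are consumed as they arrive. This is an illustrative sketch with hypothetical names, not the PR's code:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: the observer pushes each element (plus a terminal sentinel) into a
// bounded queue; the iterator pulls from it, so the full result set is never
// held in memory and backpressure falls out of the bounded queue.
class StreamingObserver<T> {
  private static final Object DONE = new Object();
  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>(16);

  void onNext(T value) {
    try { queue.put(value); }
    catch (InterruptedException e) { throw new RuntimeException(e); }
  }

  void onCompleted() {
    try { queue.put(DONE); }
    catch (InterruptedException e) { throw new RuntimeException(e); }
  }

  @SuppressWarnings("unchecked")
  Iterator<T> iterator() {
    return new Iterator<T>() {
      private Object next;

      @Override public boolean hasNext() {
        if (next == null) {
          try { next = queue.take(); }  // blocks until an element arrives
          catch (InterruptedException e) { throw new RuntimeException(e); }
        }
        return next != DONE;
      }

      @Override public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        T value = (T) next;
        next = null;
        return value;
      }
    };
  }
}
```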

import io.grpc.stub.StreamObserver;

/**
* Utility class for executing API calls on the FlightServer.
Member

nit: can we make this more descriptive (this is a set of utilities for making RPC calls with optional retries)?

public class ClientApiCallWrapper {

/**
* Execute client call using the Stream Observer ApiCallObserver.
Member

nit: more specifically, this converts an async callback-based call into a synchronous iterator



/**
* Generates and caches bearer tokens from user credentials.
Member

Can we make it clear here that this class generates bearer tokens with a sequentially increasing ID, and it considers token #1 to be 'expired'? It was a little unclear how the retry was getting tested at first.
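The behaviour being described could be documented with a sketch like the following. Class and method names here are illustrative, not the PR's actual code:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the test setup the reviewer is describing: tokens carry a
// sequentially increasing ID, and token #1 is always treated as expired,
// which is what forces the refresh-and-retry path to run during testing.
class TestBearerTokenManager {
  private final AtomicInteger counter = new AtomicInteger();

  // Each authentication issues a fresh token with the next ID.
  String generateToken() { return "Bearer token-" + counter.incrementAndGet(); }

  // Token #1 is considered expired, so the first RPC fails with
  // UNAUTHENTICATED and the client must refresh and retry.
  boolean isExpired(String token) { return "Bearer token-1".equals(token); }
}
```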

@lidavidm
Member

Following up here - given 5.0.0 is targeting July, do you think this will be ready in time?

@keeratsingh
Contributor Author

Following up here - given 5.0.0 is targeting July, do you think this will be ready in time?

Hey @lidavidm I am no longer working on this project. @tifflhl or @kylep-dremio would be better able to give you a definitive answer.

@lidavidm
Member

@kylep-dremio Just to follow up here, I'm assuming this isn't a priority at the moment?

@kylep-dremio
Contributor

@lidavidm - correct, we'll circle back to this but current priority is the C++ Flight SQL PR.

@pitrou
Member

pitrou commented May 4, 2022

@lidavidm @kylep-dremio Is it worth keeping this PR open? Otherwise let's close it to clean up the queue a bit.

@lidavidm
Member

lidavidm commented May 4, 2022

@kylep-dremio @jduo please reopen or create a new PR when this is ready
