[POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry#8780
[POC] ARROW-10671: [FlightRPC] Bearer Token refresh design with retry#8780keeratsingh wants to merge 4 commits intoapache:masterfrom
Conversation
keeratsingh
commented
Nov 26, 2020
- This is a POC for the proposed design Link
- This POC only add the retry capability to ListFlights for now, once we have agreement on the design, changes will be made to the other Flight APIs as well to have the retry capability.
|
Thanks for opening a pull request! Could you open an issue for this pull request on JIRA? Then could you also rename pull request title in the following format? See also: |
f524874 to
93f13f1
Compare
| private final MethodDescriptor<ArrowMessage, Flight.PutResult> doPutDescriptor; | ||
| private final MethodDescriptor<ArrowMessage, ArrowMessage> doExchangeDescriptor; | ||
| private final List<FlightClientMiddleware.Factory> middleware; | ||
| private boolean retryFlightCall = false; |
There was a problem hiding this comment.
Let's name this something more specifc (retryOnUnauthorized perhaps)
| return itr.hasNext(); | ||
| } catch (StatusRuntimeException e) { | ||
| if (enableRetry && e.getStatus().getCode() == Status.Code.UNAUTHENTICATED) { | ||
| // TODO: Retry Iterator from the last known position. |
There was a problem hiding this comment.
I believe you don't have to worry about this since you will either get the first result or UNAUTHENTICATED. (Or else, you'd have a very odd server that explicitly sends an UNAUTHENTICATED after giving you results.)
| // TODO: Retry Iterator from the last known position. | ||
| // TODO: Log the retry explicitly stating the token authentication failed and retrying | ||
| // the failed operation. | ||
| return supplier.get().hasNext(); |
There was a problem hiding this comment.
This should update this.itr so that subsequent calls use the new call.
| */ | ||
| public class CredentialCallOption implements CallOptions.GrpcCallOption { | ||
| private final Consumer<CallHeaders> credentialWriter; | ||
| public Consumer<CallHeaders> getCredentialWriter() { |
There was a problem hiding this comment.
nit: can we keep getters after fields/constructor?
...ght-core/src/main/java/org/apache/arrow/flight/auth2/ClientIncomingAuthHeaderMiddleware.java
Show resolved
Hide resolved
| * Wrapper class to wrap the iterator and handle auth failures and retry. | ||
| * @param <T> The type of iterator. | ||
| */ | ||
| public static class WrappedFlightIterator<T> implements Iterator<T> { |
There was a problem hiding this comment.
A general comment in the interests of code reuse; you may be better off implementing a generic StreamObserver which can wrap any gRPC call. You'll have to change FlightClient to always use the async gRPC stub instead of the blocking stub, but then you can handle retries for any gRPC call without having to write special-purpose wrappers for each call.
There was a problem hiding this comment.
@lidavidm Thank you for your feedback
I have pushed a PS with the suggested changes.
I do currently have a blocking call [link] on the API calls that return an Iterable, like listFlights and listActions, as we need to have the iterable populated before we return it to the FlightClient, if not then we are back to the same approach using a wrapped iterator.
While the changes latest PS doClientCall only caters to API calls that return an iterable/iterator, the next step is to make them generic for calls like getInfo, getSchema
Before I go ahead and do that, I wanted to confirm if this is what you had in mind when you made the suggestions above, regarding a generic StreamObserver.
Other options that I had in mind were to pass the functional interfaces to pass the ApiCallObserver and have it retry the API call onError, in that case, we would not need to block doClientCall until the ApiCallObserver completes as the API call would be retried asynchronously when the Observer encounters an error.
There was a problem hiding this comment.
Sorry - let me clarify. What I mean is that gRPC offers 3 types of clients in Java: a blocking client, async client, and a "generic" client. This last client offers a unified interface for all types of calls: it accepts a StreamObserver which gets called with data from the server and it is given a StreamObserver to send data to the server. So if you were to rewrite all gRPC calls in terms of this last type of client, then you could implement a single StreamObserver wrapper that handles retries for all calls.
Not a requirement - just offering a suggestion to avoid having to implement lots of single-purpose retries. It would take a decent amount of refactoring.
There was a problem hiding this comment.
So I see you have switched to using the generic client; my comment is then more about the level of abstraction appropriate for implementing retry. In particular, keeping the logic inside a wrapper StreamObserver would, IMO, make it easier to preserve things like the streaming nature of calls without having to buffer results.
| try { | ||
| final Iterator<T> iterator = doClientCall(apiCall).iterator(); | ||
| return () -> result.apply(iterator); | ||
| } catch (FlightRuntimeException ex) { |
There was a problem hiding this comment.
This should only retry on the appropriate status right?
There was a problem hiding this comment.
Also, doesn't this need to somehow reset the middleware so that it sends the basic auth credentials and not the bearer token?
| final ApiCallObserver<T> observer = new ApiCallObserver<>(); | ||
| try { | ||
| apiCall.accept(observer); | ||
| while (!observer.completed.get()) {/* Wait for results */} |
There was a problem hiding this comment.
Can we use a condition variable instead of busy-waiting?
| */ | ||
| static class ApiCallObserver<T> implements StreamObserver<T> { | ||
| private final CompletableFuture<Boolean> completed; | ||
| private final List<T> result = new ArrayList<>(); |
There was a problem hiding this comment.
This means we now build up the entire result set in memory instead of being able to stream it.
| import io.grpc.stub.StreamObserver; | ||
|
|
||
| /** | ||
| * Utility class for executing an API calls on the FlightServer. |
There was a problem hiding this comment.
nit: can we make this more descriptive (this is a set of utilities for making RPC calls with optional retries)?
| public class ClientApiCallWrapper { | ||
|
|
||
| /** | ||
| * Execute client call using the Stream Observer ApiCallObserver. |
There was a problem hiding this comment.
nit: more specifically, this converts an async callback-based call into a synchronous iterator
|
|
||
|
|
||
| /** | ||
| * Generates and caches bearer tokens from user credentials. |
There was a problem hiding this comment.
Can we make it clear here that this class generates bearer tokens with a sequentially increasing ID, and it considers token #1 to be 'expired'? It was a little unclear how the retry was getting tested at first.
d4608a9 to
356c300
Compare
|
Following up here - given 5.0.0 is targeting July, do you think this will be ready in time? |
Hey @lidavidm I am no longer working on this project. @tifflhl or @kylep-dremio would be better able to give you a definitive answer. |
|
@kylep-dremio Just to follow up here, I'm assuming this isn't a priority at the moment? |
|
@lidavidm - correct, we'll circle back to this but current priority is the C++ Flight SQL PR. |
|
@lidavidm @kylep-dremio Is it worth keeping this PR open? Otherwise let's close it to clean up the queue a bit. |
|
@kylep-dremio @jduo please reopen or create a new PR when this is ready |