-
Notifications
You must be signed in to change notification settings - Fork 8
[improve][pcip] PCIP-2 Distributed RPC framework implemented by the Pulsar client #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/package-info.java
Show resolved
Hide resolved
pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/common/package-info.java
Show resolved
Hide resolved
pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/server/package-info.java
Show resolved
Hide resolved
pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/ReplyListener.java
Show resolved
Hide resolved
pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/ReplyListener.java
Show resolved
Hide resolved
pulsar-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/ReplyListener.java
Show resolved
Hide resolved
|
this image has some Chinese,please translate to English |
pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/SimpleRpcCallTest.java
Show resolved
Hide resolved
…e version definitions to parent
Picture updated. |
| * @param <V> The type of the reply messages. | ||
| */ | ||
| @RequiredArgsConstructor(access = PACKAGE) | ||
| public class PulsarRpcClient<T, V> implements AutoCloseable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is recommended to expose the interface externally, refer to PulsarClient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe same as #6 (comment)
PulsarRpcClient, PulsarRpcClientImpl, PulsarRpcClientBuilder, PulsarRpcClientBuilderImpl
Already added.
PTAL!
If this method is correct, I will modify the server code later.
| */ | ||
| @Slf4j | ||
| @RequiredArgsConstructor | ||
| public class RequestListener<T, V> implements MessageListener<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you do not need to expose this class to the business, you can use package-level private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| */ | ||
| @Slf4j | ||
| @RequiredArgsConstructor | ||
| public class ReplyListener<V> implements MessageListener<V> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you do not need to expose this class to the business, you can use package-level private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Another benefit of using message queues to implement RPC requests is that the request message can be received by multiple services and processed in parallel, and then all the processing results are returned. Finally, the client will do the integration. |
liangyepianzhou
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work! I left some comments. The most important point is the need for a concise API. Make it simple enough for users to use.
A simplest model:
newClient()
reply send Request (k, v)
| * @return The reply value. | ||
| * @throws Exception if an error occurs during the request or while waiting for the reply. | ||
| */ | ||
| public V request(String correlationId, TypedMessageBuilder<T> message) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The interface should return a specific exception. Here you can catch the interrupt exception and throw a custom exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better not expose the MQ implementation. Only key and value are passed in here.
This will make it easier for users to use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better not expose the MQ implementation. Only key and value are passed in here. This will make it easier for users to use.
It is also considered here that some custom configurations may be made for the property or eventTime of the message. It is a bit too inflexible to pass only key and value. It may be better for users to construct a message passed in by themselves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It allows users to pass in config. As I said before, you can load more configuration files for Producer and Consumer through .loadConf(). You can also pass in message configuration through message.loadConf().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each message and the request producer has been modified using the loadConf().
...r-rpc-contrib/src/main/java/org/apache/pulsar/rpc/contrib/client/DefaultRequestCallBack.java
Outdated
Show resolved
Hide resolved
| public Producer<T> requestProducer(ProducerBuilder<T> requestProducer) throws IOException { | ||
| return requestProducer | ||
| // allow only one client | ||
| .accessMode(ProducerAccessMode.Exclusive) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can expose a way to modify the Producer and consumer configuration by .loadConf().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each message and the request producer has been modified using the loadConf().
| * @return A new instance of {@link PulsarRpcClient}. | ||
| * @throws IOException if there is an error during the client initialization. | ||
| */ | ||
| public static <T, V> PulsarRpcClient<T, V> create( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need an implementation that doesn't expose the internal MQ client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe same as #6 (comment)
PulsarRpcClient, PulsarRpcClientImpl, PulsarRpcClientBuilder, PulsarRpcClientBuilderImpl
Already added.
PTAL!
If this method is correct, I will modify the server code later.
Has been updated. |
|
@liangyepianzhou please agree to the workflow and do some checks first. |
|
@liangyepianzhou @AuroraTwinkle @StevenLuMT |
|
@StevenLuMT I added the |
pulsar-rpc-contrib/src/test/java/org/apache/pulsar/rpc/contrib/PulsarRpcBase.java
Show resolved
Hide resolved
|
about Question5 Can you add a few confirmation methods, such as writing needs to wait for the following strategies to return success, similar to these scenarios: Scenario 1: All subscriptions to sub1/sub2 are acked successfully |
Users can implement a /**
* Invoked after receiving a reply from the server successfully.
*
* <p>Please note that {@code replyFuture.complete(value)} must be executed at the end.
*
* @param correlationId The correlation ID associated with the reply.
* @param subscription The subscription name the reply was received on.
* @param value The value of the reply.
* @param replyFuture The future to be completed with the received value.
*/
void onReplySuccess(String correlationId, String subscription, V value, CompletableFuture<V> replyFuture);When the ReplyListener receives the reply message, it directly calls So the framework itself does not provide the corresponding scenario implementation, but through the I added a simple scenario implementation in |
StevenLuMT
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, a module pulsar-rpc-contrib has been added, and a PIP needs to be proposed. I think we should take this opportunity to vote and decide how to merge the current project in the future, similar to what lari proposed, https://lists.apache.org/thread/y2ccnoxj76pnw25n5p91ymqygjoq1x27
My suggestion is to be consistent with bookkeeper's method. I will initiate a vote:

Because this project belongs to Pulsar, I think it is better to initiate discussions and votes on Pulsar's mailing list. |
|
@StevenLuMT @liangyepianzhou @AuroraTwinkle The PCIP document has been updated. PTAL! |
StevenLuMT
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good Job
|
@lhotari @eolivelli Would you be willing to help review this PR when you have time? |

Feedback is not positive due to PIP-371.
apache/pulsar#23143
apache/pulsar#23194
We need to implement this distributed RPC framework in a way that does not intrude into the pulsar core library. Therefore, we need to use two topics, one is the request topic and the other is the reply topic. The client side sends RPC requests to the request topic, the server side receives request message and performs customized processing, and finally sends them to the reply topic. The client receives the reply message and returns.
Motivation
As we known,Pulsar's current asynchronous publish-subscribe model serves well for decoupled message distribution, but it lacks a native mechanism for handling synchronous interactions typical of Remote Procedure Calls (RPC).
This request-reply model can greatly enhance the utility of Pulsar. We can then use Pulsar as RPC.
Why would we use Pulsar for this RPC call?
request.Requestand existing send function of pulsar can be mixed to same topic. This means that the user can choose, and the call to the server side (consumer) can be asynchronous or synchronous, which is controlled by the user flexibly.Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)