Skip to content

Conversation

@Denovo1998
Copy link
Contributor

@Denovo1998 Denovo1998 commented Sep 2, 2024

Feedback is not positive due to PIP-371.
apache/pulsar#23143
apache/pulsar#23194

We need to implement this distributed RPC framework in a way that does not intrude into the pulsar core library. Therefore, we need to use two topics, one is the request topic and the other is the reply topic. The client side sends RPC requests to the request topic, the server side receives request message and performs customized processing, and finally sends them to the reply topic. The client receives the reply message and returns.

Motivation

As we known,Pulsar's current asynchronous publish-subscribe model serves well for decoupled message distribution, but it lacks a native mechanism for handling synchronous interactions typical of Remote Procedure Calls (RPC).

This request-reply model can greatly enhance the utility of Pulsar. We can then use Pulsar as RPC.

Why would we use Pulsar for this RPC call?

  • Implement RPC using Apache Pulsar. Requests can be sent through a client, received by one or more servers and processed in parallel. Finally, the server returns all processing results after processing, and the client can perform summary and other operations after receiving them.
  • This proposal to achieve the function is request. Request and existing send function of pulsar can be mixed to same topic. This means that the user can choose, and the call to the server side (consumer) can be asynchronous or synchronous, which is controlled by the user flexibly.
  • You can directly use Pulsar's own delaying messages, that is, you can execute RPC regularly.
  • You can directly use Pulsar's own load balancing mechanism.
  • You can directly use Pulsar's own message consumption throttling mechanism.
  • You can directly use Pulsar's own expansion and contraction mechanism.
  • You can directly use Pulsar's own message call tracking, monitoring, and logging mechanisms.

Modifications

RPC.drawio

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

@StevenLuMT
Copy link
Member

this image has some Chinese,please translate to English

@Denovo1998
Copy link
Contributor Author

this image has some Chinese,please translate to English

Picture updated.

* @param <V> The type of the reply messages.
*/
@RequiredArgsConstructor(access = PACKAGE)
public class PulsarRpcClient<T, V> implements AutoCloseable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is recommended to expose the interface externally, refer to PulsarClient.

Copy link
Contributor Author

@Denovo1998 Denovo1998 Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe same as #6 (comment)

PulsarRpcClient, PulsarRpcClientImpl, PulsarRpcClientBuilder, PulsarRpcClientBuilderImpl
Already added.

PTAL!

If this method is correct, I will modify the server code later.

*/
@Slf4j
@RequiredArgsConstructor
public class RequestListener<T, V> implements MessageListener<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do not need to expose this class to the business, you can use package-level private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

*/
@Slf4j
@RequiredArgsConstructor
public class ReplyListener<V> implements MessageListener<V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do not need to expose this class to the business, you can use package-level private.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@liangyepianzhou
Copy link
Contributor

Feedback is not positive due to PIP-371. apache/pulsar#23143 apache/pulsar#23194

We need to implement this distributed RPC framework in a way that does not intrude into the pulsar core library. Therefore, we need to use two topics, one is the request topic and the other is the reply topic. The client side sends RPC requests to the request topic, the server side receives request message and performs customized processing, and finally sends them to the reply topic. The client receives the reply message and returns.

Motivation

As we known,Pulsar's current asynchronous publish-subscribe model serves well for decoupled message distribution, but it lacks a native mechanism for handling synchronous interactions typical of Remote Procedure Calls (RPC).

This request-reply model can greatly enhance the utility of Pulsar. We can then use Pulsar as RPC.

Why would we use Pulsar for this RPC call?

  • This proposal to achieve the function is request. Request and existing send function of pulsar can be mixed to same topic. This means that the user can choose, and the call to the server side (consumer) can be asynchronous or synchronous, which is controlled by the user flexibly.
  • You can directly use Pulsar's own delaying messages, that is, you can execute RPC regularly.
  • You can directly use Pulsar's own load balancing mechanism.
  • You can directly use Pulsar's own message consumption throttling mechanism.
  • You can directly use Pulsar's own expansion and contraction mechanism.
  • You can directly use Pulsar's own message call tracking, monitoring, and logging mechanisms.

Another benefit of using message queues to implement RPC requests is that the request message can be received by multiple services and processed in parallel, and then all the processing results are returned. Finally, the client will do the integration.

Copy link
Contributor

@liangyepianzhou liangyepianzhou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work! I left some comments. The most important point is the need for a concise API. Make it simple enough for users to use.
A simplest model:

newClient()
reply send Request (k, v)

* @return The reply value.
* @throws Exception if an error occurs during the request or while waiting for the reply.
*/
public V request(String correlationId, TypedMessageBuilder<T> message) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interface should return a specific exception. Here you can catch the interrupt exception and throw a custom exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better not expose the MQ implementation. Only key and value are passed in here.
This will make it easier for users to use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better not expose the MQ implementation. Only key and value are passed in here. This will make it easier for users to use.

It is also considered here that some custom configurations may be made for the property or eventTime of the message. It is a bit too inflexible to pass only key and value. It may be better for users to construct a message passed in by themselves.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It allows users to pass in config. As I said before, you can load more configuration files for Producer and Consumer through .loadConf(). You can also pass in message configuration through message.loadConf().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each message and the request producer has been modified using the loadConf().

public Producer<T> requestProducer(ProducerBuilder<T> requestProducer) throws IOException {
return requestProducer
// allow only one client
.accessMode(ProducerAccessMode.Exclusive)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can expose a way to modify the Producer and consumer configuration by .loadConf().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each message and the request producer has been modified using the loadConf().

* @return A new instance of {@link PulsarRpcClient}.
* @throws IOException if there is an error during the client initialization.
*/
public static <T, V> PulsarRpcClient<T, V> create(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need an implementation that doesn't expose the internal MQ client.

Copy link
Contributor Author

@Denovo1998 Denovo1998 Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe same as #6 (comment)

PulsarRpcClient, PulsarRpcClientImpl, PulsarRpcClientBuilder, PulsarRpcClientBuilderImpl
Already added.

PTAL!

If this method is correct, I will modify the server code later.

@Denovo1998
Copy link
Contributor Author

Feedback is not positive due to PIP-371. apache/pulsar#23143 apache/pulsar#23194
We need to implement this distributed RPC framework in a way that does not intrude into the pulsar core library. Therefore, we need to use two topics, one is the request topic and the other is the reply topic. The client side sends RPC requests to the request topic, the server side receives request message and performs customized processing, and finally sends them to the reply topic. The client receives the reply message and returns.

Motivation

As we known,Pulsar's current asynchronous publish-subscribe model serves well for decoupled message distribution, but it lacks a native mechanism for handling synchronous interactions typical of Remote Procedure Calls (RPC).
This request-reply model can greatly enhance the utility of Pulsar. We can then use Pulsar as RPC.
Why would we use Pulsar for this RPC call?

  • This proposal to achieve the function is request. Request and existing send function of pulsar can be mixed to same topic. This means that the user can choose, and the call to the server side (consumer) can be asynchronous or synchronous, which is controlled by the user flexibly.
  • You can directly use Pulsar's own delaying messages, that is, you can execute RPC regularly.
  • You can directly use Pulsar's own load balancing mechanism.
  • You can directly use Pulsar's own message consumption throttling mechanism.
  • You can directly use Pulsar's own expansion and contraction mechanism.
  • You can directly use Pulsar's own message call tracking, monitoring, and logging mechanisms.

Another benefit of using message queues to implement RPC requests is that the request message can be received by multiple services and processed in parallel, and then all the processing results are returned. Finally, the client will do the integration.

Has been updated.

@StevenLuMT
Copy link
Member

@liangyepianzhou please agree to the workflow and do some checks first.

@Denovo1998
Copy link
Contributor Author

@liangyepianzhou @AuroraTwinkle @StevenLuMT
Currently only SimpleRpcCallTest#testRpcCallWithCallBack has some problems. Can you help me look at it? It may be related to topicsPattern.

@Denovo1998
Copy link
Contributor Author

@StevenLuMT I added the testcontainers, and fixed the SimpleRpcCallTest#testRpcCallWithCallBack. Please agree to the workflow.

@StevenLuMT
Copy link
Member

about Question5
I will confirm this question with you again. It may also be related to translation. My suggestion logic at that time was:

Can you add a few confirmation methods, such as writing needs to wait for the following strategies to return success, similar to these scenarios:

Scenario 1: All subscriptions to sub1/sub2 are acked successfully
Scenario 2: n (0<n<All) subscriptions to sub1/sub2 are acked successfully
Scenario 3: No need to wait for the ack of subscriptions to sub1/sub2, fall back to the original mq mode

@Denovo1998
Copy link
Contributor Author

about Question5 I will confirm this question with you again. It may also be related to translation. My suggestion logic at that time was:

Can you add a few confirmation methods, such as writing needs to wait for the following strategies to return success, similar to these scenarios:

Scenario 1: All subscriptions to sub1/sub2 are acked successfully Scenario 2: n (0<n<All) subscriptions to sub1/sub2 are acked successfully Scenario 3: No need to wait for the ack of subscriptions to sub1/sub2, fall back to the original mq mode

Users can implement a org.apache.pulsar.rpc.contrib.client.RequestCallBack and implement onReplySuccess methods to achieve the three scenarios described above.

/**
 * Invoked after receiving a reply from the server successfully.
 *
 * <p>Please note that {@code replyFuture.complete(value)} must be executed at the end.
 *
 * @param correlationId The correlation ID associated with the reply.
 * @param subscription The subscription name the reply was received on.
 * @param value The value of the reply.
 * @param replyFuture The future to be completed with the received value.
 */
void onReplySuccess(String correlationId, String subscription, V value, CompletableFuture<V> replyFuture);

When the ReplyListener receives the reply message, it directly calls onReplySuccess, and the parameter contains the subscription, key, and value to which it belongs. If the user-defined "success" criteria are met, the user will call rpcClient.removeRequest(correlationId); remove this request. At this time, if the ReplyListener still receives reply message corresponding to non-existent request, it will directly ack and not return to the user.

So the framework itself does not provide the corresponding scenario implementation, but through the RequestCallBack, users can achieve a variety of scenarios.

I added a simple scenario implementation in SimpleRpcCallTest#testRpcCallWithCallBack.
@StevenLuMT

Copy link
Member

@StevenLuMT StevenLuMT left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, a module pulsar-rpc-contrib has been added, and a PIP needs to be proposed. I think we should take this opportunity to vote and decide how to merge the current project in the future, similar to what lari proposed, https://lists.apache.org/thread/y2ccnoxj76pnw25n5p91ymqygjoq1x27

My suggestion is to be consistent with bookkeeper's method. I will initiate a vote:
image

@liangyepianzhou
Copy link
Contributor

Currently, a module pulsar-rpc-contrib has been added, and a PIP needs to be proposed. I think we should take this opportunity to vote and decide how to merge the current project in the future, similar to what lari proposed, https://lists.apache.org/thread/y2ccnoxj76pnw25n5p91ymqygjoq1x27

My suggestion is to be consistent with bookkeeper's method. I will initiate a vote: image

Because this project belongs to Pulsar, I think it is better to initiate discussions and votes on Pulsar's mailing list.
And Pulsar is also optimizing its voting process to reduce blocks.
https://lists.apache.org/thread/5lv8pwdgtxcobkjfgto274xxvrr3b1q7

@Denovo1998 Denovo1998 changed the title RPC framework implemented by the Pulsar client [improve][pcip] PCIP-1: Distributed RPC framework implemented by the Pulsar client Nov 4, 2024
@Denovo1998 Denovo1998 changed the title [improve][pcip] PCIP-1: Distributed RPC framework implemented by the Pulsar client [improve][pcip] PCIP-2: Distributed RPC framework implemented by the Pulsar client Nov 6, 2024
@Denovo1998 Denovo1998 changed the title [improve][pcip] PCIP-2: Distributed RPC framework implemented by the Pulsar client [improve][pcip] PCIP-2 Distributed RPC framework implemented by the Pulsar client Nov 6, 2024
@Denovo1998
Copy link
Contributor Author

@StevenLuMT @liangyepianzhou @AuroraTwinkle The PCIP document has been updated. PTAL!

Copy link
Member

@StevenLuMT StevenLuMT left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good Job

@liangyepianzhou
Copy link
Contributor

@lhotari @eolivelli Would you be willing to help review this PR when you have time?

@liangyepianzhou liangyepianzhou requested review from AuroraTwinkle, eolivelli and lhotari and removed request for AuroraTwinkle December 1, 2024 14:57
@HQebupt HQebupt self-requested a review December 5, 2024 06:55
@liangyepianzhou liangyepianzhou merged commit fd3cdda into apache:main Dec 6, 2024
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants