-
Notifications
You must be signed in to change notification settings - Fork 940
[ISSUE #208]support request/reply model in rocketmq-spring #209
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
8657209 to
276f088
Compare
|
please add some test cases to improve some code coverage, thanks. |
| if (delayLevel > 0) { | ||
| rocketMsg.setDelayTimeLevel(delayLevel); | ||
| } | ||
| org.apache.rocketmq.common.message.Message replyMessage; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replyMessage variable can be initialized here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And you can check the same issue at other places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, "variable initializer null here is reduncdant" warnings will show if we initialize null to it.
| charset, destination, message); | ||
| if (delayLevel > 0) { | ||
| rocketMsg.setDelayTimeLevel(delayLevel); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the this method, there is need to check requestCallback is null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think null is allowed here because we may have the situation that do nothing when response comes back.
| public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) { | ||
| return requestSync(destination, message, producer.getSendMsgTimeout()); | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) { | ||
| return requestSync(destination, payload, producer.getSendMsgTimeout()); | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) { | ||
| return requestSync(destination, message, timeout, 0); | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) { | ||
| return requestSync(destination, payload, timeout, 0); | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout, int delayLevel) { | ||
| if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { | ||
| log.error("send request message failed. destination:{}, message is null ", destination); | ||
| throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); | ||
| } | ||
|
|
||
| try { | ||
| org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(), | ||
| charset, destination, message); | ||
| if (delayLevel > 0) { | ||
| rocketMsg.setDelayTimeLevel(delayLevel); | ||
| } | ||
| org.apache.rocketmq.common.message.Message replyMessage; | ||
| replyMessage = producer.request(rocketMsg, timeout); | ||
| return replyMessage; | ||
| } catch (Exception e) { | ||
| log.error("send request message failed. destination:{}, message:{} ", destination, message); | ||
| throw new MessagingException(e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) { | ||
| Message<?> message = MessageBuilder.withPayload(payload).build(); | ||
| return requestSync(destination, message, timeout, delayLevel); | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) { | ||
| return requestSyncOrderly(destination, message, hashKey, producer.getSendMsgTimeout()); | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) { | ||
| return requestSyncOrderly(destination, payload, hashKey, producer.getSendMsgTimeout()); | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) { | ||
| return requestSyncOrderly(destination, message, hashKey, timeout, 0); | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) { | ||
| return requestSyncOrderly(destination, payload, hashKey, timeout, 0); | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) { | ||
| if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { | ||
| log.error("send request message failed. destination:{}, message is null ", destination); | ||
| throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); | ||
| } | ||
|
|
||
| try { | ||
| org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(), | ||
| charset, destination, message); | ||
| if (delayLevel > 0) { | ||
| rocketMsg.setDelayTimeLevel(delayLevel); | ||
| } | ||
| org.apache.rocketmq.common.message.Message replyMessage; | ||
| if (Objects.isNull(hashKey) || hashKey.isEmpty()) { | ||
| replyMessage = producer.request(rocketMsg, timeout); | ||
| } else { | ||
| replyMessage = producer.request(rocketMsg, messageQueueSelector, hashKey, timeout); | ||
| } | ||
| return replyMessage; | ||
| } catch (Exception e) { | ||
| log.error("send request message failed. destination:{}, message:{} ", destination, message); | ||
| throw new MessagingException(e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout, int delayLevel) { | ||
| Message<?> message = MessageBuilder.withPayload(payload).build(); | ||
| return requestSyncOrderly(destination, message, hashKey, timeout, delayLevel); | ||
| } | ||
|
|
||
| public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback) { | ||
| requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout()); | ||
| } | ||
|
|
||
| public void requestAsync(String destination, Object payload, RequestCallback requestCallback) { | ||
| Message<?> message = MessageBuilder.withPayload(payload).build(); | ||
| requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout()); | ||
| } | ||
|
|
||
| public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout) { | ||
| requestAsync(destination, message, requestCallback, timeout, 0); | ||
| } | ||
|
|
||
| public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout) { | ||
| requestAsync(destination, payload, requestCallback, timeout, 0); | ||
| } | ||
|
|
||
| public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout, int delayLevel) { | ||
| if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { | ||
| log.error("send request message failed. destination:{}, message is null ", destination); | ||
| throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); | ||
| } | ||
|
|
||
| try { | ||
| org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(), | ||
| charset, destination, message); | ||
| if (delayLevel > 0) { | ||
| rocketMsg.setDelayTimeLevel(delayLevel); | ||
| } | ||
| producer.request(rocketMsg, requestCallback, timeout); | ||
| } catch (Exception e) { | ||
| log.error("send request message failed. destination:{}, message:{} ", destination, message); | ||
| throw new MessagingException(e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout, int delayLevel) { | ||
| Message<?> message = MessageBuilder.withPayload(payload).build(); | ||
| requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout(), delayLevel); | ||
| } | ||
|
|
||
| public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey) { | ||
| requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout()); | ||
| } | ||
|
|
||
| public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey) { | ||
| requestAsyncOrderly(destination, payload, requestCallback, hashKey, producer.getSendMsgTimeout()); | ||
| } | ||
|
|
||
| public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout) { | ||
| requestAsyncOrderly(destination, message, requestCallback, hashKey, timeout, 0); | ||
| } | ||
|
|
||
| public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout) { | ||
| requestAsyncOrderly(destination, payload, requestCallback, hashKey, timeout, 0); | ||
| } | ||
|
|
||
| public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) { | ||
| if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { | ||
| log.error("send request message failed. destination:{}, message is null ", destination); | ||
| throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); | ||
| } | ||
|
|
||
| try { | ||
| org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(), | ||
| charset, destination, message); | ||
| if (delayLevel > 0) { | ||
| rocketMsg.setDelayTimeLevel(delayLevel); | ||
| } | ||
| if (Objects.isNull(hashKey) || hashKey.isEmpty()) { | ||
| producer.request(rocketMsg, requestCallback, timeout); | ||
| } else { | ||
| producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout); | ||
| } | ||
| } catch (Exception e) { | ||
| log.error("send request message failed. destination:{}, message:{} ", destination, message); | ||
| throw new MessagingException(e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) { | ||
| Message<?> message = MessageBuilder.withPayload(payload).build(); | ||
| requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout(), delayLevel); | ||
| } | ||
|
|
||
| /** | ||
| * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes. | ||
| * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Discuss] Sending spring message but returning rocketmq message may not be friendly for users, especially the full name of rocketmq message is so long. Do we need to expose rocketmq message to users?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, there are too many method names. Keep the same name, such as 'sendAndReceive ' and use different parameters to distinguish different types of methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Discuss] Sending spring message but returning rocketmq message may not be friendly for users, especially the full name of rocketmq message is so long. Do we need to expose rocketmq message to users?
Yeah, I am not sure about that too, but I think users MAY need the attributes in RocketMQ Message, such as topic/properties.
IMO, there are too many method names. Keep the same name, such as 'sendAndReceive ' and use different parameters to distinguish different types of methods.
I have thought about that in the first place. But I changed my mind because the current implementation will make the code simple and more friendly to users.
(Seems sendXXOrdery method can be merged sendXXX, but can not merge requestSync and requestAsync because of different return types the sync mode and async mode. )
| @Service | ||
| @RocketMQMessageListener(topic = "${demo.rocketmq.requestTopic}", consumerGroup = "request_consumer", selectorExpression = "${demo.rocketmq.tag}") | ||
| public class ConsumerWithReply implements RocketMQListener<MessageExt> { | ||
|
|
||
| @Autowired | ||
| private DefaultMQProducer replyProducer; | ||
|
|
||
| @Override | ||
| public void onMessage(MessageExt message) { | ||
| System.out.printf("------- StringConsumer received: %s \n", message); | ||
| try { | ||
| String replyTo = MessageUtil.getReplyToClient(message); | ||
| byte[] replyContent = "reply message contents.".getBytes(); | ||
| // create reply message with given util, do not create reply message by yourself | ||
| Message replyMessage = MessageUtil.createReplyMessage(message, replyContent); | ||
|
|
||
| // send reply message with producer | ||
| SendResult replyResult = replyProducer.send(replyMessage, 3000); | ||
| System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString()); | ||
| }catch(MQClientException | RemotingException | MQBrokerException | InterruptedException e) { | ||
| System.out.println(e.getLocalizedMessage()); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the user use RocketMQListener to receive message, he will not be able to use request-reply model according to current implementation. IMO, for consumer, we needs to implement a new interface, such as
public interface RocketMQReplyListener<T> {
Message<?> onMessage(T message);
}and the job of reply message does not need to be completed by the user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea! I will think about it.
| import org.springframework.stereotype.Service; | ||
|
|
||
| /** | ||
| * RocketMQMessageListener |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Useless comments, Could you add some more concrete comments for this class
| * RocketMQMessageListener | ||
| */ | ||
| @Service | ||
| @RocketMQMessageListener(topic = "${demo.rocketmq.requestTopic}", consumerGroup = "request_consumer", selectorExpression = "${demo.rocketmq.tag}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it better off using placeholder instead of the concrete group request_consumer?
|
I have made some changes after referring to the implementation of Kafka-spring.
|
| if (sendResult.getSendStatus() != SendStatus.SEND_OK) { | ||
| log.error("Consumer replys message failed. SendStatus: {}", sendResult.getSendStatus()); | ||
| } else { | ||
| log.info("Consumer replys message success."); | ||
| } | ||
| } | ||
|
|
||
| @Override public void onException(Throwable e) { | ||
| log.error("Consumer replys message failed. error: {}", e.getLocalizedMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consumer replys message -> Consumer reply message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is replies correct?
| /** | ||
| * RocketMQMessageListener | ||
| */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
useless comment
| /** | ||
| * Consumer that support request-response model | ||
| */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
request-response change to request-reply
| /** | ||
| * Consumer that support request-response model | ||
| */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above
| /** | ||
| * The consumer supported request-response model should implement this interface. | ||
| * |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The consumer supporting request-reply should implement this interface. is better
| /** | ||
| * The consumer supported request-response model should implement this interface. | ||
| * | ||
| * @param <T> the type received by the listener |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also comment param <R> .
| private Message<?> validateMessageAndPayload(Message<?> message, Object payload) { | ||
| if (Objects.nonNull(message) && Objects.nonNull(payload)) { | ||
| log.error("`message` and `payload` cannot exist at the same time."); | ||
| throw new IllegalArgumentException("`message` and `payload` cannot exist at the same time"); | ||
| } | ||
|
|
||
| if (Objects.nonNull(payload)) { | ||
| message = MessageBuilder.withPayload(payload).build(); | ||
| } | ||
|
|
||
| if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { | ||
| log.error("send request message failed. Either `message` or `payload` needed."); | ||
| throw new IllegalArgumentException("either `message` or `payload` needed."); | ||
| } | ||
| return message; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete useless method
| rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RequestCallback() { | ||
| @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) { | ||
| System.out.print("receive reply content in callback: " + message.toString()); | ||
| } | ||
|
|
||
| @Override public void onException(Throwable e) { | ||
| e.printStackTrace(); | ||
| } | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Important] we need to switch RequestCallback, because rocketmq message expose to users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for these comments, I will consider that and make another commit.
| } | ||
| } | ||
|
|
||
| private void handleReplyMessage( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about change a method name? because it is not just about handling reply message.
| System.out.printf("send %s and receive %s %n", "request byte[]", replyBytes.toString()); | ||
|
|
||
| // send request in sync mode with hashKey parameter and receive a reply of User type. | ||
| User requestUser = new User().setUserAge(Byte.valueOf((byte) 9)).setUserName("requestUserName"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Byte.valueOf((byte) 9) --> (byte) 9
| class RocketMQRequestCallbackImpl_User implements RocketMQLocalRequestCallback<User> { | ||
| @Override public void onSuccess(User message) { | ||
| System.out.println("receive User: " + message.toString()); | ||
| } | ||
|
|
||
| @Override public void onException(Throwable e) { | ||
| e.printStackTrace(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete useless class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| System.out.printf("send %s and receive %s %n", "request string", replyString); | ||
|
|
||
| // send request in sync mode with timeout parameter and receive a reply of byte[] type. | ||
| byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, new Message<String>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to use MessageBuilder.withPayload("request byte[]").build().
| // Send transactional messages using extRocketMQTemplate | ||
| testExtRocketMQTemplateTransaction(); | ||
|
|
||
| // send request in sync mode and receive a reply of String type. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to capitalize first letter of comments
| rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RocketMQLocalRequestCallback<String>() { | ||
| @Override public void onSuccess(String message) { | ||
| System.out.println("receive string: " + message); | ||
| } | ||
|
|
||
| @Override public void onException(Throwable e) { | ||
| e.printStackTrace(); | ||
| } | ||
| }); | ||
| // send request in async mode and receive a reply of User type. | ||
| rocketMQTemplate.sendAndReceive(objectRequestTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() { | ||
| @Override public void onSuccess(User message) { | ||
| System.out.println("receive User: " + message.toString()); | ||
| } | ||
|
|
||
| @Override public void onException(Throwable e) { | ||
| e.printStackTrace(); | ||
| } | ||
| }, 5000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to use System.out.printf instead of System.out.println
| return null; | ||
| } | ||
| }, byte[].class, 3000); | ||
| System.out.printf("send %s and receive %s %n", "request byte[]", replyBytes.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new String(replyBytes) instead of replyBytes.toString()
| public User onMessage(User user) { | ||
| System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user); | ||
| User replyUser = new User(); | ||
| replyUser.setUserAge(Byte.valueOf((byte) 10)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(byte) 10 instead of Byte.valueOf((byte) 10)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@RongtongJin all fixed
What is the purpose of the change
add feature from #208
Brief changelog
add send request message method in RoketMQTemplate including:
) in async model
and test cases.
Verifying this change
XXXX
Follow this checklist to help us incorporate your contribution quickly and easily. Notice,
it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.[ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyleto make sure basic checks pass. Runmvn clean install -DskipITsto make sure unit-test pass. Runmvn clean test-compile failsafe:integration-testto make sure integration-test pass.