Skip to content

Conversation

@xiangwangcheng
Copy link

What is the purpose of the change

add feature from #208

Brief changelog

add send request message method in RoketMQTemplate including:

  1. in sync model
  2. in sync model and send in orderly
    ) in async model
  3. in aysnc model and send in ordery

and test cases.

Verifying this change

XXXX

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.

  • [√] Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • [√] Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • [√] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • [√] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist.
  • [√] Run mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle to make sure basic checks pass. Run mvn clean install -DskipITs to make sure unit-test pass. Run mvn clean test-compile failsafe:integration-test to make sure integration-test pass.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@zongtanghu
Copy link

please add some test cases to improve some code coverage, thanks.

if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
org.apache.rocketmq.common.message.Message replyMessage;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replyMessage variable can be initialized here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And you can check the same issue at other places.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, "variable initializer null here is reduncdant" warnings will show if we initialize null to it.

charset, destination, message);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the this method, there is need to check requestCallback is null?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think null is allowed here because we may have the situation that do nothing when response comes back.

@vongosling vongosling changed the title [ISSUE #208]support request/response model in rocketmq-spring [ISSUE #208]support request/reply model in rocketmq-spring Dec 24, 2019
Comment on lines 83 to 421
public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) {
return requestSync(destination, message, producer.getSendMsgTimeout());
}

public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) {
return requestSync(destination, payload, producer.getSendMsgTimeout());
}

public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) {
return requestSync(destination, message, timeout, 0);
}

public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) {
return requestSync(destination, payload, timeout, 0);
}

public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout, int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("send request message failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}

try {
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
charset, destination, message);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
org.apache.rocketmq.common.message.Message replyMessage;
replyMessage = producer.request(rocketMsg, timeout);
return replyMessage;
} catch (Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
}

public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return requestSync(destination, message, timeout, delayLevel);
}

public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) {
return requestSyncOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
}

public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) {
return requestSyncOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
}

public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) {
return requestSyncOrderly(destination, message, hashKey, timeout, 0);
}

public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) {
return requestSyncOrderly(destination, payload, hashKey, timeout, 0);
}

public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("send request message failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}

try {
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
charset, destination, message);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
org.apache.rocketmq.common.message.Message replyMessage;
if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
replyMessage = producer.request(rocketMsg, timeout);
} else {
replyMessage = producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
}
return replyMessage;
} catch (Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
}

public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout, int delayLevel) {
Message<?> message = MessageBuilder.withPayload(payload).build();
return requestSyncOrderly(destination, message, hashKey, timeout, delayLevel);
}

public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback) {
requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
}

public void requestAsync(String destination, Object payload, RequestCallback requestCallback) {
Message<?> message = MessageBuilder.withPayload(payload).build();
requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
}

public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout) {
requestAsync(destination, message, requestCallback, timeout, 0);
}

public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout) {
requestAsync(destination, payload, requestCallback, timeout, 0);
}

public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout, int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("send request message failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}

try {
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
charset, destination, message);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
producer.request(rocketMsg, requestCallback, timeout);
} catch (Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
}

public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout, int delayLevel) {
Message<?> message = MessageBuilder.withPayload(payload).build();
requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout(), delayLevel);
}

public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey) {
requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout());
}

public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey) {
requestAsyncOrderly(destination, payload, requestCallback, hashKey, producer.getSendMsgTimeout());
}

public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout) {
requestAsyncOrderly(destination, message, requestCallback, hashKey, timeout, 0);
}

public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout) {
requestAsyncOrderly(destination, payload, requestCallback, hashKey, timeout, 0);
}

public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("send request message failed. destination:{}, message is null ", destination);
throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
}

try {
org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
charset, destination, message);
if (delayLevel > 0) {
rocketMsg.setDelayTimeLevel(delayLevel);
}
if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
producer.request(rocketMsg, requestCallback, timeout);
} else {
producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
}
} catch (Exception e) {
log.error("send request message failed. destination:{}, message:{} ", destination, message);
throw new MessagingException(e.getMessage(), e);
}
}

public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
Message<?> message = MessageBuilder.withPayload(payload).build();
requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout(), delayLevel);
}

/**
* <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
* Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Discuss] Sending spring message but returning rocketmq message may not be friendly for users, especially the full name of rocketmq message is so long. Do we need to expose rocketmq message to users?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, there are too many method names. Keep the same name, such as 'sendAndReceive ' and use different parameters to distinguish different types of methods.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Discuss] Sending spring message but returning rocketmq message may not be friendly for users, especially the full name of rocketmq message is so long. Do we need to expose rocketmq message to users?

Yeah, I am not sure about that too, but I think users MAY need the attributes in RocketMQ Message, such as topic/properties.

IMO, there are too many method names. Keep the same name, such as 'sendAndReceive ' and use different parameters to distinguish different types of methods.

I have thought about that in the first place. But I changed my mind because the current implementation will make the code simple and more friendly to users.
(Seems sendXXOrdery method can be merged sendXXX, but can not merge requestSync and requestAsync because of different return types the sync mode and async mode. )

Comment on lines 36 to 59
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.requestTopic}", consumerGroup = "request_consumer", selectorExpression = "${demo.rocketmq.tag}")
public class ConsumerWithReply implements RocketMQListener<MessageExt> {

@Autowired
private DefaultMQProducer replyProducer;

@Override
public void onMessage(MessageExt message) {
System.out.printf("------- StringConsumer received: %s \n", message);
try {
String replyTo = MessageUtil.getReplyToClient(message);
byte[] replyContent = "reply message contents.".getBytes();
// create reply message with given util, do not create reply message by yourself
Message replyMessage = MessageUtil.createReplyMessage(message, replyContent);

// send reply message with producer
SendResult replyResult = replyProducer.send(replyMessage, 3000);
System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
}catch(MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
System.out.println(e.getLocalizedMessage());
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user use RocketMQListener to receive message, he will not be able to use request-reply model according to current implementation. IMO, for consumer, we needs to implement a new interface, such as

public interface RocketMQReplyListener<T> {
    Message<?> onMessage(T message);
}

and the job of reply message does not need to be completed by the user.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! I will think about it.

import org.springframework.stereotype.Service;

/**
* RocketMQMessageListener
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useless comments, Could you add some more concrete comments for this class

* RocketMQMessageListener
*/
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.requestTopic}", consumerGroup = "request_consumer", selectorExpression = "${demo.rocketmq.tag}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better off using placeholder instead of the concrete group request_consumer?

@xiangwangcheng
Copy link
Author

xiangwangcheng commented Jan 5, 2020

I have made some changes after referring to the implementation of Kafka-spring.

  1. reduce the number of methods
  2. use generic type to receive reply content
  3. add a new interface RocketMQReplyListener for consuming and replying.
  4. conceal the reply logic in consumer side.

@vongosling @RongtongJin @zongtanghu @duhenglucky

Comment on lines 402 to 410
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("Consumer replys message failed. SendStatus: {}", sendResult.getSendStatus());
} else {
log.info("Consumer replys message success.");
}
}

@Override public void onException(Throwable e) {
log.error("Consumer replys message failed. error: {}", e.getLocalizedMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consumer replys message -> Consumer reply message

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is replies correct?

Comment on lines 25 to 27
/**
* RocketMQMessageListener
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useless comment

Comment on lines 26 to 28
/**
* Consumer that support request-response model
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

request-response change to request-reply

Comment on lines 25 to 27
/**
* Consumer that support request-response model
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

Comment on lines 20 to 22
/**
* The consumer supported request-response model should implement this interface.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer supporting request-reply should implement this interface. is better

/**
* The consumer supported request-response model should implement this interface.
*
* @param <T> the type received by the listener
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also comment param <R> .

Comment on lines 880 to 895
private Message<?> validateMessageAndPayload(Message<?> message, Object payload) {
if (Objects.nonNull(message) && Objects.nonNull(payload)) {
log.error("`message` and `payload` cannot exist at the same time.");
throw new IllegalArgumentException("`message` and `payload` cannot exist at the same time");
}

if (Objects.nonNull(payload)) {
message = MessageBuilder.withPayload(payload).build();
}

if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
log.error("send request message failed. Either `message` or `payload` needed.");
throw new IllegalArgumentException("either `message` or `payload` needed.");
}
return message;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete useless method

Comment on lines 159 to 167
rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RequestCallback() {
@Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
System.out.print("receive reply content in callback: " + message.toString());
}

@Override public void onException(Throwable e) {
e.printStackTrace();
}
});
Copy link
Contributor

@RongtongJin RongtongJin Jan 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Important] we need to switch RequestCallback, because rocketmq message expose to users.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for these comments, I will consider that and make another commit.

}
}

private void handleReplyMessage(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about change a method name? because it is not just about handling reply message.

System.out.printf("send %s and receive %s %n", "request byte[]", replyBytes.toString());

// send request in sync mode with hashKey parameter and receive a reply of User type.
User requestUser = new User().setUserAge(Byte.valueOf((byte) 9)).setUserName("requestUserName");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Byte.valueOf((byte) 9) --> (byte) 9

Comment on lines 300 to 308
class RocketMQRequestCallbackImpl_User implements RocketMQLocalRequestCallback<User> {
@Override public void onSuccess(User message) {
System.out.println("receive User: " + message.toString());
}

@Override public void onException(Throwable e) {
e.printStackTrace();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete useless class

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

System.out.printf("send %s and receive %s %n", "request string", replyString);

// send request in sync mode with timeout parameter and receive a reply of byte[] type.
byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, new Message<String>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to use MessageBuilder.withPayload("request byte[]").build().

// Send transactional messages using extRocketMQTemplate
testExtRocketMQTemplateTransaction();

// send request in sync mode and receive a reply of String type.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to capitalize first letter of comments

Comment on lines 159 to 177
rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RocketMQLocalRequestCallback<String>() {
@Override public void onSuccess(String message) {
System.out.println("receive string: " + message);
}

@Override public void onException(Throwable e) {
e.printStackTrace();
}
});
// send request in async mode and receive a reply of User type.
rocketMQTemplate.sendAndReceive(objectRequestTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {
@Override public void onSuccess(User message) {
System.out.println("receive User: " + message.toString());
}

@Override public void onException(Throwable e) {
e.printStackTrace();
}
}, 5000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to use System.out.printf instead of System.out.println

return null;
}
}, byte[].class, 3000);
System.out.printf("send %s and receive %s %n", "request byte[]", replyBytes.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new String(replyBytes) instead of replyBytes.toString()

public User onMessage(User user) {
System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
User replyUser = new User();
replyUser.setUserAge(Byte.valueOf((byte) 10));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(byte) 10 instead of Byte.valueOf((byte) 10)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RongtongJin all fixed

@RongtongJin RongtongJin added this to the 2.1.0 milestone Jan 10, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants