
Fix a bug when publishing unique_ptr #504

Closed
Jrdevil-Wang wants to merge 2 commits into ros2:master from Jrdevil-Wang:patch-1

Conversation

@Jrdevil-Wang

do_inter_process_publish is always called when publishing a unique_ptr, even in the intra-process case.
This causes the performance of intra-process publishing to degrade to that of inter-process publishing.
Since the other intra-process publish cases (i.e. publishing a shared_ptr, a message reference, or a message pointer) all call this version of the publish method, this effectively affects all intra-process publishing performance.

This fix brings the expected low-latency intra-process performance.

@dirk-thomas dirk-thomas added the in review Waiting for review (Kanban column) label Jun 22, 2018
@dirk-thomas
Member

Have you tried to subscribe to the message from a different process with this patch applied?

@wjwwood
Member

wjwwood commented Jun 22, 2018

This is not an appropriate fix: external subscriptions will not work when intra-process is on, as @dirk-thomas suggested.

@dirk-thomas dirk-thomas added in progress Actively being worked on (Kanban column) requires-changes and removed in review Waiting for review (Kanban column) labels Jun 22, 2018
@Jrdevil-Wang
Author

Yes, you are right, @wjwwood @dirk-thomas. Sorry, I missed the requirement that intra- and inter-process communication should be able to work together.
On the other hand, I think you get my point too. Even though the current intra-process implementation DOES provide a zero-copy feature, its key purpose is defeated if intra-process performance is the same as inter-process.
I will try to come up with an appropriate fix.

Add a do_inter_process_publish AFTER the intra- message is published. This will enable intra- and inter-process publishing to work together.
@Jrdevil-Wang
Author

OK. I added a do_inter_process_publish AFTER the intra-process message is published. This enables intra- and inter-process publishing to work together.
Tests were done by publishing an 800x600 24-bit image. The intra-process performance is better than inter-process (about 5-10 times faster), but is still about 2 times worse than the expected zero-copy performance.
Please try and review. @dirk-thomas @wjwwood

It could still be improved, I think. I guess the subscriber should subscribe to the intra-process DDS handle only, and not to the normal DDS handle.

if (RCL_RET_OK != status) {
  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message");
}
this->do_inter_process_publish(msg_ptr);
Member


The function call store_intra_process_message_() above takes ownership of the msg_ptr, so we can't rely on msg_ptr still being valid here. That is the reason why the "inter" call currently happens before the "intra" call.

It would certainly be good to address the latency issue you describe but it will probably be more complicated than this.

@dirk-thomas
Member

I guess the subscriber should subscribe to the intra-process DDS handle only, and not to the normal DDS handle.

The subscriber might additionally receive a message from another process. So it still needs to subscribe to the DDS topic in general.

@Jrdevil-Wang
Author

The subscriber might additionally receive a message from another process. So it still needs to subscribe to the DDS topic in general.

OK, I get your point.
At this point, I have to admit that I am confused about why ROS 2 abandons the pub-sub link design of ROS 1.
Tagging a node, rather than a pub-sub link, as "intra_comms" does not really make sense to me.

@wjwwood
Member

wjwwood commented Jun 26, 2018

@Jrdevil-Wang it could be more granular so that you could control it on a per topic or even per publisher or subscription basis, but that doesn't change the need to do interprocess communication as well.

In ROS 1, the passing of pointers instead of sending over the network within a node or between nodelets was on by default and you didn't need to do anything special to turn that on, so having it on a Node level seems natural to me.

At this point, I have to admit that I am confused about why ROS 2 abandons the pub-sub link design of ROS 1.

Can you clarify what feature or API in ROS 1 you're referring to? I just want to make sure we're talking about the same thing.

@Jrdevil-Wang
Author

Can you clarify what feature or API in ROS 1 you're referring to? I just want to make sure we're talking about the same thing.

@wjwwood , please correct me if I am wrong.

It seems that in ROS 2, whether intra-process communication is used is decided by a parameter at Node construction (use_intra_process_comms in the following code).

RCLCPP_PUBLIC
explicit Node(
  const std::string & node_name,
  const std::string & namespace_ = "",
  bool use_intra_process_comms = false);

In ROS 1, whether a message is published in intra-process mode is decided per subscriber link.
https://github.com/ros/ros_comm/blob/89ca9a7bf288b09d033a1ab6966ef8cfe39e1002/clients/roscpp/src/libros/publication.cpp#L410-L417

@wjwwood
Member

wjwwood commented Jun 26, 2018

In ROS 1, whether a message is published in intra-process mode is decided per subscriber link.

But the user usually doesn't influence that. In ROS 1 a Subscriber (which is the only thing users typically interact with) will create multiple links as appropriate, including intra-process ones, or various kinds of inter-process ones. Whether or not a user-created Subscriber ends up creating an intra-process subscriber link depends on a global policy, just like the intra-process setting on a ROS 2 node. So even though the information is contained in a more granular way in ROS 1, it tends to be controlled globally.

What may not be obvious from your linked code is that there are many subscriber links and some will not be intra-process, so in all cases the serialized version of the message is sent to the publish queue for inter-process communications:

https://github.com/ros/ros_comm/blob/89ca9a7bf288b09d033a1ab6966ef8cfe39e1002/clients/roscpp/src/libros/publication.cpp#L425

You can see the various kinds of links created for intra-process here:

https://github.com/ros/ros_comm/blob/89ca9a7bf288b09d033a1ab6966ef8cfe39e1002/clients/roscpp/src/libros/subscription.cpp#L188-L194

And for transport subscriber links:

https://github.com/ros/ros_comm/blob/89ca9a7bf288b09d033a1ab6966ef8cfe39e1002/clients/roscpp/src/libros/transport_subscriber_link.cpp#L117

@wjwwood
Member

wjwwood commented Jun 26, 2018

Furthermore, you can see that the "local" connection is made for all publishers and subscriptions unconditionally in their respective creation functions:

@Jrdevil-Wang
Author

Jrdevil-Wang commented Jun 26, 2018

OK, let me give an example to explain.

Say there are two processes: process A has one publisher and one subscriber, and process B has one publisher and one subscriber. All these publishers and subscribers use the same topic.

From my understanding, in ROS 1 there will be four subscriber links: two of them are intra-process links and the other two are inter-process links. When A.pub publishes a message, the message is sent to A.sub through an intra-process link, and to B.sub through an inter-process link. That is fine with me.

However, in ROS 2, all publishers and subscribers share the same DDS topic. Therefore, when A.pub publishes a message, the message is sent to A.sub AND B.sub through DDS (DDS doesn't know we have another channel), and also to A.sub through the intra-process mechanism.

The problem is, I should prevent DDS from sending this message from A.pub to A.sub, but I can't, right?

@wjwwood
Member

wjwwood commented Jun 26, 2018

Yes, that's the double delivery problem. Borrowing from your example, A.sub will receive the message from A.pub twice, once via intra-process and once via inter-process.

DDS doesn't know about our "side channel", but we can tell it to not send data to itself when we have a side channel. That's what this option is for in our middleware API:

https://github.com/ros2/rmw/blob/b4e6596161f122d520e5085742dd2e48dcdca82b/rmw/include/rmw/rmw.h#L246

Even in the case that there is double delivery, subscriptions which have intra-process enabled will ignore inter-process messages from publishers in the same node, see:

if (matches_any_intra_process_publishers_) {
  if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) {
    // In this case, the message will be delivered via intra process and
    // we should ignore this copy of the message.
    return;
  }
}

This is inefficient because we've needlessly sent the message to ourselves over the middleware (DDS in this case), but we don't deliver data to the user twice.


Unfortunately, the ignore_local_publications option is not consistently supported at the moment in our rmw implementations, so we're not using it right now.

That's the remaining work that needs to be done (in my opinion) in order to make intra process on by default in Nodes.


A further enhancement would be to have publishers watch the "matched subscriptions" (we don't expose this information from DDS yet) in order to know if publishing to DDS would actually end up sending data to anyone, and then as an optimization not doing inter process publish if there are no matched subscriptions from other participants.


Each of these is sort of related, but they are different topics from the original question or proposed enhancement, in my opinion, and perhaps deserve their own issues.

To the best of my knowledge, the original proposal was to publish the intra process message first to improve latency, but that cannot be done without copying the message first to send to the inter process manager.

The other option is to convert the unique pointer to a shared pointer before giving it to the intra-process manager, so that the inter-process publish, which is called later, can be assured the message will still exist. This causes other issues, however, and makes true zero-copy across several pub/sub links impossible, because you can never be sure that the user relinquished read-write access to the message they received in a subscription callback and then published again.

@Jrdevil-Wang
Author

Happy to learn the plan.

To the best of my knowledge, the original proposal was to publish the intra process message first to improve latency, but that cannot be done without copying the message first to send to the inter process manager.

Yes, that's true. There was no problem in my test only because I used the SingleThreadedExecutor, so the intra-process subscriber had no chance to consume the message before it was sent through DDS in the changed publisher code.

On the same principle, maybe I could add some kind of mutex to protect the msg_ptr?

@Jrdevil-Wang
Author

@dirk-thomas, @wjwwood. I have an idea about protecting the msg_ptr. Please check whether it is acceptable before I implement it.

During store_intra_process_message_, a record is created in a MappedRingBuffer.
Each record is an element struct as shown below.

struct element
{
  uint64_t key;
  ElemUniquePtr value;
  bool in_use;
};

I plan to add another field, "bool is_locked;". Initially it is set to true, and it will be set to false after do_inter_process_publish finishes. After that, nothing is different.
While is_locked is true, any call to MappedRingBuffer::pop_at_key will return a copy rather than popping the unique_ptr.
Thus, the msg_ptr here is guaranteed to remain valid.

@mikaelarguedas mikaelarguedas added in review Waiting for review (Kanban column) and removed in progress Actively being worked on (Kanban column) labels Jul 5, 2018
@wjwwood
Member

wjwwood commented Jul 19, 2018

Sorry for the long delay in replying, and thanks for your patience :)


@Jrdevil-Wang I believe your proposal will work; however, it has the drawback that having zero-copy pub/sub in a chain becomes a race condition. For example, consider the image_pipeline_all_in_one demo from here:

https://github.com/ros2/ros2/wiki/Intra-Process-Communication#simple-pipeline

Which generates an image, and in each of three nodes the image is "stamped" with the address of the image message itself:

(screenshot of the demo output)

The point of the demo is that the three addresses stamped on the image are always the same, implying zero-copy between several pub/sub links (and separately developed nodes).

With your proposed change, if the inter-process copying of the message isn't fast enough, then the pointers on the image would not be the same, as the call to pop_at_key would now return a copy, not the original.

So we'll go from "you'll get zero-copy so long as there's only one subscriber" to "you'll sometimes (but non-deterministically) get zero-copy if there's only one subscriber".

I also question whether or not this is a good trade-off. If most of the latency comes from copying the message before doing intra-process delivery, then in the cases where the race fails (and pop_at_key needs to make a copy) we'll get the same lag in delivering to the subscription. So we'll go from "always having additional latency due to copying the message" to "sometimes having additional latency due to copying the message". The average latency will be lower (and maybe throughput higher), but we'll have less deterministic and consistent behavior.

That all being said, perhaps this is something we could add and make optional (something users could turn on with a flag of some sort). It does provide something between the unique pointer and shared pointer.

Alternatively, I can see two other things we might do to address this problem.


We could avoid copying the message for use with the inter-process publish if we could determine that there are no matched inter-process subscribers (avoiding the inter-process work altogether). That would allow us to pass the unique_ptr directly to intra-process without worrying about inter-process. So you'd get three distinct and mostly deterministic behaviors:

  • if there is only one intra-process subscription, you get low latency and zero-copy
  • if there is only one intra-process subscription and any number of inter-process subscriptions, you get zero-copy (for the intra-process subscription) but slightly higher latency (due to the copy for inter-process)
  • if there are no subscriptions, you don't do any work at all

To do this we would need to implement the "count matched subscriptions" feature I described above.


We have (in the past) discussed providing an option for intra-process to behave like ROS 1, where a call to publish will directly call all subscription callbacks in the same process (without sending a DDS message or rescheduling the subscription callback), and only after all intra-process callbacks have completed, then send the message to inter-process. So for a system with a publisher, two intra-process subscriptions and one inter-process subscription it would look like this:

  • user calls publish:
    • system calls subscription A's callback and waits for it to finish
    • system calls subscription B's callback and waits for it to finish
    • system gives the message to DDS to do inter-process communications
  • publish returns control to the user

This has a few drawbacks:

  • it behaves differently from inter-process only communication
    • long subscription callbacks will delay publish returning
    • intra-process subscription callbacks will be called in publish thread rather than executor thread (if different)
  • intra-process subscription callbacks cannot be handled concurrently (like in a thread-pool of an executor)
  • long intra-process subscriptions add lots of latency to inter-process communications

However, if the user opts into it, then it is more efficient, lower latency, and more deterministic because there's no scheduling (callbacks are called as a direct result of calling publish).

I don't think this is the right behavior by default, but might be preferable in specific scenarios.

@wjwwood wjwwood removed their request for review July 19, 2018 00:26
@Jrdevil-Wang
Author

Jrdevil-Wang commented Jul 19, 2018

Thanks for the reviews! @wjwwood

With your proposed change, if the inter-process copying of the message isn't fast enough, then the pointers on the image would not be the same, as the call to pop_at_key would now return a copy, not the original.

I think the image_pipeline_all_in_one demo will still work with my proposed change, because it uses the SingleThreadedExecutor. The executor has to complete the publishing process before any subscriber gets the chance to execute. But yeah, when using a multi-threaded executor, my proposed change will bring uncertainty.

To do this we would need to implement the "count matched subscriptions" feature I described above.

This is a good solution, but I don't think it is easy to keep this count accurate, considering inter-machine subscriptions.

@wjwwood
Member

wjwwood commented Jul 19, 2018

I think the image_pipeline_all_in_one demo will still work with my proposed change, because it uses the SingleThreadedExecutor. The executor has to complete the publishing process before any subscriber gets the chance to execute. But yeah, when using a multi-threaded executor, my proposed change will bring uncertainty.

Actually, the only case where the copy is always avoided is when the publish call comes from a callback in the single-threaded executor (like from a timer callback) and the subscription callback is in the same single-threaded executor. If the publish call is made from a different thread, or from a different executor, or in a multithreaded executor (assuming different callback groups for the subscription and timer), then it is a race to see whether or not a copy is returned to the subscription callback.

This is a good solution, but I don't think it is easy to keep this count accurate, considering inter-machine subscriptions.

Well, so long as we use the same information the middleware keeps, it should be accurate; after all, this is how the middleware knows to create unicast connections to other endpoints. But we still need the feature.


So what should we do with this pull request? Personally I'd prefer to keep the copy in place as there are only specific cases where we might be able to avoid it with the current proposal and instead we should strive to remove the copy when we can do it always, i.e. when there are no inter-process subscriptions.

Separately we could consider merging this proposed optimization, but I think we should have an opt out option so that people can select determinism versus lower average latency (and so demos like image_pipeline_all_in_one can continue to work consistently).

@wjwwood
Member

wjwwood commented Jul 19, 2018

Sorry, I made a mistake in the previous comment: with a single-threaded executor it will always avoid the copy, which is pretty good. I'll update the comment, but I still think having the ability to opt out of this optimization would be best.

@wjwwood
Member

wjwwood commented Jul 19, 2018

Actually it's more complicated than even what I said in the previous comment: it is still possible that a copy is made on delivery with a single-threaded executor, if the publish call is made outside of the executor thread (asynchronously).

@Jrdevil-Wang
Author

@wjwwood Yeah, this problem is much more complicated than I thought, and there seems to be no easy way to solve it here. Maybe I should just close this PR and wait for a more thorough solution?

@wjwwood
Member

wjwwood commented Jul 26, 2018

@Jrdevil-Wang if you find the enhancement a benefit for your use case, I'd be ok iterating on this to make it opt-in or opt-out, but otherwise I would agree that we should close this and look towards one of the other optimizations/improvements I described above.

I do appreciate you looking into it though! It was good to get written down somewhere and explain it.

@wjwwood
Member

wjwwood commented Jul 26, 2018

I'll let you decide to close it or not, or at the very least I'll wait for your feedback before closing it myself.

@Jrdevil-Wang
Author

@wjwwood Thanks!
I really think my enhancement would benefit a lot of use cases. Despite the unsafe pointer operation, my enhancement should work well with the SingleThreadedExecutor. Of course, there is a more severe problem when using the MultiThreadedExecutor.
In particular, I think most users start trying the intra-process zero-copy feature with the image_pipeline demo, and among them, a lot of users (including myself) will find the performance is no better than inter-process communication, even for a demo, which is pretty frustrating.
Making it an option seems a good way to let more people notice the problem here. What should I do if I would like to make it an option?

@wjwwood
Member

wjwwood commented Aug 3, 2018

Ok, so first you'd need to implement what you proposed above (#504 (comment)), where the "final" call to pop_at_key for a message instance will return a copy rather than the original.

Then we'd want to expose an option (probably at creation of the publisher) which opted into or out of the new behavior. Then in the demos (like the image pipeline demo) we'd need to ensure we're using the current behavior (because zero-copy is more important than the latency). At best we'd update the demo (and wiki page) to highlight this trade-off (lower average latency for occasionally needing to copy the message).

One thing to watch out for in the implementation of the suggestion above is that the message still needs to be removed from the ring buffer, even if the "last" call to pop_at_key was made before do_inter_process_publish finishes. Normally the last call to pop_at_key removes the original unique pointer.

@wjwwood wjwwood added in progress Actively being worked on (Kanban column) and removed in review Waiting for review (Kanban column) labels Aug 3, 2018
@Jrdevil-Wang
Author

Ok, I see. I will implement the proposal soon, hopefully.

@ivanpauno
Member

@Jrdevil-Wang #664 together with #690 have solved what you were proposing here.

I will summarize a little bit:

publish(std::unique_ptr<MessageT, MessageDeleter> & msg)
{
  if (!intra_process_is_enabled_) {
    this->do_inter_process_publish(msg.get());
    msg.reset();
    return;
  }
  // If an interprocess subscription exists, then the unique_ptr is promoted
  // to a shared_ptr and published.
  // This allows doing the intraprocess publish first and then doing the
  // interprocess publish, resulting in lower publish-to-subscribe latency.
  // It's not possible to do that with a unique_ptr,
  // as do_intra_process_publish takes ownership of the message.
  uint64_t message_seq;
  bool inter_process_publish_needed =
    get_subscription_count() > get_intra_process_subscription_count();
  MessageSharedPtr shared_msg;
  if (inter_process_publish_needed) {
    shared_msg = std::move(msg);
    message_seq =
      store_intra_process_message(intra_process_publisher_id_, shared_msg);
  } else {
    message_seq =
      store_intra_process_message(intra_process_publisher_id_, std::move(msg));
  }
  this->do_intra_process_publish(message_seq);
  if (inter_process_publish_needed) {
    this->do_inter_process_publish(shared_msg.get());
  }
}

I think this PR could be closed now, but I will wait for your feedback before doing it.

@Jrdevil-Wang
Author

Jrdevil-Wang commented May 5, 2019 via email

@wjwwood wjwwood closed this May 5, 2019
@wjwwood wjwwood removed the in progress Actively being worked on (Kanban column) label May 5, 2019
nnmm pushed a commit to ApexAI/rclcpp that referenced this pull request Jul 9, 2022
Signed-off-by: ivanpauno <ivanpauno@ekumenlabs.com>