掘金 人工智能 3小时前
ROS2 Jazzy:配置高效的进程内通信
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文深入探讨了ROS2中节点通信的优化方法,重点介绍了进程内通信(intra-process communication)如何通过零拷贝技术提升性能。通过分析双节点流通道、循环管道和图像管道等多种示例,文章展示了如何利用`std::unique_ptr`实现消息的直接指针传递,从而避免数据复制,显著降低通信开销。此外,文章还阐述了在多订阅、一对多关系以及跨进程通信场景下,零拷贝机制的实现细节与潜在挑战,为ROS2开发者提供了优化节点间通信的实践指导。

🚀 **进程内通信与零拷贝优势**:ROS2通过允许节点在同一进程内通信,并利用`std::unique_ptr`等机制,可以实现消息的零拷贝传输。这意味着消息的内存地址在发布者和订阅者之间直接传递,无需进行数据复制,从而大幅提升通信效率,减少CPU和内存资源的消耗,尤其适用于高吞吐量和低延迟的应用场景。

💡 **双节点流通道的零拷贝验证**:在双节点流通道示例中,生产者和消费者节点通过`std::unique_ptr`发布和订阅消息。输出结果显示,发布的消息地址与接收到的消息地址完全一致,直接证明了进程内通信实现了消息的零拷贝传输,这是ROS2为优化节点间通信而设计的关键特性之一。

🔄 **循环管道中的持续零拷贝**:循环管道示例通过两个节点相互发布和订阅同一消息,并不断修改消息内容。即使消息在节点间循环传递并被修改,`std::unique_ptr`的传递方式确保了消息的内存地址始终保持不变,验证了在循环通信模式下,零拷贝机制也能稳定运行,消息的生命周期管理得到有效控制。

🖼️ **图像处理中的零拷贝挑战与解决方案**:在图像处理场景下,当一个节点(如水印节点)需要将同一消息发布给多个订阅者(如多个图像查看节点)时,一对多的关系会带来挑战。如果订阅者使用`std::unique_ptr`回调,同一消息的所有权无法同时传递给多个节点。ROS2通过将`std::unique_ptr`提升为`std::shared_ptr`来解决此问题,使得多个订阅者可以共享同一消息的所有权,但最后一个接收的节点将拥有原始指针。若订阅者使用`const std::shared_ptr`回调,则系统会共享同一消息的指针。

↔️ **跨进程通信与零拷贝的区分**:与进程内通信不同,跨进程通信(如独立的图像查看器节点)需要通过DDS中间件进行序列化和反序列化,这会生成消息的独立副本,导致内存地址不同。即使在存在跨进程订阅的情况下,ROS2的进程内零拷贝机制依然能独立运作,其核心在于节点组合(ComposableNode)使节点运行在同一进程内,从而实现直接的指针传递。

背景

ROS 应用通常由多个独立“节点”组成,每个节点执行特定任务并与系统其他部分解耦。这有助于故障隔离、加速开发、实现模块化和代码复用,但这往往造成性能下降。在ROS1 最初开发出来之后,人们就意识到需要高效的节点组合方式,所以推出了 Nodelets。在 ROS2 中,通过解决一些需要重构节点的基本问题来改进 Nodelets 的设计。

本文的例子将重点展示如何通过手动组合节点,也就是说在不修改节点代码或限制其功能的情况下,将节点分开定义,然后以不同的进程布局来组合它们。

安装例子

如果您通过软件包安装了 ROS2,请确保已安装 ros-jazzy-intra-process-demo。如果您从归档文件下载或从源代码构建 ROS2,它会自动包含在安装中。

运行和理解例子

例子包括多种类型:一些是用于展示进程内通信功能的简单示例,另一些是使用 OpenCV 的端到端示例,展示了将节点重组为不同配置的能力。

双节点流通道例子

这个例子旨在说明:当使用 std::unique_ptr 发布和订阅消息时,进程内的发布/订阅连接可以实现消息的零拷贝传输。

首先查看源代码:
github.com/ros2/demos/…

#include <chrono>#include <cinttypes>#include <cstdio>#include <memory>#include <string>#include <utility>#include "rclcpp/rclcpp.hpp"#include "std_msgs/msg/int32.hpp"using namespace std::chrono_literals;// Node that produces messages.struct Producer : public rclcpp::Node{  Producer(const std::string & name, const std::string & output)  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))  {    // Create a publisher on the output topic.    pub_ = this->create_publisher<std_msgs::msg::Int32>(output, 10);    std::weak_ptr<std::remove_pointer<decltype(pub_.get())>::type> captured_pub = pub_;    // Create a timer which publishes on the output topic at ~1Hz.    auto callback = [captured_pub]() -> void {        auto pub_ptr = captured_pub.lock();        if (!pub_ptr) {          return;        }        static int32_t count = 0;        std_msgs::msg::Int32::UniquePtr msg(new std_msgs::msg::Int32());        msg->data = count++;        printf(          "Published message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,          reinterpret_cast<std::uintptr_t>(msg.get()));        pub_ptr->publish(std::move(msg));      };    timer_ = this->create_wall_timer(1s, callback);  }  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;  rclcpp::TimerBase::SharedPtr timer_;};// Node that consumes messages.struct Consumer : public rclcpp::Node{  Consumer(const std::string & name, const std::string & input)  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))  {    // Create a subscription on the input topic which prints on receipt of new messages.    sub_ = this->create_subscription<std_msgs::msg::Int32>(      input,      10,      [](std_msgs::msg::Int32::UniquePtr msg) {        printf(          " Received message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,          reinterpret_cast<std::uintptr_t>(msg.get()));      });  }  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;};int main(int argc, char * argv[]){  setvbuf(stdout, NULL, _IONBF, BUFSIZ);  rclcpp::init(argc, argv);  rclcpp::executors::SingleThreadedExecutor executor;  auto producer = std::make_shared<Producer>("producer", "number");  auto consumer = std::make_shared<Consumer>("consumer", "number");  executor.add_node(producer);  executor.add_node(consumer);  executor.spin();  rclcpp::shutdown();  return 0;}

从主函数可以看到,我们创建了一个生产者节点和一个消费者节点,将它们添加到单线程执行器中,然后调用 spin

让我们通过执行ros2 run intra_process_demo two_node_pipeline executable来运行例子(不要忘记先source安装文件):

$ ros2 run intra_process_demo two_node_pipeline# 生产者发布# (第一条消息可能因订阅尚未建立而丢失,无对应的消费者输出)Published message with value: 0, and address: 0x7fb02303faf0# 消费者接收,地址一致Published message with value: 1, and address: 0x7fb020cf0520Received message with value: 1, and address: 0x7fb020cf0520  [...]

您可以看到具有相同值的“Published message…”和“Received message…”行也具有相同的消息地址。这表明接收到的消息的地址与发布的消息的地址相同。这是因为我们使用std::unique_ptrs进行消息的发布和订阅,这允许消息的所有权在系统中安全地移动。而如果使用const &和std::shared_ptr发布和订阅消息,则不会发生零复制。

循环管道例子

这个例子与前一个类似,但生产者不再为每次迭代创建新消息,而是通过循环结构重复使用同一个消息实例,并通过在执行器启动前手动触发一次发布来启动通信:

源代码:
github.com/ros2/demos/…

#include <chrono>#include <cinttypes>#include <cstdio>#include <memory>#include <string>#include <utility>#include "rclcpp/rclcpp.hpp"#include "std_msgs/msg/int32.hpp"using namespace std::chrono_literals;// This node receives an Int32, waits 1 second, then increments and sends it.struct IncrementerPipe : public rclcpp::Node{  IncrementerPipe(const std::string & name, const std::string & in, const std::string & out)  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))  {    // Create a publisher on the output topic.    pub = this->create_publisher<std_msgs::msg::Int32>(out, 10);    std::weak_ptr<std::remove_pointer<decltype(pub.get())>::type> captured_pub = pub;    // Create a subscription on the input topic.    sub = this->create_subscription<std_msgs::msg::Int32>(      in,      10,      [captured_pub](std_msgs::msg::Int32::UniquePtr msg) {        auto pub_ptr = captured_pub.lock();        if (!pub_ptr) {          return;        }        printf(          "Received message with value:         %d, and address: 0x%" PRIXPTR "\n", msg->data,          reinterpret_cast<std::uintptr_t>(msg.get()));        printf("  sleeping for 1 second...\n");        if (!rclcpp::sleep_for(1s)) {          return;    // Return if the sleep failed (e.g. on :kbd:`ctrl-c`).        }        printf("  done.\n");        msg->data++;    // Increment the message's data.        printf(          "Incrementing and sending with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,          reinterpret_cast<std::uintptr_t>(msg.get()));        pub_ptr->publish(std::move(msg));    // Send the message along to the output topic.      });  }  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub;  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub;};int main(int argc, char * argv[]){  setvbuf(stdout, NULL, _IONBF, BUFSIZ);  rclcpp::init(argc, argv);  rclcpp::executors::SingleThreadedExecutor executor;  // Create a simple loop by connecting the in and out topics of two IncrementerPipe's.  // The expectation is that the address of the message being passed between them never changes.  auto pipe1 = std::make_shared<IncrementerPipe>("pipe1", "topic1", "topic2");  auto pipe2 = std::make_shared<IncrementerPipe>("pipe2", "topic2", "topic1");  rclcpp::sleep_for(1s);  // Wait for subscriptions to be established to avoid race conditions.  // Publish the first message (kicking off the cycle).  std::unique_ptr<std_msgs::msg::Int32> msg(new std_msgs::msg::Int32());  msg->data = 42;  printf(    "Published first message with value:  %d, and address: 0x%" PRIXPTR "\n", msg->data,    reinterpret_cast<std::uintptr_t>(msg.get()));  pipe1->pub->publish(std::move(msg));  executor.add_node(pipe1);  executor.add_node(pipe2);  executor.spin();  rclcpp::shutdown();  return 0;}

与前面的例子不同,这个例子只使用一个Node,然后使用不同的名称和配置实例化两次并形成循环。在一个循环中,ROS 图最终是样的 pipe1 -> pipe2 -> pipe1…

其中这行代码pipe1->pub->publish(msg)启动了这个循环流程,人这时起,每个节点在其自己的订阅回调中调用publish发布消息,在节点之间来回传递消息。

在这里,节点每秒来回传递一次消息,每次都增加消息的值。由于消息是作为unique_ptr发布和订阅的,因此在开始时创建的相同消息就会被持续使用。

为了测试这些期望,让我们运行它:

$ ros2 run intra_process_demo cyclic_pipelinePublished first message with value:  42, and address: 0x7fd2ce0a2bc0Received message with value:         42, and address: 0x7fd2ce0a2bc0  sleeping for 1 second...  done.Incrementing and sending with value: 43, and address: 0x7fd2ce0a2bc0Received message with value:         43, and address: 0x7fd2ce0a2bc0  sleeping for 1 second...  done.Incrementing and sending with value: 44, and address: 0x7fd2ce0a2bc0Received message with value:         44, and address: 0x7fd2ce0a2bc0  sleeping for 1 second...  done.Incrementing and sending with value: 45, and address: 0x7fd2ce0a2bc0Received message with value:         45, and address: 0x7fd2ce0a2bc0  sleeping for 1 second...  done.Incrementing and sending with value: 46, and address: 0x7fd2ce0a2bc0Received message with value:         46, and address: 0x7fd2ce0a2bc0  sleeping for 1 second...  done.Incrementing and sending with value: 47, and address: 0x7fd2ce0a2bc0Received message with value:         47, and address: 0x7fd2ce0a2bc0  sleeping for 1 second...[...]

图像管道例子

这个例子使用 OpenCV 实现图像采集、标注和显示,验证复杂场景下的进程内零拷贝通信。

注意(macOS 用户)

若遇到类似 ddsi_conn_write failed -1 的错误,需临时增大系统 UDP 数据包大小:

$ sudo sysctl -w net.inet.udp.recvspace=209715$ sudo sysctl -w net.inet.udp.maxdgram=65500

(重启后设置失效)

简单管道(三节点)

首先,我们将有一个包含三个节点的管道,排列方式如下camera_node -> watermark_node -> image_view_node

在每个节点中,正在发送或已接收或两者同时发送的消息的地址都被写入图片中。watermark 和 image view 节点旨在修改图像而不复制它,因此,只要节点处于同一进程中,那么图片上打印的地址都应该相同。

让我们通过执行以下可执行文件来运行例子:

ros2 run intra_process_demo image_pipeline_all_in_one

可以看到暂停图像(按空格)后,图像上的三个消息地址应应该一致(同一进程内零拷贝)。

带两个图像查看器的管道

现在我们来看一个与上述示例类似的例子,不同之处在于它有两个图像查看节点。所有节点仍处于同一进程中,但现在应该会显示两个图像查看窗口( macOS 用户注意:您的图像查看窗口可能会重叠)。我们可以使用以下命令运行它:

ros2 run intra_process_demo image_pipeline_with_two_image_view

与上一个示例一样,您可以使用空格键暂停渲染,再次按下空格键可继续。您可以停止更新以检查写入屏幕的指针。

如上面的示例图像所示,我们有一个图像中所有指针都相同,另一个图像的前两个条目指针与第一个图像相同,但第二个图像的最后一个指针不同。要理解为什么会发生这种情况,请考虑图的拓扑结构:

camera_node -> watermark_node -> image_view_node                              -> image_view_node2

摄像头节点(camera_node)与水印节点(watermark_node)之间的链接可以使用同一指针而无需复制,这是因为消息只需传递到一个进程内订阅(intra process subscription)。然而,在水印节点与两个图像视图节点之间的链接是一对多关系,因此如果图像视图节点(image_view_node)使用 unique_ptr 回调,则无法将同一指针的所有权同时传递给两者。此时,指针只能传递给其中一个节点。具体哪一个节点会获得原始指针的所有权并未明确定义,而是取决于消息传递的顺序——最后接收的节点将获得指针。

在这里还需要注意,图像查看节点并非使用 unique_ptr 回调进行订阅,而是使用 const shared_ptr 进行订阅。这意味着系统会将同一个 shared_ptr 传递给两个回调。当处理第一个进程内订阅时,内部存储的 unique_ptr 会提升为 shared_ptr。每个回调都将获得同一消息的共享所有权。

带进程间查看器的管道

另一个需要正确理解的重要点是,当进行进程间订阅时,要避免中断进程内的零拷贝行为。为了测试这一点,我们可以运行第一个图像管道演示 ros2 run intra_process_demo image_pipeline_all_in_one,然后运行独立的 ros2 run intra_process_demo image_view_node 实例。这看起来会像这样:

在ROS2中,同时暂停两个图像流以实现对齐存在困难,这可能导致图像帧无法完全同步。但关键点在于观察到的以下现象及其技术含义:image_pipeline_all_in_one 的图像查看器在每个步骤中显示的地址相同。这意味着即使存在外部订阅(例如独立的图像查看器),进程内零拷贝机制仍然生效。这是因为节点在同一进程中运行(如通过 ComposableNode 组合),ROS2 的进程内通信直接传递指针(unique_ptr 或裸指针),避免数据复制。而跨进程的图像查看器节点(独立的图像查看器节点)则显示不同进程ID,且地址与前两行不同。则是因为 ROS2 通过 DDS 中间件在跨进程通信时进行序列化/反序列化,生成独立副本,导致内存地址不同。


关注【智践行】,发送 【机器人】 获得机器人经典学习资料

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

ROS2 进程内通信 零拷贝 节点通信 性能优化
相关文章