跳转至

零拷贝数据传输

在传统操作系统的数据传输过程中,系统内部会在磁盘、内存、缓存中多次进行数据拷贝,每次都会占用CPU的资源,数据量小的时候还好,随着数据量的增加,CPU的开销也会持续增加,尤其是在机器人图像数据的应用中 ,经常会发生这种问题,导致CPU都在做数据拷贝,没有时间处理其他的应用功能了,直接的感觉就是处理卡顿。

image-20220614143518914

零拷贝技术

针对这种问题,零拷贝技术应运而生。

image-20220614143523146

零拷贝主要的任务就是避免CPU将数据从一块存储拷贝到另外一块存储,避免让CPU做大量的数据拷贝任务,减少不必要的拷贝,或者让别的组件来做这一类简单的数据传输任务,让CPU解脱出来专注于别的任务。这样就可以让系统资源的利用更加有效。

TogetherROS中就提供了灵活、高效的零拷贝功能,可以显著降低大尺寸数据的通信延时和CPU占用,具体有多显著呢,我们不妨进行一个测试。

TogetherROS零拷贝性能测试

我们使用TogetherROS系统内部集成的性能测试工具——performance_test,来评估下开启零拷贝前后的性能差异,这里我们传输的样本数据量是4M。

未开启零拷贝进行数据传输

$ ros2 run performance_test perf_test --reliable --keep-last --history-depth 10 -s 1 -m Array4m -r 100 --max-runtime 30 #未开启 

开启零拷贝数据传输:

$ ros2 run performance_test perf_test --zero-copy --reliable --keep-last --history-depth 10 -s 1 -m Array4m -r 100 --max-runtime 30 #开启 

image-20220614143623456

好的,测试已经跑完了,我们把结果放到这里,来分析一下。

在这个测试中,关键有四个指标:

  • 时延,也就是消息从发布者到订阅者的传输时间。不开启零拷贝的情况下,平均为0.004912s,开启零拷贝之后,速度快了差不多40倍,平均为0.000180s。
  • CPU使用率,表示通信活动所占用的CPU时间,大家可以看这个utime指标,开启零拷贝之后时间有显著的提升,消耗CPU的资源少了。
  • 驻留内存,包括通信过程中分配的内存和共享内存,是这个maxrss中的数据,开启零拷贝之后,占用的内存也更少。
  • 样本统计,包括测试中发送、接收以及丢失的消息数量,是这组数据,依然是开启零拷贝之后性能更好。

通过测试,对于大数据通信来讲,零拷贝在CPU消耗、内存占用以及通信延迟抖动方面的性能都会更好。

这里只是便于大家感受零拷贝技术的效果,具体编程中如何使用零拷贝机制呢?

编程开发

为了方便大家使用,TogetherROS针对零拷贝功能进行了封装,风格类似ROS2中话题通信的接口,还是话题通信一样的流程,我们只需要修改几个函数就可以实现啦。

image-20220614143855639

运行例程

$ source /opt/tros/local_setup.bash
$ source install/local_setup.bash
$ ros2 run hbmem_pubsub talker
$ ros2 run hbmem_pubsub listener

image-20220614143932216

代码解析

发布者publisher_hbmem.cpp:

#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "hbmem_pubsub/msg/sample_message.hpp"

using namespace std::chrono_literals;


class MinimalHbmemPublisher  : public rclcpp::Node {
 public:
  MinimalHbmemPublisher () : Node("minimal_hbmem_publisher"), count_(0) {
    // 创建publisher_hbmem,topic为"topic",QOS为KEEPLAST(10),以及默认的可靠传输
    publisher_ = this->create_publisher_hbmem<hbmem_pubsub::msg::SampleMessage>(
        "topic", 10);

    // 定时器,每隔40毫秒调用一次timer_callback进行消息发送
    timer_ = this->create_wall_timer(
        40ms, std::bind(&MinimalHbmemPublisher ::timer_callback, this));
  }

 private:
  // 定时器回调函数
  void timer_callback() {
    // 获取要发送的消息
    auto loanedMsg = publisher_->borrow_loaned_message();
    // 判断消息是否可用,可能出现获取消息失败导致消息不可用的情况
    if (loanedMsg.is_valid()) {
      // 引用方式获取实际的消息
      auto& msg = loanedMsg.get();

      // 获取当前时间,单位为us
      auto time_now =
          std::chrono::duration_cast<std::chrono::microseconds>(
              std::chrono::steady_clock::now().time_since_epoch()).count();

      // 对消息的index和time_stamp进行赋值
      msg.index = count_;
      msg.time_stamp = time_now;

      // 打印发送消息
      RCLCPP_INFO(this->get_logger(), "message: %d", msg.index);
      publisher_->publish(std::move(loanedMsg));
      // 注意,发送后,loanedMsg已不可用
      // 计数器加一
      count_++;
    } else {
      // 获取消息失败,丢弃该消息
      RCLCPP_INFO(this->get_logger(), "Failed to get LoanMessage!");
    }
  }

  // 定时器
  rclcpp::TimerBase::SharedPtr timer_;

  // hbmem publisher
  rclcpp::PublisherHbmem<hbmem_pubsub::msg::SampleMessage>::SharedPtr publisher_;

  // 计数器
  size_t count_;
};

int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<MinimalHbmemPublisher>());
  rclcpp::shutdown();
  return 0;
}

订阅者subscriber_hbmem.cpp:

#include <memory>

#include "rclcpp/rclcpp.hpp"
#include "hbmem_pubsub/msg/sample_message.hpp"


class MinimalHbmemSubscriber  : public rclcpp::Node {
 public:
  MinimalHbmemSubscriber () : Node("minimal_hbmem_subscriber") {
    // 创建subscription_hbmem,topic为"sample",QOS为KEEPLAST(10),以及默认的可靠传输
    // 消息回调函数为topic_callback
    subscription_ =
        this->create_subscription_hbmem<hbmem_pubsub::msg::SampleMessage>(
            "topic", 10,
            std::bind(&MinimalHbmemSubscriber ::topic_callback, this,
                      std::placeholders::_1));
  }

 private:
  // 消息回调函数
  void topic_callback(
      const hbmem_pubsub::msg::SampleMessage::SharedPtr msg) const {
    // 注意,msg只能在回调函数中使用,回调函数返回后,该消息就会被释放
    // 获取当前时间
    auto time_now =
        std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::steady_clock::now().time_since_epoch())
            .count();
    // 计算延时并打印出来
    RCLCPP_INFO(this->get_logger(), "msg %d, time cost %dus", msg->index,
                time_now - msg->time_stamp);
  }

  // hbmem subscription
  rclcpp::SubscriptionHbmem<hbmem_pubsub::msg::SampleMessage>::SharedPtr
      subscription_;
};


int main(int argc, char * argv[])
{
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<MinimalHbmemSubscriber>());
  rclcpp::shutdown();
  return 0;
}

接口汇总

我们整理一下,与ROS2中的话题通信相比,TogetherROS带有零拷贝机制的话题通信接口是这样的。

image-20220614144236038

在发布者中,我们可以使用PublisherHbmem来创建一个发布者对象,然后通过create_publisher_hbmem发布数据,而在订阅者中,SubscriptionHbmem用来创建一个订阅者对象,通过 create_subscription_hbmem订阅需要的数据,至于底层如何完成零拷贝的过程,都交给TogetherROS即可。

图片1