零拷贝数据传输
在传统操作系统的数据传输过程中,系统内部会在磁盘、内存、缓存中多次进行数据拷贝,每次都会占用CPU的资源,数据量小的时候还好,随着数据量的增加,CPU的开销也会持续增加,尤其是在机器人图像数据的应用中 ,经常会发生这种问题,导致CPU都在做数据拷贝,没有时间处理其他的应用功能了,直接的感觉就是处理卡顿。
零拷贝技术
针对这种问题,零拷贝技术应运而生。
零拷贝主要的任务就是避免CPU将数据从一块存储拷贝到另外一块存储,避免让CPU做大量的数据拷贝任务,减少不必要的拷贝,或者让别的组件来做这一类简单的数据传输任务,让CPU解脱出来专注于别的任务。这样就可以让系统资源的利用更加有效。
TogetherROS中就提供了灵活、高效的零拷贝功能,可以显著降低大尺寸数据的通信延时和CPU占用,具体有多显著呢,我们不妨进行一个测试。
TogetherROS零拷贝性能测试
我们使用TogetherROS系统内部集成的性能测试工具——performance_test,来评估下开启零拷贝前后的性能差异,这里我们传输的样本数据量是4M。
未开启零拷贝进行数据传输
$ ros2 run performance_test perf_test --reliable --keep-last --history-depth 10 -s 1 -m Array4m -r 100 --max-runtime 30 #未开启
开启零拷贝数据传输:
$ ros2 run performance_test perf_test --zero-copy --reliable --keep-last --history-depth 10 -s 1 -m Array4m -r 100 --max-runtime 30 #开启
好的,测试已经跑完了,我们把结果放到这里,来分析一下。
在这个测试中,关键有四个指标:
- 时延,也就是消息从发布者到订阅者的传输时间。不开启零拷贝的情况下,平均为0.004912s,开启零拷贝之后,速度快了差不多40倍,平均为0.000180s。
- CPU使用率,表示通信活动所占用的CPU时间,大家可以看这个utime指标,开启零拷贝之后时间有显著的提升,消耗CPU的资源少了。
- 驻留内存,包括通信过程中分配的内存和共享内存,是这个maxrss中的数据,开启零拷贝之后,占用的内存也更少。
- 样本统计,包括测试中发送、接收以及丢失的消息数量,是这组数据,依然是开启零拷贝之后性能更好。
通过测试,对于大数据通信来讲,零拷贝在CPU消耗、内存占用以及通信延迟抖动方面的性能都会更好。
这里只是便于大家感受零拷贝技术的效果,具体编程中如何使用零拷贝机制呢?
编程开发
为了方便大家使用,TogetherROS针对零拷贝功能进行了封装,风格类似ROS2中话题通信的接口,还是话题通信一样的流程,我们只需要修改几个函数就可以实现啦。
运行例程
$ source /opt/tros/local_setup.bash
$ source install/local_setup.bash
$ ros2 run hbmem_pubsub talker
$ ros2 run hbmem_pubsub listener
代码解析
发布者publisher_hbmem.cpp:
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include "rclcpp/rclcpp.hpp"
#include "hbmem_pubsub/msg/sample_message.hpp"
using namespace std::chrono_literals;
class MinimalHbmemPublisher : public rclcpp::Node {
public:
MinimalHbmemPublisher () : Node("minimal_hbmem_publisher"), count_(0) {
// 创建publisher_hbmem,topic为"topic",QOS为KEEPLAST(10),以及默认的可靠传输
publisher_ = this->create_publisher_hbmem<hbmem_pubsub::msg::SampleMessage>(
"topic", 10);
// 定时器,每隔40毫秒调用一次timer_callback进行消息发送
timer_ = this->create_wall_timer(
40ms, std::bind(&MinimalHbmemPublisher ::timer_callback, this));
}
private:
// 定时器回调函数
void timer_callback() {
// 获取要发送的消息
auto loanedMsg = publisher_->borrow_loaned_message();
// 判断消息是否可用,可能出现获取消息失败导致消息不可用的情况
if (loanedMsg.is_valid()) {
// 引用方式获取实际的消息
auto& msg = loanedMsg.get();
// 获取当前时间,单位为us
auto time_now =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count();
// 对消息的index和time_stamp进行赋值
msg.index = count_;
msg.time_stamp = time_now;
// 打印发送消息
RCLCPP_INFO(this->get_logger(), "message: %d", msg.index);
publisher_->publish(std::move(loanedMsg));
// 注意,发送后,loanedMsg已不可用
// 计数器加一
count_++;
} else {
// 获取消息失败,丢弃该消息
RCLCPP_INFO(this->get_logger(), "Failed to get LoanMessage!");
}
}
// 定时器
rclcpp::TimerBase::SharedPtr timer_;
// hbmem publisher
rclcpp::PublisherHbmem<hbmem_pubsub::msg::SampleMessage>::SharedPtr publisher_;
// 计数器
size_t count_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalHbmemPublisher>());
rclcpp::shutdown();
return 0;
}
订阅者subscriber_hbmem.cpp:
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "hbmem_pubsub/msg/sample_message.hpp"
class MinimalHbmemSubscriber : public rclcpp::Node {
public:
MinimalHbmemSubscriber () : Node("minimal_hbmem_subscriber") {
// 创建subscription_hbmem,topic为"sample",QOS为KEEPLAST(10),以及默认的可靠传输
// 消息回调函数为topic_callback
subscription_ =
this->create_subscription_hbmem<hbmem_pubsub::msg::SampleMessage>(
"topic", 10,
std::bind(&MinimalHbmemSubscriber ::topic_callback, this,
std::placeholders::_1));
}
private:
// 消息回调函数
void topic_callback(
const hbmem_pubsub::msg::SampleMessage::SharedPtr msg) const {
// 注意,msg只能在回调函数中使用,回调函数返回后,该消息就会被释放
// 获取当前时间
auto time_now =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch())
.count();
// 计算延时并打印出来
RCLCPP_INFO(this->get_logger(), "msg %d, time cost %dus", msg->index,
time_now - msg->time_stamp);
}
// hbmem subscription
rclcpp::SubscriptionHbmem<hbmem_pubsub::msg::SampleMessage>::SharedPtr
subscription_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalHbmemSubscriber>());
rclcpp::shutdown();
return 0;
}
接口汇总
我们整理一下,与ROS2中的话题通信相比,TogetherROS带有零拷贝机制的话题通信接口是这样的。
在发布者中,我们可以使用PublisherHbmem来创建一个发布者对象,然后通过create_publisher_hbmem发布数据,而在订阅者中,SubscriptionHbmem用来创建一个订阅者对象,通过 create_subscription_hbmem订阅需要的数据,至于底层如何完成零拷贝的过程,都交给TogetherROS即可。