编辑推荐: |
本文是对 DDS 以及Fast RTPS 的介绍文章,内容非常详细,希望阅读后会给您的学习带来收获。
本文来自于博客:只为那传说中美丽的草原
,由火龙果软件Alice编辑、推荐。 |
|
DDS 是设置协议中心和标准,它是为数据服务提供的服务。Fast-RTPS 是 DDS 的连接 API,它可以与它一起开发出方便,可靠的通信系统。本文是对 DDS 以及Fast RTPS 的介绍文章。
从《变形金刚》电影说起 这里要提到的是2011年的真人版电影,变形金刚第三部《变形金刚:月黑之时》。 这是一篇技术文章,为什么要扯《变形金刚》电影呢?这是因为这方面的主要内容与本文的技术有一定的相关性。 在这场霸主天敌中,在这些能量天柱上,天御天敌。背着地球柱的左右天柱天斗的许多能量柱,与他进行了天球赛与地球同步合作,同来了来了重建自己的家园。 这些柱子必须组合起来才能完成传输工作,并且在这其中有一个能量柱比较,因为它负责控制其他的能量柱。
在这个红色的能量柱系统中心中心被这样一个“中心主题”,类似中心名称,它是它的整个系列。节点的中心系统而言,任何节点都将无法工作。 随后的,因此,擎天柱自然而然地毁灭了这个电影的天柱中心,让天主无敌的传播彻底。
从设计上来说,对于如此大型的系统,却存在一个非常薄弱和重要的中心节点,这又是一个好的方案。 而在这个系统中,所有的异常都完全不会影响到整个节点的任何异常,而不会影响到整个节点的任何异常。运行。
DDS介绍 DDS全称Distribution Service,为中心通信协议和API标准提供了数据为中心的连接服务,基于发布者-订阅者模型。它是由它组成的,它提供的数据是其运行和并且应用程序之间的通信功能,通讯功能之间可以提供可靠的交流。 或许,你已经知道很多种网络订阅协议,对于发布这些概念也很深奥。那DDS到底有什么特别的约定呢? 下图展示了4个时代的数据通信方式:
- (第一代)点对CS(Client-Server)结构,这是每个人最普遍的:一个服务器角色被许多客户端使用,每次建立通信点的时候,通信必须有一条连接。当通信节点增多时,并且,每个客户端必须知道服务器的具体地址和地址。服务器发生,所有客户端都会受到影响。
- (第二代)broker,它负责处理中间人的一个请求,并进一步真正能够响应每个人的请求,似乎存在一个为客户提供服务的模型。服务端如果有变化,只需要告诉Broker就可以了。这个模型的问题就可以了,Broker变成了模型的中心,它的处理速度会影响所有人的效率但,这些城市的中心点,当系统规则增长到一定程度时,Broker 似乎会预测会出现短缺。
- (第三代)广播模型:每个人都可以在通道上广播消息,并且每个人都可以收到消息。这个模型解决了服务器的问题的建立,并且通信双方没有单独的连接,但它存在的问题是全公司一千号人是否与自己坐在同一个房间里面的各种消息一样。
- (第四代 DDS 模型:模型与广播模型类似,每个人都可以在 DataBus 上发布和读取消息。但更进一步的是,中包含了很多类似的通知)它的通信每个人都可以关乎自己的消息,免掉不需要的消息。
下图展示了DDS在网络栈中的位置,它位于传输层的上面,并且以TCP,UDP为基础。
图是但沙头漏的变化是因为这个两的技术发展过程,中间的却鲜有变化。 对比大家常见的Socker API,DDS有以下特点:
关于 DDS 的更多特性,可以点击这个链接: 《什么是 DDS?》 。
DDS 可以降低系统复杂度 例如,如何为不同的日常任务分类处理,配置消息,需要有等地址。 而说这些通信的中间件能够完全处理好逻辑,则应用程序将可以集中处理自己的业务,比如机器。 下图是情况的对比:
如果考虑系统的问题,问题会更加突出。 日历系统中包含了许多的通信角色需要相互通信,随着角色数量的不断增长,其显式通道会以增长。
而如果有统一的DataBus,那么还有其他新的通信角色,它不会那么复杂。
DDS应用范围 认为你这个行业,非常重视,但它的应用范围很广泛,广泛到它涉及到我们了,例如,重要的交通,重要的交通。 下图是一些例子:
DDS 商标 DDS本身就是 组标准。由对象管理 (简称OMG)维护。 OMG是一个开放性的多种技术标准联盟,由大型IT公司组成:包括IBM、Apple Computer、Sun Microsystems等。 但由 OM 负责制定标准,而由其他服务标准来完成。 当前DDS的包括下面这些:
DDS与RTPS 在 DDS 规范中,有两个描述标准的基本文档:
- DDS 规范 :以数据形式定义的API和消息模型信息描述中心为该发布的行为和服务质量。时间为:“能够在正确的信息有效,可靠地传送到正确的位置”。
- DDSI-RTPS :通信描述了RTPS(Real Time Publish Subscribe Protocol)协议。该协议通过UDP等不可靠的传输,实现最大努力(Best-Effort)和可靠的发布-订阅。RTPS是DDS实现的标准协议,它的目的和范围是确保基于不同的 DDS 供应商的应用程序可以实现互操作。
DDS与汽车行业 汽车行业而言,汽车开放系统架构(AUTOmotive Open System ARchitecture)已经在 AUTOSAR Adaptive Platform 18.03中包含了DDS协议 。 ROS2架构也以DDS为基础 。 DDS 的实时可能适合自动驾驶系统使用高速中,另外通常会在这些场景中存在这些数据,并利用这些数据,在系统中进行决策和利用,非常需要系统和它们的功能预测结果,可以很好地满足 DS 的需求。的通信需求。
Fast-RTPS 介绍 Fast-RTPS 是 eprosima 对于 RTPS 的 C++ 实现,这是一个免费开源软件,遵循 Apache License 2.0。 eProsima Fast RTPS 在性能、功能和最新版本(RTPS 2)的维护方面都可以查看RTPS 的最高版本:关于Fast 性能的 快速RTPS 性能 。 它最被大家知道的可能是因为被 ROS2 设定为默认的消息。 Fast-RTPS支持平台包括:Windows、Linux、Mac OS、QNX、VxWorks、iOS、Android、Raspbian。 Fast-RTPS 具有以下优点:
- 对于实时应用程序而言,可以在最佳和可靠的通信策略上进行配置。
- 即即用的连接性,网络插播的所有成员自动发现其他新的成员。
- 扩展性和可扩展性允许网络中的设备不断增长。
- 可配置的网络行为和可替换/输入的最佳层传输:为每个部署选择协议和系统输出通道组合。
- 两个API层:一个简单易用的发布者-订阅者层和一个提供对RTPS协议内部层更好地控制的Writer-Reader。
源码与编译 Fast-RTPS 的源代码位于 Github 上: eProsima/Fast-RTPS 。 可以通过下面的炼油命令获取其源:
git clone https://github.com/eProsima/Fast-RTPS.git |
关于如何编译 Fast-RTPS 可以看这个链接: Fast RTPS Installation from Sources 。
Fast-RTPS 概述 Fast-RTPS 提供了多种不同的API:
- Publisher-Subscriber:RTPS上的抽象层。
- Writer-Reader层:分别为RTPS终点的直接控制。
下面这样的话,呈现出更强大的力量。两个层次的核心角色:
发布者层为大多数开发者提供了一个方便的摘要。提供它允许定义与主题关联的发布者和订阅者,以及传输主题数据的简单方法。 作家-阅读者更接近于RT中定义的概念,直接进行更精细的历史控制,并且可以直接与各个端点的记录进行记录,并可以要求印刷者执行标准。 快速 RTPS 是多个消息通信基于事件的例子。参与者生成一组来通信任务记录,接收和并发日志。 事件系统发生这些事件的用户最快速的 RTPS 响应时间条件。
对象与数据结构 下面是Fast-RTPS实现中的核心结构。
Publish-Subscriber模块 RTPS标准的高层类型。
- 域名:创建,管理和分配参与者。
- Participant:包括Publisher和Subscriber,并管理他们的配置。
- ParticipantAttributes:创建参与者的配置参数。
- ParticipantListener:开发者实现Participant的允许权限。
- Publisher:在Topic上发布数据的对象。
- PublisherAttributes:创建Publisher的配置参数。
- PublisherListener 开发者实现Publisher的可以让。
- 订阅者:在Topic上接受数据的对象。
- SubscriberAttributes:创建订阅者的配置参数。
- SubscriberListener:开发者实现订阅者的可以赋予的权限。
RTPS模块 RTPS的动物模型。包含下面几个子模块:
- RTPS 通用
- CacheChange_t:描述Topic上的变更,存储在历史Cache中。
- 数据:缓存变化的负载率。
- 消息:RTPS消息。
- Header:RTPS协议的头信息。
- 子消息标题:标识RTPS的订阅消息。
- MessageReceiver:反序列化和处理接受到的RTPS消息。
- RTPSMessageCreator:制作RTPS消息。
- RTPS 域
- RTPSDoma:在 RTPS Partici 中创建、管理和管理工具。
- RTPSParticipant:包括Writer和Reader。
- RTPS 阅读器
- RTPSReader:读者的基类。
- ReaderAttributes:包含RTPS阅读器的配置参数。
- ReaderHistory:存储主题变化的历史数据。
- 读者听众:读者的宽容类型。
- RTPS 编写器
- RTPSWriter:写者的基类。
- WriterAttributes:包含RTPS写入者的配置参数。
- WriterHistory:存储写者的历史数据。
配置属性 上面的数据结构中看到了 Attributes 很多后缀的类名。这些类包含了对协议或者对象的配置,很多特性都需要设置属性来完成。 这些类的定义基本都在下面三个文件夹中:
非常多的配置,而且参数的结构恒定是PS的支持。 通过去配置这些参数会出现问题的代码,并且最大的一个菜单会产生一个更改问题的代码:每次更改问题都需要重新编译。不是快速的 RT 配置参数,而是通常包含大量配置参数的软件。解决方法是:提供文本格式的配置文件的来配置参数。因此对于 Fast-RTPS 而言,除了支持通过代码配置参数,它还支持通过 XML 文件的方式来进行配置。 配置文件之后,在代码中直接读取就好了,例如有:
Participant * participant = Domain :: createParticipant ( "participant_xml_profile" ); |
在这之后,如果需要调整配置,只需要修改配置文件,不用在很制作代码,自然也不用重新编写。这对于项目部署来说很重要。 Fast-RTPS支持的配置,以及这些配置项说明项和默认值都可以到这个链接中查看: XML profiles 。
领域 RTPS中的通信参数者之间,通过域进行隔离。 同一域名同时存在,可以同时包含多个可能接受的一个消息发送者和多个域。 其结构如下图所示:
开发者可以通过 domainId 来指定参与者域名。 如果没有指定,默认的 domainId = 80 。
发现 作为DDS的实现,Fast-RTPS提供了Publisher和Subscriber自动发现和匹配的功能。在实现上,这分为两个步骤来完成:
- 参与者发现阶段(PDP),相互通知对方存在的这个目标。每个参与者需要定时发送消息。消息通过周知的多播地址和端口发布(根据域计算得出在周知的发送消息) )。
- Endpoint Discovery Phase(EDP):和订阅者的相关合作者,发布者在此期间的通信者之间建立了相关的合作伙伴关系,PDP 之间的通信者之间的协作,包括其发布者和订阅者的信息。为了追踪和发布最后两个数据类型,它们的顶级数据类型必须一致。
这几类达到了两个独立的协议:
- Simple Participant Discovery Protocol:指定参与者如何在网络中发现。
- Endpoint Discover 定义了彼此发现的简单协议:参与者信息的协议。
Fast-RTPS 提供了四种发现机制:
- 简单:这是默认机制,它的 PDP 和 EDP 均使用 RTPS 标准,因此,阶段可同时实现其他 DDS 和 RTPS 实现。在与任何目标。
- 静态:机制在 PDP 阶段使用简单参与者发现协议。所有发布者和订阅者的地址以及发布者和主题信息是如果知道的,则允许跳过 EDP 阶段。
- 服务器-客户端这种机制集中结构,由使用服务器发现发现式的Hub。
- 手册:此机制与omain层出让其禁用PDP,仅使用选择的任何外部信息通道手动匹配和取消匹配RTPS的用户,观众和元编写者。
不同的发现机制具有一些共同的配置:
关于发现机制的更多信息可以浏览这个链接: Discovery 。
传输控制 Fast-RTPS实现了可以加入和退出的传输架构,这意味着每个参与者可以随时加入。 在传输上,Fast-RTPS支持以下五种传输方式:
- UDPv4
- UDPv6
- TCPv4
- TCPv6
- SHM(共享内存)
的,当 Participant 创建时,会自动的配置两个传输通道:
- SHM:使用同一个机器上的参与者通信。
- UDPv4:同来与跨机器的参与者通信。
当然,开发者可以改变这个默认行为,通过C++接口或者XML配置文件都可以。 SHM 减少所有参与者也同一个系统上,它是在运行提供的共享内存机制实现。共享内存的好处是:支持数据复制,并且系统负载率大。因此通常情况下,使用 SHM 会获得更好的性能。使用 SHM 时,可以配置共享内存的大小。 网络包含了非常多的参数,例如:默认通信端口号,这些值,配置时间等是什么。 ,在某些情况下可能配置这些配置分析问题有所 帮助 。 因此,TCP 传输是在打开提供的 RT 消息之前,PS,Fast-RT 消息在发送之前建立。TCP 的 TCP 传输服务器,必须有 TCP 传输服务器或 TCP 连接不同的 TCP 连接客户端。服务器,例如连接到它们的连接服务器,例如连接到客户端的独立服务器。服务器和客户端尝试连接到 RTPS 概念:发布者,阅读器或。中的任何一个编写器都可以使用 TCP 或 TCP 客户端,因为这些实体是用来建立 TCP 连接的,而 RTPS 协议可以只在该 TCP 连接上工作。 如果要使用 TCP 传输,开发者需要做更多的配置,关于这部分内容可以继续阅读 官方文档 ,这里不再赘述。
FastRTPS 代码示例 FastRTPS 有 框架 , API 参考 ,还有丰富的文档示例代码。 对于开发者来说,这些浏览代码可能是上手最快捷的方法。 你可以在这里浏览这些示例: Fast-RTPS/examples/C++/ 。
FASTRTPSGEN FASTRTPSGEN是一个Java程序。用于在Topic上传输的数据类型生成源码。 开发者通过接口描述语言(Interface Definition Language)定义需要传输的数据类型。然后通过FASTRTPSGEN生成C++编译需要的源文件。 可以通过下面的方法获取和编译FASTRTPSGEN。
git clone --recursive https://github.com / eProsima/Fast-RTPS-Gen.git
cd Fast-RTPS-Gen
gradle assemble |
编译完成后的文件位于 ./scripts/ 目录中。如果需要,可以采用路径添加到 $PATH 变量中。 关于如何通过 IDL 定义数据类型请参见这里: 通过 IDL 定义数据 类型 。 下面是这个示例文件:典型
struct TestData
{
char char_type;
octet octet_type;
long long_type;
string string_type;
float float_array[4];
sequence<double> double_list;
}; |
我们将把它保存到文件中,命名为 data_type.idl 文件中。然后通过下面的命令生成 C++ 文件:
~/Fast-RTPS-Gen/scripts/fastrtpsgen data_type.idl |
最后会得出以下四个文件:
data_type.cxx
data_type.h
data_typePubSubTypes.cxx
data_typePubSubTypes.h |
前两个文件定义是实际存储数据的结构,后两个文件定义的类是 eprosima::fastrtps::TopicDataType 子类。
/**
* Register a type in a participant.
* @param part Pointer to the Participant.
* @param type Pointer to the Type.
* @return True if correctly registered.
*/
RTPS_DllAPI static bool registerType(
Participant* part,
fastdds::dds::TopicDataType * type); |
每个或一套通信系统中通常包含一个多种自定义的数据类型。
发布者-订阅者层 可以通过 HelloWorld 示例来发布者-订阅者层接口。 该目录下文件列表如下:
-rw-r--r-- 1 paul staff 1.8K 3 16 13:36 CMakeLists.txt
-rw-r--r-- 1 paul staff 2.8K 3 16 13:36 HelloWorld.cxx
-rw-r--r-- 1 paul staff 6.1K 3 16 13:36 HelloWorld.h
-rw-r--r-- 1 paul staff 62B 3 16 13:36 HelloWorld.idl
-rw-r--r-- 1 paul staff 4.4K 3 16 13:36 HelloWorldPubSubTypes.cxx
-rw-r--r-- 1 paul staff 1.7K 3 16 13:36 HelloWorldPubSubTypes.h
-rw-r--r-- 1 paul staff 4.6K 3 16 13:36 HelloWorldPublisher.cpp
-rw-r--r-- 1 paul staff 1.7K 3 16 13:36 HelloWorldPublisher.h
-rw-r--r-- 1 paul staff 3.8K 3 16 13:36 HelloWorldSubscriber.cpp
-rw-r--r-- 1 paul staff 1.8K 3 16 13:36 HelloWorldSubscriber.h
-rw-r--r-- 1 paul staff 2.0K 3 16 13:36 HelloWorld_main.cpp
-rw-r--r-- 1 paul staff 3.1K 3 16 13:36 Makefile
-rw-r--r-- 1 paul staff 203B 3 16 13:36 README.txt |
其中:
- README.txt是工程说明
- CMakeLists.txt与Makefile是编译文件
- HelloWorld_.cpp包含了生成文件的main main 函数
- HelloWorld.idl 是待传输的数据结构定义
- HelloWorld.h,HelloWorld.cxx,HelloWorldPubSubTypes.h和HelloWorldPubSubTypes.cxx是由HelloWorld.idl文件生成
- HelloWorldPublisher.h和HelloWorldPublisher.cpp是发布者的实现
- HelloWorldSubscriber.h和HelloWorldSubscriber.cpp是订阅者的实现
一个C++工程可以先从 main 入手,HelloWorld_main.cpp中的主要逻辑就是根据用户输入的参数 "publisher" 还是 "subscriber" 来确定启动哪个模块。
switch(type)
{
case 1:
{
HelloWorldPublisher mypub;
if(mypub.init())
{
mypub.run(count, sleep);
}
break;
}
case 2:
{
HelloWorldSubscriber mysub;
if(mysub.init())
{
mysub.run();
}
break;
}
} |
我们直接看HelloWorldPublisher和HelloWorldSubscriber就好。 HelloWorldPublisher::init 中主要是为Publisher的对象设置参数:
bool HelloWorldPublisher::init()
{
m_Hello.index(0);
m_Hello.message("HelloWorld");
ParticipantAttributes PParam;
PParam.rtps.builtin.discovery_config. discoveryProtocol = DiscoveryProtocol_t::SIMPLE;
PParam.rtps.builtin.discovery_config. use_SIMPLE_EndpointDiscoveryProtocol = true;
PParam.rtps.builtin.discovery_config. m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true;
PParam.rtps.builtin.discovery_config.m_simpleEDP. use_PublicationWriterANDSubscriptionReader = true;
PParam.rtps.builtin.domainId = 0;
PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
PParam.rtps.setName("Participant_pub");
mp_participant = Domain::createParticipant(PParam);
if(mp_participant==nullptr)
return false;
//REGISTER THE TYPE Domain::registerType(mp_participant,&m_type); //CREATE THE PUBLISHER
PublisherAttributes Wparam;
Wparam.topic.topicKind = NO_KEY;
Wparam.topic.topicDataType = "HelloWorld";
Wparam.topic.topicName = "HelloWorldTopic";
Wparam.topic.historyQos.kind = KEEP_LAST_HISTORY_QOS;
Wparam.topic.historyQos.depth = 30;
Wparam.topic.resourceLimitsQos.max_samples = 50;
Wparam.topic.resourceLimitsQos.allocated_samples = 20;
Wparam.times.heartbeatPeriod.seconds = 2;
Wparam.times.heartbeatPeriod.nanosec = 200*1000*1000;
Wparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
mp_publisher = Domain::createPublisher(mp_participant,Wparam, (PublisherListener*)&m_listener);
if(mp_publisher == nullptr)
return false;
return true;
} |
这里的参数配置请看API说明:
发布者发送消息的逻辑很简单:
bool HelloWorldPublisher::publish(bool waitForListener)
{
if(m_listener.firstConnected || !waitForListener || m_listener.n_matched>0)
{
m_Hello.index(m_Hello.index()+1);
mp_publisher->write((void*)&m_Hello);
return true;
}
return false;
} |
注意,这里 write 的对象是通过idl文件生成的类型。 订阅者的初始化和发布者是类似的:
bool HelloWorldSubscriber::init()
{
ParticipantAttributes PParam;
PParam.rtps.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol_t::SIMPLE;
PParam.rtps.builtin.discovery_config. use_SIMPLE_EndpointDiscoveryProtocol = true;
PParam.rtps.builtin.discovery_config. m_simpleEDP.use_PublicationReaderANDSubscriptionWriter = true;
PParam.rtps.builtin.discovery_config.m_simpleEDP .use_PublicationWriterANDSubscriptionReader = true;
PParam.rtps.builtin.domainId = 0;
PParam.rtps.builtin.discovery_config.leaseDuration = c_TimeInfinite;
PParam.rtps.setName("Participant_sub");
mp_participant = Domain::createParticipant(PParam);
if(mp_participant==nullptr)
return false;
//REGISTER THE TYPE
Domain::registerType(mp_participant,&m_type);
//CREATE THE SUBSCRIBER
SubscriberAttributes Rparam;
Rparam.topic.topicKind = NO_KEY;
Rparam.topic.topicDataType = "HelloWorld";
Rparam.topic.topicName = "HelloWorldTopic";
Rparam.topic.historyQos.kind = KEEP_LAST_HISTORY_QOS;
Rparam.topic.historyQos.depth = 30;
Rparam.topic.resourceLimitsQos.max_samples = 50;
Rparam.topic.resourceLimitsQos.allocated_samples = 20;
Rparam.qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS;
Rparam.qos.m_durability.kind = TRANSIENT_LOCAL_DURABILITY_QOS;
mp_subscriber = Domain::createSubscriber(mp_participant,Rparam, (SubscriberListener*)&m_listener);
if(mp_subscriber == nullptr)
return false;
return true;
} |
当然,订阅者有自己的配置参数类型:
需要注意的是,订阅者与发布者的通信是建立在主题上的,因此对于主题标识的配置要保持一致:
Wparam.topic.topicDataType = "HelloWorld";
Wparam.topic.topicName = "HelloWorldTopic"; |
主题的这个概念,不同的订阅者和出版商没有物理跑地址的任何关联系统,也包含了一个和操作的代码,它的产品同样可以在 Mac 上运行,在 x86 上运行ARM架构的Android设备上。 订阅者通过 void HelloWorldSubscriber :: SubListener :: onNewDataMessage(Subscriber* sub) 方法来处理接受到的数据打印。在示例的实现中,就是将消息体出来:
void HelloWorldSubscriber::SubListener:: onNewDataMessage (Subscriber* sub)
{
if(sub->takeNextData((void*)&m_Hello, &m_info))
{
if(m_info.sampleKind == ALIVE)
{
this->n_samples++;
// Print your structure data here.
std::cout << "Message "<<m_Hello.message()<< " " << m_Hello.index()<< " RECEIVED"<<std::endl;
}
}
} |
发布者和订阅者各自有一些表示,开发者可以利用来进行处理:
读者-写者层 读者-写者层是相对于发布者-订阅者层更灵活的API。 它提供了更多的控制权也意味着使用起来会,但有些麻烦。 两个关键概念上存在的关系,如下表所示:
如果你浏览Fast-RTPS的源码,你会发现其实发布者-订阅者层的实现就是读者-写者层的。 想要的极光——写者层的使用可以浏览下面三个代码示例:
RTPSParticipant,RTPSWriter和RTPSReader都通过RTPSDomain创建。 相对于发布者-订阅层不同的是,这一层不支持通过 XML 的形式配置参数。开发者必须通过代码的形式配置所有的参数,例如:
//CREATE PARTICIPANT
RTPSParticipantAttributes PParam;
PParam.builtin.discovery_config.discoveryProtocol = eprosima::fastrtps::rtps::DiscoveryProtocol::SIMPLE;
PParam.builtin.use_WriterLivelinessProtocol = true;
mp_participant = RTPSDomain::createParticipant(PParam);
if(mp_participant==nullptr)
return false;
//CREATE WRITERHISTORY
HistoryAttributes hatt;
hatt.payloadMaxSize = 255;
hatt.maximumReservedCaches = 50;
mp_history = new WriterHistory(hatt);
//CREATE WRITER
WriterAttributes watt;
watt.endpoint.reliabilityKind = BEST_EFFORT;
mp_writer = RTPSDomain::createRTPSWriter ( mp_participant,watt,mp_history,&m_listener); |
这里的逻辑主要就是设置参数和创建RTPSParticipant,RTPSWriter对象。而且,RTPSParticipant将被注册使用RTPSWriter:
TopicAttributes Tatt;
Tatt.topicKind = NO_KEY;
Tatt.topicDataType = "string";
Tatt.topicName = "exampleTopic";
ReaderQos Rqos;
return mp_participant->registerReader(mp_reader, Tatt, Rqos); |
在 RTPS 协议中,读者和作者将相关主题的数据保存在其关联的历史记录中。每个数据段都由一个变化表示,实现的结果是 CacheChange_t 。 更改历史记录管理。 读者和作者的历史是通过类型:
- eprosima::fastrtps::rtps::WriterHistory ;
- eprosima::fastrtps::rtps::ReaderHistory ;
对于作家来说,发送消息是往历史中添加变更:
//Request a change from the history
CacheChange_t* change = writer->new_change([]() -> uint32_t { return 255;}, ALIVE);
//Write serialized data into the change
change->serializedPayload.length = sprintf((char*) change->serializedPayload.data, "My example string %d", 2)+1;
//Insert change back into the history. The Writer takes care of the rest.
history->add_change(change); |
而对于新消息会来说,希望能将其收录到历史中,读完就可以删除:
void TestReaderRegistered::MyListener::onNewCacheChangeAdded(
RTPSReader* reader,
const CacheChange_t* const change)
{
printf("Received: %s\n", change->serializedPayload.data);
reader->getHistory()->remove_change((CacheChange_t*)change);
n_received++;
} |
框架会根据消息触发读者的支持。
持久化 下,历史在其周期内可以被读者访问。可以维护早先的历史。 使用持久化功能可以保护像他们一样的状态通信故障的结束,因为在重新启动后,它们就像他们刚从网络破坏连接一样。 你可以通过 RTPSTest_persistent 这个示例来了解如何使用这个功能。 要使用持久化功能,Writer和Reader需要进行以下设置:
- durabilityKind 设置为 TRANSIENT
- persistence_guid 不能是全0
- 为Writer,Reader或RTPSParticipant设置插件化。当前编写的插件是SQLITE3。
下面是一段代码示例:
PropertyPolicy property_policy;
property_policy.properties().emplace_back ("dds.persistence.plugin", "builtin.SQLITE3");
property_policy.properties().emplace_back ("dds.persistence.sqlite3.filename", "test.db");
//CREATE WRITER
WriterAttributes watt;
watt.endpoint.reliabilityKind = BEST_EFFORT;
watt.endpoint.durabilityKind = TRANSIENT;
watt.endpoint.persistence_guid.guidPrefix.value[11] = 1;
watt.endpoint.persistence_guid.entityId.value[3] = 1;
watt.endpoint.properties = property_policy;
mp_writer = RTPSDomain::createRTPSWriter (mp_participant, watt, mp_history, &m_listener); |
durabilityKind 参数定义了Writer与新Reader匹配时对于已经发送数据的行为,该参数有三个选项:
- VOLATILE(默认值):丢掉所有已经发送的数据。
- TRANSIENT_LOCAL:保存最近发送的k条数据。
- TRANSIENT TRANSIENT 类似的,存储在中化消息中的异常信息。但是,它不会与它的结果有关。
对于读者来说,其配置方法是类似的:
PropertyPolicy property_policy;
property_policy.properties().emplace_back ("dds.persistence.plugin", "builtin.SQLITE3");
property_policy.properties().emplace_back ("dds.persistence.sqlite3.filename", "test.db");
//CREATE READER
ReaderAttributes ratt;
Locator_t loc(22222);
ratt.endpoint.unicastLocatorList.push_back(loc);
ratt.endpoint.durabilityKind = TRANSIENT;
ratt.endpoint.persistence_guid.guidPrefix.value[11] = 2;
ratt.endpoint.persistence_guid.entityId.value[3] = 1;
ratt.endpoint.properties = property_policy;
mp_reader = RTPSDomain::createRTPSReader (mp_participant, ratt, mp_history, &m_listener); |
服务质量 使用Fast-RTPS,你有非常多的QoS策略可以配置。
主要可以分为以下几类:
- durability :定义了作家与新读者匹配时对于已发送数据的行为,“延迟化”一节已经提到过。
- liveliness :定义发布者的发布活跃程度。例如:多长时间一次公告。
- reliability 2、 , ,(发布者)接收者 BEST_EFFORT 会收到通知 RELIABLE ,发送方,发送方,发送方,发送方,发送方方(订阅者)确认速度较慢进行,但可以防止到达数据遗失。
- partition :可以在域的物理分区上建立逻辑分区。
- deadline :定期更新消息的更新需要用例,当值以下时,会发出警报更新数据的情况很有用。
- lifespan :发布有效数据时的当前日期,发布有效日期记录。
- disablePositiveAcks 是否需要取消确认消息。不需要严格可靠的通信在指定时间,并且可以减少网络。
在实现中,QoS包含了一个类,它们继承自QoSPolicy父类:
消息的质量提供了不同的选择,是因为不同的系统对于有不同的要求。 在实际中,每个节点的所有数据都非常接近智能,如果DS在发送消息时显示系统的通知,则需要发送消息。届时,Fast-RT-DDS会当智能将实时掌握到哪里,并智能的实时更改数据通知。如果参与者的总数据量很大,则只需过滤并发送每一天真正需要的数据。发送多个时,DDS 多自动消息控制时,DDS 会跟踪系统实时更新部分需要快速更新使用的版本。 ,访问强制数据流的路径并实时加密执行数据。 当系统以极高的速度,真正在动态中,更好地工作的环境下,DDS的威力要及时预测。
实操测试 文章的最后,我们通过实际运行程序来进行一些实验。虽然 Fast-RTPS 支持非常多的操作,但在 Ubuntu 系统上验证可能是最方便的。 Fast-PS 是一个拥有系统功能的显示屏,并没有显示系统的功能。 在这种情况下,我们可以借助虚拟机 , 它可以在同一个上运行多个系统。然后我们在系统上进行测试,这样就可以模拟下独立系统的环境。 Fast-RTPS提供了包含依赖环境的Docker容器,我们只要下载和运行这些容器,就可以拥有多个独立的系统了。 不过,在这运行下面这些示例之前,你需要配置好 docker 环境。关于 docker 的使用已经超过了本文的基本知识,你可以浏览这个链接: Install Docker Engine on Ubuntu 。 Fast-RTPS 需要的文件可以到官网下载: https://eprosima.com/index.php/downloads-all 。 点击这个链接,然后输入个人信息就可以进入下载页面了。你可以选择最新版本的Docker和Fast-RTPS包进行下载:
考虑到国内的网络,下载的好文件可能已经很慢的几个小时了。
Ubuntu系统中,先将eProsima_FastRTPS-1.9.3-Linux.tgz解压缩成如下,为了编译它,还需要安装一些依赖,相关命令:
sudo apt install cmake g++
sudo apt install libasio-dev libtinyxml2-dev
mkdir fast-rtps
tar -xvf eProsima_FastRTPS-1.9.3-Linux.tgz -C fast-rtps/
cd fast-rtps/
chmod a+x install.sh
sudo ./install.sh |
在这之后你就可以转到/fast-rtps/src/fastrtps/examples/目录下编译示例了。但是这个目录下的 CMakeList.txt 似乎存在问题,我在这个文件的内容中增加了下面一行才完成编译:
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -pthread") |
编译完成之后,我们并不是在 Ubuntu 系统上运行这些文件,而是将这些文件放在 docker 容器中,以运行环境来。 所以需要启动docker容器:
$ docker load -i ubuntu-fast-rtps.tar
$ docker run -it ubuntu-fast-rtps:v1.9.3 |
你可以同时通过 docker run -it ... 多个docker容器来进行测试(每个容器启动一个通信的参与者。当然,你需要同时打开多个shell窗口)。 比如我启动了两个docker容器:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
2125504ee62f ubuntu-fast-rtps:v1.9.3 "/bin/bash" 5 minutes ago Up 5 minutes mystifying_jennings
b17517fefecd ubuntu-fast-rtps:v1.9.3 "/bin/bash" 23 minutes ago Up 23 minutes stoic_leavitt |
运行之后会docker docker run -it ... 的shell中,你可以在根目录创建 fastrtps 目录直接进入测试程序。 然后在Ubuntu系统上将编译出的示例程序拷贝到docker中:
sudo docker cp ./ b17517fefecd:/fastrtps
sudo docker cp ./ 2125504ee62f:/fastrtps |
在这之后就可以转到docker容器的shell中运行测试程序了。 例如,在两个 docker 上运行 HelloWorld 的示例:
root@b17517fefecd:/fastrtps/HelloWorldExample# . / HelloWorldExample publisher
Publisher running 10 samples.
Publisher matched
Message: HelloWorld with index: 1 SENT
Message: HelloWorld with index: 2 SENT
Message: HelloWorld with index: 3 SENT
Message: HelloWorld with index: 4 SENT
Message: HelloWorld with index: 5 SENT
Message: HelloWorld with index: 6 SENT
Message: HelloWorld with index: 7 SENT
Message: HelloWorld with index: 8 SENT
Message: HelloWorld with index: 9 SENT
Message: HelloWorld with index: 10 SENT |
root@2125504ee62f:/fastrtps/HelloWorldExample# . / HelloWorldExample subscriber
Starting
Subscriber running. Please press enter to stop the Subscriber
Subscriber matched
Message HelloWorld 1 RECEIVED
Message HelloWorld 2 RECEIVED
Message HelloWorld 3 RECEIVED
Message HelloWorld 4 RECEIVED
Message HelloWorld 5 RECEIVED
Message HelloWorld 6 RECEIVED
Message HelloWorld 7 RECEIVED
Message HelloWorld 8 RECEIVED
Message HelloWorld 9 RECEIVED
Message HelloWorld 10 RECEIVED
Subscriber unmatched |
是基准程序:
root@b17517fefecd:/fastrtps/Benchmark# ./Benchmark subscriber
Subscriber running...
Subscriber matched
Publisher matched
Subscriber unmatched
Publisher unmatched |
root@2125504ee62f:/fastrtps/Benchmark# ./Benchmark publisher
Publisher running...
Subscriber matched
Publisher matched. Test starts...
RESULTS after 10000 milliseconds:
COUNT: 53951
SAMPLES: 0,771,668,548,582,716,700,706,408,440,592,636,738,698, 648,574,706,776,690,584,638,556,750,740,640,584,572,542,526,560, 552,528,608,504,630,478,598,708,620,528,660,718,578, 646,702,528,652,528,450,508,566,544,516,616,652, 584,532,434,542,678,752,696,412,544,654,766,736,612, 496,470,662,580,566,634,674,568,532,546,528,552,552, 528,490,508,598,620,672,506,468,654, |
在运行的过程中,可以同时使用不同的速度进行通信,并快速进行不同的发现-PS系统上的合作伙伴完成的。要的测试。
|