通讯
Roplat 将通讯分为两类:主流通讯与旁路通讯。本章在简述主流通讯后,重点给出旁路通讯三类原语的教程:三缓冲 / 环形队列 / 进程间通讯。
一、主流通讯
主流通讯由节点输入输出关系决定,是系统拓扑的主干数据流。
特点:
- 强类型:
sensor.Output必须与filter.Input类型一致,否则编译期报错 - 参与编译期拓扑分析:
#[system]宏读取连线,用于调度优化 - 零开销:域内节点间直接值传递,无序列化、无拷贝
- 同域限定:只在同一节律域内有效;跨域由图规约自动改写为旁路通讯
什么时候不能用主流通讯?
- 两个节律域之间需要共享状态(例如 1 kHz 控制器需要持续读取 100 Hz 视觉模块的最新姿态)
- 一个节点产生不定长序列、由另一个节点按 FIFO 消费(命令队列、事件流)
- 跨进程(不同可执行文件、不同语言运行时)数据交换
以上三种场景,进入旁路通讯。
二、旁路通讯总览
旁路通讯用于不适合放在主干 I/O 上的数据交换。三类原语按语义拆分:
| 原语 | 拓扑 | 语义 | 典型用途 | 章节 |
|---|---|---|---|---|
三缓冲 triple_buffer |
SPMC(1 写 N 读) | 最新值覆盖(有损) | 传感器状态广播、全局参数 | §三 |
环形队列 ring_buffer |
SPSC(1 写 1 读) | FIFO 有序(无损) | 命令序列、事件流 | §四 |
进程间通讯 ipc::* |
跨进程 | 三缓冲 / 环形队列的 IPC 版本 | 语言隔离、故障隔离、热插拔 | §五 + 12 进程间通讯 |
共同特征:
- Lock-free:原子操作,实时线程安全
- 写端永不阻塞:有损原语直接覆盖;无损原语写满时返回 false 而非 sleep
- 跨语言:通过
#[repr(C)]FFI 内核 + C++/Python 薄封装共享同一块内存 - 不穿透 system 图规约:旁路资源由用户显式声明或宏自动插入在域边界
三、三缓冲(Triple Buffer)
3.1 语义
写端始终发布最新值,读端始终读取可见的最新快照。三块缓冲区轮转:
- 写端永远不阻塞;若读端慢,中间帧会被覆盖(有损)
- 读端总能拿到非撕裂的完整一帧,但不保证每帧都看到
3.2 API
use roplat::comm::create_triple_buffer;
// 创建:(容量提示, 期望订阅者数)
let (mut publisher, mut subscribers) = create_triple_buffer::<SensorData>(3);
// 写端:按值发布
publisher.publish(SensorData { x: 1.0, y: 2.0, z: 3.0 });
// 读端:借出最新值(&T),无则返回 None
if let Some(data) = subscribers[0].get_latest() {
println!("latest = {:?}", data);
}
3.3 适用场景
- ✅ IMU / 关节角度 / 位姿持续发布
- ✅ 运行参数广播(config hot-reload)
- ✅ 任何「只关心最新值」的状态同步
- ❌ 命令队列(会丢)
- ❌ 审计日志(会丢)
3.4 常见坑
- 误把事件流往三缓冲塞:事件被覆盖,下游看不到中间状态 → 改用环形队列
- 读端轮询频率过低:丢帧率高,但语义上不是错误 → 调高读节律或改用 ring_buffer
T: Clone要求:跨进程(IPC 三缓冲尚未落地)时需#[repr(C)] + Copy
四、环形队列(Ring Buffer)
4.1 语义
FIFO 有序 SPSC 队列。消费者按推入顺序拿到每一条消息;队列满时写失败。
- 每条消息都不会丢(除非调用方在写失败后主动丢弃)
- 满时
try_push返回false,写端自行决定:丢弃 / 重试 / 降级
4.2 API
use roplat::comm::create_ring_buffer;
// 容量 64
let (mut producer, mut consumer) = create_ring_buffer::<Command>(64);
let cmd = Command::MoveForward(1.0);
if !producer.try_push(&cmd) {
// 队列满;策略由你定:丢弃 / 重试 / 降级
}
while let Some(cmd) = consumer.try_pop() {
execute(cmd);
}
签名要点:try_push(&T) -> bool(按引用入队,内部按字节复制)。
4.3 适用场景
- ✅ 遥操命令下发(每条都要到)
- ✅ 节点生命周期事件(init / shutdown)
- ✅ 传感器突发事件(碰撞、急停)
- ❌ 持续高频状态(建议三缓冲避免积压)
4.4 常见坑
- 容量选得太小:突发流量直接写失败 → 容量应覆盖「最大消费者停顿窗口 × 生产速率」
- T 非
#[repr(C)] + Copy:不能跨语言;必要时改用#[roplat_msg] + Copy - SPSC 误当 MPSC:多个生产者同时写会破坏 tail 指针;MPSC 版本参见 RFC 0017(尚未实现)
五、进程间通讯(IPC,旁路通讯的跨进程版本)
5.1 何时选择 IPC?
当你需要以下任意一点,单进程旁路原语就不够:
- 语言/运行时隔离:C++ 硬实时控制 + Python 深度学习,不能同进程(GIL 会卡控制线程)
- 故障隔离:视觉模块 OOM 不能带着控制器一起崩
- 在线热插拔:调试节点、可视化节点想随时挂接/拔除
- 多机部署:感知在 A 机、控制在 B 机
5.2 契约
用户代码与进程内保持一致:
let (writer, _) = create_ipc_ring_buffer::<Pose>(&uri, Role::Publisher, rdv)?;
writer.unwrap().try_push(&pose);
只是边界位置不同 —— 跨进程时 #[system] 图规约只到进程边缘,边缘之外通过 EndpointUri + Rendezvous 文件动态对接。
5.3 身份三要素
跨进程无法共享 Rust 类型系统,需要字符串身份:
schema_id由#[roplat_msg]宏在编译期对StructName{field:Type,...}做 FNV-1a 48-bit 哈希自动生成- 握手阶段双侧校验,字段错配立刻拒绝,不污染数据层
5.4 最小示例
use roplat::comm::{
ConnectOptions, EndpointUri, IpcOptions, OverflowPolicy, RendezvousDir,
Role, SchemaId, TcpOptions, create_ipc_ring_buffer_with_opts,
};
use roplat::roplat_msg;
#[roplat_msg(version = 1)]
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct Pose { pub seq: u64, pub x: f32, pub y: f32, pub z: f32 }
// 两侧共用的 URI
let uri = EndpointUri::new(
"default", "pose",
SchemaId::new(Pose::SCHEMA_ID), // 宏自动生成的常量
Pose::MSG_VERSION,
);
// —— 发布者 ——
let pub_opts = IpcOptions {
tcp: TcpOptions { high_watermark: Some(256), overflow: OverflowPolicy::DropOldest },
..Default::default()
};
let (writer, _) = create_ipc_ring_buffer_with_opts::<Pose>(
&uri, Role::Publisher, RendezvousDir::new_default(), &pub_opts
)?;
// —— 订阅者 ——
let sub_opts = IpcOptions { connect: ConnectOptions::wait_forever(), ..Default::default() };
let (_, reader) = create_ipc_ring_buffer_with_opts::<Pose>(
&uri, Role::Subscriber, RendezvousDir::new_default(), &sub_opts
)?;
完整可运行版本见 roplat/examples/ipc_pubsub/,跨进程回归测试见 examples/ipc_pubsub/tests/cross_process.rs。
5.5 失败模式
| 错误 | 触发条件 | 处理建议 |
|---|---|---|
IpcError::NotReady |
发布者尚未写 rendezvous | 改用 ConnectOptions::wait_forever() / with_timeout(d) |
IpcError::PeerGone |
对端崩溃或正常退出 | 读端轮询 is_connected(),按业务决定重连 |
IpcError::SchemaMismatch |
两侧字段定义不同 | 对齐结构体;利用 #[roplat_msg] 自动哈希避免手填 |
IpcError::RoleMismatch |
角色写反(两个发布者) | 代码 review;一条 URI 只允许一个 Publisher |
其他细节(Rendezvous 文件格式、Hello/HelloAck 握手、后端可扩展性)见 12 进程间通讯。
六、消息类型:透明 vs 不透明
旁路通讯传递的是消息对象。按跨语言可见性分两类:
6.1 透明类型
- 参与跨语言布局协作(C++ struct / Python ctypes 自动生成)
- 自动实现
TypeBinding<CppLang>与TypeBinding<PyLang> #[roplat_msg(version = N)]版本参数可选,默认 1- 编译期生成
SCHEMA_ID/SCHEMA_FULL/MSG_VERSION三个关联常量,供 IPC 使用
6.2 不透明类型
- Rust 侧只保留类型标记,字段由目标语言用户文件定义
- 默认只生成用户文件,不生成系统基类
- 模板注释保留 alias / wrapper 两种可选方案
- 通过线程本地总线跨语言传递,避免序列化
6.3 TypeBinding<L> 的作用
TypeBinding<L> 回答两个问题:
- 目标语言中的类型名是什么?
- 这个类型是否是 opaque?
Publisher<T> / Subscriber<T> / TripleBufferChannel<T> / RingBufferWriter<T> 都把绑定能力转发到 T。
七、选型决策树
遇到新场景时按下列问题走一遍:
- 是否跨进程?
- 是 → 跳到 3
- 否 → 跳到 2
- 是否允许丢帧?
- 允许丢帧(状态同步) → 三缓冲
create_triple_buffer - 不允许丢帧(命令 / 事件) → 环形队列
create_ring_buffer
- 允许丢帧(状态同步) → 三缓冲
- 是否允许丢帧?
- 允许丢帧 →
create_ipc_triple_buffer<T>+#[roplat_msg] - 不允许丢帧 →
create_ipc_ring_buffer<T>+#[roplat_msg]
- 允许丢帧 →
主链路(同域内)仍优先走主流通讯 >>;跨域由 #[system] 图规约 R4 规则自动插入合适的旁路通道。
八、调试与诊断
| 现象 | 排查工具 |
|---|---|
| 三缓冲丢帧率高 | 读端加 seq 字段、日志 diff;考虑切换到 ring_buffer |
ring_buffer try_push 频繁返回 false |
监控队列深度(需要工具层扩展);扩容或优化消费者 |
| IPC 连不上 | 查看 $ROPLAT_RUNTIME_DIR/ipc/<ns>/<name>.rdv 是否存在;PID 是否存活 |
| IPC 订阅者收不到但握手成功 | 检查 schema_id 是否一致;确认 writer.is_connected() 为 true |
| 跨语言布局错配 | 查看 roplat_gen/ 下生成的头文件;确保两侧 #[repr(C)] |
诊断 IPC 端点:
cargo roplat ipc ls # 列出所有端点 + 存活 PID
cargo roplat ipc introspect "roplat-ipc://ns/name?msg=..." # 单端点详情 + schema 校验
九、新人建议
- 能主流就主流:同域内用
>>最简洁;只在跨域 / 共享状态 / 跨进程时转旁路 - 能透明就透明:透明消息优先,保证跨语言可见
- 先跑通,再优化:先让主链路稳定,再引入 opaque / IPC
- IPC 单独写示例:不要一开始就在主系统里上 IPC,先在
examples/跑通握手与背压 - 善用版本号:
#[roplat_msg(version = N)],字段有破坏性变更时递增,避免线上两侧偷偷不一致