Skip to content

通讯

Roplat 将通讯分为两类:主流通讯旁路通讯。本章在简述主流通讯后,重点给出旁路通讯三类原语的教程:三缓冲 / 环形队列 / 进程间通讯。


一、主流通讯

主流通讯由节点输入输出关系决定,是系统拓扑的主干数据流。

sensor >> filter >> controller >> motor;
//  Output ─→ Input ─→ Output ─→ Input

特点:

  1. 强类型sensor.Output 必须与 filter.Input 类型一致,否则编译期报错
  2. 参与编译期拓扑分析#[system] 宏读取连线,用于调度优化
  3. 零开销:域内节点间直接值传递,无序列化、无拷贝
  4. 同域限定:只在同一节律域内有效;跨域由图规约自动改写为旁路通讯

什么时候不能用主流通讯?

  • 两个节律域之间需要共享状态(例如 1 kHz 控制器需要持续读取 100 Hz 视觉模块的最新姿态)
  • 一个节点产生不定长序列、由另一个节点按 FIFO 消费(命令队列、事件流)
  • 跨进程(不同可执行文件、不同语言运行时)数据交换

以上三种场景,进入旁路通讯。


二、旁路通讯总览

旁路通讯用于不适合放在主干 I/O 上的数据交换。三类原语按语义拆分:

原语 拓扑 语义 典型用途 章节
三缓冲 triple_buffer SPMC(1 写 N 读) 最新值覆盖(有损) 传感器状态广播、全局参数 §三
环形队列 ring_buffer SPSC(1 写 1 读) FIFO 有序(无损) 命令序列、事件流 §四
进程间通讯 ipc::* 跨进程 三缓冲 / 环形队列的 IPC 版本 语言隔离、故障隔离、热插拔 §五 + 12 进程间通讯

共同特征:

  1. Lock-free:原子操作,实时线程安全
  2. 写端永不阻塞:有损原语直接覆盖;无损原语写满时返回 false 而非 sleep
  3. 跨语言:通过 #[repr(C)] FFI 内核 + C++/Python 薄封装共享同一块内存
  4. 不穿透 system 图规约:旁路资源由用户显式声明或宏自动插入在域边界

三、三缓冲(Triple Buffer)

3.1 语义

写端始终发布最新值,读端始终读取可见的最新快照。三块缓冲区轮转:

 writer ──写入──→ [slot A]
                      │  原子指针交换(ready slot)
 reader1 ──读取──→ [slot B]
 reader2 ──读取──→ [slot B]
  • 写端永远不阻塞;若读端慢,中间帧会被覆盖(有损)
  • 读端总能拿到非撕裂的完整一帧,但不保证每帧都看到

3.2 API

use roplat::comm::create_triple_buffer;

// 创建:(容量提示, 期望订阅者数)
let (mut publisher, mut subscribers) = create_triple_buffer::<SensorData>(3);

// 写端:按值发布
publisher.publish(SensorData { x: 1.0, y: 2.0, z: 3.0 });

// 读端:借出最新值(&T),无则返回 None
if let Some(data) = subscribers[0].get_latest() {
    println!("latest = {:?}", data);
}

3.3 适用场景

  • ✅ IMU / 关节角度 / 位姿持续发布
  • ✅ 运行参数广播(config hot-reload)
  • ✅ 任何「只关心最新值」的状态同步
  • ❌ 命令队列(会丢)
  • ❌ 审计日志(会丢)

3.4 常见坑

  1. 误把事件流往三缓冲塞:事件被覆盖,下游看不到中间状态 → 改用环形队列
  2. 读端轮询频率过低:丢帧率高,但语义上不是错误 → 调高读节律或改用 ring_buffer
  3. T: Clone 要求:跨进程(IPC 三缓冲尚未落地)时需 #[repr(C)] + Copy

四、环形队列(Ring Buffer)

4.1 语义

FIFO 有序 SPSC 队列。消费者按推入顺序拿到每一条消息;队列满时写失败。

 writer ──try_push──→ [ . . . . e1 e2 e3 ] ──try_pop──→ reader
                       │                 │
                     tail              head
  • 每条消息都不会丢(除非调用方在写失败后主动丢弃)
  • 满时 try_push 返回 false,写端自行决定:丢弃 / 重试 / 降级

4.2 API

use roplat::comm::create_ring_buffer;

// 容量 64
let (mut producer, mut consumer) = create_ring_buffer::<Command>(64);

let cmd = Command::MoveForward(1.0);
if !producer.try_push(&cmd) {
    // 队列满;策略由你定:丢弃 / 重试 / 降级
}

while let Some(cmd) = consumer.try_pop() {
    execute(cmd);
}

签名要点:try_push(&T) -> bool(按引用入队,内部按字节复制)。

4.3 适用场景

  • ✅ 遥操命令下发(每条都要到)
  • ✅ 节点生命周期事件(init / shutdown)
  • ✅ 传感器突发事件(碰撞、急停)
  • ❌ 持续高频状态(建议三缓冲避免积压)

4.4 常见坑

  1. 容量选得太小:突发流量直接写失败 → 容量应覆盖「最大消费者停顿窗口 × 生产速率」
  2. T 非 #[repr(C)] + Copy:不能跨语言;必要时改用 #[roplat_msg] + Copy
  3. SPSC 误当 MPSC:多个生产者同时写会破坏 tail 指针;MPSC 版本参见 RFC 0017(尚未实现)

五、进程间通讯(IPC,旁路通讯的跨进程版本)

5.1 何时选择 IPC?

当你需要以下任意一点,单进程旁路原语就不够:

  1. 语言/运行时隔离:C++ 硬实时控制 + Python 深度学习,不能同进程(GIL 会卡控制线程)
  2. 故障隔离:视觉模块 OOM 不能带着控制器一起崩
  3. 在线热插拔:调试节点、可视化节点想随时挂接/拔除
  4. 多机部署:感知在 A 机、控制在 B 机

5.2 契约

用户代码与进程内保持一致:

let (writer, _) = create_ipc_ring_buffer::<Pose>(&uri, Role::Publisher, rdv)?;
writer.unwrap().try_push(&pose);

只是边界位置不同 —— 跨进程时 #[system] 图规约只到进程边缘,边缘之外通过 EndpointUri + Rendezvous 文件动态对接。

5.3 身份三要素

跨进程无法共享 Rust 类型系统,需要字符串身份:

roplat-ipc://<namespace>/<endpoint>?msg=<schema_id>&v=<version>
             └── 逻辑名 ──┘            └── 字段指纹 ─┘  └─ 语义版本
  • schema_id#[roplat_msg] 宏在编译期对 StructName{field:Type,...} 做 FNV-1a 48-bit 哈希自动生成
  • 握手阶段双侧校验,字段错配立刻拒绝,不污染数据层

5.4 最小示例

use roplat::comm::{
    ConnectOptions, EndpointUri, IpcOptions, OverflowPolicy, RendezvousDir,
    Role, SchemaId, TcpOptions, create_ipc_ring_buffer_with_opts,
};
use roplat::roplat_msg;

#[roplat_msg(version = 1)]
#[repr(C)]
#[derive(Copy, Clone, Debug)]
pub struct Pose { pub seq: u64, pub x: f32, pub y: f32, pub z: f32 }

// 两侧共用的 URI
let uri = EndpointUri::new(
    "default", "pose",
    SchemaId::new(Pose::SCHEMA_ID),  // 宏自动生成的常量
    Pose::MSG_VERSION,
);

// —— 发布者 ——
let pub_opts = IpcOptions {
    tcp: TcpOptions { high_watermark: Some(256), overflow: OverflowPolicy::DropOldest },
    ..Default::default()
};
let (writer, _) = create_ipc_ring_buffer_with_opts::<Pose>(
    &uri, Role::Publisher, RendezvousDir::new_default(), &pub_opts
)?;

// —— 订阅者 ——
let sub_opts = IpcOptions { connect: ConnectOptions::wait_forever(), ..Default::default() };
let (_, reader) = create_ipc_ring_buffer_with_opts::<Pose>(
    &uri, Role::Subscriber, RendezvousDir::new_default(), &sub_opts
)?;

完整可运行版本见 roplat/examples/ipc_pubsub/,跨进程回归测试见 examples/ipc_pubsub/tests/cross_process.rs

5.5 失败模式

错误 触发条件 处理建议
IpcError::NotReady 发布者尚未写 rendezvous 改用 ConnectOptions::wait_forever() / with_timeout(d)
IpcError::PeerGone 对端崩溃或正常退出 读端轮询 is_connected(),按业务决定重连
IpcError::SchemaMismatch 两侧字段定义不同 对齐结构体;利用 #[roplat_msg] 自动哈希避免手填
IpcError::RoleMismatch 角色写反(两个发布者) 代码 review;一条 URI 只允许一个 Publisher

其他细节(Rendezvous 文件格式、Hello/HelloAck 握手、后端可扩展性)见 12 进程间通讯


六、消息类型:透明 vs 不透明

旁路通讯传递的是消息对象。按跨语言可见性分两类:

6.1 透明类型

#[roplat::roplat_msg]
#[repr(C)]
pub struct SensorData {
    pub x: f64,
    pub y: f64,
}
  • 参与跨语言布局协作(C++ struct / Python ctypes 自动生成)
  • 自动实现 TypeBinding<CppLang>TypeBinding<PyLang>
  • #[roplat_msg(version = N)] 版本参数可选,默认 1
  • 编译期生成 SCHEMA_ID / SCHEMA_FULL / MSG_VERSION 三个关联常量,供 IPC 使用

6.2 不透明类型

#[roplat::roplat_msg(lang = "cpp")]
pub struct CppData;
  • Rust 侧只保留类型标记,字段由目标语言用户文件定义
  • 默认只生成用户文件,不生成系统基类
  • 模板注释保留 alias / wrapper 两种可选方案
  • 通过线程本地总线跨语言传递,避免序列化

6.3 TypeBinding<L> 的作用

TypeBinding<L> 回答两个问题:

  1. 目标语言中的类型名是什么?
  2. 这个类型是否是 opaque?

Publisher<T> / Subscriber<T> / TripleBufferChannel<T> / RingBufferWriter<T> 都把绑定能力转发到 T


七、选型决策树

遇到新场景时按下列问题走一遍:

  1. 是否跨进程?
    • 是 → 跳到 3
    • 否 → 跳到 2
  2. 是否允许丢帧?
    • 允许丢帧(状态同步) → 三缓冲 create_triple_buffer
    • 不允许丢帧(命令 / 事件) → 环形队列 create_ring_buffer
  3. 是否允许丢帧?
    • 允许丢帧 → create_ipc_triple_buffer<T> + #[roplat_msg]
    • 不允许丢帧 → create_ipc_ring_buffer<T> + #[roplat_msg]

主链路(同域内)仍优先走主流通讯 >>;跨域由 #[system] 图规约 R4 规则自动插入合适的旁路通道。


八、调试与诊断

现象 排查工具
三缓冲丢帧率高 读端加 seq 字段、日志 diff;考虑切换到 ring_buffer
ring_buffer try_push 频繁返回 false 监控队列深度(需要工具层扩展);扩容或优化消费者
IPC 连不上 查看 $ROPLAT_RUNTIME_DIR/ipc/<ns>/<name>.rdv 是否存在;PID 是否存活
IPC 订阅者收不到但握手成功 检查 schema_id 是否一致;确认 writer.is_connected() 为 true
跨语言布局错配 查看 roplat_gen/ 下生成的头文件;确保两侧 #[repr(C)]

诊断 IPC 端点:

cargo roplat ipc ls                                        # 列出所有端点 + 存活 PID
cargo roplat ipc introspect "roplat-ipc://ns/name?msg=..." # 单端点详情 + schema 校验

九、新人建议

  1. 能主流就主流:同域内用 >> 最简洁;只在跨域 / 共享状态 / 跨进程时转旁路
  2. 能透明就透明:透明消息优先,保证跨语言可见
  3. 先跑通,再优化:先让主链路稳定,再引入 opaque / IPC
  4. IPC 单独写示例:不要一开始就在主系统里上 IPC,先在 examples/ 跑通握手与背压
  5. 善用版本号#[roplat_msg(version = N)],字段有破坏性变更时递增,避免线上两侧偷偷不一致