12 进程间通讯
面向初学者:把「跨进程通讯」讲清楚,并指明它在 roplat 中的独特定位。
一、为什么需要进程间通讯(IPC)?
前面章节介绍的「通讯」都是进程内的旁路通讯:两个节点处于同一个 roplat_system 进程中,直接共享内存,用三缓冲或环形队列搬数据。
但真实机器人系统常有这些需求:
- 语言隔离:一个节点用 C++ 做硬实时控制、另一个用 Python 跑深度学习模型;它们必须跑在不同进程(Python GIL 不能阻塞控制线程)。
- 故障隔离:视觉模块 OOM 崩溃时,不能带着控制器一起挂。
- 在线热插拔:调试节点、录制节点、可视化节点想随时插入/退出,不想重启整个系统。
- 多机部署:一台机器跑控制、一台跑感知、通过局域网互联。
这些都绕不开「跨进程」。roplat 为此提供第三类旁路通讯资源:IPC。
二、roplat IPC 的设计定位
一句话总结:「进程内静态、进程间动态」。
进程内(triple_buffer / ring_buffer) |
进程间(ipc::*) |
|
|---|---|---|
| 拓扑 | 编译期由 #[system] 宏静态确定 |
运行时通过 URI 查找、动态绑定 |
| 资源归属 | #[system] 语句生成 |
独立资源,不穿透系统图 |
| 失败模式 | 几乎不可能失败(共享内存) | 可能 NotReady / PeerGone / SchemaMismatch |
| 语义契约 | 进程边界两侧代码不需改动,契约保持一致 | 同左 |
关键主张:用户在节点内写的代码形态和进程内一致,例如:
let (writer, _) = create_ipc_ring_buffer::<Pose>(&uri, Role::Publisher, rdv)?;
writer.unwrap().try_push(&Pose { x: 1.0, y: 2.0, z: 3.0 });
只是边界位置改了 —— 跨进程时 system 图规约只到进程的边缘,边缘之外通过 URI + Rendezvous 文件对接。
三、身份三要素
跨进程无法共享 Rust 类型系统,必须用静态字符串 ID来唯一认定「两侧在谈同一件事」。roplat 使用三要素:
- 端点 URI:
roplat-ipc://<namespace>/<endpoint>?msg=<schema_id>&v=<version> namespace:通常是系统名或部署名endpoint:端点名,类似 ROS topic- Schema 指纹(
SchemaId):消息结构的短哈希(12 位 hex),由#[roplat_msg]宏在编译期计算。两侧若字段布局不同会立刻暴露。 - 消息版本(
msg_version):人手递增的u16,用于灰度升级(同字段重命名 / 单位变更等)。
三项任一不符,握手时立刻断开,绝不让错配的字节流进入数据层 —— 这是系统篇 §VII「类型安全跨进程延伸」的代码体现。
四、Rendezvous(会合)文件
两个独立编译、独立启动的进程如何找到彼此?方案是基于文件系统的会合:
- 发布者启动时绑定本地地址(例如
127.0.0.1:54123),把地址、schema、PID 写进:
- 订阅者启动时读同一路径、校验
schema_id一致、检查发布者 PID 存活、连上address。 - 发布者退出时删除该文件;若进程崩溃没来得及删,订阅者下一次查找发现 PID 已消失会主动清理。
为什么选文件:零外部依赖、零后台服务、零网络发现。嵌入式板卡、离线调试、CI 容器全都适用。
文件内容示例:
{
"uri": "roplat-ipc://default/sensor?msg=a4f1c2&v=1",
"namespace": "default",
"name": "sensor",
"schema_id": "a4f1c2",
"msg_version": 1,
"transport": "tcp",
"address": "127.0.0.1:54123",
"publisher_pid": 12345,
"created_at": 1713840000
}
五、握手协议
订阅者连接到发布者后,在数据字节流之前完成一次 Hello → HelloAck 交换:
Subscriber → Publisher:
{ magic: "roplat-ipc", protocol_version: 1,
role: "subscriber", schema_id: "a4f1c2", msg_version: 1,
endpoint: "default/sensor" }
Publisher → Subscriber:
{ magic: "roplat-ipc", protocol_version: 1,
accepted: true, reason: null }
任一字段不符,发布者回 accepted: false, reason: "<描述>" 并关闭连接。这是类型安全的最后一道闸门。
六、延迟绑定(Late Binding)
IPC 与进程内通讯最大的差异:订阅者不一定能立刻连上发布者。
可能的场景:
- 发布者还没启动(异步部署)
- 发布者在另一台机器上,网络暂不可达
- 发布者崩溃重启中
roplat 的做法是延迟绑定状态机:
Pending ──(读到 rendezvous)──▶ Connecting ──(握手成功)──▶ Ready
▲ │
└──────────────(PeerGone / 重连退避)──────────────────────┘
用户代码感知不到状态机细节 —— try_pop() 在未就绪时返回 None,和「队列暂时为空」表现一致。节点里写的逻辑不因进程边界而改变。
七、后端(Transport)
IpcTransport trait 把「怎么搬字节」从语义层抽走。MVP 提供:
| 后端 | 定位 | 延迟量级 | 跨平台 |
|---|---|---|---|
tcp |
默认,loopback TCP | ~100 μs | ✅ Windows/Linux/macOS |
loopback |
进程内 mpsc,单元测试用 | ns 级 | ✅ |
规划中的后端(不在本次 MVP 内):
| 后端 | 定位 | 延迟量级 |
|---|---|---|
uds |
Unix Domain Socket | ~10 μs |
np |
Windows Named Pipe | ~10 μs |
shm |
共享内存 ring(需同步原语) | < 1 μs |
用户不用关心底层选择 —— rendezvous 文件里 transport 字段由发布者决定,订阅者自动适配。
八、完整示例
发布者
use roplat::comm::{
create_ipc_ring_buffer, EndpointUri, RendezvousDir, Role, SchemaId,
};
#[repr(C)]
#[derive(Copy, Clone)]
struct Pose { x: f32, y: f32, z: f32 }
fn main() -> roplat::RoplatResult<()> {
let uri = EndpointUri::new("default", "sensor", SchemaId::new("a4f1c2"), 1);
let rdv = RendezvousDir::new_default();
let (writer, _) = create_ipc_ring_buffer::<Pose>(&uri, Role::Publisher, rdv)?;
let writer = writer.unwrap();
loop {
writer.try_push(&Pose { x: 1.0, y: 2.0, z: 3.0 });
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
订阅者
use roplat::comm::{
create_ipc_ring_buffer, EndpointUri, RendezvousDir, Role, SchemaId,
};
#[repr(C)]
#[derive(Copy, Clone, Debug)]
struct Pose { x: f32, y: f32, z: f32 }
fn main() -> roplat::RoplatResult<()> {
let uri = EndpointUri::new("default", "sensor", SchemaId::new("a4f1c2"), 1);
let rdv = RendezvousDir::new_default();
// 注意:如果发布者还没启动,返回 NotReady;上层可轮询重试
let (_, reader) = create_ipc_ring_buffer::<Pose>(&uri, Role::Subscriber, rdv)?;
let reader = reader.unwrap();
loop {
if let Some(pose) = reader.try_pop()? {
println!("got {:?}", pose);
}
std::thread::sleep(std::time::Duration::from_millis(5));
}
}
九、与 ROS 2 的对比
| 维度 | ROS 2 Topic | roplat IPC |
|---|---|---|
| 类型安全 | IDL + 序列化库对齐 | 编译期 SchemaId 双侧等值校验 |
| 发现机制 | DDS(多播 + SPDP) | 文件系统 rendezvous |
| 运行时依赖 | 完整 DDS 栈(rmw + fastdds/cyclone) | 零外部依赖,仅标准库 + serde |
| 进程内路径 | 仍走 DDS 序列化 | 进程内零拷贝(三缓冲 / 环形队列) |
| 延迟绑定 | 自然支持 | 显式状态机,行为可预测 |
| 确定性回放 | 需外挂 rosbag + 时间同步 | 与进程内消息同一套 #[replay] 框架 |
roplat 取舍:以「运行时更简单、进程内更快」换取「不做全网 pub/sub」。跨机场景由上层显式桥接,而非透明广播。
十、当前 MVP 限制
MVP 阶段(2026-04-23)覆盖的功能:
- ✅
EndpointUri解析 / 序列化 - ✅ Rendezvous 文件读写 + PID 僵尸清理(Linux/Windows)
- ✅ Hello / HelloAck 握手协议
- ✅
IpcRingWriter<T>/IpcRingReader<T>+ TCP 后端 - ✅ 进程内 loopback 后端(单元测试用)
- ✅
#[roplat_msg]宏自动生成SCHEMA_ID/SCHEMA_FULL/MSG_VERSION(2026-04-23 增量) - ✅ TCP 后端真·一写多读广播(独立 peer outbox,2026-04-23 增量)
- ✅
ConnectOptions/IpcOptions自动重连封装(2026-04-24) - ✅ TCP 发布侧背压水位
TcpOptions { high_watermark, overflow }(2026-04-24) - ✅ 跨进程
cargo teste2e 集成测试(2026-04-24) - ✅
IpcTripleBuffer<T>—— 最新值覆盖的 IPC 三缓冲语义(2026-04-24,基于 TCP 广播的 facade 实现) - ✅
cargo roplat ipc ls/ipc introspect诊断子命令(2026-04-24)
选项化用法示例
发布者带背压、订阅者带等待超时的最新推荐写法:
use roplat::comm::{
ConnectOptions, EndpointUri, IpcOptions, OverflowPolicy, RendezvousDir,
Role, SchemaId, TcpOptions, create_ipc_ring_buffer_with_opts,
};
// 发布者:防止订阅者卡死导致内存无界增长
let pub_opts = IpcOptions {
tcp: TcpOptions {
high_watermark: Some(256),
overflow: OverflowPolicy::DropOldest,
},
..Default::default()
};
// 订阅者:等发布者上线,最多等 30s
let sub_opts = IpcOptions {
connect: ConnectOptions::wait_forever(),
..Default::default()
};
let (writer, _) = create_ipc_ring_buffer_with_opts::<Pose>(
&uri, Role::Publisher, RendezvousDir::new_default(), &pub_opts
)?;
尚未实现(后续迭代):
- ⏳ UDS / Named Pipe / 共享内存后端(需抽象 stream I/O 层)
- ⏳ 真·零拷贝共享内存
IpcTripleBuffer(当前是 TCP 广播 facade) - ⏳
#[system]宏对 IPC 端点的原生识别 - ⏳ IPC 消息参与
#[replay]录制
参见 roplat-pages/docs/Log/2026-04-23_comm_ipc.md、2026-04-24_comm_ipc_hardening.md 与 2026-04-24_ipc_triple_buffer_and_cli.md 了解落地过程与决策细节。
IpcTripleBuffer<T> 用法
与 IpcRingBuffer<T> 工厂对称,返回 (Option<Writer>, Option<Reader>):
use roplat::comm::{
EndpointUri, RendezvousDir, Role, SchemaId, create_ipc_triple_buffer,
};
let uri = EndpointUri::new(
"default", "pose",
SchemaId::new(Pose::SCHEMA_ID),
Pose::MSG_VERSION,
);
// 发布者
let (writer, _) = create_ipc_triple_buffer::<Pose>(
&uri, Role::Publisher, RendezvousDir::new_default()
)?;
writer.unwrap().publish(&pose);
// 订阅者
let (_, reader) = create_ipc_triple_buffer::<Pose>(
&uri, Role::Subscriber, RendezvousDir::new_default()
)?;
if let Some(pose) = reader.unwrap().get_latest()? {
// 最新一帧;中间帧可能被跳过
}
CLI 诊断
# 列出所有端点
cargo roplat ipc ls
# 或按自定义根目录
cargo roplat ipc ls --root D:\custom\runtime\ipc
# 机器可读
cargo roplat ipc ls --json
# 查看单个端点
cargo roplat ipc introspect "roplat-ipc://ipc_pubsub/pose?msg=137a75816a5a&v=1"
输出会显示 URI / 后端 / 地址 / schema_id / PID / 是否存活。URI 与 rendezvous 的 schema_id / msg_version 不一致时显式警告,便于在升级消息结构后定位错配。