Skip to content

图规约与系统宏

本文档介绍 #[roplat::system] 宏背后的图规约引擎 (Graph Reduction Engine)——它将用户编写的系统 DSL 转换为高效的异步调度代码。

设计思想

图即调度,规约即编译。

系统宏的工作可以概括为:

  1. 解析:将系统函数体中的 >> 连线、match 分支、节律闭包 解析为内部 IR
  2. 建图:从 IR 构建 FlowGraph(节点 + 有向边 = 数据依赖)
  3. 规约:通过 ReducibleGraph 反复应用规约规则,将 N 个节点折叠为单一调度树
  4. 代码生成:遍历调度树,输出 async fn + tokio::join! + tokio::spawn 代码

关键洞察:规约过程本身就是编译过程——每条规则对应一种运行时执行模式。

数据结构

RNode(规约节点)

规约的核心是 RNode 枚举,它同时表示原子节点和复合调度结构:

变体 含义 运行时映射
Atom(id) 原始计算/资源/子图节点 直接调用
Seq(Vec<RNode>) 串行序列 async { a; b; c; }
Par(Vec<RNode>) 并行分组 tokio::join!(a, b, c)
Spawned(Vec<RNode>) 独立协程 tokio::spawn(a); spawn(b);
Blocking(Box<RNode>) CPU 密集线程 tokio::task::spawn_blocking(...)
ChannelTx(ch) / ChannelRx(ch) 信道端点 tx.send() / rx.recv()

ReducibleGraph

ReducibleGraph {
    nodes:   BTreeMap<NodeId, RNode>,    // 当前存活节点
    fwd/rev: BTreeMap<NodeId, BTreeSet>, // 邻接表
    weights: BTreeMap<NodeId, usize>,    // 节点权重
}

每次规约操作会:

  1. 从图中移除若干节点
  2. 创建一个新的复合 RNode(如 SeqPar
  3. 继承被移除节点的外部边

重复这个过程直到图中只剩一个节点——它就是最终的调度树。

规约规则

规则按优先级顺序尝试,高优先级规则总是先于低优先级应用。

R0: 线程卸载 (Thread Offload)

条件: 节点 weight >= HEAVY_THRESHOLD(当前阈值 = 10)

动作: 将重节点从主图移除,切断所有入边和出边,每条被切断的边用一对信道 (ChannelTx, ChannelRx) 替代。被卸载的节点包装为 Blocking(Seq(ChannelRx..., Atom, ChannelTx...)),放入独立线程列表。

设计意图: CPU 密集型节点(如图像处理、矩阵运算)不应阻塞 tokio 运行时。将其提升到 spawn_blocking 线程池,通过信道与主图异步通信。

 主图: A → [heavy B] → C
           ↓ R0
 主图: A → ChannelTx(0) ─── ChannelRx(1) → C
 卸载: Blocking(Seq(ChannelRx(0), B, ChannelTx(1)))

R0 在所有其他规则之前单独执行一次。

R1: 串行融合 (Sequence)

条件: 边 u → v,且 out(u) == 1 ∧ in(v) == 1

动作: 合并为 Seq(u, v),继承 u 的前驱和 v 的后继。

这是最常见的规约——线性管道中的节点被自然地串联成顺序执行。

R2: 并行融合 (Parallel)

条件: 一组节点 {N1, N2, ..., Nk} 共享完全相同的 (入邻居集, 出邻居集),且至少一侧非空。

动作: 合并为 Par(N1, N2, ..., Nk)

典型场景:fork-join 模式中的并行分支。

     A
    / \
   B   C    →  Seq(A, Par(B, C), D)
    \ /
     D

R3: 拓扑序列 (TopoSequence)

条件: 当前图的拓扑排序唯一(每一层只有一个节点可选)。

动作: 将所有节点按拓扑排序收集为 Seq(全部节点)

这是一条"兜底融合"规则——当 R1 无法单步融合但图已经有确定的全序时,一步到位。

R4: 信道切割 (ChannelCut)

条件: R1~R3 均无法推进。

动作: 使用启发式评分选择一条边切割,插入 ChannelTx/Rx 对,将强耦合打断。

评分公式:score = 100 * layer_diff + 10 * in_degree(v) - weight(v)

  • 优先选择跨越层数多的边(长依赖链)
  • 优先选择高入度汇聚点的边(瓶颈边)
  • 避免切割重节点的边(已由 R0 处理)

R4 是"最后手段"——引入信道意味着运行时额外开销,所以仅在图结构复杂到无法通过纯融合处理时使用。

规约执行流程

                    ┌──────────────┐
                    │ ReducibleGraph │
                    │  (N nodes)    │
                    └──────┬───────┘
                    ┌──────▼───────┐
                    │   R0: 卸载    │──→ offloaded: Vec<Blocking(...)>
                    │  重节点(≥10)  │
                    └──────┬───────┘
              ┌────────────▼────────────┐
              │  循环 {                  │
              │    try R1(串行融合)       │
              │    try R2(并行融合)       │
              │    try R3(拓扑序列)       │
              │    try R4(信道切割)       │
              │  } until 1 node left    │
              └────────────┬────────────┘
                    ┌──────▼───────┐
                    │ ReduceOutput  │
                    │  main: RNode  │
                    │  offloaded: [] │
                    └──────────────┘

从 FlowGraph 到 ReducibleGraph

集成路径:

  1. 解析器 (parser.rs) 将 #[roplat::system] 函数体解析为 Scope IR
  2. 建图 (graph.rs) 将 IR 转为 FlowGraph(包含 NodeKindInputSource 等元数据)
  3. 适配 (system_v3/mod.rs) 将 FlowGraph 映射到 ReducibleGraph
  4. 规约 (system_v3/graph.rs) 执行 R0~R4,输出 ReduceOutput
  5. 转换 (system_v3/mod.rs) 将 RNode 树映射回 Schedule
  6. 代码生成 (codegen.rs) 将 Schedule 树生成最终 TokenStream
FlowGraph                 ReducibleGraph                 Schedule
┌──────┐  to_reducible()  ┌──────────┐  to_schedule()   ┌────────┐
│ Node │ ───────────────→ │  RNode   │ ──────────────→  │ Step   │
│ Edge │                  │  fwd/rev │                  │ Seq    │
│ Kind │                  │  weight  │                  │ Par    │
└──────┘                  └──────────┘                  │ Spawn  │
                                                        │ Loop   │
                                                        └────────┘

代码生成:节律闭包中的节点传递与生命周期

在代码生成阶段,节律闭包(Subgraph::Rhythm)的处理包含以下关键设计:

Pass-by-Value 节点传递

节点以 &mut 引用打包成元组传入 drive(),回调通过值移动接收并返回:

// 生成的代码(伪)
let mut nodes_tuple = (&mut source, &mut sink, tx_0, rx_0, ...);
driver.drive(nodes_tuple, |mut nodes, event| {
    let resource = resource.clone();  // 每 tick 克隆 Arc(原子操作)
    async move {
        let source = &mut nodes.0;
        let sink = &mut nodes.1;
        // ... inner pipeline
        ((), nodes)  // 归还所有权
    }
}).await;

这种设计消除了 BoxFuture 的逐 tick 堆分配。回调返回 ((), nodes) 将节点元组归还给 drive 循环。

on_init / on_shutdown 自动注入

代码生成器为每个节律闭包中捕获的计算节点自动插入生命周期调用:

node_a.on_init().await;   // 声明顺序
node_b.on_init().await;
let mut nodes_tuple = (&mut node_a, &mut node_b);
driver.drive(...).await;
node_b.on_shutdown().await;  // 声明逆序
node_a.on_shutdown().await;

因为节点以 &mut 引用传入 drive,drive 结束后节点仍然存活,on_shutdown 可以正常执行。

参与开发

代码位置

文件 职责
roplat_macros/src/system_v3/graph.rs 图规约核心算法 + 单元测试
roplat_macros/src/system_v3/mod.rs FlowGraph ↔ ReducibleGraph 适配
roplat_macros/src/system.rs 宏入口,串联完整管道
roplat_macros/src/system/codegen.rs Schedule → TokenStream 代码生成
examples/macros/ 端到端测试用例

添加新规约规则

  1. Rule 枚举中添加新变体
  2. reduce_step() 中按优先级顺序插入新规则的尝试逻辑
  3. 编写单元测试验证规则的触发条件和输出

编译期安全检查

跨组死锁检测

图规约后,独立执行单元(Spawned 子树、offloaded 任务)通过通道通信。ReduceOutput::detect_channel_deadlock() 自动检测通道依赖是否形成环路:

  1. 提取独立执行单元
  2. 收集每个单元的 ChannelTx / ChannelRx ID
  3. 构建依赖图:单元 A 有 Rx(ch) 且单元 B 有 Tx(ch) → A 依赖 B
  4. DFS 检测环路

若检测到环路,schedule_graph_v3() 会 panic 并报告涉及的单元和通道。

 单元 A: Rx(1) → Node0 → Tx(0)
 单元 B: Rx(0) → Node1 → Tx(1)
 ──────────────────────────────
 A 依赖 B (ch1),B 依赖 A (ch0) → 死锁!

解析器错误报告

SystemParser 在遇到不合法的 DSL 语法时,通过 syn::Error 报告带精确 span 的中文错误信息:

场景 错误信息
不支持的宏调用 "system! 宏体中暂不支持宏调用语句"
无法识别的表达式 "无法识别的 system DSL 表达式" + 支持语法列表
无法解析的端点 "无法解析连接的源/目标端点"
节律域驱动源解析失败 "无法解析节律域驱动源"
  1. 如果规则产生新的 RNode 变体,需要同步更新 to_schedule()codegen.rs

调试技巧

使用 reduce_traced() 可以获取规约过程中每一步应用的规则列表:

let (output, trace) = graph.reduce_traced();
// trace: [ThreadOffload, Sequence, Parallel, Sequence]

ReducibleGraph 实现了 Display trait,可以直接打印图的节点数和边数。