这篇文章对应视频:【[Agentic RL] 13 verl infra AgentLoop 代码串讲,multi-turn 推理与 ray trainer】(BV1MizSBJEbi)。

但我不会把它写成“逐句视频笔记”。定位是:verl 的 AgentLoop 代码路径和 RayPPOTrainer 的训练主循环串起来,让你做到:

  1. 读代码不迷路:知道入口在哪、数据在哪流、每个模块的边界是什么。
  2. 能把 multi-turn 推理跑成一个稳定的服务:知道要保什么状态、哪些状态必须 sticky、哪些必须 mask。
  3. 能 debug:遇到卡死/吞吐低/奖励对不上/轨迹错位,知道该查哪层日志。

系列导航:

前置建议(不然你会“看懂一堆类名,但不知道为什么要这么做”):

  1. 12 先建立 AgentLoop 的架构直觉(async / sticky / hybrid / mask)
  2. 多轮训练的底线:token-in-token-out

本文主要对齐你本地仓库的这些笔记(按阅读顺序):

  • Ray 训练系统的 worker group / 资源池 / 单控制器数据流:
    • /Users/wangpeng/Downloads/modern_genai_bilibili-main/agentic_rl/verl/basics/架构概念.ipynb
    • /Users/wangpeng/Downloads/modern_genai_bilibili-main/agentic_rl/verl/verl-arch.ipynb
  • AgentLoop 的关键入口(能直接定位到 ray_trainer.py / agent_loop.py):
    • /Users/wangpeng/Downloads/modern_genai_bilibili-main/agentic_rl/verl/agent/agent_loop_code.ipynb
  • Ray 的 remote / ObjectRef / wait 的基本语义(理解为什么这里用 ray.get,以及可以怎么改):
    • /Users/wangpeng/Downloads/modern_genai_bilibili-main/agentic_rl/verl/ray/remote.ipynb
  • multi-turn 的一个典型 config 入口(开启 async + tool agent loop):
    • /Users/wangpeng/Downloads/modern_genai_bilibili-main/agentic_rl/verl/retool/basics.ipynb

下一篇(延伸:把“全流程与计算细节”补齐):


0. 先把“三层 remote”分清楚:Ray remote / 推理 RPC / Tool IO

很多人看 verl agent loop 会混淆三种“远程调用”,导致排障时乱跳层:

  1. Ray remote(控制面)worker.generate_sequences.remote(chunk)
    • 这是“把一段 Python 函数/方法丢给 Ray 调度器执行”。返回的是 ObjectRef(未来值)。
  2. 推理引擎 RPC(数据面)AsyncLLMServerManager -> (vLLM/SGLang server)
    • 这是“让 GPU 上的推理服务生成 token”。
  3. 工具 IO(外部世界):search / sandbox / mcp / env simulator
    • 这是“非确定延迟”的根源(straggler 的根源)。

把它们分开,你才知道:

  • 吞吐低:先看推理服务 batch(vLLM/SGLang),再看工具长尾,再看 Ray 调度。
  • 卡死:先看 tool 超时/死锁,再看 Ray actor 崩溃重启,再看推理 server backlog。
  • 训练不稳定:先看 token-in-token-out、mask、reward 口径;最后才怀疑 PPO loss。

1. RayPPOTrainer 的系统观:单控制器 + 多 WorkerGroup

verl 的 Ray 架构核心是:一个 Trainer(单控制器)串起一条“数据流图”,把重活下发给多个 worker group(多控制器并行)

架构概念.ipynb 给了一个最关键的骨架(我保留原结构,便于你对照源码思路):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
global_pool_id = "global_pool"
resource_pool_spec = {
global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
}

self.mapping[Role.ActorRollout] = global_pool_id
self.mapping[Role.Critic] = global_pool_id
self.mapping[Role.RewardModel] = "global_pool"
self.mapping[Role.RefPolicy] = "global_pool"

resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=self.mapping)

self.role_worker_mapping[Role.ActorRollout] = ray.remote(actor_rollout_cls)
self.role_worker_mapping[Role.Critic] = ray.remote(CriticWorker)
self.role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker)
self.role_worker_mapping[Role.RefPolicy] = ray.remote(ref_policy_cls)

trainer = RayPPOTrainer(config,
role_worker_mapping=self.role_worker_mapping,
resource_pool_manager=resource_pool_manager, ...)

trainer.init_workers()
trainer.fit()

1.1 你应该形成的 mental model

你可以把它理解成下面这个“资源与角色图”(示意):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
graph TD
T[RayPPOTrainer (single-controller)] --> WG_AR[actor_rollout_wg]
T --> WG_REF[ref_policy_wg]
T --> WG_C[critic_wg]
T --> WG_RM[reward_model_wg]

subgraph GlobalPool[resource pool: global_pool]
WG_AR
WG_REF
WG_C
end

subgraph RMPool[resource pool: reward_pool (optional)]
WG_RM
end

关键点:Trainer 负责“顺序编排”(dataflow),每个 WG 内部再并行(SPMD/DP/TP/PP)
这就是 架构概念.ipynb 里说的 single-controller vs multi-controller。


2. Training Step:Ray trainer 的“同步序列”长什么样

verl-arch.ipynb 把一个典型训练 step 的顺序写得很清楚(这是你读 ray_trainer.py 的导航图):

  1. generate_sequences() → rollout 数据生成
  2. compute_rm_score() → RM / verifier 给分
  3. compute_log_prob() → actor 重算 logprob(用于 ratio / KL)
  4. compute_ref_log_prob() → ref policy logprob(用于 KL)
  5. compute_values() → critic values(用于 advantage)
  6. update_critic() → critic 更新
  7. update_actor() → actor 更新

当你引入 AgentLoop + async rollout 后,这条链路会发生两个“结构性变化”:

  • generate_sequences() 不再只是 “LLM 单轮生成”:它变成“多轮交错轨迹生成(LLM + tool)”,输出必须带 response_mask
  • compute_rm_score() 可能被前置进 rollout 阶段:尤其是 async 模式下(上一期文章已解释)。

所以,第 13 期代码串讲的重点,其实是:

这条训练流水线里,AgentLoop 插在哪,插进去后哪些步骤的“输入输出契约”变了?


3. 真正的分水岭:rollout.mode=async 触发的代码路径

agent_loop_code.ipynb 里把关键分支点点出来了:

  • ray_trainer.py:init_workers
    • 如果 self.config.actor_rollout_ref.rollout.mode == "async"
    • 实例化 self.async_rollout_manager = AgentLoopManager(...)
  • ray_trainer.py:fit
    • gen_batch_output = self.async_rollout_manager.generate_sequences(gen_batch_output)
  • ray_trainer.py:_validate
    • self.async_rollout_manager.generate_sequences(test_gen_batch_padded)

我建议你把它画成一个“入口级 call graph”,以后你 debug 就按这个树往下钻:

1
2
3
4
5
6
7
8
9
10
11
12
graph TD
A[RayPPOTrainer.fit] --> B{rollout.mode == async?}
B -- No --> C[actor_rollout_wg.generate_sequences]
B -- Yes --> D[AgentLoopManager.generate_sequences]
D --> E[AgentLoopWorker.generate_sequences (Ray remote)]
E --> F[AgentLoop.run (async state machine)]
F --> G[AsyncLLMServerManager.generate (sticky + lb)]
G --> H[vLLM/SGLang server]
F --> I[Tools / Env / Simulator]
E --> J[postprocess: mask / reward attach / metrics]
J --> D
D --> A

注意:这张图里有两个“并行域”

  • Ray 并行:AgentLoopManager 把 batch chunk 到多个 Worker(ray.get([...remote...]))。
  • Worker 内并行:单个 Worker 用 asyncio 并发跑很多条 agent loop(上一期讲过)。

4. AgentLoopManager.generate_sequences:你真正需要读懂的 40 行代码

agent_loop_code.ipynb 直接给了 AgentLoopManager.generate_sequences 的实现骨架(这段代码是你读 infra 的关键锚点):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def generate_sequences(self, prompts: DataProto) -> DataProto:
if self.config.actor_rollout_ref.rollout.free_cache_engine:
self.wake_up()
if self.reward_model_manager and self.config.reward_model.rollout.free_cache_engine:
self.reward_model_manager.wake_up()

chunkes = prompts.chunk(len(self.agent_loop_workers))
outputs = ray.get(
[
worker.generate_sequences.remote(chunk)
for worker, chunk in zip(self.agent_loop_workers, chunkes, strict=True)
]
)
output = DataProto.concat(outputs)

if self.config.actor_rollout_ref.rollout.free_cache_engine:
self.sleep()
if self.reward_model_manager and self.config.reward_model.rollout.free_cache_engine:
self.reward_model_manager.sleep()

metrics = [output.meta_info.pop("metrics") for output in outputs]
timing = self._performance_metrics(metrics, output)
output.meta_info = {"timing": timing, **outputs[0].meta_info}
return output

4.1 这段代码背后的“系统假设”

  1. rollout 与 train 分阶段:所以有 wake_up/sleep(hybrid 才需要,standalone 可以关)。
  2. DataProto 是第一公民:chunk/concat 都发生在 DataProto 上,而不是 list[dict]。
  3. Worker 是无共享状态的并行单元:batch 级同步点在 ray.get,拿到全部 outputs 才 concat。
  4. metrics 是跨层传播的唯一办法:子 worker 负责采样/工具/推理耗时;manager 汇总成 timing。

你在扩展 agent loop 时,千万别破坏这几个假设,否则代码会从“好 debug”变成“玄学系统”。

4.2 我对这段实现的一个批判:ray.get 会放大长尾

它等价于“等最慢那个 worker 结束”。
如果你的 tool latency 长尾很重,你会看到训练吞吐被 straggler 拖下去。

更激进的设计是用 ray.wait 边完成边消费(remote.ipynb 里就演示了 wait 模式的写法)。
但这会改变 DataProto concat 的节奏,进而影响:

  • 训练 step 的同步边界
  • reward/advantage 的 batch 统计(例如优势标准化)

所以我建议你先接受 ray.get 的简单性,把 correctness 跑稳;真要榨吞吐,再引入 ray.wait 的“流式合并”。


5. DataProto:为什么它是 AgentLoop 和 Ray trainer 的“共同语言”

从代码可以看出,AgentLoopManager 并不关心你做的是 search agent 还是 coding agent,它只认一件事:

输入是一批 prompts(DataProto),输出是一批 trajectories(DataProto)。

这就是 verl 设计的核心优点:把 RLHF/Agentic RL 的复杂性压缩成“Dataflow + Contracts”

在 AgentLoop 场景,你至少要保证输出里有这些字段(概念上):

  • prompt_ids
  • response_ids(包含 action + observation 的交错 token)
  • response_mask(action token=1,observation token=0)
  • meta_info(timing / trajectory_id / server_id / turns / tool stats)

如果你后面要接 PPO/GRPO 的 loss 计算,最关键的一条就是:

任何会参与 logprob / ratio / KL 的 token,都必须是 token-in-token-out 的 action token。

这也是我为什么强烈建议你把 response_mask 当作“训练数据 schema 的一部分”,而不是某个临时 hack。


6. Multi-turn 推理:把 AgentLoop 当成“可观测、可回放”的推理服务

agent_loop_inference.ipynb 说得很直白:纯推理模式下,agent loop 就是 ReAct,一堆框架都能做。

但如果你面向的是 agentic RL(未来要训练),你在推理阶段就该按“可训练”的标准建系统:

  1. 可观测:你必须打出 trajectory_id、turn_id、tool latency、server_id(sticky 结果)。
  2. 可回放:同一套 prompt + tool output,在 deterministic 条件下应该能重放出同样的 token ids(否则训练阶段你永远对不上 logprob)。
  3. 可切换:standalone 模式先跑通;hybrid 是后置优化项(别把系统复杂度绑死)。

你可以把 multi-turn 推理的“标准姿势”理解成:

  • AgentLoop 管对话历史、tool 调用、终止条件(逻辑层)
  • AsyncLLMServerManager 做 sticky + LB(服务层)
  • 用 vLLM/SGLang 做 continuous batching(计算层)

如果你只用 LangChain/LangGraph 跑推理,不做这几件事,短期能跑通,长期很难演进到“可训练”。


7. 最常见的 8 个坑(按出现频率排序)

  1. mask 错了:tool 输出 token 被当成 action token → logprob 污染 → 训练直接发散。
  2. token-in-token-out 被破坏:chat template / tokenizer encode/decode 不一致 → 轨迹对不上。
  3. sticky 没做好:多轮路由到不同 server → prefix cache 失效 + 状态漂移。
  4. reward 口径不一致:rollout 前置打分 vs training phase 打分 → 同一个 batch 分数不一致。
  5. hybrid 切换开销吞掉收益:wake_up/sleep 太重,反而比 standalone 慢。
  6. Ray 资源池映射错:actor_rollout/ref/critic/rm 抢同一组 GPU,或 placement 不合理 → 看起来“卡死”。
  7. 工具长尾拖垮 ray.get:少数样本超时 → 整批等死。
  8. metrics 缺失导致盲调:没有 per-turn latency / per-tool stats,只能看最终 reward 曲线瞎猜。

8. 我建议你怎么学这套东西(很实际)

如果你的目标是“做 agentic RL 来做 deep research”,你不需要一上来就把 verl 全吃透。建议按 3 阶段:

  1. 跑通 multi-turn 推理(standalone):把 trajectory schema、mask、sticky、日志打通。
  2. 接上最小 RL 闭环:哪怕 reward 是规则/verifier,先保证能稳定更新(别追 SOTA)。
  3. 再谈 infra 优化:hybrid、ray.wait 流式、reward 前置、吞吐调优。

学的顺序对了,你会发现“读代码”不再是背类名,而是在验证你对数据契约的理解。


9. 小结

  1. rollout.mode=async 不是一个小开关,它改变的是:rollout 产出的数据形态(交错轨迹 + mask),以及 trainer 的流水线边界。
  2. AgentLoopManager.generate_sequences 的关键不在于“Ray 怎么写”,而在于:它把整个系统的同步点固定在 DataProto concat 这一刻
  3. multi-turn 推理要按“未来可训练”的标准去设计:可观测、可回放、可切换;否则你会在训练阶段被历史债务反噬。

下一篇(如果你希望我继续按这个流程写)我会更偏“实战排障”:针对 Ray actor 崩溃、tool 超时、reward 延迟、吞吐打不满这些问题,给出更具体的日志字段与定位路径。