这篇文章对应视频:【[Agentic RL] 14 verl AgentLoop 全流程与计算细节,async rollout 实现,状态机,tool / interaction】(BV18d6sBpEZq)。

我会用“系统 + 数据契约”的方式把 AgentLoop 讲清楚:不是复述视频,而是把你真正会卡住的点拆开,直到你能做到:

  1. AgentLoop 相关代码时,能快速定位“当前在数据流的哪一段”。
  2. 你能解释清楚:为什么 agent loop 输出必须是交错轨迹(LLM token + tool obs token),以及为什么必须带 response_mask
  3. 你能把“tool / interaction / termination / reward attach”这几个最容易写乱的逻辑写成一套可 debug 的状态机。

系列导航:

建议先读(否则你会觉得本文有点“infra 细节过密”):

  1. 12 先建立 AgentLoop 的架构直觉(async / sticky / hybrid / mask)
  2. 13 先把 AgentLoop 插到 RayPPOTrainer 的主循环里
  3. mask 影响 logprob/ratio/KL 的计算口径

本文主要对齐你本地仓库的这些笔记(推荐只看其中的关键段落,不要从头刷到尾):

  • AgentLoop 的状态机、mask、tool/interaction、输出字段与 reward attach:
    • /Users/wangpeng/Downloads/modern_genai_bilibili-main/agentic_rl/verl/agent/agent_loop_details.ipynb

0. 先讲一句最重要的:AgentLoop 不是“推理编排”,而是“可训练轨迹生成器”

LangChain/LangGraph 也能做 tool-use agent 的编排,但它们通常不会强迫你保留这些“训练所需的数据”:

  • token ids(每一轮到底生成了哪些 token)
  • action vs observation 的边界(tool 输出到底算不算 policy 的 action)
  • reward 的挂载位置(哪一个 token 上有非零 reward)
  • 训练阶段能否 token-in-token-out 地重算 logprob(否则 ratio/KL 全崩)

而 veRL/verl 的 AgentLoop 是为 RL 训练服务的:它输出的不是“最终答案字符串”,而是一条可用于计算 PG/PPO/GRPO loss 的轨迹

你可以把它理解成:

AgentLoop = 逻辑状态机 + 轨迹数据结构(契约) + 异步执行框架(吞吐)

本文就是把这三件事拆开讲清楚。


1. 数据契约:AgentLoopOutput / AgentData / DataProto 三者如何对齐

agent_loop_details.ipynb 把 AgentLoop 的输出 schema 写得很明确(我建议你把这段当成“接口文档”背下来):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class AgentLoopOutput(BaseModel):
"""Agent loop output."""
prompt_ids: list[int]
"""Prompt token ids."""
response_ids: list[int]
"""Response token ids including LLM generated token, tool response token."""
response_mask: list[int]
"""Response mask, 1 for LLM generated token, 0 for tool response token."""
response_logprobs: Optional[list[float]] = None
"""Log probabilities for the response tokens."""
reward_score: Optional[float] = None
"""Reward score for the trajectory."""
num_turns: int = 0
"""Number of chat turns, including user, assistant, tool."""
metrics: AgentLoopMetrics
"""Auxiliary performance metrics"""
extra_fields: dict[str, Any] = {}
"""Extra fields for dynamic addition."""

这里面最关键的是三项:

  1. response_ids交错轨迹(LLM 生成 token + tool 返回 token)都塞在一起。
  2. response_mask:告诉训练代码,“哪些 token 属于 policy action”。
  3. num_turns:不是装饰,它常常被用来做 reward shaping / debug(例如鼓励 tool 调用)。

而在状态机内部,通常会用一个 AgentData 来累积轨迹(agent_loop_details.ipynb 也给了骨架):

1
2
3
4
5
6
7
8
9
10
class AgentData:
prompt_ids: list[int] = [] # 不断追加
response_mask: list[int] = [] # 标记哪些是 LLM 生成的
response_logprobs: list[float] = []

turn_scores: list[float] = [] # 每轮 interaction 的 reward
tool_rewards: list[float] = [] # 每次工具调用的 reward

user_turns = 0
assistant_turns = 0

最后,AgentLoopManager/Worker 的批处理层会把一堆 AgentLoopOutput 合并成一个 DataProto batch 返回给 Trainer(上一期/上上期已经串起来了)。

1.1 response_mask 的“硬约束”:tool 输出必须是 observation(mask=0)

agent_loop_details.ipynb 给了最精简的规则:

1
2
3
4
# LLM 生成的 token: mask = 1
agent_data.response_mask += [1] * len(agent_data.response_ids)
# Tool 返回的 token: mask = 0
agent_data.response_mask += [0] * len(response_ids)

背后原因很简单,但几乎是所有 agentic RL 崩溃的第一现场:

  • tool 输出不应该参与 policy gradient(它不是 policy 的 action 分布)。
  • 一旦你把 tool token 当 action token 去算 ratio/KL,loss 在数学上就失真了,训练曲线会看起来“有梯度”,但其实在优化一个虚假的目标。

你可以把它当作一条底线:

任何进入 compute_log_prob / ratio / KL / entropy 的 token,都必须是 LLM 真正生成出来的 action token。


2. 状态机:把 tool / interaction / termination 写成可 debug 的结构

AgentLoop 的核心是一个显式状态机(5 状态),agent_loop_details.ipynb 给了非常标准的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
stateDiagram-v2
[*] --> PENDING: Start

state "PENDING (初始化)" as PENDING
state "GENERATING (模型生成)" as GENERATING
state "PROCESSING_TOOLS (执行工具)" as PROCESSING_TOOLS
state "INTERACTING (环境/用户交互)" as INTERACTING
state "TERMINATED (结束)" as TERMINATED

PENDING --> GENERATING : 准备好 Prompt

GENERATING --> PROCESSING_TOOLS : 模型决定调用工具
GENERATING --> INTERACTING : 无工具调用 & 配置了交互环境
GENERATING --> TERMINATED : 完成任务 / 达到最大轮数 / 达到长度限制

PROCESSING_TOOLS --> GENERATING : 工具执行完毕 (更新上下文)
PROCESSING_TOOLS --> TERMINATED : 上下文超长

INTERACTING --> GENERATING : 收到外部反馈 (更新上下文)
INTERACTING --> TERMINATED : 交互结束 (should_terminate=True)

TERMINATED --> [*]: End

把每个状态“只干一件事”,你会得到一个很强的工程性质:

  • tool 的并发、超时、重试都集中在 PROCESSING_TOOLS
  • user/env simulator 的终止规则都集中在 INTERACTING
  • token/mask 的维护集中在 GENERATING 与 tool/interact 的 append 动作。

agent_loop_details.ipynb 还给了每个状态的职责摘要(非常值得抄到你自己的代码注释里):

  • PENDING:初始化 prompt_idsapply_chat_template
  • GENERATING:调用 LLM 生成,解析 tool_calls
  • PROCESSING_TOOLS:并行执行工具,收集结果
  • INTERACTING:调用环境获取反馈(可能返回 should_terminate)

2.1 tool vs interaction:不要把两者混成“外部 API”

从 RL 视角看,两者都算“环境反馈”,但工程上差异很大:

  • tool:通常是“输入一段结构化调用,得到结构化输出”(可并发、可重试、可缓存)。
  • interaction:更像一个“有状态环境”(可能需要 instance_id、可能依赖历史、可能决定终止)。

如果你把 interaction 当 tool 来写,你会遇到:

  • 状态丢失(回合之间上下文不一致)
  • 终止条件分散在各处(debug 很痛)

所以单独一个 INTERACTING 状态是合理的设计,不是“架构洁癖”。


3. async rollout 的实现:三层并发,两个同步点

这一段是视频 14 的核心,也是 veRL agent loop “厉害但容易误解”的地方。

agent_loop_details.ipynb 给了一个从 Manager 到 Server 的分步伪代码,你可以把它当作真实实现的骨架:

  1. Manager 拆 batch → 分发到多个 Worker(Ray 并行)
  2. Worker 为每个 sample 启动一个 asyncio task(协程并行)
  3. 每个 task 运行状态机,在 GENERATING 里 await 推理,在 PROCESSING_TOOLS 里 await 工具
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Step 1: Manager 分发到 Workers
chunkes = prompts.chunk(len(self.agent_loop_workers))
outputs = ray.get([worker.generate_sequences.remote(chunk) for worker, chunk in zip(...)])

# Step 2: Worker 内部为每个 sample 创建 async task
tasks = []
for i in range(len(batch)):
tasks.append(asyncio.create_task(self._run_agent_loop(...)))
outputs = await asyncio.gather(*tasks) # 并发执行

# Step 3: 每个 task 运行状态机
state = AgentState.PENDING
while state != AgentState.TERMINATED:
...

# Step 4: 状态机调用 LLM Server(sticky)
output = await self.server_manager.generate(
request_id=agent_data.request_id,
prompt_ids=agent_data.prompt_ids,
sampling_params=sampling_params,
)

3.1 两个同步点在哪里(你调吞吐必须先知道这个)

  1. batch 级同步点ray.get([...]) 之后 concat。
    • 这会让“最慢 worker”决定这个 batch 的 wall time。
  2. worker 内同步点await asyncio.gather(*tasks) 之后 postprocess。
    • 这会让“最慢 trajectory”决定这个 worker 的完成时间。

你会发现:async 并不是消灭了同步,而是把同步点挪到了“更合理的位置”,让 GPU 可以在同步点之间持续工作。

3.2 为什么 async 能吃满 GPU:continuous batching + 见缝插针

agent_loop_details.ipynb 讲得很清楚:

  • 客户端:asyncio 让 CPU 端“非阻塞发射请求”
  • 服务端:vLLM/SGLang continuous batching 让 GPU 端“见缝插针执行请求”

如果你只做 async,不用 continuous batching,收益会明显缩水;
如果你只做 continuous batching,但 batch 同步推进多轮(sync),收益也会被 straggler 拖垮。

它们是组合拳。


4. Tool 调用的并发:asyncio.gather 很香,但要有节制

agent_loop_details.ipynb 提到“工具调用可以并行”,并给了一个骨架:

1
2
3
4
tasks = []
for tool_call in agent_data.tool_calls[:self.max_parallel_calls]:
tasks.append(self._call_tool(tool_call, agent_data.tools_kwargs))
responses = await asyncio.gather(*tasks)

这里我想补三点工程经验(视频里不一定强调,但你写系统时一定会遇到):

  1. 并发上限必须有max_parallel_calls 不是可选参数。没有上限,你会把外部 API 打爆,或者把本机 fd/线程池打爆。
  2. 超时与 fallback 必须统一:建议在 _call_tool 内统一做 timeout + exception mapping,把错误也变成 observation(mask=0)的一部分,让状态机继续推进或可控终止。
  3. 把 tool 的“结构化输出”与“写进 prompt 的字符串”分离:结构化输出用于 debug/metrics,字符串用于继续对话;不要把 JSON/debug 信息直接塞进 prompt(污染 token 分布)。

5. 计算细节 1:reward attach 不是小事,它决定了“你在优化什么”

很多同学会把 reward 当作一个标量塞进 batch,但在 token-level RL(LLM-RL)里,reward 最终会变成 token 级别的 advantage 信号。
你怎么 attach,会决定 credit assignment 的形态。

agent_loop_details.ipynb 给了一个很典型的做法(retool 相关):

  • reward 先由 reward_manager 计算出来
  • _postprocess 把 reward 写到最后一个有效 token 的位置

它甚至给了一个非常具象的赋值逻辑(概念上):

rm_scores[torch.arange(response_mask.size(0)), response_length] = scores

这类做法对应一种“稀疏回报”建模:

  • 只有最后一个位置 reward 非零,前面全是 0
  • 优势函数(GAE)会把这个信号向前传播,但传播形态依赖你的 value/critic 质量

5.1 tool reward shaping:鼓励工具调用到底在鼓励什么

agent_loop_details.ipynb 里还给了一个很“工程化”的 shaping:用 num_turns 给负分样本加一点补偿(鼓励尝试工具):

1
2
tool_call_reward = (num_turns - 2) / 2 * 0.1
result["score"] = min(-0.6, result["score"] + tool_call_reward)

我对这种 shaping 的态度是:可以用,但要把它当成“暂时的训练脚手架”,并且你必须监控两个副作用:

  1. agent 可能学会“无意义地拉长对话/多调工具”来骗分
  2. num_turns 与真实任务质量的相关性可能很弱(甚至负相关)

如果你的目标是 deep research agent,我更建议你把 shaping 做在:

  • tool 调用的有效性(是否提高证据覆盖、是否减少幻觉)
  • 任务闭环指标(例如检索到的引用是否能支撑最终结论)

而不是 “turn 数越多越好”。


6. 计算细节 2:为什么 rollout 阶段通常不算 logprob

AgentLoopOutput 里有 response_logprobs,但很多框架(含 verl)会选择:

  • rollout 阶段只保 token ids + mask
  • training 阶段再用 actor/ref worker 重算 logprob(并在 update 前缓存 old_logprob)

原因很现实:

  • rollout 阶段的瓶颈是 GPU 推理与 tool latency,把 logprob 计算塞进去会让路径更复杂、吞吐更差。
  • PPO 需要的 old_logprob 必须对应“update 前”的策略;最干净的做法就是在 fit 的开头统一重算并冻结。

这也是为什么在你看 ray_trainer.py 时,会看到类似流程(上一期已经提到过):

1
old_log_prob = self.actor_rollout_wg.compute_log_prob(batch)

当你把这个设计想清楚,你就会更敢于把 AgentLoop 写得“只关注轨迹”,把训练逻辑留给 Trainer。


7. Debug Checklist:一眼定位是状态机错、mask 错,还是 reward 口径错

我建议你最少打这些日志(没有这些,你就是在盲调):

  1. 每条 trajectory:request_id / num_turns / response_len / mask_ones_ratio
  2. 每轮 GENERATING:生成 token 数、解析到的 tool_calls 数、是否终止
  3. 每轮 PROCESSING_TOOLS:tool 名称、耗时、成功/失败、返回 token 数
  4. interaction:should_terminate、reward、反馈字符串长度
  5. reward attach:reward 写入的位置 index(最后 token 还是 turn token?)

如果你只能加一个指标:加 mask_ones_ratio
它能极快暴露“tool token 混入 action token”的灾难性 bug。


8. 小结:把 AgentLoop 写对,agentic RL 才有可能写对

  1. AgentLoop 的价值不在于“能跑 multi-turn”,而在于它输出的轨迹能被严谨地用于 token-level RL
  2. response_mask 是契约,不是优化项:你只要把 observation token 误当 action token,训练目标就会被污染。
  3. async rollout 是系统设计:三层并发(Ray + asyncio + continuous batching),两处同步点(ray.get + gather)。
  4. tool 与 interaction 建议分状态处理:并发/超时/重试/终止规则会更清晰。

下一篇如果你继续给链接,我会优先写“排障与性能调优专题”:包括 tool 长尾、ray.get straggler、sticky session 缓存命中、以及 reward 前置与 training-phase 口径一致性怎么做。