[Feature]Add output fallback support for OpenAI serving#7942
Conversation
|
Thanks for your contribution! |
There was a problem hiding this comment.
Pull request overview
本 PR 为 OpenAI 兼容服务新增 output fallback 兜底处理框架,在 streaming / non-streaming 路径上对模型输出做后处理(修复 Markdown 加粗冒号、Markdown 表格、检测重复输出截断),并通过策略注册 + 插件机制支持自定义扩展。
Changes:
- 新增
fastdeploy/output/fallback/子包:定义OutputFallbackStrategy基类、OutputFallbackContext、StreamFallbackDecision、OutputFallbackManager,并内置markdown-bold-colon/markdown-table/repeat-truncate三个策略。 - 在
EngineArgs/ api_server 接入--output-fallback、--output-fallback-plugin、--output-fallback-config三个启动参数,并将 manager 注入到 v0 / v1 chat 和 completion 的 serving 类。 - 在 streaming / non-streaming 处理流程中调用 manager 的
apply/on_delta/on_finish/cleanup;命中 repeat-truncate 时将finish_reason设为repeat_truncate并 abort 对应 choice。
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| fastdeploy/output/fallback/init.py | 暴露公共类并导入三个内置策略以触发注册 |
| fastdeploy/output/fallback/base.py | 定义 fallback context / decision / 抽象基类 |
| fastdeploy/output/fallback/manager.py | 注册表 / 插件加载 / apply / on_delta / on_finish / cleanup |
| fastdeploy/output/fallback/markdown_bold_colon.py | 修正 **xxx:** 冒号位置,支持跨 delta 缓存 |
| fastdeploy/output/fallback/markdown_table.py | 修复 Markdown 表格分隔行 / 列数不一致 |
| fastdeploy/output/fallback/repeat_truncate.py | 基于 token window 检测重复输出并触发 truncate |
| fastdeploy/engine/args_utils.py | 增加 3 个新 CLI 参数 |
| fastdeploy/entrypoints/openai/api_server.py | 解析参数构建 manager 并注入各 handler,/config-info 暴露相应字段 |
| fastdeploy/entrypoints/openai/serving_chat.py | v0 chat 流/非流路径接入 fallback,含 repeat_truncate finish_reason |
| fastdeploy/entrypoints/openai/serving_completion.py | v0 completion 流/非流路径接入 fallback |
| fastdeploy/entrypoints/openai/v1/serving_base.py | 基类构造接收 manager 并在 finally 清理状态 |
| fastdeploy/entrypoints/openai/v1/serving_chat.py | v1 chat 接入 fallback(非多模态路径) |
| fastdeploy/entrypoints/openai/v1/serving_completion.py | v1 completion 接入 fallback |
| tests/output/test_fallback.py | 覆盖 manager、内置策略、流式 hold/flush/truncate、cleanup、插件导入 |
| choice_completion_tokens = response_ctx.choice_completion_tokens_dict[output.index] | ||
| choice.finish_reason = self._calc_finish_reason(request_output, max_tokens, choice_completion_tokens) | ||
| if fallback_truncated: | ||
| choice.finish_reason = "repeat_truncate" |
| if res.get("error_msg") is not None and "Aborted" in res["error_msg"]: | ||
| choices[-1].finish_reason = "abort" | ||
| if fallback_truncated: | ||
| choices[-1].finish_reason = "repeat_truncate" |
| choice.finish_reason = "abort" | ||
|
|
||
| if fallback_truncated: | ||
| choice.finish_reason = "repeat_truncate" |
| if fallback_truncated: | ||
| choice.finish_reason = "repeat_truncate" |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## develop #7942 +/- ##
==========================================
Coverage ? 68.09%
==========================================
Files ? 472
Lines ? 66200
Branches ? 10217
==========================================
Hits ? 45081
Misses ? 18262
Partials ? 2857
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
CI报告基于以下代码生成(30分钟更新一次): 1 Required任务 : 7/10 通过
2 失败详情无
|
| self.data_processor.process_response_dict( | ||
| response_dict=request_output, | ||
| stream=stream, | ||
| include_stop_str_in_output=include_stop_str_in_output, | ||
| request=request, |
| - ``hold`` : buffer this delta inside the strategy's ``state``; downstream | ||
| strategies still run, but the manager emits nothing this round. |
PaddlePaddle-bot
left a comment
There was a problem hiding this comment.
🤖 Paddle-CI-Agent | pr_review |
2026-06-04 17:49:35
📋 Review 摘要
PR 概述:新增 output fallback 框架,为 OpenAI serving 层提供统一的模型输出兜底处理机制,支持自定义策略注册、流式/非流式场景处理及插件加载。
变更范围:fastdeploy/output/fallback/(新模块)、fastdeploy/input/base_processor.py、fastdeploy/entrypoints/openai/(serving_chat/completion/v1/*)、fastdeploy/engine/args_utils.py、fastdeploy/plugins/output_fallback/
影响面 Tag:[Feature] [APIServer] [DataProcessor] [Engine]
问题
| 级别 | 文件 | 概述 |
|---|---|---|
| 🟡 建议 | fastdeploy/output/fallback/manager.py:124 |
hold 时提前 return,当前 round 所有 trial_state 变更未写回 self.states,有状态策略状态机无法正确推进 |
| 🟡 建议 | fastdeploy/output/fallback/manager.py:151 |
on_finish 中 buffer 非空时错误调用了 strategy.on_delta 而非 strategy.on_finish,导致自定义策略的 flush 钩子在流结束时被完全跳过 |
历史 Findings 修复情况
| Finding | 问题 | 状态 |
|---|---|---|
| F1 | output_fallback 类型注解缺少 Optional |
✅ 已修复 |
| F2 | v1 streaming 路径缺少 fallback_truncated_choices 保护集 |
✅ 已修复(truncated_choices 已加入 ServeContext) |
| F3 | v1 completion streaming 路径同样缺少保护集 | ✅ 已修复 |
| F4 | repeat_truncate 不是 OpenAI 标准 finish_reason |
✅ 已修复(改为 "length") |
| F5 | truncate 与 hold 同时触发时截断文本被静默丢弃 |
🔄 部分修复(if not truncated 判断已加,但 hold 时 state 不回写问题仍存在,见 N1) |
| F6 | _calc_finish_reason 返回类型注解包含 "repeat_truncate" |
|
| F7 | asdict(output) 热路径深拷贝性能问题 |
|
| F8 | on_finish 返回 action="truncate" 时调用方只检查 .text 不检查 .action |
base_processor.py 中 finish_decision.action 仍未判断) |
| F9 | PR 描述声明支持 drop 但代码无此 action |
✅ 已修复(描述已同步为 send/hold/flush/truncate) |
| F10 | request.n > 1 时 fallback_decode_status key 冲突 |
✅ 已修复(使用完整 req_id(含 ::n::idx 后缀)作为 key,各 choice 相互隔离) |
| F11 | 多策略 on_finish 对 flush 文本做覆盖而非链式传递 |
✅ 已修复(改为 pending += 拼接) |
| F12 | serving 层 cleanup 使用 base request_id 但 manager key 为 choice_id |
✅ 已修复(cleanup 移至 base_processor.py,serving 层不再直接持有 manager) |
| F13 | trial_state = dict(original_state) 浅拷贝导致可变对象泄漏 |
✅ 已修复(改为 copy.deepcopy) |
| F14 | context.delta_text 在 on_finish 调用时保存的是原始 delta,而非经 on_delta 处理后的结果 |
|
| F15 | on_delta 中策略实际接收的是累积缓冲内容,而非参数名 delta_text 暗示的当前增量 |
current_text = buffer 仍传给策略) |
| F16 | self.output_fallback_manager 在 serving 层存储但从未消费,疑为死代码 |
✅ 已修复(manager 现在注入 engine_client.data_processor,serving 层不再持有) |
📝 PR 规范检查
PR 标题 [Feature]Add output fallback support for OpenAI serving 标签后缺少空格,标题格式不符合规范。
标题建议(可直接复制):
[Feature] Add output fallback support for OpenAI serving
PR 描述结构完整,包含 Motivation、Modifications、Usage or Command、Accuracy Tests 和 Checklist 全部必填章节,内容充实,checklist 勾选状态符合实际变更。无需修改描述。
总体评价
本轮迭代解决了 F1–F4、F9–F13、F16 共 10 个历史问题,整体质量明显提升。当前仍需关注两个新发现的 manager.py 逻辑问题(N1/N2):hold 时 trial_state 未写回导致有状态策略状态机失效,以及 on_finish 中 buffer 非空分支未调用 strategy.on_finish 导致自定义 flush 逻辑无法执行。F6/F7/F8/F14/F15 为遗留问题,建议后续迭代修复。
| "Failed to apply streaming output fallback strategy '%s'.", strategy.name | ||
| ) | ||
| trial_states[strategy.name] = trial_state | ||
| continue |
There was a problem hiding this comment.
🟡 建议 hold 时提前 return,当前 round 所有 trial_state 变更未写回 self.states
当某策略返回 action="hold" 时,代码直接 return,跳过了末尾的状态写回循环。结果:
- 发出 hold 的策略在
trial_state里记录的跨轮次状态(如已累积字节数)全部丢失; - 链中排在该策略之前、本轮已运行过的其他策略,其 trial_state 变更同样丢失。
对于需要通过 state dict 追踪何时可放行的有状态策略,这会导致状态机永远无法正确推进。
建议修复:在 return 前先执行已积累的 trial_states 写回:
if decision.action == "hold":
# 先写回已执行策略的状态,再决定是否 return
for name, ts in trial_states.items():
s = self._get_state(request_id, name)
s.clear()
s.update(ts)
if not truncated:
return StreamFallbackDecision(action="hold", text="")
continue| return StreamFallbackDecision(action="send", text=current_text) | ||
|
|
||
| def on_finish(self, request_id: str, context: OutputFallbackContext) -> StreamFallbackDecision: | ||
| buffer = self._buffers.pop(request_id, "") |
There was a problem hiding this comment.
🟡 建议 on_finish 中 buffer 非空时错误调用了 strategy.on_delta 而非 strategy.on_finish,导致自定义策略的 flush 钩子在流结束时被完全跳过。
当最后一个 delta 处于 hold 状态、buffer 非空时,此分支对每个策略调用 on_delta,策略在 on_finish 里实现的 flush 逻辑(如将缓存内容在流结束时统一输出)永远不会被执行。
另:此处 state = self._get_state(...) 直接修改持久 state,与 on_delta 正常路径(先 deepcopy 再写回)不一致,遇到异常会导致 state 半更新。
建议修复:在处理完 buffer 的 on_delta 之后,额外调用每个策略的 on_finish:
if buffer:
# ... 现有 on_delta 处理逻辑(建议同样改用 trial_state + 写回)...
# 再触发每个策略的 on_finish
for strategy in self.instances:
state = self._get_state(request_id, strategy.name)
try:
fdec = strategy.on_finish(context, state)
except Exception:
continue
if fdec.text:
current_text += fdec.text
return StreamFallbackDecision(action="flush", text=current_text)
Motivation
当前推理链路缺少统一的 output fallback 扩展机制,业务侧如果希望对模型输出进行兜底处理,只能在各个下游环节分别适配,难以统一管理。
本 PR 引入 output fallback framework,并将 output fallback 的实际处理前移到 data processor 中,在 reasoning/tool parsing 之前对原始 decoded stream 做统一处理。这样可以保证内容文本、reasoning 内容以及 tool call 相关文本都能共享同一套 fallback 逻辑,同时也为后续扩展自定义 fallback strategy 提供统一入口。
Modifications
本 PR 主要包含以下改动:
新增 output fallback framework
fastdeploy/output/fallback/模块OutputFallbackStrategy抽象基类OutputFallbackContextStreamFallbackDecisionOutputFallbackManager新增 output fallback 插件加载机制
fastdeploy.plugins.output_fallbackfastdeploy.output_fallback_plugins自动加载插件--output-fallback-plugin指定外部插件路径动态导入新增 output fallback 相关启动参数
--output-fallback--output-fallback-plugin--output-fallback-config将 output fallback 的应用前移到 data processor
fastdeploy/input/base_processor.py中新增output_fallback_managerprocess_response_dict_normal()中,对完整输出文本应用 fallbackprocess_response_dict_streaming()中,对 streaming 增量文本应用 fallback支持 streaming 场景下的 fallback 控制语义
send:发送当前 deltahold:暂存当前 delta,本轮不输出flush:在流结束时输出缓存内容truncate:发送当前文本并提前终止后续生成新增 processor 侧 fallback 状态管理
fallback_decode_status调整 OpenAI serving 层职责
on_delta()/on_finish()/apply()fallback_truncated和skipped字段感知 fallback 结果finish_reason = "length"并主动 abort 对应 choice扩展 request / output 数据结构
CompletionOutput中新增:fallback_truncatedskipped补充测试
tests/output/test_fallback.pyUsage or Command
启用指定 fallback strategy:
加载自定义 fallback 插件:
为策略传入配置:
--output-fallback your-strategy-name \ --output-fallback-plugin /path/to/custom_fallback.py \ --output-fallback-config '{"your-strategy-name": {"key": "value"}}'如何增加自定义兜底协议
可以通过继承 OutputFallbackStrategy 并使用 OutputFallbackManager.register(...) 注册自定义策略。
示例:
自定义策略说明:
加载方式有两种:
Accuracy Tests
Checklist
[FDConfig],[APIServer],[Engine],[Scheduler],[PD Disaggregation],[Executor],[Graph Optimization],[Speculative Decoding],[RL],[Models],[Quantization],[Loader],[OP],[KVCache],[DataProcessor],[BugFix],[Docs],[CI],[Optimization],[Feature],[Benchmark],[Others],[XPU],[HPU],[GCU],[DCU],[Iluvatar],[Metax]]pre-commitbefore commit.releasebranch, make sure the PR has been submitted to thedevelopbranch, then cherry-pick it to thereleasebranch with the[Cherry-Pick]PR tag.