流式输出
我们用大模型的时候,用户侧看到的输出内容其实是一个词一个词蹦出来的,而不是等大模型全部生成完毕再一起返回的.这就是流式输出.
而如果我们回顾大模型推理,我们会发现大模型自回归生成 token 的方式正好与流式输出相契合.最简单地来说,我们只需要每一次把大模型自回归生成出来的那个 token 返回给前端进行渲染即可,这就是流式输出的雏形.
实现起来不算困难.假设 C/C++ 侧有函数 infer(...) 可以输出一个 token,那么在 Python 侧我们可以用迭代器的写法实现 stream() 进行逐 token 流式输出:调用 C/C++ 侧的 infer() 函数,获取 token 后将结果 yield 出去.这样,在主循环里,我们只需要用 for 循环处理 stream() 即可做到简单的流式输出.
输出到终端的话,可以用 flush;如果是做 API 的话,那就通过 POST 返回生成的 token.
1 | """ in main loop """ |
从 Next Token Generation 到 Chat Server
核心数据结构(Pydantic 模型)
文件里定义了几个关键的请求/响应结构:
ChatMessage:聊天消息(角色:system/user/assistant + 内容)。ChatCompletionRequest:客户端发送的请求.包含:model(默认 qwen2)messages(对话历史)- 生成参数:
max_tokens/temperature/top_p/top_k stream(是否使用 SSE 流式输出)
ChatCompletionResponse:OpenAI 兼容的返回结构,包括 choices,usage(token 统计)等。
运行时封装:ChatRuntime
1 |
|
把 tokenizer + model 封装起来。lock 保证同一时间只有一个请求访问模型(避免模型线程不安全或资源冲突)。
prompt 构造:_render_prompt
会把消息列表转换成模型能理解的文本形式:如果 tokenizer 有 apply_chat_template(部分模型 tokenizer 支持),会调用该模板。其中最后会自动加一个 <assistant>\n,让模型开始生成回复。
生成逻辑(stream / non-stream 两种模式)
非流式(一次生成完毕返回)
- 先把 prompt 编码成 token:
prompt_tokens = tokenizer.encode(prompt) - 调用:
runtime.model.generate(...) - 将模型返回 token 通过
_decode_new_text(...)解码为字符串并返回。 - 返回结构符合 OpenAI Chat Completion 规范(包含
usage、choices等)。
流式(SSE:Server-Sent Events)
当 req.stream=True 时,API 通过 StreamingResponse 实现逐步返回:
- 先发送一个 chunk 指出 assistant 要开始输出。
- 然后在循环里不断从
runtime.model.generate_stream(...)获取新 token。 - 每次生成新 token,解码新增的文本差异并通过 SSE 发送出来(
data: {...}\n\n)。 - 最后发送一个
stopchunk +data: [DONE]结束流。
并发保护
无论是流式还是一次性生成,都有:
1 | with runtime.lock: |
确保同一时间只会有一个请求访问底层模型(通常是因为模型推理不是线程安全的)。
环境变量构建 runtime:build_runtime_from_env()
在启动时通常会用这个函数来构建 ChatRuntime:
- 读取环境变量:
LLAISYS_MODEL_PATH(默认 data)、LLAISYS_DEVICE(默认cpu) - 根据
device选择DeviceType.CPU/DeviceType.NVIDIA - 用
AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)加载 tokenizer - 用
Qwen2(model_path, device)加载模型
提供的 HTTP 接口
GET /health:简单健康检查(返回{"status": "ok"})POST /v1/chat/completions:Chat Completion 接口(兼容 OpenAI 格式)
create_app()
- 输入和验证
- Endpoint:
POST /v1/chat/completions,请求模型由ChatCompletionRequest(pydantic)定义(model、messages、max_tokens、temperature、top_p、top_k、stream)。 - 简单验证:若
messages为空则返回 400(HTTPException)。
- Endpoint:
- Prompt 构建与编码
_render_prompt(messages, tokenizer):先把消息映射为 role/content 列表;如果 tokenizer 有apply_chat_template方法,就调用它并让 tokenizer 负责插入生成提示(add_generation_prompt=True);否则回退为简单格式:
"<role>\n<content>\n"每条消息并在末尾加入"<assistant>\n"。prompt_tokens = runtime.tokenizer.encode(prompt):把 prompt 编成 token id 列表(注意此处prompt_tokens是 listint,后文用其长度判断 prompt 长度)。
- 非流式(一次性)生成路径
- 在
runtime.lock内调用runtime.model.generate(prompt_tokens, max_new_tokens=..., top_k=..., top_p=..., temperature=...)。 - 假设
generate返回的是包含 prompt 的完整 token 列表(代码通过len(generated_tokens) - len(prompt_tokens)计算生成长度)。 - 用
_decode_new_text(tokenizer, all_tokens, prompt_len)(即decode(all_tokensprompt_len:))得到新生成的文本answer_text。 - 构建并返回
ChatCompletionResponse(含id、created、model、choices、usage),FastAPI 会用 pydantic response_model 做序列化/校验。
- 在
- 流式(SSE)生成路径
- 若请求
req.stream为True,则返回StreamingResponse(event_stream(), media_type="text/event-stream")。 event_stream()是个 generator:整个生成过程包裹在with runtime.lock:(即流期间也锁住模型)。- 初始先发送一条 chunk 表示 assistant 角色(
delta包含"role": "assistant")。 - 然后迭代
runtime.model.generate_stream(prompt_tokens, ...):每收到一个 token id,就把它append到 generated,用runtime.tokenizer.decode(generated, skip_special_tokens=True)得到当前全部已生成文本current_text,通过字符串差分(current_text[len(last_text):])得到本次增量delta_text,再以 SSE(data: JSON)形式yield增量 chunk(delta.content)。 - 结束后发送一个 finish chunk(
finish_reason: "stop")并发送data: [DONE]结尾(兼容常见 SSE 消费端习惯)。
- 若请求
- 线程锁与并发考虑
ChatRuntime.lock(threading.Lock)在所有模型调用路径中被持有,保证模型/生成器调用的线程安全- 后果:持锁期间其它请求会被阻塞,尤其是流式生成会长时间占用锁。若需要并发吞吐,应考虑多实例模型池、外部队列或异步推理后端。
- SSE 事件格式与兼容性
- 每个 SSE 事件数据是 JSON(通过
_sse(event)生成,ensure_ascii=False保持 Unicode),遵循类似 OpenAI chunk 格式:object: “chat.completion.chunk”,choices 列表包含delta(可能是 role 或 content)和finish_reason。 - 非流式返回使用
ChatCompletionResponse,字段usage使用 prompt/生成 token 数量统计(注意统计方法基于 token id 列表长度)。
实现上的注意点与潜在问题
- 每个 SSE 事件数据是 JSON(通过
- token 长度的计算依赖
tokenizer.encode与generate/generate_stream的返回格式一致(即 generate 返回包含 prompt 的 token 列表);否则计算completion_tokens/切片会出错,需确认模型 API 语义。 - 流式增量是用字符层面的差分(字符串切片)计算 delta,这在某些 tokenizer(或含特殊 tokens)下可能导致边界/重叠问题;更稳健的做法是基于 token 层面发送增量并由客户端 decode,或用 tokenizer 提供的逐 token decode API。
- 错误处理较简单:如果模型在生成过程中抛异常,会变成 500;流式场景下需要确保生成异常时能优雅地在 SSE 中通知客户端(当前未做专门报错事件)。
- 性能和资源:在高并发场景下持锁会成为瓶颈;同时大型模型需要考虑内存/显存管理(
build_runtime_from_env()里根据env加载模型与 tokenizer)。