流式输出

我们用大模型的时候,用户侧看到的输出内容其实是一个词一个词蹦出来的,而不是等大模型全部生成完毕再一起返回的.这就是流式输出.

而如果我们回顾大模型推理,我们会发现大模型自回归生成 token 的方式正好与流式输出相契合.最简单地来说,我们只需要每一次把大模型自回归生成出来的那个 token 返回给前端进行渲染即可,这就是流式输出的雏形.

实现起来不算困难.假设 C/C++ 侧有函数 infer(...) 可以输出一个 token,那么在 Python 侧我们可以用迭代器的写法实现 stream() 进行逐 token 流式输出:调用 C/C++ 侧的 infer() 函数,获取 token 后将结果 yield 出去.这样,在主循环里,我们只需要用 for 循环处理 stream() 即可做到简单的流式输出.

输出到终端的话,可以用 flush;如果是做 API 的话,那就通过 POST 返回生成的 token.

1
2
3
4
5
6
7
8
9
10
11
""" in main loop """
for token in model.stream():
# decode with tokenizer ...

""" in model.py """
def stream(self):
# ...
token = # Call C++ model inference ...
# ...

yield token

从 Next Token Generation 到 Chat Server

核心数据结构(Pydantic 模型)

文件里定义了几个关键的请求/响应结构:

  • ChatMessage:聊天消息(角色:system/user/assistant + 内容)。
  • ChatCompletionRequest:客户端发送的请求.包含:
    • model(默认 qwen2)
    • messages(对话历史)
    • 生成参数:max_tokens/temperature/top_p/top_k
    • stream(是否使用 SSE 流式输出)
  • ChatCompletionResponse:OpenAI 兼容的返回结构,包括 choices,usage(token 统计)等。

运行时封装:ChatRuntime

1
2
3
4
5
@dataclass
class ChatRuntime:
tokenizer: AutoTokenizer
model: Qwen2
lock: threading.Lock = field(default_factory=threading.Lock)

tokenizer + model 封装起来。lock 保证同一时间只有一个请求访问模型(避免模型线程不安全或资源冲突)。

prompt 构造:_render_prompt

会把消息列表转换成模型能理解的文本形式:如果 tokenizer 有 apply_chat_template(部分模型 tokenizer 支持),会调用该模板。其中最后会自动加一个 <assistant>\n,让模型开始生成回复。

生成逻辑(stream / non-stream 两种模式)

非流式(一次生成完毕返回)

  • 先把 prompt 编码成 token:prompt_tokens = tokenizer.encode(prompt)
  • 调用:runtime.model.generate(...)
  • 将模型返回 token 通过 _decode_new_text(...) 解码为字符串并返回。
  • 返回结构符合 OpenAI Chat Completion 规范(包含 usagechoices 等)。

流式(SSE:Server-Sent Events)

req.stream=True 时,API 通过 StreamingResponse 实现逐步返回:

  • 先发送一个 chunk 指出 assistant 要开始输出。
  • 然后在循环里不断从 runtime.model.generate_stream(...) 获取新 token。
  • 每次生成新 token,解码新增的文本差异并通过 SSE 发送出来(data: {...}\n\n)。
  • 最后发送一个 stop chunk + data: [DONE] 结束流。

并发保护

无论是流式还是一次性生成,都有:

1
2
with runtime.lock:
...

确保同一时间只会有一个请求访问底层模型(通常是因为模型推理不是线程安全的)。

环境变量构建 runtime:build_runtime_from_env()

在启动时通常会用这个函数来构建 ChatRuntime

  • 读取环境变量:LLAISYS_MODEL_PATH(默认 data)、LLAISYS_DEVICE(默认 cpu
  • 根据 device 选择 DeviceType.CPU / DeviceType.NVIDIA
  • AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) 加载 tokenizer
  • Qwen2(model_path, device) 加载模型

提供的 HTTP 接口

  • GET /health:简单健康检查(返回 {"status": "ok"}
  • POST /v1/chat/completions:Chat Completion 接口(兼容 OpenAI 格式)

create_app()

  • 输入和验证
    • Endpoint: POST /v1/chat/completions,请求模型由 ChatCompletionRequest(pydantic)定义(model、messages、max_tokens、temperature、top_p、top_k、stream)。
    • 简单验证:若 messages 为空则返回 400(HTTPException)。
  • Prompt 构建与编码
    • _render_prompt(messages, tokenizer):先把消息映射为 role/content 列表;如果 tokenizer 有 apply_chat_template 方法,就调用它并让 tokenizer 负责插入生成提示(add_generation_prompt=True);否则回退为简单格式:
      "<role>\n<content>\n" 每条消息并在末尾加入 "<assistant>\n"
    • prompt_tokens = runtime.tokenizer.encode(prompt):把 prompt 编成 token id 列表(注意此处 prompt_tokens 是 listint,后文用其长度判断 prompt 长度)。
  • 非流式(一次性)生成路径
    • runtime.lock 内调用 runtime.model.generate(prompt_tokens, max_new_tokens=..., top_k=..., top_p=..., temperature=...)
    • 假设 generate 返回的是包含 prompt 的完整 token 列表(代码通过 len(generated_tokens) - len(prompt_tokens) 计算生成长度)。
    • _decode_new_text(tokenizer, all_tokens, prompt_len)(即 decode(all_tokensprompt_len:))得到新生成的文本 answer_text
    • 构建并返回 ChatCompletionResponse(含 id、created、model、choices、usage),FastAPI 会用 pydantic response_model 做序列化/校验。
  • 流式(SSE)生成路径
    • 若请求 req.streamTrue,则返回 StreamingResponse(event_stream(), media_type="text/event-stream")
    • event_stream() 是个 generator:整个生成过程包裹在 with runtime.lock:(即流期间也锁住模型)。
    • 初始先发送一条 chunk 表示 assistant 角色(delta 包含 "role": "assistant")。
    • 然后迭代 runtime.model.generate_stream(prompt_tokens, ...):每收到一个 token id,就把它 append 到 generated,用 runtime.tokenizer.decode(generated, skip_special_tokens=True) 得到当前全部已生成文本 current_text,通过字符串差分(current_text[len(last_text):])得到本次增量 delta_text,再以 SSE(data: JSON)形式 yield 增量 chunk(delta.content)。
    • 结束后发送一个 finish chunk(finish_reason: "stop")并发送 data: [DONE] 结尾(兼容常见 SSE 消费端习惯)。
  • 线程锁与并发考虑
    • ChatRuntime.lockthreading.Lock)在所有模型调用路径中被持有,保证模型/生成器调用的线程安全
    • 后果:持锁期间其它请求会被阻塞,尤其是流式生成会长时间占用锁。若需要并发吞吐,应考虑多实例模型池、外部队列或异步推理后端
  • SSE 事件格式与兼容性
    • 每个 SSE 事件数据是 JSON(通过 _sse(event) 生成,ensure_ascii=False 保持 Unicode),遵循类似 OpenAI chunk 格式:object: “chat.completion.chunk”,choices 列表包含 delta(可能是 role 或 content)和 finish_reason
    • 非流式返回使用 ChatCompletionResponse,字段 usage 使用 prompt/生成 token 数量统计(注意统计方法基于 token id 列表长度)。
      实现上的注意点与潜在问题
  • token 长度的计算依赖 tokenizer.encodegenerate/generate_stream 的返回格式一致(即 generate 返回包含 prompt 的 token 列表);否则计算 completion_tokens/切片会出错,需确认模型 API 语义。
  • 流式增量是用字符层面的差分(字符串切片)计算 delta,这在某些 tokenizer(或含特殊 tokens)下可能导致边界/重叠问题;更稳健的做法是基于 token 层面发送增量并由客户端 decode,或用 tokenizer 提供的逐 token decode API。
  • 错误处理较简单:如果模型在生成过程中抛异常,会变成 500;流式场景下需要确保生成异常时能优雅地在 SSE 中通知客户端(当前未做专门报错事件)。
  • 性能和资源:在高并发场景下持锁会成为瓶颈;同时大型模型需要考虑内存/显存管理(build_runtime_from_env() 里根据 env 加载模型与 tokenizer)。