流式传输

最近在折腾 LLM(大语言模型)的本地部署,发现很多推理框架都支持”流式输出”——也就是模型一边生成文字,前端一边展示,而不是等全部生成完再一次性吐出来。这种体验对用户来说更自然、响应更快,背后依赖的就是”流式传输”技术。于是顺手整理一下相关知识,顺便记录下自己踩过的坑。

什么是流式传输?

流式传输(Streaming)是指数据在生成的同时就被发送和处理,而不是等到全部生成完毕才一次性传输。这种方式特别适合以下场景:

  • 大文件上传/下载(如视频、日志)
  • 实时通信(如聊天、直播、IoT 数据)
  • AI 模型推理输出(如 LLM 逐字生成回答)
  • 日志监控或事件推送

与传统的”请求-响应”模式(Request-Response)不同,流式传输通常采用长连接分块传输(Chunked Transfer),让客户端能”边收边用”。

常见的流式协议/技术

技术 特点 适用场景
HTTP Chunked Transfer HTTP/1.1 原生支持,服务端分块返回数据 简单文本流、API 流式响应
Server-Sent Events (SSE) 基于 HTTP 的单向流(服务器 → 客户端),自动重连 实时通知、日志推送
WebSocket 双向全双工通信 聊天、游戏、协作编辑
gRPC Stream 基于 HTTP/2 的高性能流 微服务间通信、AI 推理管道

对于大多数 Web 项目,SSE 是最简单且兼容性最好的选择(尤其配合 JavaScript 的 EventSource)。

快速上手:用 Python + FastAPI 实现 SSE 流

假设你想做一个”逐字输出”的 AI 回答接口,可以这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def fake_llm_stream():
response = "你好!这是一个流式传输的示例。"
for char in response:
yield char
await asyncio.sleep(0.1) # 模拟生成延迟

@app.get("/stream")
async def stream_response():
return StreamingResponse(fake_llm_stream(), media_type="text/plain")

前端用 fetchEventSource 即可逐字接收:

1
2
3
4
5
6
7
8
9
10
// 使用 fetch + ReadableStream(推荐)
const response = await fetch('/stream');
const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(decoder.decode(value)); // 逐字打印
}

注意:若用 EventSource,需按 SSE 格式返回(如 data: ...\n\n),否则会解析失败。

为什么流式体验更好?

  • 降低感知延迟:用户不用干等,看到第一个字就知道”有反应了”
  • 节省内存:服务端无需缓存完整结果,尤其适合长文本生成
  • 容错性更强:即使中途断开,已接收的部分依然可用

流式传输的更多应用场景

除了 Web API 和 AI 推理,流式传输在多个领域都有广泛应用:

1. 数据库查询流

大型查询结果可被流式读取,避免一次性加载进内存。例如:

  • PostgreSQL 的 COPY ... TO STDOUT 支持流式导出
  • MongoDB 的 cursor.forEach() 可逐条处理文档
  • Node.js 中可通过 stream 模块消费数据库游标

2. 文件处理与转换

处理超大文件(如日志、CSV、视频)时,可边读边处理:

  • 使用 fs.createReadStream() + Transform 流实现内存友好的 ETL
  • 视频转码工具(如 FFmpeg)常以流方式输入/输出,减少中间存储

3. 消息队列与事件总线

Kafka、RabbitMQ、Redis Streams 等系统本质上都是流式架构:

  • 消费者订阅 topic,持续接收新消息
  • 支持背压(backpressure)控制,防止下游过载
  • 适用于微服务解耦、审计日志、实时分析等

4. CLI 工具与管道(Unix Philosophy)

Unix/Linux 的”管道”(|)就是经典的流式思想:

1
cat huge.log | grep "ERROR" | awk '{print $3}' | sort | uniq -c

每一步只处理当前行,不等待整个文件,高效且组合性强。

5. 边缘计算与 IoT

设备产生的传感器数据通常是连续不断的流:

  • 边缘节点对数据流做初步过滤/聚合
  • 仅将关键事件上传云端,节省带宽
  • 配合 MQTT、CoAP 等轻量协议实现低功耗流传输

小贴士 & 坑点提醒

  1. Nginx / Cloudflare 默认缓冲
    有些反向代理会缓存整个响应才转发,导致流”卡住”。解决方法:

    • Nginx 加 proxy_buffering off;
    • Cloudflare 开启”开发模式”或使用 Cache-Control: no-cache
  2. 前端别用 response.text()
    这会等全部数据收完才解析。务必用 ReadableStream 手动读取。

  3. 超时设置
    长连接容易被网关/浏览器中断,建议:

    • 服务端设置合理的 keep-alive
    • 客户端实现重连逻辑(SSE 自带)
  4. 背压处理(Backpressure)
    当消费者处理速度慢于生产者时,需有机制暂停或丢弃数据,避免 OOM。Node.js 的 stream、Rust 的 tokio::sync::mpsc、Go 的 channel 都内置了背压支持。

结语

流式传输不是什么新技术,但在 AI 时代重新变得重要——毕竟没人想等 30 秒才看到第一个字。
更重要的是,它体现了一种增量处理、资源友好、响应及时的工程哲学,无论是在后端服务、数据管道,还是终端工具链中,都值得我们善加利用。

如果你也在做 Web 应用、AI 接口或需要处理大量数据的项目,不妨试试流式方案。它不一定能提升总耗时,但一定能提升用户体验。下一篇可能会写怎么用 FastAPI + SSE 实现一个简单的流式聊天接口,敬请期待(如果我不懒的话)。

(本文未涉及音视频流如 HLS、DASH、RTMP,那属于媒体传输的专门领域,如有兴趣可另开专题。)