本文主要介绍如何用原生 Node.js http 模块和前端 fetch API 实现一个健壮的、类似 ChatGPT 效果的流式 AI 响应服务
以及本人在实际操作过程中遇到的一些问题和解决方法
# 第一步:搭建基础 - 一个简单的 HTTP 服务
首先,我们需要一个能响应客户端请求的服务器。我们使用 Node.js 内置的 http 模块,因为它无需任何外部依赖,能让我们更专注于底层的流式数据处理server.js - 版本 1
import http from 'http'; | |
const hostname = '127.0.0.1'; | |
const port = 5173; | |
const server = http.createServer(async (req, res) => { | |
// 设置响应头,告诉浏览器我们要发送的是 “服务器发送事件(SSE)” | |
res.writeHead(200, { | |
'Content-Type': 'text/event-stream', | |
'Cache-Control': 'no-cache', | |
'Connection': 'keep-alive', | |
}); | |
await new Promise((resolve, reject) => { | |
let id = 0; | |
const intervalId = setInterval(() => { | |
id++; | |
if (id > 4) { | |
clearInterval(intervalId); | |
resolve(); | |
} | |
// SSE 格式:以 "data:" 开头,以 "\n\n" 结尾 | |
console.log(`Sending message ${id}`); | |
res.write(`data: 这是消息 ${id}\n\n`); | |
}, 1000); | |
}); | |
// 结束响应 | |
res.end(); | |
// 当客户端关闭连接时,停止发送 | |
req.on('close', () => { | |
console.log('客户端已断开连接。'); | |
}); | |
}); | |
server.listen(port, hostname, () => { | |
console.log(`服务器正在运行于 http://${hostname}:${port}/`); | |
}); |
test.js - 版本 1
async function runTest() { | |
// 1 | |
const response = await fetch("http://127.0.0.1:5173"); | |
// const reader = response.body.getReader(); | |
// const decoder = new TextDecoder(); | |
// 等待消息发送完后,打印完整响应 | |
// 2 | |
console.log(await response.text()); | |
// 流式读取响应体 | |
// while (true) { | |
// const { value, done } = await reader.read(); | |
// if (done) break; | |
// const chunk = decoder.decode(value); | |
// console.log(chunk); | |
// } | |
} | |
runTest(); |
此时运行这两个文件,会发现,直到所有消息发送完毕,才会打印完整响应
但是在实际的 AI 对话中,我们希望能够边接收边处理响应,而不是等待所有消息发送完毕
这个时候我们把目光放在代码中的两个 await 语句
- 第一个
await fetch("http://127.0.0.1:5173")等待服务器响应 - 第二个
await response.text()等待所有消息发送完毕,返回完整响应
可以很清楚的看到,我们需要修改的是第二个 await response.text()
这个时候就要用流式读取响应体的方式,边接收边处理响应
这样,我们就以及建立了一个持久的流式连接
# 第二步:连接 AI 模型 - 获取真实的流式数据
接下来我们把静态消息转换成真实的 AI 响应。我们使用 openai 库来调用 AI 模型,并开启流式响应模式( stream: true )
但这里我们遇到了第一个问题
# 问题 1:原生 node.js 中, req.body 在哪里?
当客户端通过 POST 请求把用户的提问(Prompt)发送给服务器时,我们习惯从 req.body 中获取数据。但在原生 http 模块中, req 对象是一个可读流( ReadableStream ),而不是一个普通的对象。没有像 express 这样的框架自动解析 POST 请求体的功能。我们需要手动从流中读取数据
解决方法:监听 req 对象的 data 和 end 事件来异步拼接和解析请求体
下面就是 server.js 的进化版本,包含了请求体解析和调用 AI 的逻辑server.js - 版本 2
//... (OpenAI 客户端配置) ... | |
// 异步获取请求体的辅助函数 | |
function getRequestBody(req) { | |
return new Promise((resolve, reject) => { | |
let body = ''; | |
req.on('data', (chunk) => { | |
body += chunk.toString(); | |
}); | |
req.on('end', () => { | |
try { | |
// 当请求体为空时,返回一个空对象,避免 JSON.parse 报错 | |
resolve(JSON.parse(body || '{}')); | |
} catch (error) { | |
reject(new Error('无效的 JSON 请求体')); | |
} | |
}); | |
req.on('error', (err) => reject(err)); | |
}); | |
} | |
const server = http.createServer(async (req, res) => { | |
//... (设置 SSE 响应头) ... | |
try { | |
const body = await getRequestBody(req); // <-- 使用辅助函数 | |
const userPrompt = body.prompt; | |
if (!userPrompt) { | |
res.end('data: {"error": "Prompt 不能为空"}\n\n'); | |
return; | |
} | |
const stream = await openai.chat.completions.create({ | |
model: 'YOUR_AI_MODEL', // 替换成你的模型 | |
messages: [{ role: 'user', content: userPrompt }], | |
stream: true, | |
}); | |
// 迭代 AI 返回的流 | |
for await (const chunk of stream) { | |
const content = chunk.choices[0]?.delta?.content || ''; | |
if (content) { | |
// 将 AI 的内容块封装成 SSE 格式并发送 | |
res.write(`data: ${JSON.stringify({ content })}\n\n`); | |
} | |
} | |
} catch (error) { | |
//... (错误处理) ... | |
} finally { | |
res.end(); // 结束响应 | |
} | |
}); | |
//... (服务器监听) ... |
现在,服务器以及能够接收客户端的 prompt 了,并将数据流发送给客户端
# 第三步:客户端陷阱 -"看似正确的解析"
在客户端,自然的想到,既然服务器发送过来的消息都以 \n\n 结尾,那么我们就可以用 split('\n\n') 来解析消息test.js -“天真” 版本
//... (fetch 和 reader 初始化) ... | |
while (true) { | |
const { value, done } = await reader.read(); | |
if (done) break; | |
const chunk = decoder.decode(value); | |
const lines = chunk.split('\n\n'); // <-- 天真的分割 | |
for (const line of lines) { | |
if (line.startsWith('data: ')) { | |
const json = line.substring(6); | |
if (json) { | |
const data = JSON.parse(json); | |
console.log(data.content); | |
} | |
} | |
} | |
} |
一开始,这段代码几乎总是能正常工作的。这给我们一种 “没问题” 的错觉。然而,这后面隐藏了一个严重的问题
# 问题 2:为什么简单的分割方法是不可靠的?
核心原因:TCP 是一个面向字节流的协议,而不是面向消息的协议
这意味着,服务器调用一次 res.write() 发送的消息在网络传输过程中可能会被分割为多个 TCP 包。相应的,客户端调用一次 reader.read() 可能只收到这些包中的一个,也就是一条完整 SSE 消息的一部分
想象一下:
- 服务器发送:
data: {"content":"你好"}\n\n - 网络传输过程中,这条消息被拆分为两个 TCP 包:
- 包 1:
data: {"con - 包 2:
tent":"你好"}\n\n
- 包 1:
- 分割解析:
- 第一次
read收到包1,分割后得到data: {"con,解析会报错 - 第二次
read收到包2,分割后得到好"},解析会报错
- 第一次
结果就是数据丢失或解析错误。本地主要是因为延迟低、速度块,Node.js 的网络层优化(如 Nagle 算法)常常会把多次小的 write 合并成一个大的 TCP 包再发送,掩盖了这个问题
# 第四步:稳健之法 - 实现客户端缓冲区
为了解决消息碎片问题,我们必须在客户端实现一个缓冲区。它的工作原理就像一个蓄水池
- 蓄水:将每次
reader.read()到的字节数据暂存到缓冲区 - 处理:当缓冲区积累到足够多的数据(如
\n\n)时,进行解析和处理 - 放水:如果找到分隔符,就从缓冲区中提取出一条完整的信息进行处理
- 清理:将已经处理过的消息从缓冲区头部移除
- 循环:重复以上步骤
test.js- 稳健版本
async function getAIStream() { | |
//... (fetch 和 reader 初始化) ... | |
let buffer = ""; // <-- 关键的缓冲区 | |
while (true) { | |
const { value, done } = await reader.read(); | |
// 将新数据块解码并追加到缓冲区 | |
buffer += decoder.decode(value, { stream: true }); | |
if (done) { | |
processBuffer(); // 流结束时,最后处理一次缓冲区 | |
break; | |
} | |
processBuffer(); // 每次收到数据都尝试处理 | |
} | |
// 将消息处理逻辑提取成一个函数 | |
function processBuffer() { | |
let boundaryIndex; | |
// 循环处理缓冲区中所有完整的消息 | |
while ((boundaryIndex = buffer.indexOf('\n\n')) !== -1) { | |
// 提取一条完整的消息 | |
const message = buffer.substring(0, boundaryIndex); | |
// 从缓冲区移除已处理的消息 | |
buffer = buffer.substring(boundaryIndex + 2); | |
if (message.startsWith('data: ')) { | |
const json = message.substring(6); | |
if (json) { | |
try { | |
const data = JSON.parse(json); | |
console.log(data.content); | |
} catch (e) { | |
console.error("无法解析的 JSON:", json); | |
} | |
} | |
} | |
} | |
} | |
// ... | |
} |
# 第五步:验证
修改服务器,让它故意将一条消息拆开发送server.js - 破坏性测试
console.log("--- 开始破坏性测试 ---"); | |
console.log("发送不完整的消息的第一部分..."); | |
// 1. 发送一条 SSE 消息的前半部分,注意末尾没有 `\n\n` | |
res.write('data: {"content":"This is a very long message that will be broken'); | |
// 2. 等待 50 毫秒 | |
await new Promise((resolve) => { | |
setTimeout(() => { | |
console.log("50ms 后,发送消息的剩余部分..."); | |
res.write(' in the middle."}\n\n'); | |
res.end(); | |
console.log("--- 测试结束 ---"); | |
resolve(); | |
}, 50); | |
}); |
使用这个和天真的版本对比,我们可以发现,稳健版本能够正确处理消息碎片,而天真版本在这种情况下会丢失数据
# 总结
- 使用原生模块时,不能想当然的认为框架提供的便利功能时理所应当存在的
- 在处理任何基于流的协议时,客户端缓冲区时保证数据完整性的关键
- 流可以让我们以增量的方式处理数据,而不是一次性接收所有数据
# 源码
server.js
import http from 'http'; | |
import dotenv from 'dotenv'; | |
import OpenAI from 'openai'; | |
dotenv.config(); | |
// 配置 AI 客户端 | |
const openai = new OpenAI({ | |
// 替换 | |
apiKey: process.env.ARK_API_KEY, | |
baseURL: 'https://ark.cn-beijing.volces.com/api/v3', | |
}); | |
const hostname = '127.0.0.1'; | |
const port = 5173; | |
function getRequestBody(req) { | |
return new Promise((resolve, reject) => { | |
let body = ''; | |
req.on('data', (chunk) => { | |
body += chunk.toString(); // Buffer 转字符串 | |
}); | |
req.on('end', () => { | |
try { | |
resolve(JSON.parse(body || '{}')); | |
} catch (error) { | |
reject(new Error('Invalid JSON in request body')); | |
} | |
}); | |
req.on('error', (err) => { | |
reject(err); | |
}); | |
}); | |
} | |
const server = http.createServer(async (req, res) => { | |
try { | |
res.writeHead(200, { | |
'Content-Type': 'text/event-stream', // SSE 标准类型 | |
'Cache-Control': 'no-cache', | |
'Connection': 'keep-alive', | |
}); | |
const body = await getRequestBody(req); | |
const userPrompt = body.prompt; | |
console.log(`收到 Prompt: "${userPrompt}"`); | |
console.log("正在向 AI 请求流式响应..."); | |
const stream = await openai.chat.completions.create({ | |
model: 'ep-m-20251118124237-7zths', | |
messages: [{ role: 'user', content: userPrompt }], | |
stream: true, | |
}); | |
console.log("AI 流已开始,正在转发给客户端..."); | |
for await (const chunk of stream) { | |
const content = chunk.choices[0].delta?.content || ''; | |
if (content) { | |
// 将每个数据块按 SSE 格式写入响应流 | |
console.log(`发送内容块: "${content}"`); | |
res.write(`data: ${JSON.stringify({ content })}\n\n`); | |
} | |
} | |
console.log("AI 流结束。"); | |
//console.log ("--- 开始破坏性测试 ---"); | |
//console.log ("发送不完整的消息的第一部分..."); | |
//// 1. 发送一条 SSE 消息的前半部分,注意末尾没有 `\n\n` | |
// res.write('data: {"content":"This is a very long message that will be broken'); | |
//// 2. 等待 50 毫秒 | |
// await new Promise((resolve) => { | |
// setTimeout(() => { | |
// console.log ("50ms 后,发送消息的剩余部分..."); | |
// res.write(' in the middle."}\n\n'); | |
// // End the response *inside* the timeout callback | |
// res.end(); | |
// console.log ("--- 测试结束 ---"); | |
// resolve(); // Signal that the async operation is complete | |
// }, 50); | |
// }); | |
} catch (error) { | |
console.error("处理请求时出错:", error); | |
if (!res.headersSent) { | |
res.writeHead(500); | |
} | |
res.end('调用 AI 服务时发生错误。'); | |
} finally { | |
if (!res.writableEnded) { | |
res.end(); | |
} | |
} | |
}); | |
server.listen(port, hostname, () => { | |
console.log(`AI 服务器正在运行于 http://${hostname}:${port}/`); | |
console.log('按 Ctrl+C 停止服务器。'); | |
}) |
test.js
async function getAIStream() { | |
console.log("正在连接到 AI 服务器..."); | |
const response = await fetch("http://127.0.0.1:5173/chat", { | |
method: 'POST', | |
headers: { | |
'Content-Type': 'application/json' | |
}, | |
body: JSON.stringify({ prompt: "你好" }) | |
}); | |
const reader = response.body.getReader(); | |
const decoder = new TextDecoder(); | |
let fullText = ""; | |
let buffer = ""; // 缓冲区 | |
console.log("连接成功,开始接收数据流..."); | |
while (true) { | |
const {value, done} = await reader.read(); | |
if (done) { | |
processBuffer(); | |
if (buffer) { | |
console.log('处理流结束后缓冲区中的不完整文本:', buffer); | |
} | |
console.log('\n--- 数据流结束 ---'); | |
break; | |
} | |
buffer += decoder.decode(value, { stream: true }); | |
processBuffer(); | |
} | |
// 将消息处理逻辑提取到一个独立的函数中,以便复用 | |
function processBuffer() { | |
let boundaryIndex; | |
while ((boundaryIndex = buffer.indexOf('\n\n')) !== -1) { | |
const message = buffer.slice(0, boundaryIndex); | |
buffer = buffer.slice(boundaryIndex + 2); | |
if (message.startsWith('data: ')) { | |
const json = message.slice(6); | |
if (json) { | |
try { | |
const data = JSON.parse(json); | |
const content = data.content; | |
fullText += content; | |
console.log(`收到片段: "${content}"`); | |
} catch (e) { | |
console.error('无法解析的 JSON 数据:', json); | |
} | |
} | |
} | |
} | |
} | |
console.log("\n完整响应:", fullText); | |
} | |
getAIStream(); |
# 补充
我们手动的揭示了底层的细节,但在真正的项目中,我们应该站在巨人的肩膀上,使用社区沉淀的最佳实践和工具库来简化代码、提高健壮性
- 后端:使用
express.js简化服务器的开发 - 前端:使用
@microsoft/fetch-event-source替代我们自己手写的buffer处理逻辑- API 兼容:与
fetch类似 - 自动解析:内置了 SSE 解析逻辑,不再关系
buffer、等前缀处理 - 错误处理和重试:提供更完整的生命周期钩子
- 健壮性
- API 兼容:与