本文主要介绍如何用原生 Node.js http 模块和前端 fetch API 实现一个健壮的、类似 ChatGPT 效果的流式 AI 响应服务
以及本人在实际操作过程中遇到的一些问题和解决方法


# 第一步:搭建基础 - 一个简单的 HTTP 服务

首先,我们需要一个能响应客户端请求的服务器。我们使用 Node.js 内置的 http 模块,因为它无需任何外部依赖,能让我们更专注于底层的流式数据处理
server.js - 版本 1

import http from 'http';
const hostname = '127.0.0.1';
const port = 5173;
const server = http.createServer(async (req, res) => {
  // 设置响应头,告诉浏览器我们要发送的是 “服务器发送事件(SSE)”
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });
  await new Promise((resolve, reject) => {
    let id = 0;
    const intervalId = setInterval(() => {
      id++;
      if (id > 4) {
        clearInterval(intervalId);
        resolve();
      }
      // SSE 格式:以 "data:" 开头,以 "\n\n" 结尾
      console.log(`Sending message ${id}`);
      res.write(`data: 这是消息 ${id}\n\n`);
    }, 1000);
  });
  // 结束响应
  res.end();
  // 当客户端关闭连接时,停止发送
  req.on('close', () => {
    console.log('客户端已断开连接。');
  });
});
server.listen(port, hostname, () => {
  console.log(`服务器正在运行于 http://${hostname}:${port}/`);
});

test.js - 版本 1

async function runTest() {
  // 1
  const response = await fetch("http://127.0.0.1:5173");
  // const reader = response.body.getReader();
  // const decoder = new TextDecoder();
  // 等待消息发送完后,打印完整响应
  // 2
  console.log(await response.text());
  // 流式读取响应体
  // while (true) {
  //   const { value, done } = await reader.read();
  //   if (done) break;
  //   const chunk = decoder.decode(value);
  //   console.log(chunk);
  // }
}
runTest();

此时运行这两个文件,会发现,直到所有消息发送完毕,才会打印完整响应
但是在实际的 AI 对话中,我们希望能够边接收边处理响应,而不是等待所有消息发送完毕
这个时候我们把目光放在代码中的两个 await 语句

  • 第一个 await fetch("http://127.0.0.1:5173") 等待服务器响应
  • 第二个 await response.text() 等待所有消息发送完毕,返回完整响应

可以很清楚的看到,我们需要修改的是第二个 await response.text()
这个时候就要用流式读取响应体的方式,边接收边处理响应

这样,我们就以及建立了一个持久的流式连接


# 第二步:连接 AI 模型 - 获取真实的流式数据

接下来我们把静态消息转换成真实的 AI 响应。我们使用 openai 库来调用 AI 模型,并开启流式响应模式( stream: true

但这里我们遇到了第一个问题

# 问题 1:原生 node.js 中, req.body 在哪里?

当客户端通过 POST 请求把用户的提问(Prompt)发送给服务器时,我们习惯从 req.body 中获取数据。但在原生 http 模块中, req 对象是一个可读流( ReadableStream ),而不是一个普通的对象。没有像 express 这样的框架自动解析 POST 请求体的功能。我们需要手动从流中读取数据

解决方法:监听 req 对象的 dataend 事件来异步拼接和解析请求体

下面就是 server.js 的进化版本,包含了请求体解析和调用 AI 的逻辑
server.js - 版本 2

//... (OpenAI 客户端配置) ...
// 异步获取请求体的辅助函数
function getRequestBody(req) {
  return new Promise((resolve, reject) => {
    let body = '';
    req.on('data', (chunk) => {
      body += chunk.toString();
    });
    req.on('end', () => {
      try {
        // 当请求体为空时,返回一个空对象,避免 JSON.parse 报错
        resolve(JSON.parse(body || '{}'));
      } catch (error) {
        reject(new Error('无效的 JSON 请求体'));
      }
    });
    req.on('error', (err) => reject(err));
  });
}
const server = http.createServer(async (req, res) => {
  //... (设置 SSE 响应头) ...
  try {
    const body = await getRequestBody(req); // <-- 使用辅助函数
    const userPrompt = body.prompt;
    if (!userPrompt) {
      res.end('data: {"error": "Prompt 不能为空"}\n\n');
      return;
    }
    const stream = await openai.chat.completions.create({
      model: 'YOUR_AI_MODEL', // 替换成你的模型
      messages: [{ role: 'user', content: userPrompt }],
      stream: true,
    });
    // 迭代 AI 返回的流
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content || '';
      if (content) {
        // 将 AI 的内容块封装成 SSE 格式并发送
        res.write(`data: ${JSON.stringify({ content })}\n\n`);
      }
    }
  } catch (error) {
    //... (错误处理) ...
  } finally {
    res.end(); // 结束响应
  }
});
//... (服务器监听) ...

现在,服务器以及能够接收客户端的 prompt 了,并将数据流发送给客户端


# 第三步:客户端陷阱 -"看似正确的解析"

在客户端,自然的想到,既然服务器发送过来的消息都以 \n\n 结尾,那么我们就可以用 split('\n\n') 来解析消息
test.js -“天真” 版本

//... (fetch 和 reader 初始化) ...
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  
  const chunk = decoder.decode(value);
  const lines = chunk.split('\n\n'); // <-- 天真的分割
  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const json = line.substring(6);
      if (json) {
        const data = JSON.parse(json);
        console.log(data.content);
      }
    }
  }
}

一开始,这段代码几乎总是能正常工作的。这给我们一种 “没问题” 的错觉。然而,这后面隐藏了一个严重的问题

# 问题 2:为什么简单的分割方法是不可靠的?

核心原因:TCP 是一个面向字节流的协议,而不是面向消息的协议
这意味着,服务器调用一次 res.write() 发送的消息在网络传输过程中可能会被分割为多个 TCP 包。相应的,客户端调用一次 reader.read() 可能只收到这些包中的一个,也就是一条完整 SSE 消息的一部分

想象一下:

  1. 服务器发送: data: {"content":"你好"}\n\n
  2. 网络传输过程中,这条消息被拆分为两个 TCP 包:
    • 包 1: data: {"con
    • 包 2: tent":"你好"}\n\n
  3. 分割解析:
    • 第一次 read 收到 包1 ,分割后得到 data: {"con ,解析会报错
    • 第二次 read 收到 包2 ,分割后得到 好"} ,解析会报错

结果就是数据丢失或解析错误。本地主要是因为延迟低、速度块,Node.js 的网络层优化(如 Nagle 算法)常常会把多次小的 write 合并成一个大的 TCP 包再发送,掩盖了这个问题


# 第四步:稳健之法 - 实现客户端缓冲区

为了解决消息碎片问题,我们必须在客户端实现一个缓冲区。它的工作原理就像一个蓄水池

  1. 蓄水:将每次 reader.read() 到的字节数据暂存到缓冲区
  2. 处理:当缓冲区积累到足够多的数据(如 \n\n )时,进行解析和处理
  3. 放水:如果找到分隔符,就从缓冲区中提取出一条完整的信息进行处理
  4. 清理:将已经处理过的消息从缓冲区头部移除
  5. 循环:重复以上步骤
    test.js - 稳健版本
async function getAIStream() {
  //... (fetch 和 reader 初始化) ...
  let buffer = ""; // <-- 关键的缓冲区
  while (true) {
    const { value, done } = await reader.read();
    
    // 将新数据块解码并追加到缓冲区
    buffer += decoder.decode(value, { stream: true });
    if (done) {
      processBuffer(); // 流结束时,最后处理一次缓冲区
      break;
    }
    processBuffer(); // 每次收到数据都尝试处理
  }
  // 将消息处理逻辑提取成一个函数
  function processBuffer() {
    let boundaryIndex;
    // 循环处理缓冲区中所有完整的消息
    while ((boundaryIndex = buffer.indexOf('\n\n')) !== -1) {
      // 提取一条完整的消息
      const message = buffer.substring(0, boundaryIndex);
      // 从缓冲区移除已处理的消息
      buffer = buffer.substring(boundaryIndex + 2);
      if (message.startsWith('data: ')) {
        const json = message.substring(6);
        if (json) {
          try {
            const data = JSON.parse(json);
            console.log(data.content);
          } catch (e) {
            console.error("无法解析的 JSON:", json);
          }
        }
      }
    }
  }
  // ...
}

# 第五步:验证

修改服务器,让它故意将一条消息拆开发送
server.js - 破坏性测试

console.log("--- 开始破坏性测试 ---");
console.log("发送不完整的消息的第一部分...");
// 1. 发送一条 SSE 消息的前半部分,注意末尾没有 `\n\n`
res.write('data: {"content":"This is a very long message that will be broken');
// 2. 等待 50 毫秒
await new Promise((resolve) => {
  setTimeout(() => {
    console.log("50ms 后,发送消息的剩余部分...");
    res.write(' in the middle."}\n\n');
    
    res.end(); 
    console.log("--- 测试结束 ---");
    resolve();
  }, 50);
});

使用这个和天真的版本对比,我们可以发现,稳健版本能够正确处理消息碎片,而天真版本在这种情况下会丢失数据

# 总结

  1. 使用原生模块时,不能想当然的认为框架提供的便利功能时理所应当存在的
  2. 在处理任何基于流的协议时,客户端缓冲区时保证数据完整性的关键
  3. 流可以让我们以增量的方式处理数据,而不是一次性接收所有数据

# 源码

server.js

import http from 'http';
import dotenv from 'dotenv';
import OpenAI from 'openai';
dotenv.config();
// 配置 AI 客户端
const openai = new OpenAI({
  // 替换
  apiKey: process.env.ARK_API_KEY,
  baseURL: 'https://ark.cn-beijing.volces.com/api/v3',
});
const hostname = '127.0.0.1';
const port = 5173;
function getRequestBody(req) {
  return new Promise((resolve, reject) => {
    let body = '';
    req.on('data', (chunk) => {
      body += chunk.toString(); // Buffer 转字符串
    });
    req.on('end', () => {
      try {
        resolve(JSON.parse(body || '{}'));
      } catch (error) {
        reject(new Error('Invalid JSON in request body'));
      }
    });
    req.on('error', (err) => {
      reject(err);
    });
  });
}
const server = http.createServer(async (req, res) => {
  try {
    res.writeHead(200, {
      'Content-Type': 'text/event-stream', // SSE 标准类型
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    });
    const body = await getRequestBody(req);
    const userPrompt = body.prompt;
    console.log(`收到 Prompt: "${userPrompt}"`);
    console.log("正在向 AI 请求流式响应...");
    const stream = await openai.chat.completions.create({
      model: 'ep-m-20251118124237-7zths',
      messages: [{ role: 'user', content: userPrompt }],
      stream: true,
    });
    console.log("AI 流已开始,正在转发给客户端...");
    for await (const chunk of stream) {
      const content = chunk.choices[0].delta?.content || '';
      if (content) {
        // 将每个数据块按 SSE 格式写入响应流
        console.log(`发送内容块: "${content}"`);
        res.write(`data: ${JSON.stringify({ content })}\n\n`);
      }
    }
    console.log("AI 流结束。");
    //console.log ("--- 开始破坏性测试 ---");
    //console.log ("发送不完整的消息的第一部分...");
    //// 1. 发送一条 SSE 消息的前半部分,注意末尾没有 `\n\n`
    // res.write('data: {"content":"This is a very long message that will be broken');
    //// 2. 等待 50 毫秒
    // await new Promise((resolve) => {
    //   setTimeout(() => {
    //     console.log ("50ms 后,发送消息的剩余部分...");
    //     res.write(' in the middle."}\n\n');
        
    //     // End the response *inside* the timeout callback
    //     res.end(); 
    //     console.log ("--- 测试结束 ---");
    //     resolve(); // Signal that the async operation is complete
    //   }, 50);
    // });
  } catch (error) {
    console.error("处理请求时出错:", error);
    if (!res.headersSent) {
      res.writeHead(500);
    }
    res.end('调用 AI 服务时发生错误。');
  } finally {
    if (!res.writableEnded) {
      res.end();
    }
  }
});
server.listen(port, hostname, () => {
  console.log(`AI 服务器正在运行于 http://${hostname}:${port}/`);
  console.log('按 Ctrl+C 停止服务器。');
})

test.js

async function getAIStream() {
  console.log("正在连接到 AI 服务器...");
  const response = await fetch("http://127.0.0.1:5173/chat", {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ prompt: "你好" })
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let fullText = "";
  let buffer = ""; // 缓冲区
  console.log("连接成功,开始接收数据流...");
  while (true) {
    const {value, done} = await reader.read();
    if (done) {
      processBuffer();
      if (buffer) {
        console.log('处理流结束后缓冲区中的不完整文本:', buffer);
      }
      console.log('\n--- 数据流结束 ---');
      break;
    }
    buffer += decoder.decode(value, { stream: true });
    processBuffer();
  }
  // 将消息处理逻辑提取到一个独立的函数中,以便复用
  function processBuffer() {
    let boundaryIndex;
    while ((boundaryIndex = buffer.indexOf('\n\n')) !== -1) {
      const message = buffer.slice(0, boundaryIndex);
      buffer = buffer.slice(boundaryIndex + 2);
      if (message.startsWith('data: ')) {
        const json = message.slice(6);
        if (json) {
          try {
            const data = JSON.parse(json);
            const content = data.content;
            fullText += content;
            console.log(`收到片段: "${content}"`);
          } catch (e) {
            console.error('无法解析的 JSON 数据:', json);
          }
        }
      }
    }
  }
  console.log("\n完整响应:", fullText);
}
getAIStream();

# 补充

我们手动的揭示了底层的细节,但在真正的项目中,我们应该站在巨人的肩膀上,使用社区沉淀的最佳实践和工具库来简化代码、提高健壮性

  1. 后端:使用 express.js 简化服务器的开发
  2. 前端:使用 @microsoft/fetch-event-source 替代我们自己手写的 buffer 处理逻辑
    • API 兼容:与 fetch 类似
    • 自动解析:内置了 SSE 解析逻辑,不再关系 buffer 、等前缀处理
    • 错误处理和重试:提供更完整的生命周期钩子
    • 健壮性