Skip to content

流式读取数据(AI大模型对话)

用的技术是前端vue3和nodejs,浏览器环境axios不支持流式读取,所以需要用fetch或者new EventSource,实现效果和那些AI工具是基本一致的,也是一点点返回和打字机的效果一样。

提示

目前只是简单实现了效果而已,并不完美的,比如终止消息发送,文件上传和读取

一、new EventSource

前端代码:(这里主要展示重要代码,至于UI结构不展示了,点击提交按钮执行下面的代码,inputText.value是输入框内容字段),用什么样的markdown插件自己定吧

点我查看代码(EventSource)
js
const encodedMessage = encodeURIComponent(inputText.value);
const eventSource = new EventSource(`http://localhost:3000/chat?message=${encodedMessage}`);
eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') return eventSource.close();
  const data = JSON.parse(event.data);
  	result.value += data.content;
    cont.value = parseMarkdown(result.value)
};
const encodedMessage = encodeURIComponent(inputText.value);
const eventSource = new EventSource(`http://localhost:3000/chat?message=${encodedMessage}`);
eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') return eventSource.close();
  const data = JSON.parse(event.data);
  	result.value += data.content;
    cont.value = parseMarkdown(result.value)
};

后端:只能用get请求(也可以自己剔除一些不需要的模块,比如body-parser)

点我nodejs查看代码
js
require('dotenv').config(); 
const express = require('express');
const OpenAI = require('openai');
const app = express();
const bodyParser = require('body-parser');

// 修改跨域设置
app.all("*", function(req,res,next){
    //设置允许跨域的域名,*代表允许任意域名跨域
    res.header("Access-Control-Allow-Origin","*");
    //允许的header类型
    res.header("Access-Control-Allow-Headers","Content-Type, Authorization");
    //跨域允许的请求方式 
    res.header("Access-Control-Allow-Methods","DELETE,PUT,POST,GET,OPTIONS");
    // 添加 SSE 相关的响应头
    res.header("Cache-Control", "no-cache");
    res.header("Connection", "keep-alive");
    if (req.method.toLowerCase() == 'options')
        res.send(204).end();  //让options尝试请求快速结束
    else
        next();
})

app.use(bodyParser.json());
app.use(express.static('public')); // 假设你的前端文件放在 `public` 目录下

const openai = new OpenAI({
    baseURL: process.env.BASE_URL,
    apiKey: process.env.MY_API_Key
});

app.get('/chat', async (req, res) => {
    try {
        // 设置 SSE 专用的 Content-Type
        res.setHeader('Content-Type', 'text/event-stream');
        res.setHeader('Cache-Control', 'no-cache');
        res.setHeader('Connection', 'keep-alive');
        
        // 设置编码为 utf-8
        res.setHeader('Content-Encoding', 'identity');
        res.flushHeaders(); // 立即发送响应头

        const { message } = req.query;
        const stream = await openai.chat.completions.create({
            messages: [
                { role: "system", content: "I am a AI" },
                { role: "user", content: message }
            ],
            model: process.env.MODEL,
            stream: true,
        });

        for await (const chunk of stream) {
            const content = chunk.choices[0]?.delta?.content || '';
            if (content) {
                // 发送数据到客户端
                res.write(`data: ${JSON.stringify({ content })}\n\n`);
            }
        }

        // 结束流
        res.write('data: [DONE]\n\n');
        res.end();
    } catch (error) {
        console.error('Error:', error);
        res.status(500).send('Internal Server Error');
    }
});

app.listen(3000, () => {
    console.log('Server is running on port 3000');
});
require('dotenv').config(); 
const express = require('express');
const OpenAI = require('openai');
const app = express();
const bodyParser = require('body-parser');

// 修改跨域设置
app.all("*", function(req,res,next){
    //设置允许跨域的域名,*代表允许任意域名跨域
    res.header("Access-Control-Allow-Origin","*");
    //允许的header类型
    res.header("Access-Control-Allow-Headers","Content-Type, Authorization");
    //跨域允许的请求方式 
    res.header("Access-Control-Allow-Methods","DELETE,PUT,POST,GET,OPTIONS");
    // 添加 SSE 相关的响应头
    res.header("Cache-Control", "no-cache");
    res.header("Connection", "keep-alive");
    if (req.method.toLowerCase() == 'options')
        res.send(204).end();  //让options尝试请求快速结束
    else
        next();
})

app.use(bodyParser.json());
app.use(express.static('public')); // 假设你的前端文件放在 `public` 目录下

const openai = new OpenAI({
    baseURL: process.env.BASE_URL,
    apiKey: process.env.MY_API_Key
});

app.get('/chat', async (req, res) => {
    try {
        // 设置 SSE 专用的 Content-Type
        res.setHeader('Content-Type', 'text/event-stream');
        res.setHeader('Cache-Control', 'no-cache');
        res.setHeader('Connection', 'keep-alive');
        
        // 设置编码为 utf-8
        res.setHeader('Content-Encoding', 'identity');
        res.flushHeaders(); // 立即发送响应头

        const { message } = req.query;
        const stream = await openai.chat.completions.create({
            messages: [
                { role: "system", content: "I am a AI" },
                { role: "user", content: message }
            ],
            model: process.env.MODEL,
            stream: true,
        });

        for await (const chunk of stream) {
            const content = chunk.choices[0]?.delta?.content || '';
            if (content) {
                // 发送数据到客户端
                res.write(`data: ${JSON.stringify({ content })}\n\n`);
            }
        }

        // 结束流
        res.write('data: [DONE]\n\n');
        res.end();
    } catch (error) {
        console.error('Error:', error);
        res.status(500).send('Internal Server Error');
    }
});

app.listen(3000, () => {
    console.log('Server is running on port 3000');
});

二、fetch

前端代码:

js
const response = await fetch('http://localhost:3000/chat', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ message: inputText.value }),
    });
    const reader = response.body.getReader();
    const decoder = new TextDecoder('utf-8');
    let buffer = '';

   while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true }); // 解码并保留可能的不完整数据
    const lines = buffer.split('\n\n'); // 按 SSE 事件分隔符分割

    // 处理所有完整的事件,留下最后一个不完整的事件到缓冲区
    buffer = lines.pop() || '';

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue; // 跳过非 data 行

      const dataString = line.slice(6); // 去掉 'data: ' 前缀
      if (dataString.trim() === '[DONE]') {
        reader.cancel(); // 终止流
        break;
      }

      try {
        const data = JSON.parse(dataString);
        if (data.content) {
          result.value += data.content;
          cont.value = parseMarkdown(result.value);
        }
      } catch (e) {
        console.error('解析失败:', dataString, e);
      }
    }
  }
const response = await fetch('http://localhost:3000/chat', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({ message: inputText.value }),
    });
    const reader = response.body.getReader();
    const decoder = new TextDecoder('utf-8');
    let buffer = '';

   while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true }); // 解码并保留可能的不完整数据
    const lines = buffer.split('\n\n'); // 按 SSE 事件分隔符分割

    // 处理所有完整的事件,留下最后一个不完整的事件到缓冲区
    buffer = lines.pop() || '';

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue; // 跳过非 data 行

      const dataString = line.slice(6); // 去掉 'data: ' 前缀
      if (dataString.trim() === '[DONE]') {
        reader.cancel(); // 终止流
        break;
      }

      try {
        const data = JSON.parse(dataString);
        if (data.content) {
          result.value += data.content;
          cont.value = parseMarkdown(result.value);
        }
      } catch (e) {
        console.error('解析失败:', dataString, e);
      }
    }
  }

后端:(对比第一种,主要改了get->postbody->query)

js
require('dotenv').config(); 
const express = require('express');
const OpenAI = require('openai');
const app = express();
const bodyParser = require('body-parser');

// 修改跨域设置
app.all("*", function(req,res,next){
    //设置允许跨域的域名,*代表允许任意域名跨域
    res.header("Access-Control-Allow-Origin","*");
    //允许的header类型
    res.header("Access-Control-Allow-Headers","Content-Type, Authorization");
    //跨域允许的请求方式 
    res.header("Access-Control-Allow-Methods","DELETE,PUT,POST,GET,OPTIONS");
    // 添加 SSE 相关的响应头
    res.header("Cache-Control", "no-cache");
    res.header("Connection", "keep-alive");
    if (req.method.toLowerCase() == 'options')
        res.send(204).end();  //让options尝试请求快速结束
    else
        next();
})

app.use(bodyParser.json());
app.use(express.static('public')); // 假设你的前端文件放在 `public` 目录下

const openai = new OpenAI({
    baseURL: process.env.BASE_URL,
    apiKey: process.env.MY_API_Key
});

app.post('/chat', async (req, res) => {//post->get 
    try {
        // 设置 SSE 专用的 Content-Type
        res.setHeader('Content-Type', 'text/event-stream');
        res.setHeader('Cache-Control', 'no-cache');
        res.setHeader('Connection', 'keep-alive');
        
        // 设置编码为 utf-8
        res.setHeader('Content-Encoding', 'identity');
        res.flushHeaders(); // 立即发送响应头

        const { message } = req.body;//主要是这里改了一下,body->query
        const stream = await openai.chat.completions.create({
            messages: [
                { role: "system", content: "I am a AI" },
                { role: "user", content: message }
            ],
            model: process.env.MODEL,
            stream: true,
        });

        for await (const chunk of stream) {
            const content = chunk.choices[0]?.delta?.content || '';
            if (content) {
                // 发送数据到客户端
                res.write(`data: ${JSON.stringify({ content })}\n\n`);
                // res.write(JSON.stringify({ content })+'\n');
            }
        }

        // 结束流
        res.write('data: [DONE]\n\n');
        res.end();
    } catch (error) {
        console.error('Error:', error);
        res.status(500).send('Internal Server Error');
    }
});

app.listen(3000, () => {
    console.log('Server is running on port 3000');
});
require('dotenv').config(); 
const express = require('express');
const OpenAI = require('openai');
const app = express();
const bodyParser = require('body-parser');

// 修改跨域设置
app.all("*", function(req,res,next){
    //设置允许跨域的域名,*代表允许任意域名跨域
    res.header("Access-Control-Allow-Origin","*");
    //允许的header类型
    res.header("Access-Control-Allow-Headers","Content-Type, Authorization");
    //跨域允许的请求方式 
    res.header("Access-Control-Allow-Methods","DELETE,PUT,POST,GET,OPTIONS");
    // 添加 SSE 相关的响应头
    res.header("Cache-Control", "no-cache");
    res.header("Connection", "keep-alive");
    if (req.method.toLowerCase() == 'options')
        res.send(204).end();  //让options尝试请求快速结束
    else
        next();
})

app.use(bodyParser.json());
app.use(express.static('public')); // 假设你的前端文件放在 `public` 目录下

const openai = new OpenAI({
    baseURL: process.env.BASE_URL,
    apiKey: process.env.MY_API_Key
});

app.post('/chat', async (req, res) => {//post->get 
    try {
        // 设置 SSE 专用的 Content-Type
        res.setHeader('Content-Type', 'text/event-stream');
        res.setHeader('Cache-Control', 'no-cache');
        res.setHeader('Connection', 'keep-alive');
        
        // 设置编码为 utf-8
        res.setHeader('Content-Encoding', 'identity');
        res.flushHeaders(); // 立即发送响应头

        const { message } = req.body;//主要是这里改了一下,body->query
        const stream = await openai.chat.completions.create({
            messages: [
                { role: "system", content: "I am a AI" },
                { role: "user", content: message }
            ],
            model: process.env.MODEL,
            stream: true,
        });

        for await (const chunk of stream) {
            const content = chunk.choices[0]?.delta?.content || '';
            if (content) {
                // 发送数据到客户端
                res.write(`data: ${JSON.stringify({ content })}\n\n`);
                // res.write(JSON.stringify({ content })+'\n');
            }
        }

        // 结束流
        res.write('data: [DONE]\n\n');
        res.end();
    } catch (error) {
        console.error('Error:', error);
        res.status(500).send('Internal Server Error');
    }
});

app.listen(3000, () => {
    console.log('Server is running on port 3000');
});

程序员小洛文档