Appearance
流式读取数据(AI大模型对话)
用的技术是前端vue3和nodejs,浏览器环境axios不支持流式读取,所以需要用fetch
或者new EventSource
,实现效果和那些AI工具是基本一致的,也是一点点返回和打字机的效果一样。
提示
目前只是简单实现了效果而已,并不完美的,比如终止消息发送,文件上传和读取
一、new EventSource
前端代码:(这里主要展示重要代码,至于UI结构不展示了,点击提交按钮执行下面的代码,inputText.value
是输入框内容字段),用什么样的markdown插件自己定吧
点我查看代码(EventSource)
js
const encodedMessage = encodeURIComponent(inputText.value);
const eventSource = new EventSource(`http://localhost:3000/chat?message=${encodedMessage}`);
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') return eventSource.close();
const data = JSON.parse(event.data);
result.value += data.content;
cont.value = parseMarkdown(result.value)
};
const encodedMessage = encodeURIComponent(inputText.value);
const eventSource = new EventSource(`http://localhost:3000/chat?message=${encodedMessage}`);
eventSource.onmessage = (event) => {
if (event.data === '[DONE]') return eventSource.close();
const data = JSON.parse(event.data);
result.value += data.content;
cont.value = parseMarkdown(result.value)
};
后端:只能用get请求(也可以自己剔除一些不需要的模块,比如body-parser
)
点我nodejs查看代码
js
require('dotenv').config();
const express = require('express');
const OpenAI = require('openai');
const app = express();
const bodyParser = require('body-parser');
// 修改跨域设置
app.all("*", function(req,res,next){
//设置允许跨域的域名,*代表允许任意域名跨域
res.header("Access-Control-Allow-Origin","*");
//允许的header类型
res.header("Access-Control-Allow-Headers","Content-Type, Authorization");
//跨域允许的请求方式
res.header("Access-Control-Allow-Methods","DELETE,PUT,POST,GET,OPTIONS");
// 添加 SSE 相关的响应头
res.header("Cache-Control", "no-cache");
res.header("Connection", "keep-alive");
if (req.method.toLowerCase() == 'options')
res.send(204).end(); //让options尝试请求快速结束
else
next();
})
app.use(bodyParser.json());
app.use(express.static('public')); // 假设你的前端文件放在 `public` 目录下
const openai = new OpenAI({
baseURL: process.env. BASE_URL,
apiKey: process.env.MY_API_Key
});
app.get('/chat', async (req, res) => {
try {
// 设置 SSE 专用的 Content-Type
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 设置编码为 utf-8
res.setHeader('Content-Encoding', 'identity');
res.flushHeaders(); // 立即发送响应头
const { message } = req.query;
const stream = await openai.chat.completions.create({
messages: [
{ role: "system", content: "I am a AI" },
{ role: "user", content: message }
],
model: process.env. MODEL,
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
// 发送数据到客户端
res.write(`data: ${JSON.stringify({ content })}\n\n`);
}
}
// 结束流
res.write('data: [DONE]\n\n');
res.end();
} catch (error) {
console.error('Error:', error);
res.status(500).send('Internal Server Error');
}
});
app.listen(3000, () => {
console.log('Server is running on port 3000');
});
require('dotenv').config();
const express = require('express');
const OpenAI = require('openai');
const app = express();
const bodyParser = require('body-parser');
// 修改跨域设置
app.all("*", function(req,res,next){
//设置允许跨域的域名,*代表允许任意域名跨域
res.header("Access-Control-Allow-Origin","*");
//允许的header类型
res.header("Access-Control-Allow-Headers","Content-Type, Authorization");
//跨域允许的请求方式
res.header("Access-Control-Allow-Methods","DELETE,PUT,POST,GET,OPTIONS");
// 添加 SSE 相关的响应头
res.header("Cache-Control", "no-cache");
res.header("Connection", "keep-alive");
if (req.method.toLowerCase() == 'options')
res.send(204).end(); //让options尝试请求快速结束
else
next();
})
app.use(bodyParser.json());
app.use(express.static('public')); // 假设你的前端文件放在 `public` 目录下
const openai = new OpenAI({
baseURL: process.env. BASE_URL,
apiKey: process.env.MY_API_Key
});
app.get('/chat', async (req, res) => {
try {
// 设置 SSE 专用的 Content-Type
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 设置编码为 utf-8
res.setHeader('Content-Encoding', 'identity');
res.flushHeaders(); // 立即发送响应头
const { message } = req.query;
const stream = await openai.chat.completions.create({
messages: [
{ role: "system", content: "I am a AI" },
{ role: "user", content: message }
],
model: process.env. MODEL,
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
// 发送数据到客户端
res.write(`data: ${JSON.stringify({ content })}\n\n`);
}
}
// 结束流
res.write('data: [DONE]\n\n');
res.end();
} catch (error) {
console.error('Error:', error);
res.status(500).send('Internal Server Error');
}
});
app.listen(3000, () => {
console.log('Server is running on port 3000');
});
二、fetch
前端代码:
js
const response = await fetch('http://localhost:3000/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ message: inputText.value }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true }); // 解码并保留可能的不完整数据
const lines = buffer.split('\n\n'); // 按 SSE 事件分隔符分割
// 处理所有完整的事件,留下最后一个不完整的事件到缓冲区
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.startsWith('data: ')) continue; // 跳过非 data 行
const dataString = line.slice(6); // 去掉 'data: ' 前缀
if (dataString.trim() === '[DONE]') {
reader.cancel(); // 终止流
break;
}
try {
const data = JSON.parse(dataString);
if (data.content) {
result.value += data.content;
cont.value = parseMarkdown(result.value);
}
} catch (e) {
console.error('解析失败:', dataString, e);
}
}
}
const response = await fetch('http://localhost:3000/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ message: inputText.value }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true }); // 解码并保留可能的不完整数据
const lines = buffer.split('\n\n'); // 按 SSE 事件分隔符分割
// 处理所有完整的事件,留下最后一个不完整的事件到缓冲区
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.startsWith('data: ')) continue; // 跳过非 data 行
const dataString = line.slice(6); // 去掉 'data: ' 前缀
if (dataString.trim() === '[DONE]') {
reader.cancel(); // 终止流
break;
}
try {
const data = JSON.parse(dataString);
if (data.content) {
result.value += data.content;
cont.value = parseMarkdown(result.value);
}
} catch (e) {
console.error('解析失败:', dataString, e);
}
}
}
后端:(对比第一种,主要改了get->post
和body->query
)
js
require('dotenv').config();
const express = require('express');
const OpenAI = require('openai');
const app = express();
const bodyParser = require('body-parser');
// 修改跨域设置
app.all("*", function(req,res,next){
//设置允许跨域的域名,*代表允许任意域名跨域
res.header("Access-Control-Allow-Origin","*");
//允许的header类型
res.header("Access-Control-Allow-Headers","Content-Type, Authorization");
//跨域允许的请求方式
res.header("Access-Control-Allow-Methods","DELETE,PUT,POST,GET,OPTIONS");
// 添加 SSE 相关的响应头
res.header("Cache-Control", "no-cache");
res.header("Connection", "keep-alive");
if (req.method.toLowerCase() == 'options')
res.send(204).end(); //让options尝试请求快速结束
else
next();
})
app.use(bodyParser.json());
app.use(express.static('public')); // 假设你的前端文件放在 `public` 目录下
const openai = new OpenAI({
baseURL: process.env. BASE_URL,
apiKey: process.env.MY_API_Key
});
app.post('/chat', async (req, res) => {//post->get
try {
// 设置 SSE 专用的 Content-Type
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 设置编码为 utf-8
res.setHeader('Content-Encoding', 'identity');
res.flushHeaders(); // 立即发送响应头
const { message } = req.body;//主要是这里改了一下,body->query
const stream = await openai.chat.completions.create({
messages: [
{ role: "system", content: "I am a AI" },
{ role: "user", content: message }
],
model: process.env. MODEL,
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
// 发送数据到客户端
res.write(`data: ${JSON.stringify({ content })}\n\n`);
// res.write(JSON.stringify({ content })+'\n');
}
}
// 结束流
res.write('data: [DONE]\n\n');
res.end();
} catch (error) {
console.error('Error:', error);
res.status(500).send('Internal Server Error');
}
});
app.listen(3000, () => {
console.log('Server is running on port 3000');
});
require('dotenv').config();
const express = require('express');
const OpenAI = require('openai');
const app = express();
const bodyParser = require('body-parser');
// 修改跨域设置
app.all("*", function(req,res,next){
//设置允许跨域的域名,*代表允许任意域名跨域
res.header("Access-Control-Allow-Origin","*");
//允许的header类型
res.header("Access-Control-Allow-Headers","Content-Type, Authorization");
//跨域允许的请求方式
res.header("Access-Control-Allow-Methods","DELETE,PUT,POST,GET,OPTIONS");
// 添加 SSE 相关的响应头
res.header("Cache-Control", "no-cache");
res.header("Connection", "keep-alive");
if (req.method.toLowerCase() == 'options')
res.send(204).end(); //让options尝试请求快速结束
else
next();
})
app.use(bodyParser.json());
app.use(express.static('public')); // 假设你的前端文件放在 `public` 目录下
const openai = new OpenAI({
baseURL: process.env. BASE_URL,
apiKey: process.env.MY_API_Key
});
app.post('/chat', async (req, res) => {//post->get
try {
// 设置 SSE 专用的 Content-Type
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// 设置编码为 utf-8
res.setHeader('Content-Encoding', 'identity');
res.flushHeaders(); // 立即发送响应头
const { message } = req.body;//主要是这里改了一下,body->query
const stream = await openai.chat.completions.create({
messages: [
{ role: "system", content: "I am a AI" },
{ role: "user", content: message }
],
model: process.env. MODEL,
stream: true,
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
// 发送数据到客户端
res.write(`data: ${JSON.stringify({ content })}\n\n`);
// res.write(JSON.stringify({ content })+'\n');
}
}
// 结束流
res.write('data: [DONE]\n\n');
res.end();
} catch (error) {
console.error('Error:', error);
res.status(500).send('Internal Server Error');
}
});
app.listen(3000, () => {
console.log('Server is running on port 3000');
});