由于浏览器原生的 EventSource
不支持 POST 请求,我们需要利用 fetch
API 来手动实现一个支持 POST 请求的 SSE 客户端。
下面我将提供一个基础的实现,它包含了发送 POST 请求、解析 SSE 格式数据、以及简单的自动重连逻辑。
基于 Fetch API 的 POST SSE 实现
这个实现会模拟 EventSource
的一些行为,但允许你发送 POST 请求和请求体。
JavaScript
class PostEventSource { constructor(url, options = {}) { this.url = url; this.options = { method: 'POST', // 默认使用 POST headers: { 'Content-Type': 'application/json', 'Accept': 'text/event-stream', // 告知服务器我们期望 SSE 格式 ...options.headers // 合并用户自定义头部 }, payload: options.payload || null, // POST 请求体 reconnectInterval: 3000, // 默认 3 秒后重连 maxRetries: 5, // 最大重试次数 ...options }; this.eventListeners = {}; // 用于存储事件监听器 this.isConnected = false; this.retryCount = 0; this.controller = null; // AbortController 用于取消 fetch 请求 this.decoder = new TextDecoder('utf-8'); this.buffer = ''; // 用于处理分块接收的数据 this.connect(); // 实例化时立即尝试连接 } // 触发事件的方法 _emit(eventName, data) { if (this.eventListeners[eventName]) { this.eventListeners[eventName].forEach(listener => { listener(data); }); } } // 连接到 SSE 服务 async connect() { if (this.isConnected) { console.warn('PostEventSource: Already connected.'); return; } this.isConnected = true; this.controller = new AbortController(); const { signal } = this.controller; try { this._emit('open'); // 触发 open 事件 const fetchOptions = { method: this.options.method, headers: this.options.headers, signal: signal, // 只在 POST 或 PUT 等方法时包含 body body: (this.options.method === 'POST' || this.options.method === 'PUT') ? JSON.stringify(this.options.payload) : undefined }; const response = await fetch(this.url, fetchOptions); if (!response.ok) { throw new Error(`HTTP error! Status: ${response.status}`); } if (!response.body) { throw new Error('Response body is null.'); } this.retryCount = 0; // 连接成功,重置重试计数 const reader = response.body.getReader(); while (true) { const { done, value } = await reader.read(); if (done) { console.log('PostEventSource: Stream finished.'); break; } this.buffer += this.decoder.decode(value, { stream: true }); this._processBuffer(); // 处理接收到的数据 } this._emit('close'); // 正常关闭 } catch (error) { if (error.name === 'AbortError') { console.log('PostEventSource: Connection aborted.'); this._emit('close'); // 主动关闭也触发 close } else { console.error('PostEventSource: Connection error:', error); this.isConnected = false; // 标记为未连接 this._emit('error', error); // 触发 error 事件 this._handleReconnect(); // 尝试重连 } } finally { this.isConnected = false; } } // 处理 SSE 消息缓冲 _processBuffer() { const messages = this.buffer.split('\n\n'); // SSE 事件以两个换行符分隔 this.buffer = messages.pop(); // 最后一个可能不完整,放回 buffer messages.forEach(msg => { if (msg.trim() === '') return; // 跳过空消息 let event = 'message'; // 默认事件类型 let data = ''; let id = null; msg.split('\n').forEach(line => { if (line.startsWith('event:')) { event = line.substring(6).trim(); } else if (line.startsWith('data:')) { data += line.substring(5); // 多个 data 行会拼接 } else if (line.startsWith('id:')) { id = line.substring(3).trim(); } else if (line.startsWith('retry:')) { // 可以根据需要处理 retry 字段 // this.options.reconnectInterval = parseInt(line.substring(6).trim(), 10); } }); const eventData = { data, event, id }; this._emit(event, eventData); // 触发特定事件(如 'message' 或自定义事件) }); } // 处理重连逻辑 _handleReconnect() { if (this.retryCount < this.options.maxRetries) { this.retryCount++; console.log(`PostEventSource: Retrying connection in ${this.options.reconnectInterval / 1000} seconds... (Attempt ${this.retryCount}/${this.options.maxRetries})`); setTimeout(() => this.connect(), this.options.reconnectInterval); } else { console.error('PostEventSource: Max retries reached. Connection failed permanently.'); this._emit('error', new Error('Max retries reached.')); // 最终错误 } } // 添加事件监听器 addEventListener(eventName, listener) { if (!this.eventListeners[eventName]) { this.eventListeners[eventName] = []; } this.eventListeners[eventName].push(listener); } // 移除事件监听器 removeEventListener(eventName, listener) { if (this.eventListeners[eventName]) { this.eventListeners[eventName] = this.eventListeners[eventName].filter(l => l !== listener); } } // 关闭 SSE 连接 close() { if (this.controller) { this.controller.abort(); // 取消正在进行的 fetch 请求 } this.isConnected = false; this.retryCount = 0; // 重置重试计数 console.log('PostEventSource: Connection closed manually.'); }}// ------------------- 使用示例 -------------------// 假设你的服务器端 SSE 接口地址const SSE_POST_ENDPOINT = 'http://localhost:3000/stream-data';// 示例 POST 请求体const requestPayload = { query: 'Give me a summary of current AI trends.', userId: 'user123', preferences: { format: 'markdown', length: 'short' }};let myPostEventSource = null; // 声明一个变量来存储实例// 启动连接的函数function startSseConnection() { // 如果已经有实例,先关闭它 if (myPostEventSource) { myPostEventSource.close(); } myPostEventSource = new PostEventSource(SSE_POST_ENDPOINT, { payload: requestPayload, headers: { 'Authorization': 'Bearer YOUR_AUTH_TOKEN_HERE', // 示例认证头 'X-Custom-Header': 'My-Value' // 其他自定义头部 }, reconnectInterval: 5000, // 5秒后重连 maxRetries: 3 // 最多重试 3 次 }); myPostEventSource.addEventListener('open', () => { console.log('POST SSE Connection opened!'); }); myPostEventSource.addEventListener('message', (event) => { console.log('Received message:', event.data); // 在这里处理接收到的 SSE 数据,例如更新UI document.getElementById('sse-output').innerText += event.data + '\n'; }); myPostEventSource.addEventListener('custom-event', (event) => { // 如果服务器发送了 'event: custom-event' console.log('Received custom event:', event.data); }); myPostEventSource.addEventListener('error', (error) => { console.error('POST SSE Connection error:', error); // 处理错误,例如显示用户友好的消息 document.getElementById('sse-status').innerText = 'Connection Error!'; }); myPostEventSource.addEventListener('close', () => { console.log('POST SSE Connection closed.'); document.getElementById('sse-status').innerText = 'Connection Closed'; }); document.getElementById('sse-status').innerText = 'Connecting...';}// 停止连接的函数function stopSseConnection() { if (myPostEventSource) { myPostEventSource.close(); }}// 假设你的HTML中有两个按钮和一个输出区域// <button onclick="startSseConnection()">Start SSE</button>// <button onclick="stopSseConnection()">Stop SSE</button>// <div id="sse-status"></div>// <pre id="sse-output"></pre>
代码解析和关键点:
PostEventSource
类:
这是一个自定义的 JavaScript 类,用于封装 SSE 的连接和事件处理逻辑。
constructor(url, options)
:
url
: SSE 服务的 URL。options
: 配置对象,最重要的就是 payload
(POST 请求体)和 headers
。method
: 强制设置为 POST
(或其他你需要的 HTTP 方法)。headers
: 包含 Content-Type: application/json
和 Accept: text/event-stream
是标准做法。reconnectInterval
和 maxRetries
: 用于控制自动重连行为。controller
: 使用 AbortController
来实现手动关闭连接(即取消 fetch
请求)。decoder
: TextDecoder
用于将二进制 Uint8Array
数据解码成字符串。buffer
: 用于处理分块接收的数据流,确保 SSE 事件的完整性。connect()
方法:
- 这是核心的连接逻辑。它使用
fetch(this.url, fetchOptions)
发送 HTTP 请求。fetchOptions.body
: 根据 method
类型决定是否包含 JSON.stringify(this.options.payload)
。response.body.getReader()
: 获取响应体的一个 ReadableStreamDefaultReader
,用于逐块读取数据。while (true)
循环和 reader.read()
: 这是处理流式响应的关键。它会持续读取数据块,直到 done
为 true
(表示流结束)。this.buffer += this.decoder.decode(value, { stream: true })
: 解码接收到的二进制数据,并将其添加到缓冲区。{ stream: true }
表示可能还有更多数据后续会来。this._processBuffer()
: 处理缓冲区中的数据,解析 SSE 事件。_processBuffer()
方法:
- 负责将接收到的数据流解析成独立的 SSE 事件。
split('\n\n')
: SSE 规范规定,每个事件块由两个换行符 \n\n
分隔。messages.pop()
: 最后一个 messages
元素可能是不完整的事件,所以将其放回 buffer
,等待更多数据。解析 event:
, data:
, id:
行: 根据 SSE 规范解析每一行,提取事件类型、数据和 ID。data:
行可以有多行,需要拼接。_handleReconnect()
方法:
- 在
connect()
捕获到错误时调用。它会检查重试次数是否达到上限,如果没有,则在 reconnectInterval
后再次调用 connect()
尝试重连。addEventListener()
, removeEventListener()
, _emit()
:
- 这些方法提供了类似
EventSource
的事件注册/触发机制,允许你监听 open
, message
, error
, close
以及服务器发送的自定义事件。close()
方法:
this.controller.abort()
: 这是关键!它会中断正在进行的 fetch
请求,从而优雅地关闭 SSE 连接。如何使用:
将上述 PostEventSource
类复制到你的项目中。
在需要启动 SSE 连接的地方,实例化 PostEventSource
:
JavaScript
const myPostEventSource = new PostEventSource(YOUR_SSE_URL, { payload: { your: 'data', goes: 'here' }, // 你的 POST 请求体 headers: { 'Authorization': 'Bearer your_token', 'X-Another-Header': 'value' }});
添加事件监听器来处理接收到的事件:
JavaScript
myPostEventSource.addEventListener('message', (event) => { console.log('SSE Data:', event.data); // 更新 UI 或其他逻辑});myPostEventSource.addEventListener('error', (error) => { console.error('SSE Error:', error);});myPostEventSource.addEventListener('open', () => { console.log('SSE Connection opened!');});
在需要关闭连接时调用 close()
:
JavaScript
myPostEventSource.close();
这个实现为你提供了一个健壮的起点,你可以根据自己的具体需求进行扩展,例如更复杂的重试策略、心跳检测等。