掘金 人工智能 17小时前
基于Fetch的post sse实现
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍如何使用Fetch API构建一个支持POST请求的SSE(Server-Sent Events)客户端。通过自定义的PostEventSource类,可以发送POST请求、解析SSE格式数据,并实现自动重连机制。文章提供了详细的代码实现、关键方法解析以及使用示例,帮助开发者在浏览器环境中高效地处理SSE数据流。

💡PostEventSource类是核心,它封装了SSE的连接和事件处理逻辑,允许开发者发送POST请求并处理SSE事件。

🔑connect()方法使用fetch API发送HTTP请求,处理来自服务器的SSE数据流。它负责读取数据、解码、缓冲,并触发相应的事件。

📌_processBuffer()方法解析接收到的数据流,将数据流分割成独立的SSE事件。它根据SSE规范解析event、data和id等字段。

🔄_handleReconnect()方法在连接错误时处理重连逻辑,确保客户端在网络问题时能够自动重新连接到SSE服务。

✅close()方法通过AbortController中断fetch请求,从而优雅地关闭SSE连接,释放资源。

由于浏览器原生的 EventSource 不支持 POST 请求,我们需要利用 fetch API 来手动实现一个支持 POST 请求的 SSE 客户端。

下面我将提供一个基础的实现,它包含了发送 POST 请求、解析 SSE 格式数据、以及简单的自动重连逻辑。


基于 Fetch API 的 POST SSE 实现

这个实现会模拟 EventSource 的一些行为,但允许你发送 POST 请求和请求体。

JavaScript

class PostEventSource {    constructor(url, options = {}) {        this.url = url;        this.options = {            method: 'POST', // 默认使用 POST            headers: {                'Content-Type': 'application/json',                'Accept': 'text/event-stream', // 告知服务器我们期望 SSE 格式                ...options.headers // 合并用户自定义头部            },            payload: options.payload || null, // POST 请求体            reconnectInterval: 3000, // 默认 3 秒后重连            maxRetries: 5, // 最大重试次数            ...options        };        this.eventListeners = {}; // 用于存储事件监听器        this.isConnected = false;        this.retryCount = 0;        this.controller = null; // AbortController 用于取消 fetch 请求        this.decoder = new TextDecoder('utf-8');        this.buffer = ''; // 用于处理分块接收的数据        this.connect(); // 实例化时立即尝试连接    }    // 触发事件的方法    _emit(eventName, data) {        if (this.eventListeners[eventName]) {            this.eventListeners[eventName].forEach(listener => {                listener(data);            });        }    }    // 连接到 SSE 服务    async connect() {        if (this.isConnected) {            console.warn('PostEventSource: Already connected.');            return;        }        this.isConnected = true;        this.controller = new AbortController();        const { signal } = this.controller;        try {            this._emit('open'); // 触发 open 事件            const fetchOptions = {                method: this.options.method,                headers: this.options.headers,                signal: signal,                // 只在 POST 或 PUT 等方法时包含 body                body: (this.options.method === 'POST' || this.options.method === 'PUT') ? JSON.stringify(this.options.payload) : undefined            };            const response = await fetch(this.url, fetchOptions);            if (!response.ok) {                throw new Error(`HTTP error! Status: ${response.status}`);            }            if (!response.body) {                throw new Error('Response body is null.');            }            this.retryCount = 0; // 连接成功,重置重试计数            const reader = response.body.getReader();            while (true) {                const { done, value } = await reader.read();                if (done) {                    console.log('PostEventSource: Stream finished.');                    break;                }                this.buffer += this.decoder.decode(value, { stream: true });                this._processBuffer(); // 处理接收到的数据            }            this._emit('close'); // 正常关闭        } catch (error) {            if (error.name === 'AbortError') {                console.log('PostEventSource: Connection aborted.');                this._emit('close'); // 主动关闭也触发 close            } else {                console.error('PostEventSource: Connection error:', error);                this.isConnected = false; // 标记为未连接                this._emit('error', error); // 触发 error 事件                this._handleReconnect(); // 尝试重连            }        } finally {            this.isConnected = false;        }    }    // 处理 SSE 消息缓冲    _processBuffer() {        const messages = this.buffer.split('\n\n'); // SSE 事件以两个换行符分隔        this.buffer = messages.pop(); // 最后一个可能不完整,放回 buffer        messages.forEach(msg => {            if (msg.trim() === '') return; // 跳过空消息            let event = 'message'; // 默认事件类型            let data = '';            let id = null;            msg.split('\n').forEach(line => {                if (line.startsWith('event:')) {                    event = line.substring(6).trim();                } else if (line.startsWith('data:')) {                    data += line.substring(5); // 多个 data 行会拼接                } else if (line.startsWith('id:')) {                    id = line.substring(3).trim();                } else if (line.startsWith('retry:')) {                    // 可以根据需要处理 retry 字段                    // this.options.reconnectInterval = parseInt(line.substring(6).trim(), 10);                }            });            const eventData = { data, event, id };            this._emit(event, eventData); // 触发特定事件(如 'message' 或自定义事件)        });    }    // 处理重连逻辑    _handleReconnect() {        if (this.retryCount < this.options.maxRetries) {            this.retryCount++;            console.log(`PostEventSource: Retrying connection in ${this.options.reconnectInterval / 1000} seconds... (Attempt ${this.retryCount}/${this.options.maxRetries})`);            setTimeout(() => this.connect(), this.options.reconnectInterval);        } else {            console.error('PostEventSource: Max retries reached. Connection failed permanently.');            this._emit('error', new Error('Max retries reached.')); // 最终错误        }    }    // 添加事件监听器    addEventListener(eventName, listener) {        if (!this.eventListeners[eventName]) {            this.eventListeners[eventName] = [];        }        this.eventListeners[eventName].push(listener);    }    // 移除事件监听器    removeEventListener(eventName, listener) {        if (this.eventListeners[eventName]) {            this.eventListeners[eventName] = this.eventListeners[eventName].filter(l => l !== listener);        }    }    // 关闭 SSE 连接    close() {        if (this.controller) {            this.controller.abort(); // 取消正在进行的 fetch 请求        }        this.isConnected = false;        this.retryCount = 0; // 重置重试计数        console.log('PostEventSource: Connection closed manually.');    }}// ------------------- 使用示例 -------------------// 假设你的服务器端 SSE 接口地址const SSE_POST_ENDPOINT = 'http://localhost:3000/stream-data';// 示例 POST 请求体const requestPayload = {    query: 'Give me a summary of current AI trends.',    userId: 'user123',    preferences: {        format: 'markdown',        length: 'short'    }};let myPostEventSource = null; // 声明一个变量来存储实例// 启动连接的函数function startSseConnection() {    // 如果已经有实例,先关闭它    if (myPostEventSource) {        myPostEventSource.close();    }    myPostEventSource = new PostEventSource(SSE_POST_ENDPOINT, {        payload: requestPayload,        headers: {            'Authorization': 'Bearer YOUR_AUTH_TOKEN_HERE', // 示例认证头            'X-Custom-Header': 'My-Value' // 其他自定义头部        },        reconnectInterval: 5000, // 5秒后重连        maxRetries: 3 // 最多重试 3 次    });    myPostEventSource.addEventListener('open', () => {        console.log('POST SSE Connection opened!');    });    myPostEventSource.addEventListener('message', (event) => {        console.log('Received message:', event.data);        // 在这里处理接收到的 SSE 数据,例如更新UI        document.getElementById('sse-output').innerText += event.data + '\n';    });    myPostEventSource.addEventListener('custom-event', (event) => {        // 如果服务器发送了 'event: custom-event'        console.log('Received custom event:', event.data);    });    myPostEventSource.addEventListener('error', (error) => {        console.error('POST SSE Connection error:', error);        // 处理错误,例如显示用户友好的消息        document.getElementById('sse-status').innerText = 'Connection Error!';    });    myPostEventSource.addEventListener('close', () => {        console.log('POST SSE Connection closed.');        document.getElementById('sse-status').innerText = 'Connection Closed';    });    document.getElementById('sse-status').innerText = 'Connecting...';}// 停止连接的函数function stopSseConnection() {    if (myPostEventSource) {        myPostEventSource.close();    }}// 假设你的HTML中有两个按钮和一个输出区域// <button onclick="startSseConnection()">Start SSE</button>// <button onclick="stopSseConnection()">Stop SSE</button>// <div id="sse-status"></div>// <pre id="sse-output"></pre>

代码解析和关键点:

    PostEventSource 类:

      这是一个自定义的 JavaScript 类,用于封装 SSE 的连接和事件处理逻辑。

      constructor(url, options)

        url: SSE 服务的 URL。options: 配置对象,最重要的就是 payload(POST 请求体)和 headersmethod: 强制设置为 POST(或其他你需要的 HTTP 方法)。headers: 包含 Content-Type: application/json 和 Accept: text/event-stream 是标准做法。reconnectInterval 和 maxRetries: 用于控制自动重连行为。controller: 使用 AbortController 来实现手动关闭连接(即取消 fetch 请求)。decoderTextDecoder 用于将二进制 Uint8Array 数据解码成字符串。buffer: 用于处理分块接收的数据流,确保 SSE 事件的完整性。

    connect() 方法:

      这是核心的连接逻辑。它使用 fetch(this.url, fetchOptions) 发送 HTTP 请求。fetchOptions.body: 根据 method 类型决定是否包含 JSON.stringify(this.options.payload)response.body.getReader()  获取响应体的一个 ReadableStreamDefaultReader,用于逐块读取数据。while (true) 循环和 reader.read()  这是处理流式响应的关键。它会持续读取数据块,直到 done为 true(表示流结束)。this.buffer += this.decoder.decode(value, { stream: true })  解码接收到的二进制数据,并将其添加到缓冲区。{ stream: true } 表示可能还有更多数据后续会来。this._processBuffer()  处理缓冲区中的数据,解析 SSE 事件。

    _processBuffer() 方法:

      负责将接收到的数据流解析成独立的 SSE 事件。split('\n\n')  SSE 规范规定,每个事件块由两个换行符 \n\n 分隔。messages.pop()  最后一个 messages 元素可能是不完整的事件,所以将其放回 buffer,等待更多数据。解析 event:data:id: 行:  根据 SSE 规范解析每一行,提取事件类型、数据和 ID。data: 行可以有多行,需要拼接。

    _handleReconnect() 方法:

      在 connect() 捕获到错误时调用。它会检查重试次数是否达到上限,如果没有,则在 reconnectInterval 后再次调用 connect() 尝试重连。

    addEventListener()removeEventListener()_emit()

      这些方法提供了类似 EventSource 的事件注册/触发机制,允许你监听 openmessageerrorclose 以及服务器发送的自定义事件

    close() 方法:

      this.controller.abort()  这是关键!它会中断正在进行的 fetch 请求,从而优雅地关闭 SSE 连接。

如何使用:

    将上述 PostEventSource 类复制到你的项目中。

    在需要启动 SSE 连接的地方,实例化 PostEventSource

    JavaScript

    const myPostEventSource = new PostEventSource(YOUR_SSE_URL, {    payload: { your: 'data', goes: 'here' }, // 你的 POST 请求体    headers: {        'Authorization': 'Bearer your_token',        'X-Another-Header': 'value'    }});

    添加事件监听器来处理接收到的事件:

    JavaScript

    myPostEventSource.addEventListener('message', (event) => {    console.log('SSE Data:', event.data);    // 更新 UI 或其他逻辑});myPostEventSource.addEventListener('error', (error) => {    console.error('SSE Error:', error);});myPostEventSource.addEventListener('open', () => {    console.log('SSE Connection opened!');});

    在需要关闭连接时调用 close()

    JavaScript

    myPostEventSource.close();

这个实现为你提供了一个健壮的起点,你可以根据自己的具体需求进行扩展,例如更复杂的重试策略、心跳检测等。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

SSE Fetch API POST请求 EventSource Web开发
相关文章