服务器推送消息给客户端之Server-Sent Events

背景:

公司内部需要开发一个供全公司使用的chatgpt页面,需要实现类似chatgpt页面的流式消息。

介绍:

Server-Sent Events(SSE)是一个可以让服务端主动推送消息给客户端,但是客户端无法发送消息给服务器的技术,并且可以不间断的持续发送数据流。类似视频一样,你点击一个视频,服务器并不会一次性把视频的全部数据传输给你,只会传给你对应时间的数据流。

使用:

SSE支持HTTP协议,目前测试的几个浏览器都支持(Bing,chrome, geogle),测试浏览器支持不支持也很简单,在浏览器中输入window.EventSource,查看输出就可以了。如下图这样的输出,就代表该浏览器支持SSE。

使用方法也很简单,一行代码即可建立连接

var source = EventSource("http://localhost:8080/stream/sse");   

API介绍:

open: 一旦和服务器建立连接,就会触发这个方法。

        source.onopen = function(event) {
           console.log('open')
      }

message:客户端接收到服务器推送的数据,会在这个方法下进行处理。

        source.onmessage = function(event) {
console.log(event.data)
}

error:如果连接中断,服务端返回数据失败,会自动重连。

        source.onerror = function(event) {
           console.log('error', event)
      }

close(): 关闭连接。

注意,close()是API,这里自定义了一个close事件,服务器在发送数据完成后,主动调用该事件即可关闭连接,不调用source.close()方法的话,客户端会一直重连

        source.addEventListener('close',function(event) {
           console.log('close')
        source.close();
      });

完整代码:

    var source = new EventSource("http://localhost:8080/stream/sse");   
   source.onopen = function(event) {
       console.log('open')
  }
   source.onmessage = function(event) {
console.log(event.data)
}
source.onerror = function(event) {
       console.log('error', event)
  }
   source.addEventListener('closeClient', function (event) {
      console.log('closeClient');
      source.close();
  }, false);

服务器端:

调用语法:

单个\n代表换行,\n\n代表单次发送结束。

data: value

示例: \n\n结尾   \n可以对长数据进行换行
1.response.getWriter().write("data: value\n\n")。
2.response.getWriter().write("data: value1\nvalue\n\n")。

event: event

示例:  这样服务器就可以调用客户端的close事件关闭连接 当然close事件需要自己定义
response.getWriter().write("event: close\n\n")。

retry: num

示例:   客户端连接不上服务器,多久执行一次重连 单位:ms
response.getWriter().write("retry: 10000\n\n")。 // 10S重连一次

id: value

示例:   消息的标识
response.getWriter().write("id: 唯一key\n\n")。

: value

示例: \n结尾  :前面没有东西,表示注释,可以用来保持连接不中断
response.getWriter().write(": heart-beat\n\n")。

原生方法,不推荐。里面有很多坑,一个不小心可能就会导致收不到数据,比如event后都需要跟data: \n\n才能生效, 因为event需要默认带上data,比如以下代码不生效:

    response.getWriter().write("event: closeClient\n data: \n\n")。 
   response.getWriter().write("data: \n\n event: closeClient\n ")。
//原生 不推荐
package com.example.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@Slf4j
@Controller
@RequestMapping(value = "/stream")
@ResponseBody
public class SseController {
   @GetMapping("/sse")
   public void sseTest(HttpServletRequest request, HttpServletResponse response) throws IOException, InterruptedException {
       response.setContentType("text/event-stream");
       response.setHeader("Access-Control-Allow-Origin", "*");
       response.setHeader("Cache-Control","no-cache");
       response.setCharacterEncoding("UTF-8");
       response.getWriter().write("retry: 10000\n");
       String str = "测试SSE";
       for (int i = 0;i< str.length();++i) {
           Thread.sleep(100);
           String t = "data: "+str.charAt(i) + "\n"+"data: "+str.charAt(i) + "\n\n";
           try {
               response.getWriter().write(t);
               response.getWriter().flush();
          } catch (IOException e) {
               throw new RuntimeException(e);
          }
      }
       response.getWriter().write("event: closeClient\n");
       response.getWriter().write("data: closeClient\n\n");
       response.getWriter().flush();
  }

}

SpringBoot的SseEmitter

在SpringBoot中有封装好的轮子供我们与SSE进行交互,我们只需要关注业务代码即可。

前端代码:

<!doctype html>
<html lang="en">
<head>
   <title>SseTest</title>
</head>
<body>
<div>SseTest</div>
<div id="panel"></div>
</body>
</html>
<script>
   var source = new EventSource('http://localhost:8080/stream/sse');
   source.onmessage = function (event) {
       text = document.getElementById('panel').innerText;
       text += event.data;
       document.getElementById('panel').innerText = text;
  };
   source.onopen = function (event) {
       text = document.getElementById('panel').innerText;
       text += 'open: ';
       console.log(event);
       document.getElementById('panel').innerText = text;
  };
   source.onerror = function(event) {
       console.log('error', event)
  }
   source.addEventListener('closeClient', function (event) {
       console.log('closeClient');
       source.close();
  }, false);
</script>

JAVA服务端代码

package com.example.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@Slf4j
@Controller
@RequestMapping(value = "/stream")
@ResponseBody
public class SseController {
   //定义一个公共对象
   private SseEmitter sseEmitter;

   @GetMapping(value = "/sse", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
   public SseEmitter sseTest(HttpServletRequest request, HttpServletResponse response) throws IOException {
       response.setContentType("text/event-stream");
       response.setHeader("Cache-Control","no-cache");
       response.setCharacterEncoding("UTF-8");
       //新建连接,设置超时时间
       sseEmitter = new SseEmitter(180000L);
       // 设置前端的重试时间为10s
       sseEmitter.send(SseEmitter.event().reconnectTime(10000).data("reconnectTime"));
       //设置超时的回调
       sseEmitter.onTimeout(() -> {
           System.out.println("timeout");
      });
       //设置断开连接的回调
       sseEmitter.onCompletion(() -> System.out.println("completion"));
       return sseEmitter;
  }

   //发送消息
   @GetMapping(value = "/sse/send")
   public void send(HttpServletRequest request, HttpServletResponse response,String message) throws IOException, InterruptedException {
       String str = message;
       for (int i = 0; i < str.length();++i) {
           Thread.sleep(100L);
           sseEmitter.send(str);
      }
  }

   //关闭连接
   @GetMapping(value = "/sse/close")
   public void close(HttpServletRequest request, HttpServletResponse response) throws IOException{
       //调用自定义的关闭事件
       sseEmitter.send(SseEmitter.event().name("closeClient").data("服务端主动关闭连接"));
  }
}

打开浏览器看一下效果

是不是有chatgpt那味儿了。

参考链接

Server-Sent Events 教程 – 阮一峰的网络日志 (ruanyifeng.com)

EventSource – Web API 接口参考 | MDN (mozilla.org)

【SpringBoot WEB 系列】SSE 服务器发送事件详解 – 一灰灰Blog – 博客园 (cnblogs.com)

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇