From 9a58401dcb8afc5303926e5dd630aa03f1de44ba Mon Sep 17 00:00:00 2001 From: "453530270@qq.com" Date: Fri, 25 Aug 2023 17:00:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E9=9B=86=E6=88=90=20sse?= =?UTF-8?q?=EF=BC=88=E6=9C=8D=E5=8A=A1=E7=AB=AF=E5=8D=95=E5=90=91=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=8E=A8=E9=80=81=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 12 +- .../zhbs/controller/PassFlowController.java | 66 +++++++++++ .../zhbs/service/SeverSentEventService.java | 110 ++++++++++++++++++ .../java/com/xtong/zhbs/utils/AjaxResult.java | 8 ++ src/main/resources/static/index.html | 2 +- 5 files changed, 196 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/xtong/zhbs/controller/PassFlowController.java create mode 100644 src/main/java/com/xtong/zhbs/service/SeverSentEventService.java diff --git a/README.md b/README.md index 5dd6b9f..5b0b400 100644 --- a/README.md +++ b/README.md @@ -5,4 +5,14 @@ 2、dm 数据库在执行插入过程后,不能返回相应的主键id ## 参考网址 -1、 (https://blog.csdn.net/u014595589/article/details/90760618)[Spring Boot使用EventSource和观察者模式实现服务端推送] +1、 [Spring Boot使用EventSource和观察者模式实现服务端推送](https://blog.csdn.net/u014595589/article/details/90760618) + +2、[了解JS三种实时通信方式——Eventsource、websocket与socket.io之间的差异和优缺点](https://blog.csdn.net/weixin_42508580/article/details/130931268) + +3、 [documentationPluginsBootstrapper 错误](https://www.jianshu.com/p/1ea987c75073) + +4、[实时通信的服务器推送机制 EventSource(SSE) 简介,附 go 实现示例](https://blog.csdn.net/DisMisPres/article/details/130539861) + +5 [SpringBoot使用WebSocket](https://blog.csdn.net/weixin_44185837/article/details/124942482) + +[【ServerSentEvents】服务端单向消息推送](http://www.manongjc.com/detail/40-envenmxrmjsschv.html) \ No newline at end of file diff --git a/src/main/java/com/xtong/zhbs/controller/PassFlowController.java b/src/main/java/com/xtong/zhbs/controller/PassFlowController.java new file mode 100644 index 0000000..ccc3bb2 --- /dev/null +++ b/src/main/java/com/xtong/zhbs/controller/PassFlowController.java @@ -0,0 +1,66 @@ +package com.xtong.zhbs.controller; + +import com.alibaba.fastjson.JSONObject; +import com.xtong.zhbs.bean.PassengerFlow; +import com.xtong.zhbs.service.PassengerFlowService; +import com.xtong.zhbs.utils.AjaxResult; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.context.request.async.DeferredResult; + + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.List; + +/** + * 实时客流 + */ +@Api(tags = "客流趋势") +@Controller +@RequestMapping(value = "/real/") +public class PassFlowController { + @Autowired + private PassengerFlowService passengerFlowService; + + @ApiImplicitParam(name = "page",value = "姓名",required = true) + @ApiOperation(value = "客流列表") + @GetMapping(value = "/psflist") + @ResponseBody + public JSONObject psflist(@RequestParam String page, @RequestParam("limit") String limit){ + int ipage = 1; + int ilimit=10; + List passengerFlowList = passengerFlowService.getPassengerFlowList(ipage,ilimit); + return AjaxResult.renderData(passengerFlowList); + } + + /** + * 数据流方式输出内容 + * @param response + * @return + * @throws IOException + */ + @ResponseBody + @RequestMapping(value = "/getMsg", produces="text/event-stream;charset=UTF-8") + DeferredResult getMsg(HttpServletResponse response) throws IOException { + response.setContentType("text/event-stream"); + response.setCharacterEncoding("UTF-8"); + response.setStatus(200); + List passengerFlowList = passengerFlowService.getPassengerFlowList(1,10); +// msgService.removeErrorResponse(); +// msgService.getListRes().add(response); + if(!response.getWriter().checkError()){ + response.getWriter().write("data:hello\n\n"); + response.getWriter().flush(); + } + DeferredResult df = new DeferredResult(60000l); + return df; + } +} diff --git a/src/main/java/com/xtong/zhbs/service/SeverSentEventService.java b/src/main/java/com/xtong/zhbs/service/SeverSentEventService.java new file mode 100644 index 0000000..abeb92b --- /dev/null +++ b/src/main/java/com/xtong/zhbs/service/SeverSentEventService.java @@ -0,0 +1,110 @@ +package com.xtong.zhbs.service; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 服务单向消息传递,SeverSentEvents web端自动重连 + */ +@Service +public class SeverSentEventService { + private static Log logger = LogFactory.getLog(SeverSentEventService.class); + /** + * SSE客户端管理容器 + */ + private static final Map SESSION_MAP = new ConcurrentHashMap<>(); + + /** + * 连接超时时限1小时 (客户端不活跃的时长上限?) + */ + private static final Long TIME_OUT = 1000L * 60L * 60L; + + /** + * 根据客户端标识创建SSE连接 + * + * @param clientId + * @return SseEmitter + * @author Cloud9 + * @date 2022/11/10 10:17 + */ + public SseEmitter createConnection(String clientId) { + SseEmitter emitter = new SseEmitter(TIME_OUT); + emitter.onTimeout(() -> { + logger.info(" - - - - SSE连接超时 " + clientId + " - - - - "); + SESSION_MAP.remove(clientId); + }); + emitter.onCompletion(() -> { + logger.info(" - - - - - SSE会话结束 " + clientId + " - - - - "); + SESSION_MAP.remove(clientId); + }); + emitter.onError(exception -> { + logger.error("- - - - - SSE连接异常 " + clientId + " - - - - -", exception); + SESSION_MAP.remove(clientId); + }); + + SESSION_MAP.put(clientId, emitter); + return emitter; + } + + /** + * 根据客户端令牌标识,向目标客户端发送消息 + * + * @param clientId + * @param message + * @author Cloud9 + * @date 2022/11/10 10:17 + */ + public void sendMessageToClient(String clientId, String message) { + SseEmitter sseEmitter = SESSION_MAP.get(clientId); + + /* 如果客户端不存在,不执行发送 */ + if (Objects.isNull(sseEmitter)) return; + try { + SseEmitter.SseEventBuilder builder = SseEmitter + .event() + /* 不要添加以下SSE的数据头,导致JS不会触发钩子方法onmessage */ + /* https://oomake.com/question/9520150 */ + // .comment("注释内容....") + // .name("名字....") + // .id(UUID.randomUUID().toString()) + .data(message); + sseEmitter.send(builder); + } catch (Exception e) { + logger.error("- - - - Web连接会话中断, ClientId:" + clientId + "已经退出 - - - - ", e); + /* 结束服务端这里的会话, 释放会话资源 */ + sseEmitter.complete(); + SESSION_MAP.remove(clientId); + } + } + + /** + * 给所有客户端发送消息 + * + * @param message + * @author Cloud9 + * @date 2022/11/10 10:18 + */ + public void sendMessageToClient(String message) { + SseEmitter.SseEventBuilder builder = SseEmitter + .event() + .data(message); + + for (String clientId : SESSION_MAP.keySet()) { + SseEmitter emitter = SESSION_MAP.get(clientId); + if (Objects.isNull(emitter)) continue; + try { + emitter.send(builder); + } catch (Exception e) { + logger.error("- - - - Web连接会话中断, ClientId:" + emitter + "已经退出 - - - - ", e); + emitter.complete(); + SESSION_MAP.remove(clientId); + } + } + } +} diff --git a/src/main/java/com/xtong/zhbs/utils/AjaxResult.java b/src/main/java/com/xtong/zhbs/utils/AjaxResult.java index 20240c7..b36c991 100644 --- a/src/main/java/com/xtong/zhbs/utils/AjaxResult.java +++ b/src/main/java/com/xtong/zhbs/utils/AjaxResult.java @@ -32,6 +32,14 @@ public class AjaxResult { return result; } + public static JSONObject renderData(List datalist){ + JSONObject result = new JSONObject(); + result.put("code",200); + result.put("msg","success"); + result.put("data",datalist); + return result; + } + /** * 异常信息 * @param msg diff --git a/src/main/resources/static/index.html b/src/main/resources/static/index.html index 2e66d51..f1d8cb8 100644 --- a/src/main/resources/static/index.html +++ b/src/main/resources/static/index.html @@ -15,7 +15,7 @@ var evtSource = new EventSource('/real/psflist?page=1&limit=10'); evtSource.addEventListener('message',function(evt){ console.log(evt) - document.querySelector("ul").innerHTML+=("
  • "+JSON.parse(evt.data).message+"
  • ") + document.querySelector("ul").innerHTML+=("
  • "+JSON.parse(evt.data).groupName+"
  • ") }) // evtSource.onmessage=(evt)=>{ // console.log(evt.data)