Browse Source

尝试集成 sse(服务端单向消息推送)

master
453530270@qq.com 2 years ago
parent
commit
9a58401dcb
  1. 12
      README.md
  2. 66
      src/main/java/com/xtong/zhbs/controller/PassFlowController.java
  3. 110
      src/main/java/com/xtong/zhbs/service/SeverSentEventService.java
  4. 8
      src/main/java/com/xtong/zhbs/utils/AjaxResult.java
  5. 2
      src/main/resources/static/index.html

12
README.md

@ -5,4 +5,14 @@
2、dm 数据库在执行插入过程后,不能返回相应的主键id
## 参考网址
1、 (https://blog.csdn.net/u014595589/article/details/90760618)[Spring Boot使用EventSource和观察者模式实现服务端推送]
1、 [Spring Boot使用EventSource和观察者模式实现服务端推送](https://blog.csdn.net/u014595589/article/details/90760618)
2、[了解JS三种实时通信方式——Eventsource、websocket与socket.io之间的差异和优缺点](https://blog.csdn.net/weixin_42508580/article/details/130931268)
3、 [documentationPluginsBootstrapper 错误](https://www.jianshu.com/p/1ea987c75073)
4、[实时通信的服务器推送机制 EventSource(SSE) 简介,附 go 实现示例](https://blog.csdn.net/DisMisPres/article/details/130539861)
5 [SpringBoot使用WebSocket](https://blog.csdn.net/weixin_44185837/article/details/124942482)
[【ServerSentEvents】服务端单向消息推送](http://www.manongjc.com/detail/40-envenmxrmjsschv.html)

66
src/main/java/com/xtong/zhbs/controller/PassFlowController.java

@ -0,0 +1,66 @@
package com.xtong.zhbs.controller;
import com.alibaba.fastjson.JSONObject;
import com.xtong.zhbs.bean.PassengerFlow;
import com.xtong.zhbs.service.PassengerFlowService;
import com.xtong.zhbs.utils.AjaxResult;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
/**
* 实时客流
*/
@Api(tags = "客流趋势")
@Controller
@RequestMapping(value = "/real/")
public class PassFlowController {
@Autowired
private PassengerFlowService passengerFlowService;
@ApiImplicitParam(name = "page",value = "姓名",required = true)
@ApiOperation(value = "客流列表")
@GetMapping(value = "/psflist")
@ResponseBody
public JSONObject psflist(@RequestParam String page, @RequestParam("limit") String limit){
int ipage = 1;
int ilimit=10;
List<PassengerFlow> passengerFlowList = passengerFlowService.getPassengerFlowList(ipage,ilimit);
return AjaxResult.renderData(passengerFlowList);
}
/**
* 数据流方式输出内容
* @param response
* @return
* @throws IOException
*/
@ResponseBody
@RequestMapping(value = "/getMsg", produces="text/event-stream;charset=UTF-8")
DeferredResult<String> getMsg(HttpServletResponse response) throws IOException {
response.setContentType("text/event-stream");
response.setCharacterEncoding("UTF-8");
response.setStatus(200);
List<PassengerFlow> passengerFlowList = passengerFlowService.getPassengerFlowList(1,10);
// msgService.removeErrorResponse();
// msgService.getListRes().add(response);
if(!response.getWriter().checkError()){
response.getWriter().write("data:hello\n\n");
response.getWriter().flush();
}
DeferredResult<String> df = new DeferredResult<String>(60000l);
return df;
}
}

110
src/main/java/com/xtong/zhbs/service/SeverSentEventService.java

@ -0,0 +1,110 @@
package com.xtong.zhbs.service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* 服务单向消息传递SeverSentEvents web端自动重连
*/
@Service
public class SeverSentEventService {
private static Log logger = LogFactory.getLog(SeverSentEventService.class);
/**
* SSE客户端管理容器
*/
private static final Map<String, SseEmitter> SESSION_MAP = new ConcurrentHashMap<>();
/**
* 连接超时时限1小时 客户端不活跃的时长上限
*/
private static final Long TIME_OUT = 1000L * 60L * 60L;
/**
* 根据客户端标识创建SSE连接
*
* @param clientId
* @return SseEmitter
* @author Cloud9
* @date 2022/11/10 10:17
*/
public SseEmitter createConnection(String clientId) {
SseEmitter emitter = new SseEmitter(TIME_OUT);
emitter.onTimeout(() -> {
logger.info(" - - - - SSE连接超时 " + clientId + " - - - - ");
SESSION_MAP.remove(clientId);
});
emitter.onCompletion(() -> {
logger.info(" - - - - - SSE会话结束 " + clientId + " - - - - ");
SESSION_MAP.remove(clientId);
});
emitter.onError(exception -> {
logger.error("- - - - - SSE连接异常 " + clientId + " - - - - -", exception);
SESSION_MAP.remove(clientId);
});
SESSION_MAP.put(clientId, emitter);
return emitter;
}
/**
* 根据客户端令牌标识向目标客户端发送消息
*
* @param clientId
* @param message
* @author Cloud9
* @date 2022/11/10 10:17
*/
public void sendMessageToClient(String clientId, String message) {
SseEmitter sseEmitter = SESSION_MAP.get(clientId);
/* 如果客户端不存在,不执行发送 */
if (Objects.isNull(sseEmitter)) return;
try {
SseEmitter.SseEventBuilder builder = SseEmitter
.event()
/* 不要添加以下SSE的数据头,导致JS不会触发钩子方法onmessage */
/* https://oomake.com/question/9520150 */
// .comment("注释内容....")
// .name("名字....")
// .id(UUID.randomUUID().toString())
.data(message);
sseEmitter.send(builder);
} catch (Exception e) {
logger.error("- - - - Web连接会话中断, ClientId:" + clientId + "已经退出 - - - - ", e);
/* 结束服务端这里的会话, 释放会话资源 */
sseEmitter.complete();
SESSION_MAP.remove(clientId);
}
}
/**
* 给所有客户端发送消息
*
* @param message
* @author Cloud9
* @date 2022/11/10 10:18
*/
public void sendMessageToClient(String message) {
SseEmitter.SseEventBuilder builder = SseEmitter
.event()
.data(message);
for (String clientId : SESSION_MAP.keySet()) {
SseEmitter emitter = SESSION_MAP.get(clientId);
if (Objects.isNull(emitter)) continue;
try {
emitter.send(builder);
} catch (Exception e) {
logger.error("- - - - Web连接会话中断, ClientId:" + emitter + "已经退出 - - - - ", e);
emitter.complete();
SESSION_MAP.remove(clientId);
}
}
}
}

8
src/main/java/com/xtong/zhbs/utils/AjaxResult.java

@ -32,6 +32,14 @@ public class AjaxResult {
return result;
}
public static JSONObject renderData(List datalist){
JSONObject result = new JSONObject();
result.put("code",200);
result.put("msg","success");
result.put("data",datalist);
return result;
}
/**
* 异常信息
* @param msg

2
src/main/resources/static/index.html

@ -15,7 +15,7 @@
var evtSource = new EventSource('/real/psflist?page=1&limit=10');
evtSource.addEventListener('message',function(evt){
console.log(evt)
document.querySelector("ul").innerHTML+=("<li>"+JSON.parse(evt.data).message+"</li>")
document.querySelector("ul").innerHTML+=("<li>"+JSON.parse(evt.data).groupName+"</li>")
})
// evtSource.onmessage=(evt)=>{
// console.log(evt.data)

Loading…
Cancel
Save