4 changed files with 273 additions and 0 deletions
@ -0,0 +1,58 @@ |
|||
package com.xtong.zhbs.controller; |
|||
|
|||
|
|||
import com.xtong.zhbs.sse.SseEmitterServer; |
|||
import org.springframework.http.ResponseEntity; |
|||
import org.springframework.web.bind.annotation.*; |
|||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; |
|||
|
|||
|
|||
@CrossOrigin |
|||
@RestController |
|||
@RequestMapping("/sse") |
|||
public class TestController { |
|||
/** |
|||
* 用于创建连接 |
|||
*/ |
|||
@GetMapping("/connect/{userId}") |
|||
public SseEmitter connect(@PathVariable String userId) { |
|||
return SseEmitterServer.connect(userId); |
|||
} |
|||
|
|||
/** |
|||
* 推送给所有人 |
|||
* |
|||
* @param message |
|||
* @return |
|||
*/ |
|||
@GetMapping("/push/{message}") |
|||
public ResponseEntity<String> push(@PathVariable(name = "message") String message) { |
|||
SseEmitterServer.batchSendMessage(message); |
|||
return ResponseEntity.ok("WebSocket 推送消息给所有人"); |
|||
} |
|||
|
|||
/** |
|||
* 发送给单个人 |
|||
* |
|||
* @param message |
|||
* @param userid |
|||
* @return |
|||
*/ |
|||
@GetMapping("/push_one/{messsage}/{userid}") |
|||
public ResponseEntity<String> pushOne(@PathVariable(name = "message") String message, @PathVariable(name = "userid") String userid) { |
|||
SseEmitterServer.sendMessage(userid, message); |
|||
return ResponseEntity.ok("WebSocket 推送消息给" + userid); |
|||
} |
|||
|
|||
/** |
|||
* 关闭连接 |
|||
*/ |
|||
@GetMapping("/close/{userid}") |
|||
public ResponseEntity<String> close(@PathVariable("userid") String userid) { |
|||
SseEmitterServer.removeUser(userid); |
|||
return ResponseEntity.ok("连接关闭"); |
|||
} |
|||
|
|||
} |
|||
|
|||
|
|||
@ -0,0 +1,135 @@ |
|||
package com.xtong.zhbs.sse; |
|||
|
|||
|
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.http.MediaType; |
|||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
import java.util.function.Consumer; |
|||
|
|||
/** |
|||
* sse 服务器端推送消息 |
|||
*/ |
|||
public class SseEmitterServer { |
|||
|
|||
private static final Logger logger = LoggerFactory.getLogger(SseEmitterServer.class); |
|||
|
|||
/** |
|||
* 当前连接数 |
|||
*/ |
|||
private static AtomicInteger count = new AtomicInteger(0); |
|||
|
|||
/** |
|||
* 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面 |
|||
*/ |
|||
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); |
|||
|
|||
/** |
|||
* 创建用户连接并返回 SseEmitter |
|||
* |
|||
* @param userId 用户ID |
|||
* @return SseEmitter |
|||
*/ |
|||
public static SseEmitter connect(String userId) { |
|||
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
|
|||
SseEmitter sseEmitter = new SseEmitter(0L); |
|||
// 注册回调
|
|||
sseEmitter.onCompletion(completionCallBack(userId)); |
|||
sseEmitter.onError(errorCallBack(userId)); |
|||
sseEmitter.onTimeout(timeoutCallBack(userId)); |
|||
sseEmitterMap.put(userId, sseEmitter); |
|||
// 数量+1
|
|||
count.getAndIncrement(); |
|||
logger.info("创建新的sse连接,当前用户:{}", userId); |
|||
return sseEmitter; |
|||
} |
|||
|
|||
/** |
|||
* 给指定用户发送信息 |
|||
*/ |
|||
public static void sendMessage(String userId, String message) { |
|||
if (sseEmitterMap.containsKey(userId)) { |
|||
try { |
|||
// sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
|
|||
sseEmitterMap.get(userId).send(message); |
|||
} catch (IOException e) { |
|||
logger.error("用户[{}]推送异常:{}", userId, e.getMessage()); |
|||
removeUser(userId); |
|||
} |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 群发消息 |
|||
*/ |
|||
public static void batchSendMessage(String wsInfo, List<String> ids) { |
|||
ids.forEach(userId -> sendMessage(wsInfo, userId)); |
|||
} |
|||
|
|||
/** |
|||
* 群发所有人 |
|||
*/ |
|||
public static void batchSendMessage(String wsInfo) { |
|||
sseEmitterMap.forEach((k, v) -> { |
|||
try { |
|||
v.send(wsInfo, MediaType.APPLICATION_JSON); |
|||
} catch (IOException e) { |
|||
logger.error("用户[{}]推送异常:{}", k, e.getMessage()); |
|||
removeUser(k); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
/** |
|||
* 移除用户连接 |
|||
*/ |
|||
public static void removeUser(String userId) { |
|||
sseEmitterMap.remove(userId); |
|||
// 数量-1
|
|||
count.getAndDecrement(); |
|||
logger.info("移除用户:{}", userId); |
|||
} |
|||
|
|||
/** |
|||
* 获取当前连接信息 |
|||
*/ |
|||
public static List<String> getIds() { |
|||
return new ArrayList<>(sseEmitterMap.keySet()); |
|||
} |
|||
|
|||
/** |
|||
* 获取当前连接数量 |
|||
*/ |
|||
public static int getUserCount() { |
|||
return count.intValue(); |
|||
} |
|||
|
|||
private static Runnable completionCallBack(String userId) { |
|||
return () -> { |
|||
logger.info("结束连接:{}", userId); |
|||
removeUser(userId); |
|||
}; |
|||
} |
|||
|
|||
private static Runnable timeoutCallBack(String userId) { |
|||
return () -> { |
|||
logger.info("连接超时:{}", userId); |
|||
removeUser(userId); |
|||
}; |
|||
} |
|||
|
|||
private static Consumer<Throwable> errorCallBack(String userId) { |
|||
return throwable -> { |
|||
logger.info("连接异常:{}", userId); |
|||
removeUser(userId); |
|||
}; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,78 @@ |
|||
<!DOCTYPE html> |
|||
<html lang="en"> |
|||
|
|||
<head> |
|||
<meta charset="UTF-8"> |
|||
<title>SseEmitter</title> |
|||
</head> |
|||
|
|||
<body> |
|||
<button onclick="closeSse()">关闭连接</button> |
|||
<div id="message"></div> |
|||
</body> |
|||
<script> |
|||
let source = null; |
|||
|
|||
// 用时间戳模拟登录用户 |
|||
const userId = new Date().getTime(); |
|||
|
|||
if (window.EventSource) { |
|||
|
|||
// 建立连接 |
|||
source = new EventSource('http://localhost:8080/sse/connect/' + userId); |
|||
|
|||
/** |
|||
* 连接一旦建立,就会触发open事件 |
|||
* 另一种写法:source.onopen = function (event) {} |
|||
*/ |
|||
source.addEventListener('open', function(e) { |
|||
setMessageInnerHTML("建立连接。。。"); |
|||
}, false); |
|||
|
|||
/** |
|||
* 客户端收到服务器发来的数据 |
|||
* 另一种写法:source.onmessage = function (event) {} |
|||
*/ |
|||
source.addEventListener('message', function(e) { |
|||
setMessageInnerHTML(e.data); |
|||
}); |
|||
|
|||
|
|||
/** |
|||
* 如果发生通信错误(比如连接中断),就会触发error事件 |
|||
* 或者: |
|||
* 另一种写法:source.onerror = function (event) {} |
|||
*/ |
|||
source.addEventListener('error', function(e) { |
|||
if (e.readyState === EventSource.CLOSED) { |
|||
setMessageInnerHTML("连接关闭"); |
|||
} else { |
|||
console.log(e); |
|||
} |
|||
}, false); |
|||
|
|||
} else { |
|||
setMessageInnerHTML("你的浏览器不支持SSE"); |
|||
} |
|||
|
|||
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据 |
|||
window.onbeforeunload = function() { |
|||
closeSse(); |
|||
}; |
|||
|
|||
// 关闭Sse连接 |
|||
function closeSse() { |
|||
source.close(); |
|||
const httpRequest = new XMLHttpRequest(); |
|||
httpRequest.open('GET', 'http://localhost:8080/sse/close/' + userId, true); |
|||
httpRequest.send(); |
|||
console.log("close"); |
|||
} |
|||
|
|||
// 将消息显示在网页上 |
|||
function setMessageInnerHTML(innerHTML) { |
|||
document.getElementById('message').innerHTML += innerHTML + '<br/>'; |
|||
} |
|||
</script> |
|||
|
|||
</html> |
|||
Loading…
Reference in new issue