SpringBoot使用Websocket总结
1.添加依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
|
2.写处理器
写一个类实现org.springframework.web.socket.WebSocketHandler,
更推荐继承org.springframework.web.socket.handler.AbstractWebSocketHandler类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class TestHandler extends AbstractWebSocketHandler {
@Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { super.handleTextMessage(session, message); }
@Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { super.handleBinaryMessage(session, message); }
@Override protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { super.handlePongMessage(session, message); }
@Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { super.handleTransportError(session, exception); }
@Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { super.afterConnectionClosed(session, status); }
@Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { super.afterConnectionEstablished(session); }
@Override public boolean supportsPartialMessages() { return super.supportsPartialMessages(); } }
|
3.配置路径和拦截器
在一个Configuration类上,添加EnableWebSocket注解启动Websocket。
注入WebSocketConfigurer的Bean即可启动websocket服务端。
addInterceptors方法是添加拦截器,这个拦截器拦截在http协议转向websocket的那个请求上,如果底层使用的是servlet可以强转成ServletServerHttpRequest,在这里能拿到websocket连接时携带的param和head。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @EnableWebSocket @Configuration public class WebsocketConfig {
private static Logger log = LoggerFactory.getLogger(WebsocketConfig.class);
@Bean public WebSocketConfigurer wsc() { return registry -> registry.addHandler(new TestHandler(), "/websocket/test") .addInterceptors(new HandshakeInterceptor() { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { ServletServerHttpRequest req = (ServletServerHttpRequest) request; return true; }
@Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
} }).setAllowedOrigins("*"); }
}
|
4.技巧
拦截器返回false,则不会进行websocket协议的转换,如必须要登录才能连接的情况,可以在拦截器里拿到请求session_id来判断,也可以获取参数判断。
拦截器里的第三个参数,是一个map,放入此map的值可以从WebSocketHandler里面的WebSocketSession身上拿出来。
supportsPartialMessages回调方法是决定是否接受半包,因为websocket协议比较底层,好像Tcp协议一样,如果发送大消息可能会拆成多个小报文。如果不希望处理不完整的报文,希望底层帮忙聚合成完整消息将此方法返回false,这样底层会等待完整报文到达聚合后才回调。
WebSocketSession身上能设置最大报文大小,如果报文过大则会报错的,可以将限制调大。可以在afterConnectionEstablished方法里对WebSocketSession进行设置。tomcat的session默认大小就是 8*1024。
1 2 3 4 5
| @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { session.setBinaryMessageSizeLimit(8 * 1024); session.setTextMessageSizeLimit(8 * 1024); }
|
WebSocketSession是不支持并发发送消息的,要么自己做同步,要么包装WebSocketSession为ConcurrentWebSocketSessionDecorator,关于ConcurrentWebSocketSessionDecorator的使用可以参考Spring并发发送Websocket消息。
setAllowedOrigins("*")为控制跨域的,测试阶段可以设成*,不然不允许跨域链接Websocket。
百度搜在线websocket,有在线连接Websocket的网站,这样后端自己做测试比较方便。
5.原理
EnableWebSocket引入了DelegatingWebSocketConfiguration。
1 2 3 4 5 6
| @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Import(DelegatingWebSocketConfiguration.class) public @interface EnableWebSocket { }
|
先看DelegatingWebSocketConfiguration这个类。首先类被构造后,会自动注入WebSocketConfigurer,这个就是我们配置的Bean。
1 2 3 4 5 6
| @Autowired(required = false) public void setConfigurers(List<WebSocketConfigurer> configurers) { if (!CollectionUtils.isEmpty(configurers)) { this.configurers.addAll(configurers); } }
|
然后父类会注入一个HandlerMapping,会创建一个ServletWebSocketHandlerRegistry,调用所有的
configurer.registerWebSocketHandlers(registry);方法进行配置。
最后从registry.getHandlerMapping()获取HandlerMapping,这个HandlerMapping最终会注册到Springmvc里,接受到请求时会使用该HandlerMapping来处理请求。
1 2 3 4 5 6 7 8 9 10 11
| @Bean public HandlerMapping webSocketHandlerMapping() { ServletWebSocketHandlerRegistry registry = initHandlerRegistry(); if (registry.requiresTaskScheduler()) { TaskScheduler scheduler = defaultSockJsTaskScheduler(); Assert.notNull(scheduler, "Expected default TaskScheduler bean"); registry.setTaskScheduler(scheduler); } return registry.getHandlerMapping(); }
|
1 2 3 4 5 6 7
| private ServletWebSocketHandlerRegistry initHandlerRegistry() { if (this.handlerRegistry == null) { this.handlerRegistry = new ServletWebSocketHandlerRegistry(); registerWebSocketHandlers(this.handlerRegistry); } return this.handlerRegistry; }
|
1 2 3 4 5 6
| @Override protected void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { for (WebSocketConfigurer configurer : this.configurers) { configurer.registerWebSocketHandlers(registry); } }
|
下面是获取HandlerMapping的过程
这里会创建一个WebSocketHandlerMapping,并设置urlMap,urlMap里面存放的是url 和对应的HttpRequestHandle。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public AbstractHandlerMapping getHandlerMapping() { Map<String, Object> urlMap = new LinkedHashMap<>(); for (ServletWebSocketHandlerRegistration registration : this.registrations) { MultiValueMap<HttpRequestHandler, String> mappings = registration.getMappings(); mappings.forEach((httpHandler, patterns) -> { for (String pattern : patterns) { urlMap.put(pattern, httpHandler); } }); } WebSocketHandlerMapping hm = new WebSocketHandlerMapping(); hm.setUrlMap(urlMap); hm.setOrder(this.order); if (this.urlPathHelper != null) { hm.setUrlPathHelper(this.urlPathHelper); } return hm; }
|
这里是将我们的配置构建成一个WebSocketHttpRequestHandler来处理请求,其中包含了路径,拦截器,WebSocketHandler,HandshakeInterceptor,HandshakeHandler。
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override protected void addWebSocketHandlerMapping(MultiValueMap<HttpRequestHandler, String> mappings, WebSocketHandler webSocketHandler, HandshakeHandler handshakeHandler, HandshakeInterceptor[] interceptors, String path) {
WebSocketHttpRequestHandler httpHandler = new WebSocketHttpRequestHandler(webSocketHandler, handshakeHandler);
if (!ObjectUtils.isEmpty(interceptors)) { httpHandler.setHandshakeInterceptors(Arrays.asList(interceptors)); } mappings.add(httpHandler, path); }
|
HandshakeHandler这个类也是可以自定义的,他与协议转换有关,spring会自动根据底层实现来决定这个是什么,一般不要自己写。
HttpRequestHandle处理过程如下,先调用拦截器的before,再掉用HandshakeHandler做协议转换,再调用拦截器的after。这里调用拦截器前创建的Map attributes,最终会设置到WebsocketSession上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Override public void handleRequest(HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws ServletException, IOException {
ServerHttpRequest request = new ServletServerHttpRequest(servletRequest); ServerHttpResponse response = new ServletServerHttpResponse(servletResponse);
HandshakeInterceptorChain chain = new HandshakeInterceptorChain(this.interceptors, this.wsHandler); HandshakeFailureException failure = null;
try { if (logger.isDebugEnabled()) { logger.debug(servletRequest.getMethod() + " " + servletRequest.getRequestURI()); } Map<String, Object> attributes = new HashMap<>(); if (!chain.applyBeforeHandshake(request, response, attributes)) { return; } this.handshakeHandler.doHandshake(request, response, this.wsHandler, attributes); chain.applyAfterHandshake(request, response, null); } catch (HandshakeFailureException ex) { failure = ex; } catch (Throwable ex) { failure = new HandshakeFailureException("Uncaught failure for request " + request.getURI(), ex); } finally { if (failure != null) { chain.applyAfterHandshake(request, response, failure); response.close(); throw failure; } response.close(); } }
|
下面是协议升级前做的操作,创建StandardWebSocketSession时将attr也就是那个拦截器里的map设置进去。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Override public void upgrade(ServerHttpRequest request, ServerHttpResponse response, @Nullable String selectedProtocol, List<WebSocketExtension> selectedExtensions, @Nullable Principal user, WebSocketHandler wsHandler, Map<String, Object> attrs) throws HandshakeFailureException {
HttpHeaders headers = request.getHeaders(); InetSocketAddress localAddr = null; try { localAddr = request.getLocalAddress(); } catch (Exception ex) { } InetSocketAddress remoteAddr = null; try { remoteAddr = request.getRemoteAddress(); } catch (Exception ex) { } StandardWebSocketSession session = new StandardWebSocketSession(headers, attrs, localAddr, remoteAddr, user); StandardWebSocketHandlerAdapter endpoint = new StandardWebSocketHandlerAdapter(wsHandler, session);
List<Extension> extensions = new ArrayList<>(); for (WebSocketExtension extension : selectedExtensions) { extensions.add(new WebSocketToStandardExtensionAdapter(extension)); }
upgradeInternal(request, response, selectedProtocol, extensions, endpoint); }
|
Springmvc处理过程
在DispatcherServlet里面,websocket升级的请求是由HttpRequestHandlerAdapter处理的,这个类是在
org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport#httpRequestHandlerAdapter方法里注册进来的。
处理过程也很简单,直接强转调用。
1 2 3 4 5 6 7 8
| @Override @Nullable public ModelAndView handle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
((HttpRequestHandler) handler).handleRequest(request, response); return null; }
|
完