一 Spring Boot实现实现STOMP协议下的WebSocket
1.1 WebSocket简介
WebSocket是HTML5开始提供的一种在单个TCP连接上进行全双工通讯的协议。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
在WebSocket API中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道,两者之间直接可以数据互相传送。
实现WebSocket的三种方式:
- 使用Tomcat的WebSocket实现
- 使用Spring的WebSocket实现,需要Spring 4.x及以上版本,并且使用了socketjs,对不支持WebSocket的浏览器可以模拟WebSocket的使用
- 使用Netty的WebSocket实现,高性能、高可靠性。
1.2 STOMP简介
直接使用WebSocket就很类似于使用TCP套接字来编写Web应用。因为没有高层级的线路协议,因此就需要我们定义应用之间所发送消息的语义,还需要确保链接的两端都能遵循这些语义。
就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。与HTTP请求和相应类似,STOMP帧由命令、一个或多个头信息及负载所组成。例如,如下就是发送数据的一个STOMP帧:
>>> SEND
transaction:tx-0
destination:/app/marco
content-length:20
{"message":"Marco!"}
- SEND:STOMP命令,表明发送一些内容
- destination:头信息,用来表示消息发送到哪里
- content-length:头信息,用来表示负载内容的大小
- 空行
- 帧内容(负载内容)
1.3 STOMP协议分析
常用的STOMP的命令有:
- CONNECT:初始化一个数据流或TCP连接发送CONNECT帧到服务端。
- CONNECTED:如果服务器接收了连接意图,它会返回一个CONNECTED帧
- SEND:客户端主动发送消息到服务器。
- SUBSCRIBE:客户端注册给定的目的地,被订阅的目的地收到的任何消息将通过MESSAGE Frame发送给Client。ACK控制着确认模式。
- UNSUBSCRIBE:用来移除一个已经存在的订阅,一旦一个订阅被从连接中取消,那么客户端就再也不会收到来自这个订阅的消息
- BEGIN:用于开启一个事务。这种情况下的事务适用于发送消息和确认已经收到的消息。在一个事务期间,任何发送和确认的动作都会被当做事务的一个原子操作
- COMMIT:用来提交事务
- ABORT:终止正在执行的事务
- ACK:用来在client和client-individual模式下确认已经收到一个订阅消息的操作
- NACK:它告诉服务端客户端没有处理该消息
- DISCONNECT:客户端可以通过DISCONNECT帧表示正常断开链接
二 Spring Boot实现
使用Spring Boot + WebSocket + SocketJS + STOMP实现实时通讯功能
2.1 环境
- IDEA
- JDK 8
- Spring Boot
- Vue
- SocketJS
- STOMP
整个项目代码如下:
springbootwebsocket.zip
2.2 引入依赖
在项目中引入如下依赖:
<!-- WebSocket 依赖 Start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- WebSocket 依赖 End -->
2.3 配置
在Spring中启用STOMP通讯不用我们自己实现原生态的帧,Spring的消息功能是基于代理模式构建。如果需要开启STOMP,只需要在WebSocket配置类上使用**@EnableWebSocketMessageBroker**,并实现WebSocketMessageBrokerConfigurer接口。
在Spring中,AbstractWebSocketMessageBrokerConfigurer类,该类已经被标记为废弃。而AbstractWebSocketMessageBrokerConfigurer类的全部功能可以拆分为WebSocketMessageBrokerConfigurer接口和AbstractMessageBrokerConfiguration抽象类。
其中WebSocketMessageBrokerConfigurer接口主要方法及描述如下:
/**
* 添加这个Endpoint, 这样在客户端通过WebSocket连接上服务, 即配置的WebSocket的地址, 并且可以指定是否使用socketjs
*/
default void registerStompEndpoints(StompEndpointRegistry registry) {
}
/**
* 配置发送和接收的消息参数, 默认线程为1, 可以自定义线程数, 最大线程数, 线程存活时间
*/
default void configureWebSocketTransport(WebSocketTransportRegistration registry) {
}
/**
* 设置输入消息通道的线程数和拦截器, 默认线程为1, 可以自定义线程数, 最大线程数, 线程存活时间
*/
default void configureClientInboundChannel(ChannelRegistration registration) {
}
/**
* 设置输出通道的线程数和拦截器, 默认线程为1, 默认线程为1, 可以自定义线程数, 最大线程数, 线程存活时间
*/
default void configureClientOutboundChannel(ChannelRegistration registration) {
}
/**
* 自定义控制器方法的参数类型
*/
default void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
}
/**
* 自定义控制器方法的返回值类型
*/
default void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
}
/**
* 添加自定义消息转换器。Spring提供多种默认的消息转换器.
* @return 返回false不会添加转换器;返回true会添加默认的消息转换器
*/
default boolean configureMessageConverters(List<MessageConverter> messageConverters) {
return true;
}
/**
* 配置消息代理, 哪种类型的消息会进行代理处理
*/
default void configureMessageBroker(MessageBrokerRegistry registry) {
}
在AbstractMessageBrokerConfiguration的子类中有WebSocketMessageBrokerConfigurationSupport类,该类中有:
public abstract class WebSocketMessageBrokerConfigurationSupport extends AbstractMessageBrokerConfiguration {
@Bean
public WebSocketHandler subProtocolWebSocketHandler() {
return new SubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());
}
}
这个Bean也是WebSocketHandler的一种实现,专门处理子协议(内容交换协议)的WebSocketHandler实现。通过这个类,Spring将STOMP over WebSocket完美的封装起来。
完整的配置类WebSocketConfig如下:
/**
* 配置基于STOMP的WebSocket
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport implements WebSocketMessageBrokerConfigurer {
/**
* 添加这个Endpoint, 这样就可以在网页中通过WebSocket连接上服务
* 也就是配置的WebSocket服务地址, 并且可以指定是否使用SocketJS
* @param registry 注册
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
/*
* 1、将/{serviceName}/websocket路径注册为STOMP的端点
* 用户连接这个端点后就可以进行WebSocket通信, 支持socketjs
* 2、setAllowedOrigins("*")表示可以从任何地方来访问(跨域)
* 3、withSockJS()表示支持SocketJS访问
* 4、addInterceptors() 添加自定义握手拦截器
* 5、setHandshakeHandler() 添加用户认证拦截器, 封装了用户的信息
*/
registry.addEndpoint("/websocket")
.setAllowedOrigins("*")
.addInterceptors(new WebSocketHandshakeInterceptor())
.setHandshakeHandler(new CustomPrincipalHandshakeHandler())
.withSockJS();
}
/**
* 配置发送与接收的消息参数, 可以指定消息字节大小, 缓存大小, 发送超时时间
* @param registry 注册器
*/
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
/*
* 1、setMessageSizeLimit() 设置消息缓存的字节大小, 单位为字节
* 2、setSendBufferSizeLimit() 设置WebSocket 会话时, 缓存的字节大小, 单位为字节
* 3、setSendTimeLimit() 设置消息发送会话超时时间, 单位为毫秒
*/
registry.setMessageSizeLimit(102400)
.setSendBufferSizeLimit(10240)
.setSendTimeLimit(10000);
}
/**
* 设置输入通道的线程数, 默认线程数为1, 可以自定义线程数, 最大线程数, 线程存活时间等
* @param registration 注册器
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
/*
* 配置消息线程池
* 1、corePoolSize() 配置核心线程池, 当线程数小于此配置时, 不管线程中有无空闲的线程, 都会产生新线程处理任务
* 2、maxPoolSize() 配置线程池最大数, 当线程池等于此配置时, 不会产生新线程
* 3、keepAliveSeconds() 线程池维护线程所允许的空闲时间, 单位为秒
*/
registration.taskExecutor()
.corePoolSize(10)
.maxPoolSize(20)
.keepAliveSeconds(60);
/*
* 添加STOMP自定义拦截器
* 消息拦截器, 实现ChannelInterceptor接口
*/
registration.interceptors(new WebSocketChannelInterceptor());
}
/**
* 设置输出通道的线程数, 默认线程数为1, 可以自定义线程数, 最大线程数, 线程存活时间等
* @param registration 注册器
*/
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.taskExecutor()
.corePoolSize(10)
.maxPoolSize(20)
.keepAliveSeconds(60);
}
/**
* 自定义控制器的参数类型
* @param argumentResolvers 参数列表
*/
@Override
public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
super.addArgumentResolvers(argumentResolvers);
}
/**
* 自定义控制器返回类型
* @param returnValueHandlers 返回类型
*/
@Override
public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
super.addReturnValueHandlers(returnValueHandlers);
}
/**
* 添加自定义的消息转换器, Spring 提供了多种默认的消息转换器
* @param messageConverters 消息转换列表
* @return true: 会添加默认的消息转换器, 也可以把自己的转换器添加到转换链中; false: 不会添加消息转换器
*/
@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
return super.configureMessageConverters(messageConverters);
}
/**
* 配置消息代理
* @param registry 注册器
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
/*
* MessageBrokerRegistry配置外部的STOMP服务, 需要安装额外的支持, 比如RabbitMQ或ActiveMQ
* 1、配置代理域, 可以配置多个。此段代码配置目的地的前缀为"topicTest", 就可以在配置的域上向客户端发送消息
* 2、可以通过 setRelayHost 配置代理监听的host, 默认为localhost
* 3、可以通过 setRelayPort 配置代理监听的端口, 默认为61613
* 4、可以通过 setClientLogin 和 setClientPassword 配置账号和密码
* 5、setXXX 这种设置的方法是可选的, 根据业务需要自行配置, 也可以使用默认配置
*/
/*registry.enableStompBrokerRelay("topicTest")
.setRelayHost("192.168.10.3")
.setRelayPort(62623)
.setClientLogin("userName")
.setClientPasscode("password");*/
//自定义调度器, 用于控制心跳线程 Start
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(1);//线程池线程数
taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");//线程名前缀
taskScheduler.initialize();//初始化
//自定义调度器, 用于控制心跳线程 End
/*
* Spring 内置Broker对象
* 1、配置代理域, 可以配置多个, 此段代码配置目的地前缀是"topic"和"/user", 可以在配置的域上向客户端发送消息(即客户端订阅的前缀)
* 2、进行心跳设置, 第一值表示server最小能保证发的心跳间隔毫秒数, 第二个值表示server希望client发的心跳间隔毫秒数
* 3、可以配置心跳线程调度器, setHeartbeatValue 不能单独设置, 不然不起作用, 要配合setTaskSchedule才可以生效
*/
registry.enableSimpleBroker("/topic", "/user")
.setHeartbeatValue(new long[]{10000, 10000})
.setTaskScheduler(taskScheduler);
/*
* "/app" 为配置应用服务器的地址前缀, 表示所有以 "/app" 开头的客户端消息或请求
* 都会路由到带有@MessageMapping 注解的方法中
*/
registry.setApplicationDestinationPrefixes("/app");
/*
* 1、配置一对一消息前缀, 客户端接收一对一消息需要配置的前缀, 如"/user/" + userId + "/message"
* 是客户端订阅一对一消息的地址, stompClient.subscribe JS方法调用的地址
* 2、使用@SendToUser 发送私信的规则不是这个参数设定, 在框架内部使用UserDestinationMessageHandler处理,
* 而不是AnnotationMethodMessageHandler 或 SimpleBrokerMessageHandler
*/
registry.setUserDestinationPrefix("/user");
/*
* 自定义路径分隔符, 这段代码的分隔符为 "."
* 分隔是类级别的@MessageMapping 和方法级别的 @MessageMapping
* 例如, 类路径路径为"topic", 方法注解路径为"hello", 那么客户端JS stompClient.send 的方法调用的路径为"/app/topic.hello"
* 注释掉这段代码, 类路径路径为"topic", 方法注解路径为"hello", JS调用路径为"/app/topic/hello"
*/
// registry.setPathMatcher(new AntPathMatcher("."));
}
/**
* WebSocket子协议处理器(STOMP)
* @return WebSocket处理器
*/
@Bean
@Override
public WebSocketHandler subProtocolWebSocketHandler() {
return new CustomSubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());
}
}
2.3.1 用户认证信息的类
在STOMP端点上注册了一个保存用户信息的实体类CustomPrincipalHandshakeHandler,代码如下:
/**
* 配置用户认证信息的握手拦截器
*/
@Slf4j
public class CustomPrincipalHandshakeHandler extends DefaultHandshakeHandler {
/**
* 生成自己的用户信息
* @param request request
* @param wsHandler WebSocket处理器
* @param attributes 其他属性值
* @return 用户信息
*/
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
Map<String, Object> attributes) {
HttpServletRequest servletRequest = this.getRequest(request);
log.info("determineUser: getRequest result --> {}, attributes --> {}", servletRequest, attributes);
Preconditions.checkArgument(Objects.nonNull(servletRequest), "请求连接失败");
log.info("determineUser: user info -- userId --> {}, userName --> {}, clientId --> {}, companyId --> {}",
servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.USER_ID),
servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.USER_NAME),
servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.CLIENT_ID),
servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.COMPANY_ID));
CustomPrincipal customPrincipal = CustomPrincipal.builder().companyId(servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.COMPANY_ID)).userId(servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.USER_ID)).userName(servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.USER_NAME)).build();
attributes.put(CommonConstrants.WebSocketRequestKey.COMPANY_ID, servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.COMPANY_ID));//将机构ID放置到全局的属性中
attributes.put(CommonConstrants.WebSocketRequestKey.USER_ID, customPrincipal);
return customPrincipal;
}
/**
* 获取HttpServletRequest
* @param request ServerHttpRequest
* @return HttpServletRequest
*/
private HttpServletRequest getRequest(ServerHttpRequest request) {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest)request;
return servletServerHttpRequest.getServletRequest();
}
return null;
}
}
2.3.2 握手拦截器
握手拦截器为WebSocketHandshakeInterceptor,需要继承HttpSessionHandshakeInterceptor,重写beforeHandshake等方法,实现自己的握手拦截逻辑,代码如下:
/**
* WebSocket的握手拦截器
*/
@Slf4j
@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
/**
* 建立连接前置拦截
* @param request ServerHttpRequest
* @param response ServerHttpResponse
* @param wsHandler WebSocketHandler
* @param attributes 其他属性
* @return true/false: 是否能继续握手
* @throws Exception 异常
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Map<String, Object> attributes) throws Exception {
log.info("beforeHandshake: before handshake");
HttpServletRequest servletRequest = this.getRequest(request);
log.info("beforeHandshake: getRequest result --> {}", servletRequest);
//不知道从哪来的请求, 不让通信
return !Objects.isNull(servletRequest);
}
/**
* 在握手之后执行该方法, 无论是否握手成功都指明了相应状态码和响应头
* @param request ServerHttpRequest
* @param response ServerHttpResponse
* @param wsHandler WebSocketHandler
* @param exception exception
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
log.info("afterHandshake: after handshake -- exception --> ", exception);
}
/**
* 获取HttpServletRequest
* @param request ServerHttpRequest
* @return HttpServletRequest
*/
private HttpServletRequest getRequest(ServerHttpRequest request) {
if (request instanceof ServletServerHttpRequest) {
ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest)request;
return servletServerHttpRequest.getServletRequest();
}
return null;
}
}
2.3.3 消息通道拦截器
如果需要添加监听,我们监听类需要实现ChannelInterceptor接口,在ChannelInterceptor接口中的preSend能在消息发送前做一些处理。代码如下:
/**
* WebSocket 消息监听, 用于监听WebSocket用户连接情况
*/
@Slf4j
public class WebSocketChannelInterceptor implements ChannelInterceptor {
/**
* 在消息发送之前执行, 如果此方法返回值为空, 则不会发生实际的消息发送
* @param message 消息
* @param channel 通道
* @return 消息
*/
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor stompHeader = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
/*
* 判断是否首次连接请求, 如果已经连接, 返回message
*/
if (StompCommand.CONNECT.equals(stompHeader.getCommand())) {//首次连接
List<String> tokens = stompHeader.getNativeHeader("Authorization");//获取TOKEN
log.info("preSend: stompHeader.getNativeHeader tokens --> {}", tokens);
String token = Optional.ofNullable(tokens).map(tokenList -> tokenList.get(0)).orElse(null);
log.info("preSend: token --> {}", token);
if (StringUtils.isBlank(token)) {//首次连接没有Token
log.warn("preSend: first connect token is null fail");
return null;
}
} else if (StompCommand.DISCONNECT.equals(stompHeader.getCommand())) {//断开连接
log.info("preSend: {} close connect", stompHeader.getUser());
}
return message;
}
/**
* 在消息被实际检索之前调用, 在WebSocket场景中应用不到
* @param channel 通道
* @return false: 不会检索任何消息
*/
@Override
public boolean preReceive(MessageChannel channel) {
return true;
}
/**
* 在检索到消息之后, 返回调用方法之前调用, 可以进行消息修改, 如果返回null, 就不会执行下一步操作
* @param message 消息
* @param channel 通道
* @return 消息
*/
@Override
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
return message;
}
/**
* 在消息发送后立刻调用
* @param message 消息
* @param channel 通道
* @param sent 表示调用的返回值
*/
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
/*StompHeaderAccessor stompHeader = StompHeaderAccessor.wrap(message);
if (Objects.isNull(stompHeader.getCommand())) {//心跳消息
return;
}
log.info("postSend -- command --> {}", stompHeader.getCommand());*/
}
}
2.3.4 WebSocket子协议处理器
可以对WebSocket子协议进行处理,如建立连接成功之后执行的内容,连接断开之后执行的内容等。示例代码如下:
@Slf4j
public class CustomSubProtocolWebSocketHandler extends SubProtocolWebSocketHandler {
/**
* Create a new {@code SubProtocolWebSocketHandler} for the given inbound and outbound channels.
* @param clientInboundChannel the inbound {@code MessageChannel}
* @param clientOutboundChannel the outbound {@code MessageChannel}
*/
public CustomSubProtocolWebSocketHandler(MessageChannel clientInboundChannel, SubscribableChannel clientOutboundChannel) {
super(clientInboundChannel, clientOutboundChannel);
}
/**
* 建立连接成功之后执行的方法
* @param session 处理器
* @throws Exception 异常
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.info("afterConnectionEstablished: connection established -- session --> {}", session);
if (Objects.isNull(session) || Objects.isNull(session.getPrincipal())) {
log.warn("afterConnectionEstablished: session is null or user is null -- session --> {}", session);
return;
}
Map<String, Object> attributes = session.getAttributes();
String sessionId = session.getId();
log.info("afterConnectionEstablished: attributes -->{}, sessionId --> {}", JSONObject.toJSONString(attributes), session);
attributes.put(CommonConstrants.WebSocketRequestKey.CLIENT_ID, sessionId);
super.afterConnectionEstablished(session);
}
/**
* 在关闭连接之后调用
* @param session WebSocket处理器
* @param closeStatus 状态
* @throws Exception 异常
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
log.info("afterConnectionClosed: close -- closeStatus --> {}", closeStatus);
super.afterConnectionClosed(session, closeStatus);
}
}
2.4 服务端消息处理器
服务端消息处理,主要是映射和发送,发送分为广播式通知和点对点发送。
- MessageMapping:Spring提供了一个@MessageMapping注解,功能类似@RequestMapping,定义一个消息的请求。
- SendTo:定义消息的目的地。如接受“/app/notice”的请求,会发往“/topic/notice”客户端
- SendToUser:将消息推送给指定的用户。
- SimpMessagingTemplate:是Spring-WebSocket内置的一个消息发送工具,可以将消息发送大搜指定的客户端
我这里的示例代码如下:
@Slf4j
@RestController
public class WebSocketController {
private final SimpMessageSendingOperations simpMessageSendingOperations;
public WebSocketController(SimpMessageSendingOperations simpMessageSendingOperations) {
this.simpMessageSendingOperations = simpMessageSendingOperations;
}
/**
* 广播消息
* @param message 消息体
* @return 消息
*/
@MessageMapping(value = "/all/notice")
@SendTo(value = "/topic/notice")
public StatusDTO broadCast(String message) {
log.info("broadCast: message --> {}", message);
return StatusDTO.buildDataSuccess(message);
}
/**
* 获取用户的信息
* @param stompHeader 头
* @return 用户信息
*/
@MessageMapping(value = "/get/user/info")
@SendToUser(value = "/account/info", broadcast = false)
public StatusDTO<CustomPrincipal> getUserInfo(StompHeaderAccessor stompHeader) {
Principal user = stompHeader.getUser();
if (Objects.isNull(user)) {
return StatusDTO.buildFailure("请重新连接");
}
CustomPrincipal customUser = (CustomPrincipal) user;
log.info("getUserInfo: user --> {}", customUser);
Map<String, Object> sessionAttributes = stompHeader.getSessionAttributes();
log.info("getUserInfo -- clientId --> {}, companyId --> {}", sessionAttributes.get(CommonConstrants.WebSocketRequestKey.CLIENT_ID),
sessionAttributes.get(CommonConstrants.WebSocketRequestKey.COMPANY_ID));
return StatusDTO.buildDataSuccess(customUser);
}
/**
* 改变用户名称
* 通过客户端用户名的变更
* @param userId 用户ID
* @param userName 用户名
* @return 成功/失败
*/
@GetMapping(value = "/change/user/name/{userId}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public StatusDTO changeName(@PathVariable(value = "userId") String userId,
@RequestParam(value = "user_name", required = false) String userName) {
log.info("changeName -- userId --> {}, userName --> {}", userId, userName);
Preconditions.checkArgument(StringUtils.isNotBlank(userName), "用户名不能为空");
CustomPrincipal customPrincipal = CustomPrincipal.builder().userId(userId).userName(userName).build();
/*
* 通过客户端用户名的变更, 第一个参数是接收人, 第二个参数是地址, 第三个参数是消息体
*/
this.simpMessageSendingOperations.convertAndSendToUser(userId, "/account/info", StatusDTO.buildDataSuccess(customPrincipal));
return StatusDTO.buildSuccess();
}
}
2.5 VUE前端实现
vue项目需要先引入sockjs-client和stompjs两个库:
创建好项目,切换到项目的根路径,安装两个类库:
npm install --save sockjs-client
npm install --save stompjs
然后添加代码如下:
<template>
<div id="bianjf-app">
<span>消息</span>
<button id="message" @click="sendMsg">发送消息</button>
<br/>
用户信息
<button id="userInfo" @click="getUserInfo">用户信息</button>
</div>
</template>
<script>
import SockJS from 'sockjs-client';
import Stomp from 'stompjs';
export default {
data () {
return {
stompClient: null,
userName: 'bianjf'
}
},
mounted() {
this.initWebSocket();
},
methods: {
sendMsg() {
var message = "HelloWorld"
this.stompClient.send("/app/all/notice", {}, JSON.stringify(message));
console.log("发送的消息: HelloWord");
},
getUserInfo() {
this.stompClient.send("/app/get/user/info", {}, {});
console.log("获取用户信息");
},
initWebSocket() {
this.connect();//建立连接
},
connect() {//连接方法
let userId = 1;
//建立连接对象, 连接服务端提供的通信接口, 建立连接后才可以进行消息通信。这里使用的是http而不是原生的WebSocket
const socket = new SockJS("http://192.168.10.79:1994/websocket?userId=" + userId + "&userName=" + this.userName + "&companyId=1");
this.stompClient = Stomp.over(socket);//获取STOMP子协议的客户端对象
const token = "bnianjf-token";
let headers = {
'Authorization': token,
};
this.stompClient.connect(headers, (frame) => {
console.log("Connected: " + frame);
this.stompClient.subscribe("/topic/notice", (result) => {
console.log("订阅广播消息收到的结果: " + result);
let body = result.body
console.log("接收到公告信息: " + body);
});
this.stompClient.subscribe("/user/" + userId + "/account/info", (result) => {
console.log("订阅获取用户信息收到的结果: " + result);
let body = result.body;
var bodyJSON = JSON.parse(body);
console.log("用户信息: " + body);
console.log("状态码: " + bodyJSON.code);
if (bodyJSON.code == 200) {
console.log("用户名: " + bodyJSON.data.userName);
if (bodyJSON.data.userName != this.userName) {
this.disconnect();
this.userName = bodyJSON.data.userName;
this.connect();
}
}
});
})
},
disconnect() {
if (this.stompClient != null) {
this.stompClient.disconnect();
console.log("Disconnected");
}
}
}
}
</script>
整个项目代码如下:
http://gitlab.shangsw.com/bianjf-projects/websocket-stomp/websocket-vue.git
可留言获取代码
评论区