侧边栏壁纸
博主头像
林雷博主等级

斜月沉沉藏海雾,碣石潇湘无限路

  • 累计撰写 132 篇文章
  • 累计创建 47 个标签
  • 累计收到 3 条评论

目 录CONTENT

文章目录

1、SpringBoot+WebSocket+STOMP+VUE实现双通道通信

林雷
2020-06-26 / 1 评论 / 0 点赞 / 930 阅读 / 20,032 字

一 Spring Boot实现实现STOMP协议下的WebSocket

1.1 WebSocket简介

WebSocket是HTML5开始提供的一种在单个TCP连接上进行全双工通讯的协议。

WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
在WebSocket API中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道,两者之间直接可以数据互相传送。

实现WebSocket的三种方式:

  • 使用Tomcat的WebSocket实现
  • 使用Spring的WebSocket实现,需要Spring 4.x及以上版本,并且使用了socketjs,对不支持WebSocket的浏览器可以模拟WebSocket的使用
  • 使用Netty的WebSocket实现,高性能、高可靠性。

1.2 STOMP简介

直接使用WebSocket就很类似于使用TCP套接字来编写Web应用。因为没有高层级的线路协议,因此就需要我们定义应用之间所发送消息的语义,还需要确保链接的两端都能遵循这些语义。

就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。与HTTP请求和相应类似,STOMP帧由命令、一个或多个头信息及负载所组成。例如,如下就是发送数据的一个STOMP帧:

>>> SEND
transaction:tx-0
destination:/app/marco
content-length:20

{"message":"Marco!"}
  • SEND:STOMP命令,表明发送一些内容
  • destination:头信息,用来表示消息发送到哪里
  • content-length:头信息,用来表示负载内容的大小
  • 空行
  • 帧内容(负载内容)

1.3 STOMP协议分析

常用的STOMP的命令有:

  • CONNECT:初始化一个数据流或TCP连接发送CONNECT帧到服务端。
  • CONNECTED:如果服务器接收了连接意图,它会返回一个CONNECTED帧
  • SEND:客户端主动发送消息到服务器。
  • SUBSCRIBE:客户端注册给定的目的地,被订阅的目的地收到的任何消息将通过MESSAGE Frame发送给Client。ACK控制着确认模式。
  • UNSUBSCRIBE:用来移除一个已经存在的订阅,一旦一个订阅被从连接中取消,那么客户端就再也不会收到来自这个订阅的消息
  • BEGIN:用于开启一个事务。这种情况下的事务适用于发送消息和确认已经收到的消息。在一个事务期间,任何发送和确认的动作都会被当做事务的一个原子操作
  • COMMIT:用来提交事务
  • ABORT:终止正在执行的事务
  • ACK:用来在client和client-individual模式下确认已经收到一个订阅消息的操作
  • NACK:它告诉服务端客户端没有处理该消息
  • DISCONNECT:客户端可以通过DISCONNECT帧表示正常断开链接

二 Spring Boot实现

使用Spring Boot + WebSocket + SocketJS + STOMP实现实时通讯功能

2.1 环境

  • IDEA
  • JDK 8
  • Spring Boot
  • Vue
  • SocketJS
  • STOMP

整个项目代码如下:
springbootwebsocket.zip

2.2 引入依赖

在项目中引入如下依赖:

<!-- WebSocket 依赖 Start -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- WebSocket 依赖 End -->

2.3 配置

在Spring中启用STOMP通讯不用我们自己实现原生态的帧,Spring的消息功能是基于代理模式构建。如果需要开启STOMP,只需要在WebSocket配置类上使用**@EnableWebSocketMessageBroker**,并实现WebSocketMessageBrokerConfigurer接口。

在Spring中,AbstractWebSocketMessageBrokerConfigurer类,该类已经被标记为废弃。而AbstractWebSocketMessageBrokerConfigurer类的全部功能可以拆分为WebSocketMessageBrokerConfigurer接口和AbstractMessageBrokerConfiguration抽象类。

其中WebSocketMessageBrokerConfigurer接口主要方法及描述如下:

/**
* 添加这个Endpoint, 这样在客户端通过WebSocket连接上服务, 即配置的WebSocket的地址, 并且可以指定是否使用socketjs
*/
default void registerStompEndpoints(StompEndpointRegistry registry) {
}

/**
* 配置发送和接收的消息参数, 默认线程为1, 可以自定义线程数, 最大线程数, 线程存活时间
*/
default void configureWebSocketTransport(WebSocketTransportRegistration registry) {
}

/**
* 设置输入消息通道的线程数和拦截器, 默认线程为1, 可以自定义线程数, 最大线程数, 线程存活时间
*/
default void configureClientInboundChannel(ChannelRegistration registration) {
}

/**
* 设置输出通道的线程数和拦截器, 默认线程为1, 默认线程为1, 可以自定义线程数, 最大线程数, 线程存活时间
*/
default void configureClientOutboundChannel(ChannelRegistration registration) {
}

/**
* 自定义控制器方法的参数类型
*/
default void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
}

/**
* 自定义控制器方法的返回值类型
*/
default void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
}

/**
* 添加自定义消息转换器。Spring提供多种默认的消息转换器.
* @return 返回false不会添加转换器;返回true会添加默认的消息转换器
*/
default boolean configureMessageConverters(List<MessageConverter> messageConverters) {
    return true;
}

/**
* 配置消息代理, 哪种类型的消息会进行代理处理
*/
default void configureMessageBroker(MessageBrokerRegistry registry) {

}

在AbstractMessageBrokerConfiguration的子类中有WebSocketMessageBrokerConfigurationSupport类,该类中有:

public abstract class WebSocketMessageBrokerConfigurationSupport extends AbstractMessageBrokerConfiguration {
   @Bean
   public WebSocketHandler subProtocolWebSocketHandler() {
      return new SubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());
   }
}

这个Bean也是WebSocketHandler的一种实现,专门处理子协议(内容交换协议)的WebSocketHandler实现。通过这个类,Spring将STOMP over WebSocket完美的封装起来。

完整的配置类WebSocketConfig如下:

/**
* 配置基于STOMP的WebSocket
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport implements WebSocketMessageBrokerConfigurer {
    /**
     * 添加这个Endpoint, 这样就可以在网页中通过WebSocket连接上服务
     * 也就是配置的WebSocket服务地址, 并且可以指定是否使用SocketJS
     * @param registry 注册
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        /*
        * 1、将/{serviceName}/websocket路径注册为STOMP的端点
        *    用户连接这个端点后就可以进行WebSocket通信, 支持socketjs
        * 2、setAllowedOrigins("*")表示可以从任何地方来访问(跨域)
        * 3、withSockJS()表示支持SocketJS访问
        * 4、addInterceptors() 添加自定义握手拦截器
        * 5、setHandshakeHandler() 添加用户认证拦截器, 封装了用户的信息
        */
        registry.addEndpoint("/websocket")
                .setAllowedOrigins("*")
                .addInterceptors(new WebSocketHandshakeInterceptor())
                .setHandshakeHandler(new CustomPrincipalHandshakeHandler())
                .withSockJS();
    }

    /**
     * 配置发送与接收的消息参数, 可以指定消息字节大小, 缓存大小, 发送超时时间
     * @param registry 注册器
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        /*
         * 1、setMessageSizeLimit() 设置消息缓存的字节大小, 单位为字节
         * 2、setSendBufferSizeLimit() 设置WebSocket 会话时, 缓存的字节大小, 单位为字节
         * 3、setSendTimeLimit() 设置消息发送会话超时时间, 单位为毫秒
         */
        registry.setMessageSizeLimit(102400)
                .setSendBufferSizeLimit(10240)
                .setSendTimeLimit(10000);
    }

    /**
     * 设置输入通道的线程数, 默认线程数为1, 可以自定义线程数, 最大线程数, 线程存活时间等
     * @param registration 注册器
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        /*
         * 配置消息线程池
         * 1、corePoolSize() 配置核心线程池, 当线程数小于此配置时, 不管线程中有无空闲的线程, 都会产生新线程处理任务
         * 2、maxPoolSize() 配置线程池最大数, 当线程池等于此配置时, 不会产生新线程
         * 3、keepAliveSeconds() 线程池维护线程所允许的空闲时间, 单位为秒
         */
        registration.taskExecutor()
                .corePoolSize(10)
                .maxPoolSize(20)
                .keepAliveSeconds(60);
        /*
         * 添加STOMP自定义拦截器
         * 消息拦截器, 实现ChannelInterceptor接口
         */
        registration.interceptors(new WebSocketChannelInterceptor());
    }

    /**
     * 设置输出通道的线程数, 默认线程数为1, 可以自定义线程数, 最大线程数, 线程存活时间等
     * @param registration 注册器
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor()
                .corePoolSize(10)
                .maxPoolSize(20)
                .keepAliveSeconds(60);
    }

    /**
     * 自定义控制器的参数类型
     * @param argumentResolvers 参数列表
     */
    @Override
    public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
        super.addArgumentResolvers(argumentResolvers);
    }

    /**
     * 自定义控制器返回类型
     * @param returnValueHandlers 返回类型
     */
    @Override
    public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
        super.addReturnValueHandlers(returnValueHandlers);
    }

    /**
     * 添加自定义的消息转换器, Spring 提供了多种默认的消息转换器
     * @param messageConverters 消息转换列表
     * @return true: 会添加默认的消息转换器, 也可以把自己的转换器添加到转换链中; false: 不会添加消息转换器
     */
    @Override
    public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
        return super.configureMessageConverters(messageConverters);
    }

    /**
     * 配置消息代理
     * @param registry 注册器
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        /*
         * MessageBrokerRegistry配置外部的STOMP服务, 需要安装额外的支持, 比如RabbitMQ或ActiveMQ
         * 1、配置代理域, 可以配置多个。此段代码配置目的地的前缀为"topicTest", 就可以在配置的域上向客户端发送消息
         * 2、可以通过 setRelayHost 配置代理监听的host, 默认为localhost
         * 3、可以通过 setRelayPort 配置代理监听的端口, 默认为61613
         * 4、可以通过 setClientLogin 和 setClientPassword 配置账号和密码
         * 5、setXXX 这种设置的方法是可选的, 根据业务需要自行配置, 也可以使用默认配置
         */
        /*registry.enableStompBrokerRelay("topicTest")
                .setRelayHost("192.168.10.3")
                .setRelayPort(62623)
                .setClientLogin("userName")
                .setClientPasscode("password");*/

        //自定义调度器, 用于控制心跳线程 Start
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(1);//线程池线程数
        taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");//线程名前缀
        taskScheduler.initialize();//初始化
        //自定义调度器, 用于控制心跳线程 End

        /*
         * Spring 内置Broker对象
         * 1、配置代理域, 可以配置多个, 此段代码配置目的地前缀是"topic"和"/user", 可以在配置的域上向客户端发送消息(即客户端订阅的前缀)
         * 2、进行心跳设置, 第一值表示server最小能保证发的心跳间隔毫秒数, 第二个值表示server希望client发的心跳间隔毫秒数
         * 3、可以配置心跳线程调度器, setHeartbeatValue 不能单独设置, 不然不起作用, 要配合setTaskSchedule才可以生效
         */
        registry.enableSimpleBroker("/topic", "/user")
                .setHeartbeatValue(new long[]{10000, 10000})
                .setTaskScheduler(taskScheduler);
        /*
         * "/app" 为配置应用服务器的地址前缀, 表示所有以 "/app" 开头的客户端消息或请求
         * 都会路由到带有@MessageMapping 注解的方法中
         */
        registry.setApplicationDestinationPrefixes("/app");
        /*
         * 1、配置一对一消息前缀, 客户端接收一对一消息需要配置的前缀, 如"/user/" + userId + "/message"
         *    是客户端订阅一对一消息的地址, stompClient.subscribe JS方法调用的地址
         * 2、使用@SendToUser 发送私信的规则不是这个参数设定, 在框架内部使用UserDestinationMessageHandler处理,
         *    而不是AnnotationMethodMessageHandler 或 SimpleBrokerMessageHandler
         */
        registry.setUserDestinationPrefix("/user");

        /*
         * 自定义路径分隔符, 这段代码的分隔符为 "."
         * 分隔是类级别的@MessageMapping 和方法级别的 @MessageMapping
         * 例如, 类路径路径为"topic", 方法注解路径为"hello", 那么客户端JS stompClient.send 的方法调用的路径为"/app/topic.hello"
         * 注释掉这段代码, 类路径路径为"topic", 方法注解路径为"hello", JS调用路径为"/app/topic/hello"
         */
//        registry.setPathMatcher(new AntPathMatcher("."));
    }

    /**
     * WebSocket子协议处理器(STOMP)
     * @return WebSocket处理器
     */
    @Bean
    @Override
    public WebSocketHandler subProtocolWebSocketHandler() {
        return new CustomSubProtocolWebSocketHandler(clientInboundChannel(), clientOutboundChannel());
    }
}

2.3.1 用户认证信息的类

在STOMP端点上注册了一个保存用户信息的实体类CustomPrincipalHandshakeHandler,代码如下:

/**
* 配置用户认证信息的握手拦截器
*/
@Slf4j
public class CustomPrincipalHandshakeHandler extends DefaultHandshakeHandler {
    /**
     * 生成自己的用户信息
     * @param request request
     * @param wsHandler WebSocket处理器
     * @param attributes 其他属性值
     * @return 用户信息
     */
    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler,
                                      Map<String, Object> attributes) {
        HttpServletRequest servletRequest = this.getRequest(request);
        log.info("determineUser: getRequest result --> {}, attributes --> {}", servletRequest, attributes);
        Preconditions.checkArgument(Objects.nonNull(servletRequest), "请求连接失败");
        log.info("determineUser: user info -- userId --> {}, userName --> {}, clientId --> {}, companyId --> {}",
                servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.USER_ID),
                servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.USER_NAME),
                servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.CLIENT_ID),
                servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.COMPANY_ID));
        CustomPrincipal customPrincipal = CustomPrincipal.builder().companyId(servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.COMPANY_ID)).userId(servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.USER_ID)).userName(servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.USER_NAME)).build();
        attributes.put(CommonConstrants.WebSocketRequestKey.COMPANY_ID, servletRequest.getParameter(CommonConstrants.WebSocketRequestKey.COMPANY_ID));//将机构ID放置到全局的属性中
        attributes.put(CommonConstrants.WebSocketRequestKey.USER_ID, customPrincipal);
        return customPrincipal;
    }

    /**
     * 获取HttpServletRequest
     * @param request ServerHttpRequest
     * @return HttpServletRequest
     */
    private HttpServletRequest getRequest(ServerHttpRequest request) {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest)request;
            return servletServerHttpRequest.getServletRequest();
        }
        return null;
    }
}

2.3.2 握手拦截器

握手拦截器为WebSocketHandshakeInterceptor,需要继承HttpSessionHandshakeInterceptor,重写beforeHandshake等方法,实现自己的握手拦截逻辑,代码如下:

/**
* WebSocket的握手拦截器
*/
@Slf4j
@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
    /**
     * 建立连接前置拦截
     * @param request ServerHttpRequest
     * @param response ServerHttpResponse
     * @param wsHandler WebSocketHandler
     * @param attributes 其他属性
     * @return true/false: 是否能继续握手
     * @throws Exception 异常
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Map<String, Object> attributes) throws Exception {
        log.info("beforeHandshake: before handshake");
        HttpServletRequest servletRequest = this.getRequest(request);
        log.info("beforeHandshake: getRequest result --> {}", servletRequest);
        //不知道从哪来的请求, 不让通信
        return !Objects.isNull(servletRequest);
    }

    /**
     * 在握手之后执行该方法, 无论是否握手成功都指明了相应状态码和响应头
     * @param request ServerHttpRequest
     * @param response ServerHttpResponse
     * @param wsHandler WebSocketHandler
     * @param exception exception
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        log.info("afterHandshake: after handshake -- exception --> ", exception);
    }

    /**
     * 获取HttpServletRequest
     * @param request ServerHttpRequest
     * @return HttpServletRequest
     */
    private HttpServletRequest getRequest(ServerHttpRequest request) {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest servletServerHttpRequest = (ServletServerHttpRequest)request;
            return servletServerHttpRequest.getServletRequest();
        }
        return null;
    }
}

2.3.3 消息通道拦截器

如果需要添加监听,我们监听类需要实现ChannelInterceptor接口,在ChannelInterceptor接口中的preSend能在消息发送前做一些处理。代码如下:

/**
* WebSocket 消息监听, 用于监听WebSocket用户连接情况
*/
@Slf4j
public class WebSocketChannelInterceptor implements ChannelInterceptor {
    /**
     * 在消息发送之前执行, 如果此方法返回值为空, 则不会发生实际的消息发送
     * @param message 消息
     * @param channel 通道
     * @return 消息
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor stompHeader = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        /*
         * 判断是否首次连接请求, 如果已经连接, 返回message
         */
        if (StompCommand.CONNECT.equals(stompHeader.getCommand())) {//首次连接
            List<String> tokens = stompHeader.getNativeHeader("Authorization");//获取TOKEN
            log.info("preSend: stompHeader.getNativeHeader tokens --> {}", tokens);
            String token = Optional.ofNullable(tokens).map(tokenList -> tokenList.get(0)).orElse(null);
            log.info("preSend: token --> {}", token);
            if (StringUtils.isBlank(token)) {//首次连接没有Token
                log.warn("preSend: first connect token is null fail");
                return null;
            }
        } else if (StompCommand.DISCONNECT.equals(stompHeader.getCommand())) {//断开连接
            log.info("preSend: {} close connect", stompHeader.getUser());
        }
        return message;
    }

    /**
     * 在消息被实际检索之前调用, 在WebSocket场景中应用不到
     * @param channel 通道
     * @return false: 不会检索任何消息
     */
    @Override
    public boolean preReceive(MessageChannel channel) {
        return true;
    }

    /**
     * 在检索到消息之后, 返回调用方法之前调用, 可以进行消息修改, 如果返回null, 就不会执行下一步操作
     * @param message 消息
     * @param channel 通道
     * @return 消息
     */
    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel channel) {
        return message;
    }

    /**
     * 在消息发送后立刻调用
     * @param message 消息
     * @param channel 通道
     * @param sent 表示调用的返回值
     */
    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
        /*StompHeaderAccessor stompHeader = StompHeaderAccessor.wrap(message);
        if (Objects.isNull(stompHeader.getCommand())) {//心跳消息
            return;
        }
        log.info("postSend -- command --> {}", stompHeader.getCommand());*/
    }
}

2.3.4 WebSocket子协议处理器

可以对WebSocket子协议进行处理,如建立连接成功之后执行的内容,连接断开之后执行的内容等。示例代码如下:

@Slf4j
public class CustomSubProtocolWebSocketHandler extends SubProtocolWebSocketHandler {
    /**
     * Create a new {@code SubProtocolWebSocketHandler} for the given inbound and outbound channels.
     * @param clientInboundChannel the inbound {@code MessageChannel}
     * @param clientOutboundChannel the outbound {@code MessageChannel}
     */
    public CustomSubProtocolWebSocketHandler(MessageChannel clientInboundChannel, SubscribableChannel clientOutboundChannel) {
        super(clientInboundChannel, clientOutboundChannel);
    }

    /**
     * 建立连接成功之后执行的方法
     * @param session 处理器
     * @throws Exception 异常
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        log.info("afterConnectionEstablished: connection established -- session --> {}", session);
        if (Objects.isNull(session) || Objects.isNull(session.getPrincipal())) {
            log.warn("afterConnectionEstablished: session is null or user is null -- session --> {}", session);
            return;
        }
        Map<String, Object> attributes = session.getAttributes();
        String sessionId = session.getId();
        log.info("afterConnectionEstablished: attributes -->{}, sessionId --> {}", JSONObject.toJSONString(attributes), session);
        attributes.put(CommonConstrants.WebSocketRequestKey.CLIENT_ID, sessionId);
        super.afterConnectionEstablished(session);
    }

    /**
     * 在关闭连接之后调用
     * @param session WebSocket处理器
     * @param closeStatus 状态
     * @throws Exception 异常
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        log.info("afterConnectionClosed: close -- closeStatus --> {}", closeStatus);
        super.afterConnectionClosed(session, closeStatus);
    }
}

2.4 服务端消息处理器

服务端消息处理,主要是映射和发送,发送分为广播式通知和点对点发送。

  • MessageMapping:Spring提供了一个@MessageMapping注解,功能类似@RequestMapping,定义一个消息的请求。
  • SendTo:定义消息的目的地。如接受“/app/notice”的请求,会发往“/topic/notice”客户端
  • SendToUser:将消息推送给指定的用户。
  • SimpMessagingTemplate:是Spring-WebSocket内置的一个消息发送工具,可以将消息发送大搜指定的客户端

我这里的示例代码如下:

@Slf4j
@RestController
public class WebSocketController {
    private final SimpMessageSendingOperations simpMessageSendingOperations;

    public WebSocketController(SimpMessageSendingOperations simpMessageSendingOperations) {
        this.simpMessageSendingOperations = simpMessageSendingOperations;
    }

    /**
     * 广播消息
     * @param message 消息体
     * @return 消息
     */
    @MessageMapping(value = "/all/notice")
    @SendTo(value = "/topic/notice")
    public StatusDTO broadCast(String message) {
        log.info("broadCast: message --> {}", message);
        return StatusDTO.buildDataSuccess(message);
    }

    /**
     * 获取用户的信息
     * @param stompHeader 头
     * @return 用户信息
     */
    @MessageMapping(value = "/get/user/info")
    @SendToUser(value = "/account/info", broadcast = false)
    public StatusDTO<CustomPrincipal> getUserInfo(StompHeaderAccessor stompHeader) {
        Principal user = stompHeader.getUser();
        if (Objects.isNull(user)) {
            return StatusDTO.buildFailure("请重新连接");
        }
        CustomPrincipal customUser = (CustomPrincipal) user;
        log.info("getUserInfo: user --> {}", customUser);
        Map<String, Object> sessionAttributes = stompHeader.getSessionAttributes();
        log.info("getUserInfo -- clientId --> {}, companyId --> {}", sessionAttributes.get(CommonConstrants.WebSocketRequestKey.CLIENT_ID),
        sessionAttributes.get(CommonConstrants.WebSocketRequestKey.COMPANY_ID));
        return StatusDTO.buildDataSuccess(customUser);
    }

    /**
     * 改变用户名称
     * 通过客户端用户名的变更
     * @param userId 用户ID
     * @param userName 用户名
     * @return 成功/失败
     */
    @GetMapping(value = "/change/user/name/{userId}", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public StatusDTO changeName(@PathVariable(value = "userId") String userId,
                                @RequestParam(value = "user_name", required = false) String userName) {
        log.info("changeName -- userId --> {}, userName --> {}", userId, userName);
        Preconditions.checkArgument(StringUtils.isNotBlank(userName), "用户名不能为空");
        CustomPrincipal customPrincipal = CustomPrincipal.builder().userId(userId).userName(userName).build();
        /*
         * 通过客户端用户名的变更, 第一个参数是接收人, 第二个参数是地址, 第三个参数是消息体
         */
        this.simpMessageSendingOperations.convertAndSendToUser(userId, "/account/info", StatusDTO.buildDataSuccess(customPrincipal));
        return StatusDTO.buildSuccess();
    }
}

2.5 VUE前端实现

vue项目需要先引入sockjs-client和stompjs两个库:
创建好项目,切换到项目的根路径,安装两个类库:

npm install --save sockjs-client
npm install --save stompjs

然后添加代码如下:

<template>
    <div id="bianjf-app">
        <span>消息</span>
        <button id="message" @click="sendMsg">发送消息</button>
        <br/>
        用户信息
        <button id="userInfo" @click="getUserInfo">用户信息</button>
    </div>
</template>

<script>
    import SockJS from 'sockjs-client';
    import Stomp from 'stompjs';
    export default {
      data () {
        return {
          stompClient: null,
          userName: 'bianjf'
        }
      },

      mounted() {
        this.initWebSocket();
      },

      methods: {
        sendMsg() {
          var message = "HelloWorld"
          this.stompClient.send("/app/all/notice", {}, JSON.stringify(message));
          console.log("发送的消息: HelloWord");
        },
        getUserInfo() {
          this.stompClient.send("/app/get/user/info", {}, {});
          console.log("获取用户信息");
        },
        initWebSocket() {
          this.connect();//建立连接
        },
        connect() {//连接方法
          let userId = 1;
          //建立连接对象, 连接服务端提供的通信接口, 建立连接后才可以进行消息通信。这里使用的是http而不是原生的WebSocket
          const socket = new SockJS("http://192.168.10.79:1994/websocket?userId=" + userId + "&userName=" + this.userName + "&companyId=1");
          this.stompClient = Stomp.over(socket);//获取STOMP子协议的客户端对象
          const token = "bnianjf-token";
          let headers = {
            'Authorization': token,
          };
          this.stompClient.connect(headers, (frame) => {
            console.log("Connected: " + frame);
            this.stompClient.subscribe("/topic/notice", (result) => {
              console.log("订阅广播消息收到的结果: " + result);
              let body = result.body
              console.log("接收到公告信息: " + body);
            });
            this.stompClient.subscribe("/user/" + userId + "/account/info", (result) => {
              console.log("订阅获取用户信息收到的结果: " + result);
              let body = result.body;
              var bodyJSON = JSON.parse(body);
              console.log("用户信息: " + body);
              console.log("状态码: " + bodyJSON.code);
              if (bodyJSON.code == 200) {
                console.log("用户名: " + bodyJSON.data.userName);
                if (bodyJSON.data.userName != this.userName) {
                  this.disconnect();
                  this.userName = bodyJSON.data.userName;
                  this.connect();
                }
              }
            });
          })
        },
        disconnect() {
          if (this.stompClient != null) {
            this.stompClient.disconnect();
            console.log("Disconnected");
          }
        }
      }
    }
</script>

整个项目代码如下:
http://gitlab.shangsw.com/bianjf-projects/websocket-stomp/websocket-vue.git

可留言获取代码

0

评论区