百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

Springboot 整合 Websocket 轻松实现IM及时通讯

ztj100 2025-01-19 02:00 17 浏览 0 评论

一、方案实践

集成分为三步:添加依赖、增加配置类和消息核心类、前端集成。

1.1、添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.1.13.RELEASE</version>
</dependency>

1.2、增加WebSocket配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * WebSocket配置
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

1.3、增加消息核心类WebSocketServer

@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketServer {
    // 消息存储
    private static MessageStore messageStore;
    // 消息发送
    private static MessageSender messageSender;

    public static void setMessageStore(MessageStore messageStore) {
        WebSocketServer.messageStore = messageStore;
    }

    public static void setMessageSender(MessageSender messageSender) {
        WebSocketServer.messageSender = messageSender;
    }

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        messageStore.saveSession(session);
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session, @PathParam("userId") String userId) {
        messageStore.deleteSession(session);
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @ Param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) throws Exception {
        log.warn("=========== 收到来自窗口" + session.getId() + "的信息:" + message);
        handleTextMessage(session, new TextMessage(message));
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, @PathParam("userId") String userId, Throwable error) {
        log.error("=========== 发生错误");
        error.printStackTrace();
//        msgStore.deleteSession(session);
    }

    public void handleTextMessage(Session session, TextMessage message) throws Exception {
        log.warn("=========== Received message: {}", message.getPayload());
    }
}

1.4、前端页面加入socket

<!DOCTYPE html>
<html xmlns="http://www.w3.org/1999/html">
  <head>
    <title>WebSocket Example</title>
  </head>
  <body>
    登录用户ID:<input type="text" id="sendUserId" /></br>
    接受用户ID:<input type="text" id="receivedUserId" /></br>
    发送消息内容:<input type="text" id="messageInput" /></br>
    接受消息内容:<input type="text" id="messageReceive" /></br>
    <button onclick="sendMessage()">Send</button>

    <script>
      var socket = new WebSocket("ws://localhost:8080/websocket/aaa");
      var roomId = "123456";
      // 随机产出六位数字
      var sendUserId = Math.floor(Math.random() * 1000000);

      document.getElementById("sendUserId").value = sendUserId;
      var messageReceive = document.getElementById("messageReceive");


      socket.onopen = function (event) {
        console.log("WebSocket is open now.");
        let loginInfo = {
          msgType: 2, //登录消息
          sendUserId: sendUserId,
          bizType: 1, //业务类型
          bizOptModule: 1, //业务模块
          roomId: roomId,
          msgBody: {},
        };
        socket.send(JSON.stringify(loginInfo));
      };

      socket.onmessage = function (event) {
        var message = event.data;
        console.log("Received message: " + message);
        messageReceive.value = message;
      };

      socket.onclose = function (event) {
        console.log("WebSocket is closed now.");
      };

      function sendMessage() {
        var message = document.getElementById("messageInput").value;
        var receivedUserId = document.getElementById("receivedUserId").value;
        let operateInfo = {
          msgType: 100, //业务消息
          sendUserId: sendUserId,
          bizType: 1, //业务类型
          bizOptModule: 1, //业务模块
          roomId: roomId,
          receivedUserId: receivedUserId,
          msgBody: {
            operateType: 1, //操作类型:禁言
            operateContent: "1",
          },
        };
        socket.send(JSON.stringify(operateInfo));
      }

      setInterval(() => {
        socket.send("ping");
      }, 30000);
    </script>
  </body>
</html>


二、小型及时通讯包含的模块

以上只是集成了Websocket框架,实现了基本的全双工通信,服务器和客户端都可以同时发送和接收数据。要想实现一些小型完整的即时通讯,还需要具备以下几个核心模块。架构图如下:

2.1、架构图

2.2、消息对象模型

组织消息内容,比如消息类型、发送者用户ID、接受者用户ID、具体的消息体等。比如:

public class SocketMsg<T> {

    /**
     * 消息类型:1心跳  2登录 3业务操作
     */
    private Integer msgType;

    /**
     * 发送者用户ID
     */
    private String sendUserId;
    /**
     * 接受者用户ID
     */
    private String receivedUserId;

    /**
     * 业务类型
     */
    private Integer bizType;

    /**
     * 业务操作模块
     */
    private Integer bizOptModule;

    /**
     * 消息内容
     */
    private T msgBody;
}

2.3、消息存储模块

负责存储消息内容、用户ID和sessionID的关系,防止数据丢失或者服务器重启等。

2.4、消息发送模块

功能开发完毕,一般部署到分布式集群环境,所以通讯时session会分布在多台服务器。比如用户A的session在机器1,用户B的session在机器2,此时A发送给B,就无法单独通过机器1完成。

因为机器1拿不到机器2里的session,所以消息发不过去。此时只能借助别的中间件来实现,比如借助消息中间件kafka实现。

机器1将消息发送给kafka,然后机器1和机器2都监听kafka,然后查看用户对应的session是否在本机,如果在本机则发送出去。

2.5、消息推送模块

模块3提到的消息发送流程中,消息发送给 消息中间件,然后服务器消费到消费,再通过本机的session推送给客户端。

三、遇到的几个问题

3.1、连接自动断开

WebSocket连接之后,发现一个问题:每隔一段时间如果不传送数据的话,与前端的连接就会自动断开。可以采用心跳消息的方式来解决这个问题。比如客服端每隔30秒自动发送ping消息给服务端,服务端返回pong。

3.2、Session无法被序列化

分布式场景会存在这样的问题:当一次请求负载到第一台服务器时,session在第一台服务器线程上,第二次请求,负载到第二台服务器上,此时通过userId查找当前用户的session时,是查找不到的。

本来想着把session存入到redis中,就可以从redis获取用户的session,希望用这种方式来解决分布式场景下消息发送的问题。但是会出现如下错误:

The remote endpoint was in state [STREAM_WRITING] which is an invalid state for called method

翻看了session源码,发现session无法被序列化。所以这个方案只能放弃。解决方案请看下面的问题5或者上面的架构图

3.3、对象无法自动注入

使用了@ServerEndpoint注解的类中使用@Resource@Autowired注入对象都会失败,并且报空指针异常。

原因是WebSocket服务是线程安全的,那么当我们去发起一个ws连接时,就会创建一个端点对象。WebSocket服务是多对象的,不是单例的。而我们的SpringBean默认就是单例的,在非单例类中注入一个单例的Bean是冲突的。

或者说:

Spring管理采用单例模式(singleton),而 WebSocket 是多对象的,即每个客户端对应后台的一个 WebSocket 对象,也可以理解成 new 了一个 WebSocket,这样当然是不能获得自动注入的对象了,因为这两者刚好冲突。

@Autowired 注解注入对象操作是在启动时执行的,而不是在使用时,而 WebSocket 是只有连接使用时才实例化对象,且有多个连接就有多个对象。所以我们可以得出结论,这个 Service 根本就没有注入到 WebSocket 当中。

如何解决呢?

使用静态对象,并且对外暴露set方法,这样在对象初始化的时候,将其注入到WebSocketServer中。比如说这样:

@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketServer {
  private static MessageStore messageStore;
  private static MessageSender messageSender;

  public static void setMessageStore(MessageStore messageStore) {
      WebSocketServer.messageStore = messageStore;
  }

  public static void setMessageSender(MessageSender messageSender) {
      WebSocketServer.messageSender = messageSender;
  }
}


@Slf4j
@Service
public class MessageStore {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @PostConstruct
    public void init() {
        WebSocketServer.setMessageStore(this);
    }
}

3.4、分布式场景消息如何发给客户端

问题2中提到了分布式场景下存在的session不在本机的问题,这种场景可以通过发送消息中间件的方式解决。具体这样解决:

每次连接时,都将userId和对应的session存入到本机,发送消息时,直接发送给MQ-Broker,然后每台应用负载都去消费这个消息,拿到消息之后,判断在本机能根据userId是否能找到session,找到session则推送到客户端。

3.5、部署时Nginx配置问题

代码开发完毕之后,本机跑通后,然后部署到服务器之后,还差很重要的一步:配置nginx代理。

3.5.1、给后端应用部署独立域名

要给后端应用部署独立域名,nginx代理直接转发到应用的独立域名,不要走微服务的gateway网关转发过去。

3.5.2、多层nginx转发问题

当只有一层nginx的时候,配置比较简单,如下:

location ~* ^/api/websocket/* {
      proxy_pass http://mangodwsstest.mangod.top;
      
      proxy_read_timeout 300s;
      proxy_send_timeout 300s;
      proxy_set_header Host mangodwsstest.mangod.top;
      proxy_http_version 1.1;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection "Upgrade";
      proxy_set_header X-Real-IP $remote_addr;
 }

但是,当有两层nginx转发的时候,问题就出现了。

在最外层的nginx需要使用如下配置,不能照抄后面一层的配置。proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_forproxy_set_header X-Forwarded-Proto $scheme这两个配置不能少,用来将协议和真实IP传递给后面一层的nginx。

location ~* ^/api/websocket/* {
      proxy_pass http://mangodwsstest.mangod.top;

      proxy_read_timeout 300s;
      proxy_send_timeout 300s;
      proxy_set_header  Host $http_host;
      proxy_set_header  X-Real-IP  $remote_addr;
      proxy_set_header  X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header  X-Forwarded-Proto $scheme;
      proxy_http_version 1.1;
      proxy_set_header Upgrade $http_upgrade;
      proxy_set_header Connection $connection_upgrade;
}

四、完整代码和示例

4.1、页面效果如下

开启两个web页面,用户1输入用户2的用户ID,输入发送消息内容,点击发送。在用户2的页面的接受消息内容可以看到发送的消息。

4.2、代码结构

4.3、代码地址

https://github.com/yclxiao/spring-websocket.git

五、总结

本文聊了Springboot如何集成Websocket、IM及时通讯需要哪些模块、开发和部署过程中遇到的问题、以及实现小型IM及时通讯的代码。希望对你有帮助!

相关推荐

如何将数据仓库迁移到阿里云 AnalyticDB for PostgreSQL

阿里云AnalyticDBforPostgreSQL(以下简称ADBPG,即原HybridDBforPostgreSQL)为基于PostgreSQL内核的MPP架构的实时数据仓库服务,可以...

Python数据分析:探索性分析

写在前面如果你忘记了前面的文章,可以看看加深印象:Python数据处理...

CSP-J/S冲奖第21天:插入排序

...

C++基础语法梳理:算法丨十大排序算法(二)

本期是C++基础语法分享的第十六节,今天给大家来梳理一下十大排序算法后五个!归并排序...

C 语言的标准库有哪些

C语言的标准库并不是一个单一的实体,而是由一系列头文件(headerfiles)组成的集合。每个头文件声明了一组相关的函数、宏、类型和常量。程序员通过在代码中使用#include<...

[深度学习] ncnn安装和调用基础教程

1介绍ncnn是腾讯开发的一个为手机端极致优化的高性能神经网络前向计算框架,无第三方依赖,跨平台,但是通常都需要protobuf和opencv。ncnn目前已在腾讯多款应用中使用,如QQ,Qzon...

用rust实现经典的冒泡排序和快速排序

1.假设待排序数组如下letmutarr=[5,3,8,4,2,7,1];...

ncnn+PPYOLOv2首次结合!全网最详细代码解读来了

编辑:好困LRS【新智元导读】今天给大家安利一个宝藏仓库miemiedetection,该仓库集合了PPYOLO、PPYOLOv2、PPYOLOE三个算法pytorch实现三合一,其中的PPYOL...

C++特性使用建议

1.引用参数使用引用替代指针且所有不变的引用参数必须加上const。在C语言中,如果函数需要修改变量的值,参数必须为指针,如...

Qt4/5升级到Qt6吐血经验总结V202308

00:直观总结增加了很多轮子,同时原有模块拆分的也更细致,估计为了方便拓展个管理。把一些过度封装的东西移除了(比如同样的功能有多个函数),保证了只有一个函数执行该功能。把一些Qt5中兼容Qt4的方法废...

到底什么是C++11新特性,请看下文

C++11是一个比较大的更新,引入了很多新特性,以下是对这些特性的详细解释,帮助您快速理解C++11的内容1.自动类型推导(auto和decltype)...

掌握C++11这些特性,代码简洁性、安全性和性能轻松跃升!

C++11(又称C++0x)是C++编程语言的一次重大更新,引入了许多新特性,显著提升了代码简洁性、安全性和性能。以下是主要特性的分类介绍及示例:一、核心语言特性1.自动类型推导(auto)编译器自...

经典算法——凸包算法

凸包算法(ConvexHull)一、概念与问题描述凸包是指在平面上给定一组点,找到包含这些点的最小面积或最小周长的凸多边形。这个多边形没有任何内凹部分,即从一个多边形内的任意一点画一条线到多边形边界...

一起学习c++11——c++11中的新增的容器

c++11新增的容器1:array当时的初衷是希望提供一个在栈上分配的,定长数组,而且可以使用stl中的模板算法。array的用法如下:#include<string>#includ...

C++ 编程中的一些最佳实践

1.遵循代码简洁原则尽量避免冗余代码,通过模块化设计、清晰的命名和良好的结构,让代码更易于阅读和维护...

取消回复欢迎 发表评论: