聊聊分布式下的WebSocket解决方案
ztj100 2025-01-19 01:59 45 浏览 0 评论
前言
最近自己搭建了个项目,项目本身很简单,但是里面有使用WebSocket进行消息提醒的功能,大体情况是这样的。
发布消息者在系统中发送消息,实时的把消息推送给对应的一个部门下的所有人。
这里面如果是单机应用的情况时,我们可以通过部门的id和用户的id组成一个唯一的key,与应用服务器建立WebSocket长连接,然后就可以接收到发布消息者发送的消息了。
但是真正把项目应用于生产环境中时,我们是不可能就部署一个单机应用的,而是要部署一个集群。
所以我通过Nginx+两台Tomcat搭建了一个简单的负载均衡集群,作为测试使用
但是问题出现了,我们的客户端浏览器只会与一台服务器建立WebSocket长连接,所以发布消息者在发送消息时,就没法保证所有目标部门的人都能接收到消息(因为这些人连接的可能不是一个服务器)。
本篇文章就是针对于这么一个问题展开讨论,提出一种解决方案,当然解决方案不止一种,那我们开始吧。
WebSocket单体应用介绍
在介绍分布式集群之前,我们先来看一下王子的WebSocket代码实现,先来看java后端代码如下:
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{key}")
public class WebSocket {
private static int onlineCount = 0;
/**
* 存储连接的客户端
*/
private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>();
private Session session;
/**
* 发送的目标科室code
*/
private String key;
@OnOpen
public void onOpen(@PathParam("key") String key, Session session) throws IOException {
this.key = key;
this.session = session;
if (!clients.containsKey(key)) {
addOnlineCount();
}
clients.put(key, this);
Log.info(key+"已连接消息服务!");
}
@OnClose
public void onClose() throws IOException {
clients.remove(key);
subOnlineCount();
}
@OnMessage
public void onMessage(String message) throws IOException {
if(message.equals("ping")){
return ;
}
JSONObject jsonTo = JSON.parseObject(message);
String mes = (String) jsonTo.get("message");
if (!jsonTo.get("to").equals("All")){
sendMessageTo(mes, jsonTo.get("to").toString());
}else{
sendMessageAll(mes);
}
}
@OnError
public void onError(Session session, Throwable error) {
error.printStackTrace();
}
private void sendMessageTo(String message, String To) throws IOException {
for (WebSocket item : clients.values()) {
if (item.key.contains(To) )
item.session.getAsyncRemote().sendText(message);
}
}
private void sendMessageAll(String message) throws IOException {
for (WebSocket item : clients.values()) {
item.session.getAsyncRemote().sendText(message);
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocket.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocket.onlineCount--;
}
public static synchronized Map<String, WebSocket> getClients() {
return clients;
}
}
示例代码中并没有使用Spring,用的是原生的java web编写的,简单和大家介绍一下里面的方法。
onOpen:在客户端与WebSocket服务连接时触发方法执行
onClose:在客户端与WebSocket连接断开的时候触发执行
onMessage:在接收到客户端发送的消息时触发执行
onError:在发生错误时触发执行
可以看到,在onMessage方法中,我们直接根据客户端发送的消息,进行消息的转发功能,这样在单体消息服务中是没有问题的。
再来看一下js代码
var host = document.location.host;
// 获得当前登录科室
var deptCodes='${sessionScope.$UserContext.departmentID}';
deptCodes=deptCodes.replace(/[\[|\]|\s]+/g, "");
var key = '${sessionScope.$UserContext.userID}'+deptCodes;
var lockReconnect = false; //避免ws重复连接
var ws = null; // 判断当前浏览器是否支持WebSocket
var wsUrl = 'ws://' + host + '/webSocket/'+ key;
createWebSocket(wsUrl); //连接ws
function createWebSocket(url) {
try{
if('WebSocket' in window){
ws = new WebSocket(url);
}else if('MozWebSocket' in window){
ws = new MozWebSocket(url);
}else{
layer.alert("您的浏览器不支持websocket协议,建议使用新版谷歌、火狐等浏览器,请勿使用IE10以下浏览器,360浏览器请使用极速模式,不要使用兼容模式!");
}
initEventHandle();
}catch(e){
reconnect(url);
console.log(e);
}
}
function initEventHandle() {
ws.onclose = function () {
reconnect(wsUrl);
console.log("llws连接关闭!"+new Date().toUTCString());
};
ws.onerror = function () {
reconnect(wsUrl);
console.log("llws连接错误!");
};
ws.onopen = function () {
heartCheck.reset().start(); //心跳检测重置
console.log("llws连接成功!"+new Date().toUTCString());
};
ws.onmessage = function (event) { //如果获取到消息,心跳检测重置
heartCheck.reset().start(); //拿到任何消息都说明当前连接是正常的//接收到消息实际业务处理
...
};
}
// 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function() {
ws.close();
}
function reconnect(url) {
if(lockReconnect) return;
lockReconnect = true;
setTimeout(function () { //没连接上会一直重连,设置延迟避免请求过多
createWebSocket(url);
lockReconnect = false;
}, 2000);
}
//心跳检测
var heartCheck = {
timeout: 300000, //5分钟发一次心跳
timeoutObj: null,
serverTimeoutObj: null,
reset: function(){
clearTimeout(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
start: function(){
var self = this;
this.timeoutObj = setTimeout(function(){
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
ws.send("ping");
console.log("ping!")
self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了
ws.close(); //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
}, self.timeout)
}, this.timeout)
}
}
js部分使用的是原生H5编写的,如果为了更好的兼容浏览器,也可以使用SockJS,有兴趣小伙伴们可以自行百度。
接下来我们就手动的优化代码,实现WebSocket对分布式架构的支持。
解决方案的思考
现在我们已经了解单体应用下的代码结构,也清楚了WebSocket在分布式环境下面临的问题,那么是时候思考一下如何能够解决这个问题了。
我们先来看一看发生这个问题的根本原因是什么。
简单思考一下就能明白,单体应用下只有一台服务器,所有的客户端连接的都是这一台消息服务器,所以当发布消息者发送消息时,所有的客户端其实已经全部与这台服务器建立了连接,直接群发消息就可以了。
换成分布式系统后,假如我们有两台消息服务器,那么客户端通过Nginx负载均衡后,就会有一部分连接到其中一台服务器,另一部分连接到另一台服务器,所以发布消息者发送消息时,只会发送到其中的一台服务器上,而这台消息服务器就可以执行群发操作,但问题是,另一台服务器并不知道这件事,也就无法发送消息了。
现在我们知道了根本原因是生产消息时,只有一台消息服务器能够感知到,所以我们只要让另一台消息服务器也能感知到就可以了,这样感知到之后,它就可以群发消息给连接到它上边的客户端了。
那么什么方法可以实现这种功能呢,王子很快想到了引入消息中间件,并使用它的发布订阅模式来通知所有消息服务器就可以了。
引入RabbitMQ解决分布式下的WebSocket问题
在消息中间件的选择上,王子选择了RabbitMQ,原因是它的搭建比较简单,功能也很强大,而且我们只是用到它群发消息的功能。
RabbitMQ有一个广播模式(fanout),我们使用的就是这种模式。
首先我们写一个RabbitMQ的连接类:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQUtil {
private static Connection connection;
/**
* 与rabbitmq建立连接
* @return
*/
public static Connection getConnection() {
if (connection != null&&connection.isOpen()) {
return connection;
}
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("/");
factory.setHost("192.168.220.110"); // 用的是虚拟IP地址
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
try {
connection = factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}
这个类没什么说的,就是获取MQ连接的一个工厂类。
然后按照我们的思路,就是每次服务器启动的时候,都会创建一个MQ的消费者监听MQ的消息,王子这里测试使用的是Servlet的监听器,如下:
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
public class InitListener implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
WebSocket.init();
}
@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {
}
}
记得要在Web.xml中配置监听器信息
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
version="4.0">
<listener>
<listener-class>InitListener</listener-class>
</listener>
</web-app>
WebSocket中增加init方法,作为MQ消费者部分
public static void init() {
try {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//交换机声明(参数为:交换机名称;交换机类型)
channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT);
//获取一个临时队列
String queueName = channel.queueDeclare().getQueue();
//队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)
channel.queueBind(queueName,"fanoutLogs","");
//这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body,"UTF-8");
System.out.println(message);
//这里可以使用WebSocket通过消息内容发送消息给对应的客户端
}
};
//声明队列中被消费掉的消息(参数为:队列名称;消息是否自动确认;consumer主体)
channel.basicConsume(queueName,true,consumer);
//这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费
} catch (IOException e) {
e.printStackTrace();
}
}
同时在接收到消息时,不是直接通过WebSocket发送消息给对应客户端,而是发送消息给MQ,这样如果消息服务器有多个,就都会从MQ中获得消息,之后通过获取的消息内容再使用WebSocket推送给对应的客户端就可以了。
WebSocket的onMessage方法增加内容如下:
try {
//尝试获取一个连接
Connection connection = RabbitMQUtil.getConnection();
//尝试创建一个channel
Channel channel = connection.createChannel();
//声明交换机(参数为:交换机名称; 交换机类型,广播模式)
channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT);
//消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型即可)
channel.basicPublish("fanoutLogs","", null,msg.getBytes("UTF-8"));
System.out.println("发布消息");
channel.close();
} catch (IOException |TimeoutException e) {
e.printStackTrace();
}
增加后删除掉原来的Websocket推送部分代码。
这样一整套的解决方案就完成了。
总结
到这里,我们就解决了分布式下WebSocket的推送消息问题。
我们主要是引入了RabbitMQ,通过RabbitMQ的发布订阅模式,让每个消息服务器启动的时候都去订阅消息,而无论哪台消息服务器在发送消息的时候都会发送给MQ,这样每台消息服务器就都会感知到发送消息的时间,从而再通过Websocket发送给客户端。
大体流程就是这样,那么小伙伴们有没有想过,如果RabbitMQ挂掉了几分钟,之后重启了,消费者是否可以重新连接到RabbitMQ?是否还能正常接收消息呢?
生产环境下,这个问题是必须考虑的。
这里已经测试过,消费者是支持自动重连的,所以我们可以放心的使用这套架构来解决此问题。
本文到这里就结束了,欢迎各位小伙伴留言讨论,一起学习,一起进步。
相关推荐
- SpringBoot整合SpringSecurity+JWT
-
作者|Sans_https://juejin.im/post/5da82f066fb9a04e2a73daec一.说明SpringSecurity是一个用于Java企业级应用程序的安全框架,主要包含...
- 「计算机毕设」一个精美的JAVA博客系统源码分享
-
前言大家好,我是程序员it分享师,今天给大家带来一个精美的博客系统源码!可以自己买一个便宜的云服务器,当自己的博客网站,记录一下自己学习的心得。开发技术博客系统源码基于SpringBoot,shiro...
- springboot教务管理系统+微信小程序云开发附带源码
-
今天给大家分享的程序是基于springboot的管理,前端是小程序,系统非常的nice,不管是学习还是毕设都非常的靠谱。本系统主要分为pc端后台管理和微信小程序端,pc端有三个角色:管理员、学生、教师...
- SpringBoot+LayUI后台管理系统开发脚手架
-
源码获取方式:关注,转发之后私信回复【源码】即可免费获取到!项目简介本项目本着避免重复造轮子的原则,建立一套快速开发JavaWEB项目(springboot-mini),能满足大部分后台管理系统基础开...
- Spring Boot的Security安全控制——认识SpringSecurity!
-
SpringBoot的Security安全控制在Web项目开发中,安全控制是非常重要的,不同的人配置不同的权限,这样的系统才安全。最常见的权限框架有Shiro和SpringSecurity。Shi...
- 前同事2024年接私活已入百万,都是用这几个开源的SpringBoot项目
-
前言不得不佩服SpringBoot的生态如此强大,今天给大家推荐几款优秀的后台管理系统,小伙伴们再也不用从头到尾撸一个项目了。SmartAdmin...
- 值得学习的15 个优秀开源的 Spring Boot 学习项目
-
SpringBoot算是目前Java领域最火的技术栈了,除了书呢?当然就是开源项目了,今天整理15个开源领域非常不错的SpringBoot项目供大家学习,参考。高富帅的路上只能帮你到这里了,...
- 开发企业官网就用这个基于SpringBoot的CMS系统,真香
-
前言推荐这个项目是因为使用手册部署手册非常...
- 2021年超详细的java学习路线总结—纯干货分享
-
本文整理了java开发的学习路线和相关的学习资源,非常适合零基础入门java的同学,希望大家在学习的时候,能够节省时间。纯干货,良心推荐!第一阶段:Java基础...
- jeecg-boot学习总结及使用心得(jeecgboot简单吗)
-
jeecg-boot学习总结及使用心得1.jeecg-boot是一个真正前后端分离的模版项目,便于二次开发,使用的都是较流行的新技术,后端技术主要有spring-boot2.x、shiro、Myb...
- 后勤集团原料管理系统springboot+Layui+MybatisPlus+Shiro源代码
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目描述后勤集团原料管理系统spr...
- 白卷开源SpringBoot+Vue的前后端分离入门项目
-
简介白卷是一个简单的前后端分离项目,主要采用Vue.js+SpringBoot技术栈开发。除了用作入门练习,作者还希望该项目可以作为一些常见Web项目的脚手架,帮助大家简化搭建网站的流程。...
- Spring Security 自动踢掉前一个登录用户,一个配置搞定
-
登录成功后,自动踢掉前一个登录用户,松哥第一次见到这个功能,就是在扣扣里边见到的,当时觉得挺好玩的。自己做开发后,也遇到过一模一样的需求,正好最近的SpringSecurity系列正在连载,就结...
- 收藏起来!这款开源在线考试系统,我爱了
-
大家好,我是为广大程序员兄弟操碎了心的小编,每天推荐一个小工具/源码,装满你的收藏夹,每天分享一个小技巧,让你轻松节省开发效率,实现不加班不熬夜不掉头发,是我的目标!今天小编推荐一款基于Spr...
- Shiro框架:认证和授权原理(shiro权限认证流程)
-
优质文章,及时送达前言Shiro作为解决权限问题的常用框架,常用于解决认证、授权、加密、会话管理等场景。本文将对Shiro的认证和授权原理进行介绍:Shiro可以做什么?、Shiro是由什么组成的?举...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- SpringBoot整合SpringSecurity+JWT
- 「计算机毕设」一个精美的JAVA博客系统源码分享
- springboot教务管理系统+微信小程序云开发附带源码
- SpringBoot+LayUI后台管理系统开发脚手架
- Spring Boot的Security安全控制——认识SpringSecurity!
- 前同事2024年接私活已入百万,都是用这几个开源的SpringBoot项目
- 值得学习的15 个优秀开源的 Spring Boot 学习项目
- 开发企业官网就用这个基于SpringBoot的CMS系统,真香
- 2021年超详细的java学习路线总结—纯干货分享
- jeecg-boot学习总结及使用心得(jeecgboot简单吗)
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)