百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

java-socket长连接demo体验(java socket长连接)

ztj100 2025-07-08 00:26 2 浏览 0 评论

作者:DavidDing
来源:https://zhuanlan.zhihu.com/p/56135195


一、前言

最近公司在预研设备app端与服务端的交互方案,主要方案有:

  • 服务端和app端通过阿里iot套件实现消息的收发;
  • 服务端通过极光推送主动给app端推消息,app通过rest接口与服务端进行交互;
  • 服务端与app通过mqtt消息队列来实现彼此的消息交互;
  • 服务端与app通过原生socket长连接交互。

虽然上面的一些成熟方案肯定更利于上生产环境,但它们通讯基础也都是socket长连接,所以本人主要是预研了一下socket长连接的交互,写了个简单demo,采用了BIO的多线程方案,集成了springboot,实现了自定义简单协议,心跳机制,socket客户端身份强制验证,socket客户端断线获知等功能,并暴露了一些接口,可通过接口简单实现客户端与服务端的socket交互。

Github源码:

https://github.com/DavidDingXu/springboot-socket-demo

二、IO通讯模型

1. IO通讯模型简介

IO通讯模型主要包括阻塞式同步IO(BIO),非阻塞式同步IO,多路复用IO以及异步IO。

该部分内容总结自专栏文章:

https://blog.csdn.net/yinwenjie/column/info/sys-communication/3

1.1 阻塞式同步IO

BIO就是:blocking IO。最容易理解、最容易实现的IO工作方式,应用程序向操作系统请求网络IO操作,这时应用程序会一直等待;另一方面,操作系统收到请求后,也会等待,直到网络上有数据传到监听端口;操作系统在收集数据后,会把数据发送给应用程序;最后应用程序受到数据,并解除等待状态。



BIO通讯示意图

1.2 非阻塞式同步IO

这种模式下,应用程序的线程不再一直等待操作系统的IO状态,而是在等待一段时间后,就解除阻塞。如果没有得到想要的结果,则再次进行相同的操作。这样的工作方式,暴增了应用程序的线程可以不会一直阻塞,而是可以进行一些其他工作。



非阻塞式IO示意图

1.3 多路复用IO(阻塞+非阻塞)



多路复用IO示意图

目前流程的多路复用IO实现主要包括四种:select、poll、epoll、kqueue。下表是他们的一些重要特性的比较:



1.4 异步IO

异步IO则是采用“订阅-通知”模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的函数。



异步IO示意图

和同步IO一样,异步IO也是由操作系统进行支持的。微软的windows系统提供了一种异步IO技术:IOCP(I/O Completion Port,I/O完成端口);

Linux下由于没有这种异步IO技术,所以使用的是epoll(上文介绍过的一种多路复用IO技术的实现)对异步IO进行模拟。

2. Java对IO模型的支持

  • Java对阻塞式同步IO的支持主要是java.net包中的Socket套接字实现;
  • Java中非阻塞同步IO模式通过设置serverSocket.setSoTimeout(100);即可实现;
  • Java 1.4中引入了NIO框架(java.nio包)可以构建多路复用、同步非阻塞IO程序;
  • Java 7中对NIO进行了进一步改进,即NIO2,引入了异步非阻塞IO方式。

由于是要实现socket长连接的demo,主要关注其一些实现注意点及方案,所以本demo采用了BIO的多线程方案,该方案代码比较简单、直观,引入了多线程技术后,IO的处理吞吐量也大大提高了。下面是BIO多线程方案server端的简单实现:

public static void main(String[] args) throws Exception{
 ServerSocket serverSocket = new ServerSocket(83);
 try {
 while(true) {
 Socket socket = null;
 socket = serverSocket.accept();
 //这边获得socket连接后开启一个线程监听处理数据
 SocketServerThread socketServerThread = new SocketServerThread(socket);
 new Thread(socketServerThread).start();
 }
 } catch(Exception e) {
 log.error("Socket accept failed. Exception:{}", e.getMessage());
 } finally {
 if(serverSocket != null) {
 serverSocket.close();
 }
 }
 }
}
@slf4j
class SocketServerThread implements Runnable {
 private Socket socket;
 public SocketServerThread (Socket socket) {
 this.socket = socket;
 }
 @Override
 public void run() {
 InputStream in = null;
 OutputStream out = null;
 try {
 in = socket.getInputStream();
 out = socket.getOutputStream();
 Integer sourcePort = socket.getPort();
 int maxLen = 2048;
 byte[] contextBytes = new byte[maxLen];
 int realLen;
 StringBuffer message = new StringBuffer();
 BIORead:while(true) {
 try {
 while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
 message.append(new String(contextBytes , 0 , realLen));
 /*
 * 我们假设读取到“over”关键字,
 * 表示客户端的所有信息在经过若干次传送后,完成
 * */
 if(message.indexOf("over") != -1) {
 break BIORead;
 }
 }
 }
 //下面打印信息
 log.info("服务器(收到来自于端口:" + sourcePort + "的信息:" + message);
 //下面开始发送信息
 out.write("回发响应信息!".getBytes());
 //关闭
 out.close();
 in.close();
 this.socket.close();
 } catch(Exception e) {
 log.error("Socket read failed. Exception:{}", e.getMessage());
 }
 }
}

三、注意点及实现方案

1. TCP粘包/拆包

1.1 问题说明

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。 1. 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包; 2. 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包; 3. 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包; 4. 服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

1.2 解决思路

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下: 1. 消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格; 2. 在包尾增加回车换行符进行分割,例如FTP协议; 3. 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度; 4. 更复杂的应用层协议。

1.3 demo方案

作为socket长连接的demo,使用了上述的解决思路2,即在包尾增加回车换行符进行数据的分割,同时整体数据使用约定的Json体进行作为消息的传输格式。

使用换行符进行数据分割,可如下进行数据的单行读取:

BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String message;
while ((message = reader.readLine()) != null) {
//....
}

可如下进行数据的单行写入:

PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
writer.println(message);

Json消息格式如下:

(1) 服务端接收消息实体类

@Data
public class ServerReceiveDto implements Serializable {
 private static final long serialVersionUID = 6600253865619639317L;
 /**
 * 功能码 0 心跳 1 登陆 2 登出 3 发送消息
 */
 private Integer functionCode;
 /**
 * 用户id
 */
 private String userId;
 /**
 * 这边假设是string的消息体
 */
 private String message;
}

(2) 服务端发送消息实体类

@Data
public class ServerSendDto implements Serializable {
 private static final long serialVersionUID = -7453297551797390215L;
 /**
 * 状态码 20000 成功,否则有errorMessage
 */
 private Integer statusCode;
 private String message;
 /**
 * 功能码
 */
 private Integer functionCode;
 /**
 * 错误消息
 */
 private String errorMessage;
}

(3) 客户端发送消息实体类

@Data
public class ClientSendDto implements Serializable {
 private static final long serialVersionUID = 97085384412852967L;
 /**
 * 功能码 0 心跳 1 登陆 2 登出 3 发送消息
 */
 private Integer functionCode;
 /**
 * 用户id
 */
 private String userId;
 /**
 * 这边假设是string的消息体
 */
 private String message;
}

2. 客户端或服务端掉线检测功能

2.1 实现思路

通过自定义心跳包来实现掉线检测功能,具体思路如下:

客户端连接上服务端后,在服务端会维护一个在线客户端列表。客户端每隔一段时间,向服务端发送一个心跳包,服务端受收到包以后,会更新客户端最近一次在线时间。一旦服务端超过规定时间没有接收到客户端发来的包,则视为掉线。

2.2 代码实现

维护一个客户端map,其中key代表用户的唯一id(用户唯一id的身份验证下面会说明),value代表用户对应的一个实体

/**
 * 存储当前由用户信息活跃的的socket线程
 */
private ConcurrentMap<String, Connection> existSocketMap = new ConcurrentHashMap<>();

其中Connection对象包含的信息如下:

@Slf4j
@Data
public class Connection {
 /**
 * 当前的socket连接实例
 */
 private Socket socket;
 /**
 * 当前连接线程
 */
 private ConnectionThread connectionThread;
 /**
 * 当前连接是否登陆
 */
 private boolean isLogin;
 /**
 * 存储当前的user信息
 */
 private String userId;
 /**
 * 创建时间
 */
 private Date createTime;
 /**
 * 最后一次更新时间,用于判断心跳
 */
 private Date lastOnTime;
}

主要关注其中的lastOnTime字段,每次服务端接收到标识是心跳数据,会更新当前的lastOnTime字段,代码如下:

if (functionCode.equals(FunctionCodeEnum.HEART.getValue())) {
 //心跳类型
 connection.setLastOnTime(new Date());
 //发送同样的心跳数据给客户端
 ServerSendDto dto = new ServerSendDto();
 dto.setFunctionCode(FunctionCodeEnum.HEART.getValue());
 connection.println(JSONObject.toJSONString(dto));
}

额外会有一个监测进程,以一定频率来监测上述维护的map中的每一个Connection对象,如果当前时间与lastOnTime的时间间隔超过自定义的长度,则自动将其对应的socket连接关闭,代码如下:

Date now = new Date();
Date lastOnTime = connectionThread.getConnection().getLastOnTime();
long heartDuration = now.getTime() - lastOnTime.getTime();
if (heartDuration > SocketConstant.HEART_RATE) {
 //心跳超时,关闭当前线程
 log.error("心跳超时");
 connectionThread.stopRunning();
}

在上面代码中,服务端收到标识是心跳数据的时候,除了更新该socket对应的lastOnTime,还会同样同样心跳类型的数据给客户端,客户端收到标识是心跳数据的时候也会更新自己的lastOnTime字段,同时也有一个心跳监测线程在监测当前的socket连接心跳是否超时

3. 客户端身份获知、强制身份验证

3.1 实现思路

通过代码socket = serverSocket.accept()获得的一个socket连接我们仅仅只能知道其客户端的ip以及端口号,并不能获知这个socket连接对应的到底是哪一个客户端,因此必须得先获得客户端的身份并且验证通过其身份才能让其正常连接。

具体的实现思路是:

自定义一个登陆处理接口,当server端受到标识是用户登陆的时候(此时会携带用户信息或者token,此处简化为用户id),调用用户的登陆验证,验证通过的话则将该socket连接与用户信息绑定,设置其为已登录,并且封装对应的对象放入前面提的客户端map中,由此可获得具体用户对应的哪一个socket连接。

为了实现socket连接的强制验证,在监测线程中,也会判断当前用户多长时间内没有实现登录态,若超时则认为该socket连接为非法连接,主动关闭该socket连接。

3.2 代码实现

自定义登陆处理接口,这边简单以userId来判断是否允许登陆:

public interface LoginHandler {
 /**
 * client登陆的处理函数
 *
 * @param userId 用户id
 *
 * @return 是否验证通过
 */
 boolean canLogin(String userId);
}

收到客户端发来的数据时候的处理:

if (functionCode.equals(FunctionCodeEnum.LOGIN.getValue())) {
 //登陆,身份验证
 String userId = receiveDto.getUserId();
 if (socketServer.getLoginHandler().canLogin(userId)) {
 //设置用户对象已登录状态
 connection.setLogin(true);
 connection.setUserId(userId);
 if (socketServer.getExistSocketMap().containsKey(userId)) {
 //存在已登录的用户,发送登出指令并主动关闭该socket
 Connection existConnection = socketServer.getExistSocketMap().get(userId);
 ServerSendDto dto = new ServerSendDto();
 dto.setStatusCode(999);
 dto.setFunctionCode(FunctionCodeEnum.MESSAGE.getValue());
 dto.setErrorMessage("force logout");
 existConnection.println(JSONObject.toJSONString(dto));
 existConnection.getConnectionThread().stopRunning();
 log.error("用户被客户端重入踢出,userId:{}", userId);
 }
 //添加到已登录map中
 socketServer.getExistSocketMap().put(userId, connection);
}

监测线程判断用户是否完成身份验证:

if (!connectionThread.getConnection().isLogin()) {
 //还没有用户登陆成功
 Date createTime = connectionThread.getConnection().getCreateTime();
 long loginDuration = now.getTime() - createTime.getTime();
 if (loginDuration > SocketConstant.LOGIN_DELAY) {
 //身份验证超时
 log.error("身份验证超时");
 connectionThread.stopRunning();
 }
}

4. socket异常处理与垃圾线程回收

4.1 实现思路

socket在读取数据或者发送数据的时候会出现各种异常,比如客户端的socket已断开连接(正常断开或物理连接断开等),但是服务端还在发送数据或者还在接受数据的过程中,此时socket会抛出相关异常,对于该异常的处理需要将自身的socket连接关闭,避免资源的浪费,同时由于是多线程方案,还需将该socket对应的线程正常清理。

4.2 代码实现

下面以server端发送数据为例,该代码中加入了重试机制:

public void println(String message) {
 int count = 0;
 PrintWriter writer;
 do {
 try {
 writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
 writer.println(message);
 break;
 } catch (IOException e) {
 count++;
 if (count >= RETRY_COUNT) {
 //重试多次失败,说明client端socket异常
 this.connectionThread.stopRunning();
 }
 }
 try {
 Thread.sleep(2 * 1000);
 } catch (InterruptedException e1) {
 log.error("Connection.println.IOException interrupt,userId:{}", userId);
 }
 } while (count < 3);
}

上述调用的
this.connectionThread.stopRunning()代码如下:

public void stopRunning() {
 //设置线程对象状态,便于线程清理
 isRunning = false;
 try {
 //异常情况需要将该socket资源释放
 socket.close();
 } catch (IOException e) {
 log.error("ConnectionThread.stopRunning failed.exception:{}", e);
 }
}

上述代码中设置了线程对象的状态,下述代码在监测线程中执行,将没有运行的线程给清理掉

/**
 * 存储只要有socket处理的线程
 */
private List<ConnectionThread> existConnectionThreadList = Collections.synchronizedList(new ArrayList<>());
/**
 * 中间list,用于遍历的时候删除
 */
private List<ConnectionThread> noConnectionThreadList = Collections.synchronizedList(new ArrayList<>());
//...
//删除list中没有用的thread引用
existConnectionThreadList.forEach(connectionThread -> {
 if (!connectionThread.isRunning()) {
 noConnectionThreadList.add(connectionThread);
 }
});
noConnectionThreadList.forEach(connectionThread -> {
 existConnectionThreadList.remove(connectionThread);
 if (connectionThread.getConnection().isLogin()) {
 //说明用户已经身份验证成功了,需要删除map
 this.existSocketMap.remove(connectionThread.getConnection().getUserId());
 }
});
noConnectionThreadList.clear();

四、项目结构

由于使用了springboot框架来实现该demo,所以项目结构如下:

整体项目结构图

socket工具包目录如下:

socket工具包目录

pom文件主要添加了springboot的相关依赖,以及json工具和lombok工具等,依赖如下:

<parent>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-parent</artifactId>
 <version>2.0.3.RELEASE</version>
 <relativePath/>
</parent>
<dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 </dependency>
 <dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 </dependency>
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.36</version>
 </dependency>
</dependencies>

自己写的socket工具包的使用方式如下:

@Configuration
@Slf4j
public class SocketServerConfig {
@Bean
public SocketServer socketServer() {
 SocketServer socketServer = new SocketServer(60000);
 socketServer.setLoginHandler(userId -> {
 log.info("处理socket用户身份验证,userId:{}", userId);
 //用户名中包含了dingxu则允许登陆
 return userId.contains("dingxu");
 });
 socketServer.setMessageHandler((connection, receiveDto) -> log
 .info("处理socket消息,userId:{},receiveDto:{}", connection.getUserId(),
 JSONObject.toJSONString(receiveDto)));
 socketServer.start();
 return socketServer;
}
}

该demo中主要提供了以下几个接口进行测试:

  • 服务端:获得当前用户列表,发送一个消息客户端:开始一个socket客户端,发送一个消息,关闭一个socket客户端,查看已开启的客户端

具体的postman文件也放已在项目中,具体可点此链接获得

demo中还提供了一个简单压测函数,如下:

@Slf4j
public class SocketClientTest {
 public static void main(String[] args) {
 ExecutorService clientService = Executors.newCachedThreadPool();
 String userId = "dingxu";
 for (int i = 0; i < 1000; i++) {
 int index = i;
 clientService.execute(() -> {
 try {
 SocketClient client;
 client = new SocketClient(InetAddress.getByName("127.0.0.1"), 60000);
 //登陆
 ClientSendDto dto = new ClientSendDto();
 dto.setFunctionCode(FunctionCodeEnum.LOGIN.getValue());
 dto.setUserId(userId + index);
 client.println(JSONObject.toJSONString(dto));
 ScheduledExecutorService clientHeartExecutor = Executors.newSingleThreadScheduledExecutor(
 r -> new Thread(r, "socket_client+heart_" + r.hashCode()));
 clientHeartExecutor.scheduleWithFixedDelay(() -> {
 try {
 ClientSendDto heartDto = new ClientSendDto();
 heartDto.setFunctionCode(FunctionCodeEnum.HEART.getValue());
 client.println(JSONObject.toJSONString(heartDto));
 } catch (Exception e) {
 log.error("客户端异常,userId:{},exception:{}", userId, e.getMessage());
 client.close();
 }
 }, 0, 5, TimeUnit.SECONDS);
 while (true){
 }
 } catch (Exception e) {
 log.error(e.getMessage());
 }
 });
 }
 }
}

源码地址如下,仅供学习参考

github.com/DavidDingXu/springboot-socket-demo

五、参考

https://blog.csdn.net/yinwenjie/column/info/sys-communication/3
https://blog.csdn.net/m0_37739193/article/details/78738253
https://www.cnblogs.com/snaildev/p/7724867.html

相关推荐

Java网络编程(JAVA网络编程技术)

网络编程三要素1.IP地址:表示设备在网络中的地址,是网络中设备的唯一标识2.端口号:应用程序在设备中唯一的标识3.协议:连接和数据在网络中传输的规则。InetAddress类Java中也有一个...

字节Java全能手册火了!多线程/网络/性能调优/框架啥都有

前言在这个技术不断更新的年代,跟不上时代变化的速度就会被刷掉,特别是咱们程序员这一群体,技术不断更新的同时也要同时进步,不然长江后浪推前浪,前浪......一个程序员从一个什么都不懂的小白在学到有一定...

一分钟了解java网络编程(java基础网络编程)

一、OSI七层网络模型应用层:Http协议、电子邮件传输、文件服务器等;表示层:数据转换,解决不同系统的兼容问题(跨语言);会话层:建立与应用程序的会话连接;传输层:提供了端口号和传输协议(TPC/U...

Java编程-高并发情况下接口性能优化实践-提升吞吐量TPS

记得前段时间工作中接到一个任务是优化一个下单接口的性能提高接口的吞吐量TPS,前期通过arthas工具跟踪接口的具体方法调用链路及耗时,发现了影响此接口的性能瓶颈主要是加锁的方式,后来变更了锁的方式...

socket 断线重连和心跳机制如何实现?

一、socket概念1.套接字(socket)是网络通信的基石,是支持TCP/IP协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议,...

迅速了解-Java网络编程(java基础网络编程)

Java网络编程在JavaSE阶段,我们学习了I/O流,既然I/O流如此强大,那么能否跨越不同的主机进行I/O操作呢?这就要提到Java的网络编程了。...

Java网络编程详解(java 网络编程)

网络编程基础知识最!最!最!重要网络编程基础概念网络编程不等于网站编程,网络编程即使用套接字(socket)来达到各进程间的通信,现在一般称为TCP/IP编程;网络编程分为服务端和客户端。服务端就相当...

「开源推荐」高性能网络通信框架 HP-Socket v5.7.2

简介HP-Socket是一套通用的高性能TCP/UDP/HTTP通信框架,包含服务端组件、客户端组件和Agent组件,广泛适用于各种不同应用场景的TCP/UDP/HTTP通信系统,提供C/...

Java网络编程从入门到精通:打造属于你的网络世界

Java网络编程从入门到精通:打造属于你的网络世界在当今这个信息爆炸的时代,网络编程已经成为程序员必不可少的一项技能。而Java作为一种功能强大且广泛使用的编程语言,在网络编程领域也有着举足轻重的地位...

5分钟读懂C#中TcpClient、TcpListener和Socket三个类的角色

一、核心功能与定位1.Socket类:底层通信的基石-位于System.Net.Sockets命名空间,提供对网络协议栈的直接操作,支持TCP、UDP等多种协议。-手动管理连接细节:需...

(三)谈谈 IO 模型(Socket 编程篇)

快过年啦,估计很多朋友已在摸鱼的路上。而我为了兄弟们年后的追逐,却在苦苦寻觅、规划,导致文章更新晚了些,各位猿粉谅解。上期分享,我们结合新春送祝福的场景,通过一坨坨的代码让BIO、NIO编程过程呈...

大数据编程入门:Java网络编程(大数据 编程)

如果想要编写出一个可以运行在多个设备上的程序,应该怎么做呢?答案是网络编程,今天小编将为大家带来大数据编程入门:Java网络编程。一、网络编程概念网络编程是指编写在通过网络连接的多个设备(计算机)上运...

基于JAVA的社交聊天室(java聊天设计与实现)

基于Java的社交聊天室一、前言随着互联网技术的迅速发展,实时通信和在线社交已成为人们日常生活的重要组成部分。基于Java的社交聊天室系统,凭借其跨平台、高性能和安全性等特点,为用户提供了一个集中、开...

java-socket长连接demo体验(java socket长连接)

作者:DavidDing来源:https://zhuanlan.zhihu.com/p/56135195一、前言最近公司在预研设备app端与服务端的交互方案,主要方案有:服务端和app端通过阿里i...

JAVA数据库编程(java数据库编程指南)

预计更新###第一节:什么是JAVA-JAVA的背景和历史-JAVA的特点和应用领域-如何安装和配置JAVA开发环境###第二节:JAVA基础语法-JAVA的基本数据类型和变量-运算符和...

取消回复欢迎 发表评论: