百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

你知道TCP粘包和拆包的原理以及它们的解决方案吗

ztj100 2024-12-14 16:12 49 浏览 0 评论

TCP粘包和拆包基本介绍

1、TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行丰包。这样做虽然提供了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通讯是无消息保护边界的。

2、由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题;

3、TCP粘包、拆包图解

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在一下四种情况

① 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;

② 服务端一次接收到了两个数据包,D1和D2粘合在一起,称之为TCP粘包;

③ 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包剩余内容,这称之为TCP拆包;

④ 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包;

TCP粘包和拆包现象实例

1、在编写Netty程序时,如果没有做处理就会发送粘包和拆包的问题

代码演示

服务端代码

/**
 * 服务端
 */
public class MyServer {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    //自定义一个初始化类
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

MyServerInitializer初始化类代码,简单地只设置了业务处理handler

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyServerHandler());
    }
}

服务端的业务处理MyServerHandler,这里定义了接收消息的次数,和打印出客户端发送的消息,观察服务端是分几次接收消息的,这个具有不确定性,每次的运行结果可能都不一样,然后服务端每次接收到消息也会回送一条消息给到客户端

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 服务端接收消息次数,每个客户端于服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        //将buffer转成字符串
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("服务器接收到数据 "+message);
        System.out.println("服务器接收到消息量="+(++this.count));

        //服务器回送数据给客户端,回送一个随机id
        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString()+"$", Charset.forName("utf-8"));
        ctx.writeAndFlush(responseByteBuf);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码

public class MyClient {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    //自定义一个初始化类
                    .handler(new MyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端初始化类MyClientInitializer

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyClientHandler());
    }
}

客户端业务处理类MyClientHandler,这里客户端演示一建立连接就给服务端发送10次hello server,也演示了客户端接收到服务端的消息是怎么接收的,是分几次接收的

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 客户端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端发送10条数据hello,server 编号
        for(int i=0;i<10;i++){
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);

        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("客户端接收到消息="+message);
        System.out.println("客户端接收消息数量="+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

演示代码,先启动服务端,然后再多次启动客户端观察,显示看服务端的输出

第一次客户端连接服务端时,可以看到服务端只接收了一次,一次接收了客户端分十次发送的数据

服务器接收到数据 hello,server 0hello,server 1hello,server 2hello,server 3hello,server 4hello,server 5hello,server 6hello,server 7hello,server 8hello,server 9
服务器接收到消息量=1

第二次客户端连接服务端时,可以看到服务端分5次接收了客户端发送10次的消息,可以看到第一次接收到了客户端发送一次的数据,第二次接收了客户端累计发送的两次数据,第四,五次都是接收客户端累计发送三次的数据

服务器接收到数据 hello,server 0
服务器接收到消息量=1
服务器接收到数据 hello,server 1hello,server 2
服务器接收到消息量=2
服务器接收到数据 hello,server 3hello,server 4hello,server 5
服务器接收到消息量=3
服务器接收到数据 hello,server 6hello,server 7hello,server 8
服务器接收到消息量=4
服务器接收到数据 hello,server 9
服务器接收到消息量=5

再来看看客户端,因为我们演示了每一次客户端发送消息给服务端的时候,服务端都会回送一份随机数给客户端,每一次都是用$分开的,这里我们可以看到客户端分一次接收了服务端发送给它多次的数据

客户端接收到消息=e8a93f08-32e6-4af2-a344-56c00c409d8f$c8c363b5-d25e-4007-b7be-1d50d86acb7f$2572c3c9-76ed-491f-a3a5-acec6938ce7d$b4927691-0745-42cd-abbc-06a35cf5b2da$6fdf8259-3bd4-4d6a-ae33-ea8b72124df0$
客户端接收消息数量=1

TCP粘包和拆包解决方案

1、使用自定义协议+编解码器来解决

2、关键就是要解决服务端每次读取数据长度问题,这个问题解决了,就不会出现服务端多读或少读数据的问题,从而避免TCP粘包、拆包;

看一个具体的实例

1、要求客户端发送5个Message对象,客户端每次发送一个Message对象;

2、服务器端每次接收一个Message,分5次进行解码,每读取到一个Message,会回复一个Message对象给客户端。

代码演示

首先,我们先来定义自己的协议包类MessageProtocol,里面只定义了两个字段,一个是内容的长度,一个是消息的内容

/**
 * 自定义协议包
 */
@Data
public class MessageProtocol {
    /**
     * 长度,这个是关键
     */
    private int len;
    private byte[] content;
}

接下来我们定义服务端MyServer,服务端接收消息的时候要进行解码,发送消息之前要进行编码,消息的载体是我们上面定义的协议包类

public class MyServer {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    //自定义一个初始化类
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端初始化类的内容,设置了服务端的编解码器和它的业务逻辑处理器MyServerInitializer

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //加入自定义的解码器
        pipeline.addLast(new MyMessageDecoder());
        //自定义编码器
        pipeline.addLast(new MyMessageEncoder());
        //业务逻辑处理器
        pipeline.addLast(new MyServerHandler());
    }
}

编码器代码MyMessageEncoder ,这里在消息发送之前进行编码,将消息的长度和内容进行发送

public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被调用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}

解码器代码MyMessageDecoder,这里对消息进行解码,读取消息的长度和内容进行封装成消息协议包传给下一个handler

public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode 被调用");
        //需要将得到二进制字节码->MessageProtocol数据包(对象)
        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);
        //封装成MessageProtocol对象,放入out,传递下一个handler业务处理
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    }
}

服务端的业务逻辑处理器代码,读取客户端发送的消息,并构建协议包回送给客户端

public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    /**
     * 服务端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        //接收到数据,并处理
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("服务器接收到信息如下: 长度="+len+";内容="+new String(content,Charset.forName("utf-8"))+";接收到消息包的数量="+(++this.count));
        //回复消息给客户端
        byte[] responseContent = UUID.randomUUID().toString().getBytes("utf-8");
        int length = responseContent.length;
        //构建一个协议包
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(responseContent);
        ctx.writeAndFlush(messageProtocol);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码,接收消息的时候要进行解码,发送消息之前要进行编码,消息的载体是我们上面定义的协议包类

public class MyClient {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    //自定义一个初始化类
                    .handler(new MyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端初始化类MyClientInitializer,设置编解码器和客户端的业务逻辑处理器

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //加入自定义的编码器
        pipeline.addLast(new MyMessageEncoder());
        //加入自定义的解码器
        pipeline.addLast(new MyMessageDecoder());
        //客户端的业务逻辑处理器
        pipeline.addLast(new MyClientHandler());
    }
}

客户端的业务逻辑处理器MyClientHandler,和服务端建立连接时连续给服务端发送5条构建的协议包消息

public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    /**
     * 客户端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端发送5条数据"今天天气冷,吃火锅"
        for(int i=0;i<5;i++){
           String message="今天天气冷,吃火锅";
            byte[] content = message.getBytes(Charset.forName("utf-8"));
            int length = message.getBytes(Charset.forName("utf-8")).length;
            //创建协议包对象
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);

        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("客户端接收到消息:长度="+len+",内容="+new String(content,Charset.forName("utf-8"))+",接收消息数量="+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

演示代码,先启动服务端,然后再多次启动客户端观察,先看服务端的输出

MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=1
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=2
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=3
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=4
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=5
MyMessageEncoder encode 方法被调用

可以看到一建立连接时,因为客户端连续给服务端发送5次协议包消息,服务端这边先进行消息的解码,读取完整的消息,然后再回送消息给客户端发送之前先进行了编码,可以看到,服务端按顺序接收到了5个完整的协议包的内容,并没有出现粘包和拆包的现象,再来看看客户端的输出

MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=83d7f5b6-1c40-4437-b3fe-5940552c933d,接收消息数量=1
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=0a9fd95a-1ca5-4ae6-9596-8066fa21bd63,接收消息数量=2
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=146d26dc-0045-48ab-abe8-18f196151d76,接收消息数量=3
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=7995d62a-27ef-4860-bbf5-12334fa2fc23,接收消息数量=4
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=13275e5f-3483-4fbd-af91-30876a7cebea,接收消息数量=5

客户端一建立连接,就连续给服务端发送5条协议包消息,发送之前都会经过编码,然后接收服务端回送的消息,接收之前解码器先被调用,我们这里也可以看到客户端每次都时完整的接收到服务端发送的消息,服务端发送了5次,客户端就完整的接收了5次,并没有出现粘包和拆包的问题,至此我们演示粘包和拆包的方案就到这里结束

相关推荐

Java的SPI机制详解

作者:京东物流杨苇苇1.SPI简介SPI(ServiceProvicerInterface)是Java语言提供的一种接口发现机制,用来实现接口和接口实现的解耦。简单来说,就是系统只需要定义接口规...

90%的Java程序员都忽视的内部类使用不当导致内存泄露!

...

一文读懂 Spring Boot 启动原理,开发效率飙升!

在当今的Java开发领域,SpringBoot无疑是最热门的框架之一。它以其“约定大于配置”的理念,让开发者能够快速搭建和启动应用,极大地提高了开发效率。但是,你是否真正了解Spring...

ServiceLoader

ServiceLoader是Java提供的一种服务发现机制(ServiceProviderInterface,SPI)...

深入探索 Spring Boot3 中的自定义扩展操作

在当今互联网软件开发领域,SpringBoot无疑是最受欢迎的框架之一。随着其版本迭代至SpringBoot3,它为开发者们带来了更多强大的功能和特性,其中自定义扩展操作更是为我们在项目开发中...

Spring Boot启动过程全面解析:从入门到精通

一、SpringBoot概述SpringBoot是一个基于Spring框架的快速开发脚手架,它通过"约定优于配置"的原则简化了Spring应用的初始搭建和开发过程。...

Spring Boot 3.x 自定义 Starter 详解

今天星期六,继续卷springboot3.x。在SpringBoot3.x中,自定义Starter是封装和共享通用功能、实现“约定优于配置”理念的强大机制。通过创建自己的Starte...

Spring Boot 的 3 种动态 Bean 注入技巧

在SpringBoot开发中,动态注入Bean是一种强大的技术,它允许我们根据特定条件或运行时环境灵活地创建和管理Bean。相比于传统的静态Bean定义,动态注入提供了更高的灵活性和可...

大佬用4000字带你彻底理解SpringBoot的运行原理!

SpringBoot的运行原理从前面创建的SpringBoot应用示例中可以看到,启动一个SpringBoot工程都是从SpringApplication.run()方法开始的。这个方法具体完成...

Springboot是如何实现自动配置的

SpringBoot的自动配置功能极大地简化了基于Spring的应用程序的配置过程。它能够根据类路径中的依赖和配置文件中的属性,自动配置应用程序。下面是SpringBoot实现自动配置的...

Spring Boot3.x 应用的生命周期深度解析

SpringBoot应用的生命周期可以清晰地划分为三个主要阶段:启动阶段(Startup)...

Springboot 启动流程及各类事件生命周期那点事

前言本文通过Springboot启动方法分析SpringApplication逻辑。从静态run方法执行到各个阶段发布不同事件完成整个应用启动。...

Spring框架基础知识-常用的接口1

BeanDefinition基本概念BeanDefinition是Spring框架中描述bean配置信息的核心接口,它包含了创建bean实例所需的所有元数据。...

一家拥有 158 年历史的公司遭遇索赔,被迫关闭!

...

Java 技术岗面试全景备战!从基础到架构的系统性通关攻略分享

Java技术岗的面试往往是一项多维度的能力检验。本文将会从核心知识点、项目经验到面试策略,为你梳理一份系统性的备战攻略!...

取消回复欢迎 发表评论: