百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

你知道TCP粘包和拆包的原理以及它们的解决方案吗

ztj100 2024-12-14 16:12 24 浏览 0 评论

TCP粘包和拆包基本介绍

1、TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行丰包。这样做虽然提供了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通讯是无消息保护边界的。

2、由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题;

3、TCP粘包、拆包图解

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在一下四种情况

① 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;

② 服务端一次接收到了两个数据包,D1和D2粘合在一起,称之为TCP粘包;

③ 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包剩余内容,这称之为TCP拆包;

④ 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包;

TCP粘包和拆包现象实例

1、在编写Netty程序时,如果没有做处理就会发送粘包和拆包的问题

代码演示

服务端代码

/**
 * 服务端
 */
public class MyServer {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    //自定义一个初始化类
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

MyServerInitializer初始化类代码,简单地只设置了业务处理handler

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyServerHandler());
    }
}

服务端的业务处理MyServerHandler,这里定义了接收消息的次数,和打印出客户端发送的消息,观察服务端是分几次接收消息的,这个具有不确定性,每次的运行结果可能都不一样,然后服务端每次接收到消息也会回送一条消息给到客户端

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 服务端接收消息次数,每个客户端于服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        //将buffer转成字符串
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("服务器接收到数据 "+message);
        System.out.println("服务器接收到消息量="+(++this.count));

        //服务器回送数据给客户端,回送一个随机id
        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString()+"$", Charset.forName("utf-8"));
        ctx.writeAndFlush(responseByteBuf);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码

public class MyClient {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    //自定义一个初始化类
                    .handler(new MyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端初始化类MyClientInitializer

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyClientHandler());
    }
}

客户端业务处理类MyClientHandler,这里客户端演示一建立连接就给服务端发送10次hello server,也演示了客户端接收到服务端的消息是怎么接收的,是分几次接收的

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 客户端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端发送10条数据hello,server 编号
        for(int i=0;i<10;i++){
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);

        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("客户端接收到消息="+message);
        System.out.println("客户端接收消息数量="+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

演示代码,先启动服务端,然后再多次启动客户端观察,显示看服务端的输出

第一次客户端连接服务端时,可以看到服务端只接收了一次,一次接收了客户端分十次发送的数据

服务器接收到数据 hello,server 0hello,server 1hello,server 2hello,server 3hello,server 4hello,server 5hello,server 6hello,server 7hello,server 8hello,server 9
服务器接收到消息量=1

第二次客户端连接服务端时,可以看到服务端分5次接收了客户端发送10次的消息,可以看到第一次接收到了客户端发送一次的数据,第二次接收了客户端累计发送的两次数据,第四,五次都是接收客户端累计发送三次的数据

服务器接收到数据 hello,server 0
服务器接收到消息量=1
服务器接收到数据 hello,server 1hello,server 2
服务器接收到消息量=2
服务器接收到数据 hello,server 3hello,server 4hello,server 5
服务器接收到消息量=3
服务器接收到数据 hello,server 6hello,server 7hello,server 8
服务器接收到消息量=4
服务器接收到数据 hello,server 9
服务器接收到消息量=5

再来看看客户端,因为我们演示了每一次客户端发送消息给服务端的时候,服务端都会回送一份随机数给客户端,每一次都是用$分开的,这里我们可以看到客户端分一次接收了服务端发送给它多次的数据

客户端接收到消息=e8a93f08-32e6-4af2-a344-56c00c409d8f$c8c363b5-d25e-4007-b7be-1d50d86acb7f$2572c3c9-76ed-491f-a3a5-acec6938ce7d$b4927691-0745-42cd-abbc-06a35cf5b2da$6fdf8259-3bd4-4d6a-ae33-ea8b72124df0$
客户端接收消息数量=1

TCP粘包和拆包解决方案

1、使用自定义协议+编解码器来解决

2、关键就是要解决服务端每次读取数据长度问题,这个问题解决了,就不会出现服务端多读或少读数据的问题,从而避免TCP粘包、拆包;

看一个具体的实例

1、要求客户端发送5个Message对象,客户端每次发送一个Message对象;

2、服务器端每次接收一个Message,分5次进行解码,每读取到一个Message,会回复一个Message对象给客户端。

代码演示

首先,我们先来定义自己的协议包类MessageProtocol,里面只定义了两个字段,一个是内容的长度,一个是消息的内容

/**
 * 自定义协议包
 */
@Data
public class MessageProtocol {
    /**
     * 长度,这个是关键
     */
    private int len;
    private byte[] content;
}

接下来我们定义服务端MyServer,服务端接收消息的时候要进行解码,发送消息之前要进行编码,消息的载体是我们上面定义的协议包类

public class MyServer {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    //自定义一个初始化类
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端初始化类的内容,设置了服务端的编解码器和它的业务逻辑处理器MyServerInitializer

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //加入自定义的解码器
        pipeline.addLast(new MyMessageDecoder());
        //自定义编码器
        pipeline.addLast(new MyMessageEncoder());
        //业务逻辑处理器
        pipeline.addLast(new MyServerHandler());
    }
}

编码器代码MyMessageEncoder ,这里在消息发送之前进行编码,将消息的长度和内容进行发送

public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被调用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}

解码器代码MyMessageDecoder,这里对消息进行解码,读取消息的长度和内容进行封装成消息协议包传给下一个handler

public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode 被调用");
        //需要将得到二进制字节码->MessageProtocol数据包(对象)
        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);
        //封装成MessageProtocol对象,放入out,传递下一个handler业务处理
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    }
}

服务端的业务逻辑处理器代码,读取客户端发送的消息,并构建协议包回送给客户端

public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    /**
     * 服务端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        //接收到数据,并处理
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("服务器接收到信息如下: 长度="+len+";内容="+new String(content,Charset.forName("utf-8"))+";接收到消息包的数量="+(++this.count));
        //回复消息给客户端
        byte[] responseContent = UUID.randomUUID().toString().getBytes("utf-8");
        int length = responseContent.length;
        //构建一个协议包
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(responseContent);
        ctx.writeAndFlush(messageProtocol);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码,接收消息的时候要进行解码,发送消息之前要进行编码,消息的载体是我们上面定义的协议包类

public class MyClient {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    //自定义一个初始化类
                    .handler(new MyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端初始化类MyClientInitializer,设置编解码器和客户端的业务逻辑处理器

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //加入自定义的编码器
        pipeline.addLast(new MyMessageEncoder());
        //加入自定义的解码器
        pipeline.addLast(new MyMessageDecoder());
        //客户端的业务逻辑处理器
        pipeline.addLast(new MyClientHandler());
    }
}

客户端的业务逻辑处理器MyClientHandler,和服务端建立连接时连续给服务端发送5条构建的协议包消息

public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    /**
     * 客户端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端发送5条数据"今天天气冷,吃火锅"
        for(int i=0;i<5;i++){
           String message="今天天气冷,吃火锅";
            byte[] content = message.getBytes(Charset.forName("utf-8"));
            int length = message.getBytes(Charset.forName("utf-8")).length;
            //创建协议包对象
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);

        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("客户端接收到消息:长度="+len+",内容="+new String(content,Charset.forName("utf-8"))+",接收消息数量="+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

演示代码,先启动服务端,然后再多次启动客户端观察,先看服务端的输出

MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=1
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=2
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=3
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=4
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=5
MyMessageEncoder encode 方法被调用

可以看到一建立连接时,因为客户端连续给服务端发送5次协议包消息,服务端这边先进行消息的解码,读取完整的消息,然后再回送消息给客户端发送之前先进行了编码,可以看到,服务端按顺序接收到了5个完整的协议包的内容,并没有出现粘包和拆包的现象,再来看看客户端的输出

MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=83d7f5b6-1c40-4437-b3fe-5940552c933d,接收消息数量=1
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=0a9fd95a-1ca5-4ae6-9596-8066fa21bd63,接收消息数量=2
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=146d26dc-0045-48ab-abe8-18f196151d76,接收消息数量=3
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=7995d62a-27ef-4860-bbf5-12334fa2fc23,接收消息数量=4
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=13275e5f-3483-4fbd-af91-30876a7cebea,接收消息数量=5

客户端一建立连接,就连续给服务端发送5条协议包消息,发送之前都会经过编码,然后接收服务端回送的消息,接收之前解码器先被调用,我们这里也可以看到客户端每次都时完整的接收到服务端发送的消息,服务端发送了5次,客户端就完整的接收了5次,并没有出现粘包和拆包的问题,至此我们演示粘包和拆包的方案就到这里结束

相关推荐

如何将数据仓库迁移到阿里云 AnalyticDB for PostgreSQL

阿里云AnalyticDBforPostgreSQL(以下简称ADBPG,即原HybridDBforPostgreSQL)为基于PostgreSQL内核的MPP架构的实时数据仓库服务,可以...

Python数据分析:探索性分析

写在前面如果你忘记了前面的文章,可以看看加深印象:Python数据处理...

CSP-J/S冲奖第21天:插入排序

...

C++基础语法梳理:算法丨十大排序算法(二)

本期是C++基础语法分享的第十六节,今天给大家来梳理一下十大排序算法后五个!归并排序...

C 语言的标准库有哪些

C语言的标准库并不是一个单一的实体,而是由一系列头文件(headerfiles)组成的集合。每个头文件声明了一组相关的函数、宏、类型和常量。程序员通过在代码中使用#include<...

[深度学习] ncnn安装和调用基础教程

1介绍ncnn是腾讯开发的一个为手机端极致优化的高性能神经网络前向计算框架,无第三方依赖,跨平台,但是通常都需要protobuf和opencv。ncnn目前已在腾讯多款应用中使用,如QQ,Qzon...

用rust实现经典的冒泡排序和快速排序

1.假设待排序数组如下letmutarr=[5,3,8,4,2,7,1];...

ncnn+PPYOLOv2首次结合!全网最详细代码解读来了

编辑:好困LRS【新智元导读】今天给大家安利一个宝藏仓库miemiedetection,该仓库集合了PPYOLO、PPYOLOv2、PPYOLOE三个算法pytorch实现三合一,其中的PPYOL...

C++特性使用建议

1.引用参数使用引用替代指针且所有不变的引用参数必须加上const。在C语言中,如果函数需要修改变量的值,参数必须为指针,如...

Qt4/5升级到Qt6吐血经验总结V202308

00:直观总结增加了很多轮子,同时原有模块拆分的也更细致,估计为了方便拓展个管理。把一些过度封装的东西移除了(比如同样的功能有多个函数),保证了只有一个函数执行该功能。把一些Qt5中兼容Qt4的方法废...

到底什么是C++11新特性,请看下文

C++11是一个比较大的更新,引入了很多新特性,以下是对这些特性的详细解释,帮助您快速理解C++11的内容1.自动类型推导(auto和decltype)...

掌握C++11这些特性,代码简洁性、安全性和性能轻松跃升!

C++11(又称C++0x)是C++编程语言的一次重大更新,引入了许多新特性,显著提升了代码简洁性、安全性和性能。以下是主要特性的分类介绍及示例:一、核心语言特性1.自动类型推导(auto)编译器自...

经典算法——凸包算法

凸包算法(ConvexHull)一、概念与问题描述凸包是指在平面上给定一组点,找到包含这些点的最小面积或最小周长的凸多边形。这个多边形没有任何内凹部分,即从一个多边形内的任意一点画一条线到多边形边界...

一起学习c++11——c++11中的新增的容器

c++11新增的容器1:array当时的初衷是希望提供一个在栈上分配的,定长数组,而且可以使用stl中的模板算法。array的用法如下:#include<string>#includ...

C++ 编程中的一些最佳实践

1.遵循代码简洁原则尽量避免冗余代码,通过模块化设计、清晰的命名和良好的结构,让代码更易于阅读和维护...

取消回复欢迎 发表评论: