百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

你知道TCP粘包和拆包的原理以及它们的解决方案吗

ztj100 2024-12-14 16:12 43 浏览 0 评论

TCP粘包和拆包基本介绍

1、TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行丰包。这样做虽然提供了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通讯是无消息保护边界的。

2、由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题;

3、TCP粘包、拆包图解

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在一下四种情况

① 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;

② 服务端一次接收到了两个数据包,D1和D2粘合在一起,称之为TCP粘包;

③ 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包剩余内容,这称之为TCP拆包;

④ 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包;

TCP粘包和拆包现象实例

1、在编写Netty程序时,如果没有做处理就会发送粘包和拆包的问题

代码演示

服务端代码

/**
 * 服务端
 */
public class MyServer {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    //自定义一个初始化类
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

MyServerInitializer初始化类代码,简单地只设置了业务处理handler

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyServerHandler());
    }
}

服务端的业务处理MyServerHandler,这里定义了接收消息的次数,和打印出客户端发送的消息,观察服务端是分几次接收消息的,这个具有不确定性,每次的运行结果可能都不一样,然后服务端每次接收到消息也会回送一条消息给到客户端

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 服务端接收消息次数,每个客户端于服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        //将buffer转成字符串
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("服务器接收到数据 "+message);
        System.out.println("服务器接收到消息量="+(++this.count));

        //服务器回送数据给客户端,回送一个随机id
        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString()+"$", Charset.forName("utf-8"));
        ctx.writeAndFlush(responseByteBuf);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码

public class MyClient {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    //自定义一个初始化类
                    .handler(new MyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端初始化类MyClientInitializer

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyClientHandler());
    }
}

客户端业务处理类MyClientHandler,这里客户端演示一建立连接就给服务端发送10次hello server,也演示了客户端接收到服务端的消息是怎么接收的,是分几次接收的

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 客户端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端发送10条数据hello,server 编号
        for(int i=0;i<10;i++){
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);

        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("客户端接收到消息="+message);
        System.out.println("客户端接收消息数量="+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

演示代码,先启动服务端,然后再多次启动客户端观察,显示看服务端的输出

第一次客户端连接服务端时,可以看到服务端只接收了一次,一次接收了客户端分十次发送的数据

服务器接收到数据 hello,server 0hello,server 1hello,server 2hello,server 3hello,server 4hello,server 5hello,server 6hello,server 7hello,server 8hello,server 9
服务器接收到消息量=1

第二次客户端连接服务端时,可以看到服务端分5次接收了客户端发送10次的消息,可以看到第一次接收到了客户端发送一次的数据,第二次接收了客户端累计发送的两次数据,第四,五次都是接收客户端累计发送三次的数据

服务器接收到数据 hello,server 0
服务器接收到消息量=1
服务器接收到数据 hello,server 1hello,server 2
服务器接收到消息量=2
服务器接收到数据 hello,server 3hello,server 4hello,server 5
服务器接收到消息量=3
服务器接收到数据 hello,server 6hello,server 7hello,server 8
服务器接收到消息量=4
服务器接收到数据 hello,server 9
服务器接收到消息量=5

再来看看客户端,因为我们演示了每一次客户端发送消息给服务端的时候,服务端都会回送一份随机数给客户端,每一次都是用$分开的,这里我们可以看到客户端分一次接收了服务端发送给它多次的数据

客户端接收到消息=e8a93f08-32e6-4af2-a344-56c00c409d8f$c8c363b5-d25e-4007-b7be-1d50d86acb7f$2572c3c9-76ed-491f-a3a5-acec6938ce7d$b4927691-0745-42cd-abbc-06a35cf5b2da$6fdf8259-3bd4-4d6a-ae33-ea8b72124df0$
客户端接收消息数量=1

TCP粘包和拆包解决方案

1、使用自定义协议+编解码器来解决

2、关键就是要解决服务端每次读取数据长度问题,这个问题解决了,就不会出现服务端多读或少读数据的问题,从而避免TCP粘包、拆包;

看一个具体的实例

1、要求客户端发送5个Message对象,客户端每次发送一个Message对象;

2、服务器端每次接收一个Message,分5次进行解码,每读取到一个Message,会回复一个Message对象给客户端。

代码演示

首先,我们先来定义自己的协议包类MessageProtocol,里面只定义了两个字段,一个是内容的长度,一个是消息的内容

/**
 * 自定义协议包
 */
@Data
public class MessageProtocol {
    /**
     * 长度,这个是关键
     */
    private int len;
    private byte[] content;
}

接下来我们定义服务端MyServer,服务端接收消息的时候要进行解码,发送消息之前要进行编码,消息的载体是我们上面定义的协议包类

public class MyServer {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    //自定义一个初始化类
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端初始化类的内容,设置了服务端的编解码器和它的业务逻辑处理器MyServerInitializer

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //加入自定义的解码器
        pipeline.addLast(new MyMessageDecoder());
        //自定义编码器
        pipeline.addLast(new MyMessageEncoder());
        //业务逻辑处理器
        pipeline.addLast(new MyServerHandler());
    }
}

编码器代码MyMessageEncoder ,这里在消息发送之前进行编码,将消息的长度和内容进行发送

public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被调用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}

解码器代码MyMessageDecoder,这里对消息进行解码,读取消息的长度和内容进行封装成消息协议包传给下一个handler

public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode 被调用");
        //需要将得到二进制字节码->MessageProtocol数据包(对象)
        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);
        //封装成MessageProtocol对象,放入out,传递下一个handler业务处理
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    }
}

服务端的业务逻辑处理器代码,读取客户端发送的消息,并构建协议包回送给客户端

public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    /**
     * 服务端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        //接收到数据,并处理
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("服务器接收到信息如下: 长度="+len+";内容="+new String(content,Charset.forName("utf-8"))+";接收到消息包的数量="+(++this.count));
        //回复消息给客户端
        byte[] responseContent = UUID.randomUUID().toString().getBytes("utf-8");
        int length = responseContent.length;
        //构建一个协议包
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(responseContent);
        ctx.writeAndFlush(messageProtocol);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码,接收消息的时候要进行解码,发送消息之前要进行编码,消息的载体是我们上面定义的协议包类

public class MyClient {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    //自定义一个初始化类
                    .handler(new MyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端初始化类MyClientInitializer,设置编解码器和客户端的业务逻辑处理器

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //加入自定义的编码器
        pipeline.addLast(new MyMessageEncoder());
        //加入自定义的解码器
        pipeline.addLast(new MyMessageDecoder());
        //客户端的业务逻辑处理器
        pipeline.addLast(new MyClientHandler());
    }
}

客户端的业务逻辑处理器MyClientHandler,和服务端建立连接时连续给服务端发送5条构建的协议包消息

public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    /**
     * 客户端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端发送5条数据"今天天气冷,吃火锅"
        for(int i=0;i<5;i++){
           String message="今天天气冷,吃火锅";
            byte[] content = message.getBytes(Charset.forName("utf-8"));
            int length = message.getBytes(Charset.forName("utf-8")).length;
            //创建协议包对象
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);

        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("客户端接收到消息:长度="+len+",内容="+new String(content,Charset.forName("utf-8"))+",接收消息数量="+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

演示代码,先启动服务端,然后再多次启动客户端观察,先看服务端的输出

MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=1
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=2
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=3
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=4
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=5
MyMessageEncoder encode 方法被调用

可以看到一建立连接时,因为客户端连续给服务端发送5次协议包消息,服务端这边先进行消息的解码,读取完整的消息,然后再回送消息给客户端发送之前先进行了编码,可以看到,服务端按顺序接收到了5个完整的协议包的内容,并没有出现粘包和拆包的现象,再来看看客户端的输出

MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=83d7f5b6-1c40-4437-b3fe-5940552c933d,接收消息数量=1
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=0a9fd95a-1ca5-4ae6-9596-8066fa21bd63,接收消息数量=2
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=146d26dc-0045-48ab-abe8-18f196151d76,接收消息数量=3
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=7995d62a-27ef-4860-bbf5-12334fa2fc23,接收消息数量=4
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=13275e5f-3483-4fbd-af91-30876a7cebea,接收消息数量=5

客户端一建立连接,就连续给服务端发送5条协议包消息,发送之前都会经过编码,然后接收服务端回送的消息,接收之前解码器先被调用,我们这里也可以看到客户端每次都时完整的接收到服务端发送的消息,服务端发送了5次,客户端就完整的接收了5次,并没有出现粘包和拆包的问题,至此我们演示粘包和拆包的方案就到这里结束

相关推荐

再说圆的面积-蒙特卡洛(蒙特卡洛方法求圆周率的matlab程序)

在微积分-圆的面积和周长(1)介绍微积分方法求解圆的面积,本文使用蒙特卡洛方法求解圆面积。...

python编程:如何使用python代码绘制出哪些常见的机器学习图像?

专栏推荐...

python创建分类器小结(pytorch分类数据集创建)

简介:分类是指利用数据的特性将其分成若干类型的过程。监督学习分类器就是用带标记的训练数据建立一个模型,然后对未知数据进行分类。...

matplotlib——绘制散点图(matplotlib散点图颜色和图例)

绘制散点图不同条件(维度)之间的内在关联关系观察数据的离散聚合程度...

python实现实时绘制数据(python如何绘制)

方法一importmatplotlib.pyplotaspltimportnumpyasnpimporttimefrommathimport*plt.ion()#...

简单学Python——matplotlib库3——绘制散点图

前面我们学习了用matplotlib绘制折线图,今天我们学习绘制散点图。其实简单的散点图与折线图的语法基本相同,只是作图函数由plot()变成了scatter()。下面就绘制一个散点图:import...

数据分析-相关性分析可视化(相关性分析数据处理)

前面介绍了相关性分析的原理、流程和常用的皮尔逊相关系数和斯皮尔曼相关系数,具体可以参考...

免费Python机器学习课程一:线性回归算法

学习线性回归的概念并从头开始在python中开发完整的线性回归算法最基本的机器学习算法必须是具有单个变量的线性回归算法。如今,可用的高级机器学习算法,库和技术如此之多,以至于线性回归似乎并不重要。但是...

用Python进行机器学习(2)之逻辑回归

前面介绍了线性回归,本次介绍的是逻辑回归。逻辑回归虽然名字里面带有“回归”两个字,但是它是一种分类算法,通常用于解决二分类问题,比如某个邮件是否是广告邮件,比如某个评价是否为正向的评价。逻辑回归也可以...

【Python机器学习系列】拟合和回归傻傻分不清?一文带你彻底搞懂

一、拟合和回归的区别拟合...

推荐2个十分好用的pandas数据探索分析神器

作者:俊欣来源:关于数据分析与可视化...

向量数据库:解锁大模型记忆的关键!选型指南+实战案例全解析

本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在...

用Python进行机器学习(11)-主成分分析PCA

我们在机器学习中有时候需要处理很多个参数,但是这些参数有时候彼此之间是有着各种关系的,这个时候我们就会想:是否可以找到一种方式来降低参数的个数呢?这就是今天我们要介绍的主成分分析,英文是Princip...

神经网络基础深度解析:从感知机到反向传播

本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在...

Python实现基于机器学习的RFM模型

CDA数据分析师出品作者:CDALevelⅠ持证人岗位:数据分析师行业:大数据...

取消回复欢迎 发表评论: