百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

你知道TCP粘包和拆包的原理以及它们的解决方案吗

ztj100 2024-12-14 16:12 69 浏览 0 评论

TCP粘包和拆包基本介绍

1、TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行丰包。这样做虽然提供了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通讯是无消息保护边界的。

2、由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题;

3、TCP粘包、拆包图解

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在一下四种情况

① 服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;

② 服务端一次接收到了两个数据包,D1和D2粘合在一起,称之为TCP粘包;

③ 服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包剩余内容,这称之为TCP拆包;

④ 服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包;

TCP粘包和拆包现象实例

1、在编写Netty程序时,如果没有做处理就会发送粘包和拆包的问题

代码演示

服务端代码

/**
 * 服务端
 */
public class MyServer {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    //自定义一个初始化类
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

MyServerInitializer初始化类代码,简单地只设置了业务处理handler

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyServerHandler());
    }
}

服务端的业务处理MyServerHandler,这里定义了接收消息的次数,和打印出客户端发送的消息,观察服务端是分几次接收消息的,这个具有不确定性,每次的运行结果可能都不一样,然后服务端每次接收到消息也会回送一条消息给到客户端

public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 服务端接收消息次数,每个客户端于服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        //将buffer转成字符串
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("服务器接收到数据 "+message);
        System.out.println("服务器接收到消息量="+(++this.count));

        //服务器回送数据给客户端,回送一个随机id
        ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString()+"$", Charset.forName("utf-8"));
        ctx.writeAndFlush(responseByteBuf);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码

public class MyClient {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    //自定义一个初始化类
                    .handler(new MyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端初始化类MyClientInitializer

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new MyClientHandler());
    }
}

客户端业务处理类MyClientHandler,这里客户端演示一建立连接就给服务端发送10次hello server,也演示了客户端接收到服务端的消息是怎么接收的,是分几次接收的

public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 客户端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端发送10条数据hello,server 编号
        for(int i=0;i<10;i++){
            ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buffer);

        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
        String message = new String(buffer, Charset.forName("utf-8"));
        System.out.println("客户端接收到消息="+message);
        System.out.println("客户端接收消息数量="+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

演示代码,先启动服务端,然后再多次启动客户端观察,显示看服务端的输出

第一次客户端连接服务端时,可以看到服务端只接收了一次,一次接收了客户端分十次发送的数据

服务器接收到数据 hello,server 0hello,server 1hello,server 2hello,server 3hello,server 4hello,server 5hello,server 6hello,server 7hello,server 8hello,server 9
服务器接收到消息量=1

第二次客户端连接服务端时,可以看到服务端分5次接收了客户端发送10次的消息,可以看到第一次接收到了客户端发送一次的数据,第二次接收了客户端累计发送的两次数据,第四,五次都是接收客户端累计发送三次的数据

服务器接收到数据 hello,server 0
服务器接收到消息量=1
服务器接收到数据 hello,server 1hello,server 2
服务器接收到消息量=2
服务器接收到数据 hello,server 3hello,server 4hello,server 5
服务器接收到消息量=3
服务器接收到数据 hello,server 6hello,server 7hello,server 8
服务器接收到消息量=4
服务器接收到数据 hello,server 9
服务器接收到消息量=5

再来看看客户端,因为我们演示了每一次客户端发送消息给服务端的时候,服务端都会回送一份随机数给客户端,每一次都是用$分开的,这里我们可以看到客户端分一次接收了服务端发送给它多次的数据

客户端接收到消息=e8a93f08-32e6-4af2-a344-56c00c409d8f$c8c363b5-d25e-4007-b7be-1d50d86acb7f$2572c3c9-76ed-491f-a3a5-acec6938ce7d$b4927691-0745-42cd-abbc-06a35cf5b2da$6fdf8259-3bd4-4d6a-ae33-ea8b72124df0$
客户端接收消息数量=1

TCP粘包和拆包解决方案

1、使用自定义协议+编解码器来解决

2、关键就是要解决服务端每次读取数据长度问题,这个问题解决了,就不会出现服务端多读或少读数据的问题,从而避免TCP粘包、拆包;

看一个具体的实例

1、要求客户端发送5个Message对象,客户端每次发送一个Message对象;

2、服务器端每次接收一个Message,分5次进行解码,每读取到一个Message,会回复一个Message对象给客户端。

代码演示

首先,我们先来定义自己的协议包类MessageProtocol,里面只定义了两个字段,一个是内容的长度,一个是消息的内容

/**
 * 自定义协议包
 */
@Data
public class MessageProtocol {
    /**
     * 长度,这个是关键
     */
    private int len;
    private byte[] content;
}

接下来我们定义服务端MyServer,服务端接收消息的时候要进行解码,发送消息之前要进行编码,消息的载体是我们上面定义的协议包类

public class MyServer {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    //自定义一个初始化类
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端初始化类的内容,设置了服务端的编解码器和它的业务逻辑处理器MyServerInitializer

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //加入自定义的解码器
        pipeline.addLast(new MyMessageDecoder());
        //自定义编码器
        pipeline.addLast(new MyMessageEncoder());
        //业务逻辑处理器
        pipeline.addLast(new MyServerHandler());
    }
}

编码器代码MyMessageEncoder ,这里在消息发送之前进行编码,将消息的长度和内容进行发送

public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder encode 方法被调用");
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}

解码器代码MyMessageDecoder,这里对消息进行解码,读取消息的长度和内容进行封装成消息协议包传给下一个handler

public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder decode 被调用");
        //需要将得到二进制字节码->MessageProtocol数据包(对象)
        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);
        //封装成MessageProtocol对象,放入out,传递下一个handler业务处理
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    }
}

服务端的业务逻辑处理器代码,读取客户端发送的消息,并构建协议包回送给客户端

public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    /**
     * 服务端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        //接收到数据,并处理
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("服务器接收到信息如下: 长度="+len+";内容="+new String(content,Charset.forName("utf-8"))+";接收到消息包的数量="+(++this.count));
        //回复消息给客户端
        byte[] responseContent = UUID.randomUUID().toString().getBytes("utf-8");
        int length = responseContent.length;
        //构建一个协议包
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(responseContent);
        ctx.writeAndFlush(messageProtocol);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码,接收消息的时候要进行解码,发送消息之前要进行编码,消息的载体是我们上面定义的协议包类

public class MyClient {
    @SuppressWarnings("all")
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    //自定义一个初始化类
                    .handler(new MyClientInitializer());
            ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客户端初始化类MyClientInitializer,设置编解码器和客户端的业务逻辑处理器

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        //加入自定义的编码器
        pipeline.addLast(new MyMessageEncoder());
        //加入自定义的解码器
        pipeline.addLast(new MyMessageDecoder());
        //客户端的业务逻辑处理器
        pipeline.addLast(new MyClientHandler());
    }
}

客户端的业务逻辑处理器MyClientHandler,和服务端建立连接时连续给服务端发送5条构建的协议包消息

public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
    /**
     * 客户端接收消息次数,每个客户端与服务端的连接都时独立的通道,所以不同的客户端是不会有影响的
     */
    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //使用客户端发送5条数据"今天天气冷,吃火锅"
        for(int i=0;i<5;i++){
           String message="今天天气冷,吃火锅";
            byte[] content = message.getBytes(Charset.forName("utf-8"));
            int length = message.getBytes(Charset.forName("utf-8")).length;
            //创建协议包对象
            MessageProtocol messageProtocol = new MessageProtocol();
            messageProtocol.setLen(length);
            messageProtocol.setContent(content);
            ctx.writeAndFlush(messageProtocol);

        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        int len = msg.getLen();
        byte[] content = msg.getContent();
        System.out.println("客户端接收到消息:长度="+len+",内容="+new String(content,Charset.forName("utf-8"))+",接收消息数量="+(++this.count));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

演示代码,先启动服务端,然后再多次启动客户端观察,先看服务端的输出

MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=1
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=2
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=3
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=4
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
服务器接收到信息如下: 长度=27;内容=今天天气冷,吃火锅;接收到消息包的数量=5
MyMessageEncoder encode 方法被调用

可以看到一建立连接时,因为客户端连续给服务端发送5次协议包消息,服务端这边先进行消息的解码,读取完整的消息,然后再回送消息给客户端发送之前先进行了编码,可以看到,服务端按顺序接收到了5个完整的协议包的内容,并没有出现粘包和拆包的现象,再来看看客户端的输出

MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageEncoder encode 方法被调用
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=83d7f5b6-1c40-4437-b3fe-5940552c933d,接收消息数量=1
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=0a9fd95a-1ca5-4ae6-9596-8066fa21bd63,接收消息数量=2
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=146d26dc-0045-48ab-abe8-18f196151d76,接收消息数量=3
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=7995d62a-27ef-4860-bbf5-12334fa2fc23,接收消息数量=4
MyMessageDecoder decode 被调用
客户端接收到消息:长度=36,内容=13275e5f-3483-4fbd-af91-30876a7cebea,接收消息数量=5

客户端一建立连接,就连续给服务端发送5条协议包消息,发送之前都会经过编码,然后接收服务端回送的消息,接收之前解码器先被调用,我们这里也可以看到客户端每次都时完整的接收到服务端发送的消息,服务端发送了5次,客户端就完整的接收了5次,并没有出现粘包和拆包的问题,至此我们演示粘包和拆包的方案就到这里结束

相关推荐

sharding-jdbc实现`分库分表`与`读写分离`

一、前言本文将基于以下环境整合...

三分钟了解mysql中主键、外键、非空、唯一、默认约束是什么

在数据库中,数据表是数据库中最重要、最基本的操作对象,是数据存储的基本单位。数据表被定义为列的集合,数据在表中是按照行和列的格式来存储的。每一行代表一条唯一的记录,每一列代表记录中的一个域。...

MySQL8行级锁_mysql如何加行级锁

MySQL8行级锁版本:8.0.34基本概念...

mysql使用小技巧_mysql使用入门

1、MySQL中有许多很实用的函数,好好利用它们可以省去很多时间:group_concat()将取到的值用逗号连接,可以这么用:selectgroup_concat(distinctid)fr...

MySQL/MariaDB中如何支持全部的Unicode?

永远不要在MySQL中使用utf8,并且始终使用utf8mb4。utf8mb4介绍MySQL/MariaDB中,utf8字符集并不是对Unicode的真正实现,即不是真正的UTF-8编码,因...

聊聊 MySQL Server 可执行注释,你懂了吗?

前言MySQLServer当前支持如下3种注释风格:...

MySQL系列-源码编译安装(v5.7.34)

一、系统环境要求...

MySQL的锁就锁住我啦!与腾讯大佬的技术交谈,是我小看它了

对酒当歌,人生几何!朝朝暮暮,唯有己脱。苦苦寻觅找工作之间,殊不知今日之事乃我心之痛,难道是我不配拥有工作嘛。自面试后他所谓的等待都过去一段时日,可惜在下京东上的小金库都要见低啦。每每想到不由心中一...

MySQL字符问题_mysql中字符串的位置

中文写入乱码问题:我输入的中文编码是urf8的,建的库是urf8的,但是插入mysql总是乱码,一堆"???????????????????????"我用的是ibatis,终于找到原因了,我是这么解决...

深圳尚学堂:mysql基本sql语句大全(三)

数据开发-经典1.按姓氏笔画排序:Select*FromTableNameOrderByCustomerNameCollateChinese_PRC_Stroke_ci_as//从少...

MySQL进行行级锁的?一会next-key锁,一会间隙锁,一会记录锁?

大家好,是不是很多人都对MySQL加行级锁的规则搞的迷迷糊糊,一会是next-key锁,一会是间隙锁,一会又是记录锁。坦白说,确实还挺复杂的,但是好在我找点了点规律,也知道如何如何用命令分析加...

一文讲清怎么利用Python Django实现Excel数据表的导入导出功能

摘要:Python作为一门简单易学且功能强大的编程语言,广受程序员、数据分析师和AI工程师的青睐。本文系统讲解了如何使用Python的Django框架结合openpyxl库实现Excel...

用DataX实现两个MySQL实例间的数据同步

DataXDataX使用Java实现。如果可以实现数据库实例之间准实时的...

MySQL数据库知识_mysql数据库基础知识

MySQL是一种关系型数据库管理系统;那废话不多说,直接上自己以前学习整理文档:查看数据库命令:(1).查看存储过程状态:showprocedurestatus;(2).显示系统变量:show...

如何为MySQL中的JSON字段设置索引

背景MySQL在2015年中发布的5.7.8版本中首次引入了JSON数据类型。自此,它成了一种逃离严格列定义的方式,可以存储各种形状和大小的JSON文档,例如审计日志、配置信息、第三方数据包、用户自定...

取消回复欢迎 发表评论: