一篇文章彻底搞懂Netty编解码器和Handler的调用机制
ztj100 2024-12-14 16:12 41 浏览 0 评论
基本说明
1、netty的组件设计:Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等;
2、ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。例如:实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站的数据。
3、ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务器端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的。
编码解码器
1、当Netty发送或者接收一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另外一种格式(比如Java对象);如果是出站消息,它会被编码成字节。
2、Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHandler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了,以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler.
解码器-ByteToMessageDecoder
1、由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理,它的类继承关系如下:
2、一个关于ByteToMessageDecoder实例分析
public class ToIntegerDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx,ByteBuf in,List<Object> out) throws Exception{
if(in.readableBytes()>=4){
out.add(in.readInt());
}
}
}
说明:
① 这个例子,每次入站从ByteBuf中读取4个字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是否具有足够的数据;
Netty的handler链的调用机制
实例要求
1、使用自定义的编码器和解码器来说明Netty的handler调用机制;
2、客户端发送long->服务端,服务端发送long->客户端;
代码演示
服务端代码,绑定7000端口,并定义一个初始化类MyServerInitializer来设置服务端的解码器、编码器、和它的业务逻辑处理器
/**
* 服务端
*/
public class MyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
//自定义一个初始化类
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
MyServerInitializer自定义类的代码
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//入站的handler进行解码MybateToLongDecoder
pipeline.addLast(new MyByteToLongDecoder());
//这是出站的handler进行编码
pipeline.addLast(new MyLongToByteEncoder());
//自定义的handler处理业务逻辑
pipeline.addLast(new MyServerHandler());
}
}
自定义的解码器MyByteToLongDecoder,对于服务端来说,当消息入站时,会调用该解码器对消息进行解码,前提条件是入站的消息类型和解码器的处理消息类型一致解码器才会被调用,注意我们这里演示的是long类型的消息,它的字节长度为8个字节,所以在解码的时候,我们要进行判断一下,有8个字节才进去读取
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
*decode 会根据接收的数据,被调用多次,直到确定没有新的元素被添加到list,或者
* 是ByteBuf没有更多的可读字节为止,如果list out 不为空,就会将list的内容传递给下一个
* channelinboundHandler处理器,该处理器的方法也会被调用多次
* @param ctx 上下文对象
* @param in 入站的ByteBuf
* @param out List集合,将解码后的数据传给下一个handler
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder 被调用");
//因为Long 为8个字节,这里需要判断有8个字节,才能读取一个long
if(in.readableBytes()>=8){
out.add(in.readLong());
}
}
}
自定义编码器MyLongToByteEncoder,对于服务端来说,当数据出站时,它需要将数据进行编码再发送,它的原理其实和解码器是一样的道理,只有发送的消息类型和编码器处理的数据类型一致,该编码器才会在消息发出之前被调用,不然,它就不会经过该编码器,消息就会直接发送出去
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
/**
* 编码方法,这个编码方法会根据接收到的数据类型,如果是Long类型就被被调用,
* 如果不是Long类型就不会调用该编码器,直接就会将数据发送出去不会经过该编码器,判断的逻辑在父类MessageToByteEncoder的write方法里面的this.acceptOutboundMessage(msg)
* 因此我们在编写Encoder要注意传入的数据类型和处理的数据类型要一致
* @param ctx 上下文
* @param msg 发送的消息
* @param out
*/
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder encode 被调用,msg="+msg);
//因为msg已经是long类型了,所以这里不需要进行编码,这里只是讲解一下编码的逻辑
out.writeLong(msg);
}
}
服务端的业务逻辑处理器MyServerHandler,这里我们将客户端发送给服务端的消息直接打印出来,并且给服务端发送一个long类型的消息
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端"+ctx.channel().remoteAddress()+" 读取到long "+msg);
//给客户端发送一个long
ctx.writeAndFlush(98979L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端代码,定义一个初始化类MyClientInitializer来设置它的解码器、编码器、和它的业务逻辑处理器
public class MyClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
//自定义一个初始化类
.handler(new MyClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
客户端的初始化类MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一个出站的handler 对数据进行编码
pipeline.addLast(new MyLongToByteEncoder());
//这是一个入站的解码器(入站handler)
pipeline.addLast(new MyByteToLongDecoder());
//加入一个自定义的handler,处理业务
pipeline.addLast(new MyClientHandler());
}
}
然后它也是需要有编码器、解码器,和服务端的原理是一样的,它的业务处理器用来处理业务,也就是它可以只关注业务MyClientHandler,这里客户端的业务逻辑只打印出服务端发送的消息,及一连接上服务端就给服务端发送一条消息
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器的ip="+ctx.channel().remoteAddress()+",收到消息="+msg);
}
/**
* 一连上服务端就给服务端发送消息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
//发送一个Long类型的数据
ctx.writeAndFlush(1234567L);
}
}
演示代码,先启动服务端,然后再启动客户端,先观察服务端的输出,根据输出可以看到当数据入站时,解码器先被调用,然后再调用业务逻辑处理器,业务逻辑处理器发送消息给客户端的时候,会调用编码器
MyByteToLongDecoder 被调用
从客户端/127.0.0.1:53398 读取到long 1234567
MyLongToByteEncoder encode 被调用,msg=98979
观察客户端的输出,可以看到发送消息给服务端时,编码器被调用,当服务端回送消息回来的时候,解码器被调用然后再到业务逻辑处理器
MyClientHandler 发送数据
MyLongToByteEncoder encode 被调用,msg=1234567
MyByteToLongDecoder 被调用
服务器的ip=localhost/127.0.0.1:7000,收到消息=98979
3、结论
① 不论解码器handler还是编码器handler即接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行;
② 在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够,否则接收到的结果会和期望的可能不一致;
解码器-ReplayingDecoder
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
1、ReplayingDecoder 扩展了ByteToMessageDecoder类,使用这个类我们不必调用readableBytes()方法,参数S指定了用户状态管理的类型,其中void代表不需要状态管理;
2、应用实例:使用ReplayingDecoder编写解码器,对前面的案例进行简化,它可以替代上面的解码器,这样子在服务端读取消息的时候,就不用判断接收到的消息的字节数是否够8个字节,因为它底层已经做了处理,它直接读取消息便可
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext cx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 被调用");
//在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断
out.add(in.readLong());
}
}
3、ReplayingDecoder使用方便,但它也有一些局限性
① 并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException;
② ReplayingDecoder在某些情况下可能稍慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢;
其他编解码器
1、LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
2、DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
3、HttpObjectDecoder:一个HTTP数据的解码器。
4、LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理粘包和半包消息。
......
相关推荐
- sharding-jdbc实现`分库分表`与`读写分离`
-
一、前言本文将基于以下环境整合...
- 三分钟了解mysql中主键、外键、非空、唯一、默认约束是什么
-
在数据库中,数据表是数据库中最重要、最基本的操作对象,是数据存储的基本单位。数据表被定义为列的集合,数据在表中是按照行和列的格式来存储的。每一行代表一条唯一的记录,每一列代表记录中的一个域。...
- MySQL8行级锁_mysql如何加行级锁
-
MySQL8行级锁版本:8.0.34基本概念...
- mysql使用小技巧_mysql使用入门
-
1、MySQL中有许多很实用的函数,好好利用它们可以省去很多时间:group_concat()将取到的值用逗号连接,可以这么用:selectgroup_concat(distinctid)fr...
- MySQL/MariaDB中如何支持全部的Unicode?
-
永远不要在MySQL中使用utf8,并且始终使用utf8mb4。utf8mb4介绍MySQL/MariaDB中,utf8字符集并不是对Unicode的真正实现,即不是真正的UTF-8编码,因...
- 聊聊 MySQL Server 可执行注释,你懂了吗?
-
前言MySQLServer当前支持如下3种注释风格:...
- MySQL系列-源码编译安装(v5.7.34)
-
一、系统环境要求...
- MySQL的锁就锁住我啦!与腾讯大佬的技术交谈,是我小看它了
-
对酒当歌,人生几何!朝朝暮暮,唯有己脱。苦苦寻觅找工作之间,殊不知今日之事乃我心之痛,难道是我不配拥有工作嘛。自面试后他所谓的等待都过去一段时日,可惜在下京东上的小金库都要见低啦。每每想到不由心中一...
- MySQL字符问题_mysql中字符串的位置
-
中文写入乱码问题:我输入的中文编码是urf8的,建的库是urf8的,但是插入mysql总是乱码,一堆"???????????????????????"我用的是ibatis,终于找到原因了,我是这么解决...
- 深圳尚学堂:mysql基本sql语句大全(三)
-
数据开发-经典1.按姓氏笔画排序:Select*FromTableNameOrderByCustomerNameCollateChinese_PRC_Stroke_ci_as//从少...
- MySQL进行行级锁的?一会next-key锁,一会间隙锁,一会记录锁?
-
大家好,是不是很多人都对MySQL加行级锁的规则搞的迷迷糊糊,一会是next-key锁,一会是间隙锁,一会又是记录锁。坦白说,确实还挺复杂的,但是好在我找点了点规律,也知道如何如何用命令分析加...
- 一文讲清怎么利用Python Django实现Excel数据表的导入导出功能
-
摘要:Python作为一门简单易学且功能强大的编程语言,广受程序员、数据分析师和AI工程师的青睐。本文系统讲解了如何使用Python的Django框架结合openpyxl库实现Excel...
- 用DataX实现两个MySQL实例间的数据同步
-
DataXDataX使用Java实现。如果可以实现数据库实例之间准实时的...
- MySQL数据库知识_mysql数据库基础知识
-
MySQL是一种关系型数据库管理系统;那废话不多说,直接上自己以前学习整理文档:查看数据库命令:(1).查看存储过程状态:showprocedurestatus;(2).显示系统变量:show...
- 如何为MySQL中的JSON字段设置索引
-
背景MySQL在2015年中发布的5.7.8版本中首次引入了JSON数据类型。自此,它成了一种逃离严格列定义的方式,可以存储各种形状和大小的JSON文档,例如审计日志、配置信息、第三方数据包、用户自定...
你 发表评论:
欢迎- 一周热门
-
-
MySQL中这14个小玩意,让人眼前一亮!
-
旗舰机新标杆 OPPO Find X2系列正式发布 售价5499元起
-
【VueTorrent】一款吊炸天的qBittorrent主题,人人都可用
-
面试官:使用int类型做加减操作,是线程安全吗
-
C++编程知识:ToString()字符串转换你用正确了吗?
-
【Spring Boot】WebSocket 的 6 种集成方式
-
PyTorch 深度学习实战(26):多目标强化学习Multi-Objective RL
-
pytorch中的 scatter_()函数使用和详解
-
与 Java 17 相比,Java 21 究竟有多快?
-
基于TensorRT_LLM的大模型推理加速与OpenAI兼容服务优化
-
- 最近发表
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)