一篇文章彻底搞懂Netty编解码器和Handler的调用机制
ztj100 2024-12-14 16:12 30 浏览 0 评论
基本说明
1、netty的组件设计:Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等;
2、ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。例如:实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站的数据。
3、ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务器端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的。
编码解码器
1、当Netty发送或者接收一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另外一种格式(比如Java对象);如果是出站消息,它会被编码成字节。
2、Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHandler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了,以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler.
解码器-ByteToMessageDecoder
1、由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理,它的类继承关系如下:
2、一个关于ByteToMessageDecoder实例分析
public class ToIntegerDecoder extends ByteToMessageDecoder{
@Override
protected void decode(ChannelHandlerContext ctx,ByteBuf in,List<Object> out) throws Exception{
if(in.readableBytes()>=4){
out.add(in.readInt());
}
}
}
说明:
① 这个例子,每次入站从ByteBuf中读取4个字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是否具有足够的数据;
Netty的handler链的调用机制
实例要求
1、使用自定义的编码器和解码器来说明Netty的handler调用机制;
2、客户端发送long->服务端,服务端发送long->客户端;
代码演示
服务端代码,绑定7000端口,并定义一个初始化类MyServerInitializer来设置服务端的解码器、编码器、和它的业务逻辑处理器
/**
* 服务端
*/
public class MyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
//自定义一个初始化类
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
MyServerInitializer自定义类的代码
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//入站的handler进行解码MybateToLongDecoder
pipeline.addLast(new MyByteToLongDecoder());
//这是出站的handler进行编码
pipeline.addLast(new MyLongToByteEncoder());
//自定义的handler处理业务逻辑
pipeline.addLast(new MyServerHandler());
}
}
自定义的解码器MyByteToLongDecoder,对于服务端来说,当消息入站时,会调用该解码器对消息进行解码,前提条件是入站的消息类型和解码器的处理消息类型一致解码器才会被调用,注意我们这里演示的是long类型的消息,它的字节长度为8个字节,所以在解码的时候,我们要进行判断一下,有8个字节才进去读取
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
*decode 会根据接收的数据,被调用多次,直到确定没有新的元素被添加到list,或者
* 是ByteBuf没有更多的可读字节为止,如果list out 不为空,就会将list的内容传递给下一个
* channelinboundHandler处理器,该处理器的方法也会被调用多次
* @param ctx 上下文对象
* @param in 入站的ByteBuf
* @param out List集合,将解码后的数据传给下一个handler
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder 被调用");
//因为Long 为8个字节,这里需要判断有8个字节,才能读取一个long
if(in.readableBytes()>=8){
out.add(in.readLong());
}
}
}
自定义编码器MyLongToByteEncoder,对于服务端来说,当数据出站时,它需要将数据进行编码再发送,它的原理其实和解码器是一样的道理,只有发送的消息类型和编码器处理的数据类型一致,该编码器才会在消息发出之前被调用,不然,它就不会经过该编码器,消息就会直接发送出去
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
/**
* 编码方法,这个编码方法会根据接收到的数据类型,如果是Long类型就被被调用,
* 如果不是Long类型就不会调用该编码器,直接就会将数据发送出去不会经过该编码器,判断的逻辑在父类MessageToByteEncoder的write方法里面的this.acceptOutboundMessage(msg)
* 因此我们在编写Encoder要注意传入的数据类型和处理的数据类型要一致
* @param ctx 上下文
* @param msg 发送的消息
* @param out
*/
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder encode 被调用,msg="+msg);
//因为msg已经是long类型了,所以这里不需要进行编码,这里只是讲解一下编码的逻辑
out.writeLong(msg);
}
}
服务端的业务逻辑处理器MyServerHandler,这里我们将客户端发送给服务端的消息直接打印出来,并且给服务端发送一个long类型的消息
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端"+ctx.channel().remoteAddress()+" 读取到long "+msg);
//给客户端发送一个long
ctx.writeAndFlush(98979L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端代码,定义一个初始化类MyClientInitializer来设置它的解码器、编码器、和它的业务逻辑处理器
public class MyClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
//自定义一个初始化类
.handler(new MyClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
客户端的初始化类MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一个出站的handler 对数据进行编码
pipeline.addLast(new MyLongToByteEncoder());
//这是一个入站的解码器(入站handler)
pipeline.addLast(new MyByteToLongDecoder());
//加入一个自定义的handler,处理业务
pipeline.addLast(new MyClientHandler());
}
}
然后它也是需要有编码器、解码器,和服务端的原理是一样的,它的业务处理器用来处理业务,也就是它可以只关注业务MyClientHandler,这里客户端的业务逻辑只打印出服务端发送的消息,及一连接上服务端就给服务端发送一条消息
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器的ip="+ctx.channel().remoteAddress()+",收到消息="+msg);
}
/**
* 一连上服务端就给服务端发送消息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
//发送一个Long类型的数据
ctx.writeAndFlush(1234567L);
}
}
演示代码,先启动服务端,然后再启动客户端,先观察服务端的输出,根据输出可以看到当数据入站时,解码器先被调用,然后再调用业务逻辑处理器,业务逻辑处理器发送消息给客户端的时候,会调用编码器
MyByteToLongDecoder 被调用
从客户端/127.0.0.1:53398 读取到long 1234567
MyLongToByteEncoder encode 被调用,msg=98979
观察客户端的输出,可以看到发送消息给服务端时,编码器被调用,当服务端回送消息回来的时候,解码器被调用然后再到业务逻辑处理器
MyClientHandler 发送数据
MyLongToByteEncoder encode 被调用,msg=1234567
MyByteToLongDecoder 被调用
服务器的ip=localhost/127.0.0.1:7000,收到消息=98979
3、结论
① 不论解码器handler还是编码器handler即接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行;
② 在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够,否则接收到的结果会和期望的可能不一致;
解码器-ReplayingDecoder
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
1、ReplayingDecoder 扩展了ByteToMessageDecoder类,使用这个类我们不必调用readableBytes()方法,参数S指定了用户状态管理的类型,其中void代表不需要状态管理;
2、应用实例:使用ReplayingDecoder编写解码器,对前面的案例进行简化,它可以替代上面的解码器,这样子在服务端读取消息的时候,就不用判断接收到的消息的字节数是否够8个字节,因为它底层已经做了处理,它直接读取消息便可
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext cx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 被调用");
//在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断
out.add(in.readLong());
}
}
3、ReplayingDecoder使用方便,但它也有一些局限性
① 并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException;
② ReplayingDecoder在某些情况下可能稍慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢;
其他编解码器
1、LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
2、DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
3、HttpObjectDecoder:一个HTTP数据的解码器。
4、LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理粘包和半包消息。
......
相关推荐
- 如何将数据仓库迁移到阿里云 AnalyticDB for PostgreSQL
-
阿里云AnalyticDBforPostgreSQL(以下简称ADBPG,即原HybridDBforPostgreSQL)为基于PostgreSQL内核的MPP架构的实时数据仓库服务,可以...
- Python数据分析:探索性分析
-
写在前面如果你忘记了前面的文章,可以看看加深印象:Python数据处理...
- C++基础语法梳理:算法丨十大排序算法(二)
-
本期是C++基础语法分享的第十六节,今天给大家来梳理一下十大排序算法后五个!归并排序...
- C 语言的标准库有哪些
-
C语言的标准库并不是一个单一的实体,而是由一系列头文件(headerfiles)组成的集合。每个头文件声明了一组相关的函数、宏、类型和常量。程序员通过在代码中使用#include<...
- [深度学习] ncnn安装和调用基础教程
-
1介绍ncnn是腾讯开发的一个为手机端极致优化的高性能神经网络前向计算框架,无第三方依赖,跨平台,但是通常都需要protobuf和opencv。ncnn目前已在腾讯多款应用中使用,如QQ,Qzon...
- 用rust实现经典的冒泡排序和快速排序
-
1.假设待排序数组如下letmutarr=[5,3,8,4,2,7,1];...
- ncnn+PPYOLOv2首次结合!全网最详细代码解读来了
-
编辑:好困LRS【新智元导读】今天给大家安利一个宝藏仓库miemiedetection,该仓库集合了PPYOLO、PPYOLOv2、PPYOLOE三个算法pytorch实现三合一,其中的PPYOL...
- C++特性使用建议
-
1.引用参数使用引用替代指针且所有不变的引用参数必须加上const。在C语言中,如果函数需要修改变量的值,参数必须为指针,如...
- Qt4/5升级到Qt6吐血经验总结V202308
-
00:直观总结增加了很多轮子,同时原有模块拆分的也更细致,估计为了方便拓展个管理。把一些过度封装的东西移除了(比如同样的功能有多个函数),保证了只有一个函数执行该功能。把一些Qt5中兼容Qt4的方法废...
- 到底什么是C++11新特性,请看下文
-
C++11是一个比较大的更新,引入了很多新特性,以下是对这些特性的详细解释,帮助您快速理解C++11的内容1.自动类型推导(auto和decltype)...
- 掌握C++11这些特性,代码简洁性、安全性和性能轻松跃升!
-
C++11(又称C++0x)是C++编程语言的一次重大更新,引入了许多新特性,显著提升了代码简洁性、安全性和性能。以下是主要特性的分类介绍及示例:一、核心语言特性1.自动类型推导(auto)编译器自...
- 经典算法——凸包算法
-
凸包算法(ConvexHull)一、概念与问题描述凸包是指在平面上给定一组点,找到包含这些点的最小面积或最小周长的凸多边形。这个多边形没有任何内凹部分,即从一个多边形内的任意一点画一条线到多边形边界...
- 一起学习c++11——c++11中的新增的容器
-
c++11新增的容器1:array当时的初衷是希望提供一个在栈上分配的,定长数组,而且可以使用stl中的模板算法。array的用法如下:#include<string>#includ...
- C++ 编程中的一些最佳实践
-
1.遵循代码简洁原则尽量避免冗余代码,通过模块化设计、清晰的命名和良好的结构,让代码更易于阅读和维护...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- node卸载 (33)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- exceptionininitializererror (33)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)