一 Netty异步和事件驱动
1.1 Netty简介
在开发中,总会有更高的吞吐量和可扩展性的要求,在更低的成本的基础上进行交付。
在网络编程领域,Netty是Java的框架,它驾驭了Java高级API的能力,并将其隐藏在一个易于使用的API之后。如下,是Netty的特性的一个总结:
- 设计:统一的API,支持多种传输类型,阻塞和非阻塞的线程模型。
- 易于使用:详细的Javadoc和大量的示例集
- 性能:拥有比Java的核心API更高的吞吐量以及更低的延迟
- 健壮性:不会因为慢速、快速或超载的连接而导致OutOfMemoryError
- 安全性:完整的SSL/TLS以及StartTLS支持
- 社区驱动:发布快速而且频繁
1.1.1 异步和事件驱动
异步事件,简单来说,你在问了一个问题后无需等待答案的出现,在等待的这段时间,可以做点别的事情。
这种能力对于实现最高级别的可伸缩性至关重要:
- 非阻塞网络调用使得程序可以不必等待一个操作的完成。完全异步的I/O正是基于这个特性构建的,并且更进异步:异步方法会立即返回,并且在它完成时,会直接或在稍后的某个时间点通知用户
- 选择器使得程序能够通过较少的线程便可监视许多连接上的事件
1.1.2 Netty的核心组件
Netty的核心组件包括:channel、回调、Future、事件和ChannelHandler。这些模块代表了不同类型的构造:资源、逻辑以及通知。
1.1.2.1 Channel
Channel是Java NIO的一个基本构造:它代表一个实体(如一个硬件设备、一个文件、一个网络套接字或一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作。
目前,可以把Channel看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或关闭,连接或者断开连接。
1.1.2.2 回调
一个回调其实就是一个方法,一个指向已经被提供给另一个方法的方法的引用。这使得后者可以在适当的时候调用前者。
Netty在内部使用了回调来处理事件,当一个回调被触发时,相关的事件可以被一个interface-ChannelHandler的实现处理。
如下,是一个代码示例,当一个新的连接已经被建立时,ChannelHandler的channelActive()回调方法将会被调用:
public class ConnectHandler extendes ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client" + ctx.channel().remoteAddress() + " connected");
}
}
1.1.2.3 Future
Future提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对齐结果的访问。
JDK的Future只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。而Netty提供了自己的实现:ChannelFuture,用于在执行异步操作的时候使用。
ChannelFuture提供了几种额外的方法,这些方法使得我们能够注册一个或多个ChannelFutureListener实例。监听器的回调方法operationComplete(),将会在对应的操作完成时被调用。然后监听器判断操作是完成了还是出错了,如果出错了则抛出异常Throwable。
每个Netty的出站I/O都将返回一个ChannelFuture,也就是说,它们都不会阻塞。这就是Netty是完全异步和事件驱动。
1.1.2.4 事件和ChannelHandler
Netty使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的操作,这些动作可能是:
- 记录日志
- 数据转换
- 流控制
- 应用程序逻辑
Netty是一个网络编程框架,所以事件是按照入站与出站数据流的相关性进行分类的,可能由入站数据或者相关的状态更改而触发的事件包括:
- 连接已经被激活或者连接失活
- 数据读取
- 用户事件
- 错误事件
出站事件是未来将会触发的某个动作的操作结果,这些动作包括:
- 打开或者关闭到远程节点的连接
- 将数据写到或者冲刷到套接字
每个事件都可以被分发给ChannelHandler类中的某个用户实现的方法。如下图所示,展示了ChannelHandler链处理:
Netty提供了大量预定义的可以开箱即用的ChannelHandler实现,包括用于各种协议(如Http和SSL/TLS)的ChannelHandler。在内部,ChannelHanlder自己也使用了事件和Future,使得它们也成为应用程序使用的相同抽象的消费者。
1.2 把它们放在一起
1.2.1 Future、回调和ChannelHandler
Netty的异步编程模型是建立在Future和回调的概念上的,而将事件派发到ChannelHandler的方法则发生在更深的层次上。结合在一起,这些元素就提供了一个处理环境,使你的应用程序逻辑可以独立于任何网络操作相关的顾虑而独立演变。
拦截操作以及高速地转换入站数据和出站数据,都只需要你提供回调或者利用操作返回的Future。这使得链接操作变得既简单又高效,并促进了可重用的通用代码的编写。
1.2.2 选择器、事件和EventLoop
Netty通过触发事件将Selector从应用程序中抽象出来,消除了本来将需要手动编写的派发代码。在内部,将会为每个Channel分配一个EventLoop,用以处理所有事件,包括:
- 注册感兴趣的事件
- 将事件派发给ChannelHandler
- 安排进一步的动作
EventLoop本身只由一个线程驱动,其处理了一个Channel的所有I/O事件,并且在该EventLoop的整个生命周期内都不会改变。这个设计消除了你可能有的在ChannelHandler实现中需要进行同步的任何顾虑。
二 Netty快速入门
在接下来的操作中,将展示如何构建一个基于Netty的客户端和服务器。程序很简单:客户端将消息发送给服务器,而服务器再将消息会送给客户端。
2.1 设置开发环境
我这里的开发环境是:
- JDK8
- Maven
- IDEA
我这里使用的版本是:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.12.Final</version>
</dependency>
2.2 Netty客户端/服务器概览
如下图所示,编写Echo客户端和服务器应用程序:
上图展示的是多个客户端同时连接到一台服务器,Echo客户端和服务器之间的交互式非常简单的。在客户端建立一个连接之后,它会向服务器发送一个或多个消息,反过来,服务器又会将每个消息会送给客户端。
2.3 编写Echo服务器
所有的Netty服务器都需要以下两部分:
- 至少一个ChannelHandler:实现了服务器对客户端接收的数据的处理,即它的业务逻辑
- 引导:这是配置服务器的启动代码。至少,它会将服务器绑定到它监听连接请求的端口上。
2.3.1 ChannelHandler和业务逻辑
因为你的Echo服务器会响应传入的消息,所以它需要实现ChannelInboundHandler接口,用来定义响应入站事件方法。本程序只用到了少量的方法,所以继承ChannelInboundHandlerAdapter类就足够了,主要实现的方法有:
- channelRead():对于每个传入的消息都要调用
- channelReadComplete():通知ChannelInboundHandler最后一次对channelRead()的调用时当前批量读取中的最后一条消息
- exceptionCaught():在读取操作期间,有异常抛出时会调用
如下,是EchoServerHandler的代码:
/**
* Echo服务端处理器
*/
@ChannelHandler.Sharable//是否可以在多个Channel中共享使用
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取消息, 进行业务逻辑编写
* @param ctx 通道上下文
* @param msg 消息数据
* @throws Exception 异常
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
System.out.println("Server receive: " + in.toString(CharsetUtil.UTF_8));//这里只做简单的打印工作
ctx.write(in);//将消息写出去, 而不冲刷出站消息
}
/**
* 通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的最后一条消息
* @param ctx 通道处理器上下文
* @throws Exception 异常
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)//将未读消息冲刷到远程节点
.addListener(ChannelFutureListener.CLOSE);//关闭该Channel
}
/**
* 在读取操作期间, 有异常时抛出
* @param ctx 通道处理器上下文
* @param cause 业务异常
* @throws Exception 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();//异常
ctx.close();//关闭该Channel
}
}
在这个应用程序中,重写了channelRead()方法,逻辑是简单地将数据回送给远程节点。
重写了exceptionCaught()方法允许你对Throwable的任何子类型做出反应,这里我们记录了异常并关闭了连接。
除了ChannelInboundHandlerAdapter之外,还有很多需要了解的ChannelHandler的子类型和实现。目前,记住如下关键点:
- 针对不同类型的事件来调用ChannelHandler
- 应用程序通过实现或者扩展ChannelHandler来挂钩到事件的生命周期,并且提供自定义的应用程序逻辑
- 在架构上,ChannelHandler有助于保持业务逻辑与网络代理代码的分离。这简化了开发过程,因为代码必须不断演化以响应不断变化的需求
2.3.2 引导服务器
由EchoServerHandler实现了核心业务逻辑后,就需要进行引导服务器本身的过程,大致过程如下:
- 绑定到服务器将在其上监听并接收传入连接请求的端口
- 配置Channel,以将有关的入站消息通知给EchoServerHandler示例
如下,是引导类EchoServer:
public class EchoServer {
/** 端口号 */
private static final int port = 8888;
public static void main(String[] args) throws Exception {
new EchoServer().start();;
}
private void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();//创建Handler
EventLoopGroup group = new NioEventLoopGroup();//创建EventLoopGroup
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();//创建ServerBootstrap
serverBootstrap.group(group)
.channel(NioServerSocketChannel.class)//指定所使用的NIO传输Channel
.localAddress("192.168.10.79", port)//指定主机地址和端口号
.childHandler(new ChannelInitializer<SocketChannel>() {//添加一个EchoServerHandler到子Channel的ChannelPipeline中
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);//EchoServerHandler被标准为@Shareable,所以我们总是可以使用同样的示例
}
});
ChannelFuture channelFuture = serverBootstrap.bind().sync();//异步绑定服务器。调用sync()方法阻塞等待直到绑定完成
channelFuture.channel().closeFuture().sync();//获取Channel的CloseFuture, 并且阻塞当前线程直到它完成
} finally {
group.shutdownGracefully().sync();//关闭EventLoopGroup, 释放所有资源
}
}
}
创建了一个ServerBootstrap实例。因为使用的是NIO传输,所以指定了NioEventLoopGroup来接收和处理新的连接,并且将Channel的类型指定为NioServerSocketChannel。在此之后,程序将本地地址(我这里本机地址是192.168.10.79)设置为一个具体选定的端口,服务器绑定到这个地址以监听新的连接请求。
并且,这里使用了一个特殊的类:ChannelInitializer。当一个新的连接被接收时,一个新的子CHannel将会被创建,而ChannelInitializer将会把EchoServerHandler的实例添加到该Channel的ChannelPipeline中。
接下来你绑定了服务器,并等待绑定完成(对sync()方法的调用将导致当前Thread阻塞,一直到绑定操作完成为止)。并且,该应用程序会阻塞等待直到服务器的Channel关闭(在Channel的CloseFuture上调用了sync()方法),然后关闭EventLoopGroup,释放所有的资源,包括所有被创建的线程。
所以,服务器的主要代码组件是:
-
EchoServerHandler实现了业务逻辑
-
main()方法引导了服务器,引导过程如下:
- 创建一个ServerBootstrap的实例以引导和绑定服务器
- 创建并分配一个NioEventLoopGroup实例以进行事件的处理,如接受新连接以及读/写数据
- 指定服务器绑定到本地的InetSocketAddress
- 使用一个EchoServerHandler的实例初始化每一个新的Channel
- 调用ServerBootstrap.bind()方法绑定服务器
这个时候,服务器已经初始化完成。
2.4 编写Echo客户端
Echo客户端将会:
- 连接到服务器
- 发送一个或多个消息
- 对于每个消息,等待并接受从服务器发回的相同的消息
- 关闭连接
2.4.1 通过ChannelHandler实现客户端逻辑
如同服务端,客户端将拥有一个用来处理数据的ChannelInboundHandler。我们扩展了SimpleChannelInboundHandler类处理任务,重写的方法如下:
- channelActive():在到服务器的连接已经建立之后被调用
- channelRead0():当从服务器接收到一个消息时被调用
- exceptionCaught():在处理过程中发生异常时被调用
如下代码,是EchoClientHandler的代码:
/**
* 客户端处理器
*/
@ChannelHandler.Sharable//标记该类的实例可以被多个Channel共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 当被通知Channel是活跃的时候(即与服务器的连接建立之后), 发送一条消息
* @param ctx 通道处理器上下文
* @throws Exception 异常
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty!", CharsetUtil.UTF_8));//发送一条消息
}
/**
* 获取消息
* @param ctx 通道处理器上下文
* @param msg 消息数据
* @throws Exception 异常
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Client receive: " + msg.toString(CharsetUtil.UTF_8));//直接打印消息
}
/**
* 发生异常后处理
* @param ctx 消息处理器上下文
* @param cause 异常信息
* @throws Exception 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();//打印异常
ctx.close();//关闭通道
}
}
首先,重写了channelActive()方法,其将在一个连接建立时被调用。这确保了数据将会被尽可能写入服务器。
接下来,重写了channelRead0()方法,每当接收数据时,都会调用这个方法。注意,由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了5个字节,那么不能保证这5个字节会被一次性接收。即使对于少量的 数据,channelRead()方法也可能会被调用两次,第一次使用一个持有3字节的ButeBuf(Netty的字节容器),第二次使用一个持有2字节的ByteBuf。作为一个面向流的协议,TCP保证了字节数组将会按照服务器发送它们的顺序接收。
exceptionCaught(),如同服务端的处理逻辑,记录Throwable,关闭Channel。终止到服务器的连接。
2.4.2 引导客户端
引导客户端类似于引导服务器,客户端使用主机和端口参数来连接远程地址,这里是Echo服务器的地址和端口。代码如下:
public class EchoHandler {
/** 主机地址 */
private static final String HOST = "192.168.10.79";
/** 端口号 */
private static final int PORT = 8888;
public static void main(String[] args) throws Exception {
new EchoHandler().start();
}
private void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();//创建EventLoopGroup
try {
Bootstrap bootstrap = new Bootstrap();//创建Bootstrap
bootstrap.group(group)//指定EventLoopGroup处理客户端事件
.channel(NioSocketChannel.class)//适用于NIO传输的Channel类型
.remoteAddress(HOST, PORT)//设置服务器的地址和端口
.handler(new ChannelInitializer<SocketChannel>() {//向ChannelPileline中添加一个EchoClientHandler实例
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect().sync();//连接到远程节点, 阻塞直到连接完成
channelFuture.channel().closeFuture().sync();//阻塞, 直到Channel关闭
} finally {
group.shutdownGracefully().sync();//关闭线程池, 并且释放所有资源
}
}
}
和之前一样,使用NIO传输(服务器和客户端可以分别使用不同的传输)。
客户端创建过程如下:
- 为初始化客户端,创建一个Bootstrap实例
- 为进行事件处理分配了一个NioEventLoopGroup实例,其中事件处理包括创建新的连接以及处理入站和出站数据
- 当连接被建立时,一个EchoClientHandler实例会被安装到ChannelPipeline中
- 在所有都设置完成后,调用Bootstrap.connect()方法连接到远程节点
这个时候就可以进行测试了(省略测试过程)。
服务端代码如下:
echoserver.zip
客户端代码如下:
echoclient.zip
评论区