集成Netty

Netty是Vert.x的依赖之一。事实上,Netty给予了Vert.x网络服务的能力,你可以使用Vert.x Core编写以下类型的基本网络服务:

  • TCP
  • HTTP
  • UDP
  • DNS

它们是基于 Netty 的各种组件构建的。 Netty 社区已经实现了广泛的组件,本章解释了如何在 Vert.x 中集成这些组件。

在本章中,我们将构建一个 TIME 协议客户端和服务器。 Netty 文档提供了这个简单协议的客户端/服务器实现,我们将重点关注这些组件的集成。

Netty集成点

本章的主要目的是解释 Vert.x 的一些内部接口,此类接口是暴露与 Netty 交互的底层方法的扩展,这些方法对于直接重用 Netty 的组件非常有用。

大多数用户不需要处理这些扩展,因此这些方法被隔离在扩展接口中

启动客户端

ContextInternal 继承了 io.vertx.core.Context 并暴露了一些 Netty 集成点,比如 VertxInternal

通常情况下,上下文是从 Vertx#getOrCreateContext() 方法获取的,该方法返回当前执行上下文,或者在必要时创建一个新的上下文:在 Verticle 中调用时,getOrCreateContext() 返回此 Verticle 的上下文,当在主线程或单元测试等非 Vert.x 线程中使用时,它会创建一个新上下文并返回它。

1
2
3
4
Context context = vertx.getOrCreateContext();

// 强转借此访问额外的方法
Internals contextInternal = (Internals) context;

上下文始终与 Netty 事件循环(event loop)相关联,因此使用此上下文可确保我们的组件重复使用相同的事件循环(如果之前存在)或使用新的事件循环。

ContextInternal#nettyEventLoop() 方法返回这个特定的事件循环,我们可以在 Bootstrap(对于客户端)或 ServerBoostrap(对于服务器)上使用它:

1
2
3
4
5
6
ContextInternal contextInt = (ContextInternal) context; // 1
EventLoop eventLoop = contextInt.nettyEventLoop();

Bootstrap bootstrap = new Bootstrap(); // 2
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(eventLoop);
  • 获取这个上下文关联的event loop
  • 创建客户端的Bootstrap

启动服务端

VertxInternal 扩展了 io.vertx.core.Vertx,其中 VertxInternal#getAcceptorEventLoopGroup() 返回一个 EventLoopGroup用于接受服务器上的连接,它的常见用法是在 ServerBootstrap 上:

1
2
3
4
5
6
7
8
9
ContextInternal contextInt = (ContextInternal) context; // 1
EventLoop eventLoop = contextInt.nettyEventLoop();

VertxInternal vertxInt = contextInt.owner(); // 2
EventLoopGroup acceptorGroup = vertxInt.getAcceptorEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap(); // 3
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(acceptorGroup, eventLoop);
  1. 获取这个上下文关联的event loop
  2. 获取这个Vertx的Acceptor event loop组
  3. 创建服务端的Bootstrap

处理事件

现在我们对ContextInternal更熟悉了,让我们看看如何使用它来处理Netty事件,如网络事件、channel生命周期等…

ContextInternal#emit 方法用于向应用程序发出事件,它确保了:

  1. 上下文并发性:重复利用当前的event-loop线程或者在worker线程执行
  2. 当前上下文与调度线程的thread local关联
  3. 任何抛出的未捕获异常都会在上下文中报告,这样的异常要么被记录,要么被传递给context#exceptionHandler

如下展示了一个简短的服务启动代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Handler<Channel> bindHandler = ch -> {
};

bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
context.emit(ch, bindHandler);
}
});

Promise<Void> bindPromise = context.promise();

bootstrap.bind(socketAddress).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// 通知应用绑定成功
bindPromise.complete();
} else {
// 通知应用绑定失败
bindPromise.fail(future.cause());
}
}
});

return bindPromise.future();

emit的典型用法是将一个或多个事件下发到同一个处理程序,就像事件处理程序一样。
当涉及到future时,ContextInternal#promise方法会创建一个promise,这个promise会像emit一样对监听器执行操作。

服务端

你可以在这里找到原始的服务器代码实例。

本文示例的Vert.x TIME服务代码暴露了一个简单的接口:

  • 一个创建TimeServer的静态方法
  • 两个方法:通过listen 绑定服务,通过 close 解绑服务
  • requestHandler用于设置处理请求的处理器handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public interface TimeServer {

/**
* @return 返回一个时间服务
*/
static TimeServer create(Vertx vertx) {
return new TimeServerImpl(vertx);
}

/**
* 设置发生时间请求时要调用的handler. 这个handler应该完成包含时间的future
*
* @param handler the handler to be called
* @return this object
*/
TimeServer requestHandler(Handler<Promise<Long>> handler);

/**
* 启动并绑定时间server.
*
* @param port the server port
* @param host the server host
* @return the future completed when the socket is bound
*/
Future<Void> listen(int port, String host);

/**
* 关闭时间server.
*/
void close();

}

接下来实现一个返回当前JVM时间的TIME服务:

1
2
3
4
5
6
7
8
9
10
11
12
Vertx vertx = Vertx.vertx();

// 创建时间server
TimeServer server = TimeServer.create(vertx);
server.requestHandler(time -> {
time.complete(System.currentTimeMillis());
});

// 启动
server.listen(8037, "0.0.0.0")
.onSuccess(v -> System.out.println("Server started"))
.onFailure(err -> err.printStackTrace());

现在让我们研究一下服务器的实现。

服务端的bootstrap

首先让我们看一下ServerBootstrap的创建和配置

1
2
3
4
5
6
7
8
9
10
11
12
13
EventLoopGroup acceptorGroup = vertx.getAcceptorEventLoopGroup(); // 1
EventLoop eventLoop = context.nettyEventLoop(); // 2
bootstrap = new ServerBootstrap(); // 3
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.group(acceptorGroup, eventLoop);
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); // 4
TimeServerHandler handler = new TimeServerHandler(context, requestHandler);
pipeline.addLast(handler);
}
});
  1. VertxInternal返回event loop组当作Acceptor组(Boss接受请求)
  2. ContextInternal 返回 event loop组用child组(worker处理请求)
  3. 创建并配置Netty的ServerBootstrap
  4. 使用requestHandler初始化TimeServerHandler并通过TimeServerHandler配置channel

ServerBootstrap的创建非常简单,与直接使用Netty的版本非常相似,主要的区别在于,我们复用了Verticle和Vert.x提供的事件循环event loop,这确保了我们的服务器共享应用程序的资源(这里指event loop)。

这里要注意,TimeServerHandler是用服务器的requestHandler初始化的,这个handler将在提供TIME请求时使用。

服务绑定

现在让我们来看一下绑定操作,它与直接使用Netty的原始版本示例有很多不同但区别也不是特别大:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Promise<Void> promise = context.promise(); // 1

ChannelFuture bindFuture = bootstrap.bind(host, port);
bindFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
// 2
if (future.isSuccess()) {
channel = future.channel();
promise.complete();
} else {
promise.fail(future.cause());
}
}
});

return promise.future(); // 3
  1. 创建一个绑定这个Server上下文的promise
  2. 将结果promise设置为完成或者成功(代码中应为完成或者失败)
  3. 返回future结果

此处最重要的部分是创建上下文promise,用于让应用程序知道绑定结果。

服务handler

现在,让我们用TimeServerHandler来完成我们的服务器,它改写自Netty原始版本TimeServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Promise<Long> result = Promise.promise(); // 1

context.emit(result, requestHandler); // 2

result.future().onComplete(ar -> { //3
if (ar.succeeded()) { // 4
ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (ar.result() / 1000L + 2208988800L));
ChannelFuture f = ctx.writeAndFlush(time);
f.addListener((ChannelFutureListener) channelFuture -> ctx.close());
} else { // 5
ctx.close();
}
});
  1. 创建一个将由requestHandler解析的空promise
  2. 让上下文使用emit将事件发送给requestHandler
  3. requestHandler的实现完成相关的promise时,调用future的处理程序
  4. 将当前时间写入channel,然后关闭
  5. 如果应用程序失败,只需关闭socket套接字

emit 是 TIME请求事件发生时使用的, 将需要完成(complete)的promise传递给 requestHandler. 当promise完成了(completed), handler将时间结果写入到通道或将其关闭。

客户端

你可以在这里找到原始的客户端代码实例。

本文示例的Vert.x TIME客户端暴露了一个简单的接口:

  • 一个创建TimeClient的静态方法
  • 客户端从服务端获取时间的getTime方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface TimeClient {

/**
* @return 返回一个time客户端
*/
static TimeClient create(Vertx vertx) {
return new TimeClientImpl(vertx);
}

/**
* 从服务器获取当前时间
*
* @param port the server port
* @param host the server host name
* @return the result future
*/
Future<Long> getTime(int port, String host);

}

TIME客户端使用起来很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
Vertx vertx = Vertx.vertx();

// 创建time client
TimeClient server = TimeClient.create(vertx);

// 获取时间
server.getTime(8037, "localhost").onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Time is " + new Date(ar.result()));
} else {
ar.cause().printStackTrace();
}
});

现在让我们研究下客户端的实现。

客户端的bootstrap

首先让我们看下客户端Bootstrap的创建和配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
EventLoop eventLoop = context.nettyEventLoop();  // 1

// 创建并配置 Netty bootstrap
Bootstrap bootstrap = new Bootstrap(); // 2
bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline(); // 3
pipeline.addLast(new TimeClientHandler(result));
}
});

return bootstrap;
  1. VertxInternal返回event loop用作child组(用于接受请求)
  2. 创建并配置 Netty bootstrap
  3. 通过返回结果处理器resultHandler(这里是个promise)初始化TimeClientHandler,然后配置channel

Bootstrap的创建非常简单,与Netty原始版本非常相似,主要区别在于我们使用了Verticle提供的event loop事件循环,这确保了我们的客户复用与Verticel相同的event loop。

就像在服务器部分的示例中一样,我们使用ContextInternal来获取要在Bootstrap上设置的Netty的EventLoop

需要注意的是,TimeClientHandler是用客户端resultHandler初始化的,这个处理程序将用于TIME请求结果调用。

客户端连接

BootStrap程序的设置与原始的示例非常相似,在失败的情况下,应用程序将使用一个包含整体结果的promise作为回调。

1
2
3
4
5
6
7
8
9
ChannelFuture connectFuture = bootstrap.connect(host, port); // 1
connectFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
result.fail(future.cause()); // 2
}
}
});
  1. 连接到服务器
  2. 一旦连接失败,将promise置为失败

客户端handler

现在,让我们用TimeClientHandler来完成我们的客户端,它是对Netty原始版本TimeClientHandler的改写:

1
2
3
4
5
6
7
8
9
10
ByteBuf m = (ByteBuf) msg;
long currentTimeMillis;
try {
currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; // 1
resultPromise.complete(currentTimeMillis); // 2
resultPromise = null; // 3
ctx.close(); // 4
} finally {
m.release();
}
  1. 解码从服务器返回的时间
  2. 使用response将resultPromise置为完成
  3. resultPromise设置为null
  4. 关闭channel

这里重复说明下,当TIME响应事件发生时,我们将resultPromise设置为完成。