目录
现如今互联网架构越来越追求系统的稳定、高效、高性能、低延迟等特性。于是我们将业务系统拆分,形成微服务,领域模型架构。提供服务多节点部署,提高系统可扩展性。
需求
平台用户增大,需求增大,访问量变大,数据越来越复杂,必不可免大批量的密集IO处理。而系统可提供的请求量有限,如果以传统的同步请求调用,每一次请求对应一条线程处理,每条线程大量时间被IO操作而占用,这样必然会限制系统瓶颈。此时除了水平扩展系统节点之外,我们还能够使用异步和事件模式来提升系统吞吐量。
dapeng-soa 异步模型
采用Java Future模式、Filter-Chain模式、netty异步框架、nio底层selector模型等实现dapeng服务的异步调用
dapeng-soa 底层网络通讯基于 Netty 实现 使用者 启动客户端与服务端,同时注册用户请求处理器即可完成远程调用。
本文将采用框架支持的 scala
语言开发一个简单的异步服务,来说明 dapeng
的异步特性。
异步服务端接口
package com.github.dapeng.hello.scala.service
import com.github.dapeng.core.{Processor, Service}
import com.github.dapeng.core.SoaGlobalTransactional
import scala.concurrent.Future
/**
* Autogenerated by Dapeng-Code-Generator (2.0.5)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
**/
@Service(name = "com.github.dapeng.hello.service.HelloService", version = "1.0.0")
@Processor(className = "com.github.dapeng.hello.scala.HelloServiceAsyncCodec$Processor")
trait HelloServiceAsync extends com.github.dapeng.core.definition.AsyncService {
@throws[com.github.dapeng.core.SoaException]
def sayHello(hello: com.github.dapeng.hello.scala.domain.Hello): Future[String]
}
异步服务接口和同步普通接口唯一不同的是返回值不同, 异步接口返回是一个 Future
包装的结果, 我们知道 Future 是 Java 异步模型中的一个接口,代表未来的结果。
异步服务端实现
/**
* @author maple 2018.09.13 上午10:02
*/
class HelloServiceImpl extends HelloServiceAsync {
private val logger = LoggerFactory.getLogger(getClass)
/**
* 服务实现方法
**/
override def sayHello(hello: Hello): Future[HelloResponse] = {
//耗时很久的方法 20s
val result: CompletableFuture[Response] = postPayThirdRequest(hello)
toScala(result)(resp => HelloResponse(s"$hello", resp.getResponseBody))
}
/**
* 调用第三方接口
*/
def postPayThirdRequest(hello: Hello): CompletableFuture[Response] = wrapLog("请求第三方接口") {
// requestForGet
val asyncHttpFuture: CompletableFuture[Response] = HttpAsyncClient.requestForGet("http://127.0.0.1:8080/test", "")
asyncHttpFuture
}
/**
* 记录日志
*/
def wrapLog[T](label: String)(f: => T): T = {
val startTime = System.currentTimeMillis
val result = f
val endTime = System.currentTimeMillis
logger.info(s"$label => 程序运行时间:${(endTime - startTime) / 1000}")
result
}
/**
* 将 java CompletableFuture 转换为 scala Future
*/
def toScala[T, R](response: CompletableFuture[T])(extractor: T => R): Future[R] = {
val promise = Promise[R]()
response.whenComplete((res: T, ex) => {
if (ex != null) promise.failure(ex)
else promise.success(extractor(res))
})
promise.future
}
}
由于 sayHello
方法 主要的耗时操作为调用第三方接口,本地线程阻塞等待比较消耗
sayHello方法会马上返回一个Future对象,然后当前IO线程会继续处理下一个来自客户端的请求。具体方法的处理会在业务线程池中进行执行。这样服务端就达到了异步处理的效果。
同步 http
请求第三方接口
上面 HttpAsyncClient.requestForGet
方法是调用远程第三方的某个接口,可能会比较耗时,如果我们按照上面的步骤实现了异步服务并返回 Future
, 但是这里这个方法仍使用普通的同步http请求,如下面代码,使用 httpClient
请求
public static String doPost(String url, JSONObject json) {
HttpClient client = CustomHttpClient.getHttpClient();
HttpPost post = new HttpPost(url);
String response = null;
try {
StringEntity s = new StringEntity(json.toString());
s.setContentEncoding("UTF-8");
s.setContentType("application/json");//发送json数据需要设置contentType
post.setEntity(s);
HttpResponse res = client.execute(post);
if (res.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
HttpEntity entity = res.getEntity();
String result = EntityUtils.toString(res.getEntity());// 返回json格式:
response = result;
}
client.getConnectionManager().shutdown();
} catch (Exception e) {
throw new RuntimeException(e);
}
return response;
}
上述请求是同步的,其实这里我们只是将dapeng-container处理线程该处理的任务扔到了另外一个线程中进行执行,该线程仍然会阻塞等待第三方接口返回。实际上,这是一个假异步模式,这种实现模式甚至没有直接在dapeng-container线程中执行效果好,因为这里会有线程上下文切换的耗时操作。
使用 asynchttpclient
类库 进行异步 http
请求
上面 HttpAsyncClient.requestForGet
方法是调用远程第三方的某个接口,可能会比较耗时,如果我们按照上面的步骤实现了异步服务并返回 Future
, 但是这里这个方法仍使用普通的同步http请求,如下面代码,使用 httpClient
请求
那么我们如何实现真正意义上的异步呢?也就是所有线程都不会被阻塞,这里就要引入 Reator 模型的概念了,类似 java nio 一样 ,IO线程可以多路服用,做到同步非阻塞模式,一条线程可以处理很多请求。
我们引入的 asynchttpclient
就是基于 netty 实现的异步http请求模式,下面看代码
public static CompletableFuture<Response> requestForGet(String url, String json) {
//创建Async http 请求客户端
AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient();
//通过传入的 url 链式调用 远程接口,并立即返回一个 Future
CompletableFuture<Response> whenResponse = asyncHttpClient
.prepareGet(url)
.execute()
.toCompletableFuture();
LOGGER.info("异步调用 requestForGet 完毕, 返回 CompletableFuture ");
return whenResponse;
}
asynchttpclient
类库底层使用 netty
实现, 实现了真正意义上的异步,即线程不会阻塞等待。
这样服务端处理流程简化如下:
服务端异步执行流程图
服务端采用 netty
作为网络通讯框架, netty io
线程 轮询 获取远程 客户端的连接请求,并执行解码操作,然后解码后将请求调度给业务线程进行处理,业务线程需要调用第三方http接口,这里采用了异步 http
模型,请求第三方接口后,马上返回一个 future
对象,业务线程结束。 async-http-io 线程 会非阻塞方式轮询返回结果,当结果返回时,理解complete future ,之前在 future上设置的回调方法就会触发,代码如下:
// resp 即第三方 http 接口返回的结果
toScala(result)(resp => HelloResponse(s"$hello", resp.getResponseBody))
def toScala[T, R](response: CompletableFuture[T])(extractor: T => R): Future[R] = {
val promise = Promise[R]()
response.whenComplete((res: T, ex) => {
if (ex != null) promise.failure(ex)
else promise.success(extractor(res))
})
promise.future
}
future
是完成状态后, dapeng框架采用 filter-chain
模式实现事件通知,具体如下:
容器使用 CompletableFuture
(Java 1.7 新特性) 监听 future
完成 ,如果完成后,会调用 netty
的 ChannalHandlerContext.writeAndFlush
将消息响应回客户端。
public void onEntry(FilterContext filterContext, FilterChain next) {
// 省略部分代码...
//判断是不是异步服务
if (serviceDef.isAsync) {
SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
// 获取服务实现类返回的 CompletableFuture
CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args);
//当 future 是完成状态时会触发下面的实现
future.whenComplete((realResult, ex) -> {
try {
TransactionContext.Factory.currentInstance(transactionContext);
// 返回结果有异常
if (ex != null) {
SoaException soaException = ExceptionUtil.convertToSoaException(ex);
attachErrorInfo(transactionContext, soaException);
} else {
// 处理返回结果
processResult(soaFunction, transactionContext, realResult, filterContext);
}
//在 future完成时才会触发filter-chain 的 onExit方法
onExit(filterContext, getPrevChain(filterContext));
} finally {
TransactionContext.Factory.removeCurrentInstance();
}
});
} else {
//省略同步服务处理代码
}
在 whenComplete
中 调用 onExit
使 过滤链条能够往回执行,最终到容器 HeadFilter 的 onExit 方法
public void onExit(FilterContext filterContext, FilterChain prev) {
// ... 部分代码省略
SoaResponseWrapper responseWrapper = new SoaResponseWrapper(transactionContext,
Optional.ofNullable(filterContext.getAttribute("result")),
Optional.ofNullable((BeanSerializer) filterContext.getAttribute("respSerializer")));
// 这里调用了 netty 的 writeAndFlush 响应客户端
channelHandlerContext.writeAndFlush(responseWrapper).addListener(FIRE_EXCEPTION_ON_FAILURE);
}
通过上面的分析,我们已经了解 dapeng
服务端异步的使用方式及执行流程。
异步客户端
下面是一个比较简单的异步客户端的例子,我们只需要new 需要调用的服务的异步客户端,然后再使用时,直接调用需要使用的方法。
public static void main(String[] args) throws SoaException {
com.github.dapeng.hello.service.HelloServiceAsync helloService = new HelloServiceAsyncClient();
Hello hello = new Hello();
hello.setName("maple");
CompletableFuture<HelloResponse> responseFuture = (CompletableFuture<HelloResponse>) helloService.sayHello(hello);
responseFuture.whenComplete((result, ex) -> {
if (ex != null) {
logger.error(ex.getMessage(), ex);
} else {
logger.info("返回结果: {}", result);
}
});
logger.info("主线程执行完成!");
}
当我们运行main程序后,客户端会去寻址并请求服务端,然后立即返回一个future,我们仍然监听这个future的完成情况,当完成后,打印返回结果。
17:37:17.070 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=com.github.dapeng.registry.zookeeper.ClientZk$$Lambda$4/1434041222@7a36aefa
17:37:17.173 [main] INFO c.g.d.registry.zookeeper.ClientZk - ClientZk::syncZkRuntimeInfo[com.github.dapeng.hello.service.HelloService], 获取/soa/runtime/services/com.github.dapeng.hello.service.HelloService的子节点成功
17:37:17.176 [main] INFO c.g.d.registry.zookeeper.ClientZk - <-> syncZkRuntimeInfo 触发服务实例同步,目前服务实例列表:com.github.dapeng.hello.service.HelloService -> IP:【192.168.1.103:9095】
17:37:17.667 [main] INFO c.g.dapeng.client.filter.LogFilter - LogFilter::onEntry,request[seqId:0, server:192.168.1.103:9095]:service[com.github.dapeng.hello.service.HelloService]:version[1.0.0]:method[sayHello]
17:37:17.716 [main] INFO ClientService - 主线程执行完成!
17:37:38.046 [nioEventLoopGroup-2-1] INFO ClientService - 返回结果: {"content":"Hello(maple,None)","thirdMsg":"SUCCESS"}
观察日志,我们看到主线程早早结束,然后异步等待服务端返回结果,最终结果是在 nioEventLoopGroup
这个 netty
的 io 线程中返回的。
异步网关 Dapeng-Mesh 与异步客户端的结合
我们将采用基于 netty 的 http 网关 dapengMesh 和 异步客户端请求来讲解dapeng高性能的异步客户端模式
异步网关 dapeng-mesh 采用 netty 实现, netty接收到前端 http的请求后,解析该请求,然后通过dapeng提供的 json 和 元数据 直接将 请求字符串 序列化为 bytebuf 请求对象,通过注册中心寻址提供服务的ip 端口,然后构造 netty client 客户端发送请求,立即返回 future. 当前线程可以继续执行下一个请求, 实现异步非阻塞。
当服务端有请求返回时,该 future会进行回调触发。
核心处理逻辑代码: netty 接收 http 请求的 handler
@ChannelHandler.Sharable
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static Logger logger = LoggerFactory.getLogger(NettyHttpServerHandler.class);
private final HttpPostProcessor postHandler = new HttpPostProcessor();
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpRequest) throws Exception {
try {
doService(httpRequest, ctx);
} catch (Exception e) {
logger.error("网关处理请求失败: " + e.getMessage(), e);
HttpProcessorUtils.sendHttpResponse(ctx, HttpProcessorUtils.wrapErrorResponse(DapengMeshCode.ProcessReqFailed), null, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
private void doService(FullHttpRequest request, ChannelHandlerContext ctx) throws Exception {
dispatchRequest(request, ctx);
}
private void dispatchRequest(FullHttpRequest request, ChannelHandlerContext ctx) {
HttpMethod method = request.method();
boolean isGet = HttpMethod.GET.equals(method);
if (isGet || HttpMethod.HEAD.equals(method)) {
handlerGetAndHead(request, ctx);
return;
}
boolean isPost = HttpMethod.POST.equals(method);
if (isPost) {
postHandler.handlerPostRequest(request, ctx);
return;
}
HttpProcessorUtils.sendHttpResponse(ctx, HttpProcessorUtils.wrapErrorResponse(DapengMeshCode.RequestTypeNotSupport), request, HttpResponseStatus.OK);
}
handlerPostRequest 专用于处理 post请求
public void handlerPostRequest(FullHttpRequest request, ChannelHandlerContext ctx) {
String uri = request.uri();
PostRequestInfo info = UrlMappingResolver.handlerMappingUrl(uri, request);;
if (info != null) {
String parameter = RequestParser.fastParseParam(request, "parameter");
CompletableFuture<String> jsonResponse = (CompletableFuture<String>) PostUtil.postAsync(info.getService(), info.getVersion(), info.getMethod(), parameter, request, InvokeUtil.getCookies(info));
long beginTime = System.currentTimeMillis();
jsonResponse.whenComplete((result, ex) -> {
try {
if (ex != null) {
String resp = //省略//;
HttpProcessorUtils.sendHttpResponse(ctx, resp, request, HttpResponseStatus.OK);
} else {
String response = "{}".equals(result) ? "{\"status\":1}" : result.substring(0, result.lastIndexOf('}')) + ",\"status\":1}";
HttpProcessorUtils.sendHttpResponse(ctx, response, request, HttpResponseStatus.OK);
}
}
});
} else {
HttpProcessorUtils.sendHttpResponse(ctx, HttpProcessorUtils.wrapErrorResponse(DapengMeshCode.IllegalRequest), request, HttpResponseStatus.OK);
}
}
由于篇幅原因,没有系统介绍 dapeng-mesh的代码,有兴趣的朋友可以直接在github上查看完整实现代码(https://github.com/dapeng-soa/dapeng-mesh)