异步服务

目录

现如今互联网架构越来越追求系统的稳定、高效、高性能、低延迟等特性。于是我们将业务系统拆分,形成微服务,领域模型架构。提供服务多节点部署,提高系统可扩展性。

需求

平台用户增大,需求增大,访问量变大,数据越来越复杂,必不可免大批量的密集IO处理。而系统可提供的请求量有限,如果以传统的同步请求调用,每一次请求对应一条线程处理,每条线程大量时间被IO操作而占用,这样必然会限制系统瓶颈。此时除了水平扩展系统节点之外,我们还能够使用异步和事件模式来提升系统吞吐量。

dapeng-soa 异步模型

采用Java Future模式、Filter-Chain模式、netty异步框架、nio底层selector模型等实现dapeng服务的异步调用

dapeng-soa 底层网络通讯基于 Netty 实现 使用者 启动客户端与服务端,同时注册用户请求处理器即可完成远程调用。

本文将采用框架支持的 scala 语言开发一个简单的异步服务,来说明 dapeng 的异步特性。

异步服务端接口

package com.github.dapeng.hello.scala.service

import com.github.dapeng.core.{Processor, Service}
import com.github.dapeng.core.SoaGlobalTransactional
import scala.concurrent.Future

 /**
  * Autogenerated by Dapeng-Code-Generator (2.0.5)
  *
  * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
  **/
@Service(name = "com.github.dapeng.hello.service.HelloService", version = "1.0.0")
@Processor(className = "com.github.dapeng.hello.scala.HelloServiceAsyncCodec$Processor")
trait HelloServiceAsync extends com.github.dapeng.core.definition.AsyncService {

  
  @throws[com.github.dapeng.core.SoaException]
  def sayHello(hello: com.github.dapeng.hello.scala.domain.Hello): Future[String]

}
    

异步服务接口和同步普通接口唯一不同的是返回值不同, 异步接口返回是一个 Future 包装的结果, 我们知道 Future 是 Java 异步模型中的一个接口,代表未来的结果。

异步服务端实现

/**
  * @author maple 2018.09.13 上午10:02
  */
class HelloServiceImpl extends HelloServiceAsync {
 
  private val logger = LoggerFactory.getLogger(getClass)
  
  /**
    * 服务实现方法
    **/
  override def sayHello(hello: Hello): Future[HelloResponse] = {
    
    //耗时很久的方法 20s
    val result: CompletableFuture[Response] = postPayThirdRequest(hello)

    toScala(result)(resp => HelloResponse(s"$hello", resp.getResponseBody))
  }

  /**
    * 调用第三方接口
    */
  def postPayThirdRequest(hello: Hello): CompletableFuture[Response] = wrapLog("请求第三方接口") {
    //      requestForGet
    val asyncHttpFuture: CompletableFuture[Response] = HttpAsyncClient.requestForGet("http://127.0.0.1:8080/test", "")
    asyncHttpFuture
  }

  /**
    * 记录日志
    */
  def wrapLog[T](label: String)(f: => T): T = {
    val startTime = System.currentTimeMillis
    val result = f
    val endTime = System.currentTimeMillis
    logger.info(s"$label => 程序运行时间:${(endTime - startTime) / 1000}")
    result
  }
  
  /**
    * 将 java CompletableFuture 转换为 scala Future
    */
  def toScala[T, R](response: CompletableFuture[T])(extractor: T => R): Future[R] = {

    val promise = Promise[R]()
    response.whenComplete((res: T, ex) => {
      if (ex != null) promise.failure(ex)
      else promise.success(extractor(res))
    })
    promise.future
  }
}

由于 sayHello 方法 主要的耗时操作为调用第三方接口,本地线程阻塞等待比较消耗 sayHello方法会马上返回一个Future对象,然后当前IO线程会继续处理下一个来自客户端的请求。具体方法的处理会在业务线程池中进行执行。这样服务端就达到了异步处理的效果。

同步 http 请求第三方接口

上面 HttpAsyncClient.requestForGet 方法是调用远程第三方的某个接口,可能会比较耗时,如果我们按照上面的步骤实现了异步服务并返回 Future, 但是这里这个方法仍使用普通的同步http请求,如下面代码,使用 httpClient 请求

public static String doPost(String url, JSONObject json) {
        HttpClient client = CustomHttpClient.getHttpClient();
        HttpPost post = new HttpPost(url);
        String response = null;
        try {
            StringEntity s = new StringEntity(json.toString());
            s.setContentEncoding("UTF-8");
            s.setContentType("application/json");//发送json数据需要设置contentType
            post.setEntity(s);
            HttpResponse res = client.execute(post);
            if (res.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
                HttpEntity entity = res.getEntity();
                String result = EntityUtils.toString(res.getEntity());// 返回json格式:
                response = result;
            }
            client.getConnectionManager().shutdown();

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return response;
    }

上述请求是同步的,其实这里我们只是将dapeng-container处理线程该处理的任务扔到了另外一个线程中进行执行,该线程仍然会阻塞等待第三方接口返回。实际上,这是一个假异步模式,这种实现模式甚至没有直接在dapeng-container线程中执行效果好,因为这里会有线程上下文切换的耗时操作。

使用 asynchttpclient 类库 进行异步 http 请求

上面 HttpAsyncClient.requestForGet 方法是调用远程第三方的某个接口,可能会比较耗时,如果我们按照上面的步骤实现了异步服务并返回 Future, 但是这里这个方法仍使用普通的同步http请求,如下面代码,使用 httpClient 请求 那么我们如何实现真正意义上的异步呢?也就是所有线程都不会被阻塞,这里就要引入 Reator 模型的概念了,类似 java nio 一样 ,IO线程可以多路服用,做到同步非阻塞模式,一条线程可以处理很多请求。

我们引入的 asynchttpclient 就是基于 netty 实现的异步http请求模式,下面看代码

public static CompletableFuture<Response> requestForGet(String url, String json) {
    //创建Async http 请求客户端
    AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient();

    //通过传入的 url 链式调用 远程接口,并立即返回一个 Future
     CompletableFuture<Response> whenResponse = asyncHttpClient
                .prepareGet(url)
                .execute()
                .toCompletableFuture();

    LOGGER.info("异步调用 requestForGet 完毕, 返回 CompletableFuture ");
    return whenResponse;
}

asynchttpclient 类库底层使用 netty 实现, 实现了真正意义上的异步,即线程不会阻塞等待。

这样服务端处理流程简化如下:

服务端异步执行流程图

服务端异步执行流程.png

服务端采用 netty 作为网络通讯框架, netty io 线程 轮询 获取远程 客户端的连接请求,并执行解码操作,然后解码后将请求调度给业务线程进行处理,业务线程需要调用第三方http接口,这里采用了异步 http 模型,请求第三方接口后,马上返回一个 future 对象,业务线程结束。 async-http-io 线程 会非阻塞方式轮询返回结果,当结果返回时,理解complete future ,之前在 future上设置的回调方法就会触发,代码如下:

// resp 即第三方 http 接口返回的结果
toScala(result)(resp => HelloResponse(s"$hello", resp.getResponseBody))

def toScala[T, R](response: CompletableFuture[T])(extractor: T => R): Future[R] = {

    val promise = Promise[R]()
    response.whenComplete((res: T, ex) => {
      if (ex != null) promise.failure(ex)
      else promise.success(extractor(res))
    })
    promise.future
  }

future 是完成状态后, dapeng框架采用 filter-chain 模式实现事件通知,具体如下: 容器使用 CompletableFuture(Java 1.7 新特性) 监听 future 完成 ,如果完成后,会调用 nettyChannalHandlerContext.writeAndFlush 将消息响应回客户端。

public void onEntry(FilterContext filterContext, FilterChain next) {
    // 省略部分代码... 
   
    //判断是不是异步服务
    if (serviceDef.isAsync) {
        SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
        // 获取服务实现类返回的 CompletableFuture 
        CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args);
        //当 future 是完成状态时会触发下面的实现
        future.whenComplete((realResult, ex) -> {
            try {
                TransactionContext.Factory.currentInstance(transactionContext);
                // 返回结果有异常
                if (ex != null) {
                    SoaException soaException = ExceptionUtil.convertToSoaException(ex);
                    attachErrorInfo(transactionContext, soaException);
                } else {
                    // 处理返回结果
                    processResult(soaFunction, transactionContext, realResult, filterContext);
                }
                //在 future完成时才会触发filter-chain 的 onExit方法
                onExit(filterContext, getPrevChain(filterContext));
            } finally {
                TransactionContext.Factory.removeCurrentInstance();
            }
       });
    } else {
    //省略同步服务处理代码
}

whenComplete 中 调用 onExit 使 过滤链条能够往回执行,最终到容器 HeadFilter 的 onExit 方法

public void onExit(FilterContext filterContext, FilterChain prev) {
    // ... 部分代码省略
    SoaResponseWrapper responseWrapper = new SoaResponseWrapper(transactionContext,
        Optional.ofNullable(filterContext.getAttribute("result")),
        Optional.ofNullable((BeanSerializer) filterContext.getAttribute("respSerializer")));
    // 这里调用了 netty 的 writeAndFlush 响应客户端
    channelHandlerContext.writeAndFlush(responseWrapper).addListener(FIRE_EXCEPTION_ON_FAILURE);
}

通过上面的分析,我们已经了解 dapeng 服务端异步的使用方式及执行流程。

异步客户端

下面是一个比较简单的异步客户端的例子,我们只需要new 需要调用的服务的异步客户端,然后再使用时,直接调用需要使用的方法。

 public static void main(String[] args) throws SoaException {
    com.github.dapeng.hello.service.HelloServiceAsync helloService = new HelloServiceAsyncClient();
    Hello hello = new Hello();
    hello.setName("maple");
    CompletableFuture<HelloResponse> responseFuture = (CompletableFuture<HelloResponse>) helloService.sayHello(hello);

    responseFuture.whenComplete((result, ex) -> {

        if (ex != null) {
            logger.error(ex.getMessage(), ex);
        } else {

            logger.info("返回结果: {}", result);
        }

    });

    logger.info("主线程执行完成!");
    }

当我们运行main程序后,客户端会去寻址并请求服务端,然后立即返回一个future,我们仍然监听这个future的完成情况,当完成后,打印返回结果。

17:37:17.070 [main] INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=com.github.dapeng.registry.zookeeper.ClientZk$$Lambda$4/1434041222@7a36aefa

17:37:17.173 [main] INFO  c.g.d.registry.zookeeper.ClientZk - ClientZk::syncZkRuntimeInfo[com.github.dapeng.hello.service.HelloService], 获取/soa/runtime/services/com.github.dapeng.hello.service.HelloService的子节点成功
17:37:17.176 [main] INFO  c.g.d.registry.zookeeper.ClientZk - <-> syncZkRuntimeInfo 触发服务实例同步,目前服务实例列表:com.github.dapeng.hello.service.HelloService -> IP:【192.168.1.103:9095】

17:37:17.667 [main] INFO  c.g.dapeng.client.filter.LogFilter - LogFilter::onEntry,request[seqId:0, server:192.168.1.103:9095]:service[com.github.dapeng.hello.service.HelloService]:version[1.0.0]:method[sayHello]

17:37:17.716 [main] INFO  ClientService - 主线程执行完成!

17:37:38.046 [nioEventLoopGroup-2-1] INFO  ClientService - 返回结果: {"content":"Hello(maple,None)","thirdMsg":"SUCCESS"}

观察日志,我们看到主线程早早结束,然后异步等待服务端返回结果,最终结果是在 nioEventLoopGroup 这个 netty 的 io 线程中返回的。

异步网关 Dapeng-Mesh 与异步客户端的结合

我们将采用基于 netty 的 http 网关 dapengMesh 和 异步客户端请求来讲解dapeng高性能的异步客户端模式

异步网关 dapeng-mesh 采用 netty 实现, netty接收到前端 http的请求后,解析该请求,然后通过dapeng提供的 json 和 元数据 直接将 请求字符串 序列化为 bytebuf 请求对象,通过注册中心寻址提供服务的ip 端口,然后构造 netty client 客户端发送请求,立即返回 future. 当前线程可以继续执行下一个请求, 实现异步非阻塞。

当服务端有请求返回时,该 future会进行回调触发。

核心处理逻辑代码: netty 接收 http 请求的 handler

@ChannelHandler.Sharable
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static Logger logger = LoggerFactory.getLogger(NettyHttpServerHandler.class);

    private final HttpPostProcessor postHandler = new HttpPostProcessor();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpRequest) throws Exception {
        try {
            doService(httpRequest, ctx);
        } catch (Exception e) {
            logger.error("网关处理请求失败: " + e.getMessage(), e);
            HttpProcessorUtils.sendHttpResponse(ctx, HttpProcessorUtils.wrapErrorResponse(DapengMeshCode.ProcessReqFailed), null, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        }
    }

    private void doService(FullHttpRequest request, ChannelHandlerContext ctx) throws Exception {
        dispatchRequest(request, ctx);
    }

    private void dispatchRequest(FullHttpRequest request, ChannelHandlerContext ctx) {
        HttpMethod method = request.method();
        boolean isGet = HttpMethod.GET.equals(method);
        if (isGet || HttpMethod.HEAD.equals(method)) {
            handlerGetAndHead(request, ctx);
            return;
        }
        boolean isPost = HttpMethod.POST.equals(method);
        if (isPost) {
            postHandler.handlerPostRequest(request, ctx);
            return;
        }
        HttpProcessorUtils.sendHttpResponse(ctx, HttpProcessorUtils.wrapErrorResponse(DapengMeshCode.RequestTypeNotSupport), request, HttpResponseStatus.OK);
    }

handlerPostRequest 专用于处理 post请求

public void handlerPostRequest(FullHttpRequest request, ChannelHandlerContext ctx) {
        String uri = request.uri();
        PostRequestInfo info =  UrlMappingResolver.handlerMappingUrl(uri, request);;
 
        if (info != null) {
            String parameter = RequestParser.fastParseParam(request, "parameter");

            CompletableFuture<String> jsonResponse = (CompletableFuture<String>) PostUtil.postAsync(info.getService(), info.getVersion(), info.getMethod(), parameter, request, InvokeUtil.getCookies(info));
            long beginTime = System.currentTimeMillis();
            jsonResponse.whenComplete((result, ex) -> {
                try {
                    if (ex != null) {
                        String resp = //省略//;               
                        HttpProcessorUtils.sendHttpResponse(ctx, resp, request, HttpResponseStatus.OK);
                    } else {
                        String response = "{}".equals(result) ? "{\"status\":1}" : result.substring(0, result.lastIndexOf('}')) + ",\"status\":1}";
                        HttpProcessorUtils.sendHttpResponse(ctx, response, request, HttpResponseStatus.OK);
                    }
                } 
            });
        } else {
            HttpProcessorUtils.sendHttpResponse(ctx, HttpProcessorUtils.wrapErrorResponse(DapengMeshCode.IllegalRequest), request, HttpResponseStatus.OK);
        }
    }

由于篇幅原因,没有系统介绍 dapeng-mesh的代码,有兴趣的朋友可以直接在github上查看完整实现代码(https://github.com/dapeng-soa/dapeng-mesh)