精 灵 王


  • 首页

  • 文章归档

  • 所有分类

  • 关于我

  • 搜索
设计模式之美 分布式 Redis 并发编程 个人成长 周志明的软件架构课 架构 单元测试 LeetCode 工具 位运算 读书笔记 操作系统 MySQL 异步编程 技术方案设计 集合 设计模式 三亚 游玩 转载 Linux 观察者模式 事件 Spring SpringCloud 实战 实战,SpringCloud 源码分析 线程池 同步 锁 线程 线程模型 动态代理 字节码 类加载 垃圾收集器 垃圾回收算法 对象创建 虚拟机内存 内存结构 Java

Java异步非阻塞编程的几种方式

发表于 2021-02-24 | 分类于 Java | 0 | 阅读次数 244

本文系转载至公众号:Java异步非阻塞编程的几种方式
转载系列的文章都是本人觉得写得非常好的,值得认真品读学习的文章。

一  从一个同步的Http调用说起

一个很简单的业务逻辑,其他后端服务提供了一个接口,我们需要通过接口调用,获取到响应的数据。

逆地理接口:通过经纬度获取这个经纬度所在的省市区县以及响应的code:

curl-i"http://xxx?latitude=31.08966221524924&channel=amap7a&near=false&longitude=105.13990312814713"
{"adcode":"510722"}

服务端执行,最简单的同步调用方式:

1

服务端响应之前,IO会阻塞在:java.net.SocketInputStream#socketRead0 的native方法上:

2

通过jstack日志,可以发现,此时这个Thread会一直在runable的状态:

"main"#1 prio=5 os_prio=31 tid=0x00007fed0c810000 nid=0x1003 runnable [0x000070000ce14000]   java.lang.Thread.State: RUNNABLE        at java.net.SocketInputStream.socketRead0(Native Method)        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)        at java.net.SocketInputStream.read(SocketInputStream.java:171)        at java.net.SocketInputStream.read(SocketInputStream.java:141)        at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84)        at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)        at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)        at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282)        at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)        at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)        at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)        at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)        at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165)        at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)        at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)        at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)        at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)        at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)        at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)        at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)        at com.amap.aos.async.AsyncIO.blockingIO(AsyncIO.java:207)                .......

线程模型示例:

3

同步最大的问题是在IO等待的过程中,线程资源没有得到充分的利用,对于大量IO场景的业务吞吐量会有一定限制。

二  JDK NIO & Future

在JDK 1.5 中,JUC提供了Future抽象:

4

5

当然并不是所有的Future都是这样实现的,如 io.netty.util.concurrent.AbstractFuture 就是通过线程轮询去。

这样做的好处是,主线程可以不用等待IO响应,可以去做点其他的,比如说再发送一个IO请求,可以等到一起返回:

"main"#1 prio=5 os_prio=31 tid=0x00007fd7a500b000 nid=0xe03 waiting on condition [0x000070000a95d000]   java.lang.Thread.State: WAITING (parking)        at sun.misc.Unsafe.park(Native Method)- parking to wait for  <0x000000076ee2d768> (a java.util.concurrent.CountDownLatch$Sync)        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)        at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:162)        at com.amap.aos.async.AsyncIO.futureBlockingGet(AsyncIO.java:201)        ....."AsyncHttpClient-2-1"#11 prio=5 os_prio=31 tid=0x00007fd7a7247800 nid=0x340b runnable [0x000070000ba94000]   java.lang.Thread.State: RUNNABLE        at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)        at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)        at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)- locked <0x000000076eb00ef0> (a io.netty.channel.nio.SelectedSelectionKeySet)- locked <0x000000076eb00f10> (a java.util.Collections$UnmodifiableSet)- locked <0x000000076eb00ea0> (a sun.nio.ch.KQueueSelectorImpl)        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)        at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:693)        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)        at java.lang.Thread.run(Thread.java:748)

6

主线程在等待结果返回过程中依然需要等待,没有根本解决此问题。

三  使用Callback回调方式

第二节中,依然需要主线程等待,获取结果,那么可不可以在主线程完成发送请求后,再也不用关心这个逻辑,去执行其他的逻辑?那就可以使用Callback机制。

7

如此一来,主线程再也不需要关心发起IO后的业务逻辑,发送完请求后,就可以彻底去干其他事情,或者回到线程池中再供调度。如果是HttpServer,那么需要结合Servlet 3.1的异步Servlet。

8

9

异步Servelt参考资料

https://www.cnblogs.com/davenkin/p/async-servlet.html

使用Callback方式,从线程模型中看,发现线程资源已经得到了比较充分的利用,整个过程中已经没有线程阻塞。

四  Callback hell

回调地狱,当Callback的线程还需要执行下一个IO调用的时候,这个时候进入回调地狱模式。

典型的应用场景如,通过经纬度获取行政区域adcode(逆地理接口),然后再根据获得的adcode,获取当地的天气信息(天气接口)。

在同步的编程模型中,几乎不会涉及到此类问题。

10

Callback方式的核心缺陷

五  JDK 1.8 CompletableFuture

那么有没有办法解决Callback Hell的问题?当然有,JDK 1.8中提供了CompletableFuture,先看看它是怎么解决这个问题的。

将逆地理的Callback逻辑,封装成一个独立的CompletableFuture,当异步线程回调时,调用 future.complete(T) ,将结果封装。

11

将天气执行的Call逻辑,也封装成为一个独立的CompletableFuture ,完成之后,逻辑同上。

12

compose衔接,whenComplete输出:

13

每一个IO操作,均可以封装为独立的CompletableFuture,从而避免回调地狱。

CompletableFuture,只有两个属性:

  • result:Future的执行结果 (Either the result or boxed AltResult)。

  • stack:操作栈,用于定义这个Future接下来操作的行为 (Top of Treiber stack of dependent actions)。

weatherFuture这个方法是如何被调用的呢?

通过堆栈可以发现,是在 reverseCodeFuture.complete(result) 的时候,并且也将获得的adcode作为参数执行接下来的逻辑。

14

这样一来,就完美解决回调地狱问题,在主的逻辑中,看起来像是在同步的进行编码。

六  Vert.x Future

Info-Service中,大量使用的 Vert.x Future 也是类似的解决的方案,不过设计上使用Handler的概念。

15

核心执行的逻辑差不多:

16

这当然不是Vertx的全部,当然这是题外话了。

七  Reactive Streams

异步编程对吞吐量以及资源有好处,但是有没有统一的抽象去解决此类问题内,答案是 Reactive Streams。

核心抽象:Publisher Subscriber Processor Subscription ,整个包里面,只有这四个接口,没有实现类。

17

在JDK 9里面,已经被作为一种规范封装到 java.util.concurrent.Flow :

18

19

参考资料

https://www.baeldung.com/java-9-reactive-streams

http://ypk1226.com/2019/07/01/reactive/reactive-streams/

https://www.reactivemanifesto.org/

https://projectreactor.io/learn

一个简单的例子:

20

八  Reactor & Spring 5 & Spring WebFlux

Flux & Mono

21

22

参考资料

https://projectreactor.io/docs/core/3.1.0.M3/reference/index.html

https://speakerdeck.com/simonbasle/projectreactor-dot-io-reactor3-intro

精 灵 王 wechat
👆🏼欢迎扫码关注微信公众号👆🏼
  • 本文作者: 精 灵 王
  • 本文链接: https://jinglingwang.cn/archives/java-async-code
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# 设计模式之美 # 分布式 # Redis # 并发编程 # 个人成长 # 周志明的软件架构课 # 架构 # 单元测试 # LeetCode # 工具 # 位运算 # 读书笔记 # 操作系统 # MySQL # 异步编程 # 技术方案设计 # 集合 # 设计模式 # 三亚 # 游玩 # 转载 # Linux # 观察者模式 # 事件 # Spring # SpringCloud # 实战 # 实战,SpringCloud # 源码分析 # 线程池 # 同步 # 锁 # 线程 # 线程模型 # 动态代理 # 字节码 # 类加载 # 垃圾收集器 # 垃圾回收算法 # 对象创建 # 虚拟机内存 # 内存结构 # Java
Java中9种常见的CMS GC问题分析与解决
浅谈分库分表那些事儿
  • 文章目录
  • 站点概览
精 灵 王

精 灵 王

青春岁月,以此为伴

85 日志
14 分类
43 标签
RSS
E-mail
Creative Commons
Links
  • 添加友链说明
© 2022 精 灵 王
渝ICP备2020013371号
0%