|
异步执行对于开发者来说并不陌生,在实际的开发过程中,很多场景多会使用到异步,相比同步执行,异步可以大大缩短请求链路耗时时间,比如:发送短信、邮件、异步更新等,这些都是典型的可以通过异步实现的场景。
什么是异步?
首先我们先看一个常见的用户下单的场景:

在同步操作中,我们执行到 发送短信 的时候,我们必须等待这个方法彻底执行完才能执行 赠送积分 这个操作,如果 赠送积分 这个动作执行时间较长,发送短信需要等待,这就是典型的同步场景。
实际上,发送短信和赠送积分没有任何的依赖关系,通过异步,我们可以实现赠送积分和发送短信这两个操作能够同时进行,比如:

这就是所谓的异步,是不是非常简单,下面就说说异步的几种实现方式吧。
异步的八种实现方式
- 线程Thread
- Future
- 异步框架CompletableFuture
- Spring注解@Async
- Spring ApplicationEvent事件
- 消息队列
- 第三方异步框架,比如Hutool的ThreadUtil
- Guava异步
1. 线程异步
publicclassAsyncThreadextendsThread{@Overridepublicvoidrun(){System.out.println("Currentthreadname:"+Thread.currentThread().getName()+"Sendemailsuccess!");}publicstaticvoidmain(String[]args){AsyncThreadasyncThread=newAsyncThread();asyncThread.run();}}当然如果每次都创建一个Thread线程,频繁的创建、销毁,浪费系统资源,我们可以采用线程池:
privateExecutorServiceexecutorService=Executors.newCachedThreadPool();publicvoidfun(){executorService.submit(newRunnable(){@Overridepublicvoidrun(){log.info("执行业务逻辑...");}});}可以将业务逻辑封装到Runnable或Callable中,交由线程池来执行。
2. Future异步
@Slf4jpublicclassFutureManager{publicStringexecute()throwsException{ExecutorServiceexecutor=Executors.newFixedThreadPool(1);Future<String>future=executor.submit(newCallable<String>(){@OverridepublicStringcall()throwsException{System.out.println(&#34;---taskstart---&#34;);Thread.sleep(3000);System.out.println(&#34;---taskfinish---&#34;);return&#34;thisisfutureexecutefinalresult!!!&#34;;}});//这里需要返回值时会阻塞主线程Stringresult=future.get();log.info(&#34;Futuregetresult:{}&#34;,result);returnresult;}@SneakyThrowspublicstaticvoidmain(String[]args){FutureManagermanager=newFutureManager();manager.execute();}}输出结果:
---taskstart------taskfinish---Futuregetresult:thisisfutureexecutefinalresult!!!Future的不足之处
Future的不足之处的包括以下几点:
- 无法被动接收异步任务的计算结果:虽然我们可以主动将异步任务提交给线程池中的线程来执行,但是待异步任务执行结束之后,主线程无法得到任务完成与否的通知,它需要通过get方法主动获取任务执行的结果。
- Future件彼此孤立:有时某一个耗时很长的异步任务执行结束之后,你想利用它返回的结果再做进一步的运算,该运算也会是一个异步任务,两者之间的关系需要程序开发人员手动进行绑定赋予,Future并不能将其形成一个任务流(pipeline),每一个Future都是彼此之间都是孤立的,所以才有了后面的CompletableFuture,CompletableFuture就可以将多个Future串联起来形成任务流。
- Futrue没有很好的错误处理机制:截止目前,如果某个异步任务在执行发的过程中发生了异常,调用者无法被动感知,必须通过捕获get方法的异常才知晓异步任务执行是否出现了错误,从而在做进一步的判断处理。
3. CompletableFuture实现异步
publicclassCompletableFutureCompose{/***thenAccept子任务和父任务公用同一个线程*/@SneakyThrowspublicstaticvoidthenRunAsync(){CompletableFuture<Integer>cf1=CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread()+&#34;cf1dosomething....&#34;);return1;});CompletableFuture<Void>cf2=cf1.thenRunAsync(()->{System.out.println(Thread.currentThread()+&#34;cf2dosomething...&#34;);});//等待任务1执行完成System.out.println(&#34;cf1结果->&#34;+cf1.get());//等待任务2执行完成System.out.println(&#34;cf2结果->&#34;+cf2.get());}publicstaticvoidmain(String[]args){thenRunAsync();}}我们不需要显式使用ExecutorService,CompletableFuture 内部使用了ForkJoinPool来处理异步任务,如果在某些业务场景我们想自定义自己的异步线程池也是可以的。
4. Spring的@Async异步
自定义异步线程池:
/***线程池参数配置,多个线程池实现线程池隔离,@Async注解,默认使用系统自定义线程池,可在项目中设置多个线程池,在异步调用的时候,指明需要调用的线程池名称,比如:@Async(&#34;taskName&#34;)**/@EnableAsync@ConfigurationpublicclassTaskPoolConfig{/***自定义线程池**@author:jacklin*@since:2021/11/1617:41**/@Bean(&#34;taskExecutor&#34;)publicExecutortaskExecutor(){//返回可用处理器的Java虚拟机的数量12inti=Runtime.getRuntime().availableProcessors();System.out.println(&#34;系统最大线程数:&#34;+i);ThreadPoolTaskExecutorexecutor=newThreadPoolTaskExecutor();//核心线程池大小executor.setCorePoolSize(16);//最大线程数executor.setMaxPoolSize(20);//配置队列容量,默认值为Integer.MAX_VALUEexecutor.setQueueCapacity(99999);//活跃时间executor.setKeepAliveSeconds(60);//线程名字前缀executor.setThreadNamePrefix(&#34;asyncServiceExecutor-&#34;);//设置此执行程序应该在关闭时阻止的最大秒数,以便在容器的其余部分继续关闭之前等待剩余的任务完成他们的执行executor.setAwaitTerminationSeconds(60);//等待所有的任务结束后再关闭线程池executor.setWaitForTasksToCompleteOnShutdown(true);returnexecutor;}}AsyncService:
publicinterfaceAsyncService{MessageResultsendSms(StringcallPrefix,Stringmobile,StringactionType,Stringcontent);MessageResultsendEmail(Stringemail,Stringsubject,Stringcontent);}@Slf4j@ServicepublicclassAsyncServiceImplimplementsAsyncService{@AutowiredprivateIMessageHandlermesageHandler;@Override@Async(&#34;taskExecutor&#34;)publicMessageResultsendSms(StringcallPrefix,Stringmobile,StringactionType,Stringcontent){try{Thread.sleep(1000);mesageHandler.sendSms(callPrefix,mobile,actionType,content);}catch(Exceptione){log.error(&#34;发送短信异常->&#34;,e)}}@Override@Async(&#34;taskExecutor&#34;)publicsendEmail(Stringemail,Stringsubject,Stringcontent){try{Thread.sleep(1000);mesageHandler.sendsendEmail(email,subject,content);}catch(Exceptione){log.error(&#34;发送email异常->&#34;,e)}}}在实际项目中, 使用@Async调用线程池,推荐等方式是是使用自定义线程池的模式,不推荐直接使用@Async直接实现异步。
5. Spring ApplicationEvent事件实现异步
定义事件:
publicclassAsyncSendEmailEventextendsApplicationEvent{/***邮箱**/privateStringemail;/***主题**/privateStringsubject;/***内容**/privateStringcontent;/***接收者**/privateStringtargetUserId;}定义事件处理器:
@Slf4j@ComponentpublicclassAsyncSendEmailEventHandlerimplementsApplicationListener<AsyncSendEmailEvent>{@AutowiredprivateIMessageHandlermesageHandler;@Async(&#34;taskExecutor&#34;)@OverridepublicvoidonApplicationEvent(AsyncSendEmailEventevent){if(event==null){return;}Stringemail=event.getEmail();Stringsubject=event.getSubject();Stringcontent=event.getContent();StringtargetUserId=event.getTargetUserId();mesageHandler.sendsendEmailSms(email,subject,content,targerUserId);}}另外,可能有些时候采用ApplicationEvent实现异步的使用,当程序出现异常错误的时候,需要考虑补偿机制,那么这时候可以结合Spring Retry重试来帮助我们避免这种异常造成数据不一致问题。java进阶路线:https://www.yoodb.com/
6. 消息队列
回调事件消息生产者:
@Slf4j@ComponentpublicclassCallbackProducer{@AutowiredAmqpTemplateamqpTemplate;publicvoidsendCallbackMessage(CallbackDTOallbackDTO,finallongdelayTimes){log.info(&#34;生产者发送消息,callbackDTO,{}&#34;,callbackDTO);amqpTemplate.convertAndSend(CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getExchange(),CallbackQueueEnum.QUEUE_GENSEE_CALLBACK.getRoutingKey(),JsonMapper.getInstance().toJson(genseeCallbackDTO),newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Messagemessage)throwsAmqpException{//给消息设置延迟毫秒值,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间message.getMessageProperties().setHeader(&#34;x-delay&#34;,delayTimes);message.getMessageProperties().setCorrelationId(callbackDTO.getSdkId());returnmessage;}});}}回调事件消息消费者:
@Slf4j@Component@RabbitListener(queues=&#34;message.callback&#34;,containerFactory=&#34;rabbitListenerContainerFactory&#34;)publicclassCallbackConsumer{@AutowiredprivateIGlobalUserServiceglobalUserService;@RabbitHandlerpublicvoidhandle(Stringjson,Channelchannel,@HeadersMap<String,Object>map)throwsException{if(map.get(&#34;error&#34;)!=null){//否认消息channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);return;}try{CallbackDTOcallbackDTO=JsonMapper.getInstance().fromJson(json,CallbackDTO.class);//执行业务逻辑globalUserService.execute(callbackDTO);//消息消息成功手动确认,对应消息确认模式acknowledge-mode:manualchannel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);}catch(Exceptione){log.error(&#34;回调失败->{}&#34;,e);}}}7. ThreadUtil异步工具类
@Slf4jpublicclassThreadUtils{publicstaticvoidmain(String[]args){for(inti=0;i<3;i++){ThreadUtil.execAsync(()->{ThreadLocalRandomthreadLocalRandom=ThreadLocalRandom.current();intnumber=threadLocalRandom.nextInt(20)+1;System.out.println(number);});log.info(&#34;当前第:&#34;+i+&#34;个线程&#34;);}log.info(&#34;taskfinish!&#34;);}}8. Guava异步
Guava的ListenableFuture顾名思义就是可以监听的Future,是对java原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用Guava ListenableFuture可以帮我们检测Future是否完成了,不需要再通过get()方法苦苦等待异步的计算结果,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。
ListenableFuture是一个接口,它从jdk的Future接口继承,添加了void addListener(Runnable listener, Executor executor)方法。
我们看下如何使用ListenableFuture。首先需要定义ListenableFuture的实例:
ListeningExecutorServiceexecutorService=MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());finalListenableFuture<Integer>listenableFuture=executorService.submit(newCallable<Integer>(){@OverridepublicIntegercall()throwsException{log.info(&#34;callableexecute...&#34;)TimeUnit.SECONDS.sleep(1);return1;}});首先通过MoreExecutors类的静态方法listeningDecorator方法初始化一个ListeningExecutorService的方法,然后使用此实例的submit方法即可初始化ListenableFuture对象。
ListenableFuture要做的工作,在Callable接口的实现类中定义,这里只是休眠了1秒钟然后返回一个数字1,有了ListenableFuture实例,可以执行此Future并执行Future完成之后的回调函数。
Futures.addCallback(listenableFuture,newFutureCallback<Integer>(){@OverridepublicvoidonSuccess(Integerresult){//成功执行...System.out.println(&#34;Getlistenablefuture&#39;sresultwithcallback&#34;+result);}@OverridepublicvoidonFailure(Throwablet){//异常情况处理...t.printStackTrace();}});那么,以上就是本期介绍的实现异步的8种方式了。
作者:austin流川枫
https://juejin.cn/post/7165147306688249870
公众号“Java精选”所发表内容注明来源的,版权归原出处所有(无法查证版权的或者未注明出处的均来自网络,系转载,转载的目的在于传递更多信息,版权属于原作者。如有侵权,请联系,笔者会第一时间删除处理!最近有很多人问,有没有读者交流群!加入方式很简单,公众号Java精选,回复“加群”,即可入群!Java精选面试题(微信小程序):3000+道面试题,包含Java基础、并发、JVM、线程、MQ系列、Redis、Spring系列、Elasticsearch、Docker、K8s、Flink、Spark、架构设计等,在线随时刷题!------ 特别推荐 ------特别推荐:专注分享最前沿的技术与资讯,为弯道超车做好准备及各种开源项目与高效率软件的公众号,「大咖笔记」,专注挖掘好东西,非常值得大家关注。点击下方公众号卡片关注。点击“阅读原文”,了解更多精彩内容!文章有帮助的话,点在看,转发吧! |
|