1. 概述

本文主要分享 Hystrix 执行命令的结果缓存

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 执行命令整体流程如下图:

FROM 《【翻译】Hystrix文档-实现原理》「流程图」

  • 红圈 :在 《Hystrix 源码解析 —— 执行命令方式》 有详细解析。

  • 紫圈 :在 #toObservable() 方法里,如果请求结果缓存这个特性被启用,并且缓存命中,则缓存的回应会立即通过一个 Observable 对象的形式返回;如果缓存未命中,则返回【订阅了执行命令的 Observable】的 ReplySubject 对象缓存执行结果。

    • ReplySubject 能够重放执行结果,从而实现缓存的功效。本文不对 ReplySubject 做太多拓展,感兴趣的同学可以阅读 《ReactiveX/RxJava文档中文版 —— Subject》 。

在官方提供的示例中,我们使用 CommandUsingRequestCache 进行调试 。

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。

  • 程序猿DD —— 《Spring Cloud微服务实战》

  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

  • 两书齐买,京东包邮。

2. 好处

点击 《【翻译】Hystrix文档-实现原理》「请求缓存」 ,查看对请求缓存的好处分享,写的真的很赞。

3. Observable#defer(...)

本小节为拓展内容,源码解析 RxJava ( 非 Hystrix ) 的 Observable#defer(...) 的方法实现。考虑到 Hystrix 大量使用,为了更好的理解,解析下源码。

《RxJava 源码解析 —— Observable#defer(...)》

4. AbstractCommand#toObservavle(...)

AbstractCommand#toObservavle(...) 方法,代码如下 :

  1: public Observable<R> toObservable() {  2:     final AbstractCommand<R> _cmd = this;  3:   4:     //doOnCompleted handler already did all of the SUCCESS work  5:     //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work  6:     final Action0 terminateCommandCleanup = new Action0() {} // ... 省略  7:   8:     //mark the command as CANCELLED and store the latency (in addition to standard cleanup)  9:     final Action0 unsubscribeCommandCleanup = new Action0() {} // ... 省略 10:  11:     final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { 12:         @Override 13:         public Observable<R> call() { 14:             if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { 15:                 return Observable.never(); 16:             } 17:             return applyHystrixSemantics(_cmd); 18:         } 19:     }; 20:  21:     final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {} // ... 省略  22:  23:     final Action0 fireOnCompletedHook = new Action0() {} // ... 省略  24:  25:     return Observable.defer(new Func0<Observable<R>>() { 26:         @Override 27:         public Observable<R> call() { 28:             /* this is a stateful object so can only be used once */ 29:             if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { 30:                 IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); 31:                 //TODO make a new error type for this 32:                 throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null); 33:             } 34:  35:             // 命令开始时间戳 36:             commandStartTimestamp = System.currentTimeMillis(); 37:  38:             // TODO【2001】【打印日志】 39:             if (properties.requestLogEnabled().get()) { 40:                 // log this command execution regardless of what happened 41:                 if (currentRequestLog != null) { 42:                     currentRequestLog.addExecutedCommand(_cmd); 43:                 } 44:             } 45:  46:             // 缓存开关、缓存KEY 47:             final boolean requestCacheEnabled = isRequestCachingEnabled(); 48:             final String cacheKey = getCacheKey(); 49:  50:             // 优先从缓存中获取 51:             /* try from cache first */ 52:             if (requestCacheEnabled) { 53:                 HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); 54:                 if (fromCache != null) { 55:                     isResponseFromCache = true; // 标记 从缓存中结果 56:                     return handleRequestCacheHitAndEmitValues(fromCache, _cmd); 57:                 } 58:             } 59:  60:             // 获得 执行命令Observable 61:             Observable<R> hystrixObservable = 62:                     Observable.defer(applyHystrixSemantics) 63:                             .map(wrapWithAllOnNextHooks); 64:  65:             // 获得 缓存Observable 66:             Observable<R> afterCache; 67:             // put in cache 68:             if (requestCacheEnabled && cacheKey != null) { 69:                 // wrap it for caching 70:                 HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); 71:                 // 并发若不存在 72:                 HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); 73:                 if (fromCache != null) { // 添加失败 74:                     // another thread beat us so we'll use the cached value instead 75:                     toCache.unsubscribe(); 76:                     isResponseFromCache = true; // 标记 从缓存中结果 77:                     return handleRequestCacheHitAndEmitValues(fromCache, _cmd); 78:                 } else { // 添加成功 79:                     // we just created an ObservableCommand so we cast and return it 80:                     afterCache = toCache.toObservable(); 81:                 } 82:             } else { 83:                 afterCache = hystrixObservable; 84:             } 85:  86:             // 87:             return afterCache 88:                     .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) 89:                     .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once 90:                     .doOnCompleted(fireOnCompletedHook); 91:         } 92:     }); 93: }
  • 第 2 行 : _cmd 指向当前命令对象,用于下面实现 FuncX ,ActionX 内部类使用。


  • 第 11 至 19 行 :当缓存特性未开启,或者缓存未命中时,使用 applyHystrixSemantics 传入 Observable#defer(...) 方法,声明执行命令的 Observable。


  • 第 25 行 :声明缓存 Observable 。Hystrix 执行命令的 Observable 声明关系如下:


  • 第 29 至 33 行 :一条命令只能执行一次


  • 第 36 行 :记录命令开始时间戳。


  • 第 38 至 44 行 :TODO【2001】【打印日志】


  • 第 47 至 48 行 :缓存存开关、KEY 。


  • 第 52 至 58 行 :如果请求结果缓存这个特性被启用,并且缓存命中,则缓存的回应会立即通过一个 Observable 对象的形式返回。




    • 第 53 行 : requestCache 缓存,在 TODO 【2008】【请求缓存】 详细解析。

    • 第 53 行 :「6. HystrixCommandResponseFromCache」 详细解析。

    • 第 56 行 : #handleRequestCacheHitAndEmitValues(...) 方法,在第 78 行详细解析。

  • 第 61 至 63 行 :获取执行命令的 Observable 。在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》 详细解析。


  • 第 68 至 81 行 :当缓存特性开启,并且缓存未命中时,创建【订阅了执行命令的 Observable】的 HystrixCommandResponseFromCache 。





    • 第 75 行 :调用 HystrixCommandResponseFromCache#unsubscribe() 方法,取消 HystrixCommandResponseFromCache 的订阅。这一步很关键,因为我们不希望缓存不存在时,多个线程去执行命令,最好有且只有一个线程执行命令。在 「5. HystrixCachedObservable」 详细解析。

    • 第 77 行 :「6. HystrixCommandResponseFromCache」 详细解析。

    • 第 69 至 72 行 :创建 HystrixCommandResponseFromCache ,并添加到 requestCache 。哟, HystrixRequestCache#putIfAbsent(...) 方法,多个线程添加时,只有一个线程添加成功。

    • 第 73 至 77 行 :添加失败的线程(  ):



  • 第 80 行 :调用 HystrixCommandResponseFromCachetoObservable() 方法,获得缓存 Observable 。


  • 第 82 至 84 行 :当缓存特性未开启,使用执行命令 Observable 。


  • 第 87 至 91 行 :在返回的 Observable 上,订阅一些清理的处理逻辑。对这几个方法有疑惑的同学,可以阅读 《给 Android 开发者的 RxJava 详解》「 3) Subscribe (订阅) 」 。


5. HystrixCachedObservable

com.netflix.hystrix.HystrixCachedObservable ,缓存 Observable 。

HystrixCachedObservable 构造方法,代码如下 :

  1: public class HystrixCachedObservable<R> {  2: /**  3:  * 订阅  4:  */  5: protected final Subscription originalSubscription;  6: /**  7:  * 缓存 cachedObservable  8:  */  9: protected final Observable<R> cachedObservable; 10: /** 11:  * TODO 【2006】【outstandingSubscriptions】 12:  */ 13: private volatile int outstandingSubscriptions = 0; 14: //private AtomicInteger outstandingSubscriptions2 = new AtomicInteger(0); 15:  16: protected HystrixCachedObservable(final Observable<R> originalObservable) { 17:     ReplaySubject<R> replaySubject = ReplaySubject.create(); 18:     this.originalSubscription = originalObservable 19:             .subscribe(replaySubject); 20:  21:     this.cachedObservable = replaySubject 22:             .doOnUnsubscribe(new Action0() { 23:                 @Override 24:                 public void call() { 25:                     outstandingSubscriptions--; 26:                     if (outstandingSubscriptions == 0) { 27:                         originalSubscription.unsubscribe(); 28:                     } 29:                 } 30:             }) 31:             .doOnSubscribe(new Action0() { 32:                 @Override 33:                 public void call() { 34:                     outstandingSubscriptions++; 35:                 } 36:             }); 37: }
  • 第 17 至 19 行 :实际上,HystrixCachedObservable 不是一个 Observable 的子类,而是对传入的 Observable 封装 :使用 ReplaySubject 向传入的 Observable 发起订阅,通过 ReplaySubject 能够重放执行结果,从而实现缓存的功效。这里有几个卡到笔者的并且很有趣的点,我们一一道来 :从上文中,我们可以看到,传入的 originalObservablehystrixObservable执行命令 Observable 。在 Hystrix 里,提供了两种执行命令的隔离方式 :线程池( THREAD ) 和信号量( SEMAPHORE )。


    • 当使用 THREAD 隔离时, #subscribe(replaySubject) 调用完成时,实际命令并未开始执行,或者说,这是一个异步的执行命令的过程。那么,会不会影响返回执行结果呢?答案当然是不会,BlockingObservable 在得到执行完成才会结束阻塞,此时已经有执行结果。

    • 当使用 SEMAPHORE 隔离时, #subscribe(replaySubject) 调用完成时,实际命令已经执行完成,所以即使 AbstractCommand#toObservavle(...) 的第 75 行 :调用 HystrixCommandResponseFromCache#unsubscribe() 方法,也会浪费,重复执行命令。而对于 THREAD 隔离的情况,通过取消订阅的方式,只会执行一次命令。当然,如果“恶搞” THREAD 隔离的情况,增加 sleep 的调用如下,就能达到重复执行命令的效果。

  • 第 21 至 36 行 :TODO 【2006】【outstandingSubscriptions】原子性没问题么?历史版本使用的是 AtomicInteger 。



HystrixCachedObservable 的其他方法,点击 链接 查看。

6. HystrixCommandResponseFromCache

com.netflix.hystrix.HystrixCommandResponseFromCache ,是 HystrixCachedObservable 的子类。在父类的基础上,增加了对 AbstractCommand.executionResult 的关注。

HystrixCachedObservable#from(Observable, AbstractCommand) 方法,创建 HystrixCommandResponseFromCache 对象,点击 链接 查看。


HystrixCommandResponseFromCache#toObservableWithStateCopiedInto(...) 方法,点击 链接 查看。

  • 通过 completionLogicRun 属性,保证 #doOnError() , #doOnCompleted() , #doOnUnsubscribe() 方法有且只有一个方法执行具体逻辑。

    • #doOnError() , #doOnCompleted() 执行时,调用 #commandCompleted() 方法,从缓存命令( HystrixCommandResponseFromCache.originalCommand ) 复制 executionResult 属性给当前命令( commandToCopyStateInto ) 。

    • #doOnUnsubscribe() 执行时,调用 #commandUnsubscribed() 方法,使用当前命令( commandToCopyStateInto )自己的 executionResult ,不进行复制。

  • TODO 【2007】【executionResult】用途

666. 彩蛋

如鲠在喉的感觉,从周六开始磨了四天多,一直没写到一个比较舒服的状态。

先发现发,如果有不清晰的地方,烦请指出,谢谢。

胖友,分享一波朋友圈可好!


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略
  2. 【前端词典】输入 URL 到展现涉及的缓存环节
  3. 华为交换机console口设置密码及状态查看命令
  4. 如何用一条命令将网页转成电脑 App
  5. Linux学习之常用的Linux文件内容查看命令!
  6. 3、新手必须掌握的Linux命令
  7. 5、Shell命令脚本
  8. 跟繁琐的命令行说拜拜!Gerapy分布式爬虫管理框架来袭!
  9. Mybatis 配置 自定义缓存 ehcache

随机推荐

  1. 定制替换Android桌面
  2. JDK10都发布了,nio你了解多少?
  3. 你不知道的,Java代码性能优化的 40+ 细节,
  4. [置顶] android利用zbar二维码扫描-(解决
  5. 还是初识
  6. X-Ray检测Android设备Root漏洞过程分析
  7. 源码实战 | 本地可跑,上线就崩?慌了!
  8. SQLite
  9. 图解源码 | MyBatis的Mapper原理
  10. 再见已是初识(初十)