2.1 构造方法

HystrixCollapser 构造方法,代码如下 :

public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType>        implements HystrixExecutable<ResponseType>, HystrixObservable<ResponseType> {    private final RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> collapserFactory;    private final HystrixRequestCache requestCache;    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> collapserInstanceWrapper;    private final HystrixCollapserMetrics metrics;    /* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) {        if (collapserKey == null || collapserKey.name().trim().equals("")) {            String defaultKeyName = getDefaultNameFromClass(getClass());            collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName);        }        HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder);        this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, properties);        this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());        if (metrics == null) {            this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties);        } else {            this.metrics = metrics;        }        final HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;         /* strategy: HystrixMetricsPublisherCollapser */        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties);        /**         * Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.         */        collapserInstanceWrapper = new HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType>() {            @Override            public Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {                Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = self.shardRequests(requests);                self.metrics.markShards(shards.size());                return shards;            }            @Override            public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {                final HystrixCommand<BatchReturnType> command = self.createCommand(requests);                command.markAsCollapsedCommand(this.getCollapserKey(), requests.size());                self.metrics.markBatch(requests.size());                return command.toObservable();            }            @Override            public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {                return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {                    @Override                    public void call(BatchReturnType batchReturnType) {                        // this is a blocking call in HystrixCollapser                        self.mapResponseToRequests(batchReturnType, requests);                    }                }).ignoreElements().cast(Void.class);            }            @Override            public HystrixCollapserKey getCollapserKey() {                return self.getCollapserKey();            }        };    }}
  • BatchReturnType 泛型多个命令合并执行返回结果类型。

  • ResponseType 泛型单个命令执行返回结果类型。

  • RequestArgumentType 泛型单个命令参数类型。

  • collapserFactory 属性,RequestCollapser 工厂,在 「3. RequestCollapserFactory」 详细解析。

  • requestCache 属性,TODO 【2012】【请求上下文】

  • collapserInstanceWrapper 属性,命令合并器包装器。

    • com.netflix.hystrix.collapser.HystrixCollapserBridge 接口,点击 链接 查看代码。

    • HystrixCollapserBridge ,为 RequestBatch 透明调用 HystrixCollapser 或 HystrixObservableCollapser 的方法不同的实现。参见 《桥接模式》 。

  • metrics 属性,TODO 【2002】【metrics】

2.2 执行命令方式

在 《Hystrix 源码解析 —— 执行命令方式》 中,我们已经看了 HystrixCommand 提供的四种执行命令方式。

HystrixCollapser 类似于 HystrixCommand ,也提供四种相同的执行命令方式,其中如下三种方式代码基本类似,我们就给下传送门,就不重复啰嗦了 :

  • #observe() 方法 :传送门 。

  • #queue() 方法 :传送门 。

  • #execute() 方法 :传送门 。

下面一起来看看 #toObservable() 方法的实现,代码如下 :

  1: public Observable<ResponseType> toObservable() {  2:     // when we callback with the data we want to do the work  3:     // on a separate thread than the one giving us the callback  4:     return toObservable(Schedulers.computation());  5: }  6:   7: public Observable<ResponseType> toObservable(Scheduler observeOn) {  8:     return Observable.defer(new Func0<Observable<ResponseType>>() {  9:         @Override 10:         public Observable<ResponseType> call() { 11:             // // 缓存开关、缓存KEY 12:             final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get(); 13:             final String cacheKey = getCacheKey(); 14:  15:             // 优先从缓存中获取 16:             /* try from cache first */ 17:             if (isRequestCacheEnabled) { 18:                 HystrixCachedObservable<ResponseType> fromCache = requestCache.get(cacheKey); 19:                 if (fromCache != null) { 20:                     metrics.markResponseFromCache(); 21:                     return fromCache.toObservable(); 22:                 } 23:             } 24:  25:             // 获得 RequestCollapser 26:             RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); 27:  28:             // 提交 命令请求 29:             Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument()); 30:  31:             // 获得 缓存Observable 32:             if (isRequestCacheEnabled && cacheKey != null) { 33:                 HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response); 34:                 HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(cacheKey, toCache); 35:                 if (fromCache == null) { 36:                     return toCache.toObservable(); 37:                 } else { 38:                     toCache.unsubscribe(); // 取消订阅 39:                     return fromCache.toObservable(); 40:                 } 41:             } 42:  43:             // 获得 非缓存Observable 44:             return response; 45:         } 46:     }); 47: }
  • observeOn 方法参数,实际方法暂未用到,跳过无视。

  • 第 11 至 13 行 :缓存存开关、KEY 。

  • 反向】第 32 至 41 行 :获得【缓存 Observable】。这块代码和 AbstractCommand#toObservavle(...) 类似,在《Hystrix 源码解析 —— 执行结果缓存》「4. AbstractCommand#toObservavle(...)」 有详细解析。

  • 反向】第 44 行 :获得【非缓存 Observable】。

  • 注意 :返回的 Observable ,很可能命令实际并未执行,或者说并未执行完成,此时在 #queue() / #execute() 方法,通过 BlockingObservable 阻塞等待执行完成。BlockingObservable 在 《RxJava 源码解析 —— BlockingObservable》 有详细解析。

  • 第 26 行 :调用 RequestCollapserFactory#getRequestCollapser() ,获得 RequestCollapser 。在 「3. RequestCollapserFactory」 详细解析。

  • 第 29 行 :提交单个命令请求到请求队列( RequestQueue ),即命令合并执行整体流程第一步。在 「4. RequestCollapser」 详细解析。

2.3 核心方法

  • #getRequestArgument(...) 抽象方法,获得单个命令参数。代码如下 :



  1. publicabstractRequestArgumentTypegetRequestArgument();


  • #createCommand(...) 抽象方法,将多个命令请求合并,创建一个HystrixCommand 。代码如下 :



  1. protectedabstractHystrixCommand<BatchReturnType>createCommand(Collection<CollapsedRequest<ResponseType,RequestArgumentType>>requests);


  • #mapResponseToRequests(...) 抽象方法,将一个HystrixCommand 的执行结果,映射回对应的命令请求们。



  1. protectedabstractvoidmapResponseToRequests(BatchReturnTypebatchResponse,Collection<CollapsedRequest<ResponseType,RequestArgumentType>>requests);


  • #shardRequests(...) 方法,将多个命令请求分片N 个【多个命令请求】。默认实现下,不进行分片。代码如下 :



  1. protectedCollection<Collection<CollapsedRequest<ResponseType,RequestArgumentType>>>shardRequests(Collection<CollapsedRequest<ResponseType,RequestArgumentType>>requests){

  2.    returnCollections.singletonList(requests);

  3. }


  • 未重写 #shardRequests(...) 的情况下,整体方法流程如下 :


  • 重写 #shardRequests(...) 的情况下,整体方法流程如下 :


    • 本图中命令请求分片仅仅是例子,实际根据重写的逻辑不同而不同。

3. RequestCollapserFactory

com.netflix.hystrix.collapser.RequestCollapserFactory ,RequestCollapser 工厂

public class RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> {    private final CollapserTimer timer;    private final HystrixCollapserKey collapserKey;    private final HystrixCollapserProperties properties;    private final HystrixConcurrencyStrategy concurrencyStrategy;    private final Scope scope;    public RequestCollapserFactory(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties properties) {         /* strategy: ConcurrencyStrategy */        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();        this.timer = timer;        this.scope = scope;        this.collapserKey = collapserKey;        this.properties = properties;    }
  • timer 属性,命令合并器的定时器,在 「5. CollapserTimer」 详细解析。


  • collapserKey 属性,命令合并器标识,实现类似 HystrixThreadPoolKey 。




    • HystrixCollapserKey ,点击 链接 查看代码。

    • HystrixThreadPoolKey ,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「3. HystrixThreadPoolKey」 有详细解析。

  • properties 属性,命令合并器属性配置。


  • concurrencyStrategy 属性,并发策略,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「4. HystrixConcurrencyStrategy」 有详细解析。


  • scope 属性,命令请求作用域。目前有两种作用域 :



    • REQUEST :请求上下文( HystrixRequestContext )。

      Typically this means that requests within a single user-request (ie. HTTP request) are collapsed.
      No interaction with other user requests.
      1 queue per user request.

    • GLOBAL :全局。

      Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed.
      1 queue for entire app.


调用 #getRequestCollapser() 方法,获得 RequestCollapser 。代码如下 :

public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) {   if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) {       return getCollapserForUserRequest(commandCollapser);   } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) {       return getCollapserForGlobalScope(commandCollapser);   } else {       logger.warn("Invalid Scope: {}  Defaulting to REQUEST scope.", getScope());       return getCollapserForUserRequest(commandCollapser);   }}
  • 根据 scope 不同,调用两个不同方法,获得 RequestCollapser 。这两个方法大体逻辑相同,优先从缓存中查找满足条件的 RequestCollapser 返回;若不存在,则创建满足条件的 RequestCollapser 添加到缓存并返回。

    • REQUEST :调用 #getCollapserForUserRequest() 方法,TODO 【2012】【请求上下文】。

    • GLOBAL :调用 #getCollapserForGlobalScope() 方法,点击 链接 查看中文注释的代码。

4. RequestCollapser

com.netflix.hystrix.collapser.RequestCollapser命令请求合并器。主要用于 :

  • 提交单个命令请求到请求队列( RequestQueue )。

  • 接收来自定时任务提交的多个命令,合并执行。

4.1 构造方法

RequestCollapser 构造方法,代码如下 :

public class RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> {    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;    // batch can be null once shutdown    private final AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>> batch = new AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>>();    private final AtomicReference<Reference<TimerListener>> timerListenerReference = new AtomicReference<Reference<TimerListener>>();    private final AtomicBoolean timerListenerRegistered = new AtomicBoolean();    private final CollapserTimer timer;    private final HystrixCollapserProperties properties;    private final HystrixConcurrencyStrategy concurrencyStrategy;    RequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, HystrixCollapserProperties properties, CollapserTimer timer, HystrixConcurrencyStrategy concurrencyStrategy) {        this.commandCollapser = commandCollapser; // the command with implementation of abstract methods we need         this.concurrencyStrategy = concurrencyStrategy;        this.properties = properties;        this.timer = timer;        batch.set(new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get()));    }}
  • commandCollapser 属性,命令合并器包装器。

  • batch 属性,RequestBatch,即是本文一直说的请求队列。在 「4.2 RequestBatch」 也会详细解析。

  • timerListenerReference 属性,注册在命令合并器的定时器的监听器。每个 RequestCollapser 独有一个监听器。该监听器( 实际上会使用该监听器创建定时任务 )固定周期从请求队列获取多个命令执行,提交 RequestCollapser 合并执行。在 「5. CollapserTimer」 也会详细解析。

  • timerListenerRegistered 属性, timerListenerReference 是否已经注册。

  • timer 属性,命令合并器的定时器。

  • properties 属性,命令合并器属性配置。

  • concurrencyStrategy 属性,并发策略。

4.2 RequestBatch

com.netflix.hystrix.collapser.RequestBatch ,命令请求队列。提供如下功能 :

  • 命令请求的添加

  • 命令请求的移除

  • 命令请求的批量执行。笔者把 RequestBatch 解释成 "命令请求队列",主要方便大家理解。

    • 那可能有胖友有疑问,为啥该功能不在 RequestCollapser 直接实现,这样 RequestBatch 成为纯粹的队列呢?在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。

RequestBatch 构造方法,代码如下 :

public class RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> {    private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;    private final int maxBatchSize;    private final AtomicBoolean batchStarted = new AtomicBoolean();    private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap =            new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();    private final HystrixCollapserProperties properties;    private ReentrantReadWriteLock batchLock = new ReentrantReadWriteLock();    public RequestBatch(HystrixCollapserProperties properties, HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, int maxBatchSize) {        this.properties = properties;        this.commandCollapser = commandCollapser;        this.maxBatchSize = maxBatchSize;    }}
  • commandCollapser 属性,命令合并器包装器。

  • maxBatchSize 属性,队列最大长度。

  • batchStarted 属性,执行是否开始。

  • argumentMap 属性,命令请求参数映射( 队列 )。

  • properties 属性,命令合并器属性配置。

  • batchLock 属性, argumentMap 操作的读写锁

RequestBatch 实现队列具体的操作方法,在 「4.3 #submitRequest(arg)」/「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 一起解析。

4.3 #submitRequest(arg)

#toObservable() 方法里,调用 #submitRequest(arg) 方法,提交单个命令请求到 RequestBatch 。代码如下 :

  1: public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {  2:     /*  3:      * We only want the timer ticking if there are actually things to do so we register it the first time something is added.  4:      */  5:     if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false, true)) {  6:         /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */  7:         timerListenerReference.set(timer.addListener(new CollapsedTask()));  8:     }  9:  10:     // loop until succeed (compare-and-set spin-loop) 11:     while (true) { 12:         // 获得 RequestBatch 13:         final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get(); 14:         if (b == null) { 15:             return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown")); 16:         } 17:  18:         // 添加到 RequestBatch 19:         final Observable<ResponseType> response; 20:         if (arg != null) { 21:             response = b.offer(arg); 22:         } else { 23:             response = b.offer( (RequestArgumentType) NULL_SENTINEL); 24:         } 25:  26:         // 添加成功,返回 Observable 27:         // it will always get an Observable unless we hit the max batch size 28:         if (response != null) { 29:             return response; 30:         } else { 31:             // 添加失败,执行 RequestBatch ,并创建新的 RequestBatch 32:             // this batch can't accept requests so create a new one and set it if another thread doesn't beat us 33:             createNewBatchAndExecutePreviousIfNeeded(b); 34:         } 35:     } 36: }
  • 第 5 至 8 行 :当 RequestCollapser 的监听任务( CollapsedTask )还未创建,进行初始化。

  • 第 11 至 35 行 :死循环,直到提交单个命令请求到 RequestBatch 成功

    • 第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被 #shutdown() 后才会出现为 null 的情况。

    • 第 19 至 24 行 :调动 RequestBatch#offer(...) 方法,提交单个命令请求到 RequestBatch ,并获得 Observable 。这里对 arg==null 做了特殊处理,因为 RequestBatch.argumentMap 是 ConcurrentHashMap ,不允许值为 null 。另外, RequestBatch#offer(...) 方法的实现代码,在结束了当前方法,详细解析。

    • 第 28 至 29 行 :添加成功,返回 Observable 。

    • 第 30 至 34 行 :添加失败,执行当前 RequestBatch 的多个命令合并执行,并创建新的 RequestBatch 。在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。


RequestBatch#offer(...) 方法,代码如下 :

  1: public Observable<ResponseType>  offer(RequestArgumentType arg) {  2:     // 执行已经开始,添加失败  3:     /* short-cut - if the batch is started we reject the offer */  4:     if (batchStarted.get()) {  5:         return null;  6:     }  7:   8:     /*  9:      * The 'read' just means non-exclusive even though we are writing. 10:      */ 11:     if (batchLock.readLock().tryLock()) { 12:         try { 13:             // 执行已经开始,添加失败 14:             /* double-check now that we have the lock - if the batch is started we reject the offer */ 15:             if (batchStarted.get()) { 16:                 return null; 17:             } 18:  19:             // 超过队列最大长度,添加失败 20:             if (argumentMap.size() >= maxBatchSize) { 21:                 return null; 22:             } else { 23:                 // 创建 CollapsedRequestSubject ,并添加到队列 24:                 CollapsedRequestSubject<ResponseType, RequestArgumentType> collapsedRequest = new CollapsedRequestSubject<ResponseType, RequestArgumentType>(arg, this); 25:                 final CollapsedRequestSubject<ResponseType, RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType, RequestArgumentType>) argumentMap.putIfAbsent(arg, collapsedRequest); 26:                 /** 27:                  * If the argument already exists in the batch, then there are 2 options: 28:                  * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses 29:                  * be hooked up to that argument 30:                  * B) If request caching is OFF: return an error to all duplicate argument requests 31:                  * 32:                  * This maintains the invariant that each batch has no duplicate arguments.  This prevents the impossible 33:                  * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser) 34:                  * of trying to figure out which argument of a set of duplicates should get attached to a response. 35:                  * 36:                  * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion. 37:                  */ 38:                 if (existing != null) { 39:                     boolean requestCachingEnabled = properties.requestCacheEnabled().get(); 40:                     if (requestCachingEnabled) { 41:                         return existing.toObservable(); 42:                     } else { 43:                         return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "]  This is not supported.  Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!")); 44:                     } 45:                 } else { 46:                     return collapsedRequest.toObservable(); 47:                 } 48:  49:             } 50:         } finally { 51:             batchLock.readLock().unlock(); 52:         } 53:     } else { 54:         return null; 55:     } 56: }
  • 第 4 至 6 行 :执行已经开始,添加失败。在 RequestBatch#executeBatchIfNotAlreadyStarted(...) 方法的开头,优先 CAS使 batchStarted=true


  • 第 11 行 :获得读锁The'read'just means non-exclusive even though we are writing. ,即使该方法实际在做"写操作",不排他,线程安全,所以可以使用读锁。


  • 第 15 至 17 行 : double-check,执行已经开始,添加失败。在 RequestBatch#executeBatchIfNotAlreadyStarted(...) 方法,优先 CAS 使 batchStarted=true,再获取写锁,所以会出现该情况。


  • 第 20 至 21 行 :超过队列最大长度,添加失败。


  • 第 24 至 25 行 :创建 com.netflix.hystrix.collapser.CollapsedRequestSubject ,并将添加到队列( argumentMap ) 。


    • argument 属性,单个命令请求参数。

    • valueSet 属性,结果( Response ) 是否设置,通过 #setResponse()#emitResponse() 方法设置。

    • subject 属性,可回放执行结果的 Subject 。此处使用 ReplaySubject 的主要目的,当 HystrixCollapser 开启缓存功能时,通过回放执行结果,在 《Hystrix 源码解析 —— 执行结果缓存》「5. HystrixCachedObservable」 也有相同的实现。另外,这里有一点要注意下,ReplaySubject 并没有向任何 Observable 订阅结果,而是通过 #setResponse()#emitResponse() 方法设置结果

    • outstandingSubscriptions 属性,订阅数量。

    • subjectWithAccounting 属性,带订阅数量的 ReplaySubject 。当取消订阅时,调用 RequestBatch#remove(arg) 方法,移除单个命令请求。

    • CollapsedRequestSubject 实现 com.netflix.hystrix.HystrixCollapser.CollapsedRequest 接口,定义了批量命令执行的请求,不仅限于获得请求参数( #getArgument() 方法 ),也包括对批量命令执行结束后,每个请求的结果设置( #setResponse(...)/ #emitResponse(...)/ #setException(...)/ #setComplete() 方法 ),点击 链接 查看该接口的代码。

    • CollapsedRequestSubject 构造方法,代码如下:


  1. /* package */classCollapsedRequestSubject<T,R>implementsCollapsedRequest<T,R>{

    /**


  2.  * 参数


  3.  */


  4. private final R argument;




  5. /**


  6.  * 结果( response ) 是否设置


  7.  */


  8. private AtomicBoolean valueSet = new AtomicBoolean(false);


  9. /**


  10.  * 可回放的 ReplaySubject


  11.  */


  12. private final ReplaySubject&lt;T&gt; subject = ReplaySubject.create();


  13. /**


  14.  * 带订阅数量的 ReplaySubject


  15.  */


  16. private final Observable&lt;T&gt; subjectWithAccounting;




  17. /**


  18.  * 订阅数量


  19.  */


  20. private volatile int outstandingSubscriptions = 0;




  21. public CollapsedRequestSubject(final R arg, final RequestBatch&lt;?, T, R&gt; containingBatch) {


  22.     // 设置 argument


  23.     if (arg == RequestCollapser.NULL_SENTINEL) {


  24.         this.argument = null;


  25.     } else {


  26.         this.argument = arg;


  27.     }


  28.     // 设置 带订阅数量的 ReplaySubject


  29.     this.subjectWithAccounting = subject


  30.             .doOnSubscribe(new Action0() {


  31.                 @Override


  32.                 public void call() {


  33.                     outstandingSubscriptions++;


  34.                 }


  35.             })


  36.             .doOnUnsubscribe(new Action0() {


  37.                 @Override


  38.                 public void call() {


  39.                     outstandingSubscriptions--;


  40.                     if (outstandingSubscriptions == 0) {


  41.                         containingBatch.remove(arg);


  42.                     }


  43.                 }


  44.             });


  45. }


  46. }

第 38 至 47 行 :返回 Observable 。




  • 当 argumentMap 已经存在 arg 对应的 Observable 时,必须开启缓存 ( HystrixCollapserProperties.requestCachingEnabled=true ) 功能。原因是,如果在相同的 arg ,并且未开启缓存,同时第 43 行实现的是 collapsedRequest.toObservable() ,那么相同的 arg 将有多个 Observable 执行命令,此时 HystrixCollapserBridge#mapResponseToRequests(...) 方法无法将执行( Response )赋值到 arg 对应的命令请求( CollapsedRequestSubject ) 。更多讨论,见 https://github.com/Netflix/Hystrix/pull/1176 。

  • 回过头看 HystrixCollapser#toObservable() 方法的第 32 至 41 行的代码,这里也有对缓存功能,是不是重复了呢? argumentMap 针对的是 RequestBatch 级的缓存,HystrixCollapser : RequestCollapser : RequestBatch 是 1:1:N 的关系,通过 HystrixCollapser#toObservable() 对缓存的处理逻辑,保证 RequestBatch 切换后,依然有缓存


RequestBatch#remove() 方法,代码如下 :

/* package-private */ void remove(RequestArgumentType arg) {    if (batchStarted.get()) {        //nothing we can do        return;    }    if (batchLock.readLock().tryLock()) {        try {            /* double-check now that we have the lock - if the batch is started, deleting is useless */            if (batchStarted.get()) {                return;            }            argumentMap.remove(arg);        } finally {            batchLock.readLock().unlock();        }    }}
  • 当 RequestBatch 开始执行,不允许移除单个命令请求。

4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)

本小节建议在 「5. CollapserTimer」 后,再回过头看。

#createNewBatchAndExecutePreviousIfNeeded(previousBatch) 方法,代码如下 :

  1: private void createNewBatchAndExecutePreviousIfNeeded(RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> previousBatch) {  2:     if (previousBatch == null) {  3:         throw new IllegalStateException("Trying to start null batch which means it was shutdown already.");  4:     }  5:     if (batch.compareAndSet(previousBatch, new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get()))) {  6:         // this thread won so trigger the previous batch  7:         previousBatch.executeBatchIfNotAlreadyStarted();  8:     }  9: }
  • 第 5 行 :通过 CAS 修改 batch ,保证并发情况下的线程安全。同时注意,此处也进行了新的 RequestBatch ,切换掉老的 RequestBatch 。

  • 第 6 行 :使用老的 RequestBatch ,调用 RequestBatch#executeBatchIfNotAlreadyStarted() 方法,命令合并执行。


RequestBatch#executeBatchIfNotAlreadyStarted() 方法,代码如下 :

  1: public void executeBatchIfNotAlreadyStarted() {  2:     /*  3:      * - check that we only execute once since there's multiple paths to do so (timer, waiting thread or max batch size hit)  4:      * - close the gate so 'offer' can no longer be invoked and we turn those threads away so they create a new batch  5:      */  6:     // 设置 执行已经开始  7:     if (batchStarted.compareAndSet(false, true)) {  8:         // 获得 写锁  9:         /* wait for 'offer'/'remove' threads to finish before executing the batch so 'requests' is complete */ 10:         batchLock.writeLock().lock(); 11:  12:         try { 13:             // 将多个命令请求分片成 N 个【多个命令请求】。 14:             // shard batches 15:             Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values()); 16:             // for each shard execute its requests  17:             for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) { 18:                 try { 19:                     // 将多个命令请求合并,创建一个 HystrixCommand 20:                     // create a new command to handle this batch of requests 21:                     Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests); 22:  23:                     // 将一个 HystrixCommand 的执行结果,映射回对应的命令请求们 24:                     commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() { 25:  26:                         /** 27:                          * This handles failed completions 28:                          */ 29:                         @Override 30:                         public void call(Throwable e) { 31:                             // handle Throwable in case anything is thrown so we don't block Observers waiting for onError/onCompleted 32:                             Exception ee; 33:                             if (e instanceof Exception) { 34:                                 ee = (Exception) e; 35:                             } else { 36:                                 ee = new RuntimeException("Throwable caught while executing batch and mapping responses.", e); 37:                             } 38:                             logger.debug("Exception mapping responses to requests.", e); 39:                             // if a failure occurs we want to pass that exception to all of the Futures that we've returned 40:                             for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) { 41:                                 try { 42:                                     ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ee); 43:                                 } catch (IllegalStateException e2) { 44:                                     // if we have partial responses set in mapResponseToRequests 45:                                     // then we may get IllegalStateException as we loop over them 46:                                     // so we'll log but continue to the rest 47:                                     logger.error("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2); 48:                                 } 49:                             } 50:                         } 51:  52:                     }).doOnCompleted(new Action0() { 53:  54:                         /** 55:                          * This handles successful completions 56:                          */ 57:                         @Override 58:                         public void call() { 59:                             // check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly 60:                             Exception e = null; 61:                             for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) { 62:                                 try { 63:                                    e = ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(e,"No response set by " + commandCollapser.getCollapserKey().name() + " 'mapResponseToRequests' implementation."); 64:                                 } catch (IllegalStateException e2) { 65:                                     logger.debug("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2); 66:                                 } 67:                             } 68:                         } 69:  70:                     }).subscribe(); 71:                      72:                 } catch (Exception e) { 73:                     // 异常 74:                     logger.error("Exception while creating and queueing command with batch.", e); 75:                     // if a failure occurs we want to pass that exception to all of the Futures that we've returned 76:                     for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) { 77:                         try { 78:                             request.setException(e); 79:                         } catch (IllegalStateException e2) { 80:                             logger.debug("Failed trying to setException on CollapsedRequest", e2); 81:                         } 82:                     } 83:                 } 84:             } 85:  86:         } catch (Exception e) { 87:             // 异常 88:             logger.error("Exception while sharding requests.", e); 89:             // same error handling as we do around the shards, but this is a wider net in case the shardRequest method fails 90:             for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) { 91:                 try { 92:                     request.setException(e); 93:                 } catch (IllegalStateException e2) { 94:                     logger.debug("Failed trying to setException on CollapsedRequest", e2); 95:                 } 96:             } 97:         } finally { 98:             batchLock.writeLock().unlock(); 99:         }100:     }101: }
  • 代码看起来是有点长哈,请对照着官方示例 CommandCollapserGetValueForKey 一起看,临门一脚了,胖友!

  • 第 7 行 :通过 CAS 修改 batchStarted ,保证并发情况下的线程安全。

  • 第 10 行 :获得写锁。等待调用 #offer(...)#remove(...) 方法的线程执行完成,以保证命令合并执行时,不再有新的请求添加或移除。

  • 第 15 行 :调用 HystrixCollapserBridge#shardRequests(...) 方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。点击 链接 查看代码。

  • 第 17 行 :循环 N 个【多个命令请求】。

  • 第 21 行 :调用 HystrixCollapserBridge#createObservableCommand(...) 方法,将多个命令请求合并,创建一个 HystrixCommand 。点击 链接 查看代码。

  • 第 24 行 :调用 HystrixCollapserBridge#mapResponseToRequests(...) 方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。点击 链接 查看代码。

    • Observable#single() 方法,如果 Observable 终止时只发射了一个值,返回那个值,否则抛出异常。在 《ReactiveX文档中文翻译》「single」 有相关分享。

    • Observable#ignoreElements() 方法,抑制原始 Observable 发射的所有数据,只允许它的终止通知( #onError() 或 #onCompleted())通过。在 《ReactiveX文档中文翻译》「IgnoreElements」 有相关分享。也推荐点击 rx.internal.operators.OperatorIgnoreElements 看下源码,可能更加易懂。

    • Observable#cast() 方法,将原始 Observable 发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是 map 的一个特殊版本。在 《ReactiveX文档中文翻译》「cast」 有相关分享。也推荐点击 rx.internal.operators.OperatorCast 看下源码,可能更加易懂。

    • 使用 Observable#ignoreElements()Observable#cast() 方法,用于将 Observable 变成不再继续向下发射数据项,只给现有方法里 Observable#doNext() 处理数据项,调用 HystrixCollapser#mapResponseToRequests(...) 方法。

    • 点击 链接 ,查看 CollapsedRequestSubject#setResponse(response) 方法的代码。

  • 第 24 至 50 行 :调用 Observable#doError(Action1) 方法,当命令合并执行发生异常时,设置每个CollapsedRequestSubject 的执行结果为异常。

    • 点击 链接,查看 CollapsedRequestSubject#setResponse(response) 方法的代码。

  • 第 52 至 68 行 :调用 Observable#doOnCompleted(Action0) 方法,当命令合并执行完成时,检查每个CollapsedRequestSubject 是否都有返回结果。设置没有返回结果的 CollapsedRequestSubject 的执行结果为异常。一般情况下,是用户实现 HystrixCollapser#mapResponseToRequests(...) 方法存在 BUG 。另外,如果不设置,将导致无结果的单个命令请求无限阻塞

  • 第 70 行 :调用 Observable#subscribe() 方法,触发 HystrixCommand 执行。

  • 第 72 至 96 行 :发生异常,设置每个 CollapsedRequestSubject 的执行结果为异常。

    • 点击 链接,查看 CollapsedRequestSubject#setException(response) 方法的代码。

  • 第 97 至 99 行 :释放写锁

5. CollapserTimer

com.netflix.hystrix.collapser.CollapserTimer ,命令合并器的定时器接口,定义了提交定时监听器,生成定时任务的接口方法,代码如下 :

public interface CollapserTimer {    Reference<TimerListener> addListener(TimerListener collapseTask);}

5.1 RealCollapserTimer

com.netflix.hystrix.collapser.RealCollapserTimer ,命令合并器的定时器实现类,代码如下 :

public class RealCollapserTimer implements CollapserTimer {    /* single global timer that all collapsers will schedule their tasks on */    private final static HystrixTimer timer = HystrixTimer.getInstance();    @Override    public Reference<TimerListener> addListener(TimerListener collapseTask) {        return timer.addTimerListener(collapseTask);    }}
  • 实际上,使用的是 HystrixTimer 提供的单例。在 《Hystrix 源码解析 —— 执行结果缓存》「3. HystrixTimer 」 有详细解析。

5.2 CollapsedTask

com.netflix.hystrix.collapser.RequestCollapser.CollapsedTask,定时任务,固定周期( 可配,默认 HystrixCollapserProperties.timerDelayInMilliseconds=10ms ) 轮询其对应的一个RequestCollapser 当前RequestBatch 。若有命令需要执行,则提交 RequestCollapser 合并执行。

代码比较简单,点击 链接 直接看代码。

666. 彩蛋

T T 一开始把命令合并执行,理解成类似线程池批量执行任务,怎么看官方示例,怎么奇怪。有一样的同学,一起泪目 + 握爪下。

本文有点点长,实在不想拆分成多篇。

恩,另外部分地方写的不够清晰,欢迎一起讨论和优化。

胖友,分享一波朋友圈可好!


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑
  2. http协议请求方法都有哪些?网络安全学习提升
  3. git命令备忘系列(一):基础命令
  4. 熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时
  5. 熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略
  6. 【前端词典】8 个提高 JS 性能的方法
  7. AngularJS 日期时间选择组件(附详细使用方法)
  8. 华为交换机console口设置密码及状态查看命令
  9. 5 种方法教你用Python玩转histogram直方图

随机推荐

  1. 在不触发hashchange事件的情况下更改哈希
  2. 从事件监听器OnClick中排除按钮
  3. Ajax请求等到第一次AJAX调用未完成
  4. jQuery中$.get、$.post、$.getJSON和$.aj
  5. js或Jquery中判断字符串中是否有换行符或
  6. 如何使用javascript或jquery获取图像的自
  7. jQuery——将title属性用作悬停的文本,但
  8. 将AngularJS、jQueryUI、Angular-Drag-Dr
  9. 我需要从jquery ajax post中撤回数据并将
  10. scrollTop到溢出滚动div中的活动元素