熔断器 Hystrix 源码解析 —— 命令合并执行
2.1 构造方法
HystrixCollapser 构造方法,代码如下 :
public abstract class HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType>, HystrixObservable<ResponseType> { private final RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> collapserFactory; private final HystrixRequestCache requestCache; private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> collapserInstanceWrapper; private final HystrixCollapserMetrics metrics; /* package for tests */ HystrixCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) { if (collapserKey == null || collapserKey.name().trim().equals("")) { String defaultKeyName = getDefaultNameFromClass(getClass()); collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName); } HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder); this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, properties); this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy()); if (metrics == null) { this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties); } else { this.metrics = metrics; } final HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this; /* strategy: HystrixMetricsPublisherCollapser */ HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties); /** * Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class. */ collapserInstanceWrapper = new HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType>() { @Override public Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) { Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = self.shardRequests(requests); self.metrics.markShards(shards.size()); return shards; } @Override public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) { final HystrixCommand<BatchReturnType> command = self.createCommand(requests); command.markAsCollapsedCommand(this.getCollapserKey(), requests.size()); self.metrics.markBatch(requests.size()); return command.toObservable(); } @Override public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) { return batchResponse.single().doOnNext(new Action1<BatchReturnType>() { @Override public void call(BatchReturnType batchReturnType) { // this is a blocking call in HystrixCollapser self.mapResponseToRequests(batchReturnType, requests); } }).ignoreElements().cast(Void.class); } @Override public HystrixCollapserKey getCollapserKey() { return self.getCollapserKey(); } }; }}
BatchReturnType 泛型,多个命令合并执行返回结果类型。
ResponseType 泛型,单个命令执行返回结果类型。
RequestArgumentType 泛型,单个命令参数类型。
collapserFactory
属性,RequestCollapser 工厂,在 「3. RequestCollapserFactory」 详细解析。requestCache
属性,TODO 【2012】【请求上下文】collapserInstanceWrapper
属性,命令合并器包装器。com.netflix.hystrix.collapser.HystrixCollapserBridge
接口,点击 链接 查看代码。HystrixCollapserBridge ,为 RequestBatch 透明调用 HystrixCollapser 或 HystrixObservableCollapser 的方法不同的实现。参见 《桥接模式》 。
metrics
属性,TODO 【2002】【metrics】
2.2 执行命令方式
在 《Hystrix 源码解析 —— 执行命令方式》 中,我们已经看了 HystrixCommand 提供的四种执行命令方式。
HystrixCollapser 类似于 HystrixCommand ,也提供四种相同的执行命令方式,其中如下三种方式代码基本类似,我们就给下传送门,就不重复啰嗦了 :
#observe()
方法 :传送门 。#queue()
方法 :传送门 。#execute()
方法 :传送门 。
下面一起来看看 #toObservable()
方法的实现,代码如下 :
1: public Observable<ResponseType> toObservable() { 2: // when we callback with the data we want to do the work 3: // on a separate thread than the one giving us the callback 4: return toObservable(Schedulers.computation()); 5: } 6: 7: public Observable<ResponseType> toObservable(Scheduler observeOn) { 8: return Observable.defer(new Func0<Observable<ResponseType>>() { 9: @Override 10: public Observable<ResponseType> call() { 11: // // 缓存开关、缓存KEY 12: final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get(); 13: final String cacheKey = getCacheKey(); 14: 15: // 优先从缓存中获取 16: /* try from cache first */ 17: if (isRequestCacheEnabled) { 18: HystrixCachedObservable<ResponseType> fromCache = requestCache.get(cacheKey); 19: if (fromCache != null) { 20: metrics.markResponseFromCache(); 21: return fromCache.toObservable(); 22: } 23: } 24: 25: // 获得 RequestCollapser 26: RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); 27: 28: // 提交 命令请求 29: Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument()); 30: 31: // 获得 缓存Observable 32: if (isRequestCacheEnabled && cacheKey != null) { 33: HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response); 34: HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(cacheKey, toCache); 35: if (fromCache == null) { 36: return toCache.toObservable(); 37: } else { 38: toCache.unsubscribe(); // 取消订阅 39: return fromCache.toObservable(); 40: } 41: } 42: 43: // 获得 非缓存Observable 44: return response; 45: } 46: }); 47: }
observeOn
方法参数,实际方法暂未用到,跳过无视。第 11 至 13 行 :缓存存开关、KEY 。
【反向】第 32 至 41 行 :获得【缓存 Observable】。这块代码和
AbstractCommand#toObservavle(...)
类似,在《Hystrix 源码解析 —— 执行结果缓存》「4. AbstractCommand#toObservavle(...)」 有详细解析。【反向】第 44 行 :获得【非缓存 Observable】。
注意 :返回的 Observable ,很可能命令实际并未执行,或者说并未执行完成,此时在
#queue()
/#execute()
方法,通过 BlockingObservable 阻塞等待执行完成。BlockingObservable 在 《RxJava 源码解析 —— BlockingObservable》 有详细解析。第 26 行 :调用
RequestCollapserFactory#getRequestCollapser()
,获得 RequestCollapser 。在 「3. RequestCollapserFactory」 详细解析。第 29 行 :提交单个命令请求到请求队列( RequestQueue ),即命令合并执行整体流程第一步。在 「4. RequestCollapser」 详细解析。
2.3 核心方法
#getRequestArgument(...)
抽象方法,获得单个命令参数。代码如下 :
publicabstractRequestArgumentTypegetRequestArgument();
#createCommand(...)
抽象方法,将多个命令请求合并,创建一个HystrixCommand 。代码如下 :
protectedabstractHystrixCommand<BatchReturnType>createCommand(Collection<CollapsedRequest<ResponseType,RequestArgumentType>>requests);
#mapResponseToRequests(...)
抽象方法,将一个HystrixCommand 的执行结果,映射回对应的命令请求们。
protectedabstractvoidmapResponseToRequests(BatchReturnTypebatchResponse,Collection<CollapsedRequest<ResponseType,RequestArgumentType>>requests);
#shardRequests(...)
方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。代码如下 :
protectedCollection<Collection<CollapsedRequest<ResponseType,RequestArgumentType>>>shardRequests(Collection<CollapsedRequest<ResponseType,RequestArgumentType>>requests){
returnCollections.singletonList(requests);
}
在未重写
#shardRequests(...)
的情况下,整体方法流程如下 :在重写
#shardRequests(...)
的情况下,整体方法流程如下 :本图中命令请求分片仅仅是例子,实际根据重写的逻辑不同而不同。
3. RequestCollapserFactory
com.netflix.hystrix.collapser.RequestCollapserFactory
,RequestCollapser 工厂。
public class RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> { private final CollapserTimer timer; private final HystrixCollapserKey collapserKey; private final HystrixCollapserProperties properties; private final HystrixConcurrencyStrategy concurrencyStrategy; private final Scope scope; public RequestCollapserFactory(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties properties) { /* strategy: ConcurrencyStrategy */ this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); this.timer = timer; this.scope = scope; this.collapserKey = collapserKey; this.properties = properties; }
timer
属性,命令合并器的定时器,在 「5. CollapserTimer」 详细解析。collapserKey
属性,命令合并器标识,实现类似 HystrixThreadPoolKey 。HystrixCollapserKey ,点击 链接 查看代码。
HystrixThreadPoolKey ,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「3. HystrixThreadPoolKey」 有详细解析。
properties
属性,命令合并器属性配置。concurrencyStrategy
属性,并发策略,在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「4. HystrixConcurrencyStrategy」 有详细解析。scope
属性,命令请求作用域。目前有两种作用域 :REQUEST
:请求上下文( HystrixRequestContext )。Typically this means that requests within a single user-request (ie. HTTP request) are collapsed.
No interaction with other user requests.
1 queue per user request.GLOBAL
:全局。Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed.
1 queue for entire app.
调用 #getRequestCollapser()
方法,获得 RequestCollapser 。代码如下 :
public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) { if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) { return getCollapserForUserRequest(commandCollapser); } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) { return getCollapserForGlobalScope(commandCollapser); } else { logger.warn("Invalid Scope: {} Defaulting to REQUEST scope.", getScope()); return getCollapserForUserRequest(commandCollapser); }}
根据
scope
不同,调用两个不同方法,获得 RequestCollapser 。这两个方法大体逻辑相同,优先从缓存中查找满足条件的 RequestCollapser 返回;若不存在,则创建满足条件的 RequestCollapser 添加到缓存并返回。REQUEST
:调用#getCollapserForUserRequest()
方法,TODO 【2012】【请求上下文】。GLOBAL
:调用#getCollapserForGlobalScope()
方法,点击 链接 查看中文注释的代码。
4. RequestCollapser
com.netflix.hystrix.collapser.RequestCollapser
,命令请求合并器。主要用于 :
提交单个命令请求到请求队列( RequestQueue )。
接收来自定时任务提交的多个命令,合并执行。
4.1 构造方法
RequestCollapser 构造方法,代码如下 :
public class RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> { private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser; // batch can be null once shutdown private final AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>> batch = new AtomicReference<RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>>(); private final AtomicReference<Reference<TimerListener>> timerListenerReference = new AtomicReference<Reference<TimerListener>>(); private final AtomicBoolean timerListenerRegistered = new AtomicBoolean(); private final CollapserTimer timer; private final HystrixCollapserProperties properties; private final HystrixConcurrencyStrategy concurrencyStrategy; RequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, HystrixCollapserProperties properties, CollapserTimer timer, HystrixConcurrencyStrategy concurrencyStrategy) { this.commandCollapser = commandCollapser; // the command with implementation of abstract methods we need this.concurrencyStrategy = concurrencyStrategy; this.properties = properties; this.timer = timer; batch.set(new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get())); }}
commandCollapser
属性,命令合并器包装器。batch
属性,RequestBatch,即是本文一直说的请求队列。在 「4.2 RequestBatch」 也会详细解析。timerListenerReference
属性,注册在命令合并器的定时器的监听器。每个 RequestCollapser 独有一个监听器。该监听器( 实际上会使用该监听器创建定时任务 )固定周期从请求队列获取多个命令执行,提交 RequestCollapser 合并执行。在 「5. CollapserTimer」 也会详细解析。timerListenerRegistered
属性,timerListenerReference
是否已经注册。timer
属性,命令合并器的定时器。properties
属性,命令合并器属性配置。concurrencyStrategy
属性,并发策略。
4.2 RequestBatch
com.netflix.hystrix.collapser.RequestBatch
,命令请求队列。提供如下功能 :
命令请求的添加
命令请求的移除
命令请求的批量执行。笔者把 RequestBatch 解释成 "命令请求队列",主要方便大家理解。
那可能有胖友有疑问,为啥该功能不在 RequestCollapser 直接实现,这样 RequestBatch 成为纯粹的队列呢?在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。
RequestBatch 构造方法,代码如下 :
public class RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> { private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser; private final int maxBatchSize; private final AtomicBoolean batchStarted = new AtomicBoolean(); private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap = new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>(); private final HystrixCollapserProperties properties; private ReentrantReadWriteLock batchLock = new ReentrantReadWriteLock(); public RequestBatch(HystrixCollapserProperties properties, HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, int maxBatchSize) { this.properties = properties; this.commandCollapser = commandCollapser; this.maxBatchSize = maxBatchSize; }}
commandCollapser
属性,命令合并器包装器。maxBatchSize
属性,队列最大长度。batchStarted
属性,执行是否开始。argumentMap
属性,命令请求参数映射( 队列 )。properties
属性,命令合并器属性配置。batchLock
属性,argumentMap
操作的读写锁。
RequestBatch 实现队列具体的操作方法,在 「4.3 #submitRequest(arg)」/「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 一起解析。
4.3 #submitRequest(arg)
在 #toObservable()
方法里,调用 #submitRequest(arg)
方法,提交单个命令请求到 RequestBatch 。代码如下 :
1: public Observable<ResponseType> submitRequest(final RequestArgumentType arg) { 2: /* 3: * We only want the timer ticking if there are actually things to do so we register it the first time something is added. 4: */ 5: if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false, true)) { 6: /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */ 7: timerListenerReference.set(timer.addListener(new CollapsedTask())); 8: } 9: 10: // loop until succeed (compare-and-set spin-loop) 11: while (true) { 12: // 获得 RequestBatch 13: final RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get(); 14: if (b == null) { 15: return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown")); 16: } 17: 18: // 添加到 RequestBatch 19: final Observable<ResponseType> response; 20: if (arg != null) { 21: response = b.offer(arg); 22: } else { 23: response = b.offer( (RequestArgumentType) NULL_SENTINEL); 24: } 25: 26: // 添加成功,返回 Observable 27: // it will always get an Observable unless we hit the max batch size 28: if (response != null) { 29: return response; 30: } else { 31: // 添加失败,执行 RequestBatch ,并创建新的 RequestBatch 32: // this batch can't accept requests so create a new one and set it if another thread doesn't beat us 33: createNewBatchAndExecutePreviousIfNeeded(b); 34: } 35: } 36: }
第 5 至 8 行 :当 RequestCollapser 的监听任务( CollapsedTask )还未创建,进行初始化。
第 11 至 35 行 :死循环,直到提交单个命令请求到 RequestBatch 成功。
第 13 至 16 行 :获得 RequestBatch 。从目前代码看下来,除非 RequestCollapser 被
#shutdown()
后才会出现为null
的情况。第 19 至 24 行 :调动
RequestBatch#offer(...)
方法,提交单个命令请求到 RequestBatch ,并获得 Observable 。这里对arg==null
做了特殊处理,因为RequestBatch.argumentMap
是 ConcurrentHashMap ,不允许值为null
。另外,RequestBatch#offer(...)
方法的实现代码,在结束了当前方法,详细解析。第 28 至 29 行 :添加成功,返回 Observable 。
第 30 至 34 行 :添加失败,执行当前 RequestBatch 的多个命令合并执行,并创建新的 RequestBatch 。在 「4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)」 详细解析。
RequestBatch#offer(...)
方法,代码如下 :
1: public Observable<ResponseType> offer(RequestArgumentType arg) { 2: // 执行已经开始,添加失败 3: /* short-cut - if the batch is started we reject the offer */ 4: if (batchStarted.get()) { 5: return null; 6: } 7: 8: /* 9: * The 'read' just means non-exclusive even though we are writing. 10: */ 11: if (batchLock.readLock().tryLock()) { 12: try { 13: // 执行已经开始,添加失败 14: /* double-check now that we have the lock - if the batch is started we reject the offer */ 15: if (batchStarted.get()) { 16: return null; 17: } 18: 19: // 超过队列最大长度,添加失败 20: if (argumentMap.size() >= maxBatchSize) { 21: return null; 22: } else { 23: // 创建 CollapsedRequestSubject ,并添加到队列 24: CollapsedRequestSubject<ResponseType, RequestArgumentType> collapsedRequest = new CollapsedRequestSubject<ResponseType, RequestArgumentType>(arg, this); 25: final CollapsedRequestSubject<ResponseType, RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType, RequestArgumentType>) argumentMap.putIfAbsent(arg, collapsedRequest); 26: /** 27: * If the argument already exists in the batch, then there are 2 options: 28: * A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses 29: * be hooked up to that argument 30: * B) If request caching is OFF: return an error to all duplicate argument requests 31: * 32: * This maintains the invariant that each batch has no duplicate arguments. This prevents the impossible 33: * logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser) 34: * of trying to figure out which argument of a set of duplicates should get attached to a response. 35: * 36: * See https://github.com/Netflix/Hystrix/pull/1176 for further discussion. 37: */ 38: if (existing != null) { 39: boolean requestCachingEnabled = properties.requestCacheEnabled().get(); 40: if (requestCachingEnabled) { 41: return existing.toObservable(); 42: } else { 43: return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "] This is not supported. Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!")); 44: } 45: } else { 46: return collapsedRequest.toObservable(); 47: } 48: 49: } 50: } finally { 51: batchLock.readLock().unlock(); 52: } 53: } else { 54: return null; 55: } 56: }
第 4 至 6 行 :执行已经开始,添加失败。在
RequestBatch#executeBatchIfNotAlreadyStarted(...)
方法的开头,优先 CAS使batchStarted=true
。第 11 行 :获得读锁。
The'read'just means non-exclusive even though we are writing.
,即使该方法实际在做"写操作",不排他,线程安全,所以可以使用读锁。第 15 至 17 行 :
double-check
,执行已经开始,添加失败。在RequestBatch#executeBatchIfNotAlreadyStarted(...)
方法,优先 CAS 使batchStarted=true
,再获取写锁,所以会出现该情况。第 20 至 21 行 :超过队列最大长度,添加失败。
第 24 至 25 行 :创建
com.netflix.hystrix.collapser.CollapsedRequestSubject
,并将它添加到队列(argumentMap
) 。argument
属性,单个命令请求参数。valueSet
属性,结果( Response ) 是否设置,通过#setResponse()
/#emitResponse()
方法设置。subject
属性,可回放执行结果的 Subject 。此处使用 ReplaySubject 的主要目的,当 HystrixCollapser 开启缓存功能时,通过回放执行结果,在 《Hystrix 源码解析 —— 执行结果缓存》「5. HystrixCachedObservable」 也有相同的实现。另外,这里有一点要注意下,ReplaySubject 并没有向任何 Observable 订阅结果,而是通过#setResponse()
/#emitResponse()
方法设置结果。outstandingSubscriptions
属性,订阅数量。subjectWithAccounting
属性,带订阅数量的 ReplaySubject 。当取消订阅时,调用RequestBatch#remove(arg)
方法,移除单个命令请求。
CollapsedRequestSubject 实现
com.netflix.hystrix.HystrixCollapser.CollapsedRequest
接口,定义了批量命令执行的请求,不仅限于获得请求参数(#getArgument()
方法 ),也包括对批量命令执行结束后,每个请求的结果设置(#setResponse(...)
/#emitResponse(...)
/#setException(...)
/#setComplete()
方法 ),点击 链接 查看该接口的代码。CollapsedRequestSubject 构造方法,代码如下:
/* package */classCollapsedRequestSubject<T,R>implementsCollapsedRequest<T,R>{
/**
* 参数
*/
private final R argument;
/**
* 结果( response ) 是否设置
*/
private AtomicBoolean valueSet = new AtomicBoolean(false);
/**
* 可回放的 ReplaySubject
*/
private final ReplaySubject<T> subject = ReplaySubject.create();
/**
* 带订阅数量的 ReplaySubject
*/
private final Observable<T> subjectWithAccounting;
/**
* 订阅数量
*/
private volatile int outstandingSubscriptions = 0;
public CollapsedRequestSubject(final R arg, final RequestBatch<?, T, R> containingBatch) {
// 设置 argument
if (arg == RequestCollapser.NULL_SENTINEL) {
this.argument = null;
} else {
this.argument = arg;
}
// 设置 带订阅数量的 ReplaySubject
this.subjectWithAccounting = subject
.doOnSubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions++;
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions--;
if (outstandingSubscriptions == 0) {
containingBatch.remove(arg);
}
}
});
}
}
第 38 至 47 行 :返回 Observable 。
当
argumentMap
已经存在arg
对应的 Observable 时,必须开启缓存 (HystrixCollapserProperties.requestCachingEnabled=true
) 功能。原因是,如果在相同的arg
,并且未开启缓存,同时第 43 行实现的是collapsedRequest.toObservable()
,那么相同的arg
将有多个 Observable 执行命令,此时HystrixCollapserBridge#mapResponseToRequests(...)
方法无法将执行( Response )赋值到arg
对应的命令请求( CollapsedRequestSubject ) 。更多讨论,见 https://github.com/Netflix/Hystrix/pull/1176 。回过头看
HystrixCollapser#toObservable()
方法的第 32 至 41 行的代码,这里也有对缓存功能,是不是重复了呢?argumentMap
针对的是 RequestBatch 级的缓存,HystrixCollapser : RequestCollapser : RequestBatch 是1:1:N
的关系,通过HystrixCollapser#toObservable()
对缓存的处理逻辑,保证 RequestBatch 切换后,依然有缓存。
RequestBatch#remove()
方法,代码如下 :
/* package-private */ void remove(RequestArgumentType arg) { if (batchStarted.get()) { //nothing we can do return; } if (batchLock.readLock().tryLock()) { try { /* double-check now that we have the lock - if the batch is started, deleting is useless */ if (batchStarted.get()) { return; } argumentMap.remove(arg); } finally { batchLock.readLock().unlock(); } }}
当 RequestBatch 开始执行,不允许移除单个命令请求。
4.4 #createNewBatchAndExecutePreviousIfNeeded(previousBatch)
本小节建议在 「5. CollapserTimer」 后,再回过头看。
#createNewBatchAndExecutePreviousIfNeeded(previousBatch)
方法,代码如下 :
1: private void createNewBatchAndExecutePreviousIfNeeded(RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> previousBatch) { 2: if (previousBatch == null) { 3: throw new IllegalStateException("Trying to start null batch which means it was shutdown already."); 4: } 5: if (batch.compareAndSet(previousBatch, new RequestBatch<BatchReturnType, ResponseType, RequestArgumentType>(properties, commandCollapser, properties.maxRequestsInBatch().get()))) { 6: // this thread won so trigger the previous batch 7: previousBatch.executeBatchIfNotAlreadyStarted(); 8: } 9: }
第 5 行 :通过 CAS 修改
batch
,保证并发情况下的线程安全。同时注意,此处也进行了新的 RequestBatch ,切换掉老的 RequestBatch 。第 6 行 :使用老的 RequestBatch ,调用
RequestBatch#executeBatchIfNotAlreadyStarted()
方法,命令合并执行。
RequestBatch#executeBatchIfNotAlreadyStarted()
方法,代码如下 :
1: public void executeBatchIfNotAlreadyStarted() { 2: /* 3: * - check that we only execute once since there's multiple paths to do so (timer, waiting thread or max batch size hit) 4: * - close the gate so 'offer' can no longer be invoked and we turn those threads away so they create a new batch 5: */ 6: // 设置 执行已经开始 7: if (batchStarted.compareAndSet(false, true)) { 8: // 获得 写锁 9: /* wait for 'offer'/'remove' threads to finish before executing the batch so 'requests' is complete */ 10: batchLock.writeLock().lock(); 11: 12: try { 13: // 将多个命令请求分片成 N 个【多个命令请求】。 14: // shard batches 15: Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values()); 16: // for each shard execute its requests 17: for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) { 18: try { 19: // 将多个命令请求合并,创建一个 HystrixCommand 20: // create a new command to handle this batch of requests 21: Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests); 22: 23: // 将一个 HystrixCommand 的执行结果,映射回对应的命令请求们 24: commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() { 25: 26: /** 27: * This handles failed completions 28: */ 29: @Override 30: public void call(Throwable e) { 31: // handle Throwable in case anything is thrown so we don't block Observers waiting for onError/onCompleted 32: Exception ee; 33: if (e instanceof Exception) { 34: ee = (Exception) e; 35: } else { 36: ee = new RuntimeException("Throwable caught while executing batch and mapping responses.", e); 37: } 38: logger.debug("Exception mapping responses to requests.", e); 39: // if a failure occurs we want to pass that exception to all of the Futures that we've returned 40: for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) { 41: try { 42: ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ee); 43: } catch (IllegalStateException e2) { 44: // if we have partial responses set in mapResponseToRequests 45: // then we may get IllegalStateException as we loop over them 46: // so we'll log but continue to the rest 47: logger.error("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2); 48: } 49: } 50: } 51: 52: }).doOnCompleted(new Action0() { 53: 54: /** 55: * This handles successful completions 56: */ 57: @Override 58: public void call() { 59: // check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly 60: Exception e = null; 61: for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) { 62: try { 63: e = ((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(e,"No response set by " + commandCollapser.getCollapserKey().name() + " 'mapResponseToRequests' implementation."); 64: } catch (IllegalStateException e2) { 65: logger.debug("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2); 66: } 67: } 68: } 69: 70: }).subscribe(); 71: 72: } catch (Exception e) { 73: // 异常 74: logger.error("Exception while creating and queueing command with batch.", e); 75: // if a failure occurs we want to pass that exception to all of the Futures that we've returned 76: for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) { 77: try { 78: request.setException(e); 79: } catch (IllegalStateException e2) { 80: logger.debug("Failed trying to setException on CollapsedRequest", e2); 81: } 82: } 83: } 84: } 85: 86: } catch (Exception e) { 87: // 异常 88: logger.error("Exception while sharding requests.", e); 89: // same error handling as we do around the shards, but this is a wider net in case the shardRequest method fails 90: for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) { 91: try { 92: request.setException(e); 93: } catch (IllegalStateException e2) { 94: logger.debug("Failed trying to setException on CollapsedRequest", e2); 95: } 96: } 97: } finally { 98: batchLock.writeLock().unlock(); 99: }100: }101: }
代码看起来是有点长哈,请对照着官方示例 CommandCollapserGetValueForKey 一起看,临门一脚了,胖友!
第 7 行 :通过 CAS 修改
batchStarted
,保证并发情况下的线程安全。第 10 行 :获得写锁。等待调用
#offer(...)
/#remove(...)
方法的线程执行完成,以保证命令合并执行时,不再有新的请求添加或移除。第 15 行 :调用
HystrixCollapserBridge#shardRequests(...)
方法,将多个命令请求分片成 N 个【多个命令请求】。默认实现下,不进行分片。点击 链接 查看代码。第 17 行 :循环 N 个【多个命令请求】。
第 21 行 :调用
HystrixCollapserBridge#createObservableCommand(...)
方法,将多个命令请求合并,创建一个 HystrixCommand 。点击 链接 查看代码。第 24 行 :调用
HystrixCollapserBridge#mapResponseToRequests(...)
方法,将一个 HystrixCommand 的执行结果,映射回对应的命令请求们。点击 链接 查看代码。Observable#single()
方法,如果 Observable 终止时只发射了一个值,返回那个值,否则抛出异常。在 《ReactiveX文档中文翻译》「single」 有相关分享。Observable#ignoreElements()
方法,抑制原始 Observable 发射的所有数据,只允许它的终止通知(#onError()
或#onCompleted()
)通过。在 《ReactiveX文档中文翻译》「IgnoreElements」 有相关分享。也推荐点击rx.internal.operators.OperatorIgnoreElements
看下源码,可能更加易懂。Observable#cast()
方法,将原始 Observable 发射的每一项数据都强制转换为一个指定的类型,然后再发射数据,它是map
的一个特殊版本。在 《ReactiveX文档中文翻译》「cast」 有相关分享。也推荐点击rx.internal.operators.OperatorCast
看下源码,可能更加易懂。使用
Observable#ignoreElements()
/Observable#cast()
方法,用于将 Observable 变成不再继续向下发射数据项,只给现有方法里Observable#doNext()
处理数据项,调用HystrixCollapser#mapResponseToRequests(...)
方法。点击 链接 ,查看
CollapsedRequestSubject#setResponse(response)
方法的代码。
第 24 至 50 行 :调用
Observable#doError(Action1)
方法,当命令合并执行发生异常时,设置每个CollapsedRequestSubject 的执行结果为异常。点击 链接,查看
CollapsedRequestSubject#setResponse(response)
方法的代码。
第 52 至 68 行 :调用
Observable#doOnCompleted(Action0)
方法,当命令合并执行完成时,检查每个CollapsedRequestSubject 是否都有返回结果。设置没有返回结果的 CollapsedRequestSubject 的执行结果为异常。一般情况下,是用户实现HystrixCollapser#mapResponseToRequests(...)
方法存在 BUG 。另外,如果不设置,将导致无结果的单个命令请求无限阻塞。第 70 行 :调用
Observable#subscribe()
方法,触发 HystrixCommand 执行。第 72 至 96 行 :发生异常,设置每个 CollapsedRequestSubject 的执行结果为异常。
点击 链接,查看
CollapsedRequestSubject#setException(response)
方法的代码。
第 97 至 99 行 :释放写锁。
5. CollapserTimer
com.netflix.hystrix.collapser.CollapserTimer
,命令合并器的定时器接口,定义了提交定时监听器,生成定时任务的接口方法,代码如下 :
public interface CollapserTimer { Reference<TimerListener> addListener(TimerListener collapseTask);}
5.1 RealCollapserTimer
com.netflix.hystrix.collapser.RealCollapserTimer
,命令合并器的定时器实现类,代码如下 :
public class RealCollapserTimer implements CollapserTimer { /* single global timer that all collapsers will schedule their tasks on */ private final static HystrixTimer timer = HystrixTimer.getInstance(); @Override public Reference<TimerListener> addListener(TimerListener collapseTask) { return timer.addTimerListener(collapseTask); }}
实际上,使用的是 HystrixTimer 提供的单例。在 《Hystrix 源码解析 —— 执行结果缓存》「3. HystrixTimer 」 有详细解析。
5.2 CollapsedTask
com.netflix.hystrix.collapser.RequestCollapser.CollapsedTask
,定时任务,固定周期( 可配,默认 HystrixCollapserProperties.timerDelayInMilliseconds=10ms
) 轮询其对应的一个RequestCollapser 当前RequestBatch 。若有命令需要执行,则提交 RequestCollapser 合并执行。
代码比较简单,点击 链接 直接看代码。
666. 彩蛋
T T 一开始把命令合并执行,理解成类似线程池批量执行任务,怎么看官方示例,怎么奇怪。有一样的同学,一起泪目 + 握爪下。
本文有点点长,实在不想拆分成多篇。
恩,另外部分地方写的不够清晰,欢迎一起讨论和优化。
胖友,分享一波朋友圈可好!
更多相关文章
- 熔断器 Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑
- http协议请求方法都有哪些?网络安全学习提升
- git命令备忘系列(一):基础命令
- 熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时
- 熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略
- 【前端词典】8 个提高 JS 性能的方法
- AngularJS 日期时间选择组件(附详细使用方法)
- 华为交换机console口设置密码及状态查看命令
- 5 种方法教你用Python玩转histogram直方图