熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时
1. 概述
本文主要分享 Hystrix 命令执行(三)之执行超时。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
开启执行超时功能,需要配置 :
HystrixCommandProperties.executionTimeoutEnabled
:执行命令超时功能开关。值 :Boolean
默认值 :
true
HystrixCommandProperties.executionTimeoutInMilliseconds
:执行命令超时时长。值 :Integer
单位 :毫秒
默认值 :1000 毫秒
在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「4. #executeCommandAndObserve(…)」 中, #executeCommandAndObserve(...)
方法的第 75 行 lift(newHystrixObservableTimeoutOperator<R>(_cmd))
,实现了对执行命令超时的监控。
对
Observable#lift(Operator)
方法不熟悉的同学,在 《RxJava 源码解析 —— Observable#lift(Operator)》 有详细解析。
推荐 Spring Cloud 书籍:
请支持正版。下载盗版,等于主动编写低级 BUG 。
程序猿DD —— 《Spring Cloud微服务实战》
周立 —— 《Spring Cloud与Docker微服务架构实战》
两书齐买,京东包邮。
2. HystrixObservableTimeoutOperator
HystrixObservableTimeoutOperator 类,代码如下 :
1: private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> { 2: 3: final AbstractCommand<R> originalCommand; 4: 5: public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) { 6: this.originalCommand = originalCommand; 7: } 8: 9: @Override 10: public Subscriber<? super R> call(final Subscriber<? super R> child) { 11: // 创建 订阅 12: final CompositeSubscription s = new CompositeSubscription(); 13: // 添加 订阅 14: // if the child unsubscribes we unsubscribe our parent as well 15: child.add(s); 16: 17: //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later 18: final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread(); 19: 20: TimerListener listener = new TimerListener() { 21: 22: @Override 23: public void tick() { 24: // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath 25: // otherwise it means we lost a race and the run() execution completed or did not start 26: if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { 27: // report timeout failure 28: originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); 29: 30: // shut down the original request 31: s.unsubscribe(); 32: 33: final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() { 34: 35: @Override 36: public void run() { 37: child.onError(new HystrixTimeoutException()); 38: } 39: }); 40: 41: timeoutRunnable.run(); 42: //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout 43: } 44: } 45: 46: @Override 47: public int getIntervalTimeInMilliseconds() { 48: return originalCommand.properties.executionTimeoutInMilliseconds().get(); 49: } 50: }; 51: 52: final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener); 53: 54: // set externally so execute/queue can see this 55: originalCommand.timeoutTimer.set(tl); 56: 57: /** 58: * If this subscriber receives values it means the parent succeeded/completed 59: */ 60: Subscriber<R> parent = new Subscriber<R>() { 61: 62: @Override 63: public void onCompleted() { 64: if (isNotTimedOut()) { 65: // stop timer and pass notification through 66: tl.clear(); 67: // 完成 68: child.onCompleted(); 69: } else { 70: System.out.println("timeout: " + "onCompleted"); // 笔者调试用 71: } 72: } 73: 74: @Override 75: public void onError(Throwable e) { 76: if (isNotTimedOut()) { 77: // stop timer and pass notification through 78: tl.clear(); 79: // 异常 80: child.onError(e); 81: } else { 82: System.out.println("timeout: " + "onError"); // 笔者调试用 83: } 84: } 85: 86: @Override 87: public void onNext(R v) { 88: if (isNotTimedOut()) { 89: // 继续执行 90: child.onNext(v); 91: } else { 92: System.out.println("timeout: " + "onNext"); // 笔者调试用 93: } 94: } 95: 96: /** 97: * 通过 CAS 判断是否超时 98: * 99: * @return 是否超时100: */101: private boolean isNotTimedOut() {102: // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED103: return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||104: originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);105: }106: 107: };108: 109: // 添加 订阅110: // if s is unsubscribed we want to unsubscribe the parent111: s.add(parent);112: 113: return parent;114: }115: 116: }
第 12 行 :创建订阅
s
。第 15 行 :添加订阅
s
到child
的订阅。第 18 行 :获得 HystrixRequestContext 。因为下面
listener
的执行不在当前线程,HystrixRequestContext 基于 ThreadLocal 实现。第 20 至 50 行 :创建执行命令超时监听器
listener
( TimerListener ) 。当超过执行命令的时长(TimerListener#getIntervalTimeInMilliseconds()
)时,TimerListener#tick()
方法触发调用。HystrixContextRunnable ,设置第 18 行获得的 HystrixRequestContext 到
Callable#run()
所在线程的 HystrixRequestContext ,并继续执行。点击 链接 查看。另外,HystrixContextRunnable 只有此处使用,独立成类的原因是测试用例使用到。
ExecutionIsolationStrategy.THREAD
:该策略下提供取消订阅(#unsubscribe()
),并且命令执行超时,强制取消命令的执行。在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「6.5 FutureCompleterWithConfigurableInterrupt」 有详细解析。ExecutionIsolationStrategy.SEMAPHORE
:该策略下未提供取消订阅(#unsubscribe()
)时,对超时执行命令的取消。所以,在选择执行隔离策略,要注意这块。
第 26 行 :通过
AbstractCommand.isCommandTimedOut
变量 CAS 操作,保证和下面第 60 行的parent
有且只有一方操作成功。TimedOutStatus 状态变迁如下图 :第 28 行 :TODO 【2011】【Hystrix 事件机制】
第 31 行 :取消订阅
s
。注意 :不同执行隔离策略此处的表现不同。第 34 至 41 行 :执行
child#onError(e) 【Subscriber#onError(Throwable)】
方法,处理 HystrixTimeoutException 异常。该异常会被handleFallback
处理,点击 链接 查看,在 《Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑》 详细解析。
第 52 行 :使用 TimerListener 到定时器,监听命令的超时执行。
第 55 行 :设置 TimerListener 到
AbstractCommand.timeoutTimer
属性。用于执行超时等等场景下的 TimerListener 的清理(tl#clear()
)。如下方法有通过该属性对 TimerListener 的清理 :AbstractCommand#handleCommandEnd()
AbstractCommand#cleanUpAfterResponseFromCache()
第 60 至 107 行 :创建新的 Subscriber (
parent
)。在传参的child
的基础上,增加了对是否执行超时的判断(#isNotTimedOut()
)和TimerListener的清理。第 111 行 :添加添加订阅
parent
到s
的订阅。整体订阅关系如下 :这里看起来
s
有些“多余” ?因为parent
和listener
存在互相引用的情况,通过s
解决。
第 113 行 :返回
parent
。注意。如果不能理解,建议阅读下 《RxJava 源码解析 —— Observable#lift(Operator)》 。
3. HystrixTimer
com.netflix.hystrix.util.HystrixTimer
,Hystrix 定时器。
目前有如下场景使用 :
执行命令超时任务,本文详细解析。
命令批量执行,在 《Hystrix 源码解析 —— 命令合并执行》「5. CollapsedTask」 详细解析。
HystrixTimer 构造方法,代码如下 :
public class HystrixTimer { /** * 单例 */ private static HystrixTimer INSTANCE = new HystrixTimer(); /* package */ AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>(); private HystrixTimer() { // private to prevent public instantiation } public static HystrixTimer getInstance() { return INSTANCE; }}
INSTANCE
静态属性,单例。executor
属性,定时任务执行器( ScheduledExecutor )。
调用 HystrixTimer#addTimerListener(TimerListener)
方法,提交定时监听器,生成定时任务,代码如下 :
1: public Reference<TimerListener> addTimerListener(final TimerListener listener) { 2: startThreadIfNeeded(); 3: // add the listener 4: 5: Runnable r = new Runnable() { 6: 7: @Override 8: public void run() { 9: try { 10: listener.tick(); 11: } catch (Exception e) { 12: logger.error("Failed while ticking TimerListener", e); 13: } 14: } 15: }; 16: 17: ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS); 18: return new TimerReference(listener, f); 19: }
第 2 行 :调用
#startThreadIfNeeded()
方法,保证executor
延迟初始化已完成。#startThreadIfNeeded()
方法 ,比较简单,点击 链接 查看。ScheduledExecutor 在 「3.1 ScheduledExecutor」 详细解析。
第 5 至 15 行 :创建定时任务 Runnable 。在
Runnable#run()
方法里,调用TimerListener#tick()
方法。在「3.2 TimerListener」 详细解析。第 17 行 :提交定时监听器,生成定时任务
f
( ScheduledFuture )。第 18 行 :使用
listener
+f
创建 TimerReference 返回。在 「3.3 TimerReference」 详细解析。
3.1 ScheduledExecutor
com.netflix.hystrix.util.HystrixTimer.ScheduledExecutor
,Hystrix 定时任务执行器。代码如下 :
/* package */ static class ScheduledExecutor { /** * 定时任务线程池执行器 */ /* package */ volatile ScheduledThreadPoolExecutor executor; /** * 是否初始化 */ private volatile boolean initialized; /** * We want this only done once when created in compareAndSet so use an initialize method */ public void initialize() { // coreSize HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy(); int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get(); // 创建 ThreadFactory ThreadFactory threadFactory = null; if (!PlatformSpecific.isAppEngineStandardEnvironment()) { threadFactory = new ThreadFactory() { final AtomicInteger counter = new AtomicInteger(); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet()); thread.setDaemon(true); return thread; } }; } else { threadFactory = PlatformSpecific.getAppEngineThreadFactory(); } // 创建 ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory); // 已初始化 initialized = true; } public ScheduledThreadPoolExecutor getThreadPool() { return executor; } public boolean isInitialized() { return initialized; }}
线程池大小(
coreSize
),通过HystrixTimerThreadPoolProperties.corePoolSize
配置。
3.2 TimerListener
com.netflix.hystrix.util.HystrixTimer.TimerListener
,Hystrix 定时任务监听器*接口*。代码如下 :
public static interface TimerListener { /** * The 'tick' is called each time the interval occurs. * <p> * This method should NOT block or do any work but instead fire its work asynchronously to perform on another thread otherwise it will prevent the Timer from functioning. * <p> * This contract is used to keep this implementation single-threaded and simplistic. * <p> * If you need a ThreadLocal set, you can store the state in the TimerListener, then when tick() is called, set the ThreadLocal to your desired value. */ void tick(); /** * How often this TimerListener should 'tick' defined in milliseconds. */ int getIntervalTimeInMilliseconds();}
#tick()
方法 :时间到达( 超时 )执行的逻辑。#getIntervalTimeInMilliseconds()
方法 :返回到达( 超时 )时间时长。
3.3 TimerReference
com.netflix.hystrix.util.HystrixTimer.TimerReference
,Hystrix 定时任务引用。代码如下 :
private static class TimerReference extends SoftReference<TimerListener> { private final ScheduledFuture<?> f; TimerReference(TimerListener referent, ScheduledFuture<?> f) { super(referent); this.f = f; } @Override public void clear() { super.clear(); // stop this ScheduledFuture from any further executions f.cancel(false); // 非强制 }}
通过
#clear()
方法,可以取消定时任务的执行。
666. 彩蛋
顺畅~刚开始看 Hystrix 执行命令超时逻辑,一直想不通。现在整理干净了。
喵了个咪~
胖友,分享一波朋友圈可好!
更多相关文章
- 熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略
- 【前端词典】8 个提高 JS 性能的方法
- AngularJS 日期时间选择组件(附详细使用方法)
- 华为交换机console口设置密码及状态查看命令
- 5 种方法教你用Python玩转histogram直方图
- 如何用一条命令将网页转成电脑 App
- IDEA Debug 无法进入断点的解决方法
- libp2p-rs kad 使用及调试方法
- Linux学习之常用的Linux文件内容查看命令!