1. 概述

本文主要分享 Hystrix 命令执行(三)之执行超时

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

开启执行超时功能,需要配置 :

  • HystrixCommandProperties.executionTimeoutEnabled :执行命令超时功能开关。

    • 值 :Boolean

    • 默认值 : true

  • HystrixCommandProperties.executionTimeoutInMilliseconds :执行命令超时时长。

    • 值 :Integer

    • 单位 :毫秒

    • 默认值 :1000 毫秒

在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「4. #executeCommandAndObserve(…)」 中, #executeCommandAndObserve(...) 方法的第 75 行 lift(newHystrixObservableTimeoutOperator<R>(_cmd)) ,实现了对执行命令超时的监控。

  • 对 Observable#lift(Operator) 方法不熟悉的同学,在 《RxJava 源码解析 —— Observable#lift(Operator)》 有详细解析。


推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。

  • 程序猿DD —— 《Spring Cloud微服务实战》

  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

  • 两书齐买,京东包邮。

2. HystrixObservableTimeoutOperator

HystrixObservableTimeoutOperator 类,代码如下 :

  1: private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {  2:   3:     final AbstractCommand<R> originalCommand;  4:   5:     public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {  6:         this.originalCommand = originalCommand;  7:     }  8:   9:     @Override 10:     public Subscriber<? super R> call(final Subscriber<? super R> child) { 11:         // 创建 订阅 12:         final CompositeSubscription s = new CompositeSubscription(); 13:         // 添加 订阅 14:         // if the child unsubscribes we unsubscribe our parent as well 15:         child.add(s); 16:  17:         //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later 18:         final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread(); 19:  20:         TimerListener listener = new TimerListener() { 21:  22:             @Override 23:             public void tick() { 24:                 // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath 25:                 // otherwise it means we lost a race and the run() execution completed or did not start 26:                 if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) { 27:                     // report timeout failure 28:                     originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey); 29:  30:                     // shut down the original request 31:                     s.unsubscribe(); 32:  33:                     final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() { 34:  35:                         @Override 36:                         public void run() { 37:                             child.onError(new HystrixTimeoutException()); 38:                         } 39:                     }); 40:  41:                     timeoutRunnable.run(); 42:                     //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout 43:                 } 44:             } 45:  46:             @Override 47:             public int getIntervalTimeInMilliseconds() { 48:                 return originalCommand.properties.executionTimeoutInMilliseconds().get(); 49:             } 50:         }; 51:  52:         final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener); 53:  54:         // set externally so execute/queue can see this 55:         originalCommand.timeoutTimer.set(tl); 56:  57:         /** 58:          * If this subscriber receives values it means the parent succeeded/completed 59:          */ 60:         Subscriber<R> parent = new Subscriber<R>() { 61:  62:             @Override 63:             public void onCompleted() { 64:                 if (isNotTimedOut()) { 65:                     // stop timer and pass notification through 66:                     tl.clear(); 67:                     // 完成 68:                     child.onCompleted(); 69:                 } else { 70:                     System.out.println("timeout: " + "onCompleted"); // 笔者调试用 71:                 }  72:             } 73:  74:             @Override 75:             public void onError(Throwable e) { 76:                 if (isNotTimedOut()) { 77:                     // stop timer and pass notification through 78:                     tl.clear(); 79:                     // 异常 80:                     child.onError(e); 81:                 } else { 82:                     System.out.println("timeout: " + "onError"); // 笔者调试用 83:                 }  84:             } 85:  86:             @Override 87:             public void onNext(R v) { 88:                 if (isNotTimedOut()) { 89:                     // 继续执行 90:                     child.onNext(v); 91:                 } else { 92:                     System.out.println("timeout: " + "onNext"); // 笔者调试用 93:                 } 94:             } 95:  96:             /** 97:              * 通过 CAS 判断是否超时 98:              * 99:              * @return 是否超时100:              */101:             private boolean isNotTimedOut() {102:                 // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED103:                 return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||104:                         originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);105:             }106: 107:         };108: 109:         // 添加 订阅110:         // if s is unsubscribed we want to unsubscribe the parent111:         s.add(parent);112: 113:         return parent;114:     }115: 116: }
  • 第 12 行 :创建订阅 s


  • 第 15 行 :添加订阅 schild 的订阅。


  • 第 18 行 :获得 HystrixRequestContext 。因为下面 listener的执行不在当前线程,HystrixRequestContext 基于 ThreadLocal 实现。


  • 第 20 至 50 行 :创建执行命令超时监听器 listener ( TimerListener ) 。当超过执行命令的时长( TimerListener#getIntervalTimeInMilliseconds() )时, TimerListener#tick()方法触发调用。





    • HystrixContextRunnable ,设置第 18 行获得的 HystrixRequestContext 到 Callable#run() 所在线程的 HystrixRequestContext ,并继续执行。点击 链接 查看。另外,HystrixContextRunnable 只有此处使用,独立成类的原因是测试用例使用到。

    • ExecutionIsolationStrategy.THREAD :该策略下提供取消订阅( #unsubscribe() ),并且命令执行超时,强制取消命令的执行。在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「6.5 FutureCompleterWithConfigurableInterrupt」 有详细解析。

    • ExecutionIsolationStrategy.SEMAPHORE :该策略下提供取消订阅( #unsubscribe() )时,对超时执行命令的取消。所以,在选择执行隔离策略,要注意这块

    • 第 26 行 :通过 AbstractCommand.isCommandTimedOut 变量 CAS 操作,保证和下面第 60 行的 parent 有且只有一方操作成功。TimedOutStatus 状态变迁如下图 :

    • 第 28 行 :TODO 【2011】【Hystrix 事件机制】

    • 第 31 行 :取消订阅 s 。注意 :不同执行隔离策略此处的表现不同




    • 第 34 至 41 行 :执行 child#onError(e) 【Subscriber#onError(Throwable)】 方法,处理 HystrixTimeoutException 异常。该异常会被 handleFallback 处理,点击 链接 查看,在 《Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑》 详细解析。




  • 第 52 行 :使用 TimerListener 到定时器,监听命令的超时执行。


  • 第 55 行 :设置 TimerListener 到 AbstractCommand.timeoutTimer 属性。用于执行超时等等场景下的 TimerListener 的清理( tl#clear() )。如下方法有通过该属性对 TimerListener 的清理 :




    • AbstractCommand#handleCommandEnd()

    • AbstractCommand#cleanUpAfterResponseFromCache()

  • 第 60 至 107 行 :创建的 Subscriber ( parent )。在传参的 child 的基础上,增加了对是否执行超时的判断( #isNotTimedOut() )和TimerListener的清理。


  • 第 111 行 :添加添加订阅 parents 的订阅。整体订阅关系如下 :





    • 这里看起来 s 有些“多余” ?因为 parent 和 listener 存在互相引用的情况,通过 s 解决。

  • 第 113 行 :返回 parent注意。如果不能理解,建议阅读下 《RxJava 源码解析 —— Observable#lift(Operator)》 。


3. HystrixTimer

com.netflix.hystrix.util.HystrixTimer ,Hystrix 定时器。

目前有如下场景使用 :

  • 执行命令超时任务,本文详细解析。

  • 命令批量执行,在 《Hystrix 源码解析 —— 命令合并执行》「5. CollapsedTask」 详细解析。

HystrixTimer 构造方法,代码如下 :

public class HystrixTimer {    /**     * 单例     */    private static HystrixTimer INSTANCE = new HystrixTimer();    /* package */ AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>();    private HystrixTimer() {        // private to prevent public instantiation    }    public static HystrixTimer getInstance() {        return INSTANCE;    }}
  • INSTANCE 静态属性,单例。

  • executor 属性,定时任务执行器( ScheduledExecutor )。


调用 HystrixTimer#addTimerListener(TimerListener) 方法,提交定时监听器,生成定时任务,代码如下 :

  1: public Reference<TimerListener> addTimerListener(final TimerListener listener) {  2:     startThreadIfNeeded();  3:     // add the listener  4:   5:     Runnable r = new Runnable() {  6:   7:         @Override  8:         public void run() {  9:             try { 10:                 listener.tick(); 11:             } catch (Exception e) { 12:                 logger.error("Failed while ticking TimerListener", e); 13:             } 14:         } 15:     }; 16:  17:     ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS); 18:     return new TimerReference(listener, f); 19: }
  • 第 2 行 :调用 #startThreadIfNeeded() 方法,保证 executor 延迟初始化已完成。

    • #startThreadIfNeeded() 方法 ,比较简单,点击 链接 查看。

    • ScheduledExecutor 在 「3.1 ScheduledExecutor」 详细解析。

  • 第 5 至 15 行 :创建定时任务 Runnable 。在 Runnable#run() 方法里,调用 TimerListener#tick() 方法。在「3.2 TimerListener」 详细解析。

  • 第 17 行 :提交定时监听器,生成定时任务 f ( ScheduledFuture )。

  • 第 18 行 :使用 listener + f 创建 TimerReference 返回。在 「3.3 TimerReference」 详细解析。

3.1 ScheduledExecutor

com.netflix.hystrix.util.HystrixTimer.ScheduledExecutor ,Hystrix 定时任务执行器。代码如下 :

/* package */ static class ScheduledExecutor {    /**    * 定时任务线程池执行器    */    /* package */ volatile ScheduledThreadPoolExecutor executor;    /**     * 是否初始化     */    private volatile boolean initialized;    /**     * We want this only done once when created in compareAndSet so use an initialize method     */    public void initialize() {        // coreSize        HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();        int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();        // 创建 ThreadFactory        ThreadFactory threadFactory = null;        if (!PlatformSpecific.isAppEngineStandardEnvironment()) {            threadFactory = new ThreadFactory() {                final AtomicInteger counter = new AtomicInteger();                @Override                public Thread newThread(Runnable r) {                    Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());                    thread.setDaemon(true);                    return thread;                }            };        } else {            threadFactory = PlatformSpecific.getAppEngineThreadFactory();        }        // 创建 ScheduledThreadPoolExecutor        executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);        // 已初始化        initialized = true;   }   public ScheduledThreadPoolExecutor getThreadPool() {       return executor;   }   public boolean isInitialized() {       return initialized;   }}
  • 线程池大小( coreSize ),通过 HystrixTimerThreadPoolProperties.corePoolSize 配置。

3.2 TimerListener

com.netflix.hystrix.util.HystrixTimer.TimerListener ,Hystrix 定时任务监听器*接口*。代码如下 :

public static interface TimerListener {   /**    * The 'tick' is called each time the interval occurs.    * <p>    * This method should NOT block or do any work but instead fire its work asynchronously to perform on another thread otherwise it will prevent the Timer from functioning.    * <p>    * This contract is used to keep this implementation single-threaded and simplistic.    * <p>    * If you need a ThreadLocal set, you can store the state in the TimerListener, then when tick() is called, set the ThreadLocal to your desired value.    */   void tick();   /**    * How often this TimerListener should 'tick' defined in milliseconds.    */   int getIntervalTimeInMilliseconds();}
  • #tick() 方法 :时间到达( 超时 )执行的逻辑。

  • #getIntervalTimeInMilliseconds() 方法 :返回到达( 超时 )时间时长。

3.3 TimerReference

com.netflix.hystrix.util.HystrixTimer.TimerReference ,Hystrix 定时任务引用。代码如下 :

private static class TimerReference extends SoftReference<TimerListener> {    private final ScheduledFuture<?> f;    TimerReference(TimerListener referent, ScheduledFuture<?> f) {        super(referent);        this.f = f;    }    @Override    public void clear() {        super.clear();        // stop this ScheduledFuture from any further executions        f.cancel(false); // 非强制    }}
  • 通过 #clear() 方法,可以取消定时任务的执行。

666. 彩蛋

顺畅~刚开始看 Hystrix 执行命令超时逻辑,一直想不通。现在整理干净了。

喵了个咪~

胖友,分享一波朋友圈可好!


©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. 熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略
  2. 【前端词典】8 个提高 JS 性能的方法
  3. AngularJS 日期时间选择组件(附详细使用方法)
  4. 华为交换机console口设置密码及状态查看命令
  5. 5 种方法教你用Python玩转histogram直方图
  6. 如何用一条命令将网页转成电脑 App
  7. IDEA Debug 无法进入断点的解决方法
  8. libp2p-rs kad 使用及调试方法
  9. Linux学习之常用的Linux文件内容查看命令!

随机推荐

  1. 前端优化方案-JavaScript 优化方案
  2. HTML5练习之简陋版我画你猜(一)
  3. 有一个简单但有用的jquery.JsPlumb示例吗
  4. 根据AngularJS中的条件制作输入类型文件[
  5. JqueryUI Draggable - 仅自动垂直调整父
  6. iKcamp出品|全网最新|微信小程序|基于最新版
  7. 如何使用变量填充iframe源?
  8. 使信号器生成的代理集线器动态
  9. 只有在读取数据库后才选择(案例)(开关)
  10. 未捕获的IndexSizeError:未能在'HTMLTabl