最近的几篇博客里我讨论了长轮询(long polling)和Spring的DeferredResult技术,并且利用这些概念将生产者消费者项目塞进了一个Web应用程序。 尽管博客中的示例代码展示了相关概念,却也包含了很多逻辑问题。除了在实际的应用程序中不会使用简单的LinkedBlockingQueue而是选择JMS或者其他强健的消息队列服务,并且只会有一个用户可以获得匹配更新。还有一个严重的问题就是在JVM关闭时,行为不良的线程不会被关闭。


你可能会问:为什么这会成为问题……好吧,对程序员来说这真的算不上一个问题,只要随便写点代码就可以解决。但是对使用软件的人而言这却会带来不必要的麻烦。原因是这样会产生很多行为不良的线程,而执行Tomcat的shutdown.sh命令收效甚微。这时你不得不执行下面命令野蛮的杀掉web服务器:


ps -ef | grep java


先得到进程pid,然后


kill -9 <<pid>>


……接着需要有一大片web服务器需要重启,这种混乱绝对让人头痛。最后你执行shutdown.sh停止Tomcat。


在我最近的几篇博客里,我编写的那些行为不良的线程在run()方法开头都包含了下面的代码:


@Override

public void run() { 

 

  while (true) { 

    try { 

 

      DeferredResult<Message> result = resultQueue.take(); 

      Message message = queue.take(); 

 

      result.setResult(message); 

 

    } catch (InterruptedException e) { 

      throw new UpdateException("Cannot get latest update. " + e.getMessage(), e); 

    } 

  } 

}


在上面的代码里,我用了一个无限循环while(true),这意味着线程会一直运行并且不会终止。


@Override

public void run() { 

 

  sleep(5); // Sleep等待app重新加载 

 

  logger.info("The match has now started..."); 

  long now = System.currentTimeMillis(); 

  List<Message> matchUpdates = match.getUpdates(); 

 

  for (Message message : matchUpdates) { 

 

    delayUntilNextUpdate(now, message.getTime()); 

    logger.info("Add message to queue: {}", message.getMessageText()); 

    queue.add(message); 

  } 

  start = true; // 结束,重启 

  logger.warn("GAME OVER"); 

}


上面第二个示例中线程的行为同样很糟糕。线程会从MatchUpdates列表中取消息并在合适的时候添加到消息队列。唯一的可取之处是他们会抛出异常InterruptedException,如果正确处理线程可以正常停止。然而,没有人能确保这一点。


对上面代码的一个有效地快速修正……只要确保创建所有线程都是后台线程。后台线程的定义是:在程序结束时,即使线程还在运行但不会阻止JVM退出。一个后台线程的例子就是JVM的垃圾回收线程。将线程设置为后台线程只需要调用:


thread.setDaemon(true);


……接着执行shutdown.sh,然后砰的一声所有的线程都消失了。然而这种做法有一个问题:如果你的后台线程正在执行重要的任务,刚刚开始执行就被突然结束掉会导致丢失很多重要的数据该怎么办?


你需要确保所有线程都被友好地关闭,在关闭前完成所有正在执行的任务。本文接下来将为这些错误的线程给出一个修复,使用ShutdownHook让他们在关闭前互相协调。根据文档的描述:“一个shutdown hook就是一个初始化但没有启动的线程。 当虚拟机开始执行关闭程序时,它会启动所有已注册的shutdown hook(不按先后顺序)并且并发执行。”读完最后一句话,你可能已经猜到了你需要的就是创建一个负责关闭多有其他线程的线程并通过shutdown hook传递给JVM。只要在你已有线程的run() 方法里用几个小的class做一些手脚。


需要创建ShutdownService和Hook两个类。首先展示的是Hook类,它会将ShutdownService 连接到你的线程,代码如下:


public class Hook { 

 

  private static final Logger logger = LoggerFactory.getLogger(Hook.class); 

 

  private boolean keepRunning = true; 

 

  private final Thread thread; 

 

  Hook(Thread thread) { 

    this.thread = thread; 

  } 

 

  /** 

   * @return True 如果后台线程继续运行

   */ 

  public boolean keepRunning() { 

    return keepRunning; 

  } 

 

  /** 

   * 告诉客户端后台线程关闭并等待友好地关闭

   */ 

  public void shutdown() { 

    keepRunning = false; 

    thread.interrupt(); 

    try { 

      thread.join(); 

    } catch (InterruptedException e) { 

      logger.error("Error shutting down thread with hook", e); 

    } 

  } 

}


Hook包含两个实例变量:keepRunning和thread。thread是对Hook负责关闭实例对象的引用,而keepRunning则是告诉线程继续运行。


Hook有两个public方法:keepRunning()和shutdown()。线程调用keepRunning()来确认是否需要继续运行,而shutdown()是由ShutdownService的shutdown hook线程调用以关闭目标线程。这就是两个方法的有趣之处。首先将keepRunning变量置为false,接着调用thread.interrupt()来打断线程强制抛出一个InterruptedException,最后调用thread.join()等待线程实例关闭。


值得注意的是这种方法需要你的线程配合。如果其中某个线程出错,那么所有的工作都会失败。为了避免这种情况最好在thread.join(…)中加入一个超时。


@Service

public class ShutdownService { 

 

  private static final Logger logger = LoggerFactory.getLogger(ShutdownService.class); 

 

  private final List<Hook> hooks; 

 

  public ShutdownService() { 

    logger.debug("Creating shutdown service"); 

    hooks = new ArrayList<Hook>(); 

    createShutdownHook(); 

  } 

 

  /** 

   * Protected for testing 

   */

  @VisibleForTesting

  protected void createShutdownHook() { 

    ShutdownDaemonHook shutdownHook = new ShutdownDaemonHook(); 

    Runtime.getRuntime().addShutdownHook(shutdownHook); 

  } 

 

  protected class ShutdownDaemonHook extends Thread { 

 

    /** 

     * 循环并使用hook关闭所有后台线程

     * 

     * @see java.lang.Thread#run() 

     */

    @Override

    public void run() { 

 

      logger.info("Running shutdown sync"); 

 

      for (Hook hook : hooks) { 

        hook.shutdown(); 

      } 

    } 

  } 

 

  /** 

   * 创建hook class的新实例 

   */

  public Hook createHook(Thread thread) { 

 

    thread.setDaemon(true); 

    Hook retVal = new Hook(thread); 

    hooks.add(retVal); 

    return retVal; 

  } 

 

  @VisibleForTesting

  List<Hook> getHooks() { 

    return hooks; 

  } 

}


ShutdownService是一个Spring服务包含一个由引用的线程提供的Hook类列表用来关闭线程。它还包括了一个继承了Thread的内部类ShutdownDaemonHook。在ShutdownService构造函数中会创建一个ShutdownDaemonHook实例并传递给JVM作为shutdown hook,代码如下:


Runtime.getRuntime().addShutdownHook(shutdownHook);


ShutdownService 有一个public方法:createHook()。createHook()做的第一步是确保所有传递的线程都被设置为后台线程。接下来会创建一个新的Hook实例,在最终存储结果到列表返回给调用者之前作为参数传递给线程。


最后要做的就是将ShutdownService继承到DeferredResultService和MatchReporter。这两个类包含了行为不良的线程。


@Service("DeferredService") 

public class DeferredResultService implements Runnable { 

 

  private static final Logger logger = LoggerFactory.getLogger(DeferredResultService.class); 

 

  private final BlockingQueue<DeferredResult<Message>> resultQueue = new LinkedBlockingQueue<>(); 

 

  private Thread thread; 

 

  private volatile boolean start = true; 

 

  @Autowired

  private ShutdownService shutdownService; 

 

  private Hook hook; 

 

  @Autowired

  @Qualifier("theQueue") 

  private LinkedBlockingQueue<Message> queue; 

 

  @Autowired

  @Qualifier("BillSkyes") 

  private MatchReporter matchReporter; 

 

  public void subscribe() { 

    logger.info("Starting server"); 

    matchReporter.start(); 

    startThread(); 

  } 

 

  private void startThread() { 

 

    if (start) { 

      synchronized (this) { 

        if (start) { 

          start = false; 

          thread = new Thread(this, "Studio Teletype"); 

          hook = shutdownService.createHook(thread); 

          thread.start(); 

        } 

      } 

    } 

  } 

 

  @Override

  public void run() { 

 

    logger.info("DeferredResultService - Thread running"); 

    while (hook.keepRunning()) { 

      try { 

 

        DeferredResult<Message> result = resultQueue.take(); 

        Message message = queue.take(); 

 

        result.setResult(message); 

 

      } catch (InterruptedException e) { 

        System.out.println("Interrupted when waiting for latest update. " + e.getMessage()); 

      } 

    } 

    System.out.println("DeferredResultService - Thread ending"); 

  } 

 

  public void getUpdate(DeferredResult<Message> result) { 

    resultQueue.add(result); 

  } 

 

}


为DeferredResultService做的第一个修改就是自动匹配ShutdownService实例。接着在线程创建以后thread.start()调用之前使用ShutdownService创建一个Hook实例:


thread = new Thread(this, "Studio Teletype"); 

hook = shutdownService.createHook(thread); 

thread.start();


最后将while(true)替换为:


while (hook.keepRunning()) {


……通知线程什么时候需要结束while循环并关闭。


你可能已经注意到上面的代码里有一些System.out.println()调用。原因并不是对关闭hook线程的执行顺序不确定。需要记住,不仅仅是你编写的类试图关闭其他的子系统也是如此。这就是为这在我原来的代码中,logger.info(…)会给出下面的异常输出:


Exception in thread "Studio Teletype" java.lang.NoClassDefFoundError: org/apache/log4j/spi/ThrowableInformation

 at org.apache.log4j.spi.LoggingEvent.(LoggingEvent.java:159)

 at org.apache.log4j.Category.forcedLog(Category.java:391)

 at org.apache.log4j.Category.log(Category.java:856)

 at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:382)

 at com.captaindebug.longpoll.service.DeferredResultService.run(DeferredResultService.java:75)

 at java.lang.Thread.run(Thread.java:722)

Caused by: java.lang.ClassNotFoundException: org.apache.log4j.spi.ThrowableInformation

 at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1714)

 at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1559)

 ... 6 more


这里的异常是因为logger在调用时已经被卸载了,因此会给出报错。再一次,文档中是这么描述的:“Shutdown hook是在JVM生命周期中的一个微妙的时间执行,因此需要进行防御性变成。尤其是应该注意线程安全尽可能地避免死锁。Hook代码应该不对任何服务盲目依赖,因为这些服务可能会注册自己的shutdown hook并且此时也在关闭的过程中。例如,试图使用基于线程的服务比如AWT时间分发线程会导致死锁。


MatchReport类也需要进行类似的修改。主要的区别在于run() 方法中的hook.keepRunning()是一个for循环:


public class MatchReporter implements Runnable { 

 

  private static final Logger logger = LoggerFactory.getLogger(MatchReporter.class); 

 

  private final Match match; 

 

  private final Queue<Message> queue; 

 

  private volatile boolean start = true; 

 

  @Autowired

  private ShutdownService shutdownService; 

 

  private Hook hook; 

 

  public MatchReporter(Match theBigMatch, Queue<Message> queue) { 

    this.match = theBigMatch; 

    this.queue = queue; 

  } 

 

  /** 

   * 由Spring加载上下文之后调用。会启动匹配……

   */

  public void start() { 

 

    if (start) { 

      synchronized (this) { 

        if (start) { 

          start = false; 

          logger.info("Starting the Match Reporter..."); 

          String name = match.getName(); 

          Thread thread = new Thread(this, name); 

          hook = shutdownService.createHook(thread); 

 

          thread.start(); 

        } 

      } 

    } else { 

      logger.warn("Game already in progress"); 

    } 

  } 

 

  /** 

   * The main run loop 

   */

  @Override

  public void run() { 

 

    sleep(5); // Sleep等待应用加载

 

    logger.info("The match has now started..."); 

    long now = System.currentTimeMillis(); 

    List<Message> matchUpdates = match.getUpdates(); 

 

    for (Message message : matchUpdates) { 

 

      delayUntilNextUpdate(now, message.getTime()); 

      if (!hook.keepRunning()) { 

        break; 

      } 

      logger.info("Add message to queue: {}", message.getMessageText()); 

      queue.add(message); 

    } 

    start = true; // Game over, can restart 

    logger.warn("GAME OVER"); 

  } 

 

  private void sleep(int deplay) { 

    try { 

      TimeUnit.SECONDS.sleep(10); 

    } catch (InterruptedException e) { 

      logger.info("Sleep interrupted..."); 

    } 

  } 

 

  private void delayUntilNextUpdate(long now, long messageTime) { 

 

    while (System.currentTimeMillis() < now + messageTime) { 

 

      try { 

        Thread.sleep(100); 

      } catch (InterruptedException e) { 

        logger.info("MatchReporter Thread interrupted..."); 

      } 

    } 

  } 

 

}


最终的代码测试是在匹配更新过程到一半时执行Tomcat shutdown.sh命令。当JVM终止时会通过ShutdownDaemonHook类调用shutdown hook,其中的run()方法会对Hook实例列表循环执行通知他们关闭各自的线程。如果你执行tail -f查看服务器的日志文件(我这里是catalina.out,你的Tomcat配置可能与我不同),你会看到服务器友好地关闭记录。


本文附带的代码可以在GitHub上找到:https://github.com/roghughe/captaindebug/tree/master/long-poll。


更多相关文章

  1. Java 线程面试题 Top 50
  2. Java后端开发三年多线程你都懂,问你异步编程你说你没听过???
  3. 把 WebAssembly 用于提升速度和代码重用[每日前端夜话0xBC]
  4. 无法理解如何使用javascript和jquery代码
  5. 调用没有当前上下文的OpenGL ES API(每个线程记录一次)
  6. Jquery Validate 正则表达式实用验证代码常用的
  7. Jquery 只执行一次的代码
  8. JavaScript函数中的Ruby代码
  9. Jquery禁用/启用按钮与文本框代码

随机推荐

  1. 我的Android学习计划--走在架构师的路上
  2. Android(安卓)ANR 探索
  3. Android:SNS客户端开发三:数据库操作(一)
  4. 为什么iOS比Android更吸引开发者
  5. Android ListView拖动时背景变黑的解决方
  6. android:duplicateParentState属性解释
  7. Android基础笔记(三)-数据存储和界面展现
  8. Android学习计划表
  9. Android菜鸟必看:解析应用的基本原理
  10. android数据库存储-sqllite