铁文整理

14.10 同步器

java.util.concurrent包包含了几个能帮助人们管理相互合作的线程集的类,见表14-3。这些机制具有为线程之间的共用集节点模式(common rendezvous patterns)提供的“预置功能”(canned functionality)。如果有一个相互合作的线程集满足这些行为模式之一,那么应该直接重用合适的库类而不要试图提供手工的锁与条件的集合。

14-3 同步器

它能做什么

何时使用

CyclicBarrier

允许线程集等待直至其中预定数目的线程到达一个公共障栅(barrier,然后可以选择执行一个处理障栅的动作

当大量的线程需要在它们的结果可用之前完成时

CountDownLatch

允许线程集等待直到计数器减为0

当一个或多个线程需要等待直到指定数目的事件发生

Exchanger

允许两个线程在要交换的对象准备好时交换对象

当两个线程工作在同一数据结构的两个实例上的时候,一个向实例添加数据而另一个从实例清除数据

Semaphore

允许线程集等待直到被允许继续运行为止

限制访问资源的线程总数。如果许可数是1,常常阻塞线程直到另一个线程给出许可为止

SynchronousQueue

允许一个线程把对象交给另一个线程

在没有显式同步的情况下,当两个线程准备好将一个对象从一个线程传递到另一个时

14.10.1 信号量

概念上讲,一个信号量管理许多的许可证(permits)。为了通过信号量,线程通过调用acquire请求许可。许可的数目是固定的,由此限制了通过的线程数量。其他线程可以通过调用release释放许可。其实没有实际的许可对象,信号量仅维护一个计数。而且,许可不是必须由获取它的线程释放。事实上,任何线程都可以释放任意数目的许可。如果释放的许可多于可用许可的最大数目,信号量只是被设置为可用许可的最大数目。这种随意性使得信号量既具有灵活性又容易带来混乱。

信号量在1968年由Edsger Dijkstra发明,作为同步原语(synchronization primitive)。Dijkstra指出信号量可以被有效地实现,并且有足够的能力解决许多常见的线程同步问题。几乎任何一本操作系统教科书中,都能看到使用信号量实现的有界队列。当然,应用程序员不必自己实现有界队列。建议在信号量的行为能适合你面对的同步问题时才使用它,否则你会陷入思维的混乱。

一个简单的例子,一个许可数为1的信号量作为一个可由其他线程打开或关闭的门很有用。在第14.10.6节有这样的例子,工作器线程创建动画。偶尔,工作器线程等待用户按下一个按钮。工作器线程试图获得一个许可,并且它不得不等待直到按钮点击释放一个许可。

14.10.2 倒计时门栓

一个倒计时门栓(CountDownLatch)让一个线程集等待直到计数变为0。倒计时门栓是一次性的。—旦计数为0,就不能再重用了。

一个有用的特例是计数值为1的门栓。实现一个只能通过一次的门。线程在门外等候直到另一个线程将计数器值置为0

举例,假定一个线程集需要一些初始的数据来完成工作。工作器线程被启动并在门外等候。另一个线程准备数据,当数据准备好的时候,调用countDown,所有工作器线程就可以继续运行了。

然后,可以使用第二个门栓检査什么时候所有工作器线程完成工作。用线程数初始化门栓。每个工作器线程在结束前将门栓计数减1。另一个获取工作结果的线程在门外等待,一旦所有工作器线程终止该线程继续运行。

14.10.3 障栅

CyclicBarrier类实现了一个集结点(rendezvous)称为障栅(barrier)。考虑大量线程运行在一次计算的不同部分的情形。当所有部分都准备好时,需要把结果组合在一起。当一个线程完成了它的那部分任务后,我们让它运行到障栅处。一旦所有的线程都到达了这个障栅,障栅就撤锁,线程就可以继续运行。

下面是其细节。首先,构造一个障栅,并给出参与的线程数:

CyclicBarrier barrier = new CyclicBarrier(nthreads);

每一个线程做一些工作,完成后在障栅上调用await

public void run() {

doWork();

barrier.await();

}

await方法有一个可选的超时参数:

barrier.await(100, TimeUnit.MILLISECONDS);

如果任何一个在障栅上等待的线程离开了障撕,那么障栅就被破坏了(线程可能离开是因为它调用await时设置了超时,或者因为它被中断了)。在这种情况下,所有其他线程的await方法抛出BrokenBarrierException异常。那些已经在等待的线程立即终止await的调用。

可以提供一个可选的障柵动作(barrier action),当所有线程到达障栅的时候就会执行这一动作。

Runnable barrierAction = ....;

CyclicBarrier barrier = new CyclicBarrier(nthreads, barrierAction);

该动作可以收集那些单个线程的运行结果。

障栅被称为是循环的(cyclic),因为可以在所有等待线程被释放后被重用。在这一点上,有别于CountDownLatchCountDownLatch只能被使用一次。

14.10.4 交换器

当两个线程在同一个数据缓冲区的两个实例上工作的时候,就可以使用交换器。典型的情况是,一个线程向缓冲区填入数据,另一个线程消耗这些数据。当它们都完成以后,相互交换缓冲区。

14.10.5 同步队列

同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用SynchronousQueueput方法时,它会阻塞直到另一个线程调用take方法为止,反之亦然。与Exchanger的情况不同,数据仅仅沿一个方向传递,从生产者到消费者。

即使SynchronousQueue类实现了BlockingQueue接口,概念上讲,它依然不是一个队列它没有包含任何元素,它的size方法总是返回0

14.10.6 例子:暂停动画与恢复动画

考虑一个要做某项工作的程序,更新屏幕的显示,在等待用户査看结果之后按下按钮继续,然后完成工作的下一步。

许可计数为1的信号量可以用来处理工作器线程和事件分配线程之间的同步。工作器线程在准备好要暂停的时候调用acquire。只要用户点击Continue按钮,GUI线程就调用release

如果在工作器线程准备好的时候,用户多次点击该按钮会发生什么哫?毕竟因为只有一个许可证可用,许可计数为1

14-13中的程序使用这种思路工作。程序以动画形式展示排序算法。工作器线程对数组进行排序,周期性地停止,等待用户继续给出许可。用户看到算法当前状态的绘图会感到满意,并按下Continue按钮允许工作器线程进行下一步处理。

这里不打算让读者为排序算法烦恼,于是,调用Arrays.sort,它实现归并算法。要暂停该算法,我们提供了一个等待信号量的Comparator对象。因此,每当算法比较两个元素的时候,暂停动画。绘制数组的当前值并加亮显示被比较的元素(见图14-8)。

注释:动画展示了较小的有序段归并到较大的有序段,但是它并不那么精确。mergesort算法使用第二个数组存放我们无法看到的临时值。这个例子的要点是不要过于用心地钻研排序算法,而是展示如何使用信号量用来暂停工作器线程。

14-13 AlgorithmAnimation.java

import java.awt.*;

import java.awt.geom.*;

import java.awt.event.*;

import java.util.*;

import java.util.concurrent.*;

import javax.swing.*;

/**

* This program animates a sort algorithm.

*

* @version 1.01 2007-05-18

* @author Cay Horstmann

*/

public class AlgorithmAnimation {

public static void main(String[] args) {

EventQueue.invokeLater(new Runnable() {

public void run() {

JFrame frame = new AnimationFrame();

frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);

frame.setVisible(true);

}

});

}

}

/**

* This frame shows the array as it is sorted, together with buttons to

* single-step the animation or to run it without interruption.

*/

class AnimationFrame extends JFrame {

public AnimationFrame() {

ArrayComponent comp = new ArrayComponent();

add(comp, BorderLayout.CENTER);

final Sorter sorter = new Sorter(comp);

JButton runButton = new JButton("Run");

runButton.addActionListener(new ActionListener() {

public void actionPerformed(ActionEvent event) {

sorter.setRun();

}

});

JButton stepButton = new JButton("Step");

stepButton.addActionListener(new ActionListener() {

public void actionPerformed(ActionEvent event) {

sorter.setStep();

}

});

JPanel buttons = new JPanel();

buttons.add(runButton);

buttons.add(stepButton);

add(buttons, BorderLayout.NORTH);

setSize(DEFAULT_WIDTH, DEFAULT_HEIGHT);

Thread t = new Thread(sorter);

t.start();

}

private static final int DEFAULT_WIDTH = 300;

private static final int DEFAULT_HEIGHT = 300;

}

/**

* This runnable executes a sort algorithm. When two elements are compared, the

* algorithm pauses and updates a component.

*/

class Sorter implements Runnable {

/**

* Constructs a Sorter.

*

* @param values

* the array to be sorted

* @param comp

* the component on which to display the sorting progress

*/

public Sorter(ArrayComponent comp) {

values = new Double[VALUES_LENGTH];

for (int i = 0; i < values.length; i++)

values[i] = new Double(Math.random());

this.component = comp;

this.gate = new Semaphore(1);

this.run = false;

}

/**

* Sets the sorter to "run" mode. Called on the event dispatch thread.

*/

public void setRun() {

run = true;

gate.release();

}

/**

* Sets the sorter to "step" mode. Called on the event dispatch thread.

*/

public void setStep() {

run = false;

gate.release();

}

public void run() {

Comparator<Double> comp = new Comparator<Double>() {

public int compare(Double i1, Double i2) {

component.setValues(values, i1, i2);

try {

if (run)

Thread.sleep(DELAY);

else

gate.acquire();

} catch (InterruptedException exception) {

Thread.currentThread().interrupt();

}

return i1.compareTo(i2);

}

};

Arrays.sort(values, comp);

component.setValues(values, null, null);

}

private Double[] values;

private ArrayComponent component;

private Semaphore gate;

private static final int DELAY = 100;

private volatile boolean run;

private static final int VALUES_LENGTH = 30;

}

/**

* This component draws an array and marks two elements in the array.

*/

class ArrayComponent extends JComponent {

/**

* Sets the values to be painted. Called on the sorter thread.

*

* @param values

* the array of values to display

* @param marked1

* the first marked element

* @param marked2

* the second marked element

*/

public synchronized void setValues(Double[] values, Double marked1, Double marked2) {

this.values = values.clone();

this.marked1 = marked1;

this.marked2 = marked2;

repaint();

}

public synchronized void paintComponent(Graphics g) // Called on the event dispatch thread

{

if (values == null)

return;

Graphics2D g2 = (Graphics2D) g;

int width = getWidth() / values.length;

for (int i = 0; i < values.length; i++) {

double height = values[i] * getHeight();

Rectangle2D bar = new Rectangle2D.Double(width * i, 0, width, height);

if (values[i] == marked1 || values[i] == marked2)

g2.fill(bar);

else

g2.draw(bar);

}

}

private Double marked1;

private Double marked2;

private Double[] values;

}

APIjava.util.concurrent.CyclicBarrier 5.0

  • CyclicBarrier(int parties)

  • CyclicBarrier(int parties, Runnable barrierAction):构建一个线程数目是parties的循环障栅。当所有的线程都在障栅上调用await之后,执行barrierAction。

  • int await()

  • int await(long time, TimeUnit unit):等待直到所有的线程在障栅上调用await或者时间超时为止,在这种情况下会抛出TimeoutException异常。成功时,返回这个线程的序号。第一个线程的序号为parties-1,最后一个线程是0。

APIjava.utill.concurrent.CountDownLatch 5.0

  • CountDownLatch(int count):用给定的计数构建一个倒计时门拴。

  • void await():等待这个门栓的计数降为0。

  • boolean await(long time, TimeUnit unit):等待这个门栓的计数降为0或者时间超时。如果计数为0返回true。如果超时返回false。

  • public void countDown():递减这个门栓的计数值,

APIjava.util.concrrent.Exchanger<V> 5.0

  • V exchange(V item)

  • V exchange(V item, long time, TimeUnit unit):阻塞直到另一个线程调用这个方法,然后,同其他线程交换item,并返回其他线程的item。第二个方法时间超时时,抛出TimeoutException异常。

APIjava.util.concurrent.SynchronousQueue<V> 5.0

  • SynchronousQueue()

  • SynchronousQueue(boolean fair):构建一个允许线程提交item的同步队列。如果fair为true,队列优先照顾等待了最长时间的线程。

  • void put(V item):阻塞直到另一个线程调用take来获取item。

  • V take():阻塞直到另一个线程调用put。返回另一个线程提供的item。

APIjava.util.concurrent.Semaphore 5.0

  • Semaphore(int permits)

  • Semaphore(int permits, boolean fair):用给定的许可数目为最大值构建一个信号量。如果fair为true,队列优先照顾等待了最长时间的线程。

  • void acquire():等待获得一个许可。

  • boolean tryAcquire():尝试获得一个许可,如果没有许可是可用的,返回false。

  • boolean tryAcquire(long time, TimeUnit unit):尝试在给定时间内获得一个许可,如果没有许可是可用的,返回false。

void release():释放一个许可。

更多相关文章

  1. Java多线程之Thread、Runnable、Callable及线程池
  2. Java,Socket&TCP编程 实现多线程端对端通信与文件传输
  3. Java错误:线程“main”中的异常java.lang.ArrayIndexOutOfBoundsE
  4. Java并发面试题:三个线程轮流打印十次abc
  5. 【java】线程安全的整型类AtomicInteger
  6. Java多线程编程
  7. 线程“main”中的异常java.lang.RuntimeException:无法编译的源代
  8. Java ThreadPoolExecutor 线程池调度器
  9. Java多线程wait和notify协作,按序打印abc

随机推荐

  1. 什么是好的数据指标:精益数据分析
  2. 为什么说 Python 内置函数并不是万能的?
  3. 如何处理偏态数据?
  4. SQL今日一题(6):count
  5. Python有序字典的两个小“惊喜”~~
  6. 学习SQL:MySQL必知必会
  7. 醒醒!Python已经支持中文变量名啦!
  8. Python 幕后解释器:一系列的学习资源
  9. SQL每日一题
  10. 再说相关性分析