注意:本文来自chaodongyang.com,点击阅读原文
java编程思想之并发(线程之间的协作) 当你使用多线程来同时运行多个任务时,可以通过使用锁来同步两个任务的行为,从而使的一个任务不会干涉另一个任务的资源。也就是说,如果两个任务交替的步入某项共享资源,你可以使用互斥来保证任何时刻只有一个任务可以访问这项资源。
线程之间的协作 上面的问题已经解决了,下一步是如何使得任务彼此之间可以协作,使得多个任务可以一起工作去解决某个问题。现在的问题不是彼此之间的干涉,而是彼此之间的协作。解决这类问题的关键是某些部分必须在其他部分被解决之前解决。 当任务协作时,关键问题是这些任务之间的握手。为了实现握手,我们使用了相同的基础特性:互斥。在这种情况下,互斥能够确保只有一个任务可以响应某个信号,这样就能根除任何可能的竞争条件。在互斥上,我们为任务添加了一种途径,可以将自身挂起,直至某些外部条件发生变化,表示是时候让这个任务开始为止。
wait() 与 notifyAll() wait() 可以使你等待某个条件发生变化,而改变这个条件通常是由另一个任务来改变。你肯定不想在你的任务测试这个条件的同时,不断的进行空循环,这被称为忙等待,是一种不良的 cpu 使用方式。因此 wait() 会在外部条件发生变化的时候将任务挂起,并且只有在 notif() 或 notifAll() 发生时,这个任务才会被唤醒并去检查所发生的变化。因此,wait() 提供了一种在任务之间对活动同步的方式。 调用 sleep() 时候锁并没有被释放,调用 yield() 也是一样。当一个任务在方法里遇到对 wait() 调用时,线程执行被挂起,对象的锁被释放。这就意味着另一个任务可以获得锁,因此在改对象中的其他 synchronized 方法可以在 wait() 期间被调用。因此,当你在调用 wait() 时,就是在声明:“我已经做完了所有的事情,但是我希望其他的 synchronized 操作在条件何时的情况下能够被执行”。 有两种形式的 wait():
第一种接受毫秒作为参数:指再次暂停的时间。
在 wait() 期间对象锁是被释放的。
可以通过 notif() 或 notifAll(),或者指令到期,从 wait() 中恢复执行。
第二种不接受参数的 wait().
这种 wait() 将无线等待下去,直到线程接收到 notif() 或 notifAll()。
wait()、notif()以及 notifAll() 有一个比较特殊的方面,那就是这些方法是基类 Object 的一部分,而不是属于 Thread 类。仅仅作为线程的功能却成为了通用基类的一部分。原因是这些方法操作的锁,也是所有对象的一部分。所以你可以将 wait() 放进任何同步控制方法里,而不用考虑这个类是继承自 Thread 还是 Runnable。实际上,只能在同步方法或者同步代码块里调用 wait()、notif() 或者 notifAll()。如果在非同步代码块里操作这些方法,程序可以通过编译,但是在运行时会得到 IllegalMonitorStateException 异常。意思是,在调用 wait()、notif() 或者 notifAll() 之前必须拥有获取对象的锁。 比如,如果向对象 x 发送 notifAll(),那就必须在能够得到 x 的锁的同步控制块中这么做:
1 2 3 synchronized (x){ x.notifAll(); }
我们看一个示例:一个是将蜡涂到 Car 上,一个是抛光它。抛光任务在涂蜡任务完成之前,是不能执行其工作的,而涂蜡任务在涂另一层蜡之前必须等待抛光任务完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class Car { private boolean waxOn = false ; public synchronized void waxed () { waxOn = true ; notifyAll(); } public synchronized void buffed () { waxOn = false ; notifyAll(); } public synchronized void waitForWaxing () throws InterruptedException{ while (waxOn == false ) { wait(); } } public synchronized void waitForBuffing () throws InterruptedException{ while (waxOn == true ) { wait(); } } }
开始打蜡的任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class WaxOn implements Runnable { private Car car; protected WaxOn (Car car) { super (); this .car = car; } @Override public void run () { try { while (!Thread.interrupted()) { System.out.println("Wax one" ); TimeUnit.MICROSECONDS.sleep(200 ); car.waxed(); car.waitForBuffing(); } } catch (InterruptedException e) { System.out.println(" Exiting via interrupt" ); } System.out.println("Ending wax on task" ); } }
开始抛光的任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class WaxOff implements Runnable { private Car car; protected WaxOff (Car car) { super (); this .car = car; } @Override public void run () { try { while (!Thread.interrupted()) { car.waitForWaxing(); System.out.println("Wax off" ); TimeUnit.MICROSECONDS.sleep(200 ); car.buffed(); } } catch (InterruptedException e) { System.out.println("Wxtiing via interrupt" ); } System.out.println("Ending wax off task" ); } }
测试类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class WaxOmatic { public static void main (String[] args) throws Exception{ Car car = new Car (); ExecutorService service = Executors.newCachedThreadPool(); service.execute(new WaxOff (car)); service.execute(new WaxOn (car)); TimeUnit.SECONDS.sleep(1 ); service.shutdownNow(); } }
执行结果:
1 2 3 4 5 6 7 8 9 10 Wax one Wax off Wax one Wax off Wax one Wax off Exiting via interrupt Wxtiing via interrupt Ending wax on task Ending wax off task
在 waitForWaxing() 中检查 WaxOn 标志,如果它是 false,那么这个调用任务将会被挂起。这个行为发生在 synchronized 方法中这一点很重要。因为在这个方法中任务已经获得了锁。当你调用 wait() 时,线程被挂起,而锁被释放。释放锁是本质所在,因为为了安全的改变对象的状态,其他某个任务就必须能够获得这个锁。 WaxOn.run() 表示给汽车打蜡的第一个步骤,它执行他的操作:调用 sleep() 模拟打蜡的时间,然后告知汽车打蜡结束,并且调用 waitForWaxing(),这个方法会调用 wait() 挂起当前打蜡的任务。直到 WaxOff 任务调用这两车的 buffed(),从而改变状态并且调用 notfiAll() 重新唤醒为止。翻过来也是一样的,在运行程序时,你可以看到控制权在两个任务之间来回的传递,这两个步骤过程在不断的重复。
** 错失的信号 ** 当两个线程使用 notif()/wait() 或者 notifAll()/wait() 进行协作时,有可能会错过某个信号。假设线程 T1 是通知 T2 的线程,而这两个线程都使用下面的方式实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 T1: synchronized (X){ <setup condition for T2> x.notif(); } T2: while (someCondition){ synchronized (x){ x.wait(); } }
以上的例子假设 T2 对 someCondition 发现其为 true()。在执行 Potit 其中线程调度器可能切换到了 T1。而 T1 将会执行重新设置 condition,并且调用唤醒。当 T2 继续执行时,以至于不能意识到条件已经发生变化,因此会盲目的进入 wait()。此时唤醒在之前已经调用过了,而 T2 将无限的等待下去唤醒的信号。 解决该问题的方案是防止 someCondition 变量上产生竞争条件:
1 2 3 4 5 synchronized (x){ while (someCondition){ x.wait(); } }
notif() 与 notifAll() 可能有多个任务在单个 Car 对象上被挂起处于 wait() 状态,因此调用 notifyAll() 比调用 notify() 更安全。使用 notify() 而不是 notifyAll() 是一种优化。使用 notify() 时,在众多等待同一个锁的任务中只有一个被唤醒,因此如果你希望使用 notify(),就必须保证被唤醒的是恰当的任务。另外使用 notify() ,所有任务都必须等待相同的条件,因为如果你有多个任务在等待不同的条件,那你就不会知道是否唤醒了恰当的任务。如果使用 notfiy(),当条件发生变化时,必须只有一个任务能从中收益。最后,这些限制对所有可能存在的子类都必须总起作用。如果这些规则任何一条不满足都必须使用 notifyAll()。 在 Java 的线程机制中,有一个描述是这样的:notifyAll() 将唤醒所有正在等待的任务。这是否意味着在程序中任何地方,任何处于 wait() 状态中的任务都将被任何对 notifyAll() 的调用唤醒呢?在下面的实例中说明了情况并非如此,当 notifyAll() 因某个特定锁被调用时,只有等待这个锁的任务才会被唤醒:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class Blocker { synchronized void waitingCall () { try { while (!Thread.interrupted()) { wait(); System.out.print(Thread.currentThread() + " " ); } } catch (InterruptedException e) { } } synchronized void prod () { notify(); } synchronized void prodAll () { notifyAll(); } }
创建任务 Task:
1 2 3 4 public class Task implements Runnable { static Blocker blocker = new Blocker (); public void run () { blocker.waitingCall(); } }
创建任务 Task2:
1 2 3 4 5 public class Task2 implements Runnable { static Blocker blocker = new Blocker (); public void run () { blocker.waitingCall(); } }
测试类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class NotifyVsNotifyAll { public static void main (String[] args) throws Exception{ ExecutorService service = Executors.newCachedThreadPool(); for (int i = 0 ; i < 3 ; i++) { service.execute(new Task ()); } service.execute(new Task2 ()); Timer timer = new Timer (); timer.scheduleAtFixedRate(new TimerTask () { boolean prod = true ; @Override public void run () { if (prod) { System.out.println("notify" ); Task.blocker.prod(); prod = false ; }else { System.out.println("notifyAll" ); Task.blocker.prodAll(); prod = true ; } } }, 400 , 400 ); TimeUnit.SECONDS.sleep(5 ); timer.cancel(); System.out.println("Time cancle" ); TimeUnit.MILLISECONDS.sleep(500 ); System.out.println("Task2.blocker.prodAll() " ); Task2.blocker.prodAll(); TimeUnit.MILLISECONDS.sleep(500 ); System.out.println("\nShutting down" ); service.shutdownNow(); } }
测试结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 notify Thread[pool-1 -thread-2 ,5 ,main] notifyAll Thread[pool-1 -thread-2 ,5 ,main] Thread[pool-1 -thread-3 ,5 ,main] Thread[pool-1 -thread-1 ,5 ,main] notify Thread[pool-1 -thread-2 ,5 ,main] notifyAll Thread[pool-1 -thread-2 ,5 ,main] Thread[pool-1 -thread-1 ,5 ,main] Thread[pool-1 -thread-3 ,5 ,main] notify Thread[pool-1 -thread-2 ,5 ,main] notifyAll Thread[pool-1 -thread-2 ,5 ,main] Thread[pool-1 -thread-3 ,5 ,main] Thread[pool-1 -thread-1 ,5 ,main] notify Thread[pool-1 -thread-2 ,5 ,main] notifyAll Thread[pool-1 -thread-2 ,5 ,main] Thread[pool-1 -thread-1 ,5 ,main] Thread[pool-1 -thread-3 ,5 ,main] notify Thread[pool-1 -thread-2 ,5 ,main] notifyAll Thread[pool-1 -thread-2 ,5 ,main] Thread[pool-1 -thread-3 ,5 ,main] Thread[pool-1 -thread-1 ,5 ,main] notify Thread[pool-1 -thread-2 ,5 ,main] notifyAll Thread[pool-1 -thread-2 ,5 ,main] Thread[pool-1 -thread-1 ,5 ,main] Thread[pool-1 -thread-3 ,5 ,main] Time cancle Task2.blocker .prodAll () Thread[pool-1 -thread-4 ,5 ,main] Shutting down
从上面输出的结果可以看出,我们启动了三个 Task 任务线程,一个 Task2 线程。使用 timer 做了一个定时器,每间隔 4 毫秒就轮换启动 Task.blocker 的 notify() 和 notifyAll()方法。我们看到 Task 和 Task2 都有 Blocker 对象,他们调用 Blocker 对象的时候都会被阻塞。我们看到当调用 Task.prod() 的时候只有一个在等待锁的任务被唤醒,其余两个继续挂起。当调用 Task.prodAll() 的时候等待的三个线程都会被唤醒。当调用 Task2。prodAll() 的时候 只有 Task2 的线程任务被唤醒。其余的三个 Task 任务继续挂起。
生产者与消费者 请考虑这样一种情况,在饭店有一个厨师和一个服务员。这个服务员必须等待厨师做好膳食。当厨师准备好时会通知服务员,之后服务员上菜,然后返回继续等待。这是一个任务协作示例:厨师代表生产者,而服务员代表消费者。两个任务必须在膳食被生产和消费时进行握手,而系统必须是以有序的方式关闭。 膳食类:
1 2 3 4 5 public class Meal { private final int orderNum; public Meal (int orderNum) { this .orderNum = orderNum; } public String toString () { return "Meal " + orderNum; } }
服务生类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class WaitPerson implements Runnable { private Restaurant restaurant; public WaitPerson (Restaurant r) { restaurant = r; } public void run () { try { while (!Thread.interrupted()) { synchronized (this ) { while (restaurant.meal == null ) wait(); } Print.print("Waitperson got " + restaurant.meal); synchronized (restaurant.chef) { restaurant.meal = null ; restaurant.chef.notifyAll(); } } } catch (InterruptedException e) { Print.print("WaitPerson interrupted" ); } } }
厨师类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class Chef implements Runnable { private Restaurant restaurant; private int count = 0 ; public Chef (Restaurant r) { restaurant = r; } public void run () { try { while (!Thread.interrupted()) { synchronized (this ) { while (restaurant.meal != null ) wait(); } if (++count == 10 ) { Print.print("Out of food, closing" ); restaurant.exec.shutdownNow(); } Print.printnb("Order up! " ); synchronized (restaurant.waitPerson) { restaurant.meal = new Meal (count); restaurant.waitPerson.notifyAll(); } TimeUnit.MILLISECONDS.sleep(100 ); } } catch (InterruptedException e) { Print.print("Chef interrupted" ); } } }
测试类:
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Restaurant { Meal meal; ExecutorService exec = Executors.newCachedThreadPool(); WaitPerson waitPerson = new WaitPerson (this ); Chef chef = new Chef (this ); public Restaurant () { exec.execute(chef); exec.execute(waitPerson); } public static void main (String[] args) { new Restaurant (); } }
执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 Order up! Waitperson got Meal 1 Order up! Waitperson got Meal 2 Order up! Waitperson got Meal 3 Order up! Waitperson got Meal 4 Order up! Waitperson got Meal 5 Order up! Waitperson got Meal 6 Order up! Waitperson got Meal 7 Order up! Waitperson got Meal 8 Order up! Waitperson got Meal 9 Out of food, closing Order up! WaitPerson interrupted Chef interrupted
使用显示的 Lock 和 Condition 对象
在 java SE5 的类库中还有额外的显示工具。我们来重写我们的打蜡和抛光类。使用互斥并允许任务挂起的基本类是 Condition,你可以通过在 Condition 上调用 await() 来挂起一个任务。当外部条件发生变化时,意味着某个任务应该继续执行,你可以通过调用 signal() 来通知这个任务,从而唤醒一个任务,或者调用 signalAll() 来唤醒所有在这个 Condition 上被挂起的任务。(signalAll() 比 notifAll() 是更安全的方式)
下面是重写版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 class Car { private Lock lock = new ReentrantLock (); private Condition condition = lock.newCondition(); private boolean waxOn = false ; public void waxed () { lock.lock(); try { waxOn = true ; condition.signalAll(); } finally { lock.unlock(); } } public void buffed () { lock.lock(); try { waxOn = false ; condition.signalAll(); } finally { lock.unlock(); } } public void waitForWaxing () throws InterruptedException { lock.lock(); try { while (waxOn == false ) condition.await(); } finally { lock.unlock(); } } public void waitForBuffing () throws InterruptedException{ lock.lock(); try { while (waxOn == true ) condition.await(); } finally { lock.unlock(); } } } class WaxOn implements Runnable { private Car car; public WaxOn (Car c) { car = c; } public void run () { try { while (!Thread.interrupted()) { printnb("Wax On! " ); TimeUnit.MILLISECONDS.sleep(200 ); car.waxed(); car.waitForBuffing(); } } catch (InterruptedException e) { print("Exiting via interrupt" ); } print("Ending Wax On task" ); } } class WaxOff implements Runnable { private Car car; public WaxOff (Car c) { car = c; } public void run () { try { while (!Thread.interrupted()) { car.waitForWaxing(); printnb("Wax Off! " ); TimeUnit.MILLISECONDS.sleep(200 ); car.buffed(); } } catch (InterruptedException e) { print("Exiting via interrupt" ); } print("Ending Wax Off task" ); } } public class WaxOMatic2 { public static void main (String[] args) throws Exception { Car car = new Car (); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff (car)); exec.execute(new WaxOn (car)); TimeUnit.SECONDS.sleep(5 ); exec.shutdownNow(); } }
在 Car 的构造器中单个的 Lock 将产生一个 Condition 对象,这个对象被用来管理任务之间的通信。但是这个 Condition 不包含任何有关处理状态的信息,因此你需要额外的表示处理状态的信息,即 Boolean waxOn。
生产者消费者与队列 wait() 和 notifAll() 方法以一种非常低级的方式解决了任务的互操作的问题,即每次交互时都握手。许多时候我们可以使用同步队列来解决协作的问题,同步队列在任何时刻只允许一个任务插入或移除元素。在 Java.util.concurrent.BlockingQueue 接口中提供了这个队列,这个接口有大量的标准实现。可以使用 LinkedBlockingQueue 他是一个无界队列,还可以使用 ArrayBlockingQueue,它具有固定的尺寸,可以在它被阻塞之前向其中放置有限数量的元素。
如果消费者任务试图从队列中获取对象,而该队列为空时,那么这些队列就可以挂起这些任务,并且当有更多的元素可用时恢复这些消费任务。阻塞队列可以解决非常大的问题,而其方式与 wait() 和 notifyAll() 相比,则简单切可靠。
下面是一个简单的测试,它将多个 LiftOff 对象执行串行化。消费者 LiftOffRunner 将每个 LiftOff 对象从 BlockIngQueue 中推出并直接运行。它通过显示的调用 run() 而是用自己的线程来运行,而不是为每个任务启动一个线程。
首先把之前写过的 LiftOff 类贴出来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class LiftOff implements Runnable { protected int countDown = 10 ; private static int taskCount = 0 ; private final int id = taskCount++; public LiftOff () {} public LiftOff (int countDown) { this .countDown = countDown; } public String status () { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!" ) + "), " ; } public void run () { while (countDown-- > 0 ) { System.out.print(status()); Thread.yield (); } } }
LiftOffRunner 类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class LiftOffRunner implements Runnable { private BlockingQueue<LiftOff> rockets; protected LiftOffRunner (BlockingQueue<LiftOff> rockets) { super (); this .rockets = rockets; } public void add (LiftOff lo) { try { rockets.put(lo); } catch (InterruptedException e) { System.out.println("添加失败" ); } } @Override public void run () { try { while (!Thread.interrupted()) { LiftOff rocket = rockets.take(); rocket.run(); } } catch (InterruptedException e) { System.out.println("运行中断" ); } System.out.println("退出运行" ); } }
最后是测试类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class TestBlockingQueues { static void getkey () { try { new BufferedReader (new InputStreamReader (System.in)).readLine(); } catch (IOException e) { e.printStackTrace(); } } static void getkey (String message) { Print.print(message); getkey(); } static void test (String msg,BlockingQueue<LiftOff> queue) { LiftOffRunner runner = new LiftOffRunner (queue); Thread thread = new Thread (runner); thread.start(); for (int i = 0 ; i < 5 ; i++) { runner.add(new LiftOff (5 )); } getkey("Press Enter " + msg); thread.interrupt(); } public static void main (String[] args) { test("LinkedBlockingQueue" , new LinkedBlockingQueue <LiftOff>()); test("ArrayBlockingQueue" , new ArrayBlockingQueue <>(3 )); test("SynchronousQueue" , new SynchronousQueue <>()); } }
吐司 BlockingQueue
下面是一个示例,每一台机器都有三个任务:一个只做吐司、一个给吐司抹黄油、另一个在涂抹黄油的吐司上抹果酱。我们来示例如果使用 BlockIngQueue 来运行这个示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 class Toast { public enum Status { DRY, BUTTERED, JAMMED } private Status status = Status.DRY; private final int id; public Toast (int idn) { id = idn; } public void butter () { status = Status.BUTTERED; } public void jam () { status = Status.JAMMED; } public Status getStatus () { return status; } public int getId () { return id; } public String toString () { return "Toast " + id + ": " + status; } } class ToastQueue extends LinkedBlockingQueue <Toast> {}class Toaster implements Runnable { private ToastQueue toastQueue; private int count = 0 ; private Random rand = new Random (47 ); public Toaster (ToastQueue tq) { toastQueue = tq; } public void run () { try { while (!Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep( 100 + rand.nextInt(500 )); Toast t = new Toast (count++); print(t); toastQueue.put(t); } } catch (InterruptedException e) { print("Toaster interrupted" ); } print("Toaster off" ); } } class Butterer implements Runnable { private ToastQueue dryQueue, butteredQueue; public Butterer (ToastQueue dry, ToastQueue buttered) { dryQueue = dry; butteredQueue = buttered; } public void run () { try { while (!Thread.interrupted()) { Toast t = dryQueue.take(); t.butter(); print(t); butteredQueue.put(t); } } catch (InterruptedException e) { print("Butterer interrupted" ); } print("Butterer off" ); } } class Jammer implements Runnable { private ToastQueue butteredQueue, finishedQueue; public Jammer (ToastQueue buttered, ToastQueue finished) { butteredQueue = buttered; finishedQueue = finished; } public void run () { try { while (!Thread.interrupted()) { Toast t = butteredQueue.take(); t.jam(); print(t); finishedQueue.put(t); } } catch (InterruptedException e) { print("Jammer interrupted" ); } print("Jammer off" ); } } class Eater implements Runnable { private ToastQueue finishedQueue; private int counter = 0 ; public Eater (ToastQueue finished) { finishedQueue = finished; } public void run () { try { while (!Thread.interrupted()) { Toast t = finishedQueue.take(); if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) { print(">>>> Error: " + t); System.exit(1 ); } else print("Chomp! " + t); } } catch (InterruptedException e) { print("Eater interrupted" ); } print("Eater off" ); } } public class ToastOMatic { public static void main (String[] args) throws Exception { ToastQueue dryQueue = new ToastQueue (), butteredQueue = new ToastQueue (), finishedQueue = new ToastQueue (); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Toaster (dryQueue)); exec.execute(new Butterer (dryQueue, butteredQueue)); exec.execute(new Jammer (butteredQueue, finishedQueue)); exec.execute(new Eater (finishedQueue)); TimeUnit.SECONDS.sleep(5 ); exec.shutdownNow(); } }
这个示例中没有任何显示的同步,因为同步队列和系统的设计隐式的管理了每片 Toast 在任何时刻都只有一个任务在操作。因为队列的阻塞,使得处理过程将被自动挂起和恢复。