前言
在前篇 執行緒(1) 初次介紹了執行緒,並提出執行緒的競速情況(Race Condition)。在此篇文章會介紹簡單、靈活地操作執行緒,至於更深入的控制執行緒的剖析,將不在此篇討論。
在執行緒中的"鎖"提供了兩種主要特性:互斥(mutual exclusion)和可見性(visibility)。
互斥即一次只允許一個執行緒持有某個特定的鎖,因此可使用該特性實作對共享數據的協調訪問協議,這樣,一次就只有一個執行緒能夠使用該共享數據。
可見性要更加複雜一些,它必須確保釋放鎖之前對共享數據做出更改,對於隨後獲得該鎖的另一個執行緒是可見的。如果沒有同步機制提供的這種可見性保證,執行緒看到的共享變數可能是修改前的值或不一致的值,這將引發許多嚴重問題。
一、Object 類別的 wait() / notify() / notifyAll()
在 Object 類別的常見方法-wait/notify/notifyAll方法中,有稍微提及這個觀念,可以利用這三個方法實作執行緒的溝通。
在執行緒中調用 wait() 方法,將阻塞等待其他執行緒的通知;其他執行緒調用 notify() 方法或 notifyAll() 方法,將通知等待的執行緒再次進入可執行緒池中準備執行。
Object 是所有類別的根類別,它有 5 個方法組成了等待/通知機制的核心:notify()、notifyAll()、wait()、wait(long) 和 wait(long, int),這些方法必須寫在有「synchronized」關鍵字的同步化區塊中。
在 Java 中,所有的類別都從 Object 繼承而來,因此,所有的類別都擁有這些共有方法可供使用。而且,由於他們都被宣告為 final,因此在子類別中不能覆寫任何一個方法。
在多執行緒中,「生產者與消費者」是經典的問題,我們將使用這類問題闡述如何操作執行緒。
範例:小白狗(消費者)與它的主人(生產者)
建立 Cookies 類別
/**
*
* @author Ethan
*/
class Cookies {
private int cookiesNo;
// volatile 關鍵字提供了執行緒的可見性,並不能保證執行緒安全性和原子性。
// 若有執行緒變動了變數值,令一個執行緒一定可以看到更新過後的變數值。
private static volatile boolean empty = true;
public synchronized void put(int cNo){
while(!empty){
try{
// 將執行緒放入"等待池"中
wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
System.out.println("主人放了第 " + cNo + " 塊餅乾");
cookiesNo = cNo;
// 更改旗標
empty = false;
// 通知在"等待池"中的執行緒,回到"可執行池"中
notify();
}
public synchronized void eat(int cNo){
while(empty){
try{
// 將執行緒放入"等待池"中
wait();
}catch(InterruptedException e){
e.printStackTrace();
}
}
System.out.println("小白狗吃了第 " + cNo + " 塊餅乾");
// 更改旗標
empty = true;
// 通知在"等待池"中的執行緒,回到"可執行池"中
notify();
}
}
建立 Put 類別,實作 Runnable 介面。
xxxxxxxxxx
/**
*
* @author Ethan
*/
class Put implements Runnable{
Cookies cookies;
public Put(Cookies cookies) {
this.cookies = cookies;
}
public void run() {
for(int i=1; i<=10; i++){
cookies.put(i);
}
}
}
建立 Eat 類別,實作 Runnable 介面。
xxxxxxxxxx
/**
*
* @author Ethan
*/
class Eat implements Runnable{
Cookies cookies;
public Eat(Cookies cookies) {
this.cookies = cookies;
}
public void run() {
for(int i=1; i<=10; i++){
cookies.eat(i);
}
}
}
建立 DogAndCookies 類別,測試 wait() 與 notify()
xxxxxxxxxx
/**
*
* @author Ethan
*/
public class DogAndCookies {
public static void main(String[] args) {
Cookies cookies = new Cookies();
Put put = new Put(cookies);
Eat eat = new Eat(cookies);
Thread master = new Thread(put);
Thread dog = new Thread(eat);
dog.start();
master.start();
}
}

二、CountDownLatch 類別
CountDownLatch 類別在 java.util.concurrent 套件包中,一個同步輔助類別,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。用給定的計數初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await() 方法會一直受阻塞。之後,會釋放所有等待的執行緒,await() 的所有後續調用都將立即返回。這種操作只能一次(計數無法被重置)。一個執行緒(或者多個),等待另外 N 個執行緒完成某個事情之後才能執行。
範例:老師(消費者)等待學生(生產者)把功課交上來,才能批改學生作業
建立 Student 類別,實作 Runnable 介面。
xxxxxxxxxx
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
*
* @author Ethan
*/
class Student implements Runnable{
private CountDownLatch downLatch;
private String name;
public Student(CountDownLatch downLatch, String name) {
this.downLatch = downLatch;
this.name = name;
}
public void run() {
System.out.println(this.name + " 正在努力寫功課中...");
try {
// TimeUnit.sleep() 內部呼叫 Thread.sleep() 也會拋出 InterruptedException 例外。
// 隨機停止執行續 0 ~ 10 秒
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.name + " 功課做完了");
// 將 計數器 的數字減一
this.downLatch.countDown();
}
}
建立 Teacher 類別,實作 Runnable 介面。
xxxxxxxxxx
import java.util.concurrent.CountDownLatch;
/**
*
* @author Ethan
*/
class Teacher implements Runnable{
private CountDownLatch downLatch;
public Teacher(CountDownLatch downLatch) {
this.downLatch = downLatch;
}
public void run() {
System.out.println("老師正在等待所有的學生把功課做完......");
try {
// 調用 CountDownLatch 的 await() 方法
// 就好比調用了 Object 的 wait() 方法
// 當前執行緒會處於"阻塞"狀態
// 直到 downLatch 倒數到 0 時,才回到"可執行池"中
this.downLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("老師開始批改學生的作業!");
}
}
建立 CountDownLatchDemo 類別,測試一個執行緒等待多個執行緒完成工作。
xxxxxxxxxx
import java.util.concurrent.CountDownLatch;
/**
*
* @author Ethan
*/
public class CountDownLatchDemo {
public static void main(String[] args) {
// 實例化一個 CountDownLatch 物件,等待三個執行緒完成
CountDownLatch latch = new CountDownLatch(3);
Student s1 = new Student(latch, "張三");
Student s2 = new Student(latch, "李四");
Student s3 = new Student(latch, "王二");
Teacher teacher = new Teacher(latch);
new Thread(s1).start();
new Thread(s2).start();
new Thread(s3).start();
new Thread(teacher).start();
}
}

三、CyclicBarrier 類別
CyclicBarrier 類別在 java.util.concurrent 套件包中,CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。CyclicBarrier 默認的構造方法是 CyclicBarrier(int parties),其參數表示屏障攔截的執行緒數量,每個執行緒調用 await() 方法告訴 CyclicBarrier 我已經到達了屏障,然後當前執行緒被阻塞。
範例:有一場百米賽跑,當跑者準備好後,才一起出發
建立 Runner 類別,實作 Runnable 介面。
xxxxxxxxxx
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
*
* @author Ethan
*/
class Runner implements Runnable{
private CyclicBarrier cb;
private String name;
public Runner(CyclicBarrier cb, String name) {
this.cb = cb;
this.name = name;
}
public void run() {
try {
// 隨機睡眠 0 ~ 5 秒
Thread.sleep(1000 * new Random().nextInt(5));
System.out.println(this.name + " 已經準備好...");
// 已到達屏障
cb.await();
// 當最後一個執行緒到達屏障,才能回到"可執行池"中
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (BrokenBarrierException e2) {
e2.printStackTrace();
}
System.out.println(this.name + " 出發!!");
}
}
建立 CyclicBarrierDemo 類別,測試所有執行緒必須全部到達屏障後,才能繼續往下執行。
xxxxxxxxxx
import java.util.concurrent.CyclicBarrier;
/**
*
* @author Ethan
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 實例化一個 CyclicBarrier 物件,屏障數量為 5
// 必須要等待最後一個執行緒到達屏障時,
// 所有被屏障攔截的執行緒才會回到"可執行池"中
CyclicBarrier cb = new CyclicBarrier(5);
Runner run1 = new Runner(cb, "李四");
Runner run2 = new Runner(cb, "丁一");
Runner run3 = new Runner(cb, "王二");
Runner run4 = new Runner(cb, "陳五");
Runner run5 = new Runner(cb, "張三");
new Thread(run1).start();
new Thread(run2).start();
new Thread(run3).start();
new Thread(run4).start();
new Thread(run5).start();
}
}

四、執行緒池(Executor 介面)
Executor 框架主要由三大部分組成和一個工廠類別:
任務:包括被執行任務需要實作的介面 Runnable 介面或者 Callable 介面。
任務的執行:包括執行機制的核心介面 Executor,以及繼承 Executor 的 ExecutorService 介面。Executor 框架有兩個關鍵類別 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor。
異步計算的結果:包括介面 Future 和實作 Future 介面的 FutureTask 類別。
工廠類別(Executors):使用靜態方法建立各種執行緒池。

Executor 框架的使用:
主執行緒創建實作 Runnable 或者 Callable 介面的任務物件,工具類 Executors 可以把一個 Runnable 物件封裝成一個 Callable 物件,使用 Executors.callable(Runnable task)。然後交給 ExecutorService 用 execute 或者 submit 執行,前者不返回結果,後者可以返回實作了 Future 介面的物件(到目前為止返回的是 FutureTask 物件),FutureTask 實作了 Runnable,程序員可以創建 FutureTask,然後直接交給 ExecutorService。
最後主執行緒可以執行 FutureTask.get() 方法等待任務完成(阻塞方法),也可以使用 FutureTask.cancel(boolean mayInterruptIfRunning) 來取消任務的執行。

您會發現,使用 Executor 框架後,不需要創建 Thread 物件實例,而是使用工廠類別 Executors 創建,並把實作 Runnable 和 Callable 的類別丟進創建的執行緒池中統一管理。
方法 | 功能 |
---|---|
Executors.newCachedThreadPool() Executors.newCachedThreadPool(ThreadFactory threadFactory) |
創建可緩存的執行緒池,如果執行緒池中的執行緒在60秒未被使用就將被移除,在執行新的任務時,當執行緒池中有之前創建的可用執行緒就重用可用執行緒,否則就新建一條執行緒,它是無上限容量的執行緒池,適用於執行很多的短期異步任務的小程序,或者負載較輕的服務器。 |
Executors.newFixedThreadPool(int nThreads) Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory) |
創建可重用且固定執行緒數的執行緒池,如果執行緒池中的所有執行緒都處於活動狀態,此時再提交任務就在隊列中等待,直到有可用執行緒;如果執行緒池中的某個執行緒由於異常而結束時,執行緒池就會再補充一條新執行緒。適用於負載比較重的服務器。 |
Executors.newSingleThreadExecutor() Executors.newSingleThreadExecutor(ThreadFactory threadFactory) |
創建一個單執行緒的Executor,如果該執行緒因為異常而結束就新建一條執行緒來繼續執行後續的任務,適用於需要保證順序的執行各個任務,並且在任意時間點不會有多個執行緒時活動的場景。 |
Executors.newScheduledThreadPool(int corePoolSize) Executors.newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) |
創建一個可延遲執行或定期執行的執行緒池,包含若干個執行緒。適用於需要多個後台程序執行週期的任務,同時為了滿足資源管理的需求而限制後台執行緒的數量的場景。 |
Executors.newSingleThreadScheduledExecutor() Executors.newSingleThreadScheduledExecutor(ThreadFactory threadFactory) |
只包含一個執行緒的執行延遲或者定期的執行緒池。適用於需要單個後台執行緒執行週期任務,同時需要保證順序地執行各個任務的場景。 |
範例:把百米賽跑改成使用執行緒池
xxxxxxxxxx
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
* @author Ethan
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 實例化一個 CyclicBarrier 物件,屏障數量為 5
// 必須要等待最後一個執行緒到達屏障時,
// 所有被屏障攔截的執行緒才會回到"可執行池"中
CyclicBarrier cb = new CyclicBarrier(5);
// 建立緩存的執行緒池
ExecutorService executor = Executors.newCachedThreadPool();
// 在執行緒池中加入 Runnable 或 Callable, 並執行
executor.execute(new Runner(cb, "李四"));
executor.execute(new Runner(cb, "丁一"));
executor.execute(new Runner(cb, "王二"));
executor.execute(new Runner(cb, "陳五"));
executor.execute(new Runner(cb, "張三"));
// 沒有使用時,要把執行緒池關閉
executor.shutdown();
}
}

Callable<V> 介面
Callable 的作用與 Runnable 類似,可讓您定義想要的執行流程,差別是
Runnable 的 run() 方法無法回傳值,也無法拋出例外;
Callable 的 call() 方法可以回傳值,也可以拋出受檢例外。
範例:樂透彩,實作 Callable 介面並取得回傳值
創建一個 Lotto 類別,實作 Callable 介面
xxxxxxxxxx
import java.util.Date;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
*
* @author Ethan
*/
public class Lotto implements Callable<Integer> {
public Integer call() throws Exception {
System.out.println("\n搖獎機正在搖出樂透數字...");
// 此執行緒隨機睡眠 0 ~ 3 秒
TimeUnit.SECONDS.sleep(new Random().nextInt(3));
// 隨機產生樂透號碼
int num = new Random().nextInt(49) + 1;
System.out.println("樂透號碼: " + num + ", 產生的時間: " + new Date());
return num;
}
}
創建一個 ExecutorCallableDemo 類別,主程式中使用執行緒池,並使用 submit() 方法取得 Future 物件。
xxxxxxxxxx
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
*
* @author Ethan
*/
public class ExecutorCallableDemo {
public static void main(String[] args) {
System.out.println("請顯示搖獎機產生的樂透數字");
// 建立緩存的執行緒池
ExecutorService pool = Executors.newCachedThreadPool();
System.out.println("搖槳中..\n\n");
// 從執行緒池中,執行已實作Callable的類別,並返回 Future 物件
Future<Integer> future = pool.submit(new Lotto());
int result = -1;
try {
// 從 Future 物件中取得返回值
result = future.get();
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e2) {
e2.printStackTrace();
}
System.out.println("\n搖獎機產生的樂透號碼: " + result);
// 關閉執行緒池
pool.shutdown();
}
}

範例:樂透彩,測試 ExecutorService 的 invokeAll() 方法
修改 ExecutorCallableDemo 類別
xxxxxxxxxx
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
*
* @author Ethan
*/
public class ExecutorCallableDemo {
public static void main(String[] args) {
System.out.println("請顯示搖獎機產生的樂透數字(從七個中選一個)");
// 建立緩存的執行緒池
ExecutorService pool = Executors.newCachedThreadPool();
// 建立一個可以裝 Lotto 物件的集合
ArrayList<Callable<Integer>> list = new ArrayList<>();
for(int i=1; i<=7; i++){
list.add(new Lotto());
}
System.out.println("搖槳中...");
try {
// 把 list 的內容全部取出來
// 回傳 List<Future<T>>
List<Future<Integer>> resultList = pool.invokeAll(list);
System.out.print("樂透數字 : ");
for(Future future : resultList){
System.out.print(future.get() + " ");
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 關閉執行緒池
pool.shutdown();
}
}

五、Fork/Join 框架
Fork/Join 框架是一個實作 ExecutorService 介面的多執行緒處理器。它可以把一個大任務劃分為若干個小的任務併發(Concurrency)執行,充分利用可用的資源,進而提高應用的執行效率。
Fork/Join 實作了 ExecutorService,所以它的任務也需要放在執行緒池執行。它的不同在於它使用了工作竊取(work-stealing)算法,完成自己的工作而處理空閒的工作執行緒能夠從其它仍然處理忙碌(Busy)狀態的工作執行緒處竊取等待執行的任務。Fork/Join 框架的核心是 ForkJoinPool 類別,它是對 AbstractExecutorService 類別的擴展,ForkJoinPool 實現了工作竊取算法,並可以執行 ForkJoinTask 任務。
Fork/Join 就是要讓一個大的任務分割成若干的小任務,等待小任務完成後,再整合結果,稱為分而治之(Divide and Conquer)。所以第一步當然要做任務的分割,大致方式如下:
// 執行要做的任務
} else {
// 將任務分割成兩個小部分
// 執行兩個小部分並等待執行結果
}
要實現 ForkJoinTask 我們需要一個類別繼承 RecursiveTask 或 RecursiveAction,並覆寫(Override) compute() 方法,依照上述的邏輯放進該方法內。RecursiveTask 和 RecursiveAction 都繼承了 ForkJoinTask,它們兩者的區別是 RecursiveTask 有返回值,而 RecursiveAction 沒有。
我們拿費式數列當作例子,因為費式數列的定義就是個遞迴,雖然使用遞迴不是最佳解法,但可以測試 Fork/Join 的效能。
傳統遞迴版本,建立 Fibonacci 類別
xxxxxxxxxx
/**
*
* @author Ethan
*/
public class Fibonacci {
public int fibonacci(int num){
if(num == 0 || num == 1){
return num;
}else{
return fibonacci(num-1) + fibonacci(num-2);
}
}
}
Fork/Join 版本,建立 FibonacciTask 類別,繼承 RecursiveTask 類別
xxxxxxxxxx
import java.util.concurrent.RecursiveTask;
/**
*
* @author Ethan
*/
class FibonacciTask extends RecursiveTask<Integer>{
private int num;
private int result;
public FibonacciTask(int num) {
this.num = num;
}
public int getResult(){
return result;
}
protected Integer compute() {
if(num < 25){
result = new Fibonacci().fibonacci(num);
}else{
FibonacciTask task1 = new FibonacciTask(num-1);
FibonacciTask task2 = new FibonacciTask(num-2);
task1.fork();
result = task2.compute() + task1.join();
}
return result;
}
}
做好了任務類別後,就可以開始調用了!
首先我們需要 ForkJoinPool 執行緒池,然後向執行緒池中提交一個 ForkJoinTask 並得到結果。
ForkJoinPool 的 invoke(ForkJoinTask<T> task) 方法為執行指定的任務,並返回結果 T 。
建立 ForkJoinDemo 類別,分別計算遞迴版本和 Fork/Join 版本的時間
xxxxxxxxxx
import java.util.Date;
import java.util.concurrent.ForkJoinPool;
/**
*
* @author Ethan
*/
public class ForkJoinDemo {
public static void main(String[] args) {
int num = 40;
// 傳統遞迴版本。
long t1 = new Date().getTime();
System.out.println(new Fibonacci().fibonacci(num) + " ");
long t2 = new Date().getTime();
System.out.println("花費時間 : " + (t2-t1) );
// Fork/Join 版本。計算大量資料時,才能顯示出來優勢。
long t3 = new Date().getTime();
int processors = Runtime.getRuntime().availableProcessors(); // 取得 CPU 數量
FibonacciTask task = new FibonacciTask(num);
ForkJoinPool pool = new ForkJoinPool(processors); // 建立 Fork/Join 執行緒池
System.out.println(pool.invoke(task) + " "); // 執行給定任務,完成後返回結果
long t4 = new Date().getTime();
System.out.println("花費時間 : " + (t4-t3) );
System.out.println("CPU 數量 : " + processors);
}
}

六、總結
使用 Object 類別內的方法與 synchronized 關鍵字來溝通多執行緒,但效能不好。
可以使用執行緒池管理多執行緒,此篇文章並沒有全面解析,只是介紹有此功能可以使用。
處理多執行緒必須要考慮到「執行緒安全(Thread-Safe)」的問題,所以 Java 有許多的用法與框架處理多執行緒,其內容無法使用一篇文章就能說明清楚,如要更深入,請繼續深入多執行緒的議題。
沒有留言:
張貼留言