2018年12月25日 星期二

Java SE 入門教學 - 執行緒(2)

更新時間:12/25/2018

前言

在前篇 執行緒(1) 初次介紹了執行緒,並提出執行緒的競速情況(Race Condition)。在此篇文章會介紹簡單、靈活地操作執行緒,至於更深入的控制執行緒的剖析,將不在此篇討論。

在執行緒中的"鎖"提供了兩種主要特性:互斥(mutual exclusion)和可見性(visibility)。

互斥即一次只允許一個執行緒持有某個特定的鎖,因此可使用該特性實作對共享數據的協調訪問協議,這樣,一次就只有一個執行緒能夠使用該共享數據。

可見性要更加複雜一些,它必須確保釋放鎖之前對共享數據做出更改,對於隨後獲得該鎖的另一個執行緒是可見的。如果沒有同步機制提供的這種可見性保證,執行緒看到的共享變數可能是修改前的值或不一致的值,這將引發許多嚴重問題。


一、Object 類別的 wait() / notify() / notifyAll()

Object 類別的常見方法-wait/notify/notifyAll方法中,有稍微提及這個觀念,可以利用這三個方法實作執行緒的溝通。

在執行緒中調用 wait() 方法,將阻塞等待其他執行緒的通知;其他執行緒調用 notify() 方法或 notifyAll() 方法,將通知等待的執行緒再次進入可執行緒池中準備執行。

Object 是所有類別的根類別,它有 5 個方法組成了等待/通知機制的核心:notify()、notifyAll()、wait()、wait(long) 和 wait(long, int),這些方法必須寫在有「synchronized」關鍵字的同步化區塊中。

在 Java 中,所有的類別都從 Object 繼承而來,因此,所有的類別都擁有這些共有方法可供使用。而且,由於他們都被宣告為 final,因此在子類別中不能覆寫任何一個方法。

在多執行緒中,「生產者與消費者」是經典的問題,我們將使用這類問題闡述如何操作執行緒。

範例:小白狗(消費者)與它的主人(生產者)

建立 Cookies 類別

建立 Put 類別,實作 Runnable 介面。

建立 Eat 類別,實作 Runnable 介面。

建立 DogAndCookies 類別,測試 wait() 與 notify()


二、CountDownLatch 類別

CountDownLatch 類別在 java.util.concurrent 套件包中,一個同步輔助類別,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。用給定的計數初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await() 方法會一直受阻塞。之後,會釋放所有等待的執行緒,await() 的所有後續調用都將立即返回。這種操作只能一次(計數無法被重置)。一個執行緒(或者多個),等待另外 N 個執行緒完成某個事情之後才能執行。

範例:老師(消費者)等待學生(生產者)把功課交上來,才能批改學生作業

建立 Student 類別,實作 Runnable 介面。

建立 Teacher 類別,實作 Runnable 介面。

建立 CountDownLatchDemo 類別,測試一個執行緒等待多個執行緒完成工作。


三、CyclicBarrier 類別

CyclicBarrier 類別在 java.util.concurrent 套件包中,CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續幹活。CyclicBarrier 默認的構造方法是 CyclicBarrier(int parties),其參數表示屏障攔截的執行緒數量,每個執行緒調用 await() 方法告訴 CyclicBarrier 我已經到達了屏障,然後當前執行緒被阻塞。

範例:有一場百米賽跑,當跑者準備好後,才一起出發

建立 Runner 類別,實作 Runnable 介面。

建立 CyclicBarrierDemo 類別,測試所有執行緒必須全部到達屏障後,才能繼續往下執行。


四、執行緒池(Executor 介面)

Executor 框架主要由三大部分組成和一個工廠類別

任務:包括被執行任務需要實作的介面 Runnable 介面或者 Callable 介面。

任務的執行:包括執行機制的核心介面 Executor,以及繼承 Executor 的 ExecutorService 介面。Executor 框架有兩個關鍵類別 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor。

異步計算的結果:包括介面 Future 和實作 Future 介面的 FutureTask 類別。

工廠類別(Executors):使用靜態方法建立各種執行緒池。

Executor 框架的使用

主執行緒創建實作 Runnable 或者 Callable 介面的任務物件,工具類 Executors 可以把一個 Runnable 物件封裝成一個 Callable 物件,使用 Executors.callable(Runnable task)。然後交給 ExecutorService 用 execute 或者 submit 執行,前者不返回結果,後者可以返回實作了 Future 介面的物件(到目前為止返回的是 FutureTask 物件),FutureTask 實作了 Runnable,程序員可以創建 FutureTask,然後直接交給 ExecutorService。

最後主執行緒可以執行 FutureTask.get() 方法等待任務完成(阻塞方法),也可以使用 FutureTask.cancel(boolean mayInterruptIfRunning) 來取消任務的執行。

您會發現,使用 Executor 框架後,不需要創建 Thread 物件實例,而是使用工廠類別 Executors 創建,並把實作 Runnable 和 Callable 的類別丟進創建的執行緒池中統一管理。

方法 功能
Executors.newCachedThreadPool()
Executors.newCachedThreadPool(ThreadFactory threadFactory)
創建可緩存的執行緒池,如果執行緒池中的執行緒在60秒未被使用就將被移除,在執行新的任務時,當執行緒池中有之前創建的可用執行緒就重用可用執行緒,否則就新建一條執行緒,它是無上限容量的執行緒池,適用於執行很多的短期異步任務的小程序,或者負載較輕的服務器。
Executors.newFixedThreadPool(int nThreads)
Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
創建可重用且固定執行緒數的執行緒池,如果執行緒池中的所有執行緒都處於活動狀態,此時再提交任務就在隊列中等待,直到有可用執行緒;如果執行緒池中的某個執行緒由於異常而結束時,執行緒池就會再補充一條新執行緒。適用於負載比較重的服務器。
Executors.newSingleThreadExecutor()
Executors.newSingleThreadExecutor(ThreadFactory threadFactory)
創建一個單執行緒的Executor,如果該執行緒因為異常而結束就新建一條執行緒來繼續執行後續的任務,適用於需要保證順序的執行各個任務,並且在任意時間點不會有多個執行緒時活動的場景。
Executors.newScheduledThreadPool(int corePoolSize)
Executors.newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
創建一個可延遲執行或定期執行的執行緒池,包含若干個執行緒。適用於需要多個後台程序執行週期的任務,同時為了滿足資源管理的需求而限制後台執行緒的數量的場景。
Executors.newSingleThreadScheduledExecutor()
Executors.newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
只包含一個執行緒的執行延遲或者定期的執行緒池。適用於需要單個後台執行緒執行週期任務,同時需要保證順序地執行各個任務的場景。

範例:把百米賽跑改成使用執行緒池


Callable<V> 介面

Callable 的作用與 Runnable 類似,可讓您定義想要的執行流程,差別是
  Runnable 的 run() 方法無法回傳值,也無法拋出例外;
   Callable 的 call() 方法可以回傳值,也可以拋出受檢例外。

範例:樂透彩,實作 Callable 介面並取得回傳值

創建一個 Lotto 類別,實作 Callable 介面

創建一個 ExecutorCallableDemo 類別,主程式中使用執行緒池,並使用 submit() 方法取得 Future 物件。

範例:樂透彩,測試 ExecutorService 的 invokeAll() 方法

修改 ExecutorCallableDemo 類別


五、Fork/Join 框架

Fork/Join 框架是一個實作 ExecutorService 介面的多執行緒處理器。它可以把一個大任務劃分為若干個小的任務併發(Concurrency)執行,充分利用可用的資源,進而提高應用的執行效率。

Fork/Join 實作了 ExecutorService,所以它的任務也需要放在執行緒池執行。它的不同在於它使用了工作竊取(work-stealing)算法,完成自己的工作而處理空閒的工作執行緒能夠從其它仍然處理忙碌(Busy)狀態的工作執行緒處竊取等待執行的任務。Fork/Join 框架的核心是 ForkJoinPool 類別,它是對 AbstractExecutorService 類別的擴展,ForkJoinPool 實現了工作竊取算法,並可以執行 ForkJoinTask 任務。

Fork/Join 就是要讓一個大的任務分割成若干的小任務,等待小任務完成後,再整合結果,稱為分而治之(Divide and Conquer)。所以第一步當然要做任務的分割,大致方式如下:

if(這個任務足夠小){
  // 執行要做的任務
} else {
  // 將任務分割成兩個小部分
  // 執行兩個小部分並等待執行結果
}

要實現 ForkJoinTask 我們需要一個類別繼承 RecursiveTask 或 RecursiveAction,並覆寫(Override) compute() 方法,依照上述的邏輯放進該方法內。RecursiveTask 和 RecursiveAction 都繼承了 ForkJoinTask,它們兩者的區別是 RecursiveTask 有返回值,而 RecursiveAction 沒有。

我們拿費式數列當作例子,因為費式數列的定義就是個遞迴,雖然使用遞迴不是最佳解法,但可以測試 Fork/Join 的效能。

傳統遞迴版本,建立 Fibonacci 類別

Fork/Join 版本,建立 FibonacciTask 類別,繼承 RecursiveTask 類別

做好了任務類別後,就可以開始調用了!
首先我們需要 ForkJoinPool 執行緒池,然後向執行緒池中提交一個 ForkJoinTask 並得到結果。
ForkJoinPool 的 invoke(ForkJoinTask<T> task) 方法為執行指定的任務,並返回結果 T 。

建立 ForkJoinDemo 類別,分別計算遞迴版本和 Fork/Join 版本的時間


六、總結

使用 Object 類別內的方法與 synchronized 關鍵字來溝通多執行緒,但效能不好。

可以使用執行緒池管理多執行緒,此篇文章並沒有全面解析,只是介紹有此功能可以使用。

處理多執行緒必須要考慮到「執行緒安全(Thread-Safe)」的問題,所以 Java 有許多的用法與框架處理多執行緒,其內容無法使用一篇文章就能說明清楚,如要更深入,請繼續深入多執行緒的議題。





沒有留言:

張貼留言