一、前言
Reactive Streams(反應式流)是一種用於處理非同步資料流的編程範式,旨在解決傳統同步處理在處理大量資料或高並發情境下的瓶頸。雖然 Java 平台本身並沒有直接將 Reactive Streams 作為核心 API 引入,但在 Java 生態中,許多框架和函式庫已經廣泛採用了 Reactive Streams 的概念和規範。Java 9 引入了
java.util.concurrent.Flow
API,才真正將反應式流的基礎概念納入 Java 標準庫。
本文將回顧 Java 7 至 17 期間,Reactive Streams 在 Java 世界的發展與演變,並探討其架構、相關工具以及程式碼範例。需要注意的是,Java 7 和 8 主要透過第三方函式庫(如 RxJava 和 Project Reactor)來利用 Reactive Streams,而 Java 9 才引入標準 API。
目錄
- 一、前言
- 二、Java 7 & 8:第三方函式庫的先行
-
三、Java 9:標準化
java.util.concurrent.Flow
API - 四、背壓(Backpressure)處理
- 五、Java 10-17:後續發展
- 六、總結
- 附錄
二、Java 7 & 8:第三方函式庫的先行
(2.1) 分析與架構
在 Java 7 和 8 時代,Reactive Streams 的主要實作由第三方函式庫提供,其中最知名的兩個是:
-
RxJava (Reactive Extensions for the JVM): 由 Netflix 開發,提供基於觀察者模式的反應式編程模型。RxJava 提供了豐富的操作符,方便開發者進行資料轉換、過濾、合併等操作。它的核心概念包括
Observable
(可觀察對象) 和Subscriber
(訂閱者)。 -
Project Reactor: 由 Pivotal 開發,是 Spring WebFlux 的基礎,提供
Mono
和Flux
兩種反應式類型。Mono
代表最多發送一個元素的非同步流,而Flux
則可以發送零個或多個元素。Reactor 強調背壓 (backpressure) 機制,避免生產者過快產生資料而導致消費端過載。
架構圖 (以 RxJava 為例):
- Producer (生產者): 發送資料的來源,例如資料庫查詢結果、網路請求等。
-
Observable (可觀察對象):
代表一個非同步資料流,可以發送零個或多個元素,並以
onComplete()
或onError()
終止。 -
Operator (操作符):
用於轉換、過濾、合併資料流的中間步驟,例如
map()
,filter()
,flatMap()
等。 - Subscriber (訂閱者): 接收並處理資料流的終端消費者。
(2.2) 工具與範例
-
RxJava: 開發者需要將 RxJava 的 JAR 檔案加入到專案的 classpath 中。
-
Project Reactor (Maven Dependency):
xxxxxxxxxx
1<dependency>
2<groupId>io.reactivex.rxjava3</groupId>
3<artifactId>rxjava</artifactId>
4<version>3.1.10</version>
5</dependency>
6<dependency>
7<groupId>org.reactivestreams</groupId>
8<artifactId>reactive-streams</artifactId>
9<version>1.0.4</version>
10</dependency>
11
程式碼範例 (RxJava):
xxxxxxxxxx
1import io.reactivex.rxjava3.core.Observable;
2import io.reactivex.rxjava3.disposables.Disposable;
3
4public class RxJavaExample {
5public static void main(String[] args) {
6Observable<Integer> numbers = Observable.just(1, 2, 3, 4, 5);
7
8Disposable subscription = numbers
9.filter(n -> n % 2 == 0) // 篩選偶數
10.map(n -> n * 2) // 每個偶數乘以 2
11.subscribe(
12System.out::println, // 消費者處理元素
13Throwable::printStackTrace, // 消費者處理錯誤
14() -> System.out.println("Completed") // 消費者完成時
15);
16
17// 透過 subscription.dispose(); 取消訂閱
18subscription.dispose();
19}
20}
21
輸出結果:
xxxxxxxxxx
14
28
3Completed
4
-
Project Reactor: 開發者需要將 Reactor 的 JAR 檔案加入到專案的 classpath 中。
-
Project Reactor (Maven Dependency):
xxxxxxxxxx
1<dependency>
2<groupId>io.projectreactor</groupId>
3<artifactId>reactor-core</artifactId>
4<version>3.7.2</version>
5</dependency>
6<dependency>
7<groupId>org.reactivestreams</groupId>
8<artifactId>reactive-streams</artifactId>
9<version>1.0.4</version>
10</dependency>
11
程式碼範例 (Reactor):
xxxxxxxxxx
1import reactor.core.publisher.Flux;
2
3public class ReactorExample {
4public static void main(String[] args) {
5Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
6
7numbers
8.filter(n -> n % 2 == 0)
9.map(n -> n * 2)
10.subscribe(
11System.out::println,
12Throwable::printStackTrace,
13() -> System.out.println("Completed")
14);
15
16// Reactor Flux 會自動終止
17}
18}
19
輸出結果:
xxxxxxxxxx
14
28
3Completed
4
三、Java 9:標準化
java.util.concurrent.Flow
API
(3.1) 分析與架構
Java 9 引入了
java.util.concurrent.Flow
API,將 Reactive Streams 的基本概念帶入了 Java 標準庫。這個 API 並不提供完整的反應式編程實作,而是定義了關鍵的介面,方便不同的反應式框架進行互操作。主要的介面包括:
-
Flow.Publisher<T>
: 代表資料發布者,可以發送零個或多個T
類型的元素。 -
Flow.Subscriber<T>
: 代表資料訂閱者,接收並處理T
類型的元素。 -
Flow.Subscription
: 代表Publisher
和Subscriber
之間的訂閱關係,用於請求元素或取消訂閱。 -
Flow.Processor<T, R>
: 同時扮演Publisher<R>
和Subscriber<T>
的角色,用於資料轉換。
架構圖 (Java 9 Flow API):
-
Publisher
發布資料。 -
Subscriber
透過Subscription
請求資料。 -
Subscription
管理資料流和背壓機制。
(3.2) 工具與範例
java.util.concurrent.Flow
並不提供直接實作,需要結合其他反應式函式庫使用。例如,可以使用 Reactor 來實作基於
Flow
的發布者和訂閱者。
我們這邊直接使用 Java SE 原生的程式碼,不依賴第三方函式庫(如 Project Reactor),來實作類似 Reactor 的反應式流功能。
使用 Java SE 原生程式碼實作 Reactor 的功能,會面臨一些挑戰和限制:
- 複雜度較高: 需要處理背壓 [1] 、非同步操作、錯誤處理等細節,程式碼會比較複雜。
- 缺乏豐富的操作符: Java SE 原生 API 只提供了基本的介面,您需要自己實作資料轉換、過濾、合併等操作符。
- 效能可能不如專業函式庫: 專業函式庫經過效能優化,原生實作可能難以達到相同的效能。
- 背壓實作: 處理背壓會需要較多的程式碼來控制資料流,這也是實作的核心挑戰之一。
Java SE 原生實作 (簡化版):
以下提供一個簡化的範例,使用 java.util.concurrent.Flow 和 ExecutorService 來實作類似 Flux 的功能,並著重於背壓處理:
xxxxxxxxxx
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
public class NativeReactiveExample {
// 簡易的 Publisher 實作
static class SimplePublisher<T> implements Flow.Publisher<T> {
private List<T> data;
private ExecutorService executor;
public SimplePublisher(List<T> data, ExecutorService executor) {
this.data = data;
this.executor = executor;
}
public void subscribe(Flow.Subscriber<? super T> subscriber) {
executor.submit(() -> {
SimpleSubscription<T> subscription = new SimpleSubscription<>(subscriber, data);
subscriber.onSubscribe(subscription);
});
}
}
// 簡易的 Subscription 實作
static class SimpleSubscription<T> implements Flow.Subscription {
private Flow.Subscriber<? super T> subscriber;
private List<T> data;
private AtomicInteger requested = new AtomicInteger(0);
private int currentIndex = 0;
private ReentrantLock lock = new ReentrantLock();
private Condition dataAvailable = lock.newCondition();
private AtomicBoolean isCancelled = new AtomicBoolean(false);
private AtomicBoolean isComplete = new AtomicBoolean(false); // 新增的標記
public SimpleSubscription(Flow.Subscriber<? super T> subscriber, List<T> data) {
this.subscriber = subscriber;
this.data = data;
}
public void request(long n) {
if (isCancelled.get())
return; //如果被取消了就不再發送資料
if (n <= 0) {
subscriber.onError(new IllegalArgumentException("請求數量必須大於零。"));
return;
}
lock.lock();
try {
if (isCancelled.get())
return;
requested.addAndGet((int) n);
dataAvailable.signal();
} finally {
lock.unlock();
}
processData();
}
private void processData() {
lock.lock();
try {
while (requested.get() > 0 && currentIndex < data.size() && !isCancelled.get()) {
subscriber.onNext(data.get(currentIndex++));
requested.decrementAndGet();
}
if (currentIndex >= data.size() && !isCancelled.get() && isComplete.compareAndSet(false, true)) { // 只執行一次 onComplete
subscriber.onComplete();
}
} finally {
lock.unlock();
}
}
public void cancel() {
isCancelled.set(true);
lock.lock();
try {
dataAvailable.signal();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
// 資料來源
List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 執行緒池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 建立 Publisher
SimplePublisher<Integer> publisher = new SimplePublisher<>(numbers, executor);
// 建立 Subscriber
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
private final AtomicInteger receivedCount = new AtomicInteger(0);
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
this.subscription = subscription;
subscription.request(2);
}
public void onNext(Integer item) {
System.out.println("Received: " + item);
if (receivedCount.incrementAndGet() % 2 == 0) {
subscription.request(2);
}
}
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
public void onComplete() {
System.out.println("Completed");
executor.shutdown();
}
};
//訂閱
publisher.subscribe(subscriber);
}
}
程式碼說明:
-
SimplePublisher
: -
實作 Flow.Publisher
介面。 - 使用 ExecutorService 來實現非同步發布資料。
- subscribe() 方法建立 SimpleSubscription 並啟動。
-
實作 Flow.Publisher
-
SimpleSubscription
: - 實作 Flow.Subscription 介面。
- 使用 requested 變數追蹤 Subscriber 請求的資料量。
- 使用 ReentrantLock 和 Condition 控制資料發送,處理背壓。
- request(long n) 方法處理 Subscriber 的資料請求。
- cancel() 方法處理取消訂閱的狀況。
- processData() : 在呼叫 subscriber.onComplete() 前,使用 isComplete.compareAndSet(false,true) 確保 onComplete 只會執行一次。
-
main() 方法:
- 建立資料列表,執行緒池。
- 建立 SimplePublisher 和 Flow.Subscriber 物件。
- Subscriber 每次收到兩個數值,才請求下兩個。
- 執行訂閱。
執行結果:
xxxxxxxxxx
onSubscribe
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Received: 6
Received: 7
Received: 8
Received: 9
Received: 10
Completed
四、背壓(Backpressure)處理
(4.1) 什麼是背壓?
在反應式流中,背壓(Backpressure)是一種 流量控制機制 ,用於解決生產者(Publisher)產生資料的速度快於消費者(Subscriber)處理速度的問題。如果生產者不斷產生資料,而消費者來不及處理,就會導致記憶體溢出、效能下降甚至應用崩潰。
背壓的目的是讓消費者能 主動控制 資料的接收速度,確保系統穩定。
(4.2) 背壓的運作方式與常見類型
背壓(Backpressure)是反應式流中一種至關重要的 流量控制機制 ,旨在解決生產者(Publisher)產生資料的速度快於消費者(Subscriber)處理速度的問題。若生產者持續高速產生資料,而消費者無法及時處理,將導致記憶體溢出、效能下降甚至應用程式崩潰。背壓的核心思想是允許消費者主動控制資料接收速率,確保系統的穩定性。
背壓如何運作?
背壓處理通常透過以下機制運作:
- 請求 (Request): 消費者明確地向生產者請求所需數量的資料,而不是被動接收。
- 回應 (Response): 生產者根據消費者的請求發送資料,不多發也不少發,確保資料傳輸的精確性。
- 控制 (Control): 消費者可以根據自身的處理能力,動態調整請求的頻率,從而控制資料流速,實現彈性化的流量控制。
簡單來說,背壓機制就是讓 消費者主動告知生產者,它能處理多少資料 。
常見的背壓類型
-
基於請求/確認 (Request/Ack):
- 運作方式: 消費者明確請求一定數量的資料,生產者發送資料後等待消費者確認接收。
- 特點: 這是最常見的背壓類型,能確保資料不會遺失。消費者在處理完一批資料後,才會請求下一批。
- 適用場景: 適用於需要嚴格資料保證的場景。
-
緩衝 (Buffering):
- 運作方式: 生產者將數據緩衝起來,等待消費者處理。
- 特點: 需要管理緩衝區大小和溢出情況。當緩衝區滿時,可能需要採取丟棄舊資料或阻塞生產者的措施。
- 適用場景: 適用於消費者處理速度不穩定,但可以接受一定延遲的場景。
-
丟棄 (Dropping):
- 運作方式: 生產者直接丟棄無法立即處理的數據。
- 特點: 簡單直接,但可能導致資料遺失。
- 適用場景: 適用於可以容忍資料遺失的情況,如監控資料。
-
回壓 (Backpressure):
- 運作方式: 消費者通知生產者減慢資料產生速度。
- 特點: 消費者主動控制生產者的發送速率,避免過載。
- 適用場景: 適用於消費者處理能力有限,且需要保持系統穩定的場景。
小結
背壓是反應式流中一個重要的概念,它可以幫助開發者更好地處理非同步的資料流,並確保系統的穩定性和效能。不同的背壓類型適用於不同的場景,開發者需要根據具體的需求選擇合適的策略。在實務上,請求/確認 (Request/Ack) 和回壓 (Backpressure) 是最常用的背壓策略,能夠有效控制資料流,避免生產者過快發送資料導致消費者過載。
(4.3) 使用 Java SE 原生 API 實作背壓
Java 9 引入的 java.util.concurrent.Flow API 提供了 Flow.Subscription 介面,其中包含了 request(long n) 方法,允許 Subscriber 控制發布者發送的元素數量,並以此為基礎來支援背壓機制。
我們將使用 Java SE 原生 API(主要使用 java.util.concurrent.Flow 和相關類別)提供一個更專注於背壓處理的範例。這個範例將更清楚地展示消費者如何控制生產者的資料流速。
目標:
-
建立一個生產者,產生大量資料:
- SimplePublisher 類別負責產生資料,其中 totalElements 參數控制了資料的總量。
- 它透過 ExecutorService 在另一個執行緒中執行資料的產生和發送。
-
建立一個消費者,以較慢的速度處理資料:
- 在 main 方法中建立的 Flow.Subscriber 匿名類別,模擬了消費者的行為。
- Thread.sleep() 和隨機暫停時間,模擬了消費者處理資料的緩慢速度,以及隨機發生的壅塞狀況。
-
使用背壓處理,基於請求/確認的背壓,讓生產者根據消費者的處理能力調整資料產生速度:
- 請求: 消費者透過 subscription.request(n) 發出明確的請求。
- 生產者根據請求發送: SimpleSubscription 裡的 processData() 方法根據 requested 變數決定發送的資料量,它不會發送超出消費者請求的資料,它會持續發送直到請求被滿足或者資料全部發送完畢。
- 確認: 消費者收到資料後,會再次呼叫 request ,表示已處理完接收的資料,並可以接收新的資料。
- 背壓: 當消費者處理速度慢時,它請求資料的速度也會變慢,requested 會維持在較低水平,生產者也會跟著減慢速度。
-
只使用 Java SE 原生 API:
- 程式碼中只使用了 java.util.concurrent.Flow 及其相關類別(例如 ExecutorService、ReentrantLock、Condition)等 Java SE 原生的 API,沒有依賴任何第三方函式庫。
程式碼範例:
xxxxxxxxxx
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
public class NativeBackpressureExample {
// 生產者 (Publisher) 實作
static class SimplePublisher implements Flow.Publisher<Integer> {
private final int totalElements;
private final ExecutorService executor;
public SimplePublisher(int totalElements, ExecutorService executor) {
this.totalElements = totalElements;
this.executor = executor;
}
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
executor.submit(() -> {
SimpleSubscription subscription = new SimpleSubscription(subscriber, totalElements);
subscriber.onSubscribe(subscription);
});
}
}
// 訂閱者 (Subscription) 實作
static class SimpleSubscription implements Flow.Subscription {
private final Flow.Subscriber<? super Integer> subscriber;
private final int totalElements;
private final AtomicInteger requested = new AtomicInteger(0); // 追蹤消費者請求的數量
private int currentIndex = 0;
private ReentrantLock lock = new ReentrantLock();
private Condition dataAvailable = lock.newCondition();
private AtomicInteger isCancelled = new AtomicInteger(0);
public SimpleSubscription(Flow.Subscriber<? super Integer> subscriber, int totalElements) {
this.subscriber = subscriber;
this.totalElements = totalElements;
}
public void request(long n) {
if (n <= 0) {
subscriber.onError(new IllegalArgumentException("請求數量必須大於零。"));
return;
}
lock.lock();
try{
if(isCancelled.get() ==1) return;
requested.addAndGet((int)n);
dataAvailable.signal();
}finally{
lock.unlock();
}
processData();
}
private void processData() {
lock.lock();
try{
while(requested.get() >0 && currentIndex < totalElements && isCancelled.get() ==0){
subscriber.onNext(currentIndex++);
requested.decrementAndGet();
}
if (currentIndex >= totalElements && isCancelled.get() ==0) {
subscriber.onComplete();
}
}finally {
lock.unlock();
}
}
public void cancel() {
isCancelled.set(1);
lock.lock();
try{
dataAvailable.signal();
}finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
int totalElements = 100; // 生產者產生 100 個資料
ExecutorService executor = Executors.newFixedThreadPool(2);
// 建立生產者
SimplePublisher publisher = new SimplePublisher(totalElements, executor);
// 建立消費者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
private final AtomicInteger receivedCount = new AtomicInteger(0);
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
this.subscription = subscription;
subscription.request(2);
}
public void onNext(Integer item) {
System.out.println("Consumer received: " + item);
try {
Thread.sleep(100); // 模擬消費者處理資料很慢
if (receivedCount.incrementAndGet() % 2 ==0){
subscription.request(2);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
subscriber.onError(e);
}
}
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
public void onComplete() {
System.out.println("Completed.");
executor.shutdown();
}
};
// 訂閱
publisher.subscribe(subscriber);
}
}
程式碼結構:
-
SimplePublisher
(生產者):-
實作
Flow.Publisher<Integer>
介面,負責產生資料。 -
使用
totalElements
指定產生資料的總數。 -
使用
ExecutorService
異步發佈資料。 -
在
subscribe()
中,創建SimpleSubscription
並與Subscriber
關聯。
-
實作
-
SimpleSubscription
(訂閱關係):-
實作
Flow.Subscription
介面,管理生產者與消費者之間的訂閱關係。 -
requested
(請求計數): 使用AtomicInteger
追蹤消費者請求但尚未發送的資料量。 -
currentIndex
(資料索引): 記錄目前已發送資料的索引。 -
同步控制:
使用
ReentrantLock
和Condition
來同步資料發送。 -
取消訂閱:
使用
isCancelled
來控制是否取消發送資料。 -
request(long n)
(請求處理): 處理消費者發出的資料請求,並將請求數累加到requested
,同時觸發processData()
發送資料。 -
processData()
(資料發送): 根據requested
數量發送資料,並確保subscriber.onComplete()
只執行一次,同時處理資料發送完成的情況。 -
cancel()
(取消處理) 處理訂閱者取消訂閱的狀況。
-
實作
-
main
方法 (主要邏輯):-
設定
totalElements
,創建SimplePublisher
生產者。 -
創建
Flow.Subscriber
消費者,並在onNext()
中:- 模擬消費者處理數據的速度,並使用隨機數來模擬雍塞情況。
- 判斷是否發送請求。
-
在
onSubscribe()
中,發送第一個request
請求。 -
使用
publisher.subscribe(subscriber)
進行訂閱。
-
設定
背壓處理的核心邏輯:
-
Flow.Subscription
的request(long n)
: 消費者透過此方法請求n
個資料。 -
SimpleSubscription
的requested
變數: 追蹤消費者請求的數據數量。 -
SimpleSubscription
的processData()
: 生產者根據requested
的數量發送資料,保證不會超過消費者請求的量。 -
消費者控制請求:
消費者在接收到數據之後,才會透過
request
發出新的請求,控制資料流速,同時透過模擬壅塞,展現背壓機制。
觀察背壓效果:
- 執行程式碼。
-
觀察輸出結果:
-
您會看到程式碼首先輸出
onSubscribe: initial request for 2
,表示消費者在訂閱時發出了初始請求,請求生產者發送 2 個資料。 -
接著,您會看到消費者陸續收到資料(
Consumer received: x
),每次收到資料的數量不一定,因為在消費者端模擬了隨機的壅塞情況。 -
在處理完兩個數據後,消費者會發出新的請求,訊息
Consumer request more data
表示消費者請求更多的數據,而生產者會根據消費者的請求,每次發送 2 個數值。 -
您會看到
Consumer is sleeping for xxxms, (Simulating Congestion)
,這表示消費者在處理資料時,隨機地模擬了資料壅塞的情況。模擬壅塞期間,消費者不會向生產者請求資料,藉此展示背壓效果。 -
最後,當生產者將全部資料都傳送完畢,並且沒有新的請求,您只會看到一個
Completed.
訊息,表示整個資料串流已經順利完成。
-
您會看到程式碼首先輸出
-
調整隨機壅塞機率:
您可以調整
random.nextInt(5) == 0
中的數值,來控制消費者模擬壅塞的機率。 數值越小,代表越容易出現壅塞狀況,您就能看到更多次數的隨機暫停,且資料傳輸速度更慢。 -
調整睡眠時間:
您可以調整
random.nextInt(200) + 50
的值,來調整消費者模擬壅塞時的暫停時間,數值越大,代表暫停時間越長,您就能看到資料傳輸的速度越慢。
執行結果:
xxxxxxxxxx
onSubscribe: initial request for 2
Consumer received: 0
Consumer received: 1
Consumer request more data
Consumer received: 2
Consumer received: 3
Consumer request more data
Consumer received: 4
Consumer received: 5
Consumer request more data
Consumer received: 6
Consumer received: 7
Consumer request more data
Consumer received: 8
Consumer received: 9
Consumer request more data
Consumer received: 10
Consumer received: 11
Consumer request more data
Consumer received: 12
Consumer received: 13
Consumer is sleeping for 238ms, (Simulating Congestion)
Consumer request more data
Consumer received: 14
Consumer received: 15
Consumer request more data
Consumer received: 16
Consumer received: 17
Consumer request more data
Consumer received: 18
Consumer received: 19
Consumer request more data
Consumer received: 20
Consumer received: 21
Consumer request more data
Consumer received: 22
Consumer received: 23
Consumer request more data
Consumer received: 24
Consumer received: 25
Consumer request more data
Consumer received: 26
Consumer received: 27
Consumer request more data
Consumer received: 28
Consumer is sleeping for 209ms, (Simulating Congestion)
Consumer received: 29
Consumer request more data
Consumer received: 30
Consumer received: 31
Consumer request more data
Consumer received: 32
Consumer received: 33
Consumer request more data
Consumer received: 34
Consumer is sleeping for 204ms, (Simulating Congestion)
Consumer received: 35
Consumer request more data
Consumer received: 36
Consumer received: 37
Consumer is sleeping for 86ms, (Simulating Congestion)
Consumer request more data
Consumer received: 38
Consumer is sleeping for 226ms, (Simulating Congestion)
Consumer received: 39
Consumer request more data
Consumer received: 40
Consumer is sleeping for 128ms, (Simulating Congestion)
Consumer received: 41
Consumer is sleeping for 214ms, (Simulating Congestion)
Consumer request more data
Consumer received: 42
Consumer received: 43
Consumer request more data
Consumer received: 44
Consumer received: 45
Consumer request more data
Consumer received: 46
Consumer received: 47
Consumer request more data
Consumer received: 48
Consumer received: 49
Consumer request more data
Consumer received: 50
Consumer received: 51
Consumer request more data
Consumer received: 52
Consumer received: 53
Consumer request more data
Consumer received: 54
Consumer received: 55
Consumer request more data
Consumer received: 56
Consumer received: 57
Consumer request more data
Consumer received: 58
Consumer received: 59
Consumer request more data
Consumer received: 60
Consumer received: 61
Consumer request more data
Consumer received: 62
Consumer received: 63
Consumer request more data
Consumer received: 64
Consumer received: 65
Consumer request more data
Consumer received: 66
Consumer received: 67
Consumer request more data
Consumer received: 68
Consumer received: 69
Consumer request more data
Consumer received: 70
Consumer received: 71
Consumer is sleeping for 141ms, (Simulating Congestion)
Consumer request more data
Consumer received: 72
Consumer received: 73
Consumer is sleeping for 247ms, (Simulating Congestion)
Consumer request more data
Consumer received: 74
Consumer received: 75
Consumer request more data
Consumer received: 76
Consumer received: 77
Consumer request more data
Consumer received: 78
Consumer received: 79
Consumer request more data
Consumer received: 80
Consumer received: 81
Consumer is sleeping for 201ms, (Simulating Congestion)
Consumer request more data
Consumer received: 82
Consumer received: 83
Consumer request more data
Consumer received: 84
Consumer received: 85
Consumer request more data
Consumer received: 86
Consumer is sleeping for 143ms, (Simulating Congestion)
Consumer received: 87
Consumer is sleeping for 154ms, (Simulating Congestion)
Consumer request more data
Consumer received: 88
Consumer received: 89
Consumer request more data
Consumer received: 90
Consumer is sleeping for 96ms, (Simulating Congestion)
Consumer received: 91
Consumer request more data
Consumer received: 92
Consumer is sleeping for 240ms, (Simulating Congestion)
Consumer received: 93
Consumer request more data
Consumer received: 94
Consumer is sleeping for 241ms, (Simulating Congestion)
Consumer received: 95
Consumer request more data
Consumer received: 96
Consumer received: 97
Consumer is sleeping for 82ms, (Simulating Congestion)
Consumer request more data
Consumer received: 98
Consumer received: 99
Consumer request more data
Completed.
(4.5) Flow.Processor 的應用
Flow.Processor
介面在
java.util.concurrent.Flow
API 中扮演著雙重角色,它既是
Publisher<R>
(可以發布資料),又是
Subscriber<T>
(可以訂閱資料)。這使得
Flow.Processor
非常適合用於資料轉換、過濾和路由等場景。
(4.5.1)
Flow.Processor
的概念
簡單來說,
Flow.Processor
就像一個資料處理的中繼站。它可以:
-
訂閱一個
Publisher<T>
: 接收上游的資料流。 - 處理資料: 對接收到的資料進行轉換、過濾或任何其他操作。
-
發佈一個
Publisher<R>
: 將處理後的資料傳遞給下游的Subscriber
。
其中
T
代表輸入資料的類型,
R
代表輸出資料的類型。
Flow.Processor
可以讓你將複雜的資料處理邏輯串接起來,形成一個處理管道。
(4.5.2) 使用範例
由於
java.util.concurrent.Flow
API 本身不提供
Processor
的實作,我們需要使用第三方函式庫來實現
Processor
。這裡我們使用 Reactor 函式庫來示範,因為 Reactor 提供了
Flux
和
Mono
,它們都實作了
Flow.Publisher
和
Flow.Subscriber
,所以可以當作
Flow.Processor
來使用。
-
Project Reactor: 開發者需要將 Reactor 的 JAR 檔案加入到專案的 classpath 中。
-
Project Reactor (Maven Dependency):
xxxxxxxxxx
1<dependency>
2<groupId>io.projectreactor</groupId>
3<artifactId>reactor-core</artifactId>
4<version>3.7.2</version>
5</dependency>
6<dependency>
7<groupId>org.reactivestreams</groupId>
8<artifactId>reactive-streams</artifactId>
9<version>1.0.4</version>
10</dependency>
11
程式碼範例 (使用 Reactor 的 Flux 作為 Processor):
xxxxxxxxxx
import reactor.core.publisher.Flux;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import java.util.concurrent.Flow;
public class ReactorProcessorExample {
public static void main(String[] args) {
// 建立 Publisher (資料來源)
Flux<Integer> source = Flux.just(1, 2, 3, 4, 5);
// 使用 Flux 作為 Processor 來轉換資料
Flux<String> processor = source
.filter(n -> n % 2 == 0) // 過濾偶數
.map(n -> "Number: " + n); // 轉換為字串
// 建立 Java SE 原生 Flow.Subscriber
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("onSubscribe");
this.subscription = subscription;
subscription.request(2); // 請求 2 個元素
}
public void onNext(String item) {
System.out.println("Received: " + item);
// 處理完一個數據後,再請求兩個,實作背壓
subscription.request(2);
}
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
public void onComplete() {
System.out.println("Completed.");
}
};
// 使用 Reactor BaseSubscriber 橋接
processor.subscribe(new BaseSubscriber<String>() {
protected void hookOnSubscribe(Subscription subscription) {
//將 Reactor 的 Subscription 轉換為 Java SE 原生的 Subscription 物件
subscriber.onSubscribe(new Flow.Subscription() {
public void request(long n) {
subscription.request(n); // 將請求傳遞給 Reactor 的 Publisher
}
public void cancel() {
subscription.cancel(); // 將取消傳遞給 Reactor 的 Publisher
}
});
}
protected void hookOnNext(String value) {
// 將 Reactor 發布的資料,傳遞給 Java SE 原生的 Subscriber
subscriber.onNext(value);
}
protected void hookOnError(Throwable throwable) {
// 將 Reactor 發布的錯誤,傳遞給 Java SE 原生的 Subscriber
subscriber.onError(throwable);
}
protected void hookOnComplete() {
// 將 Reactor 發布的完成事件,傳遞給 Java SE 原生的 Subscriber
subscriber.onComplete();
}
});
}
}
程式碼說明:
-
建立資料來源 (
source
) : 使用Flux.just()
建立一個包含整數的Flux
。 -
使用
Flux
作為Processor
:-
利用
filter(n -> n % 2 == 0)
過濾出偶數。 -
利用
map(n -> "Number: " + n)
將偶數轉換為字串。 -
processor
同時扮演了Subscriber<Integer>
(訂閱source
) 和Publisher<String>
(發布轉換後的資料)。
-
利用
-
建立
Subscriber
: 建立一個匿名類別實作Flow.Subscriber
,用於接收processor
發布的資料。 -
訂閱
Processor
: 使用processor.subscribe(subscriber)
將subscriber
訂閱到processor
。
執行結果:
xxxxxxxxxx
Received: Number: 2
Received: Number: 4
Completed.
刻意橋接 Java SE 原生 Flow API 和 Reactor 的反應式流系統:
- 我們在範例中,為了使用 java.util.concurrent.Flow.Subscriber (Java SE 原生),必須使用 BaseSubscriber 作為橋樑,手動地將 Reactor 的 org.reactivestreams.Subscription 轉換為 java.util.concurrent.Flow.Subscription,並手動地調用 subscriber 物件的方法。這是一個額外的、非自然的步驟,並非 Reactor 的建議用法。
- 學習和理解 Java SE 原生的 java.util.concurrent.Flow API,並希望看到它如何與 Reactor 這樣的第三方函式庫互動。透過刻意橋接,這對於理解反應式流的底層原理和如何整合不同的反應式系統是有幫助的,可以更深入地了解這些 API 的運作方式和局限性。
Reactor 的設計理念: Reactor 是一個成熟的反應式函式庫,它有自己的一套訂閱和背壓機制。Reactor 主要使用 org.reactivestreams 包下的介面,例如 org.reactivestreams.Publisher 和 org.reactivestreams.Subscriber,並使用 Reactor 內建的類型(例如 Mono 和 Flux)。Reactor 的設計原則是希望開發者使用 Reactor 本身提供的元件和 API,以便更好地享受 Reactor 帶來的便利。
(4.6) 完善錯誤處理說明
在反應式流中,錯誤處理是非常重要的一環。由於資料流是非同步的,傳統的
try-catch
機制可能無法很好地處理錯誤。因此,我們需要使用反應式框架提供的機制來處理錯誤。
(4.6.1) 錯誤處理的重要性
在反應式流中,如果沒有妥善處理錯誤,可能會導致以下問題:
-
資料流中斷:
未處理的錯誤可能會導致資料流過早終止,使得下游的
Subscriber
無法接收到後續的資料。 - 應用程式崩潰: 如果錯誤沒有被捕獲,可能會導致應用程式崩潰。
- 難以追蹤錯誤: 由於錯誤是非同步發生的,如果沒有好的記錄和處理機制,會很難追蹤錯誤的來源。
因此,在反應式流中,我們需要使用反應式框架提供的機制來處理錯誤,確保資料流的穩定性,避免應用程式崩潰,並方便追蹤錯誤。
(4.6.2) Reactor 的錯誤處理機制
由於程式碼中使用的是 Reactor 最基礎的錯誤處理方式,所以我們直接使用
subscribe
方法來處理錯誤,
subscribe
方法的簽名如下:
xxxxxxxxxx
subscribe(Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Runnable onComplete)
它接受三個函數介面作為參數:
-
Consumer<? super T> onNext
: 用來處理正常資料的程式碼。 -
Consumer<? super Throwable> onError
: 用來處理錯誤的程式碼。 -
Runnable onComplete
: 用來處理資料流完成的程式碼。
在
subscribe
方法裡面,我們可以透過 Lambda 表達式,來處理資料、錯誤和完成事件。
xxxxxxxxxx
processor.subscribe(
item -> System.out.println("Received: " + item), // onNext
throwable -> { System.err.println("Error: " + throwable.getMessage());
throwable.printStackTrace();}, // onError
() -> System.out.println("Completed.") //onComplete
);
在以上的範例中,我們使用了 Lambda 表達式,來分別處理
onNext
,
onError
, 和
onComplete
的事件。
(4.6.3) 使用範例
以下程式碼範例展示了如何使用 Reactor 的錯誤處理操作符來處理資料流中的錯誤。
-
Project Reactor: 開發者需要將 Reactor 的 JAR 檔案加入到專案的 classpath 中。
-
Project Reactor (Maven Dependency):
xxxxxxxxxx
1<dependency>
2<groupId>io.projectreactor</groupId>
3<artifactId>reactor-core</artifactId>
4<version>3.7.2</version>
5</dependency>
6<dependency>
7<groupId>org.reactivestreams</groupId>
8<artifactId>reactive-streams</artifactId>
9<version>1.0.4</version>
10</dependency>
11
xxxxxxxxxx
import reactor.core.publisher.Flux;
import java.util.List;
public class ReactorProcessorErrorHandlingExample {
public static void main(String[] args) {
// 建立一個會拋出錯誤的 Publisher
Flux<Object> source = Flux.fromIterable(List.of(1, 2, 0, 4, 5))
.map(i -> {
if (i == 0) {
throw new ArithmeticException("/ by zero");
}
return 100 / i;
})
.log("source")
.map(n -> "Number: " + n);
// 使用 Flux 作為 Processor 並處理錯誤
Flux<Object> processor = source;
processor.subscribe(
item -> System.out.println("Received: " + item),
throwable -> {
System.err.println("Error: " + throwable.getMessage());
throwable.printStackTrace();
},
() -> System.out.println("Completed."));
}
}
執行結果:
xxxxxxxxxx
[ INFO] (main) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(100)
Received: Number: 100
[ INFO] (main) | onNext(50)
Received: Number: 50
[ERROR] (main) | onError(java.lang.ArithmeticException: / by zero)
[ERROR] (main) - java.lang.ArithmeticException: / by zero
java.lang.ArithmeticException: / by zero
at ReactorProcessorErrorHandlingExample.lambda$main$0(ReactorProcessorErrorHandlingExample.java:10)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:402)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:291)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
at reactor.core.publisher.Flux.subscribe(Flux.java:8891)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:9012)
at reactor.core.publisher.Flux.subscribe(Flux.java:8856)
at reactor.core.publisher.Flux.subscribe(Flux.java:8780)
at ReactorProcessorErrorHandlingExample.main(ReactorProcessorErrorHandlingExample.java:18)
Error: / by zero
java.lang.ArithmeticException: / by zero
at ReactorProcessorErrorHandlingExample.lambda$main$0(ReactorProcessorErrorHandlingExample.java:10)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxIterable$IterableSubscription.fastPath(FluxIterable.java:402)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:291)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.LambdaSubscriber.onSubscribe(LambdaSubscriber.java:119)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
at reactor.core.publisher.Flux.subscribe(Flux.java:8891)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:9012)
at reactor.core.publisher.Flux.subscribe(Flux.java:8856)
at reactor.core.publisher.Flux.subscribe(Flux.java:8780)
at ReactorProcessorErrorHandlingExample.main(ReactorProcessorErrorHandlingExample.java:18)
說明:
-
你現在的程式碼透過
subscribe
的 Lambda 表達式,捕捉了ArithmeticException
,並印出了堆疊追蹤。 - 我們移除了所有與非同步相關的操作,讓程式碼回到最基本的層面,方便我們觀察資料流與 exception 處理流程。
五、Java 10-17:後續發展
Java 10 到 Java 17 並沒有直接針對 Reactive Streams 的核心 API(
java.util.concurrent.Flow
)進行重大更新。不過,這段期間 Java 平台在效能、語法特性和非同步 API 的改進,間接地提升了 Reactive Streams 的開發體驗和應用效能。
(5.1) 框架生態的持續發展
隨著 Java 版本更新,RxJava 和 Project Reactor 等反應式框架持續演進,提供更豐富的功能、更好的效能和更便捷的 API。這些框架不斷精進,以滿足現代應用開發的需求,並且更完善地支援
java.util.concurrent.Flow
API。
框架的演進主要體現在以下方面:
-
更豐富的操作符:
新版本通常會增加更多操作符,方便處理複雜的非同步資料流,例如時間相關的操作符(
delay
,throttle
)和錯誤處理的操作符(retry
,onErrorResume
)。 - 效能優化: 透過底層實作的優化,例如更有效率的記憶體管理和非同步任務排程,提高反應式流的處理速度。
- API 便利性: 框架持續精進 API 設計,提供更簡潔易用的介面,加速開發流程。
(5.2) Java 平台改進對 Reactive Streams 的影響
雖然 Java 10-17 沒有直接修改
java.util.concurrent.Flow
API,但平台層面的改進依然間接影響了 Reactive Streams 的開發和應用:
- JVM 效能優化: 歷代 Java 版本中,JVM 不斷進行效能優化,例如垃圾回收器的改進和 JIT 編譯器的優化,這些優化也能提升反應式流的執行效能。
-
非同步 API 的增強:
例如 Java 11 中引入的
HttpClient
,可以更方便地發起非同步網路請求,這些 API 可以更容易地整合到反應式流中,擴展了 Reactive Streams 的應用場景。 -
更好的併發支援:
Java 標準庫中併發工具的改進,例如
CompletableFuture
等,在某些場景下也能與 Reactive Streams 更好地協同工作,提供更高效的非同步處理方案。
六、總結
從 Java 7 到 17,Reactive Streams 的發展經歷了一個從第三方函式庫主導到標準化的過程。Java 7 和 8 主要依賴 RxJava 和 Project Reactor 等函式庫;Java 9 引入了
java.util.concurrent.Flow
API,定義了 Reactive Streams 的核心介面,實現了標準化;而 Java 10-17 在此基礎上提供了持續優化,並由生態系統不斷完善。
總而言之,Reactive Streams 的引入,讓 Java 開發者能夠更好地應對高並發、大量資料流的挑戰,透過非同步和背壓機制,提高了應用程式的響應性和效能。隨著 Java 和相關生態系統的不斷演進,Reactive Streams 將在未來持續發揮重要作用。
附錄
-
「背壓處理」(Backpressure)
在反應式編程中,背壓(Backpressure)是一種流量控制機制,用於處理生產者(Publisher)產生資料的速度比消費者(Subscriber)能夠處理的速度更快的情況。想像一下,一個水龍頭(生產者)不斷地流出水,而水桶(消費者)卻無法在水龍頭流出的速度下及時接住水,這時候就需要一些機制來控制水流,避免水桶溢出。
在反應式流中,如果生產者產生資料的速度太快,而消費者來不及處理,就會導致以下問題:
- 記憶體溢出(Out of Memory): 消費者需要緩衝還沒處理的資料,如果緩衝區太大,可能會導致記憶體溢出。
- 效能下降: 過多的未處理資料會佔用系統資源,導致效能下降。
- 應用崩潰: 如果記憶體溢出或資源耗盡,應用可能會崩潰。
背壓處理的目的:
背壓處理的目的是確保消費者不會被過多的資料淹沒,而是以其能處理的速度接收資料。它允許消費者通知生產者減慢或暫停資料的產生,直到消費者準備好接收更多資料。
背壓處理的運作方式:
反應式流中的背壓處理通常透過以下方式運作:
- 請求 (Request): 消費者明確地向生產者請求資料。消費者不是被動地接收資料,而是主動要求。
- 生產者回應該請求: 生產者根據消費者的請求來發送資料,不多發,也不少發。
- 消費者控制請求頻率: 消費者可以根據自己的處理能力,調整請求資料的頻率。
背壓處理的類型:
在實務上,背壓處理有幾種常見的策略:
- 請求/確認(Request/Ack): 消費者發送請求後,生產者發送數據,消費者確認接收數據,這是一種較為嚴格的背壓。
- 緩衝(Buffering): 生產者將數據緩衝起來,等待消費者處理,但緩衝區的大小有限,需要管理溢出的情況。
- 丟棄(Dropping): 生產者直接丟棄無法立即處理的數據。
- 回壓(Backpressure): 消費者通知生產者減慢資料產生速度。
為什麼需要背壓?背壓處理在以下情境中尤其重要:
- 高並發系統: 大量並發請求可能導致生產者產生大量數據。
- I/O 密集操作: 消費者處理 I/O 操作(如資料庫、網路)可能會很慢。
- 資料來源速度不一致: 生產者的速度可能快於消費者,此時就需要背壓處理來確保系統穩定。
沒有留言:
張貼留言