2020年3月29日 星期日

Reactive Programming(三):Project Reactor

本系列的文章為 Modern Java in Action / Project Reactor 的讀書筆記,因此內容可能會有點跳躍。

在 Java 裡,Reactive Programming 實際上有不少選擇,例如 Akka、RxJava、Reactor 等。不過基於考量到與 Spring 的潛在相容性,以及理解後所需要的時間,最後還是選擇從 Reactor 上手了。因為 Spring WebFlux 看起來整個用詞也很像是延續 Reactor 來的,雖然說 Spring WebFlux 同樣也支援把底下的實作替換成其他函式庫,不過 Reactor 跟它應該是最親和的吧。

但話說回來,其實稍微看過 Reactor 和 RxJava 以後,會發現它們基本上幾乎可以說沒什麼不同,使用方法和概念等等的非常地相似。

Project Reactor

Reactor 中總共區分成 Publisher 和 Subscriber 兩種角色,合併起來才能完成整個流程。如果看得懂 Java 8 的 Lambda Expression 的話,應該會很容易理解之間的關係。

舉個簡單的例子:

Flux.fromIterable(Arrays.asList("red", "green", "blue"))
        .map(String::toUpperCase)
        .subscribe(color -> log.info("Color: {}", color));

這段程式碼雖然在還不了解 Reactor 的時候不能確定它是什麼意思,不過如果把 Flux 當成 Stream 類別來看的話,它的意思就相當明顯,就是對 Array 裡的每個元件都執行 toUpperCase(),然後最後輸出到 log。而這段 code 的執行結果也是跟 Stream 一樣:

Color: RED
Color: GREEN
Color: BLUE
Publisher: Flux 與 Mono

Reactor 將 Publisher 分為 Flux 與 Mono 兩種。簡要來說,就是如果是有 0-N 個東西組成的 stream;而 Mono 則是只有 0-1 個東西組成的 stream。

Flux 的生命週期會觸發 onNext()、onNext()、onNext()、….、onComplete()/onError()。也就是會有 0-N 次觸發 onNext(),直到 onComplete() 或者 onError 發生而停止 stream。其中在每次觸發 onNext() 時,stream 中的 item 就會通過 operator 的處理,變成 operator 輸出的東西。

Mono 則與 Flux 略為不同的地方在於,Mono 只會有 0-1 個 item,因此生命週期只會觸發最多一次 onNext(),接著就是 onComplete() 或者 onError() 結束了。

在 Publisher 的生命週期中,會把每個 event 送進宣告的 operator,每個 operator 都負責執行自己的任務。例如我們可以像在寫 Lambda Expression 一樣建立 .map() 的步驟,將 stream 中的項目從 A 樣子轉成 B 樣子;或者我們可以把 Flux<Object> 收集起來變成一個 Mono<Collection<Object>> 或反過來把 Mono<Collection<Object>> 展開變成 Flux<Object> 等等。

Subscriber

Publisher 可以交付給多個 Subscriber 訂閱,Subscriber 在完成訂閱以後,就會在可取得的範圍內去拉所有從 Publisher 送來的 event,並且執行宣告的動作。

舉例來說:

Flux<String> colorFlux = Flux.fromIterable(Arrays.asList("red", "green", "blue"))
        .map(String::toUpperCase)
        .map(color -> String.format("COLOR_%s", color));

colorFlux.subscribe(color -> {
    log.info("Subscriber #1: {}", color);
});

colorFlux.subscribe(color -> {
    log.info("Subscriber #2: {}", color);
});

在上面這段程式碼中,我們建立了一個 Publisher,這個 Publisher 的 stream 當中包含兩個 Operator,分別是變大寫和組 prefix,並且有兩個 Subscriber 分別訂閱了這個 Publisher。結果這段程式碼執行之後會是什麼結果呢?答案是:

Subscriber #1: COLOR_RED
Subscriber #1: COLOR_GREEN
Subscriber #1: COLOR_BLUE
Subscriber #2: COLOR_RED
Subscriber #2: COLOR_GREEN
Subscriber #2: COLOR_BLUE

在這裡暫時先不討論比較進階一點的話題,只看 Subscriber 的 subscribe() 動作。如上面所說,我們可以對同一個 Publisher 進行多次的 Subscriber 訂閱,每個 Subscriber 會以自己的步調拉完所有 Publish 可以提供的 event,這些 event 會各自走完所有宣告的 Operator 後,最後執行在 subscribe() 當中宣告的 onNext() 的程序(即當下一個 event 到來時要執行的程序)。而這整個程序都會是透過 event-driven 的形式非同步地觸發,但會因為 event loop 的關係,所以 event 依然會按照順序地出現在 Subscriber 這端。

除此之外,Subscriber 也可以選擇不要提供 onNext() 的程序,或者是額外再提供 onComplete()、onError()、onSubscribe() 等程序。

參考資料
  1. RxJava vs. Reactor

沒有留言: