本系列的文章為 Modern Java in Action / Project Reactor 的讀書筆記,因此內容可能會有點跳躍。
在 Java 裡,Reactive Programming 實際上有不少選擇,例如 Akka、RxJava、Reactor 等。不過基於考量到與 Spring 的潛在相容性,以及理解後所需要的時間,最後還是選擇從 Reactor 上手了。因為 Spring WebFlux 看起來整個用詞也很像是延續 Reactor 來的,雖然說 Spring WebFlux 同樣也支援把底下的實作替換成其他函式庫,不過 Reactor 跟它應該是最親和的吧。
但話說回來,其實稍微看過 Reactor 和 RxJava 以後,會發現它們基本上幾乎可以說沒什麼不同,使用方法和概念等等的非常地相似。
Project Reactor
Reactor 中總共區分成 Publisher 和 Subscriber 兩種角色,合併起來才能完成整個流程。如果看得懂 Java 8 的 Lambda Expression 的話,應該會很容易理解之間的關係。
舉個簡單的例子:
1 2 3 |
Flux.fromIterable(Arrays.asList( "red" , "green" , "blue" )) .map(String::toUpperCase) .subscribe(color -> log.info( "Color: {}" , color)); |
這段程式碼雖然在還不了解 Reactor 的時候不能確定它是什麼意思,不過如果把 Flux 當成 Stream 類別來看的話,它的意思就相當明顯,就是對 Array 裡的每個元件都執行 toUpperCase(),然後最後輸出到 log。而這段 code 的執行結果也是跟 Stream 一樣:
1 2 3 |
Color: RED Color: GREEN Color: BLUE |
Publisher: Flux 與 Mono
Reactor 將 Publisher 分為 Flux 與 Mono 兩種。簡要來說,就是如果是有 0-N 個東西組成的 stream;而 Mono 則是只有 0-1 個東西組成的 stream。
Flux 的生命週期會觸發 onNext()、onNext()、onNext()、….、onComplete()/onError()。也就是會有 0-N 次觸發 onNext(),直到 onComplete() 或者 onError 發生而停止 stream。其中在每次觸發 onNext() 時,stream 中的 item 就會通過 operator 的處理,變成 operator 輸出的東西。
Mono 則與 Flux 略為不同的地方在於,Mono 只會有 0-1 個 item,因此生命週期只會觸發最多一次 onNext(),接著就是 onComplete() 或者 onError() 結束了。
在 Publisher 的生命週期中,會把每個 event 送進宣告的 operator,每個 operator 都負責執行自己的任務。例如我們可以像在寫 Lambda Expression 一樣建立 .map() 的步驟,將 stream 中的項目從 A 樣子轉成 B 樣子;或者我們可以把 Flux<Object> 收集起來變成一個 Mono<Collection<Object>> 或反過來把 Mono<Collection<Object>> 展開變成 Flux<Object> 等等。
Subscriber
Publisher 可以交付給多個 Subscriber 訂閱,Subscriber 在完成訂閱以後,就會在可取得的範圍內去拉所有從 Publisher 送來的 event,並且執行宣告的動作。
舉例來說:
1 2 3 4 5 6 7 8 9 10 11 |
Flux<String> colorFlux = Flux.fromIterable(Arrays.asList( "red" , "green" , "blue" )) .map(String::toUpperCase) .map(color -> String.format( "COLOR_%s" , color)); colorFlux.subscribe(color -> { log.info( "Subscriber #1: {}" , color); }); colorFlux.subscribe(color -> { log.info( "Subscriber #2: {}" , color); }); |
在上面這段程式碼中,我們建立了一個 Publisher,這個 Publisher 的 stream 當中包含兩個 Operator,分別是變大寫和組 prefix,並且有兩個 Subscriber 分別訂閱了這個 Publisher。結果這段程式碼執行之後會是什麼結果呢?答案是:
1 2 3 4 5 6 |
Subscriber # 1 : COLOR_RED Subscriber # 1 : COLOR_GREEN Subscriber # 1 : COLOR_BLUE Subscriber # 2 : COLOR_RED Subscriber # 2 : COLOR_GREEN Subscriber # 2 : COLOR_BLUE |
在這裡暫時先不討論比較進階一點的話題,只看 Subscriber 的 subscribe() 動作。如上面所說,我們可以對同一個 Publisher 進行多次的 Subscriber 訂閱,每個 Subscriber 會以自己的步調拉完所有 Publish 可以提供的 event,這些 event 會各自走完所有宣告的 Operator 後,最後執行在 subscribe() 當中宣告的 onNext() 的程序(即當下一個 event 到來時要執行的程序)。而這整個程序都會是透過 event-driven 的形式非同步地觸發,但會因為 event loop 的關係,所以 event 依然會按照順序地出現在 Subscriber 這端。
除此之外,Subscriber 也可以選擇不要提供 onNext() 的程序,或者是額外再提供 onComplete()、onError()、onSubscribe() 等程序。
沒有留言:
張貼留言