2019年12月29日 星期日

Reactive Programming(二):Flow API

本系列的文章為 Modern Java in Action / Project Reactor 的讀書筆記,因此內容可能會有點跳躍。

Flow API

在 Java 9 當中,有了 Flow API [1] 這套函式庫,用以支援基本的 Reactive Programming 需要的功能,也就是 publish、subscribe、以及 backpressure 這幾個重要的特性。

Flow API 有以下這幾個介面:

  • Flow.Publisher<T>
  • Flow.Subscriber<T>
  • Flow.Subscription
  • Flow.Processor<T, R>

下面會分別介紹每個介面的功能與用途。

Publisher & Subscriber

Publisher 和 Subscriber 是標準的 Publish–Subscribe Pattern 的實作,由 Publisher 產生 event、並由一個或多個 Subscriber 消耗 event。以下是 Publisher 以及 Subscriber 的介面內容:

@FunctionalInterface
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}
Publisher 可以允許 Subscriber 透過 subscribe() 將自己註冊到 Publisher 當中,使 Publisher 發送 event 時能夠送給 Subscriber。不過在 Publisher 與 Subscriber 之間的規劃則是由 Subscription 在控制的。

public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

相較於相對單純的 Publisher,Subscriber 稍微複雜一點,有四個 callback 函式,字面上應該蠻淺顯地可以看出四個函式分別會在什麼時候產生動作:

  • onSubscribe() - 當訂閱 Publisher 時觸發。
  • onNext() - 當收到 event 時觸發,其中 T 就是 event 的型態。
  • onError() - 當發生錯誤時觸發,需要稍微留意的是這裡嚴格來說指的是「Publisher 發生某些錯誤,因而無法繼續發送事件」的意思。
  • onComplete() – 當 Publisher 通知資料已經完結時觸發。

整個 event 的處理流程必然會是這樣的程序:

onSubscribe onNext* (onError | onComplete)?

onSubscribe() 會在最開始被觸發一次,然後 onNext() 會觸發 0 到無限多次,最後觸發 onError() 或者 onComplete() 結束整個 event stream。

Subscription

當 Subscriber 向 Publisher 註冊的時候,Publisher 的第一個動作就是先調用 Subscriber 的 onSubscribe() 函式,此時會傳入一個 Subscription 物件。Subscription 介面的定義如下:

public interface Subscription {
    void request(long n);
    void cancel();
}

request() 函式用來通知 Publisher 說 Subscriber 已經準備好可以接收 n 個 event;cancel() 函式則用來取消訂閱,讓 Publisher 接下來不要再繼續發送 event 給這個 Subscriber。

Processor

Processor 介面的定義上只有如下的內容:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

它同時代表了 Publisher 和 Subscriber,可以用來串接多個 stream,作為整個 stream 當中的階段。舉例來說,當上游的 Publisher 觸發 onError() 時,它可以決定是否要修復錯誤、或者是要發送 onError() 的訊號給下游的 Subscriber。

Flow API 的實作?

眼尖的話可能會注意到,這裡在談的都是「介面」,那麼「實作」呢?實際上,Java 9 並沒有提供完整的 Flow API 實作,主要原因是因為這個介面誕生的時候,在 Java 的世界當中已經有多個 Reactive Library 可以使用了。因此 Flow API 將自己定位在標準化各種 Reactive Library 的規範,但實際的實作則由各個 Reactive Library 自行提供。

參考資料
  1. java.util.concurrent.Flow
  2. Modern Java in Action

沒有留言: