2013年9月12日 星期四

實作流量限制:Token bucket algorithm

流量限制是蠻平常的網路傳輸的功能~不過實作上其實也不是那麼單純
像是雖然可以靠暫停執行緒的方式去控制平均流量,但一般來說除非是一個 byte 一個 byte 接收或傳送
否則通常很難剛好跑到指定的流量時就讓它暫時停止。
然後這時就會有一些相關的衍生問題,例如這一秒多傳的 byte 數目,下一秒是不是要扣回來?是不是需要控制瞬間最大流量?

在 [1] 的討論中,有提到基本的流量控制的程式碼,同時也有人提到「Token Bucket Algorithm」這個演算法
比較詳細的說明可以參考 [2],不過我也沒有認真看這個演算法的內容....
[3] 則是在 Java 上實作 Token Bucket Algorithm 的 Open Source Library~!
使用方法如同 [3] 的說明一樣,相當簡單。
基本的用法如下:(轉錄自 [3] 的說明)
// Create a token bucket with a capacity of 40 kb tokens that refills at a fixed interval of 20 kb tokens per second
TokenBucket bucket = TokenBuckets.newFixedIntervalRefill(40960, 20480, 1, TimeUnit.SECONDS);

// ...
while (true) {
  String response = prepareResponse();

  // Consume tokens from the bucket commensurate with the size of the response
  bucket.consume(response.length());

  send(response);
}
從上述程式碼可以看出,其實唯一要做的就是初始化時,要告訴 Token Bucket 允許的平均流量和瞬間最大流量,還有時間區間的單位
然後每次讀資料時,就告訴 Token bucket 讀了多少,其他事情它就會自動處理好了。

在我的狀況中,我要用來處理 InputStream,因此我的實作方式如下:
public class LimitedInputStream extends InputStream {
  private InputStream STREAM = null;
  private TokenBucket BUCKET = null;
  
  public LimitedInputStream (InputStream stream) {
    this.STREAM = stream;
    
    // Initiate the bandwidth limitation.
    long burst = 20480;
    long avg = 10240;
    this.BUCKET = TokenBuckets.newFixedIntervalRefill(burst, avg, 1, TimeUnit.SECONDS);
  }

  @Override
  public int read() throws IOException {
    int len = this.STREAM.read();
    
    if(len > 0) {
      this.BUCKET.consume(len);
    }
    return len;
  }
  
  @Override
  public int read(byte[] buffer) throws IOException {
    int len = this.STREAM.read(buffer);
    
    if(len > 0) {
      this.BUCKET.consume(len);
    }
    return len;
  }
  
  @Override
  public int read(byte b[], int off, int len) throws IOException {
    len = this.STREAM.read(b, off, len);
    
    if(len > 0) {
      this.BUCKET.consume(len);
    }
    return len;
  }
}
我把這個 Class 命名為 LimitedInputStream,也就是有做速度限制的 InutStream。
它做的事情就是初始化時,順便初始化一個 Token bucket,然後每次讀取 InputStream 時都去更新 Token bucket,更新完才把讀取的長度回應。
平常使用時,唯一需要特別變更的也就只有第 9 和第 10 行,要指定每秒允許的平均流量和瞬間最大流量。

參考資料:
1、How can I limit bandwidth in Java?
2、What do you understand by Token Bucket Algorithm. What are its applications ?
3、token-bucket

1 則留言:

KY 提到...

想問 user 要使用你的 class的範例?