2017年9月21日 星期四

Elasticsearch 去除重複資料

最近一直在處理 Elasticsearch 資料重複的問題
程式當中想要在插入資料之前,先搜尋有沒有重複的資料
如果有的話就先把重複的資料砍到剩下一筆,然後針對那一筆做資料更新。
不過實務上,程式跑起來有些會成功完成 deduplication,但有些則依然重複。

經過一些測試,感覺上問題大概跟以前在用 MongoDB 時有點類似,起因似乎來自於叢集的資料同步的問題。
因為資料變更後,不見得馬上會同步到所有叢集節點上,因此變更後馬上搜尋的話,有可能會搜尋到未變更前的資料。

目前看到的解法,大概就是讓變更速度變慢,以提昇資料的一致性(這點其實跟 MongoDB 結構是類似的)
具體來說,就是設定 Wait For Active Shards [1] 和 Refresh [2] 兩個參數
前者是要求寫入時,要等待 Elasticsearch 叢集寫入 N 個 Shard 以後才回覆
後者則是要求變更能夠被搜尋時才回覆。

以 Java 來說,就是設定像是下述的參數:

PreBuiltTransportClient esClient = new PreBuiltTransportClient();
....
DeleteResponse deleteResponse = esClient
    .prepareDelete(indexOfEs, typeOfEs, id)
    .setWaitForActiveShards(ActiveShardCount.ALL)
    .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
    .get();

上述的 Wait For Active Shards 和 Refresh 參數,可以用在包括 Index、Update 和 Delete 等操作上。

另外,如果想知道 Elasticsearch 上有沒有重複的資料存在,可以參考 [3] 裡面的範例 query
舉例來說,如果每個 Elasticsearch 紀錄是以 uid 欄位來區分唯一性的話,可以使用以下的 query 來查詢

{
  "size": 0,
  "aggs": {
    "duplicateCount": {
      "terms": {
        "field": “uid”
        "min_doc_count": 2
      },
      "aggs": {
        "duplicateDocuments": {
          "top_hits": {}
        }
      }
    }
  }
}

這組 query 會回覆以重複數量排序的所有結果。

參考資料
  1. Index API - Wait For Active Shards
  2. Index API – Refresh
  3. Eliminating Duplicate Documents in Elasticsearch

沒有留言: