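I recently started working on a Spark project, so whenever I have some spare time I'll be writing Spark beginner notes 😆. My machine runs Windows 11. I don't expect the steps to differ much in other environments, though a few small details may vary. I'm more used to developing in Linux-based environments, so what this article records is really a Windows 11 + WSL2 setup, and in principle I ran every command inside Ubuntu on WSL2.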
Preparing the Spark Environment
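First, according to the official Spark documentation, Spark itself doesn't need to be installed: you just download it and run the relevant commands. So the main things to prepare are Java (to run Spark), Maven (to build the example project), and the Spark download itself. I'll skip how to install Java and Maven; here is a quick summary of my current environment: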
- Java: 21.0.6+7-Ubuntu-124.04.1
- Maven: 3.8.8
When downloading Spark from the official website, the latest version at the time was 3.5.4, so I picked these options:
- Spark: 3.5.4
- Package Type: Pre-built for Apache Hadoop 3.3 and later (Scala 2.13)
To be honest, I'm not yet sure what difference the Package Type makes, but since my example project uses Scala, I picked the one with Scala in its name 🙃.
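For what it's worth, the "(Scala 2.13)" part is the Scala version the Spark binaries were compiled with. Scala 2.12 and 2.13 are not binary compatible, so the application JAR has to target the same binary version, which is what the `_${scala.binary.version}` suffix in the Maven artifactId selects. The sketch below is a small self-check under that assumption; `ScalaBinaryCheck`, `binaryVersion`, and `ExpectedBinary` are hypothetical names, and `scala.util.Properties.versionNumberString` reports the scala-library actually loaded (when launched through spark-submit with a thin JAR, that should be the one shipped with Spark).

```scala
// Sketch: check that the running Scala binary version matches the "(Scala 2.13)" Spark build.
object ScalaBinaryCheck {
  // Hypothetical helper: keeps only "major.minor", e.g. "2.13.15" -> "2.13"
  def binaryVersion(fullVersion: String): String =
    fullVersion.split('.').take(2).mkString(".")

  // Binary version of the Spark package downloaded above (assumption: the Scala 2.13 build)
  val ExpectedBinary = "2.13"

  def main(args: Array[String]): Unit = {
    // Version of the scala-library actually on the classpath at runtime
    val running = scala.util.Properties.versionNumberString
    val matches = binaryVersion(running) == ExpectedBinary
    println(s"Scala $running (binary ${binaryVersion(running)}); matches Spark build: $matches")
  }
}
```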
Preparing the Example Project
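The first thing any beginner does is a Hello World, but just printing Hello World with Spark felt a bit underwhelming, so I put together an example project that is a tiny bit more involved than Hello World, and used it to briefly try writing Spark code in Scala.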
git clone git@github.com:jimwayneyeh/example-spark-scala.git
Next, since we'll hand a JAR to Spark to run, the project has to be packaged as a JAR.
cd example-spark-scala
mvn clean package
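Once `mvn clean package` finishes, one quick sanity check is that the JAR (expected at target/example-spark-scala-1.0-SNAPSHOT.jar) really contains the class that will later be passed to `--class`. This is a minimal sketch using only the JDK's `java.util.jar.JarFile`; `JarInspect`, `entryFor`, and `containsClass` are hypothetical names, and it relies on the standard layout where class `a.b.C` is stored as the entry `a/b/C.class`.

```scala
import java.util.jar.JarFile
import scala.util.Using

// Sketch: confirm the packaged JAR contains the class later passed to --class.
object JarInspect {
  // "a.b.C" -> "a/b/C.class", the entry name a compiled class gets inside a JAR
  def entryFor(className: String): String =
    className.replace('.', '/') + ".class"

  // True if the JAR at jarPath has an entry for the compiled class
  def containsClass(jarPath: String, className: String): Boolean =
    Using.resource(new JarFile(jarPath)) { jar =>
      jar.getEntry(entryFor(className)) != null
    }

  def main(args: Array[String]): Unit = {
    val jarPath = args.headOption.getOrElse("target/example-spark-scala-1.0-SNAPSHOT.jar")
    val mainClass = "tw.jimwayneyeh.example.spark_scala.App"
    println(s"$mainClass in $jarPath: ${containsClass(jarPath, mainClass)}")
  }
}
```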
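The packaged file ends up at ./target/example-spark-scala-1.0-SNAPSHOT.jar; remember this path, since we'll need it shortly. Before that, let's look at what the example project contains. Start with the class tw.jimwayneyeh.example.spark_scala.App, whose contents are as follows: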
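package tw.jimwayneyeh.example.spark_scala

import org.apache.spark.sql.SparkSession

object App {
  def main(args: Array[String]): Unit = {
    println("Hello World!")
    // Reuses an existing session or creates one; the master URL comes from spark-submit
    val spark = SparkSession.builder().appName("example app").getOrCreate()
    // header=true: the first line holds column names, not data
    val customerDf =
      spark.read.options(Map("header" -> "true")).csv("customers-100.csv")
    println(s"Count: ${customerDf.count()}")
    spark.stop()
  }
}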
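The code is short, so I won't go through it line by line. In short, it creates a SparkSession and reads customers-100.csv from the project root (a sample CSV downloaded from sample-csv-files) into a DataFrame, telling the reader that the file has a header row. It then counts the rows in that DataFrame and prints the result.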
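Because of `header -> true`, the first line is taken as column names rather than a data row, so the count should be the number of lines minus one. The plain-Scala sketch below cross-checks that without Spark; `CsvRowCount` and `countDataRows` are hypothetical names, and it assumes no quoted field spans multiple lines (it also skips blank lines).

```scala
import scala.io.Source
import scala.util.Using

object CsvRowCount {
  // Hypothetical helper: counts data rows in a CSV whose first line is a header.
  // Assumes no quoted field spans multiple lines; blank lines are ignored.
  def countDataRows(path: String): Long =
    Using.resource(Source.fromFile(path, "UTF-8")) { src =>
      val nonEmpty = src.getLines().count(_.trim.nonEmpty)
      math.max(nonEmpty - 1, 0).toLong // subtract the header line
    }

  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse("customers-100.csv")
    println(s"Data rows in $path: ${countDataRows(path)}")
  }
}
```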
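When reading the file, note that the path given here is relative, and the file does not live in src/main/resources. A relative local path is resolved against the directory you launch spark-submit from (at least in local mode), so it points at the project root only when you run from there. When I asked ChatGPT about this, its answer was that in normal Spark usage data comes in from an external path, so Spark programs generally don't read resources packaged inside the JAR. That reasoning sounds fair: anything worth running on Spark probably involves a fair amount of data, and nobody wants to stuff that into a JAR 😆.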
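To see where a relative path such as customers-100.csv lands, the sketch below resolves it against the JVM's working directory (`user.dir`) with `java.nio`. This is only an analogy for how a local relative path is anchored, not Spark's own code; `RelativePathDemo` and `resolveLocal` are hypothetical names.

```scala
import java.nio.file.{Path, Paths}

object RelativePathDemo {
  // Relative paths are anchored at the JVM's working directory (user.dir);
  // normalize() removes "." and ".." segments lexically.
  def resolveLocal(p: String): Path = Paths.get(p).toAbsolutePath.normalize()

  def main(args: Array[String]): Unit = {
    println(s"Working directory: ${System.getProperty("user.dir")}")
    println(s"customers-100.csv -> ${resolveLocal("customers-100.csv")}")
  }
}
```

Running this from the project root prints a path next to the project's files; running it from Spark's bin folder would point inside bin instead.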
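Let's also look at what a minimal Spark project needs when packaged with Maven. In the POM, the dependencies below are all you need (compiling the Scala sources additionally requires a Scala compiler plugin in the build section, such as scala-maven-plugin).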
<!-- scala.version and scala.binary.version are assumed to be defined in <properties>; the binary version must match the Spark build (2.13 here) -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>3.5.4</version>
    <scope>provided</scope>
</dependency>
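Note in particular that the Spark dependency's scope is provided. With provided scope, Maven puts the dependency on the compile classpath but expects the runtime environment to supply it: at run time the JAR runs on Spark, and spark-submit already puts Spark's own libraries on the classpath.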
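One way to observe what provided means in practice is to ask at run time whether a Spark class can be found. In this sketch (hypothetical names `ProvidedCheck` and `onClasspath`), running the packaged JAR with plain `java -cp` should report false, since Spark's classes are not inside it, while launching the same class through spark-submit should report true. `Class.forName` is called with `initialize = false` so the check does not run static initializers.

```scala
import scala.util.Try

object ProvidedCheck {
  // True if the class can be found by this object's class loader.
  // initialize = false: only locate the class, don't run its static initializers.
  def onClasspath(className: String): Boolean =
    Try(Class.forName(className, false, getClass.getClassLoader)).isSuccess

  def main(args: Array[String]): Unit = {
    // With <scope>provided</scope> this class is NOT packaged into our JAR;
    // it is only found when the runtime (e.g. spark-submit) supplies Spark's jars.
    val sparkAvailable = onClasspath("org.apache.spark.sql.SparkSession")
    println(s"SparkSession on classpath: $sparkAvailable")
  }
}
```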
Running on Spark
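With the example project's JAR ready, go back to the Spark archive downloaded earlier and extract it; spark-submit lives in its bin folder. Running the spark-submit command below starts a local Spark environment that executes the example project. Because the CSV path in the code is relative, launch the command from the project root (in my run the file resolved to /mnt/g/scala/example-spark-scala/customers-100.csv, as the log below shows); launched from inside Spark's bin folder, the path would point at bin/customers-100.csv instead. Here /PATH/TO/SPARK stands for wherever you extracted Spark.
cd /PATH/TO/PROJECT
/PATH/TO/SPARK/bin/spark-submit --class "tw.jimwayneyeh.example.spark_scala.App" --master local /PATH/TO/PROJECT/target/example-spark-scala-1.0-SNAPSHOT.jar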
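`--master local` runs Spark in local mode with a single worker thread; in local mode the executor lives inside the driver JVM, which is why the log below says `executor driver`. The sketch below encodes the documented meaning of local master URLs (`local`, `local[K]`, `local[*]`, `local[K,F]`) as a hypothetical helper, `LocalMaster.workerThreads`; it is an illustration, not Spark's own parser.

```scala
object LocalMaster {
  // local[K] or local[*]
  private val LocalN = """local\[(\d+|\*)\]""".r
  // local[K,F] or local[*,F], where F is the max task failures
  private val LocalNF = """local\[(\d+|\*)\s*,\s*(\d+)\]""".r

  // Hypothetical helper: number of worker threads a local master URL implies.
  def workerThreads(master: String): Option[Int] = {
    def threads(s: String): Int =
      if (s == "*") Runtime.getRuntime.availableProcessors() else s.toInt
    master match {
      case "local"       => Some(1) // one worker thread, no parallelism
      case LocalN(n)     => Some(threads(n))
      case LocalNF(n, _) => Some(threads(n))
      case _             => None    // not a local master, e.g. yarn or spark://host:port
    }
  }
}
```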
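After a short wait, the console shows Spark's logs. I'm pasting a long excerpt because I also want to record the execution process so I can analyze it later. For now, what matters are the two lines without a timestamp: the code prints them with println() rather than through the logger, so they skip the log format and appear on the console as bare messages. They show that the DataFrame contains 100 rows.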
Hello World!
Count: 100
Below is the part of the log related to App.scala; these lines are what Spark logged while actually scheduling and running the example program.
25/02/19 00:38:51 INFO SparkContext: Created broadcast 0 from csv at App.scala:14
25/02/19 00:38:51 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4211559 bytes, open cost is considered as scanning 4194304 bytes.
25/02/19 00:38:52 INFO SparkContext: Starting job: csv at App.scala:14
25/02/19 00:38:52 INFO DAGScheduler: Got job 0 (csv at App.scala:14) with 1 output partitions
25/02/19 00:38:52 INFO DAGScheduler: Final stage: ResultStage 0 (csv at App.scala:14)
25/02/19 00:38:52 INFO DAGScheduler: Parents of final stage: List()
25/02/19 00:38:52 INFO DAGScheduler: Missing parents: List()
25/02/19 00:38:52 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at csv at App.scala:14), which has no missing parents
25/02/19 00:38:52 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 13.8 KiB, free 434.2 MiB)
25/02/19 00:38:52 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.5 KiB, free 434.2 MiB)
25/02/19 00:38:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.255.255.254:43927 (size: 6.5 KiB, free: 434.4 MiB)
25/02/19 00:38:52 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1585
25/02/19 00:38:52 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at csv at App.scala:14) (first 15 tasks are for partitions Vector(0))
25/02/19 00:38:52 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
25/02/19 00:38:52 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (10.255.255.254, executor driver, partition 0, PROCESS_LOCAL, 9896 bytes) 
25/02/19 00:38:52 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
25/02/19 00:38:52 INFO CodeGenerator: Code generated in 12.528525 ms
25/02/19 00:38:52 INFO FileScanRDD: Reading File path: file:///mnt/g/scala/example-spark-scala/customers-100.csv, range: 0-17255, partition values: [empty row]
25/02/19 00:38:52 INFO CodeGenerator: Code generated in 40.709858 ms
25/02/19 00:38:53 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1800 bytes result sent to driver
25/02/19 00:38:53 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 697 ms on 10.255.255.254 (executor driver) (1/1)
25/02/19 00:38:53 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
25/02/19 00:38:53 INFO DAGScheduler: ResultStage 0 (csv at App.scala:14) finished in 0.890 s
25/02/19 00:38:53 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
25/02/19 00:38:53 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
25/02/19 00:38:53 INFO DAGScheduler: Job 0 finished: csv at App.scala:14, took 0.965247 s
25/02/19 00:38:53 INFO CodeGenerator: Code generated in 17.506485 ms
25/02/19 00:38:53 INFO FileSourceStrategy: Pushed Filters: 
25/02/19 00:38:53 INFO FileSourceStrategy: Post-Scan Filters:
25/02/19 00:38:53 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 199.5 KiB, free 434.0 MiB)
25/02/19 00:38:53 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 34.4 KiB, free 433.9 MiB)
25/02/19 00:38:53 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.255.255.254:43927 (size: 34.4 KiB, free: 434.3 MiB)
25/02/19 00:38:53 INFO SparkContext: Created broadcast 2 from csv at App.scala:14
25/02/19 00:38:53 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4211559 bytes, open cost is considered as scanning 4194304 bytes.
25/02/19 00:38:53 INFO FileSourceStrategy: Pushed Filters: 
25/02/19 00:38:53 INFO FileSourceStrategy: Post-Scan Filters:
25/02/19 00:38:53 INFO CodeGenerator: Code generated in 12.86595 ms
25/02/19 00:38:53 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 199.4 KiB, free 433.7 MiB)
25/02/19 00:38:53 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 34.4 KiB, free 433.7 MiB)
25/02/19 00:38:53 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.255.255.254:43927 (size: 34.4 KiB, free: 434.3 MiB)
25/02/19 00:38:53 INFO SparkContext: Created broadcast 3 from count at App.scala:15
25/02/19 00:38:53 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4211559 bytes, open cost is considered as scanning 4194304 bytes.
25/02/19 00:38:54 INFO DAGScheduler: Registering RDD 13 (count at App.scala:15) as input to shuffle 0
25/02/19 00:38:54 INFO DAGScheduler: Got map stage job 1 (count at App.scala:15) with 1 output partitions
25/02/19 00:38:54 INFO DAGScheduler: Final stage: ShuffleMapStage 1 (count at App.scala:15)
25/02/19 00:38:54 INFO DAGScheduler: Parents of final stage: List()
25/02/19 00:38:54 INFO DAGScheduler: Missing parents: List()
25/02/19 00:38:54 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD[13] at count at App.scala:15), which has no missing parents
25/02/19 00:38:54 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 18.8 KiB, free 433.7 MiB)
25/02/19 00:38:54 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 9.1 KiB, free 433.7 MiB)
25/02/19 00:38:54 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.255.255.254:43927 (size: 9.1 KiB, free: 434.3 MiB)
25/02/19 00:38:54 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1585
25/02/19 00:38:54 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD[13] at count at App.scala:15) (first 15 tasks are for partitions Vector(0))
25/02/19 00:38:54 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
25/02/19 00:38:54 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (10.255.255.254, executor driver, partition 0, PROCESS_LOCAL, 9885 bytes)
25/02/19 00:38:54 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
25/02/19 00:38:54 INFO CodeGenerator: Code generated in 11.085534 ms
25/02/19 00:38:54 INFO FileScanRDD: Reading File path: file:///mnt/g/scala/example-spark-scala/customers-100.csv, range: 0-17255, partition values: [empty row]
25/02/19 00:38:54 INFO CodeGenerator: Code generated in 7.906767 ms
25/02/19 00:38:54 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2105 bytes result sent to driver
25/02/19 00:38:54 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 320 ms on 10.255.255.254 (executor driver) (1/1)
25/02/19 00:38:54 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
25/02/19 00:38:54 INFO DAGScheduler: ShuffleMapStage 1 (count at App.scala:15) finished in 0.351 s
25/02/19 00:38:54 INFO DAGScheduler: looking for newly runnable stages
25/02/19 00:38:54 INFO DAGScheduler: running: HashSet()
25/02/19 00:38:54 INFO DAGScheduler: waiting: HashSet()
25/02/19 00:38:54 INFO DAGScheduler: failed: HashSet()
25/02/19 00:38:54 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 10.255.255.254:43927 in memory (size: 34.4 KiB, free: 434.3 MiB)
25/02/19 00:38:54 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 10.255.255.254:43927 in memory (size: 6.5 KiB, free: 434.3 MiB)
25/02/19 00:38:54 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 10.255.255.254:43927 in memory (size: 34.4 KiB, free: 434.4 MiB)
25/02/19 00:38:54 INFO CodeGenerator: Code generated in 12.077153 ms
25/02/19 00:38:54 INFO SparkContext: Starting job: count at App.scala:15
25/02/19 00:38:54 INFO DAGScheduler: Got job 2 (count at App.scala:15) with 1 output partitions
25/02/19 00:38:54 INFO DAGScheduler: Final stage: ResultStage 3 (count at App.scala:15)
25/02/19 00:38:54 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 2)
25/02/19 00:38:54 INFO DAGScheduler: Missing parents: List()
25/02/19 00:38:54 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[16] at count at App.scala:15), which has no missing parents
25/02/19 00:38:54 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 13.1 KiB, free 434.1 MiB)
25/02/19 00:38:54 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 6.1 KiB, free 434.1 MiB)
25/02/19 00:38:54 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 10.255.255.254:43927 (size: 6.1 KiB, free: 434.4 MiB)
25/02/19 00:38:54 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1585
25/02/19 00:38:54 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[16] at count at App.scala:15) (first 15 tasks are for partitions Vector(0))
25/02/19 00:38:54 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks resource profile 0
25/02/19 00:38:54 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 2) (10.255.255.254, executor driver, partition 0, NODE_LOCAL, 9276 bytes)
25/02/19 00:38:54 INFO Executor: Running task 0.0 in stage 3.0 (TID 2)
25/02/19 00:38:54 INFO ShuffleBlockFetcherIterator: Getting 1 (60.0 B) non-empty blocks including 1 (60.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
25/02/19 00:38:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 15 ms
25/02/19 00:38:54 INFO CodeGenerator: Code generated in 8.620798 ms
25/02/19 00:38:54 INFO Executor: Finished task 0.0 in stage 3.0 (TID 2). 4084 bytes result sent to driver
25/02/19 00:38:54 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 2) in 97 ms on 10.255.255.254 (executor driver) (1/1)
25/02/19 00:38:54 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
25/02/19 00:38:54 INFO DAGScheduler: ResultStage 3 (count at App.scala:15) finished in 0.108 s
25/02/19 00:38:54 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
25/02/19 00:38:54 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
25/02/19 00:38:54 INFO DAGScheduler: Job 2 finished: count at App.scala:15, took 0.120309 s
Count: 100
25/02/19 00:38:54 INFO SparkContext: SparkContext is stopping with exitCode 0.
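The repeated line "Planning scan with bin packing, max size: 4211559 bytes, open cost is considered as scanning 4194304 bytes" can be reproduced by hand. My reading of Spark 3.x's file-scan planning (`FilePartition.maxSplitBytes`) is: max size = min(spark.sql.files.maxPartitionBytes, max(spark.sql.files.openCostInBytes, totalBytes / minPartitionNum)), where each file counts as its length plus the open cost, the defaults are 128 MiB and 4 MiB, and minPartitionNum falls back to the default parallelism (1 under `--master local`). The sketch below re-implements only that formula; `SplitSizeEstimate` is a hypothetical name, and the 17255-byte file size comes from "range: 0-17255" in the log.

```scala
object SplitSizeEstimate {
  // Defaults of spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes
  val DefaultMaxPartitionBytes: Long = 128L * 1024 * 1024 // 134217728
  val DefaultOpenCostInBytes: Long = 4L * 1024 * 1024     // 4194304

  // Sketch of the split-size formula used when planning a file scan.
  def maxSplitBytes(
      fileSizes: Seq[Long],
      minPartitionNum: Int,
      maxPartitionBytes: Long = DefaultMaxPartitionBytes,
      openCostInBytes: Long = DefaultOpenCostInBytes): Long = {
    // Every file is charged its length plus a fixed "open cost"
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / minPartitionNum
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    // customers-100.csv is 17255 bytes; --master local gives a parallelism of 1
    println(maxSplitBytes(Seq(17255L), minPartitionNum = 1)) // prints 4211559
  }
}
```

Since 17255 + 4194304 = 4211559 matches the logged max size, and the whole file is smaller than that, the scan fits in a single partition, consistent with "with 1 output partitions" in the log.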