Introduction
「Producer-Consumer」問題是資工系很常用來解釋訊息交換的一種範例,用生產者跟消費者間的關係來描述訊息的傳遞。生產者負責產生資料並放在有限或是無限的緩衝區讓等待消費者來處理。串流資料(Streaming Data)本質上就是一端不斷的丟出資料,另一端需要持續地進行處理,就像 Producer-Consumer 一樣。
Kafka 是近期一個用來處理串流資料的熱們框架,其概念的示意圖如下:
接到串流資料之後我們可以搭配 Spark 進行處理:
串起來之後,一個很典型的應用就像這樣:
Kafka 是什麼?
Kafka 是由 Apache 軟體基金會開發的一個開源流處理平台,目標是為處理即時資料提供一個統一、高吞吐、低延遲的平台。其持久化層本質上是一個「按照分散式事務紀錄檔架構的大規模發布訂閱訊息佇列」。
相關的術語有:
- Topic:用來對訊息進行分類,每個進入到 Kafka 的資訊都會被放到一個 Topic 下
- Broker:用來實現資料儲存的主機伺服器
- Partition:每個 Topic 中的訊息會被分為若干個 Partition ,以提高訊息的處理效率
Spark/PySpark 是什麼?
Apache Spark 是一個延伸於 Hadoop MapReduce 的開源叢集運算框架,Spark 使用了記憶體內運算技術,能在資料尚未寫入硬碟時即在記憶體內分析運算。簡單來說就是一個利用分散式架構的資料運算工具,其主要的運算也是利用 Map - Reduce 的 運算邏輯。Spark 原本是用 Java/Scala 做開發,PySpark 是一個封裝後的介面,適合 Python 開發者使用。
環境準備
- kafka + zookeeper
我們直接用 brew kafka 比較方便,安裝之後再到資料夾中確認使用設定檔都存在。這邊可以補充一下 ZooKeeper 是一個 Hadoop 的分散式資源管理工具,kafka 也可以用它來做資源的管理與分配。如果直接用 brew 安裝的話,zookeeper 也會一併安裝。
1 | $ brew install kafka |
- PySpark
1 | $ pip install pyspark |
如果有出現找不到 SPARK_HOME 的錯誤訊息,在另外要配置一下的環境變數
1 | # Could not find valid SPARK_HOME while searching |
- JAVA8
1 | $ brew cask install java8 |
另外要配置一下 JAVA_HOME 的環境變數
1 | export JAVA_HOME=$(/usr/libexec/java_home -v 1.8) |
Workflow
⓪ 先切換到 kafka 安裝的目錄下
1 | cd /usr/local/Cellar/kafka/_________(版本號) |
① 啟動 zookeeper 服務(需要長駐執行在背景)
1 | ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties |
② 啟動 kafka 服務(需要長駐執行在背景)
1 | ./bin/kafka-server-start /usr/local/etc/kafka/server.properties |
③ 創建/查看 Topic
1 | # 建一個名為 test-kafka 的 Topic |
Streaming Data
產生與接收串流資料的方法有三種,可以使用 kafka-console 指令、kafka-python 套件,也可以使用 PySpark 來處,以下示範簡單的程式碼:
kafka-console
① Producer
使用指令(kafka-console-producer)產生串流資料到特定的 Topic
1 | $ ./bin/kafka-console-producer --broker-list localhost:9092 --topic test-kafka |
② Costomer
使用指令(kafka-console-consumer)從特定的 Topic 接收串流資料
1 | $ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test-kafka --from-beginning |
kafka-python
① Producer
1 |
|
執行:
1 | $ python kafka-producer.py |
② Costomer
1 |
|
執行:
1 | $ python kafka-costomer.py |
PySpark
① Costomer
1 |
|
執行 pyspark 的執行要用 spark-submit 指令,而且要加上 kafka 的配置檔
(所以要在同目錄下下載 spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar 配置檔)
1 | $ spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-consumer.py |
Connect Producer & Costomer
可以利用以上不同的 Producer 方法對同一個 topic 發送訊息,然後用 Costomer 的方法來接收!
小結
本系列預計會有三篇文章,把一個串流資料從來源到資料庫的過程走過一次:
- 在 mac 上建立 Python 的 Kafka 與 Spark 環境 <- 本篇
- Kafka 串接 Twitter Streaming API
- Spark 接收 Kafka 資料後儲存
Reference
[1] mac下kafka环境搭建测试
[2] 用 pip 在 macOS 上安裝單機使用的 pyspark
[3] What should I set JAVA_HOME to on OSX
[4] Running pyspark after pip install pyspark
[5] Getting Streaming data from Kafka with Spark Streaming using Python.
License
本著作由 Chang, Wei-Yaun (v123582) 製作,
以創用CC 姓名標示-相同方式分享 3.0 Unported授權條款釋出。