Introduction

「Producer-Consumer」問題是資工系很常用來解釋訊息交換的一種範例,用生產者跟消費者間的關係來描述訊息的傳遞。生產者負責產生資料並放在有限或是無限的緩衝區讓等待消費者來處理。串流資料(Streaming Data)本質上就是一端不斷的丟出資料,另一端需要持續地進行處理,就像 Producer-Consumer 一樣。

Kafka 是近期一個用來處理串流資料的熱們框架,其概念的示意圖如下:

Imgur

接到串流資料之後我們可以搭配 Spark 進行處理:

Imgur

串起來之後,一個很典型的應用就像這樣:

Imgur

Kafka 是什麼?

Kafka 是由 Apache 軟體基金會開發的一個開源流處理平台,目標是為處理即時資料提供一個統一、高吞吐、低延遲的平台。其持久化層本質上是一個「按照分散式事務紀錄檔架構的大規模發布訂閱訊息佇列」。

相關的術語有:

  • Topic:用來對訊息進行分類,每個進入到 Kafka 的資訊都會被放到一個 Topic 下
  • Broker:用來實現資料儲存的主機伺服器
  • Partition:每個 Topic 中的訊息會被分為若干個 Partition ,以提高訊息的處理效率

Imgur

Spark/PySpark 是什麼?

Apache Spark 是一個延伸於 Hadoop MapReduce 的開源叢集運算框架,Spark 使用了記憶體內運算技術,能在資料尚未寫入硬碟時即在記憶體內分析運算。簡單來說就是一個利用分散式架構的資料運算工具,其主要的運算也是利用 Map - Reduce 的 運算邏輯。Spark 原本是用 Java/Scala 做開發,PySpark 是一個封裝後的介面,適合 Python 開發者使用。

環境準備

  1. kafka + zookeeper

我們直接用 brew kafka 比較方便,安裝之後再到資料夾中確認使用設定檔都存在。這邊可以補充一下 ZooKeeper 是一個 Hadoop 的分散式資源管理工具,kafka 也可以用它來做資源的管理與分配。如果直接用 brew 安裝的話,zookeeper 也會一併安裝。

1
2
3
4
5
6
7
8
9
$ brew install kafka 
$ ls /usr/local/etc/kafka # kafka 跟 zookeeper 設定檔會放在此資料夾下
connect-console-sink.properties consumer.properties
connect-console-source.properties log4j.properties
connect-distributed.properties producer.properties
connect-file-sink.properties server.properties
connect-file-source.properties tools-log4j.properties
connect-log4j.properties trogdor.conf
connect-standalone.properties zookeeper.properties
  1. PySpark
1
2
3
4
5
6
7
8
9
10
11
$ pip install pyspark
$ pyspark
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/

Using Python version 3.5.2 (default, Nov 30 2016 12:41:46)
SparkSession available as 'spark'.

如果有出現找不到 SPARK_HOME 的錯誤訊息,在另外要配置一下的環境變數

1
2
3
4
5
# Could not find valid SPARK_HOME while searching
# ['/home/user', '/home/user/.local/bin']

export PYSPARK_PYTHON=python3
export SPARK_HOME=/Users/wei/Envs/py3dev/lib/python3.5/site-packages/pyspark
  1. JAVA8
1
2
3
4
5
$ brew cask install java8
$ java -version
java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)

另外要配置一下 JAVA_HOME 的環境變數

1
export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)

Workflow

⓪ 先切換到 kafka 安裝的目錄下

1
cd /usr/local/Cellar/kafka/_________(版本號)

① 啟動 zookeeper 服務(需要長駐執行在背景

1
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

② 啟動 kafka 服務(需要長駐執行在背景

1
./bin/kafka-server-start /usr/local/etc/kafka/server.properties

③ 創建/查看 Topic

1
2
3
4
5
# 建一個名為 test-kafka 的 Topic
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-kafka

# 查看目前已經建立過的 Topic
./bin/kafka-topics --list --zookeeper localhost:2181\n\n

Streaming Data

產生與接收串流資料的方法有三種,可以使用 kafka-console 指令、kafka-python 套件,也可以使用 PySpark 來處,以下示範簡單的程式碼:

kafka-console

① Producer

使用指令(kafka-console-producer)產生串流資料到特定的 Topic

1
$ ./bin/kafka-console-producer --broker-list localhost:9092 --topic test-kafka

② Costomer

使用指令(kafka-console-consumer)從特定的 Topic 接收串流資料

1
$ ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test-kafka --from-beginning

kafka-python

① Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

from kafka import KafkaProducer
import time

brokers, topic = 'localhost:9092', 'test-kafka'

def start():
while True:
print(" --- produce ---")
time.sleep(10)
producer.send('topic', key=b'foo', value=b'bar')
producer.flush()


if __name__ == '__main__':
producer = KafkaProducer(bootstrap_servers=brokers)
start()
producer.close()

執行:

1
$ python kafka-producer.py

② Costomer

1
2
3
4
5
6
7
8
9

from kafka import KafkaConsumer

brokers, topic = 'localhost:9092', 'test-kafka'

if __name__ == '__main__':
consumer = KafkaConsumer(topic, group_id='test-consumer-group', bootstrap_servers=[brokers])
for msg in consumer:
print("key=%s, value=%s" % (msg.key, msg.value))

執行:

1
$ python kafka-costomer.py

PySpark

① Costomer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

brokers, topic = 'localhost:9092', 'test-kafka'

if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 10)
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" "))
ssc.start()
ssc.awaitTermination()

執行 pyspark 的執行要用 spark-submit 指令,而且要加上 kafka 的配置檔
(所以要在同目錄下下載 spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar 配置檔)

1
$ spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar pyspark-consumer.py

Connect Producer & Costomer

可以利用以上不同的 Producer 方法對同一個 topic 發送訊息,然後用 Costomer 的方法來接收!

小結

本系列預計會有三篇文章,把一個串流資料從來源到資料庫的過程走過一次:

Reference

[1] mac下kafka环境搭建测试
[2] 用 pip 在 macOS 上安裝單機使用的 pyspark
[3] What should I set JAVA_HOME to on OSX
[4] Running pyspark after pip install pyspark
[5] Getting Streaming data from Kafka with Spark Streaming using Python.


License


本著作由 Chang, Wei-Yaun (v123582) 製作,
創用CC 姓名標示-相同方式分享 3.0 Unported授權條款釋出。