Introduction

這是系列文章的第二篇,主要解釋當 PySp① ark 接到 Kafka 資料運算後,串接到 MongoDB 的這一段。

兩種做法

接續著前一篇的教學,我們已經可以在 PySpark 中,計算出 Word - Count 的資料。接下來,我們試圖將資料寫入資料庫。我們使用內建的 foreachRDD 操作,它允許將 RDD 的資料取出操作,通常用來導入外部。

1
2
3
4
5
6
7
...

counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)

counts.foreachRDD(process)

可以採用兩種方法接上 MongoDB:

① PyMongo

可以使用 PyMongo 套件的方法,簡單粗暴:

1
2
3
4
5
6
from pymongo import MongoClient

def process(time, rdd):
doc = map(lambda d: {'name': d[0], 'count': d[1]}, rdd.collect())
db = MongoClient("mongodb://127.0.0.1:27017")['process_pymongo']['data']
db.insert(doc)

② PySpark Connector

也可以搭配 PySpark Connector 的方式來處理:

1
2
3
4
5
6
7
from pyspark.sql import SQLContext
from pyspark.sql.types import *

def process(time, rdd):
doc = map(lambda d: {'name': d[0], 'count': d[1]}, rdd.collect())
df = ctx.createDataFrame(rdd, ["word", "count"])
df.write.format("com.mongodb.spark.sql").options(uri="mongodb://127.0.0.1:27017", database="process_pyspark", collection="data").mode("append").save()

執行時要載入對應的 packgae:

1
$ spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.1.0.jar --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.0 costomer-pyspark-mongo.py

小結

本系列預計會有三篇文章,把一個串流資料從來源到資料庫的過程走過一次:

Reference

[1] MongoDB和数据流:使用MongoDB作为Kafka消费者
[2] MongoDB+Spark 完整的大數據方案


License

本文主要參考 ,如果侵權之疑慮,請及時聯繫我們。


本著作由Chang Wei-Yaun (v123582)製作,
創用CC 姓名標示-相同方式分享 3.0 Unported授權條款釋出。