コンテンツへスキップ

Apache Spark 2.1.0(6)

タグ:

Spark Streaming処理のデモ

ストリーム生成側

# -*- coding: utf-8 -*-
import random
import socket
import time

def main():

    socketServer = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socketServer.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    socketServer.bind(("127.0.0.1", 8000))
    socketServer.listen(1)

    sockRemote, addrRemote = socketServer.accept()
    print "connection", addrRemote

    listName = ["なのは", "フェイト", "はやて"]

    while True:
        name = listName[random.randint(0, 2)]
        sockRemote.send(name + "\n")
        time.sleep(1)


if __name__ == "__main__":
    main()

ランダムで「なのは」「フェイト」「はやて」の文字列を戻し続けるだけのサーバープログラム。

Apache Streamは改行コードまでが取り込み単位なので、データ終端として改行コードが必要です。

ストリーム取得(Spark Streaming)側

# -*- coding: utf-8 -*-
import pyspark
from pyspark.sql import functions

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain Sample5")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSession = pyspark.sql.SparkSession(sc)

    conf = scSession.readStream.format("socket")
    conf.option("host", "127.0.0.1")
    conf.option("port", 8000)
    dsName = conf.load()

    dfName = dsName.select(
        dsName.value.alias("name")
    )
    print dfName

    nameCount = dfName.groupBy("name").count().orderBy(pyspark.sql.functions.desc("count"))

    query = nameCount.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()


if __name__ == "__main__":
    main()

実行結果

sample_5_server.py を実行してからsample5_clinet.pyをSparkで実行すると以下の様な表示がコンソールに出力されます。

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|name|count|
+----+-----+
| はやて|    6|
|フェイト|    3|
| なのは|    1|
+----+-----+

単純にカウントしたものを表示させているだけです。