Spark Streaming処理のデモ
ストリーム生成側
# -*- coding: utf-8 -*- import random import socket import time def main(): socketServer = socket.socket(socket.AF_INET, socket.SOCK_STREAM) socketServer.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) socketServer.bind(("127.0.0.1", 8000)) socketServer.listen(1) sockRemote, addrRemote = socketServer.accept() print "connection", addrRemote listName = ["なのは", "フェイト", "はやて"] while True: name = listName[random.randint(0, 2)] sockRemote.send(name + "\n") time.sleep(1) if __name__ == "__main__": main()
ランダムで「なのは」「フェイト」「はやて」の文字列を戻し続けるだけのサーバープログラム。
Apache Streamは改行コードまでが取り込み単位なので、データ終端として改行コードが必要です。
ストリーム取得(Spark Streaming)側
# -*- coding: utf-8 -*- import pyspark from pyspark.sql import functions def main(): conf = pyspark.SparkConf() conf.setAppName("nanohain Sample5") sc = pyspark.SparkContext(conf=conf) sc.setLogLevel("ERROR") scSession = pyspark.sql.SparkSession(sc) conf = scSession.readStream.format("socket") conf.option("host", "127.0.0.1") conf.option("port", 8000) dsName = conf.load() dfName = dsName.select( dsName.value.alias("name") ) print dfName nameCount = dfName.groupBy("name").count().orderBy(pyspark.sql.functions.desc("count")) query = nameCount.writeStream.outputMode("complete").format("console").start() query.awaitTermination() if __name__ == "__main__": main()
実行結果
sample_5_server.py を実行してからsample5_clinet.pyをSparkで実行すると以下の様な表示がコンソールに出力されます。
------------------------------------------- Batch: 1 ------------------------------------------- +----+-----+ |name|count| +----+-----+ | はやて| 6| |フェイト| 3| | なのは| 1| +----+-----+
単純にカウントしたものを表示させているだけです。