ファイル監視によるSpark Stream。
ストリーム生成側
以下の例では#NANOHAが含まれるツイートを取集。
(#NANOHAでツイートが拾えない場合は適当なキーワードに変更してください。)
ファイルを使用するストリームでは、Spark側が監視しているフォルダに直接書き込まず、一旦別の場所に作成してから監視下のフォルダに移動させる必要があります。
# -*- coding: utf-8 -*- import datetime import json import os import shutil import twitter CONSUMER_KEY = "****" CONSUMER_SECRET = "****" ACCESS_TOKEN = "****" ACCESS_TOKEN_SECRET = "****" SRC_DIR = "/tmp/work" DST_DIR = "/tmp/stream" def start_twitter_stream(): oCApi = twitter.Api(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET) return oCApi.GetStreamFilter(track=["#NANOHA"]) def main(): oCStream = start_twitter_stream() dateCurr = datetime.datetime.now() while True: dateNext = dateCurr + datetime.timedelta(minutes=1) filenameJson = dateCurr.strftime("tweet-%Y%m%d-%H%M%S.json") with open(os.path.join(SRC_DIR, filenameJson), "wb") as h: nWriteCount = 0 for dictTweet in oCStream: #print json.dumps(dictTweet) #break dictData = { "lang": dictTweet["lang"], "id_str": dictTweet["id_str"], "text": dictTweet["text"] } h.write(json.dumps(dictData) + "\n") nWriteCount += 1 dateCurr = datetime.datetime.now() if dateCurr > dateNext: shutil.move( os.path.join(SRC_DIR, filenameJson), os.path.join(DST_DIR, filenameJson) ) print "%8d tweet saved." % nWriteCount break if __name__ == "__main__": main()
ストリーム取得側
フォルダに格納されたjsonファイルを読み込んで、メモリ内にaggtableを生成・SparkSQLでデータフレームを生成。
# -*- coding: utf-8 -*- import pyspark import datetime import time CHECKPOINT_DIR = "/tmp/checkpoint" STREAM_DIR = "/tmp/stream" def main(): conf = pyspark.SparkConf() conf.setAppName("nanohain Sample6") sc = pyspark.SparkContext(conf=conf) sc.setLogLevel("ERROR") scSession = pyspark.sql.SparkSession(sc) #scSession.conf.set("spark.sql.streaming.checkpointLocation", CHECKPOINT_DIR) schemaJson = pyspark.sql.dataframe.StructType().add("lang", "string").add("id_str", "string").add("text", "string") dsJson = scSession.readStream.schema(schemaJson).format("json").load(STREAM_DIR) dfJson = dsJson.select( pyspark.sql.functions.decode(dsJson.text, "UTF-8").alias("text") ) idCount = dfJson.groupBy("text").count().orderBy(pyspark.sql.functions.desc("count")) idCount.writeStream.outputMode("complete").queryName("aggtable").format("memory").start() while True: print "-------- %s" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),) for r in scSession.sql("select text, count from aggtable order by count desc limit 10").collect(): try: print "%08d | " % (r["count"],), r["text"].encode("utf-8").split("\n")[0].strip() except AttributeError: print r time.sleep(5) if __name__ == "__main__": main()
Spark Streamを使用する場合は、生成したデータストリームをstartで開始させておくと、Spark側がメモリ内テーブルを更新してくれます。