Apache Spark 2.1.0(7)

ファイル監視によるSpark Stream。

ストリーム生成側

以下の例では#NANOHAが含まれるツイートを取集。

(#NANOHAでツイートが拾えない場合は適当なキーワードに変更してください。)

ファイルを使用するストリームでは、Spark側が監視しているフォルダに直接書き込まず、一旦別の場所に作成してから監視下のフォルダに移動させる必要があります。

# -*- coding: utf-8 -*-
import datetime
import json
import os
import shutil

import twitter

CONSUMER_KEY = "****"
CONSUMER_SECRET = "****"
ACCESS_TOKEN = "****"
ACCESS_TOKEN_SECRET = "****"

SRC_DIR = "/tmp/work"
DST_DIR = "/tmp/stream"

def start_twitter_stream():

    oCApi = twitter.Api(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    return oCApi.GetStreamFilter(track=["#NANOHA"])

def main():

    oCStream = start_twitter_stream()

    dateCurr = datetime.datetime.now()

    while True:
        dateNext = dateCurr + datetime.timedelta(minutes=1)
        filenameJson = dateCurr.strftime("tweet-%Y%m%d-%H%M%S.json")
        with open(os.path.join(SRC_DIR, filenameJson), "wb") as h:
            nWriteCount = 0
            for dictTweet in oCStream:
                #print json.dumps(dictTweet)
                #break
                dictData = {
                    "lang": dictTweet["lang"],
                    "id_str": dictTweet["id_str"],
                    "text": dictTweet["text"]
                }
                h.write(json.dumps(dictData) + "\n")
                nWriteCount += 1

                dateCurr = datetime.datetime.now()

                if dateCurr > dateNext:
                    shutil.move(
                        os.path.join(SRC_DIR, filenameJson),
                        os.path.join(DST_DIR, filenameJson)
                    )
                    print "%8d tweet saved." % nWriteCount
                    break

if __name__ == "__main__":
    main()

ストリーム取得側

フォルダに格納されたjsonファイルを読み込んで、メモリ内にaggtableを生成・SparkSQLでデータフレームを生成。

# -*- coding: utf-8 -*-
import pyspark
import datetime
import time

CHECKPOINT_DIR = "/tmp/checkpoint"
STREAM_DIR = "/tmp/stream"

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain Sample6")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSession = pyspark.sql.SparkSession(sc)
    #scSession.conf.set("spark.sql.streaming.checkpointLocation", CHECKPOINT_DIR)

    schemaJson = pyspark.sql.dataframe.StructType().add("lang", "string").add("id_str", "string").add("text", "string")
    dsJson = scSession.readStream.schema(schemaJson).format("json").load(STREAM_DIR)

    dfJson = dsJson.select(
        pyspark.sql.functions.decode(dsJson.text, "UTF-8").alias("text")
    )

    idCount = dfJson.groupBy("text").count().orderBy(pyspark.sql.functions.desc("count"))

    idCount.writeStream.outputMode("complete").queryName("aggtable").format("memory").start()

    while True:
        print "-------- %s" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),)
        for r in scSession.sql("select text, count from aggtable order by count desc limit 10").collect():
            try:
                print "%08d | " % (r["count"],), r["text"].encode("utf-8").split("\n")[0].strip()
            except AttributeError:
                print r
        time.sleep(5)

if __name__ == "__main__":
    main()

Spark Streamを使用する場合は、生成したデータストリームをstartで開始させておくと、Spark側がメモリ内テーブルを更新してくれます。