Apache Spark 2.1.0(7)

ファイル監視によるSpark Stream。

ストリーム生成側

以下の例では#NANOHAが含まれるツイートを取集。

(#NANOHAでツイートが拾えない場合は適当なキーワードに変更してください。)

ファイルを使用するストリームでは、Spark側が監視しているフォルダに直接書き込まず、一旦別の場所に作成してから監視下のフォルダに移動させる必要があります。

# -*- coding: utf-8 -*-
import datetime
import json
import os
import shutil

import twitter

CONSUMER_KEY = "****"
CONSUMER_SECRET = "****"
ACCESS_TOKEN = "****"
ACCESS_TOKEN_SECRET = "****"

SRC_DIR = "/tmp/work"
DST_DIR = "/tmp/stream"

def start_twitter_stream():

    oCApi = twitter.Api(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    return oCApi.GetStreamFilter(track=["#NANOHA"])

def main():

    oCStream = start_twitter_stream()

    dateCurr = datetime.datetime.now()

    while True:
        dateNext = dateCurr + datetime.timedelta(minutes=1)
        filenameJson = dateCurr.strftime("tweet-%Y%m%d-%H%M%S.json")
        with open(os.path.join(SRC_DIR, filenameJson), "wb") as h:
            nWriteCount = 0
            for dictTweet in oCStream:
                #print json.dumps(dictTweet)
                #break
                dictData = {
                    "lang": dictTweet["lang"],
                    "id_str": dictTweet["id_str"],
                    "text": dictTweet["text"]
                }
                h.write(json.dumps(dictData) + "\n")
                nWriteCount += 1

                dateCurr = datetime.datetime.now()

                if dateCurr > dateNext:
                    shutil.move(
                        os.path.join(SRC_DIR, filenameJson),
                        os.path.join(DST_DIR, filenameJson)
                    )
                    print "%8d tweet saved." % nWriteCount
                    break

if __name__ == "__main__":
    main()

ストリーム取得側

フォルダに格納されたjsonファイルを読み込んで、メモリ内にaggtableを生成・SparkSQLでデータフレームを生成。

# -*- coding: utf-8 -*-
import pyspark
import datetime
import time

CHECKPOINT_DIR = "/tmp/checkpoint"
STREAM_DIR = "/tmp/stream"

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain Sample6")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSession = pyspark.sql.SparkSession(sc)
    #scSession.conf.set("spark.sql.streaming.checkpointLocation", CHECKPOINT_DIR)

    schemaJson = pyspark.sql.dataframe.StructType().add("lang", "string").add("id_str", "string").add("text", "string")
    dsJson = scSession.readStream.schema(schemaJson).format("json").load(STREAM_DIR)

    dfJson = dsJson.select(
        pyspark.sql.functions.decode(dsJson.text, "UTF-8").alias("text")
    )

    idCount = dfJson.groupBy("text").count().orderBy(pyspark.sql.functions.desc("count"))

    idCount.writeStream.outputMode("complete").queryName("aggtable").format("memory").start()

    while True:
        print "-------- %s" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),)
        for r in scSession.sql("select text, count from aggtable order by count desc limit 10").collect():
            try:
                print "%08d | " % (r["count"],), r["text"].encode("utf-8").split("\n")[0].strip()
            except AttributeError:
                print r
        time.sleep(5)

if __name__ == "__main__":
    main()

Spark Streamを使用する場合は、生成したデータストリームをstartで開始させておくと、Spark側がメモリ内テーブルを更新してくれます。

Apache Spark 2.1.0(6)

Spark Streaming処理のデモ

ストリーム生成側

# -*- coding: utf-8 -*-
import random
import socket
import time

def main():

    socketServer = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socketServer.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    socketServer.bind(("127.0.0.1", 8000))
    socketServer.listen(1)

    sockRemote, addrRemote = socketServer.accept()
    print "connection", addrRemote

    listName = ["なのは", "フェイト", "はやて"]

    while True:
        name = listName[random.randint(0, 2)]
        sockRemote.send(name + "\n")
        time.sleep(1)


if __name__ == "__main__":
    main()

ランダムで「なのは」「フェイト」「はやて」の文字列を戻し続けるだけのサーバープログラム。

Apache Streamは改行コードまでが取り込み単位なので、データ終端として改行コードが必要です。

ストリーム取得(Spark Streaming)側

# -*- coding: utf-8 -*-
import pyspark
from pyspark.sql import functions

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain Sample5")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSession = pyspark.sql.SparkSession(sc)

    conf = scSession.readStream.format("socket")
    conf.option("host", "127.0.0.1")
    conf.option("port", 8000)
    dsName = conf.load()

    dfName = dsName.select(
        dsName.value.alias("name")
    )
    print dfName

    nameCount = dfName.groupBy("name").count().orderBy(pyspark.sql.functions.desc("count"))

    query = nameCount.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()


if __name__ == "__main__":
    main()

実行結果

sample_5_server.py を実行してからsample5_clinet.pyをSparkで実行すると以下の様な表示がコンソールに出力されます。

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|name|count|
+----+-----+
| はやて|    6|
|フェイト|    3|
| なのは|    1|
+----+-----+

単純にカウントしたものを表示させているだけです。

Apache Spark 2.1.0(5)

やる気の感じられないサンプルなのは、動作メモのつもりで作成しているため…

recommendation

# -*- coding: utf-8 -*-
import pyspark
from pyspark.ml import recommendation
from pyspark.ml import evaluation

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample3")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSQL = pyspark.sql.SQLContext(sc)

    listData = [
        {"user": 1, "item": 1, "rating":  90.5},
        {"user": 1, "item": 2, "rating":  90.5},
        {"user": 1, "item": 3, "rating":  90.5},
        {"user": 2, "item": 1, "rating":  50.0},
        {"user": 2, "item": 2, "rating":  50.0},
        {"user": 2, "item": 3, "rating":  50.0},
        {"user": 3, "item": 1, "rating":  80.0},
        {"user": 3, "item": 2, "rating": 100.0},
        {"user": 3, "item": 3, "rating": 100.0}
    ]

    dfData = scSQL.createDataFrame(listData)
    # 学習用と評価用に分類する際は以下の様に分離出来ます。
    # dfLearn, dfCheck = dfData.randomSplit([0.8, 0.2])

    mlAls = recommendation.ALS(
        maxIter=5, regParam=0.01, implicitPrefs=False,
        userCol="user", itemCol="item", ratingCol="rating"
    )
    model = mlAls.fit(dfData)

    predictions = model.transform(dfData)
    for r in predictions.collect():
        print r

    evaluator = evaluation.RegressionEvaluator(
        metricName="rmse", labelCol="rating", predictionCol="prediction"
    )
    rmse = evaluator.evaluate(predictions)

    print "rmse = ", rmse


if __name__ =="__main__":
    main()

naive beyes

# -*- coding: utf-8 -*-
import pyspark
from pyspark.ml import classification
from pyspark.ml import evaluation
from pyspark.ml import linalg

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample4")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSQL = pyspark.sql.SQLContext(sc)

    # ラベルは0からの連番で設定
    listData = [
        {"label":  0, "features": linalg.Vectors.dense([1.0, 0.0, 0.0, 0.0, 0.0, 5.0])},
        {"label":  1, "features": linalg.Vectors.dense([1.0, 1.0, 0.0, 0.0, 0.0, 5.0])},
        {"label":  2, "features": linalg.Vectors.dense([1.0, 0.0, 1.0, 0.0, 0.0, 5.0])},
        {"label":  3, "features": linalg.Vectors.dense([1.0, 0.0, 0.0, 1.0, 0.0, 5.0])},
        {"label":  4, "features": linalg.Vectors.dense([1.0, 0.0, 0.0, 0.0, 1.0, 5.0])},
    ]

    dfData = scSQL.createDataFrame(listData)
    # 学習用と評価用に分類する際は以下の様に分離出来ます。
    # dfLearn, dfCheck = dfData.randomSplit([0.8, 0.2])

    mlNBeyes = classification.NaiveBayes(smoothing=1.0, modelType="multinomial")
    model = mlNBeyes.fit(dfData)

    predictions = model.transform(dfData)
    for r in predictions.collect():
        print r

    evaluator = evaluation.RegressionEvaluator(
        metricName="rmse", labelCol="label", predictionCol="prediction"
    )
    rmse = evaluator.evaluate(predictions)

    print "rmse = ", rmse


if __name__ =="__main__":
    main()

 

Apache Spark 2.1.0(4)

SparkはCluster環境でこそ本領を発揮するのですが、そういった環境はなかなか用意するのが手間なので、試しにDocker化してみました。

雰囲気ぐらいは確認出来るかと…

Dockerfileはsingularities/sparkを手本にしています。

FROM ubuntu:16.04
MAINTAINER MizunagiKB

RUN apt-get update && ¥
apt-get install -yq --no-install-recommends wget && ¥
apt-get install -yq --no-install-recommends nfs-common && ¥
apt-get install -yq --no-install-recommends python2.7 && ¥
apt-get install -yq --no-install-recommends openjdk-8-jre && ¥
apt-get clean && ¥
rm -rf /tmp/* && rm -rf /var/tmp/*

RUN mkdir /opt/spark
ADD spark-2.1.0-bin-hadoop2.7.tgz /opt/spark

COPY spark-run.sh /

ENV PATH=$PATH:/opt/spark/spark-2.1.0-bin-hadoop2.7/bin
EXPOSE 4040 6066 7077 8080 8081

VOLUME ["/tmp", "/opt/spark/spark-2.1.0/logs"]

CMD ["/bin/bash"]

.

#!/bin/bash

SPARK_HOME=/opt/spark/spark-2.1.0-bin-hadoop2.7
CLASS_MASTER="org.apache.spark.deploy.master.Master"
CLASS_WORKER="org.apache.spark.deploy.worker.Worker"

. "$SPARK_HOME/sbin/spark-config.sh"
. "$SPARK_HOME/bin/load-spark-env.sh"

case "$1" in
  master)
    spark-class $CLASS_MASTER --webui-port 8080 --host `hostname` --port 7077
  ;;
  worker)
    spark-class $CLASS_WORKER --webui-port 8081 spark://$MASTER_PORT_7077_TCP_ADDR:7077
  ;;
esac

masterとworkerをそれぞれの実行はこんな感じで。

# master
$ docker run -h spark-master ¥
--name spark-master -i -t --rm ¥
mizunagi/spark spark-run.sh master

# worker
$ docker run -h spark-worker ¥
--name spark-worker -i -t --rm ¥
--link spark-master:master ¥
mizunagi/spark spark-run.sh worker

 

Apache Spark 2.1.0(3)

spark-submit時にスクリプトにコマンドライン引数を渡す方法。

sys.argvに普通に入っていますので、argparseモジュールを使用したり出来ます。

# -*- coding: utf-8 -*-
import sys
import argparse
import pyspark

def main():

    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--ivalue", type=int, dest="I_VALUE")
    parser.add_argument("-s", "--svalue", type=str, dest="S_VALUE")

    params = parser.parse_args(sys.argv[1:])

    print params.I_VALUE, params.S_VALUE

if __name__ == "__main__":
    main()

Pythonでの処理方法と同じです。

$ spark-submit {program} -i 1234 -s abcd
1234, abcd

 

Apache Spark 2.1.0(2)

前回のデータを使用して、もう少しそれっぽい物を。

# -*- coding: utf-8 -*-
import pyspark
import json

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample2")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSQL = pyspark.sql.SQLContext(sc)

    rawCards = sc.textFile("file:///tmp/nanohain_cards.json")

    rddCards = rawCards.map(lambda r: json.loads(r))
    dfCards = scSQL.createDataFrame(rddCards)
    dfCards.cache()

    dfCards_cost = dfCards.groupBy(["charaname", "charagroup"]).agg(
        pyspark.sql.functions.min("cost").alias("min_cost"),
        pyspark.sql.functions.max("cost").alias("max_cost")
    )
    print dfCards_cost

    dfCards_rank_win = pyspark.sql.Window.partitionBy("charaname", "charagroup").orderBy(pyspark.sql.functions.desc("idx"))
    dfCards_rank = dfCards.select(
        dfCards.charagroup,
        dfCards.charaname,
        dfCards.dress.alias("rank_dress"),
        pyspark.sql.functions.rank().over(dfCards_rank_win).alias("idx_order")
    ).filter("idx_order = 1")
    print dfCards_rank

    dfJoin = dfCards_cost.join(
        dfCards_rank,
        (dfCards_cost.charaname == dfCards_rank.charaname) & (dfCards_cost.charagroup == dfCards_rank.charagroup),
        "left"
    ).select(
        dfCards_cost.charagroup,
        dfCards_cost.charaname,
        dfCards_cost.min_cost,
        dfCards_cost.max_cost,
        dfCards_rank.rank_dress
    ).orderBy(dfCards_cost.charagroup, dfCards_cost.charaname)
    print dfJoin

    for card in dfJoin.collect():
        print card.charagroup.encode("utf-8"), card.charaname.encode("utf-8"), card.min_cost, card.max_cost, card.rank_dress.encode("utf-8")

    dfJoin.write.csv(
        "file:///tmp/dfJoin.csv",
        "overwrite"
    )

if __name__ == "__main__":
    main()

やっている事自体は何も中身がないのですけど。

SQLの一つ下の階層で記述している気分を味わえます。

SparkからのAmazon S3アクセス

s3へのアクセス方法にはs3, s3a, s3nの三種類が存在していて、それぞれ

  • s3 … s3をブロックデバイスとしてアクセス。
  • s3a … Amazon Web Servicesのライブラリ経由のアクセス。
  • s3n … 独自実装によるアクセス。

大抵はs3aが使用できれば事足りる様な気がしています。

sc = pyspark.SparkContext()
confHadoop = sc._jsc.hadoopConfiguration()
confHadoop.set("fs.s3a.access.key", {access key})
confHadoop.set("fs.s3a.secret.key", {secret key})
# S3互換APIのストレージを利用したい場合は、ここでアドレスを個別に設定
# ex) http://192.168.0.1:9000/
# confHadoop.set("fs.s3a.endpoint", {end point})
confHadoop.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

rawCsv = sc.textFile("s3a://{bucket}/*.csv")

実行する時にpackagesを指定。

$ spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.3 sample1.py

 

Apache Spark 2.1.0(1)

今更ながらSparkを使い始めてみました。

環境はubuntu 16.04.2 LTS, macOS Sierra 10.12.3を使用しています。

Sparkのインストール

ダウンロードはウェブサイトからビルド済みイメージを取得して、任意の場所に展開するだけです。

環境にあわせたものを用意する必要がありますが、何もないところに導入する場合であれば、特に何も考えずにspark-2.1.0-bin-hadoop2.7.tgzを選択します。

# 場所はどこでも良いのですが、ここでは/opt/sparkにインストールしています。
$ sudo mkdir /opt/spark
$ sudo chown {user} /opt/spark
$ tar -zvxf spark-2.1.0-bin-hadoop2.7.tgz -C /opt/spark

sparkの動作にはjava(とPython)が必要なのでインストールします。

$ sudo apt-get install openjdk-8-jre
$ sudo apt-get install python2.7

これでひとまず使用可能になります。

spark-shell

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/03/05 13:08:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/05 13:08:08 WARN Utils: Your hostname, spark resolves to a loopback address: 127.0.1.1; using 192.168.31.142 instead (on interface ens32)
17/03/05 13:08:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/05 13:08:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.31.142:4040
Spark context available as 'sc' (master = local[*], app id = local-1488686889013).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

pyspark

Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/03/05 13:09:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/05 13:09:44 WARN Utils: Your hostname, spark resolves to a loopback address: 127.0.1.1; using 192.168.31.142 instead (on interface ens32)
17/03/05 13:09:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/05 13:09:48 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>> 

試験動作

動いているかわからない為、適当なデータを使って検索して見ます。

試しに魔法少女リリカルなのはINNOCENTのカード一覧(抜粋)を用意して見ました。

nanohain_cards.zip

idx, cost, charaname, charagroup, dressがjson形式で格納されています。

以下の形式で保存されています。(実際には1レコード1行)

{
    "idx": "CD_01_0001",
    "cost": 3,
    "cardname": "海聖小学校生徒",
    "dress": "スクール", 
    "charaname": "高町なのは",
    "charagroup": "ミッドチルダ"
}

カード枚数とキャラクター毎の枚数を計算

# -*- coding: utf-8 -*-
import pyspark
import json
import csv

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample1")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSQL = pyspark.sql.SQLContext(sc)

    rawCards = sc.textFile("file:///tmp/nanohain_cards.json")

    rddCards = rawCards.map(lambda r: json.loads(r))
    dfCards = scSQL.createDataFrame(rddCards)

    print dfCards.count()

    dfChara = dfCards.groupBy("charaname").count()
    for card in dfChara.collect():
        print card["charaname"].encode("utf-8"), card["count"]

if __name__ == "__main__":
    main()

集計結果が画面に表示されるはず。

ファイルに書き出したい場合は二種類の方法があります。

Spark内蔵機能による出力

    dfChara.write.csv("file:///tmp/dfChara.csv")

この方法が本来の使用方法となります。出力先もローカル以外にhdfsやAmazon S3を指定することが出来ます。

欠点というわけではありませんが、このまま実行すると分散処理の都合上複数のファイル(標準設定だと200)が出力されてしまいます。ひとつにまとめたい場合はrepatitionを使用する事で分割数を抑制することが可能です。

ただ、データが一箇所に偏ってしまうと、せっかくの分散処理が活用出来ませんので注意が必要です。

Pythonモジュールによる出力

    with open("/tmp/dfChara.csv", "wb") as fileWrite:
        csvWrite = csv.DictWriter(fileWrite, dfChara.columns)
        csvWrite.writeheader()
        for card in dfChara.collect():
            csvWrite.writerow(
                {
                    "charaname": card["charaname"].encode("utf-8"),
                    "count": card["count"]
                }
            )

Pythonモジュールを使用した方法だとこちら。

collectを呼び出すと、分散処理内容がmaster側に集約され、リストとして扱えますので、あとは好きなように処理が可能です。

ただし巨大なデータを扱っている場合、masterのメモリに収まりきらない場合があります。

Sparkクラスタの構成

Sparkでクラスタを構成するには三種類の方法があります。

  • Sparkだけで構成する方法
  • Yarnを使用する方法
  • Mesosを使用する方法

クラスタ内のPCリソースを100%占有してよいのであれば、Sparkだけで構成する方法が最も容易です。

Sparkのみでクラスタを構成

クラスタを構成するにはSparkをインストールしてmasterとslaveの役割を与えるだけです。

# masterの役割を与えるPC上で実行
$ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-master.sh
# slaveの役割を与えるPC上で実行
$ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-slave.sh -h spark://{master}:{port}

複数台をまとめて実行させたい場合は、

まず、/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/spark-env.shを作成し、

  • SPARK_MASTER_HOST
  • SPARK_MASTER_PORT

を設定します。

次に/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/slavesを作成し、ホスト名称、またはIPアドレスを列挙しておきます。

正しく設定されていれば以下のコマンドでslave全てを開始できます。

$ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-slaves.sh

やっていることは、slavesに記載されているサーバーにsshでログインしてstart-slaveをしているだけなので、この方法を使用する場合はssh実行時にパスワードを不要(具体的には公開鍵による認証)にしておく必要があります。