Apache Spark 2.1.0(1)

今更ながらSparkを使い始めてみました。

環境はubuntu 16.04.2 LTS, macOS Sierra 10.12.3を使用しています。

Sparkのインストール

ダウンロードはウェブサイトからビルド済みイメージを取得して、任意の場所に展開するだけです。

環境にあわせたものを用意する必要がありますが、何もないところに導入する場合であれば、特に何も考えずにspark-2.1.0-bin-hadoop2.7.tgzを選択します。

# 場所はどこでも良いのですが、ここでは/opt/sparkにインストールしています。
$ sudo mkdir /opt/spark
$ sudo chown {user} /opt/spark
$ tar -zvxf spark-2.1.0-bin-hadoop2.7.tgz -C /opt/spark

sparkの動作にはjava(とPython)が必要なのでインストールします。

$ sudo apt-get install openjdk-8-jre
$ sudo apt-get install python2.7

これでひとまず使用可能になります。

spark-shell

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/03/05 13:08:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/05 13:08:08 WARN Utils: Your hostname, spark resolves to a loopback address: 127.0.1.1; using 192.168.31.142 instead (on interface ens32)
17/03/05 13:08:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/05 13:08:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.31.142:4040
Spark context available as 'sc' (master = local[*], app id = local-1488686889013).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

pyspark

Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/03/05 13:09:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/05 13:09:44 WARN Utils: Your hostname, spark resolves to a loopback address: 127.0.1.1; using 192.168.31.142 instead (on interface ens32)
17/03/05 13:09:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/05 13:09:48 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>> 

試験動作

動いているかわからない為、適当なデータを使って検索して見ます。

試しに魔法少女リリカルなのはINNOCENTのカード一覧(抜粋)を用意して見ました。

nanohain_cards.zip

idx, cost, charaname, charagroup, dressがjson形式で格納されています。

以下の形式で保存されています。(実際には1レコード1行)

{
    "idx": "CD_01_0001",
    "cost": 3,
    "cardname": "海聖小学校生徒",
    "dress": "スクール", 
    "charaname": "高町なのは",
    "charagroup": "ミッドチルダ"
}

カード枚数とキャラクター毎の枚数を計算

# -*- coding: utf-8 -*-
import pyspark
import json
import csv

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample1")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSQL = pyspark.sql.SQLContext(sc)

    rawCards = sc.textFile("file:///tmp/nanohain_cards.json")

    rddCards = rawCards.map(lambda r: json.loads(r))
    dfCards = scSQL.createDataFrame(rddCards)

    print dfCards.count()

    dfChara = dfCards.groupBy("charaname").count()
    for card in dfChara.collect():
        print card["charaname"].encode("utf-8"), card["count"]

if __name__ == "__main__":
    main()

集計結果が画面に表示されるはず。

ファイルに書き出したい場合は二種類の方法があります。

Spark内蔵機能による出力

    dfChara.write.csv("file:///tmp/dfChara.csv")

この方法が本来の使用方法となります。出力先もローカル以外にhdfsやAmazon S3を指定することが出来ます。

欠点というわけではありませんが、このまま実行すると分散処理の都合上複数のファイル(標準設定だと200)が出力されてしまいます。ひとつにまとめたい場合はrepatitionを使用する事で分割数を抑制することが可能です。

ただ、データが一箇所に偏ってしまうと、せっかくの分散処理が活用出来ませんので注意が必要です。

Pythonモジュールによる出力

    with open("/tmp/dfChara.csv", "wb") as fileWrite:
        csvWrite = csv.DictWriter(fileWrite, dfChara.columns)
        csvWrite.writeheader()
        for card in dfChara.collect():
            csvWrite.writerow(
                {
                    "charaname": card["charaname"].encode("utf-8"),
                    "count": card["count"]
                }
            )

Pythonモジュールを使用した方法だとこちら。

collectを呼び出すと、分散処理内容がmaster側に集約され、リストとして扱えますので、あとは好きなように処理が可能です。

ただし巨大なデータを扱っている場合、masterのメモリに収まりきらない場合があります。

Sparkクラスタの構成

Sparkでクラスタを構成するには三種類の方法があります。

  • Sparkだけで構成する方法
  • Yarnを使用する方法
  • Mesosを使用する方法

クラスタ内のPCリソースを100%占有してよいのであれば、Sparkだけで構成する方法が最も容易です。

Sparkのみでクラスタを構成

クラスタを構成するにはSparkをインストールしてmasterとslaveの役割を与えるだけです。

# masterの役割を与えるPC上で実行
$ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-master.sh
# slaveの役割を与えるPC上で実行
$ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-slave.sh -h spark://{master}:{port}

複数台をまとめて実行させたい場合は、

まず、/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/spark-env.shを作成し、

  • SPARK_MASTER_HOST
  • SPARK_MASTER_PORT

を設定します。

次に/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/slavesを作成し、ホスト名称、またはIPアドレスを列挙しておきます。

正しく設定されていれば以下のコマンドでslave全てを開始できます。

$ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-slaves.sh

やっていることは、slavesに記載されているサーバーにsshでログインしてstart-slaveをしているだけなので、この方法を使用する場合はssh実行時にパスワードを不要(具体的には公開鍵による認証)にしておく必要があります。