今更ながらSparkを使い始めてみました。
環境はubuntu 16.04.2 LTS, macOS Sierra 10.12.3を使用しています。
Sparkのインストール
ダウンロードはウェブサイトからビルド済みイメージを取得して、任意の場所に展開するだけです。
環境にあわせたものを用意する必要がありますが、何もないところに導入する場合であれば、特に何も考えずにspark-2.1.0-bin-hadoop2.7.tgzを選択します。
# 場所はどこでも良いのですが、ここでは/opt/sparkにインストールしています。 $ sudo mkdir /opt/spark $ sudo chown {user} /opt/spark $ tar -zvxf spark-2.1.0-bin-hadoop2.7.tgz -C /opt/spark
sparkの動作にはjava(とPython)が必要なのでインストールします。
$ sudo apt-get install openjdk-8-jre $ sudo apt-get install python2.7
これでひとまず使用可能になります。
spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 17/03/05 13:08:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/03/05 13:08:08 WARN Utils: Your hostname, spark resolves to a loopback address: 127.0.1.1; using 192.168.31.142 instead (on interface ens32) 17/03/05 13:08:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 17/03/05 13:08:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException Spark context Web UI available at http://192.168.31.142:4040 Spark context available as 'sc' (master = local[*], app id = local-1488686889013). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.1.0 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121) Type in expressions to have them evaluated. Type :help for more information. scala>
pyspark
Python 2.7.12 (default, Nov 19 2016, 06:48:10) [GCC 5.4.0 20160609] on linux2 Type "help", "copyright", "credits" or "license" for more information. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 17/03/05 13:09:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/03/05 13:09:44 WARN Utils: Your hostname, spark resolves to a loopback address: 127.0.1.1; using 192.168.31.142 instead (on interface ens32) 17/03/05 13:09:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 17/03/05 13:09:48 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.1.0 /_/ Using Python version 2.7.12 (default, Nov 19 2016 06:48:10) SparkSession available as 'spark'. >>>
試験動作
動いているかわからない為、適当なデータを使って検索して見ます。
試しに魔法少女リリカルなのはINNOCENTのカード一覧(抜粋)を用意して見ました。
idx, cost, charaname, charagroup, dressがjson形式で格納されています。
以下の形式で保存されています。(実際には1レコード1行)
{ "idx": "CD_01_0001", "cost": 3, "cardname": "海聖小学校生徒", "dress": "スクール", "charaname": "高町なのは", "charagroup": "ミッドチルダ" }
カード枚数とキャラクター毎の枚数を計算
# -*- coding: utf-8 -*- import pyspark import json import csv def main(): conf = pyspark.SparkConf() conf.setAppName("nanohain sample1") sc = pyspark.SparkContext(conf=conf) sc.setLogLevel("ERROR") scSQL = pyspark.sql.SQLContext(sc) rawCards = sc.textFile("file:///tmp/nanohain_cards.json") rddCards = rawCards.map(lambda r: json.loads(r)) dfCards = scSQL.createDataFrame(rddCards) print dfCards.count() dfChara = dfCards.groupBy("charaname").count() for card in dfChara.collect(): print card["charaname"].encode("utf-8"), card["count"] if __name__ == "__main__": main()
集計結果が画面に表示されるはず。
ファイルに書き出したい場合は二種類の方法があります。
Spark内蔵機能による出力
dfChara.write.csv("file:///tmp/dfChara.csv")
この方法が本来の使用方法となります。出力先もローカル以外にhdfsやAmazon S3を指定することが出来ます。
欠点というわけではありませんが、このまま実行すると分散処理の都合上複数のファイル(標準設定だと200)が出力されてしまいます。ひとつにまとめたい場合はrepatitionを使用する事で分割数を抑制することが可能です。
ただ、データが一箇所に偏ってしまうと、せっかくの分散処理が活用出来ませんので注意が必要です。
Pythonモジュールによる出力
with open("/tmp/dfChara.csv", "wb") as fileWrite: csvWrite = csv.DictWriter(fileWrite, dfChara.columns) csvWrite.writeheader() for card in dfChara.collect(): csvWrite.writerow( { "charaname": card["charaname"].encode("utf-8"), "count": card["count"] } )
Pythonモジュールを使用した方法だとこちら。
collectを呼び出すと、分散処理内容がmaster側に集約され、リストとして扱えますので、あとは好きなように処理が可能です。
ただし巨大なデータを扱っている場合、masterのメモリに収まりきらない場合があります。
Sparkクラスタの構成
Sparkでクラスタを構成するには三種類の方法があります。
- Sparkだけで構成する方法
- Yarnを使用する方法
- Mesosを使用する方法
クラスタ内のPCリソースを100%占有してよいのであれば、Sparkだけで構成する方法が最も容易です。
Sparkのみでクラスタを構成
クラスタを構成するにはSparkをインストールしてmasterとslaveの役割を与えるだけです。
# masterの役割を与えるPC上で実行 $ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-master.sh # slaveの役割を与えるPC上で実行 $ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-slave.sh -h spark://{master}:{port}
複数台をまとめて実行させたい場合は、
まず、/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/spark-env.shを作成し、
- SPARK_MASTER_HOST
- SPARK_MASTER_PORT
を設定します。
次に/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/slavesを作成し、ホスト名称、またはIPアドレスを列挙しておきます。
正しく設定されていれば以下のコマンドでslave全てを開始できます。
$ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-slaves.sh
やっていることは、slavesに記載されているサーバーにsshでログインしてstart-slaveをしているだけなので、この方法を使用する場合はssh実行時にパスワードを不要(具体的には公開鍵による認証)にしておく必要があります。