Apache Spark 2.1.0(7)


ファイル監視によるSpark Stream。

ストリーム生成側

以下の例では#NANOHAが含まれるツイートを取集。

(#NANOHAでツイートが拾えない場合は適当なキーワードに変更してください。)

ファイルを使用するストリームでは、Spark側が監視しているフォルダに直接書き込まず、一旦別の場所に作成してから監視下のフォルダに移動させる必要があります。

ストリーム取得側

フォルダに格納されたjsonファイルを読み込んで、メモリ内にaggtableを生成・SparkSQLでデータフレームを生成。

Spark Streamを使用する場合は、生成したデータストリームをstartで開始させておくと、Spark側がメモリ内テーブルを更新してくれます。

Apache Spark 2.1.0(6)


Spark Streaming処理のデモ

ストリーム生成側

ランダムで「なのは」「フェイト」「はやて」の文字列を戻し続けるだけのサーバープログラム。

Apache Streamは改行コードまでが取り込み単位なので、データ終端として改行コードが必要です。

ストリーム取得(Spark Streaming)側

実行結果

sample_5_server.py を実行してからsample5_clinet.pyをSparkで実行すると以下の様な表示がコンソールに出力されます。

単純にカウントしたものを表示させているだけです。

Apache Spark 2.1.0(5)


やる気の感じられないサンプルなのは、動作メモのつもりで作成しているため…

recommendation

naive beyes

 

Apache Spark 2.1.0(4)


SparkはCluster環境でこそ本領を発揮するのですが、そういった環境はなかなか用意するのが手間なので、試しにDocker化してみました。

雰囲気ぐらいは確認出来るかと…

Dockerfileはsingularities/sparkを手本にしています。

.

masterとworkerをそれぞれの実行はこんな感じで。

 

Apache Spark 2.1.0(3)


spark-submit時にスクリプトにコマンドライン引数を渡す方法。

sys.argvに普通に入っていますので、argparseモジュールを使用したり出来ます。

Pythonでの処理方法と同じです。

 

Apache Spark 2.1.0(2)


前回のデータを使用して、もう少しそれっぽい物を。

やっている事自体は何も中身がないのですけど。

SQLの一つ下の階層で記述している気分を味わえます。

SparkからのAmazon S3アクセス


s3へのアクセス方法にはs3, s3a, s3nの三種類が存在していて、それぞれ

  • s3 … s3をブロックデバイスとしてアクセス。
  • s3a … Amazon Web Servicesのライブラリ経由のアクセス。
  • s3n … 独自実装によるアクセス。

大抵はs3aが使用できれば事足りる様な気がしています。

実行する時にpackagesを指定。

 

Apache Spark 2.1.0(1)


今更ながらSparkを使い始めてみました。

環境はubuntu 16.04.2 LTS, macOS Sierra 10.12.3を使用しています。

Sparkのインストール

ダウンロードはウェブサイトからビルド済みイメージを取得して、任意の場所に展開するだけです。

環境にあわせたものを用意する必要がありますが、何もないところに導入する場合であれば、特に何も考えずにspark-2.1.0-bin-hadoop2.7.tgzを選択します。

sparkの動作にはjava(とPython)が必要なのでインストールします。

これでひとまず使用可能になります。

spark-shell

pyspark

試験動作

動いているかわからない為、適当なデータを使って検索して見ます。

試しに魔法少女リリカルなのはINNOCENTのカード一覧(抜粋)を用意して見ました。

nanohain_cards.zip

idx, cost, charaname, charagroup, dressがjson形式で格納されています。

以下の形式で保存されています。(実際には1レコード1行)

カード枚数とキャラクター毎の枚数を計算

集計結果が画面に表示されるはず。

ファイルに書き出したい場合は二種類の方法があります。

Spark内蔵機能による出力

この方法が本来の使用方法となります。出力先もローカル以外にhdfsやAmazon S3を指定することが出来ます。

欠点というわけではありませんが、このまま実行すると分散処理の都合上複数のファイル(標準設定だと200)が出力されてしまいます。ひとつにまとめたい場合はrepatitionを使用する事で分割数を抑制することが可能です。

ただ、データが一箇所に偏ってしまうと、せっかくの分散処理が活用出来ませんので注意が必要です。

Pythonモジュールによる出力

Pythonモジュールを使用した方法だとこちら。

collectを呼び出すと、分散処理内容がmaster側に集約され、リストとして扱えますので、あとは好きなように処理が可能です。

ただし巨大なデータを扱っている場合、masterのメモリに収まりきらない場合があります。

Sparkクラスタの構成

Sparkでクラスタを構成するには三種類の方法があります。

  • Sparkだけで構成する方法
  • Yarnを使用する方法
  • Mesosを使用する方法

クラスタ内のPCリソースを100%占有してよいのであれば、Sparkだけで構成する方法が最も容易です。

Sparkのみでクラスタを構成

クラスタを構成するにはSparkをインストールしてmasterとslaveの役割を与えるだけです。

複数台をまとめて実行させたい場合は、

まず、/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/spark-env.shを作成し、

  • SPARK_MASTER_HOST
  • SPARK_MASTER_PORT

を設定します。

次に/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/slavesを作成し、ホスト名称、またはIPアドレスを列挙しておきます。

正しく設定されていれば以下のコマンドでslave全てを開始できます。

やっていることは、slavesに記載されているサーバーにsshでログインしてstart-slaveをしているだけなので、この方法を使用する場合はssh実行時にパスワードを不要(具体的には公開鍵による認証)にしておく必要があります。

このページへのリンクやツイートによる共有はご自由にどうぞ