Ubuntu 16.04 LTSにCSPをインストール

ちょいとwineに関する話題が出たのでUbuntu 16.04 LTS上にCLIP STUDIO PAINTをインストールに挑戦してみました。

wine上での動作は対象外・ライセンスの許可範囲外となります。利用に関しては自己責任でお願いします。

結論を先に書くと、セットアップは完了するのですが筆跡が何も描画されないという困った状況でした。WineHQには動作報告があるようなのですがVMWare Fusion上で動かしているのが原因かもしれません。

wineの導入

Ubuntu自体のリポジトリにもwine 1.6が含まれているのですが、ここではwinehqからstableをインストールします。

# If your system is 64 bit, enable 32 bit architecture (if you haven't already):
$ sudo dpkg --add-architecture i386

# Add the repository:
$ wget -nc https://dl.winehq.org/wine-builds/Release.key
$ sudo apt-key add Release.key
$ sudo apt-add-repository https://dl.winehq.org/wine-builds/ubuntu/

# Update packages:
$ sudo apt-get update

# Install
$ sudo apt-get install --install-recommends winehq-stable

英語版のCLIP STUDIO PAINTをダウンロード

CLIP STUDIO PAINTの日本語版を使用すると、日本語で表示されている箇所が正常に表示されません。

何が書いてあるか判別出来ないと「OK」/「Cancel」のボタンを押すことすらままなりませんので、ここでは英語版のCLIP STUDIO PAINTをインストールします。

exeを実行するには以下の様に指定します。

$ wine {{ filename }}

インストールが正しく完了すると、デスクトップ上にアイコンが表示されますので、そこから実行可能になります。

と…インストールは出来たものの問題が

見た目は正常に起動している様にみえるのですが、手元の環境では描画する事が出来ませんでした。VMWare Fusionで試している影響かと思い、open-vm-toolsを追加で導入してみたり、OpenGL Supportを切ってみたりしましたが特に何も変わらず…

Cephをセットアップしてみる

Ceph

名前しか知らなかったのですが、まずはインストールしてみる事にしました。

.

導入方法について(環境はVMWare Fusion上のUbuntu 16.04 LTEを使用)

簡単なインストール方法はCephのドキュメントページに記載されているのですが、手順通りに行うと本当に”インストールだけ”しかしません。

単なるソフトウェアであればインストール後にどうするか考えれば良いですが、ハードウェアやネットワーク構成とセットになっている場合は、ある程度見積もりや構成を考えておく方が良かったり。

Cephを動作させる為に必要なサービスは以下の二つ。

Monitor Daemon(mon)

他のMonitorやObject Storage Daemonと通信し、死活監視や対象データの所在を管理する。複数立ち上げて冗長性を持たせる事が可能。

Object Storage Daemon(osd)

ローカルにあるストレージを管理し、実際のデータ送受信を行うサービス。物理的なストレージにつき一つ必要となる。データとジャーナル情報を個別に指定する事も可能。

インストール前に意識しておくべき点としては、まず物理ディスクをどの様に接続して管理するかという事になります。

実験としては小さな構成として以下の様にしてみました。

  • ceph-mon  (172.16.2.32)
    • cephの監視用
  • ceph-osd01 (172.16.2.33)
    • /dev/sdb1を/var/local/volume_01にマウント
  • ceph-osd02 (172.16.2.34)
    • /dev/sdb1を/var/local/volume_01にマウント

インストール方法

モニターの初期化

基本はCeph Document > Installation (Quick)の通りに行います。

事前にceph-mon上で、SELinuxの無効化・iptablesのポート解放をしておきます。

ceph-deploy installを使用する場合は、Python環境も必要となります。

更に、それぞれのサーバーにceph管理者(ここではceph-master)を作成しておく必要があります。

(注意・Ubuntu 16.04 LTEのリポジトリから入手出来るcephはクラスター指定が出来ずceph固定となる様です)

# Monitor Daemon, Object Storage Daemonで実行
$ sudo apt-get install openssh-server
$ sudo apt-get install ntp
$ sudo apt-get install ceph

# Monitorにceph管理者でログインし環境設定
$ sudo apt-get install ceph-deploy
$ mkdir ceph-cluster1
$ cd ceph-cluster1
$ ssh-keygen
$ ssh-copy-id ceph-admin@ceph-mon
$ ssh-copy-id ceph-admin@ceph-osd01
$ ssh-copy-id ceph-admin@ceph-osd02

$ ceph-deploy new ceph-mon

カレントフォルダにceph.confが生成されるので、以下の内容を追加(目的とネットワーク環境で記述内容は異なります)

osd_pool_default_size = 2
public_network = 172.16.0.0/16

記述したら、それぞれの環境にcephのインストールを行います。

$ ceph-deploy install ceph-mon ceph-osd01 ceph-osd02
$ ceph-deploy mon create-initial

ストレージの追加

$ ceph-deploy osd prepare ceph-osd01:/var/local/volume_01
$ ceph-deploy osd prepare ceph-osd02:/var/local/volume_01

$ ceph-deploy osd activate ceph-osd01:/var/local/volume_01 ceph-osd02:/var/local/volume_01

$ ceph-deploy admin ceph-mon ceph-osd01 ceph-osd02
$ sudo chmod +r /etc/ceph/ceph.client.admin.keyring

$ ceph health
# HEALTH_OKが表示されていれば設定完了

ブロックデバイスの使用

# イメージを作成
$ rbd create test-image --size 1024
# ブロックデバイスとしてマウント
$ sudo rbd map test-image

基本はこれだけですが、環境によって/etc/ceph/ceph.confに以下の設定が必要となります。

rbd_default_features = 3

.

分散ファイルシステム

思うところあって、分散ファイルシステムについて色々調べて見ました。

色々と言いながら、調べたのは以下の三種類だけですけど…

GlusterFS

マスターサーバーが不要な分散ファイルシステム。

RAIDのような考え方でファイルが管理・配置される。

Ceph

Monitoring Daemon(MON)とObject Storage Daemon(OSD)の設置が必要。

管理対象のストレージに多彩なアクセス手段を提供する。

オブジェクトストレージとして使用する場合は、Amazon S3互換のAPIを使用することも可能。

Lustre

Meta Data Server(MDS)とObject Storage Service(OSS)の設置が必要。

HPC環境での実績があるシステム。

パッケージの取得はintelのHigh Performance Data Divisionから。なお、Ubuntu用のパッケージはprecise(12.04LTS)までしか用意されていない。

 

.

種類を列挙しようとすると他にもあるけど、何となく良さそう(Redhatが持っている、HPC環境で実績あり)に思えたのは上記の三種類。

それにしてもDFSって情報が少ない…

Apache Spark 2.1.0(7)

ファイル監視によるSpark Stream。

ストリーム生成側

以下の例では#NANOHAが含まれるツイートを取集。

(#NANOHAでツイートが拾えない場合は適当なキーワードに変更してください。)

ファイルを使用するストリームでは、Spark側が監視しているフォルダに直接書き込まず、一旦別の場所に作成してから監視下のフォルダに移動させる必要があります。

# -*- coding: utf-8 -*-
import datetime
import json
import os
import shutil

import twitter

CONSUMER_KEY = "****"
CONSUMER_SECRET = "****"
ACCESS_TOKEN = "****"
ACCESS_TOKEN_SECRET = "****"

SRC_DIR = "/tmp/work"
DST_DIR = "/tmp/stream"

def start_twitter_stream():

    oCApi = twitter.Api(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    return oCApi.GetStreamFilter(track=["#NANOHA"])

def main():

    oCStream = start_twitter_stream()

    dateCurr = datetime.datetime.now()

    while True:
        dateNext = dateCurr + datetime.timedelta(minutes=1)
        filenameJson = dateCurr.strftime("tweet-%Y%m%d-%H%M%S.json")
        with open(os.path.join(SRC_DIR, filenameJson), "wb") as h:
            nWriteCount = 0
            for dictTweet in oCStream:
                #print json.dumps(dictTweet)
                #break
                dictData = {
                    "lang": dictTweet["lang"],
                    "id_str": dictTweet["id_str"],
                    "text": dictTweet["text"]
                }
                h.write(json.dumps(dictData) + "\n")
                nWriteCount += 1

                dateCurr = datetime.datetime.now()

                if dateCurr > dateNext:
                    shutil.move(
                        os.path.join(SRC_DIR, filenameJson),
                        os.path.join(DST_DIR, filenameJson)
                    )
                    print "%8d tweet saved." % nWriteCount
                    break

if __name__ == "__main__":
    main()

ストリーム取得側

フォルダに格納されたjsonファイルを読み込んで、メモリ内にaggtableを生成・SparkSQLでデータフレームを生成。

# -*- coding: utf-8 -*-
import pyspark
import datetime
import time

CHECKPOINT_DIR = "/tmp/checkpoint"
STREAM_DIR = "/tmp/stream"

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain Sample6")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSession = pyspark.sql.SparkSession(sc)
    #scSession.conf.set("spark.sql.streaming.checkpointLocation", CHECKPOINT_DIR)

    schemaJson = pyspark.sql.dataframe.StructType().add("lang", "string").add("id_str", "string").add("text", "string")
    dsJson = scSession.readStream.schema(schemaJson).format("json").load(STREAM_DIR)

    dfJson = dsJson.select(
        pyspark.sql.functions.decode(dsJson.text, "UTF-8").alias("text")
    )

    idCount = dfJson.groupBy("text").count().orderBy(pyspark.sql.functions.desc("count"))

    idCount.writeStream.outputMode("complete").queryName("aggtable").format("memory").start()

    while True:
        print "-------- %s" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),)
        for r in scSession.sql("select text, count from aggtable order by count desc limit 10").collect():
            try:
                print "%08d | " % (r["count"],), r["text"].encode("utf-8").split("\n")[0].strip()
            except AttributeError:
                print r
        time.sleep(5)

if __name__ == "__main__":
    main()

Spark Streamを使用する場合は、生成したデータストリームをstartで開始させておくと、Spark側がメモリ内テーブルを更新してくれます。

Apache Spark 2.1.0(6)

Spark Streaming処理のデモ

ストリーム生成側

# -*- coding: utf-8 -*-
import random
import socket
import time

def main():

    socketServer = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socketServer.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    socketServer.bind(("127.0.0.1", 8000))
    socketServer.listen(1)

    sockRemote, addrRemote = socketServer.accept()
    print "connection", addrRemote

    listName = ["なのは", "フェイト", "はやて"]

    while True:
        name = listName[random.randint(0, 2)]
        sockRemote.send(name + "\n")
        time.sleep(1)


if __name__ == "__main__":
    main()

ランダムで「なのは」「フェイト」「はやて」の文字列を戻し続けるだけのサーバープログラム。

Apache Streamは改行コードまでが取り込み単位なので、データ終端として改行コードが必要です。

ストリーム取得(Spark Streaming)側

# -*- coding: utf-8 -*-
import pyspark
from pyspark.sql import functions

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain Sample5")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSession = pyspark.sql.SparkSession(sc)

    conf = scSession.readStream.format("socket")
    conf.option("host", "127.0.0.1")
    conf.option("port", 8000)
    dsName = conf.load()

    dfName = dsName.select(
        dsName.value.alias("name")
    )
    print dfName

    nameCount = dfName.groupBy("name").count().orderBy(pyspark.sql.functions.desc("count"))

    query = nameCount.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()


if __name__ == "__main__":
    main()

実行結果

sample_5_server.py を実行してからsample5_clinet.pyをSparkで実行すると以下の様な表示がコンソールに出力されます。

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|name|count|
+----+-----+
| はやて|    6|
|フェイト|    3|
| なのは|    1|
+----+-----+

単純にカウントしたものを表示させているだけです。

Apache Spark 2.1.0(5)

やる気の感じられないサンプルなのは、動作メモのつもりで作成しているため…

recommendation

# -*- coding: utf-8 -*-
import pyspark
from pyspark.ml import recommendation
from pyspark.ml import evaluation

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample3")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSQL = pyspark.sql.SQLContext(sc)

    listData = [
        {"user": 1, "item": 1, "rating":  90.5},
        {"user": 1, "item": 2, "rating":  90.5},
        {"user": 1, "item": 3, "rating":  90.5},
        {"user": 2, "item": 1, "rating":  50.0},
        {"user": 2, "item": 2, "rating":  50.0},
        {"user": 2, "item": 3, "rating":  50.0},
        {"user": 3, "item": 1, "rating":  80.0},
        {"user": 3, "item": 2, "rating": 100.0},
        {"user": 3, "item": 3, "rating": 100.0}
    ]

    dfData = scSQL.createDataFrame(listData)
    # 学習用と評価用に分類する際は以下の様に分離出来ます。
    # dfLearn, dfCheck = dfData.randomSplit([0.8, 0.2])

    mlAls = recommendation.ALS(
        maxIter=5, regParam=0.01, implicitPrefs=False,
        userCol="user", itemCol="item", ratingCol="rating"
    )
    model = mlAls.fit(dfData)

    predictions = model.transform(dfData)
    for r in predictions.collect():
        print r

    evaluator = evaluation.RegressionEvaluator(
        metricName="rmse", labelCol="rating", predictionCol="prediction"
    )
    rmse = evaluator.evaluate(predictions)

    print "rmse = ", rmse


if __name__ =="__main__":
    main()

naive beyes

# -*- coding: utf-8 -*-
import pyspark
from pyspark.ml import classification
from pyspark.ml import evaluation
from pyspark.ml import linalg

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample4")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSQL = pyspark.sql.SQLContext(sc)

    # ラベルは0からの連番で設定
    listData = [
        {"label":  0, "features": linalg.Vectors.dense([1.0, 0.0, 0.0, 0.0, 0.0, 5.0])},
        {"label":  1, "features": linalg.Vectors.dense([1.0, 1.0, 0.0, 0.0, 0.0, 5.0])},
        {"label":  2, "features": linalg.Vectors.dense([1.0, 0.0, 1.0, 0.0, 0.0, 5.0])},
        {"label":  3, "features": linalg.Vectors.dense([1.0, 0.0, 0.0, 1.0, 0.0, 5.0])},
        {"label":  4, "features": linalg.Vectors.dense([1.0, 0.0, 0.0, 0.0, 1.0, 5.0])},
    ]

    dfData = scSQL.createDataFrame(listData)
    # 学習用と評価用に分類する際は以下の様に分離出来ます。
    # dfLearn, dfCheck = dfData.randomSplit([0.8, 0.2])

    mlNBeyes = classification.NaiveBayes(smoothing=1.0, modelType="multinomial")
    model = mlNBeyes.fit(dfData)

    predictions = model.transform(dfData)
    for r in predictions.collect():
        print r

    evaluator = evaluation.RegressionEvaluator(
        metricName="rmse", labelCol="label", predictionCol="prediction"
    )
    rmse = evaluator.evaluate(predictions)

    print "rmse = ", rmse


if __name__ =="__main__":
    main()

 

Apache Spark 2.1.0(4)

SparkはCluster環境でこそ本領を発揮するのですが、そういった環境はなかなか用意するのが手間なので、試しにDocker化してみました。

雰囲気ぐらいは確認出来るかと…

Dockerfileはsingularities/sparkを手本にしています。

FROM ubuntu:16.04
MAINTAINER MizunagiKB

RUN apt-get update && ¥
apt-get install -yq --no-install-recommends wget && ¥
apt-get install -yq --no-install-recommends nfs-common && ¥
apt-get install -yq --no-install-recommends python2.7 && ¥
apt-get install -yq --no-install-recommends openjdk-8-jre && ¥
apt-get clean && ¥
rm -rf /tmp/* && rm -rf /var/tmp/*

RUN mkdir /opt/spark
ADD spark-2.1.0-bin-hadoop2.7.tgz /opt/spark

COPY spark-run.sh /

ENV PATH=$PATH:/opt/spark/spark-2.1.0-bin-hadoop2.7/bin
EXPOSE 4040 6066 7077 8080 8081

VOLUME ["/tmp", "/opt/spark/spark-2.1.0/logs"]

CMD ["/bin/bash"]

.

#!/bin/bash

SPARK_HOME=/opt/spark/spark-2.1.0-bin-hadoop2.7
CLASS_MASTER="org.apache.spark.deploy.master.Master"
CLASS_WORKER="org.apache.spark.deploy.worker.Worker"

. "$SPARK_HOME/sbin/spark-config.sh"
. "$SPARK_HOME/bin/load-spark-env.sh"

case "$1" in
  master)
    spark-class $CLASS_MASTER --webui-port 8080 --host `hostname` --port 7077
  ;;
  worker)
    spark-class $CLASS_WORKER --webui-port 8081 spark://$MASTER_PORT_7077_TCP_ADDR:7077
  ;;
esac

masterとworkerをそれぞれの実行はこんな感じで。

# master
$ docker run -h spark-master ¥
--name spark-master -i -t --rm ¥
mizunagi/spark spark-run.sh master

# worker
$ docker run -h spark-worker ¥
--name spark-worker -i -t --rm ¥
--link spark-master:master ¥
mizunagi/spark spark-run.sh worker

 

Apache Spark 2.1.0(3)

spark-submit時にスクリプトにコマンドライン引数を渡す方法。

sys.argvに普通に入っていますので、argparseモジュールを使用したり出来ます。

# -*- coding: utf-8 -*-
import sys
import argparse
import pyspark

def main():

    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--ivalue", type=int, dest="I_VALUE")
    parser.add_argument("-s", "--svalue", type=str, dest="S_VALUE")

    params = parser.parse_args(sys.argv[1:])

    print params.I_VALUE, params.S_VALUE

if __name__ == "__main__":
    main()

Pythonでの処理方法と同じです。

$ spark-submit {program} -i 1234 -s abcd
1234, abcd

 

Apache Spark 2.1.0(2)

前回のデータを使用して、もう少しそれっぽい物を。

# -*- coding: utf-8 -*-
import pyspark
import json

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample2")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSQL = pyspark.sql.SQLContext(sc)

    rawCards = sc.textFile("file:///tmp/nanohain_cards.json")

    rddCards = rawCards.map(lambda r: json.loads(r))
    dfCards = scSQL.createDataFrame(rddCards)
    dfCards.cache()

    dfCards_cost = dfCards.groupBy(["charaname", "charagroup"]).agg(
        pyspark.sql.functions.min("cost").alias("min_cost"),
        pyspark.sql.functions.max("cost").alias("max_cost")
    )
    print dfCards_cost

    dfCards_rank_win = pyspark.sql.Window.partitionBy("charaname", "charagroup").orderBy(pyspark.sql.functions.desc("idx"))
    dfCards_rank = dfCards.select(
        dfCards.charagroup,
        dfCards.charaname,
        dfCards.dress.alias("rank_dress"),
        pyspark.sql.functions.rank().over(dfCards_rank_win).alias("idx_order")
    ).filter("idx_order = 1")
    print dfCards_rank

    dfJoin = dfCards_cost.join(
        dfCards_rank,
        (dfCards_cost.charaname == dfCards_rank.charaname) & (dfCards_cost.charagroup == dfCards_rank.charagroup),
        "left"
    ).select(
        dfCards_cost.charagroup,
        dfCards_cost.charaname,
        dfCards_cost.min_cost,
        dfCards_cost.max_cost,
        dfCards_rank.rank_dress
    ).orderBy(dfCards_cost.charagroup, dfCards_cost.charaname)
    print dfJoin

    for card in dfJoin.collect():
        print card.charagroup.encode("utf-8"), card.charaname.encode("utf-8"), card.min_cost, card.max_cost, card.rank_dress.encode("utf-8")

    dfJoin.write.csv(
        "file:///tmp/dfJoin.csv",
        "overwrite"
    )

if __name__ == "__main__":
    main()

やっている事自体は何も中身がないのですけど。

SQLの一つ下の階層で記述している気分を味わえます。

SparkからのAmazon S3アクセス

s3へのアクセス方法にはs3, s3a, s3nの三種類が存在していて、それぞれ

  • s3 … s3をブロックデバイスとしてアクセス。
  • s3a … Amazon Web Servicesのライブラリ経由のアクセス。
  • s3n … 独自実装によるアクセス。

大抵はs3aが使用できれば事足りる様な気がしています。

sc = pyspark.SparkContext()
confHadoop = sc._jsc.hadoopConfiguration()
confHadoop.set("fs.s3a.access.key", {access key})
confHadoop.set("fs.s3a.secret.key", {secret key})
# S3互換APIのストレージを利用したい場合は、ここでアドレスを個別に設定
# ex) http://192.168.0.1:9000/
# confHadoop.set("fs.s3a.endpoint", {end point})
confHadoop.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

rawCsv = sc.textFile("s3a://{bucket}/*.csv")

実行する時にpackagesを指定。

$ spark-submit --packages org.apache.hadoop:hadoop-aws:2.7.3 sample1.py

 

NFSの設定方法

Apache Sparkをクラスタモードで使用する際に、お手軽な方法なので。

内容はubuntuのサーバーガイドほぼそのまま。

サーバー側の設定

$ sudo apt-get install nfs-kernel-server

/etc/exportsに公開するフォルダとアドレスを設定

/opt/nfs_share *(rw,sync,no_root_squash)

クライアント側の設定

$ sudo apt-get install nfs-common
$ sudo mount {server}:/opt/share /mnt/nfs_share

/etc/fstabに記述する場合

{server}:/opt/nfs_share /mnt/nfs_share nfs rsize=8192,wsize=8192,timeo=14,intr

 

Apache Spark 2.1.0(1)

今更ながらSparkを使い始めてみました。

環境はubuntu 16.04.2 LTS, macOS Sierra 10.12.3を使用しています。

Sparkのインストール

ダウンロードはウェブサイトからビルド済みイメージを取得して、任意の場所に展開するだけです。

環境にあわせたものを用意する必要がありますが、何もないところに導入する場合であれば、特に何も考えずにspark-2.1.0-bin-hadoop2.7.tgzを選択します。

# 場所はどこでも良いのですが、ここでは/opt/sparkにインストールしています。
$ sudo mkdir /opt/spark
$ sudo chown {user} /opt/spark
$ tar -zvxf spark-2.1.0-bin-hadoop2.7.tgz -C /opt/spark

sparkの動作にはjava(とPython)が必要なのでインストールします。

$ sudo apt-get install openjdk-8-jre
$ sudo apt-get install python2.7

これでひとまず使用可能になります。

spark-shell

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/03/05 13:08:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/05 13:08:08 WARN Utils: Your hostname, spark resolves to a loopback address: 127.0.1.1; using 192.168.31.142 instead (on interface ens32)
17/03/05 13:08:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/05 13:08:13 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://192.168.31.142:4040
Spark context available as 'sc' (master = local[*], app id = local-1488686889013).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
         
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

pyspark

Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/03/05 13:09:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/05 13:09:44 WARN Utils: Your hostname, spark resolves to a loopback address: 127.0.1.1; using 192.168.31.142 instead (on interface ens32)
17/03/05 13:09:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/03/05 13:09:48 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>> 

試験動作

動いているかわからない為、適当なデータを使って検索して見ます。

試しに魔法少女リリカルなのはINNOCENTのカード一覧(抜粋)を用意して見ました。

nanohain_cards.zip

idx, cost, charaname, charagroup, dressがjson形式で格納されています。

以下の形式で保存されています。(実際には1レコード1行)

{
    "idx": "CD_01_0001",
    "cost": 3,
    "cardname": "海聖小学校生徒",
    "dress": "スクール", 
    "charaname": "高町なのは",
    "charagroup": "ミッドチルダ"
}

カード枚数とキャラクター毎の枚数を計算

# -*- coding: utf-8 -*-
import pyspark
import json
import csv

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample1")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSQL = pyspark.sql.SQLContext(sc)

    rawCards = sc.textFile("file:///tmp/nanohain_cards.json")

    rddCards = rawCards.map(lambda r: json.loads(r))
    dfCards = scSQL.createDataFrame(rddCards)

    print dfCards.count()

    dfChara = dfCards.groupBy("charaname").count()
    for card in dfChara.collect():
        print card["charaname"].encode("utf-8"), card["count"]

if __name__ == "__main__":
    main()

集計結果が画面に表示されるはず。

ファイルに書き出したい場合は二種類の方法があります。

Spark内蔵機能による出力

    dfChara.write.csv("file:///tmp/dfChara.csv")

この方法が本来の使用方法となります。出力先もローカル以外にhdfsやAmazon S3を指定することが出来ます。

欠点というわけではありませんが、このまま実行すると分散処理の都合上複数のファイル(標準設定だと200)が出力されてしまいます。ひとつにまとめたい場合はrepatitionを使用する事で分割数を抑制することが可能です。

ただ、データが一箇所に偏ってしまうと、せっかくの分散処理が活用出来ませんので注意が必要です。

Pythonモジュールによる出力

    with open("/tmp/dfChara.csv", "wb") as fileWrite:
        csvWrite = csv.DictWriter(fileWrite, dfChara.columns)
        csvWrite.writeheader()
        for card in dfChara.collect():
            csvWrite.writerow(
                {
                    "charaname": card["charaname"].encode("utf-8"),
                    "count": card["count"]
                }
            )

Pythonモジュールを使用した方法だとこちら。

collectを呼び出すと、分散処理内容がmaster側に集約され、リストとして扱えますので、あとは好きなように処理が可能です。

ただし巨大なデータを扱っている場合、masterのメモリに収まりきらない場合があります。

Sparkクラスタの構成

Sparkでクラスタを構成するには三種類の方法があります。

  • Sparkだけで構成する方法
  • Yarnを使用する方法
  • Mesosを使用する方法

クラスタ内のPCリソースを100%占有してよいのであれば、Sparkだけで構成する方法が最も容易です。

Sparkのみでクラスタを構成

クラスタを構成するにはSparkをインストールしてmasterとslaveの役割を与えるだけです。

# masterの役割を与えるPC上で実行
$ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-master.sh
# slaveの役割を与えるPC上で実行
$ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-slave.sh -h spark://{master}:{port}

複数台をまとめて実行させたい場合は、

まず、/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/spark-env.shを作成し、

  • SPARK_MASTER_HOST
  • SPARK_MASTER_PORT

を設定します。

次に/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/slavesを作成し、ホスト名称、またはIPアドレスを列挙しておきます。

正しく設定されていれば以下のコマンドでslave全てを開始できます。

$ /opt/spark/spark-2.1.0-bin-hadoop2.7/sbin/start-slaves.sh

やっていることは、slavesに記載されているサーバーにsshでログインしてstart-slaveをしているだけなので、この方法を使用する場合はssh実行時にパスワードを不要(具体的には公開鍵による認証)にしておく必要があります。

画面解像度の追加方法

Ubuntu上でXを使用する際、目的の画面解像度が選択できない場合、以下を参考にして画面解像度を追加可能。

解像度の確認方法

# 設定可能な解像度の一覧表示
$ xrandr

解像度の追加方法

$ cvt 800 600
# 2560x1440 59.96 Hz (CVT 3.69M9) hsync: 89.52 kHz; pclk: 312.25 MHz
Modeline "2560x1440_60.00"  312.25  2560 2752 3024 3488  1440 1443 1448 1493 -hsync +vsync

$ xrandr --newmode "2560x1440_60.00"  312.25  2560 2752 3024 3488  1440 1443 1448 1493 -hsync +vsync

$ xrandr --addmode {{output}} 800x600

{{output}}の指定は、解像度の確認に出力したものを指定します。

Apache MesosをAzureに導入

Apache Mesosを試してみたくなり、Ubuntu 14.04にApache Mesos 1.0.0を導入してみました。

なお導入作業はMicrosoft Azure上で行っていますが、Ubuntu 14.04であれば他の環境でも再現可能かと思います。

Azure Virtual Machineの作成

サイズはStandard DS2 v2で作成。

Azureでは仮想マシンの作成方法に、「仮想マシン(クラシック)」と「Virtual machines」があります。

単純に仮想マシンを構築するだけの場合は、両者には違いがそれ程無いように見えますが、管理をしようと思うとかなり異なってきます。

今回はリソースマネージャによる管理を行う事にしました。

インストール作業

ウェブサイトにあるGetting Startedを素直に実行するだけですが0.28.2をビルドしている時にcgroupとlinux-toolsが不足していたような気がしたので、それらも追加インストールしておきます。

パッケージのインストール

sudo apt-get update
sudo apt-get install -y tar wget git
sudo apt-get install -y openjdk-7-jdk
sudo apt-get install -y autoconf libtool
sudo apt-get install -y build-essential
sudo apt-get install -y python-dev libcurl4-nss-dev
sudo apt-get install -y libsasl2-dev libsasl2-modules maven libapr1-dev libsvn-dev

sudo apt-get install -y libcgroup-dev cgroup-bin
sudo apt-get install -y linux-tools-common linux-tools-generic linux-tools-`uname -r`

ソースを取得して展開

cd /home/azureuser
wget http://www.apache.org/dist/mesos/1.0.0/mesos-1.0.0.tar.gz
tar -zvxf mesos-1.0.0.tar.gz

ビルドとテスト

cd mesos-1.0.0
mkdir build
cd build
../configure
make
make check

ビルド後のチェックが通っていれば完成しています。

動作テスト

# master
./bin/mesos-master.sh --ip=127.0.0.1 --work_dir=/var/lib/mesos
# agent
./bin/mesos-agent.sh --master=127.0.0.1:5050 --work_dir=/var/lib/mesos
# frameworkのテスト実行
./src/examples/python/test-framework 127.0.0.1:5050

インストール

make install

インストールをしなくても、前述した動作テストと同じ事をすればひとまず使用する事は出来ますが、インストールする事でmesosコマンドによる管理が行える様になります。

ビルド前のconfigure時にprefixを指定しなかった場合は、以下の場所にmesosが展開されます。

  • /usr/local/lib … mesosライブラリ
  • /usr/local/lib/mesos/modules … mesosが使用するライブラリ
  • /var/local/etc/mesos … mesosの設定スクリプト
  • /usr/local/sbin … mesosの実行スクリプト
  • /usr/local/share/mesos … WebUI用のファイル等

インストール後の設定

インストールしただけでは、まだ必要な設定がされていない為、環境にあわせて設定を行います。

まず/usr/local/libをライブラリ検索パスに追加します。

# 一時的に追加する場合
sudo /sbin/ldconfig /usr/local/lib

# 恒久的に追加する場合は/etc/ld.so.conf.d/libc.conf等に/usr/local/libを追加
sudo /sbin/ldconfig
mesos-master用の設定
  1. /usr/local/etc/mesos にあるmesos-master-env.sh.templateをコピーし、mesos-master-env.shを作成します。
  2. mesos-master-env.shを開き、必要な設定値を追記します。
# This file contains environment variables that are passed to mesos-master.
# To get a description of all options run mesos-master --help; any option
# supported as a command-line option is also supported as an environment
# variable.

# Some options you're likely to want to set:
# export MESOS_log_dir=/var/log/mesos

export MESOS_ip=0.0.0.0
export MESOS_work_dir=/var/lib/mesos
mesos-agent用の設定
  1. /usr/local/etc/mesos にあるmesos-agent-env.sh.templateをコピーし、mesos-agent-env.shを作成します。
  2. mesos-agent-env.shを開き、必要な設定値を追記します。
# This file contains environment variables that are passed to mesos-agent.
# To get a description of all options run mesos-agent --help; any option
# supported as a command-line option is also supported as an environment
# variable.

# You must at least set MESOS_master.

# The mesos master URL to contact. Should be host:port for
# non-ZooKeeper based masters, otherwise a zk:// or file:// URL.
#export MESOS_master=unknown-machine:5050
export MESOS_master=127.0.0.1:5050
export MESOS_work_dir=/var/lib/mesos

# Other options you're likely to want to set:
# export MESOS_log_dir=/var/log/mesos
# export MESOS_work_dir=/var/run/mesos
# export MESOS_isolation=cgroups

起動方法

起動は以下の様に行います。

# masterを起動する場合
mesos daemon.sh mesos-master

# agentを起動する場合
mesos daemon.sh masos-agent

幾つかのコマンドについてはpythonで実装されているので、PYTHONPATHにmesos関係のモジュールが配置されている場所を追加する必要があります。

Azure Resource Explorerを使用した複製

ビルドしたmesosはmasterとagentの両方ですので、起動するスクリプトでどちらにもなれます。

mesosは一台の環境内にmaster(agentを監視するサーバー)とagentを構成する事も出来ますので、mesos自体のテストをする事が出来るのですが、ここでは複数台の構成を行います。

 

複数台構成するには、個々にVirtual Machinesを立ち上げて前述したビルドを行えば良いのですが、Azure Resource Explorerを使用する事で容易に複製が可能です。

複製作業

注意:以下の作業を行うと対象のVMは起動させる事が出来なくなります。

  1. 対象のVMを停止させる。
  2. Azure Resource Explorerを開き、先程作成したVirtual Machineを探す。
  3. Azure Resource Explorerのモードを”Read/Write”に変更する。
  4. generalizeをクリック。
  5. captureに必要な項目(vhdPrefix, destinationContainerName, overwriteVhds)を記入。
  6. captureをクリック。
capture時に以下の様な項目を入力した場合、
{
    "vhdPrefix": "mesosimage",
    "destinationContainerName": "mesosvhds",
    "overwriteVhds": "true"
}
リソースマネージャ内のBLOBにvhdsと展開時に使用可能なテンプレートが出力されます。
http://{{storage account}}.blob.core.windows.net/system/Microsoft.Compute/Images/mesosvhds/mesosimage-{{元のvhds名}}

.

複製完了後に以下の手順でVirtual Machineを展開します。

  1. AzurePortalを開き、新規を選択。
  2. テンプレートのデプロイを選択。
  3. capture時に生成されたテンプレート内容を編集画面にペースト。
  4. vhd項目のuriを修正(この部分は個々のVMで異なる名称を設定する必要があります)。
  5. 保存。

保存を押すと、テンプレート内にあるparametersの記入が促されます。

テンプレートの作成前に、ネットワークインターフェスを作成しておく事をお勧めします。

可用性セットについてもここで指定が可能です。

自分の場合はリソースグループ内にはこのような構成を事前にしておきました。

  • 可用性セット
    • availability-masters
    • avaliability-agents
  • 仮想ネットワーク
    • network-mesos
    •  ネットワークセキュリティグループ
      • net-sec-master
      • net-sec-agent
    •  ネットワークインターフェース
      • netif-master01(10.0.0.8)
      • netif-master02(10.0.0.9)
      • netif-agent01
      • netif-agent02
  •  パブリックIPアドレス
    • addr-public-master
    • addr-public-master01
  • ロードバランサ
    • lb-public
    • バックエンドアドレスプール
      • lb-public-master
  • ストレージアカウント
    • storagemesos

Apache Sparkの導入

Apache sparkをmesos上で動かすにはオフィシャルからバイナリをダウンロードしてくるのが最も容易です。クラスタのリソース管理はmesosが行うため、Sparkを使うだけであればHadoopもYARNも不要です。

  • Apache SparkをオフィシャルサイトのDownloadページから取得する。(ダウンロードしてくるものがわからなければ、Pre-build for Hadoop 2.7 and latorを取得)
  • すべてのクラスタからアクセスできる場所にプログラムを配置。(httpやhdfs、Amazon s3といった共有ストレージに配置をしますが、自分はローカルのウェブサーバーにファイルを配置しました。)
  • 任意の場所にSparkを展開。

Apache Sparkを対話モードで起動

export SPARK_EXECUTOR_URI=http://127.0.0.1/spark-2.0.0-bin-hadoop2.7.tgz
export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos.so

cd {{spark-2.0.0 install dir}}

# spark shell(scala)
./bin/spark-shell --master=mesos://{{mesos-master address}}:5050

# spark shell(python)
./bin/pyspark --master=mesos://{{mesos-master address}}:5050

動作テスト

val hFile = sc.textFile("/var/www/html/index.html")
val filter = hFile.filter(_.contains("href"))
println(filter.count)

.

 

VisualStudio Codeをおためし

Microsoft Build 2015(Day 1)で発表されたVisualStudio Codeを早速インストールしてみました。

現在プレビュー版として公開されているものとなります。

vsc_01 atom_01

上の図は適当に間違えてエラー表示をさせています。

自分はMac上ではAtomを使用しているのでAtomのスクリーンショットを貼り付けてみました。(Atom側はTheme変更 + minimap + atom-typescriptを導入しています。)

VisualStudio Codeは”VisualStudio”という名前が付いているのですが、ポジション的にはテキストエディタとなります。Node.jsがベースな部分はAtomに似ている感じです。

現在はプレビュー版ではありますが、恐らくパッケージマネージャで機能追加が出来るようになるのではないかと思われます。

Microsoft製なら日本語も大丈夫…と思っていたのですが、残念ながら現時点では全角文字を人文字分と認識しているようです。これはAtomも同じですが、japanese-wrapを導入する事で解決しています。

vsc_02 Git管理下にあるファイルだとDiffを表示出来たり、コミットも行えるようです。

vsc_03 現時点では気の利いたUIがない設定。

デフォルト設定を参照しながら、自分の設定項目を記述していきます。

さすがに現時点ではプレビュー版ではありますが、Windows, Mac, Linux環境下で使用するテキストエディタが増えることは良いことだと思います。(自分はAtomに慣れてしまいましたけど…)

CoreOSを使用したDocker環境の作成(2)

とりあえずDockerコマンドで覚えておくこととか。

docker

pull

リポジトリからイメージをダウンロードしてくる。

create

コンテナの作成。

run

コンテナの作成と実行。

runするときに-vを指定する事で、ホスト側のボリューム(フォルダ)をマウント可能。

rm

コンテナの破棄。

rmi

イメージの破棄。

CoreOSを使用したDocker環境の作成(1)

ISOからのインストール方法

ISOファイルを使用したCoreOSのインストールの流れは、ダウンロードしたISOファイルを使用してブートした後、インストール対象のブロックデバイスに対してインストールイメージを書き込むといった感じになります。(親切な対話メニューみたいなものはありません。)

まずやること

  1. CoreOSのサイトからisoファイルをダウンロードする。
  2. isoを使用してインストール対象をブートする。

ISOファイルでbootをすると、coreユーザーでオートログインが行われます。ここでCoreOSのマニュアル通りにcoreos-install -d /dev/sda -C stable とやりたくなりますが、これだけを実行してもインストールしか行われません。

「インストールは完了したけど、ログインが出来ない」という状態になってしまいますので、必要項目を記述したcloud-config.yamlというファイルを作成します。(詳細はhttps://coreos.com/docs/cluster-management/setup/cloudinit-cloud-config/を参照。)

まずパスワードを生成(以下の例ではパスワードをcoreに設定)

openssl passwd -1
Passwod:
Verifying - Password:
$1$mTy.idEN$tZ1.w9IlEbd7cxrD0vtu21

生成されたパスワードをユーザーに設定します。

#cloud-config

users:
  - name: core
    passwd: $1$mTy.idEN$tZ1.w9IlEbd7cxrD0vtu21
  - groups:
    - sudo
    - docker

設定ファイルを作成したらyamlファイルの設定を検証します。

coreos-cloudinit -validate --from-file=cloud-config.yaml
Checking availability of "local-file"
Fetching user-data from datasource of type "local-file"

大丈夫なようです。

パスワードを直接設定する以外にも、

  • ssh-authorized-keysを設定する方法
  • coreos-ssh-import-githubによるGitHubからのimport
  • coreos-ssh-import-urlによるJSONからのimport

といった指定も可能です。

作成した設定を使用してインストールを行います。

coreos-install -d /dev/sda -C stable -c /home/core/cloud-config.yaml

完了したら再起動を行います。

再起動完了後、指定したパスワード(または方法)でログインが可能になります。

ちなみに上記の作業をsshで接続して行いたい場合は、ISOでのオートログイン後にsudo passwd core してしまえば、sshでログイン可能になります。