Apache Spark 2.1.0(2)

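前回のデータを使用して、もう少し実際の集計処理らしいものを書いてみます。キャラクター（charaname, charagroup）ごとに cost の最小値・最大値と、idx が最大のカードの dress を求め、結果を表示して CSV に出力します。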

# -*- coding: utf-8 -*-
import pyspark
import json

def main():

    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample2")
    sc = pyspark.SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    scSQL = pyspark.sql.SQLContext(sc)

    rawCards = sc.textFile("file:///tmp/nanohain_cards.json")

    rddCards = rawCards.map(lambda r: json.loads(r))
    dfCards = scSQL.createDataFrame(rddCards)
    dfCards.cache()

    dfCards_cost = dfCards.groupBy(["charaname", "charagroup"]).agg(
        pyspark.sql.functions.min("cost").alias("min_cost"),
        pyspark.sql.functions.max("cost").alias("max_cost")
    )
    print dfCards_cost

    dfCards_rank_win = pyspark.sql.Window.partitionBy("charaname", "charagroup").orderBy(pyspark.sql.functions.desc("idx"))
    dfCards_rank = dfCards.select(
        dfCards.charagroup,
        dfCards.charaname,
        dfCards.dress.alias("rank_dress"),
        pyspark.sql.functions.rank().over(dfCards_rank_win).alias("idx_order")
    ).filter("idx_order = 1")
    print dfCards_rank

    dfJoin = dfCards_cost.join(
        dfCards_rank,
        (dfCards_cost.charaname == dfCards_rank.charaname) & (dfCards_cost.charagroup == dfCards_rank.charagroup),
        "left"
    ).select(
        dfCards_cost.charagroup,
        dfCards_cost.charaname,
        dfCards_cost.min_cost,
        dfCards_cost.max_cost,
        dfCards_rank.rank_dress
    ).orderBy(dfCards_cost.charagroup, dfCards_cost.charaname)
    print dfJoin

    for card in dfJoin.collect():
        print card.charagroup.encode("utf-8"), card.charaname.encode("utf-8"), card.min_cost, card.max_cost, card.rank_dress.encode("utf-8")

    dfJoin.write.csv(
        "file:///tmp/dfJoin.csv",
        "overwrite"
    )

if __name__ == "__main__":
    main()

やっている事自体には、大した中身はありません。

SQLの一つ下の階層で記述している気分を味わえます（groupBy/agg が GROUP BY、Window と rank() がウィンドウ関数、join が LEFT JOIN に相当します）。