前回のデータを使って、もう少しそれらしい処理を書いてみます。キャラクターごとにコストの最小値・最大値と、idx が最も大きいカードの dress を集計し、結果を表示したうえで CSV に書き出します。
# -*- coding: utf-8 -*-
"""Summarise nanohain card data with Spark DataFrames.

Reads /tmp/nanohain_cards.json (one JSON object per line) and, for every
(charagroup, charaname) pair, reports the minimum and maximum ``cost`` and the
``dress`` of the card with the highest ``idx``. The summary is printed to
stdout and written as CSV to /tmp/dfJoin.csv.

Written to run under both Python 2 and Python 3.
"""
import json
import sys

import pyspark
from pyspark.sql import Window
from pyspark.sql import functions as F


def _cost_range(cards):
    """Return the min/max ``cost`` per (charaname, charagroup).

    Output columns: charaname, charagroup, min_cost, max_cost.
    """
    return cards.groupBy(["charaname", "charagroup"]).agg(
        F.min("cost").alias("min_cost"),
        F.max("cost").alias("max_cost"),
    )


def _top_idx_dress(cards):
    """Return the ``dress`` of the highest-``idx`` card per (charaname, charagroup).

    Output columns: charagroup, charaname, rank_dress, idx_order (always 1).
    """
    window = Window.partitionBy("charaname", "charagroup").orderBy(F.desc("idx"))
    ranked = cards.select(
        cards.charagroup,
        cards.charaname,
        cards.dress.alias("rank_dress"),
        # row_number() rather than rank(): if two cards share the top idx,
        # rank() marks both as 1 and the later join would emit duplicate rows.
        F.row_number().over(window).alias("idx_order"),
    )
    return ranked.filter("idx_order = 1")


def _summarise(cost, top):
    """Left-join cost ranges with the top dress, ordered by group then name.

    Rows whose keys find no match (e.g. a NULL key, since ``==`` never matches
    NULL) keep their cost range and get a NULL ``rank_dress``.
    """
    keys_match = (cost.charaname == top.charaname) & (cost.charagroup == top.charagroup)
    return cost.join(top, keys_match, "left").select(
        cost.charagroup,
        cost.charaname,
        cost.min_cost,
        cost.max_cost,
        top.rank_dress,
    ).orderBy(cost.charagroup, cost.charaname)


def _print_utf8(*values):
    """Write *values* space-separated on one line to stdout as UTF-8 bytes.

    ``None`` is rendered as ``None`` instead of raising, because the left join
    and null source fields can both produce it. The text is encoded explicitly
    and written to the byte-level stream (``sys.stdout.buffer`` on Python 3,
    ``sys.stdout`` itself on Python 2), so output does not depend on stdout's
    configured encoding.
    """
    line = u" ".join(u"{0}".format(value) for value in values) + u"\n"
    sys.stdout.flush()  # keep ordering with text already sent through print()
    stream = getattr(sys.stdout, "buffer", sys.stdout)
    stream.write(line.encode("utf-8"))
    stream.flush()


def main():
    """Build the card summary, print it, and write it to /tmp/dfJoin.csv."""
    conf = pyspark.SparkConf()
    conf.setAppName("nanohain sample2")
    sc = pyspark.SparkContext(conf=conf)
    try:
        sc.setLogLevel("ERROR")
        scSQL = pyspark.sql.SQLContext(sc)

        # One JSON object per input line -> DataFrame with an inferred schema.
        rawCards = sc.textFile("file:///tmp/nanohain_cards.json")
        dfCards = scSQL.createDataFrame(rawCards.map(json.loads))
        dfCards.cache()  # reused by both the aggregation and the window query

        dfCards_cost = _cost_range(dfCards)
        print(dfCards_cost)

        dfCards_rank = _top_idx_dress(dfCards)
        print(dfCards_rank)

        dfJoin = _summarise(dfCards_cost, dfCards_rank)
        print(dfJoin)

        for card in dfJoin.collect():
            _print_utf8(card.charagroup, card.charaname,
                        card.min_cost, card.max_cost, card.rank_dress)

        dfJoin.write.csv("file:///tmp/dfJoin.csv", mode="overwrite")
    finally:
        # Release the SparkContext even if a step above fails.
        sc.stop()


if __name__ == "__main__":
    main()
やっていること自体には、大した中身はありません。
それでも、SQL より一段低いレイヤー（DataFrame API）で処理を記述している感覚は味わえます。