PySparkのDataframeでの累積構成比の計算に戸惑ったのですが、ググっても意外とすんなりいい感じの情報にありつけなかったので、備忘録として残しておきます。
Window関数がわかってりゃあなんてことないとのない内容だろうとは思いますが、お勉強も兼ねて。
前提
基本的にgoogle colaboratory上で動作確認をしてるのですが、ライブラリのインポートなどは基本的に以下の記事相当の環境設定をしてます。
サンプルコード
以下、本題のサンプルコードです
1. Spark Dataframeでサンプルデータを読み込む
こちらのサイトから拾ったテーブルに対して適当に名前の列を追加したデータを使って集計していきます。
とりあえずPandasのDataframeで読み込んで中身を見てみます。
# テストデータの作成 # https://deepage.net/features/pandas-groupby.html import pandas as pd import numpy as np input_data = "sample_group_wt_name.csv" pdf_input = pd.read_csv(input_data) pdf_input
name | class | sex | weight | height | time | |
---|---|---|---|---|---|---|
0 | aaa | A | F | 45 | 150 | 85 |
1 | bbb | A | M | 50 | 160 | 80 |
2 | ccc | A | F | 55 | 155 | 74 |
3 | ddd | B | M | 78 | 180 | 90 |
4 | eee | B | F | 51 | 158 | 65 |
5 | fff | B | M | 40 | 155 | 68 |
6 | ggg | C | F | 80 | 185 | 90 |
7 | hhh | C | M | 86 | 175 | 81 |
8 | iiii | C | F | 52 | 162 | 73 |
上で読み込んだデータをPySparkのDataframeに変換します
sdf_input = sqlContext.createDataFrame(pdf_input) sdf_input.show()
+----+-----+---+------+------+----+ |name|class|sex|weight|height|time| +----+-----+---+------+------+----+ | aaa| A| F| 45| 150| 85| | bbb| A| M| 50| 160| 80| | ccc| A| F| 55| 155| 74| | ddd| B| M| 78| 180| 90| | eee| B| F| 51| 158| 65| | fff| B| M| 40| 155| 68| | ggg| C| F| 80| 185| 90| | hhh| C| M| 86| 175| 81| |iiii| C| F| 52| 162| 73| +----+-----+---+------+------+----+
2. グループごとの累積構成比を求める
ここからは性別(sex)ごとに体重(weight)を降順に並び替え、累積構成比を出してみます。
まず、Window関数を定義します。
Window関数の詳細説明は省きます。ざっくりいうと、特定の行に対してテーブルの別の行の値を参照しながら処理を行う場合に利用する関数です。Window()関数の各メソッドでレコードの参照範囲を指定したインスタンスを作成し、集計時に集計関数に対してこれを渡すことで狙いの処理を実現します。
今回は集計処理では「グループごとの合計を出す」「構成比を累積して各レコードの累積構成比を出す」の集計ステップでWindow関数を利用するつもりなので、それぞれに対してWindow関数を用意します。性別ごとに体重に対して降順で累積構成比を集計するので、"sex"でのグルーピングと"weight"での降順ソートを指定します。参照の幅は「グループ内の合計処理」の場合はグループ内の最初のレコードから最後のレコードの間を指定し、「累積構成比処理」の場合はグループ内の最初のレコードから現在処理中のレコードまでを指定します。
from pyspark.sql.window import Window # グループ内の合計計算用 w_bySex_rate = (Window() .partitionBy("sex") # グループの区切りにする変数指定 .orderBy(fn.col("weight").desc()) # グループ内でソートをする場合の基準の変数指定 降順なら.desc()指定 .rowsBetween(Window().unboundedPreceding, Window().unboundedFollowing)) # グループの最初〜最後の行を演算対象にする # 累積構成比の計算用 w_bySex_cum = (Window() .partitionBy("sex") .orderBy(fn.col("weight").desc()) .rowsBetween(Window().unboundedPreceding, Window().currentRow)) # グループの最初〜現在の行を演算対象にする
で、Window関数を定義したら、これを使って実際に集計していきます。
.withColumnで以下のステップを一つづつ処理し、処理結果の列を追加していきます。
- グループ内の合計を計算
- 合計値を使って各レコードの構成比を計算
- グループ内の前レコードの構成比を合計して累積構成比を計算
# 集計 sdf_out_bySex = (sdf_input .select("name", "sex", "weight") .withColumn("total", fn.sum(fn.col("weight")).over(w_bySex_rate)) # グループ内の合計 .withColumn("rate", fn.col("weight")/fn.col("total")) # 構成比 .withColumn("rate_cum", fn.sum(fn.col("rate")).over(w_bySex_cum))) # 累積構成比 # そのままだと見づらいのでPandasのDataframeに変換して出力 pdf_out_bySex = sdf_out_bySex.toPandas() pdf_out_bySex
name | sex | weight | total | rate | rate_cum | |
---|---|---|---|---|---|---|
0 | ggg | F | 80 | 283 | 0.282686 | 0.282686 |
1 | ccc | F | 55 | 283 | 0.194346 | 0.477032 |
2 | iiii | F | 52 | 283 | 0.183746 | 0.660777 |
3 | eee | F | 51 | 283 | 0.180212 | 0.840989 |
4 | aaa | F | 45 | 283 | 0.159011 | 1 |
5 | hhh | M | 86 | 254 | 0.338583 | 0.338583 |
6 | ddd | M | 78 | 254 | 0.307087 | 0.645669 |
7 | bbb | M | 50 | 254 | 0.19685 | 0.84252 |
8 | fff | M | 40 | 254 | 0.15748 | 1 |
という感じで、性別ごとの累積構成比が出ます。
3. 全体の累積構成比を求める
上の例では性別ごとのグループで累積構成比を求めましたが、特にグルーピングせずに全体に値して求めたい場合はWindow関数でpatritionBy()を指定しなければいいだけです。
まずWindow関数を指定して、、、
from pyspark.sql.window import Window # グループ内の合計計算用 w_byAll_rate = (Window() .orderBy(fn.col("weight").desc()) # グループ内でソートをする場合の基準の変数指定 降順なら.desc()指定 .rowsBetween(Window().unboundedPreceding, Window().unboundedFollowing)) # グループの最初〜最後の行を演算対象にする # 累積構成比の計算用 w_byAll_cum = (Window() .orderBy(fn.col("weight").desc()) .rowsBetween(Window().unboundedPreceding, Window().currentRow)) # グループの最初〜現在の行を演算対象にする
で、集計処理を実施。
# 集計 sdf_out_byAll = (sdf_input .select("name", "sex", "weight") .withColumn("total", fn.sum(fn.col("weight")).over(w_byAll_rate)) # グループ内の合計 .withColumn("rate", fn.col("weight")/fn.col("total")) # 構成比 .withColumn("rate_cum", fn.sum(fn.col("rate")).over(w_byAll_cum))) # 累積構成比 # そのままだと見づらいのでPandasのDataframeに変換して出力 pdf_out_byAll = sdf_out_byAll.toPandas() pdf_out_byAll
name | sex | weight | total | rate | rate_cum | |
---|---|---|---|---|---|---|
0 | hhh | M | 86 | 537 | 0.160149 | 0.160149 |
1 | ggg | F | 80 | 537 | 0.148976 | 0.309125 |
2 | ddd | M | 78 | 537 | 0.145251 | 0.454376 |
3 | ccc | F | 55 | 537 | 0.102421 | 0.556797 |
4 | iiii | F | 52 | 537 | 0.096834 | 0.653631 |
5 | eee | F | 51 | 537 | 0.094972 | 0.748603 |
6 | bbb | M | 50 | 537 | 0.09311 | 0.841713 |
7 | aaa | F | 45 | 537 | 0.083799 | 0.925512 |
8 | fff | M | 40 | 537 | 0.074488 | 1 |
全体の累積構成比が出ます。
おわりに
ということで、未来の自分のためのバカ丁寧な備忘録でした。
余談ですが、PySparkのDataframeはメソッドチェーンで処理を追加できて情弱にも優しい仕様なのがめっちゃいいですね。