雑記 in hibernation

頭の整理と備忘録

Pysparkで(個人的に)忘れがちなtipsメモ

たまーにPysparkを使うことがあるのですが、たまーにすぎて使う頃には前回までの習熟度が完全にリセットされている、という非効率を繰り返している今日この頃。ということで、毎回ど忘れして無駄に遠回りしてしまいがちなtips(と言うほど大袈裟な情報ではないです、、、)を備忘録としてまとめておきます。

0. 前提

サンプルコードの実行環境として、この記事相当の環境構築が済んでいて、 toeming.hatenablog.com

この記事相当のサンプルデータの読み込みができている、という前提とします。

toeming.hatenablog.com

1. 条件付きで集計できる

サンプルデータのクラスごとに体重の合計を集計するケースを考えます。 普通にやるとこんな感じ。

(sdf_input
    .groupby('class')
    .agg(
        fn.sum(fn.col('weight')).alias('sum')
    )
    .orderBy('class')
).show()
+-----+---+
|class|sum|
+-----+---+
|    A|150|
|    B|169|
|    C|218|
+-----+---+

集計対象を女性に絞りたい場合、以下のような書き方で楽に条件付きの集計ができます。

(sdf_input
    .groupby('class')
    .agg(
        # sum()の中に「Trueの時に集計対象のカラムを参照/Falseの時には0を参照」の形で書く
        fn.sum( fn.when(fn.col('sex')=='F', fn.col('weight') ).otherwise(fn.lit(0))).alias('sum'),
         # Trueの時に1をカウントすれば、条件に一致するレコード数のカウントになる
        fn.sum( fn.when(fn.col('sex')=='F', fn.lit(1) ).otherwise(fn.lit(0))).alias('cnt')
    )
    .orderBy('class')
).show()
+-----+---+---+
|class|sum|cnt|
+-----+---+---+
|    A|100|  2|
|    B| 51|  1|
|    C|132|  2|
+-----+---+---+


2. Joinするときにキーを一括指定できる

複数キーでJoinしたい場合、キーが両テーブルで同じカラム名であれば、listでまとめて指定することができます。

まずはテスト用のデータを作ります。

# 適当に左側のテーブルを作る
sdf_left = sdf_input.select(['name', 'class','sex'])
sdf_left.show()
# 適当に右側のテーブルを作る
sdf_right = sdf_input.select(['class', 'sex', 'weight']).dropDuplicates(['class', 'sex'])
sdf_right.show()
+----+-----+---+
|name|class|sex|
+----+-----+---+
| aaa|    A|  F|
| bbb|    A|  M|
| ccc|    A|  F|
| ddd|    B|  M|
| eee|    B|  F|
| fff|    B|  M|
| ggg|    C|  F|
| hhh|    C|  M|
|iiii|    C|  F|
+----+-----+---+

+-----+---+------+
|class|sex|weight|
+-----+---+------+
|    B|  M|    78|
|    A|  F|    45|
|    B|  F|    51|
|    C|  F|    80|
|    A|  M|    50|
|    C|  M|    86|
+-----+---+------+

で、クラスと性別をキーにleft joinしてみます。list表記で複数キーを一気に指定できます。

# クラスと性別でleft join
sdf_left.join(sdf_right, ['class', 'sex'], 'left').show()
+-----+---+----+------+
|class|sex|name|weight|
+-----+---+----+------+
|    B|  M| ddd|    78|
|    B|  M| fff|    78|
|    A|  F| aaa|    45|
|    A|  F| ccc|    45|
|    B|  F| eee|    51|
|    C|  F| ggg|    80|
|    C|  F|iiii|    80|
|    A|  M| bbb|    50|
|    C|  M| hhh|    86|
+-----+---+----+------+


ちなみにこれ知った時「めっちゃ便利だな〜」と感動したのですが、今確認したら普通に公式のドキュメントに書いてありましたね。恥ずい。

spark.apache.org


3. データフレームに対してScikit-learnのモデルで推論する

toeming.hatenablog.com


4. 複数のカラムをリストで指定して合計

複数のカラムを指定して合計する場合、正攻法でやると以下のようにベタにかけます。

(sdf_input
 .withColumn("sum_0", fn.col('weight')+fn.col('height')+fn.col('time'))
 ).show()
+----+-----+---+------+------+----+-----+
|name|class|sex|weight|height|time|sum_0|
+----+-----+---+------+------+----+-----+
| aaa|    A|  F|    45|   150|  85|  280|
| bbb|    A|  M|    50|   160|  80|  290|
| ccc|    A|  F|    55|   155|  74|  284|
| ddd|    B|  M|    78|   180|  90|  348|
| eee|    B|  F|    51|   158|  65|  274|
| fff|    B|  M|    40|   155|  68|  263|
| ggg|    C|  F|    80|   185|  90|  355|
| hhh|    C|  M|    86|   175|  81|  342|
|iiii|    C|  F|    52|   162|  73|  287|
+----+-----+---+------+------+----+-----+


いちいち変数名を指定して足し算せずに、リストで指定して置いてforを回しつつsumをとる方法もあります。カラムが多い場合はこっちの方が楽かも。

# 集計対象の変数名をリストで持っておく
col_list = ['weight', 'height', 'time']
(sdf_input
 .withColumn("sum_1", sum(sdf_input[col] for col in col_list)) # 書き方その1
 .withColumn("sum_2", sum(fn.col(col) for col in col_list)) # 書き方その2
 ).show()
+----+-----+---+------+------+----+-----+-----+
|name|class|sex|weight|height|time|sum_1|sum_2|
+----+-----+---+------+------+----+-----+-----+
| aaa|    A|  F|    45|   150|  85|  280|  280|
| bbb|    A|  M|    50|   160|  80|  290|  290|
| ccc|    A|  F|    55|   155|  74|  284|  284|
| ddd|    B|  M|    78|   180|  90|  348|  348|
| eee|    B|  F|    51|   158|  65|  274|  274|
| fff|    B|  M|    40|   155|  68|  263|  263|
| ggg|    C|  F|    80|   185|  90|  355|  355|
| hhh|    C|  M|    86|   175|  81|  342|  342|
|iiii|    C|  F|    52|   162|  73|  287|  287|
+----+-----+---+------+------+----+-----+-----+


5. 複数のカラムのうち最大(最小)の値を採用する

greatest(), least()を使えばできます。

sdf_wight = (
    sdf_input.select(["name", "weight"])
    # 適当な乱数を生成
    .withColumn("weight_next1year", (fn.col("weight") + 10*fn.rand(0)-5).cast("int"))
    .withColumn("weight_next2year", (fn.col("weight") + 10*fn.rand(1)-5).cast("int"))
    # もっとも大きい要素を採用
    .withColumn("weight_max", fn.greatest(fn.col("weight"), fn.col("weight_next1year"), fn.col("weight_next2year")))
    # もっとも小さい要素を採用
    .withColumn("weight_min", fn.least(fn.col("weight"), fn.col("weight_next1year"), fn.col("weight_next2year")))
    #.select(["name", "weight","weight_next_mnth" ]
)

# 見づらいのでpandasのDFに変換してから表示
sdf_wight.toPandas()
name weight weight_next1year weight_next2year weight_max weight_min
0 aaa 45 47 46 47 45
1 bbb 50 50 50 50 50
2 ccc 55 50 51 55 50
3 ddd 78 76 73 78 73
4 eee 51 53 54 54 51
5 fff 40 37 42 42 37
6 ggg 80 77 77 80 77
7 hhh 86 86 82 86 82
8 iiii 52 54 50 54 50


6. 列を指定してfillna

toeming.hatenablog.com



以上

メモっておきたいことがあれば適宜追加する、かもです。