雑記 in hibernation

頭の整理と備忘録

PySparkデータフレームをscikit-learnのモデルで推論してみる(pandas_udf)

scikit-learnで学習したモデルを使ってPySparkのデータフレームに対する推論を行う方法はいくつかあって、例えば単純な方法としてはデータフレームを.toPandas()でPandasのデータフレームに変換してからモデルに入力する手があります。しかし、この方法だとSparkの分散処理を活かせませんし、単一の計算機に乗り切らない量のデータセットに対する推論はできません。そこで、pandas_udfを使うことでPySparkのデータフレームのまま推論ができるということで、ちょっと試してみました。

参考はこちら。

qiita.com


1. さっそく実装

実行環境はGoogle Colabです。以下の過去記事の一連の準備が整っている前提です。

Google ColaboratoryでPySpark環境構築(v3.2.1) - 雑記 in hibernation


1.1. テストデータの読み込みと前処理

テストデータを読み込みます

# データの読み込み
import pandas as pd
import numpy as np
input_data = "titanic/train.csv"
df_input = pd.read_csv(input_data)


前処理します

# 使うカラムだけ抽出
id_col = ["PassengerId"]
target_col = ["Survived"]
exp_cols = ["Pclass", "Sex","Age", "SibSp", "Parch", "Fare", "Embarked"]
df_mart = df_input[id_col + target_col + exp_cols]

# 欠損処理
df_mart = df_mart.copy() # SettingWithCopyWarning対策
df_mart['Age'].fillna(df_mart['Age'].mean(), inplace=True)

# One-Hotエンコーディング
df_mart = pd.get_dummies(df_mart, columns=["Sex", "Embarked"])
exp_cols = [s for s in df_mart.columns if s not in target_col]

# 学習 / テストデータ分割
from sklearn.model_selection import train_test_split
df_train, df_test = train_test_split(df_mart, test_size=0.3, random_state=0)
print("train data shape : ", df_train.shape, " / test data shape :", df_test.shape)
train data shape :  (623, 12)  / test data shape : (268, 12)


1.2. モデルの学習

モデルはなんでもいいのですが、今回はとりあえずRandomForestを使ってみます。

学習して精度を見てみます。死ぬほど過学習してますね。まあいいや。

# 学習
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(random_state=0)
model.fit(df_train[exp_cols], df_train[target_col].values.ravel())

# 精度を確認
print("train score=", model.score(df_train[exp_cols], df_train[target_col]))
print("test score=", model.score(df_test[exp_cols], df_test[target_col]))
train score= 1.0
test score= 0.8171641791044776


1.3. PySparkのデータフレームに対して推論を行う(返す結果が1つの場合)

さて、ここからが本番です。 まず、推論の対象とするデータをPySparkのデータフレームに変換しておきましょう。

# テストデータをSpark DataFrameに変換
sdf_test = sqlContext.createDataFrame(df_test)
sdf_test.show(5)
+-----------+--------+------+----+-----+-----+-------+----------+--------+----------+----------+----------+
|PassengerId|Survived|Pclass| Age|SibSp|Parch|   Fare|Sex_female|Sex_male|Embarked_C|Embarked_Q|Embarked_S|
+-----------+--------+------+----+-----+-----+-------+----------+--------+----------+----------+----------+
|         14|       0|     3|39.0|    1|    5| 31.275|         0|       1|         0|         0|         1|
|        867|       1|     2|27.0|    1|    0|13.8583|         1|       0|         1|         0|         0|
|        803|       1|     1|11.0|    1|    2|  120.0|         0|       1|         0|         0|         1|
|        107|       1|     3|21.0|    0|    0|   7.65|         1|       0|         0|         0|         1|
|        524|       1|     1|44.0|    0|    1|57.9792|         1|       0|         1|         0|         0|
+-----------+--------+------+----+-----+-----+-------+----------+--------+----------+----------+----------+
only showing top 5 rows


で、肝心の推論です。以下のように、モデルの説明変数を引数に取り推論の結果をPandasのSeriesで返す関数をpandas_udfで定義してやります。これを.withColumn()とかSelect()とかで呼び出してやれば、PySparkのデータフレームに対する推論の結果を得ることができます。

from pyspark.sql.functions import pandas_udf

# pandas_udfで、関数を定義
# 入力:モデルの入力になるカラム
# 出力:推論した結果(Pandas Seriesで返す)
@pandas_udf("int")
def predict_udf(*cols):
    X = pd.concat(cols, axis=1)
    return pd.Series(model.predict(X))

# 推論した結果列を追加する
sdf_test = sdf_test.withColumn('pred', predict_udf(*exp_cols))

# カラムを絞って結果表示
sdf_test.select("PassengerId", "Survived", "pred").show(10)
+-----------+--------+----+
|PassengerId|Survived|pred|
+-----------+--------+----+
|         14|       0|   0|
|        867|       1|   1|
|        803|       1|   1|
|        107|       1|   1|
|        524|       1|   1|
|        534|       1|   1|
|        263|       0|   0|
|        600|       1|   1|
|        619|       1|   1|
|        161|       0|   0|
+-----------+--------+----+
only showing top 10 rows


一応精度も見ておきましょう。先程Pandasのデータフレームで推論した場合と一致していて問題なさそうですね。

# スコアの確認
from sklearn.metrics import accuracy_score
df_pred = sdf_test.select("Survived", "pred").toPandas()
df_pred.head()
accuracy_score(df_pred["Survived"], df_pred["pred"])
0.8171641791044776


1.4. PySparkのデータフレームに対して推論を行う(返す結果が2つ以上の場合)

他クラス分類などで複数クラスの予測確率を返したい場合、次ようにlistとして結果を返すことも可能です(今回は2値分類なので、2クラスの予測確率を返します)。

medium.com

# テストデータをSpark DataFrameに変換
sdf_test = sqlContext.createDataFrame(df_test)

from pyspark.sql.types import DoubleType, ArrayType

# pandas_udfで、関数を定義
# 入力:モデルの入力になるカラム
# 出力:推論した各クラスの確率(各要素がlistのPandas Seriesで返す)
@fn.pandas_udf(returnType=ArrayType(DoubleType()))
def predict_pandas_udf(*cols):
    X = pd.concat(cols, axis=1)
    return pd.Series(row.tolist() for row in model.predict_proba(X))

# 推論した結果列を追加する
df_pred_proba = sdf_test.withColumn("pred_proba" , predict_pandas_udf(*exp_cols))

# カラムを絞って結果表示
df_pred_proba.select("PassengerId", "Survived", "pred_proba").show(10)
+-----------+--------+------------+
|PassengerId|Survived|  pred_proba|
+-----------+--------+------------+
|        496|       0|[0.56, 0.44]|
|        649|       0|[0.97, 0.03]|
|        279|       0|  [0.9, 0.1]|
|         32|       1|  [0.1, 0.9]|
|        256|       1|[0.21, 0.79]|
|        299|       1|[0.67, 0.33]|
|        610|       1|[0.01, 0.99]|
|        319|       1|[0.31, 0.69]|
|        485|       1|[0.39, 0.61]|
|        368|       1|[0.34, 0.66]|
+-----------+--------+------------+
only showing top 10 rows


最終的には以下のように複数のカラムに展開して使う感じでしょうかね。

# 推論結果のリストを予測クラスごとの列に展開
for i, c in enumerate(model.classes_):
    df_pred_proba = df_pred_proba.withColumn("pred_proba_"+str(c) , fn.col("pred_proba")[i])
# 元のリストが入っていた列は落とす
df_pred_proba.drop("pred_proba")

# カラムを絞って結果表示
df_pred_proba.select("PassengerId", "Survived", "pred_proba_0","pred_proba_1").show(10)
+-----------+--------+------------+------------+
|PassengerId|Survived|pred_proba_0|pred_proba_1|
+-----------+--------+------------+------------+
|        496|       0|        0.56|        0.44|
|        649|       0|        0.97|        0.03|
|        279|       0|         0.9|         0.1|
|         32|       1|         0.1|         0.9|
|        256|       1|        0.21|        0.79|
|        299|       1|        0.67|        0.33|
|        610|       1|        0.01|        0.99|
|        319|       1|        0.31|        0.69|
|        485|       1|        0.39|        0.61|
|        368|       1|        0.34|        0.66|
+-----------+--------+------------+------------+
only showing top 10 rows


テーブルに対してfor文使うのってなんとなくあんまり綺麗じゃないですよね。もっといい書き方がないだろうか。Selectだと以下のページのようにスッキリ書けるのですが、個人的には元のデータセットのカラムもそのまま残したいことが多く、withColumnで書きたいというこだわり。

Prediction at Scale with scikit-learn and PySpark Pandas UDFs | by Civis Analytics | The Civis Journal | Medium


おわりに

ということで、scikit-learnで学習したモデルをPySparkデータフレームに対して学習する方法のメモでした。