scikit-learnで学習したモデルを使ってPySparkのデータフレームに対する推論を行う方法はいくつかあって、例えば単純な方法としてはデータフレームを.toPandas()でPandasのデータフレームに変換してからモデルに入力する手があります。しかし、この方法だとSparkの分散処理を活かせませんし、単一の計算機に乗り切らない量のデータセットに対する推論はできません。そこで、pandas_udfを使うことでPySparkのデータフレームのまま推論ができるということで、ちょっと試してみました。
参考はこちら。
1. さっそく実装
実行環境はGoogle Colabです。以下の過去記事の一連の準備が整っている前提です。
Google ColaboratoryでPySpark環境構築(v3.2.1) - 雑記 in hibernation
1.1. テストデータの読み込みと前処理
テストデータを読み込みます
# データの読み込み import pandas as pd import numpy as np input_data = "titanic/train.csv" df_input = pd.read_csv(input_data)
前処理します
# 使うカラムだけ抽出 id_col = ["PassengerId"] target_col = ["Survived"] exp_cols = ["Pclass", "Sex","Age", "SibSp", "Parch", "Fare", "Embarked"] df_mart = df_input[id_col + target_col + exp_cols] # 欠損処理 df_mart = df_mart.copy() # SettingWithCopyWarning対策 df_mart['Age'].fillna(df_mart['Age'].mean(), inplace=True) # One-Hotエンコーディング df_mart = pd.get_dummies(df_mart, columns=["Sex", "Embarked"]) exp_cols = [s for s in df_mart.columns if s not in target_col] # 学習 / テストデータ分割 from sklearn.model_selection import train_test_split df_train, df_test = train_test_split(df_mart, test_size=0.3, random_state=0) print("train data shape : ", df_train.shape, " / test data shape :", df_test.shape)
train data shape : (623, 12) / test data shape : (268, 12)
1.2. モデルの学習
モデルはなんでもいいのですが、今回はとりあえずRandomForestを使ってみます。
学習して精度を見てみます。死ぬほど過学習してますね。まあいいや。
# 学習 from sklearn.ensemble import RandomForestClassifier model = RandomForestClassifier(random_state=0) model.fit(df_train[exp_cols], df_train[target_col].values.ravel()) # 精度を確認 print("train score=", model.score(df_train[exp_cols], df_train[target_col])) print("test score=", model.score(df_test[exp_cols], df_test[target_col]))
train score= 1.0 test score= 0.8171641791044776
1.3. PySparkのデータフレームに対して推論を行う(返す結果が1つの場合)
さて、ここからが本番です。 まず、推論の対象とするデータをPySparkのデータフレームに変換しておきましょう。
# テストデータをSpark DataFrameに変換 sdf_test = sqlContext.createDataFrame(df_test) sdf_test.show(5)
+-----------+--------+------+----+-----+-----+-------+----------+--------+----------+----------+----------+ |PassengerId|Survived|Pclass| Age|SibSp|Parch| Fare|Sex_female|Sex_male|Embarked_C|Embarked_Q|Embarked_S| +-----------+--------+------+----+-----+-----+-------+----------+--------+----------+----------+----------+ | 14| 0| 3|39.0| 1| 5| 31.275| 0| 1| 0| 0| 1| | 867| 1| 2|27.0| 1| 0|13.8583| 1| 0| 1| 0| 0| | 803| 1| 1|11.0| 1| 2| 120.0| 0| 1| 0| 0| 1| | 107| 1| 3|21.0| 0| 0| 7.65| 1| 0| 0| 0| 1| | 524| 1| 1|44.0| 0| 1|57.9792| 1| 0| 1| 0| 0| +-----------+--------+------+----+-----+-----+-------+----------+--------+----------+----------+----------+ only showing top 5 rows
で、肝心の推論です。以下のように、モデルの説明変数を引数に取り推論の結果をPandasのSeriesで返す関数をpandas_udfで定義してやります。これを.withColumn()とかSelect()とかで呼び出してやれば、PySparkのデータフレームに対する推論の結果を得ることができます。
from pyspark.sql.functions import pandas_udf # pandas_udfで、関数を定義 # 入力:モデルの入力になるカラム # 出力:推論した結果(Pandas Seriesで返す) @pandas_udf("int") def predict_udf(*cols): X = pd.concat(cols, axis=1) return pd.Series(model.predict(X)) # 推論した結果列を追加する sdf_test = sdf_test.withColumn('pred', predict_udf(*exp_cols)) # カラムを絞って結果表示 sdf_test.select("PassengerId", "Survived", "pred").show(10)
+-----------+--------+----+ |PassengerId|Survived|pred| +-----------+--------+----+ | 14| 0| 0| | 867| 1| 1| | 803| 1| 1| | 107| 1| 1| | 524| 1| 1| | 534| 1| 1| | 263| 0| 0| | 600| 1| 1| | 619| 1| 1| | 161| 0| 0| +-----------+--------+----+ only showing top 10 rows
一応精度も見ておきましょう。先程Pandasのデータフレームで推論した場合と一致していて問題なさそうですね。
# スコアの確認 from sklearn.metrics import accuracy_score df_pred = sdf_test.select("Survived", "pred").toPandas() df_pred.head() accuracy_score(df_pred["Survived"], df_pred["pred"])
0.8171641791044776
1.4. PySparkのデータフレームに対して推論を行う(返す結果が2つ以上の場合)
他クラス分類などで複数クラスの予測確率を返したい場合、次ようにlistとして結果を返すことも可能です(今回は2値分類なので、2クラスの予測確率を返します)。
# テストデータをSpark DataFrameに変換 sdf_test = sqlContext.createDataFrame(df_test) from pyspark.sql.types import DoubleType, ArrayType # pandas_udfで、関数を定義 # 入力:モデルの入力になるカラム # 出力:推論した各クラスの確率(各要素がlistのPandas Seriesで返す) @fn.pandas_udf(returnType=ArrayType(DoubleType())) def predict_pandas_udf(*cols): X = pd.concat(cols, axis=1) return pd.Series(row.tolist() for row in model.predict_proba(X)) # 推論した結果列を追加する df_pred_proba = sdf_test.withColumn("pred_proba" , predict_pandas_udf(*exp_cols)) # カラムを絞って結果表示 df_pred_proba.select("PassengerId", "Survived", "pred_proba").show(10)
+-----------+--------+------------+ |PassengerId|Survived| pred_proba| +-----------+--------+------------+ | 496| 0|[0.56, 0.44]| | 649| 0|[0.97, 0.03]| | 279| 0| [0.9, 0.1]| | 32| 1| [0.1, 0.9]| | 256| 1|[0.21, 0.79]| | 299| 1|[0.67, 0.33]| | 610| 1|[0.01, 0.99]| | 319| 1|[0.31, 0.69]| | 485| 1|[0.39, 0.61]| | 368| 1|[0.34, 0.66]| +-----------+--------+------------+ only showing top 10 rows
最終的には以下のように複数のカラムに展開して使う感じでしょうかね。
# 推論結果のリストを予測クラスごとの列に展開 for i, c in enumerate(model.classes_): df_pred_proba = df_pred_proba.withColumn("pred_proba_"+str(c) , fn.col("pred_proba")[i]) # 元のリストが入っていた列は落とす df_pred_proba.drop("pred_proba") # カラムを絞って結果表示 df_pred_proba.select("PassengerId", "Survived", "pred_proba_0","pred_proba_1").show(10)
+-----------+--------+------------+------------+ |PassengerId|Survived|pred_proba_0|pred_proba_1| +-----------+--------+------------+------------+ | 496| 0| 0.56| 0.44| | 649| 0| 0.97| 0.03| | 279| 0| 0.9| 0.1| | 32| 1| 0.1| 0.9| | 256| 1| 0.21| 0.79| | 299| 1| 0.67| 0.33| | 610| 1| 0.01| 0.99| | 319| 1| 0.31| 0.69| | 485| 1| 0.39| 0.61| | 368| 1| 0.34| 0.66| +-----------+--------+------------+------------+ only showing top 10 rows
テーブルに対してfor文使うのってなんとなくあんまり綺麗じゃないですよね。もっといい書き方がないだろうか。Selectだと以下のページのようにスッキリ書けるのですが、個人的には元のデータセットのカラムもそのまま残したいことが多く、withColumnで書きたいというこだわり。
おわりに
ということで、scikit-learnで学習したモデルをPySparkデータフレームに対して学習する方法のメモでした。