雑記 in hibernation

頭の整理と備忘録

SparkにおけるRDDの遅延評価と永続化

Sparkを習熟する上でRDD(とDataframe)の永続化のメリットが理解しにくかったので、Sparkの特性である遅延評価と永続化について簡単にまとめておきます。

(正しく理解できているか自信ないので、内容に誤りがありましたらご指摘いただけると幸いです)

なお、以下の情報を参考としています。

www.task-notes.com

アクションと変換

Sparkにおいて、RDDはざっくり以下のようなフローで扱われます。

  1. 外部データからRDDを生成する
  2. 【変換】入力したRDDを処理し、新たなRDDを生成する
  3. 【アクション】RDDをデータソースとした演算を実行する。

ここで、【変換】とはフィルタリングやマッピングなど、RDDを加工したり抽出したりして新たなRDDを生成する動作を指します。一方、【アクション】とは要素のカウントや統計量の取得など、RDDのデータを入力として何かしらの出力値を得る動作を指します。

御託を並べたところでちっともイメージが湧かんと思いますので、以下の実装例を考えてみます。こちらの記事相当の環境構築が完了している想定でのPysparkでのコーディング例です。

# 1. 外部データからRDDを生成する
# テキストファイルを元にRDDを生成
lines = sc.textFile(file_txt)

# 2. 【変換】入力したRDDを処理し、新たなRDDを生成する
# 文字列"Pyspark"を含む行を新たなRDDとして抽出
lines_include_pyspark = lines.filter(lambda line: "Pyspark" in line)

# 3. 【アクション】RDDをデータソースとした演算を実行する。
# 2で生成したRDDの一行目を出力
print("lines include pyspark : ", lines_include_pyspark.first())


ここでRDDに対する処理をあえて【変換】と【アクション】に分けて理解しているのは、処理のタイミングの点で両者に違いがあるからです。RDDにおける変換処理は、アクションに相当する処理が実行されて初めて処理されるのです。

例えば、PythonのPandasのような他の一般的なフレームワークを例して動作を考えてみると、 「2. 【変換】入力したRDDを処理し、新たなRDDを生成する」の時点でインスタンスとして”lines_include_pyspark”のRDDが生成されることをイメージするかと思います。しかしSparkにおいては、この時点ではあくまでRDD”lines_include_pyspark”を得るための処理が記述されているに過ぎません。実際にRDDが生成されるのは、「# 3. 【アクション】RDDをデータソースとした演算を実行する。」でアクション".first()"が実行されたタイミングです。

このような、RDDの生成が遅延されるような一見して不可思議な動作は、大規模データセットの処理を効率化するために仕組まれています。最終的なアクションが見えている状態で処理を開始することで、必要最小限のデータに対して【変換】【アクション】の処理を行うことができます。例えば上記の例では、最終的なアクションは「"Pyspark"を含む行のRDDのうち、先頭行を抜き出す」ことなので、極端に言えば最初から「"Pyspark"を含む行の先頭」以外の処理は不要となるわけです。

上記のような、変換処理が直ちには実行されない処理フローは遅延評価と呼ばれています。

遅延評価がむしろ非効率となるケース

遅延評価はリソースを効率化するために行われる一方、アクションが実行される度に同じ変換作業が繰り返し実行されてしまう事で、むしろ冗長な処理となってしまうケースがあります。

例えば、先程の例にアクションを追加した場合を考えます。

# 1. 外部データからRDDを生成する
# テキストファイルを元にRDDを生成
lines = sc.textFile(file_txt)

# 2. 【変換】入力したRDDを処理し、新たなRDDを生成する
# 文字列"Pyspark"を含む行を新たなRDDとして抽出
lines_include_pyspark = lines.filter(lambda line: "Pyspark" in line)

# 3. 【アクション】RDDをデータソースとした演算を実行する。
# 2で生成したRDDの一行目を出力
print("lines include pyspark : ", lines_include_pyspark.first())

# 4. 【アクションを追加】RDDをデータソースとした演算を実行する。
# 2で生成したRDDの行数をカウント
print("lines count include pyspark : ", lines_include_pyspark.count())
lines include pyspark :  test for studying Pyspark.
lines count include pyspark :  2


この場合、処理の流れは以下のようになります。

  1. 外部データからRDDを生成する
  2. 【変換】入力したRDDを処理し、新たなRDDを生成する
  3. 【アクション】RDDをデータソースとした演算を実行する(.first()の実行)。
  4. 外部データからRDDを生成する
  5. 【変換】入力したRDDを処理し、新たなRDDを生成する
  6. 【アクション】RDDをデータソースとした演算を実行する(.count()の実行)。

手順1,2と4,5で全く同じ処理が重複していており、全く同じRDDを2度生成してしまっています。

うーん、非効率。

RDDの永続化

ということで、上記のような冗長な処理を回避するために、一度生成したRDDを使い回すことを考えます。どうすれば良いかというと、".persist()"または".cashe()"を利用してRDDをストレージに展開することで、RDDを再利用できるようにします。これを永続化と呼びます。

実装例は以下の通りです。

# 1. 外部データからRDDを生成する
# テキストファイルを元にRDDを生成
lines = sc.textFile(file_txt)

# 2. 【変換】入力したRDDを処理し、新たなRDDを生成する
# 文字列"Pyspark"を含む行を新たなRDDとして抽出
lines_include_pyspark = lines.filter(lambda line: "Pyspark" in line)

# 2'. 【永続化】RDDをメモリに展開して永続化する
lines_include_pyspark.persist()

# 3. 【アクション】RDDをデータソースとした演算を実行する。
# 2で生成したRDDの一行目を出力
print("lines include pyspark : ", lines_include_pyspark.first())

# 4. 【アクションを追加】RDDをデータソースとした演算を実行する。
# 2で生成したRDDの行数をカウント
print("lines count include pyspark : ", lines_include_pyspark.count())
lines include pyspark :  test for studying Pyspark.
lines count include pyspark :  2


この実装例での処理フローは以下のようになります。1つ目のアクション".first"実行時に生成されたRDDを永続化しておく事で、2つのアクション".count()"を実行する際にこのRDDを再利用することができ、同じRDDを再生成する手間を省くことができます。

  1. 外部データからRDDを生成する
  2. 【変換】入力したRDDを処理し、新たなRDDを生成する
  3. 【永続化】RDDをメモリに展開して永続化する
  4. 【アクション】RDDをデータソースとした演算を実行する(.first()の実行)
  5. 【アクション】永続化されたRDDをデータソースとした演算を実行する(.count()の実行)

注意点として、永続化自体も遅延評価されるため、今回の実装例の場合であれば最初のアクション".first()"の前に".persist()"を呼ぶ必要があります。 また、永続化においてRDDを展開するストレージレベルを指定することができます。この辺りは".persist()"と".cashe()"の違いにも繋がってくるのですが、ここでは省略します。詳細はこちらに詳しいです。


おわりに

以上、RDDの永続化についてでした。

Sparkは最近ようやく触り始めたのですが、「Pandasみてーなもんでしょ〜楽勝楽勝〜」とか考えて雰囲気で弄り始めて速攻で出鼻を挫かれました。大規模なデータを扱うことを想定した独特の仕様があるみたいなので、地道に勉強するのが吉って気がしてます。