Spark Plan 읽기: 기본 가이드
Spark Plan 읽기: 기본 가이드
Apache Spark에서 실행 계획(Spark Plan)을 읽고 이해하는 것은 성능 최적화와 디버깅에 매우 중요합니다. 실행 계획은 Spark가 어떻게 작업을 실행할지 보여주는 설계도로, 이를 제대로 해석하면 성능 병목현상을 파악하거나 불필요한 작업을 제거할 수 있습니다. 이 글에서는 Spark Plan을 읽기 위해 알아야 할 기본적인 개념과 주요 요소를 정리합니다.
1. Spark Plan이란?
Spark Plan은 Spark 작업이 논리적 단계에서 물리적 실행으로 변환되는 과정을 나타냅니다. 다음은 Spark Plan의 주요 단계입니다:
1) Logical Plan
- 사용자가 작성한 코드(
DataFrame, SQL 등)에 대한 논리적 표현입니다. - 이 단계에서는 최적화가 적용되지 않습니다.
- 예: 필터, 셀렉트 등의 기본 연산 순서.
2) Optimized Logical Plan
- Catalyst Optimizer가 논리적 계획을 최적화한 결과입니다.
- 필요 없는 연산 제거, 필터 조건 푸시다운, 컬럼 정리 등이 적용됩니다.
3) Physical Plan
- 최적화된 논리적 계획을 기반으로 실제 작업을 수행하기 위한 실행 계획입니다.
- 물리적 연산자(예: Scan, Shuffle, Sort)가 포함됩니다.
- 여러 대안 중에서 비용(cost)을 계산하여 최적의 물리적 계획을 선택합니다.
2. Spark Plan 읽기의 기본 요소
1) Physical Plan의 계층 구조
Spark Plan은 계층 구조(tree structure)로 표현됩니다. 각 노드는 Spark가 실행할 작업 단계를 나타냅니다.
- 최상위 노드: 작업의 마지막 단계(예:
Sort,Aggregate). - 하위 노드: 상위 노드가 의존하는 입력 데이터 처리(예:
Scan,Exchange).
예제:
== Physical Plan ==
*(2) Sort [age#10 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(age#10 ASC NULLS FIRST, 200), REPARTITION_BY_NUM
+- *(1) Filter (age#10 > 30)
+- FileScan parquet [name#9, age#10] Batched: true, Format: Parquet
PushedFilters: [GreaterThan(age, 30)]
ReadSchema: struct<name:string,age:int>
• Sort → Exchange → Filter → FileScan 순서로 작업이 실행됩니다. • 아래에서 위로 데이터를 처리하는 방식으로 읽어야 합니다.
2) 주요 연산자
FileScan
• 데이터를 읽는 연산입니다.
• Format: 데이터 포맷(예: Parquet, ORC).
• PushedFilters: 데이터 소스에서 처리된 필터 조건.
• ReadSchema: 읽어온 데이터의 스키마.
Filter
• 특정 조건에 맞는 데이터를 필터링합니다.
• 예: Filter (age#10 > 30).
Exchange
• 데이터 재분배(Shuffle)가 발생할 때 나타납니다.
• 예: Exchange rangepartitioning은 데이터를 범위 파티셔닝으로 재분배합니다.
Project
• 특정 컬럼만 선택하거나, 컬럼 계산(예: SELECT)을 나타냅니다.
• 예: Project [name#9, age#10].
Sort
• 데이터를 정렬하는 작업입니다.
• 예: Sort [age#10 ASC NULLS FIRST].
Aggregate
• 데이터를 그룹화하거나 집계합니다.
• 예: HashAggregate 또는 SortAggregate.
3. 실행 계획에서 성능 최적화 포인트
1) Filter Pushdown 확인
• PushedFilters가 포함된 FileScan을 확인하여 조건이 데이터 소스에서 처리되었는지 확인합니다.
• 효율적인 조건을 사용하여 데이터 읽기 최적화를 극대화합니다.
2) Exchange 연산 최소화
• Exchange 연산은 데이터 Shuffle을 나타냅니다.
• Shuffle은 네트워크 및 디스크 I/O 비용이 높으므로, Shuffle이 필요한 연산을 줄이도록 코드를 최적화합니다.
3) Partitioning 확인
• 데이터 파티셔닝이 적절한지 확인하여 병렬 처리 성능을 향상시킵니다.
• Exchange의 REPARTITION_BY_NUM 또는 HASH_PARTITIONING이 불필요하게 발생하는지 점검합니다.
4) Skew 데이터 처리
• 특정 파티션에 데이터가 집중되는 Skew 문제는 성능 저하의 원인이 됩니다.
• Plan에서 Skew 데이터로 인한 불균형을 감지하고, 파티션 키를 변경하거나 salting을 적용합니다.
4. 실행 계획 확인 방법
1) DataFrame의 Plan 보기
모드 설명
simple: Print only a physical plan.extended: Print both logical and physical plans.codegen: Print a physical plan and generated codes if they are available.cost: Print a logical plan and statistics if they are available.-
formatted: Split explain output into two sections: a physical plan outline
and node details. - 볼 때, formatted가 작업 순서대로 보여줘서, 보기 좋습니다.
- TODO: cost에서 비용을 추정할 수 있는 통계치를 알려준다는데, 어떻게 보는지 좀더 알아보면 좋을 것 같습니다.
df.explain(mode=mode)
5. Spark Plan 읽기 예제
다음은 Parquet 데이터를 읽고, 조건을 적용하고, 정렬하는 작업의 실행 계획입니다.
df = spark.read.parquet("path/to/parquet")
filtered_df = df.filter("age > 30").orderBy("age")
filtered_df.explain()
실행 계획:
== Physical Plan ==
*(2) Sort [age#10 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(age#10 ASC NULLS FIRST, 200), REPARTITION_BY_NUM
+- *(1) Filter (age#10 > 30)
+- FileScan parquet [name#9, age#10] Batched: true, Format: Parquet
PushedFilters: [GreaterThan(age, 30)]
ReadSchema: struct<name:string,age:int>
분석
-
FileScan
• 데이터 소스: Parquet. • 필터 조건(age > 30)이 PushedFilters로 처리됨.
-
Filter
• Spark 레이어에서 조건이 한 번 더 적용.
-
Exchange
• 데이터를 정렬하기 위해 Shuffle 발생.
-
Sort
• age 컬럼을 기준으로 정렬.
6. Join 전략 이해하기
Spark Plan에서 가장 성능에 영향을 미치는 부분 중 하나가 Join 연산입니다. Spark는 데이터 크기와 특성에 따라 다양한 Join 전략을 선택합니다.
Broadcast Hash Join (BHJ)
작은 테이블을 모든 Worker 노드에 브로드캐스트하여 Join하는 방식입니다. Shuffle이 발생하지 않으므로 매우 빠릅니다.
*(2) BroadcastHashJoin [id#0], [user_id#5], Inner, BuildRight
:- *(2) Filter isnotnull(id#0)
: +- FileScan parquet [id#0, name#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(user_id#5))
+- *(1) Filter isnotnull(user_id#5)
+- FileScan parquet [user_id#5, amount#6]
Plan에서 BroadcastExchange가 보이면 Broadcast Join이 사용된 것입니다. 기본적으로 10MB 이하의 테이블에 적용됩니다.
# Broadcast Join 임계값 설정
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024) # 10MB
# 수동으로 Broadcast 힌트 사용
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
Sort Merge Join (SMJ)
두 테이블이 모두 큰 경우 사용되는 기본 Join 전략입니다. 양쪽 테이블을 Join 키로 정렬한 후 병합합니다.
*(5) SortMergeJoin [id#0], [user_id#5], Inner
:- *(2) Sort [id#0 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0, 200)
: +- *(1) Filter isnotnull(id#0)
: +- FileScan parquet [id#0, name#1]
+- *(4) Sort [user_id#5 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(user_id#5, 200)
+- *(3) Filter isnotnull(user_id#5)
+- FileScan parquet [user_id#5, amount#6]
Plan에서 SortMergeJoin과 함께 양쪽에 Exchange hashpartitioning이 보이면 Shuffle이 발생하는 Sort Merge Join입니다.
Shuffle Hash Join
Sort Merge Join과 유사하지만 정렬 단계가 없어 특정 상황에서 더 빠를 수 있습니다. 한쪽 테이블이 메모리에 들어갈 수 있을 때 사용됩니다.
Join 전략 비교
| Join 전략 | Shuffle | Sort | 적합한 경우 |
|---|---|---|---|
| Broadcast Hash Join | 없음 | 없음 | 한쪽 테이블이 작을 때 |
| Sort Merge Join | 있음 | 있음 | 두 테이블 모두 클 때 (기본값) |
| Shuffle Hash Join | 있음 | 없음 | 한쪽이 메모리에 들어갈 때 |
7. AQE (Adaptive Query Execution) 이해하기
Spark 3.0부터 도입된 AQE는 실행 중에 실시간으로 실행 계획을 최적화합니다. AQE가 활성화되면 Plan에서 AdaptiveSparkPlan이 나타납니다.
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- ...
AQE의 주요 최적화
- 파티션 합치기(Coalescing Partitions): Shuffle 후 크기가 작은 파티션들을 자동으로 합쳐 태스크 수를 줄입니다.
- Join 전략 전환: 런타임에 실제 데이터 크기를 확인하고 Broadcast Join으로 전환할 수 있습니다.
- Skew Join 최적화: 데이터가 편향된 파티션을 자동으로 분할하여 처리합니다.
# AQE 활성화 (Spark 3.2+에서는 기본 활성화)
spark.conf.set("spark.sql.adaptive.enabled", True)
# Skew Join 최적화 활성화
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
8. 실전 디버깅 팁
Spark UI에서 Plan 확인하기
explain()으로 코드에서 Plan을 확인할 수도 있지만, Spark UI의 SQL 탭에서 시각적으로 Plan을 확인하는 것이 더 직관적입니다. Spark UI에서는 각 Stage의 실행 시간, 처리 데이터 크기, Shuffle 크기 등을 그래픽으로 확인할 수 있습니다.
Plan에서 주의할 키워드
| 키워드 | 의미 | 대응 방법 |
|---|---|---|
Exchange |
Shuffle 발생 | 불필요한 Shuffle 줄이기 |
BroadcastNestedLoopJoin |
비효율적 Join | Join 조건 확인 |
CartesianProduct |
Cross Join (위험) | 의도적인지 확인 |
SortAggregate |
해시 집계 실패 | 메모리 설정 확인 |
DeserializeToObject |
Dataset API 사용 | DataFrame API로 변경 검토 |
대용량 데이터에서의 성능 점검 순서
- Plan 확인:
explain("formatted")로 전체 실행 계획 확인 - Shuffle 확인:
Exchange연산의 위치와 빈도 점검 - Join 전략 확인: 적절한 Join 전략이 선택되었는지 확인
- Filter Pushdown 확인:
PushedFilters가 적용되었는지 확인 - 파티션 수 확인:
spark.sql.shuffle.partitions설정이 데이터 크기에 적합한지 확인
# 파티션 수 설정 (기본값 200)
spark.conf.set("spark.sql.shuffle.partitions", 400)
# DataFrame의 실제 파티션 수 확인
df.rdd.getNumPartitions()
9. 결론
Spark Plan은 Spark 작업의 실행 방식을 이해하고 최적화하는 데 필수적인 도구입니다. 이를 효과적으로 읽으려면 논리적 계획과 물리적 계획의 차이를 이해하고, 주요 연산자와 성능 저하의 원인을 파악해야 합니다. • Filter Pushdown으로 데이터 읽기 효율을 높이고, • Shuffle 최소화로 네트워크 비용을 줄이며, • 적절한 Join 전략 선택으로 불필요한 데이터 이동을 방지하고, • AQE를 활용하여 런타임 최적화를 극대화하고, • 파티션 구조 최적화로 병렬성을 극대화하세요.
Spark Plan을 잘 이해하면 대규모 데이터 처리에서 성능을 극대화할 수 있습니다. 특히 Plan을 읽는 습관을 들이면, 성능 문제가 발생하기 전에 미리 잠재적인 병목 지점을 파악하고 예방할 수 있습니다.
Comments