Spark Plan 읽기: 기본 가이드
Spark Plan 읽기: 기본 가이드
Spark 작업이 느릴 때 explain()을 찍어보면 트리 형태의 plan이 한 무더기 나온다. 처음에는 뭘 봐야 하는지 막막한데, 익숙해지면 어디서 shuffle이 발생하고 어디서 filter pushdown이 안 먹는지 한눈에 들어온다. 이 글은 plan을 읽을 때 알아둬야 할 기본 개념과 자주 보는 연산자를 정리한 메모다.
1. Spark Plan이란?
Spark Plan은 Spark 작업이 논리적 단계에서 물리적 실행으로 변환되는 과정을 나타냅니다. 다음은 Spark Plan의 주요 단계입니다:
1) Logical Plan
- 사용자가 작성한 코드(
DataFrame, SQL 등)에 대한 논리적 표현입니다. - 이 단계에서는 최적화가 적용되지 않습니다.
- 예: 필터, 셀렉트 등의 기본 연산 순서.
2) Optimized Logical Plan
- Catalyst Optimizer가 논리적 계획을 최적화한 결과입니다.
- 필요 없는 연산 제거, 필터 조건 푸시다운, 컬럼 정리 등이 적용됩니다.
3) Physical Plan
- 최적화된 논리적 계획을 기반으로 실제 작업을 수행하기 위한 실행 계획입니다.
- 물리적 연산자(예: Scan, Shuffle, Sort)가 포함됩니다.
- 여러 대안 중에서 비용(cost)을 계산하여 최적의 물리적 계획을 선택합니다.
2. Spark Plan 읽기의 기본 요소
1) Physical Plan의 계층 구조
Spark Plan은 계층 구조(tree structure)로 표현됩니다. 각 노드는 Spark가 실행할 작업 단계를 나타냅니다.
- 최상위 노드: 작업의 마지막 단계(예:
Sort,Aggregate). - 하위 노드: 상위 노드가 의존하는 입력 데이터 처리(예:
Scan,Exchange).
예제:
== Physical Plan ==
*(2) Sort [age#10 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(age#10 ASC NULLS FIRST, 200), REPARTITION_BY_NUM
+- *(1) Filter (age#10 > 30)
+- FileScan parquet [name#9, age#10] Batched: true, Format: Parquet
PushedFilters: [GreaterThan(age, 30)]
ReadSchema: struct<name:string,age:int>
• Sort → Exchange → Filter → FileScan 순서로 작업이 실행됩니다. • 아래에서 위로 데이터를 처리하는 방식으로 읽어야 합니다.
2) 주요 연산자
FileScan
• 데이터를 읽는 연산입니다.
• Format: 데이터 포맷(예: Parquet, ORC).
• PushedFilters: 데이터 소스에서 처리된 필터 조건.
• ReadSchema: 읽어온 데이터의 스키마.
Filter
• 특정 조건에 맞는 데이터를 필터링합니다.
• 예: Filter (age#10 > 30).
Exchange
• 데이터 재분배(Shuffle)가 발생할 때 나타납니다.
• 예: Exchange rangepartitioning은 데이터를 범위 파티셔닝으로 재분배합니다.
Project
• 특정 컬럼만 선택하거나, 컬럼 계산(예: SELECT)을 나타냅니다.
• 예: Project [name#9, age#10].
Sort
• 데이터를 정렬하는 작업입니다.
• 예: Sort [age#10 ASC NULLS FIRST].
Aggregate
• 데이터를 그룹화하거나 집계합니다.
• 예: HashAggregate 또는 SortAggregate.
3. 실행 계획에서 성능 최적화 포인트
1) Filter Pushdown 확인
• PushedFilters가 포함된 FileScan을 확인하여 조건이 데이터 소스에서 처리되었는지 확인합니다.
• 효율적인 조건을 사용하여 데이터 읽기 최적화를 극대화합니다.
2) Exchange 연산 최소화
• Exchange 연산은 데이터 Shuffle을 나타냅니다.
• Shuffle은 네트워크 및 디스크 I/O 비용이 높으므로, Shuffle이 필요한 연산을 줄이도록 코드를 최적화합니다.
3) Partitioning 확인
• 데이터 파티셔닝이 적절한지 확인하여 병렬 처리 성능을 향상시킵니다.
• Exchange의 REPARTITION_BY_NUM 또는 HASH_PARTITIONING이 불필요하게 발생하는지 점검합니다.
4) Skew 데이터 처리
• 특정 파티션에 데이터가 집중되는 Skew 문제는 성능 저하의 원인이 됩니다.
• Plan에서 Skew 데이터로 인한 불균형을 감지하고, 파티션 키를 변경하거나 salting을 적용합니다.
4. 실행 계획 확인 방법
1) DataFrame의 Plan 보기
모드 설명
simple: Print only a physical plan.extended: Print both logical and physical plans.codegen: Print a physical plan and generated codes if they are available.cost: Print a logical plan and statistics if they are available.-
formatted: Split explain output into two sections: a physical plan outline
and node details. - 볼 때, formatted가 작업 순서대로 보여줘서, 보기 좋습니다.
- TODO: cost에서 비용을 추정할 수 있는 통계치를 알려준다는데, 어떻게 보는지 좀더 알아보면 좋을 것 같습니다.
df.explain(mode=mode)
5. Spark Plan 읽기 예제
다음은 Parquet 데이터를 읽고, 조건을 적용하고, 정렬하는 작업의 실행 계획입니다.
df = spark.read.parquet("path/to/parquet")
filtered_df = df.filter("age > 30").orderBy("age")
filtered_df.explain()
실행 계획:
== Physical Plan ==
*(2) Sort [age#10 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(age#10 ASC NULLS FIRST, 200), REPARTITION_BY_NUM
+- *(1) Filter (age#10 > 30)
+- FileScan parquet [name#9, age#10] Batched: true, Format: Parquet
PushedFilters: [GreaterThan(age, 30)]
ReadSchema: struct<name:string,age:int>
분석
-
FileScan
• 데이터 소스: Parquet. • 필터 조건(age > 30)이 PushedFilters로 처리됨.
-
Filter
• Spark 레이어에서 조건이 한 번 더 적용.
-
Exchange
• 데이터를 정렬하기 위해 Shuffle 발생.
-
Sort
• age 컬럼을 기준으로 정렬.
6. Join 전략 이해하기
Spark Plan에서 가장 성능에 영향을 미치는 부분 중 하나가 Join 연산입니다. Spark는 데이터 크기와 특성에 따라 다양한 Join 전략을 선택합니다.
Broadcast Hash Join (BHJ)
작은 테이블을 모든 Worker 노드에 브로드캐스트하여 Join하는 방식입니다. Shuffle이 발생하지 않으므로 매우 빠릅니다.
*(2) BroadcastHashJoin [id#0], [user_id#5], Inner, BuildRight
:- *(2) Filter isnotnull(id#0)
: +- FileScan parquet [id#0, name#1]
+- BroadcastExchange HashedRelationBroadcastMode(List(user_id#5))
+- *(1) Filter isnotnull(user_id#5)
+- FileScan parquet [user_id#5, amount#6]
Plan에서 BroadcastExchange가 보이면 Broadcast Join이 사용된 것입니다. 기본적으로 10MB 이하의 테이블에 적용됩니다.
# Broadcast Join 임계값 설정
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024) # 10MB
# 수동으로 Broadcast 힌트 사용
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
Sort Merge Join (SMJ)
두 테이블이 모두 큰 경우 사용되는 기본 Join 전략입니다. 양쪽 테이블을 Join 키로 정렬한 후 병합합니다.
*(5) SortMergeJoin [id#0], [user_id#5], Inner
:- *(2) Sort [id#0 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#0, 200)
: +- *(1) Filter isnotnull(id#0)
: +- FileScan parquet [id#0, name#1]
+- *(4) Sort [user_id#5 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(user_id#5, 200)
+- *(3) Filter isnotnull(user_id#5)
+- FileScan parquet [user_id#5, amount#6]
Plan에서 SortMergeJoin과 함께 양쪽에 Exchange hashpartitioning이 보이면 Shuffle이 발생하는 Sort Merge Join입니다.
Shuffle Hash Join
Sort Merge Join과 유사하지만 정렬 단계가 없어 특정 상황에서 더 빠를 수 있습니다. 한쪽 테이블이 메모리에 들어갈 수 있을 때 사용됩니다.
Join 전략 비교
| Join 전략 | Shuffle | Sort | 적합한 경우 |
|---|---|---|---|
| Broadcast Hash Join | 없음 | 없음 | 한쪽 테이블이 작을 때 |
| Sort Merge Join | 있음 | 있음 | 두 테이블 모두 클 때 (기본값) |
| Shuffle Hash Join | 있음 | 없음 | 한쪽이 메모리에 들어갈 때 |
7. AQE (Adaptive Query Execution) 이해하기
Spark 3.0부터 도입된 AQE는 실행 중에 실시간으로 실행 계획을 최적화합니다. AQE가 활성화되면 Plan에서 AdaptiveSparkPlan이 나타납니다.
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- ...
AQE의 주요 최적화
- 파티션 합치기(Coalescing Partitions): Shuffle 후 크기가 작은 파티션들을 자동으로 합쳐 태스크 수를 줄입니다.
- Join 전략 전환: 런타임에 실제 데이터 크기를 확인하고 Broadcast Join으로 전환할 수 있습니다.
- Skew Join 최적화: 데이터가 편향된 파티션을 자동으로 분할하여 처리합니다.
# AQE 활성화 (Spark 3.2+에서는 기본 활성화)
spark.conf.set("spark.sql.adaptive.enabled", True)
# Skew Join 최적화 활성화
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
8. 실전 디버깅 팁
Spark UI에서 Plan 확인하기
explain()으로 코드에서 Plan을 확인할 수도 있지만, Spark UI의 SQL 탭에서 시각적으로 Plan을 확인하는 것이 더 직관적입니다. Spark UI에서는 각 Stage의 실행 시간, 처리 데이터 크기, Shuffle 크기 등을 그래픽으로 확인할 수 있습니다.
Plan에서 주의할 키워드
| 키워드 | 의미 | 대응 방법 |
|---|---|---|
Exchange |
Shuffle 발생 | 불필요한 Shuffle 줄이기 |
BroadcastNestedLoopJoin |
비효율적 Join | Join 조건 확인 |
CartesianProduct |
Cross Join (위험) | 의도적인지 확인 |
SortAggregate |
해시 집계 실패 | 메모리 설정 확인 |
DeserializeToObject |
Dataset API 사용 | DataFrame API로 변경 검토 |
대용량 데이터에서의 성능 점검 순서
- Plan 확인:
explain("formatted")로 전체 실행 계획 확인 - Shuffle 확인:
Exchange연산의 위치와 빈도 점검 - Join 전략 확인: 적절한 Join 전략이 선택되었는지 확인
- Filter Pushdown 확인:
PushedFilters가 적용되었는지 확인 - 파티션 수 확인:
spark.sql.shuffle.partitions설정이 데이터 크기에 적합한지 확인
# 파티션 수 설정 (기본값 200)
spark.conf.set("spark.sql.shuffle.partitions", 400)
# DataFrame의 실제 파티션 수 확인
df.rdd.getNumPartitions()
9. 정리
Spark plan을 읽는 일은 처음엔 귀찮지만, 한 번 패턴이 보이면 쿼리 튜닝 속도가 확연히 빨라진다. 매번 점검하게 되는 항목은 대략 다음과 같다.
• Filter Pushdown이 적용됐는지 (PushedFilters)
• Shuffle(Exchange)이 불필요하게 발생하지 않는지
• Join 전략이 데이터 크기에 맞게 선택됐는지
• AQE가 활성화돼 있는지 (AdaptiveSparkPlan)
• 파티션 수가 데이터 크기에 비해 너무 많거나 적지 않은지
습관처럼 plan을 한 번씩 들여다보면, 성능 문제가 터지기 전에 잠재적인 병목을 미리 잡아낼 수 있다.
Comments