Join 是分布式数据处理中最昂贵的操作之一。在 Apache Spark 里,join 策略的选择决定一个作业是几分钟还是几小时跑完——或是否因 OOM 错误或失控的 shuffle 而崩溃集群。Spark 的 Catalyst 优化器基于数据集大小和可用统计自动选策略,但理解每种策略的机制、shuffle 的真实成本、数据倾斜如何悄无声息地摧毁一个选得好的策略,以及自适应查询执行如何在运行时改变局面——这些技能区分了资深数据工程师与那些只调 df.join() 并祈祷的人。

⚡ 速览要点
  • 广播 Join(Broadcast Join)——把小的一侧复制到每个 executor;大侧零 shuffle。默认阈值 10 MB(通过 autoBroadcastJoinThreshold 可配)。
  • Sort-Merge Join——Spark 大-大 join 的稳健默认;shuffle 后排序两侧,无论数据集多大都在常量内存里合并。
  • Shuffle Hash Join——shuffle 后从较小侧建内存哈希表;build 侧装得下时比 sort-merge 快,装不下则溢出到磁盘。
  • Bucketing 在写时预分区表,让对同一 key 的重复 join 完全跳过 shuffle——一次性写成本,永久收益。
  • 先查 df.explain();只在有证据它选错或统计陈旧时才用 hint 覆盖优化器。
tldr

一侧小到能装进内存时用广播 join——它完全消除网络 shuffle。大-大 join 用 sort-merge join;它是该场景下 Spark 的稳健默认。build 侧装得进哈希表时 shuffle hash join 对大数据集快。除非显式需要叉积,避免笛卡尔 join。Bucketing 预分区数据以消除重复 join 中的 shuffle。

Spark 的执行模型——Stage 与 Shuffle

理解 join 策略前,你必须理解 Spark 如何执行查询。Spark 把逻辑计划翻译成 stage 的有向无环图(DAG)。每个 stage 是一组能在同一数据的不同分区上并行运行的任务——一个 stage 内任务间无数据移动。stage 由 shuffle 边界分隔:数据必须跨分区重分布的点。

shuffle 是 Spark 执行模型中最昂贵的操作。它涉及:(1) map 侧把 shuffle 数据按目标分区排序写到本地磁盘;(2) reduce 侧通过网络从所有 map 任务读那些数据;(3) 反序列化并重分区数据。对 1 TB 数据集,一次 shuffle 意味着 1 TB 写磁盘、1 TB 经网络发送、1 TB 从网络读——单个操作 3 TB 的 I/O。这就是为什么每个 join 优化从根本上都是关于减少 shuffle 量或完全消除 shuffle。

python
# 读物理计划——永远是第一步诊断
df.explain(mode="extended")  # 显示逻辑、优化后、物理计划

# 要找的关键物理计划节点:
# BroadcastHashJoin  → 选了广播 join(大侧无 shuffle)
# ShuffledHashJoin   → shuffle hash join(两侧 shuffle,建哈希)
# SortMergeJoin      → sort-merge join(两侧 shuffle 并排序)
# CartesianProduct   → cross join(危险:O(M×N) 输出)
# Exchange           → shuffle 边界(每个 Exchange = 一次 shuffle)

# 数 Exchange 节点:0 = 无 shuffle,1 = 单侧 shuffle(广播),2 = 全 shuffle

Spark UI 里的 DAG 可视化把 stage 映射到执行计划。每个 stage 边界对应一次 shuffle。两个大数据集间的 Sort-Merge Join 产生两个 Exchange 节点(每侧一个),并被拆成至少三个 stage:读两侧(stage 1 和 2)、shuffle+排序两侧(stage 3)、合并(stage 4)。广播 join 在大侧产生零个 Exchange 节点——它通常装进两个 stage。

为什么 Join 策略重要

在分布式集群里,join 的主要成本是跨网络移动数据——叫 shuffle。shuffle 强迫 Spark 序列化数据、写磁盘、经网络发到正确的 executor,并在接收端反序列化。对大数据集这能以一个数量级主导总作业运行时。每个 join 优化的目标都是最小化不必要的 shuffle——要么完全消除 shuffle(广播 join、Bucketing),要么在 shuffle 不可避免时选最高效的策略(sort-merge vs shuffle hash,取决于 build 侧是否装进内存)。

广播 Join——消除大侧的 Shuffle

当两个数据集之一小到能装进每个 worker 的内存,Spark 把较小数据集广播(复制)到所有 executor 节点。每个 executor 随后对它那部分较大数据集本地执行 join——大侧无需 shuffle。大表从不移动。小表通过类 BitTorrent 的点对点广播协议每 executor 发一次(不是从 driver 逐个发给每个 executor,那会让 driver 成瓶颈)。

python
from pyspark.sql import functions as F

# 提高广播阈值——executor 有足够堆则安全
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)  # 100 MB

# 手动广播 hint——无视大小估计强制广播
# 当你知道表小但 Catalyst 不知(统计陈旧)时用
result = orders.join(
    F.broadcast(country_codes),   # 把 country_codes 复制到每个 executor
    on="country_code",
    how="left"
)

# SQL hint 语法(等价)
spark.sql("""
  SELECT /*+ BROADCAST(c) */ o.*, c.country_name
  FROM orders o
  JOIN country_codes c ON o.country_code = c.code
""")

# 禁用自动广播——为测试强制 sort-merge
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
广播出问题时

广播一个大于 executor 内存的数据集会在每个 worker 上同时造成 OOM——一次集群级崩溃。当一个上月还小的表已增长时要警惕。检查 Spark UI 广播 stage 的 "Input" 大小。若它接近 executor 堆大小(扣除开销后 executor 堆通常是总 executor 内存的 60–70%),要么增加 executor 内存,要么用 autoBroadcastJoinThreshold=-1 切到 sort-merge。

Shuffle Hash Join——全 Shuffle 后的哈希表 Join

两个数据集都通过网络 shuffle 按 join key 重分区,所以同 key 的所有行落到同一 executor。Spark 随后从一侧("build" 侧,选较小的分区集)建内存哈希表,并用另一侧("probe" 侧)的每行探测它。probe 侧每行的哈希查找是 O(1)——比 sort-merge join 里的顺序合并扫描快。

python
# 启用 Shuffle Hash Join——默认禁用
spark.conf.set("spark.sql.join.preferSortMergeJoin", False)

# SQL hint 在特定查询上强制 Shuffle Hash Join
spark.sql("""
  SELECT /*+ SHUFFLE_HASH(orders) */ *
  FROM orders
  JOIN transactions ON orders.order_id = transactions.order_id
""")

# 什么时候值得?
# 每分区 build 侧(较小表)装进 executor 内存
# 且 join key 高基数(无倾斜)
# 且你想避免 sort-merge 的排序开销

Sort-Merge Join——Spark 大-大 Join 的稳健默认

当广播不可能时,Spark 大-大 join 的默认策略。两侧都按 join key shuffle(两个 Exchange 节点),然后在每个分区内排序。排序后的分区被顺序合并——同时穿过两个有序流的单次线性扫描,以同步前进的方式按记录匹配。这与外部归并排序的归并阶段是同一算法,且无论分区大小都在常量内存里工作:它从不需要把整个分区放进 RAM,每次每侧只一条记录。

python
# Sort-Merge 是 Spark 默认——无需配置
# SQL hint 显式强制 Sort-Merge
spark.sql("""
  SELECT /*+ MERGE(orders, transactions) */ *
  FROM orders
  JOIN transactions ON orders.order_id = transactions.order_id
""")

# 调 Sort-Merge:shuffle 分区数
# 默认 200——对大数据常太少,对小数据太多
# 经验法则:每个 shuffle 分区大小应 ~128–256 MB
# total_data_size_GB * 1024 / 200 MB_每分区 → 目标分区数
spark.conf.set("spark.sql.shuffle.partitions", 2000)  # 对 ~400 GB shuffle 数据

# 启用 AQE(Spark 3.0+),这会自动调优——见下方 AQE 节
spark.conf.set("spark.sql.adaptive.enabled", True)

笛卡尔(Cross)Join——O(M×N)——极度小心使用

把左数据集每行与右数据集每行配对,产生 M×N 结果。无需 join key。输出大小二次增长——即便中等大的输入也产生不可管理的巨大输出。一张 100 万行的表与一张 100 万行的表叉乘产生 1 万亿行。Spark 要求显式 crossJoin() 调用或 spark.sql.crossJoin.enabled=true 以防意外的 cross join。

策略对比

策略最适合网络 Shuffle内存压力触发条件
广播 Join小(< 阈值)× 大仅小侧(一次)低(若小侧装得下)一侧 < autoBroadcastJoinThreshold 或 BROADCAST hint
Shuffle Hash Join中 × 大(build 装进哈希)两侧全 shuffle中–高(哈希表)preferSortMergeJoin=false;build 侧装进内存
Sort-Merge Join大 × 大(默认)两侧全 shuffle + 排序低(流式合并)超过广播阈值时默认
笛卡尔 Join叉积(仅极小数据集)所有数据重分布极端(O(M×N) 输出)无 join key;需 crossJoin.enabled=true

数据倾斜——沉默的性能杀手

数据倾斜发生在行在 join key 值上分布不均时。若一个 key 有 1000 万行而其他都 1000 行,一个 executor 的任务会处理比同伴多 1 万倍的数据,而其余空闲。这是 Spark 作业在"99% 完成"卡几小时的最常见原因——一个掉队任务拖住整个 stage。

检测倾斜

检查 Spark UI 的 Stage 详情页。按 Duration 或 Input Size 排序任务。若少数任务(或就一个)耗时比中位数戏剧性地长,你有倾斜。代码里这样验证:

python
# join 前剖析 key 分布
df.groupBy("join_key") \
  .count() \
  .orderBy(F.col("count").desc()) \
  .show(20)

# 若前 5 个 key 含 > 50% 所有行:你有倾斜

# 技术 1:加盐(Salting)——人为分散热 key
# 在两侧给倾斜 key 追加随机盐(0-9)
import pyspark.sql.functions as F

# 热 key 表:为所有盐值复制每行
hot_keys = F.broadcast(
    small_dimension.crossJoin(
        spark.range(10).toDF("salt")  # 10 个盐值
    ).withColumn("salted_key",
        F.concat(F.col("join_key"), F.lit("_"), F.col("salt")))
)

# 大表:给每行随机分配一个盐
salted_large = large_fact.withColumn(
    "salted_key",
    F.concat(F.col("join_key"),
             F.lit("_"),
             (F.rand() * 10).cast("int").cast("string"))
)

# 在加盐 key 上 join——倾斜分散到 10 倍多的分区
result = salted_large.join(hot_keys, on="salted_key")

Skew Join Hint(Spark 3.0+)

Spark 3.0 引入 SKEW_JOIN hint,告诉 Catalyst 优化器把倾斜分区拆成更小子分区并独立 join,而无需手动加盐:

python
# SQL 倾斜 hint——告诉 Catalyst 哪张表倾斜
spark.sql("""
  SELECT /*+ SKEW_JOIN(orders) */ *
  FROM orders
  JOIN transactions ON orders.order_id = transactions.order_id
""")

# 基于 AQE 的自动倾斜处理(Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)

# 倾斜阈值:一个分区被视为倾斜,若
# 其大小 > skewFactor × 中位数分区大小 且 > skewedPartitionThresholdInBytes
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

Bucketing——消除重复 Join 的 Shuffle

Bucketing 在表写到存储时按 join key 预分区。若 join 两侧用相同桶数按同一 key bucketing,Spark 能完全跳过 shuffle——数据已按 key 共置。表 A 的每个桶与表 B 对应的桶在同一 executor 上 join,零网络传输。这是一次性写成本,在数据集生命周期内对该表的每次后续 join 都受益。

问题:桶数必须两侧精确匹配,且 bucketing 只在从 Hive Metastore 支撑的表格式(通过 saveAsTable 的 Parquet/ORC)读时工作——不是从裸文件读。最优桶数是数据大小的函数:目标每桶文件 ~128–256 MB,这样文件既不太小(任务太多)也不太大(掉队任务)。对 1 TB 表,4,000–8,000 桶是合理起点。

python
# 把 orders 按 customer_id bucketing 写出——一次性成本
orders.write \
    .bucketBy(256, "customer_id") \
    .sortBy("customer_id") \     # 桶内排序,Sort-Merge 无需重排
    .saveAsTable("orders_bucketed")

# 用相同桶数 + key 写 customers——关键要求
customers.write \
    .bucketBy(256, "customer_id") \
    .sortBy("customer_id") \
    .saveAsTable("customers_bucketed")

# 现在 join 无 shuffle——Catalyst 检测到桶共置
# 物理计划会显示 SortMergeJoin 而前面没有 Exchange 节点
result = spark.table("orders_bucketed").join(
    spark.table("customers_bucketed"),
    on="customer_id"
)

# 验证:查 explain()——无 Exchange 节点 = shuffle 已消除
result.explain()
Bucketing 的坑

Bucketing 在实践中脆弱。若你给一张表加分区(用不同桶数重新 bucketing),共置保证被打破,Spark 静默退回全 shuffle。schema 或分区变更后总用 explain() 确认。另外:bucketing 只在用 spark.table("name") 读时被识别,而非用 spark.read.parquet("path") 直接读 Parquet 文件时。

自适应查询执行(AQE)——Spark 3.0+

AQE 通过允许 Spark 在运行时用实际 shuffle 统计而非执行前估计来做计划决策,从根本上改变了 join 优化局面。三个直接影响 join 性能的能力:

1. 动态 Join 策略切换

若一个数据集被估计为大(于是计划了 sort-merge),但首个 shuffle stage 后实际数据比预期小得多,AQE 能在执行中途切到广播 join——而无需重启查询。这抓住了统计陈旧或不存在、优化器过于保守的常见情况。

2. 动态合并 Shuffle 分区

默认 200 的 shuffle 分区数是任意的且常错:对小数据集太多(造成数千个小任务),对大数据集太少(造成分区过大的掉队任务)。AQE 测量 shuffle 后的实际分区大小,把相邻小分区合并成更少、更大的——自动为实际数据分布调正并行度。

3. 自动倾斜处理

AQE 检测倾斜分区(那些显著大于中位数的)并自动把它们拆成子分区,从另一侧复制匹配的非倾斜分区以对每个子分区 join。这消除掉队任务而无需手动加盐——对有真实世界倾斜数据的生产管道最有影响力的 AQE 特性。

python
# 启用 AQE——Spark 3.2+ 默认 True,3.0/3.1 须手动设
spark.conf.set("spark.sql.adaptive.enabled", True)

# 动态 join 策略切换
# AQE 会在运行时大小 < 阈值时把 Sort-Merge → 广播
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", True)

# 动态分区合并
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")

# 自动倾斜 join 处理
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", 5)  # 5× 中位数 = 倾斜
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

读查询计划与诊断问题

df.explain(mode='extended')(或就 df.explain(True))检查 Spark 选了哪个 join 策略。物理计划是实际执行的——聚焦它。要找的关键节点:

若 Spark 选错策略——表统计陈旧时常见——用 hint 注解或运行 ANALYZE TABLE <name> COMPUTE STATISTICS FOR ALL COLUMNS 刷新统计让 Catalyst 重新规划。启用 AQE 时,陈旧统计问题较小,因为计划在每个 shuffle stage 边界用实际运行时大小重新优化。

关键调优参数

参数默认控制什么
spark.sql.autoBroadcastJoinThreshold10 MB自动广播的数据集最大大小。大 executor 集群提到 100–512 MB。
spark.sql.shuffle.partitions200Sort-Merge/Shuffle Hash 的 shuffle 分区数。启用 AQE 时这成为上限;实际自动合并。
spark.sql.join.preferSortMergeJointrue设 false 以允许 build 侧装进内存时用 Shuffle Hash Join。
spark.sql.adaptive.enabledtrue (3.2+)自适应查询执行的总开关。
spark.sql.adaptive.advisoryPartitionSizeInBytes64 MBAQE 合并的目标分区大小。128–256 MB 通常最优。
spark.sql.adaptive.skewJoin.skewedPartitionFactor5分区须比中位数大这么多倍才被视为倾斜。
spark.executor.memory1g总 executor 堆。广播 join 需小表装进这里。Shuffle Hash 需 build 侧分区装进。
spark.memory.fraction0.6executor 堆用于执行+存储(vs JVM 开销)的比例。内存重的 join 负载提到 0.7–0.75。

Join 决策树——何时用什么

实践中,提交任何 join 重的 Spark 作业前应用这个决策树:

  1. 检查一侧是否小到能广播——跑 df.count() 并估计未压缩大小。若它装进 executor 内存(考虑压缩比,Parquet 通常 3–5 倍),用广播。提高 autoBroadcastJoinThreshold 或用 hint。
  2. 若两侧都大,检查倾斜——剖析 key 分布。若倾斜,启用 AQE 倾斜 join 处理或在选策略前应用手动加盐。
  3. 若 join 在同一 key 上重复——考虑写时把两表都 bucketing。一次性成本,永久消除 shuffle。
  4. 若启用 AQE(Spark 3.2+)——带 AQE 的 Sort-Merge 自适应处理大多数情况。除非剖析显示特定问题,信任它。
  5. 只在以下情况启用 Shuffle Hash Join:每分区 build 侧明确装进 executor 内存(测量,非估计)且排序开销是可测量的瓶颈。
  6. 总用 explain() 验证——在生产数据上跑前确认物理计划匹配你的意图。
总结

永远先查物理计划——让 Spark 优化器做它的工作,只在有证据它选错时才覆盖。决策树:一侧装进内存 → 广播(完全消除大侧 shuffle);两侧都大但 build 侧装进哈希表 → Shuffle Hash;两侧真大 → Sort-Merge(常量内存合并,Spark 的安全默认)。在倾斜让好策略失去意义之前检测并处理它。写时给频繁 join 的大表 bucketing 以永久消除 shuffle。Spark 3.0+ 启用 AQE 以获得自适应策略切换、分区合并和自动倾斜处理——它解决了大多数以前需要手动调优的问题。

🎯 面试速答

为什么 shuffle 是 Spark join 的敌人?shuffle 强迫序列化、写磁盘、网络传输和反序列化——对 1 TB 数据集是一个操作 3 TB 的 I/O。每个 join 优化都旨在最小化不必要的 shuffle:广播在大侧消除它,Bucketing 完全消除它,Sort-Merge 通过做一次并流式合并来最小化额外工作。
广播 vs Sort-Merge——各自何时胜?一侧装进 executor 内存时广播胜——它消除所有大侧网络流量,常带来 10–100 倍加速。大-大 join 时 Sort-Merge 胜,因为它通过顺序扫描在有界常量内存里合并,处理超过总可用 RAM 的数据集而无 OOM 风险。
Bucketing 怎么消除 shuffle?两表在写时用相同桶数按同一 join key 预分区。Spark 的 Catalyst 优化器检测到匹配行已在同一 executor 共置,完全跳过 shuffle Exchange 节点——由物理计划里无 Exchange 确认。
什么是数据倾斜、怎么修?倾斜是某些 join key 值比其他有多得多的行,造成一个 executor 任务处理比同伴多 1000 倍的数据——一个掉队者拖住整个 stage。修法:(1) AQE 的自动倾斜 join 处理(Spark 3.0+),拆分倾斜分区;(2) 手动加盐——在两侧给倾斜 key 追加随机后缀,为所有盐值复制维度侧;(3) 若倾斜的小维度装进内存就广播它。
AQE 对 join 做什么?AQE(Spark 3.0+)在每个 shuffle 边界用实际运行时统计重新优化计划:若实际数据比估计小能把 Sort-Merge 切到广播、合并 shuffle 分区以调正并行度,并自动把倾斜分区拆成子分区以均衡执行。

← 上一篇
OAuth 2.0