Skip to main content

sglang DP并行详解

版本:v0.4.9.post2

DPControler 两种模式:普通 DP 和 DP Attention 通信方式

常规 DP 是"堆 GPU"式的水平扩展——每个 DP 副本是完全独立的模型副本;而 DP Attention 是在现有 TP 组内部做逻辑切分——MLP 等层仍在整个 TP 组上计算,只有 attention 层按子组做数据并行,从而在不增加 GPU 的情况下提高吞吐。这也解释了为什么 DP Attention 模式需要 TCP 通信(跨节点兼容)以及共享 NCCL 端口(同一个通信组)。

常规 DP 模式

dp_size=2, tp_size=2 为例,共需 4 张 GPU:

  • 每个 DP rank 独占一组 GPU,各自拥有独立的 NCCL 通信组
  • 通信全部走 IPC(Unix 域套接字),仅支持单机
  • control_message_step = 1,控制消息发给每个 worker

DP Attention 模式

tp_size=4, dp_size=2 为例,共需 4 张 GPU(复用同一 TP 组):

  • 所有 GPU 属于同一个 NCCL 组(共享 nccl_port
  • MLP/Embedding 在完整 4 卡 TP 组上计算,Attention 在 2 卡子组内做 TP
  • 通信走 TCP,支持多机部署
  • control_message_step = tp_size,控制消息只发给每个子组的首个 worker

对比

核心差异:常规 DP 是物理隔离的多副本;DP Attention 是同一 TP 组内的逻辑分组——attention 分组并行,其余层仍全组协作。

DataParallelController初始化逻辑

DataParallelController.__init__ 中(第 88-93 行),负责根据数据并行模式选择不同的调度器启动策略。核心逻辑是一个 if/else 分支:

if server_args.enable_dp_attention:(DP Attention 模式)

  • 调用 launch_dp_attention_schedulers(第 163 行)启动调度器。此模式下,数据并行复用了张量并行(TP)的 GPU 组——即所有 DP worker 共享同一个 TP 通信组,而不是每个 DP rank 独占一组 GPU。
  • control_message_step = server_args.tp_size:控制消息的步长设为 TP 大小。因为在 DP Attention 模式中,一个 TP group 内的多个 GPU 被拆分给不同的 DP rank 使用,所以控制器发送控制消息时需要以 tp_size 为步长来正确寻址到每个 DP worker 对应的 scheduler。

else:(普通 DP 模式)

  • 调用 launch_dp_schedulers(第 107 行)启动调度器。此模式下,每个 DP rank 拥有独立的一组 TP GPU,各自运行独立的 scheduler 进程。
  • control_message_step = 1:步长为 1,因为每个 DP rank 对应一个独立的 scheduler,控制消息逐一发送即可。

总结

普通 DPDP Attention
GPU 分配每个 DP rank 独占一组 TP GPU所有 DP rank 共享同一个 TP group
启动方式launch_dp_schedulerslaunch_dp_attention_schedulers
控制消息步长1tp_size

control_message_step 的作用是在后续分发控制消息(如 flush cache、abort request 等)时,决定遍历 worker 的间隔,确保消息准确送达每个 DP rank 对应的调度器。

DP Attention 模式和普通 DP 模式:架构差异

普通 DP 模式

每个 DP rank 拥有一组独立的 TP GPU,模型完整副本被复制多份。

例: tp=4, dp=2 → 共需 8 张 GPU
DP0: [GPU0, GPU1, GPU2, GPU3] ← 独立的 TP group,完整模型副本
DP1: [GPU4, GPU5, GPU6, GPU7] ← 独立的 TP group,完整模型副本
  • launch_dp_schedulers(第 107 行)为每个 DP rank 启动一个独立的 TP group
  • 每个 DP rank 有自己的 NCCL port(第 122 行 bind_port
  • GPU 编号递增分配:base_gpu_id += tp_size * gpu_id_step(第 133 行)
  • KV Cache 在每个 TP group 内的所有 rank 上完整复制

DP Attention 模式

所有 DP rank 共享同一个 TP group,将 TP group 内部拆分为多个"Attention TP 子组"。

例: tp=8, dp=2 → 共需 8 张 GPU(而非 16 张!)
Attn TP Group 0 (DP0): [GPU0, GPU1, GPU2, GPU3] ← 处理请求集 A
Attn TP Group 1 (DP1): [GPU4, GPU5, GPU6, GPU7] ← 处理请求集 B
全局 TP Group: [GPU0 ~ GPU7] ← MLP/MoE 层仍全局通信

核心拆分逻辑在 dp_attention.py:33-41

attn_tp_size = tp_size // dp_size      # 注意力层的 TP 并行度
attn_dp_rank = tp_rank // attn_tp_size # 该 rank 属于哪个 DP 组
attn_tp_rank = tp_rank % attn_tp_size # 在子组内的位置
  • launch_dp_attention_schedulers(第 163 行)只启动一个 TP group,所有 DP rank 共用同一个 NCCL port(第 215 行)
  • 数据在不同层之间需要 gather/scatter 转换(通过 communicator.py 中的 ScatterMode 管理)

DP Attention 模式和普通 DP 模式:关键优化

1. KV Cache 内存大幅减少

这是最核心的优化。普通 DP 模式下,KV Cache 在 TP group 内每个 rank 都有完整副本;DP Attention 模式下,每个 Attention TP 子组只存储自己负责的那部分请求的 KV Cache

文档原文(deepseek.md):"If you do not use DP attention, KV cache will be duplicated among all TP ranks."

以 DeepSeek 模型为例,8 卡 TP=8 时:

  • 普通模式:每张 GPU 都存全部请求的 KV Cache
  • DP Attention (dp=8):每张 GPU 只存 1/8 请求的 KV Cache → KV Cache 占用降为 1/8

2. GPU 数量不增加

普通 DP 需要 tp_size × dp_size 张 GPU,而 DP Attention 只需要 tp_size 张 GPU(DP 复用 TP group 内的 GPU)。

3. 更大的 batch size → 更高吞吐

节省下来的 KV Cache 内存可以容纳更多并发请求,直接提升 decode 吞吐量。文档提到可达 1.9x 解码吞吐提升

4. 每个 DP Worker 独立调度

每个 DP rank 可以独立处理 prefill / decode / idle 不同类型的 batch(scheduler.py:277-283 中只有 attn_tp_rank == 0 接收请求),不同 DP 组之间不需要同步调度决策。数据仅在 MLP/MoE 层前后通过 gather/scatter 汇合。


DP Attention 模式和普通 DP 模式:数据流转机制

DP Attention 的代价是需要在网络层之间进行数据重分布,communicator.py 定义了三种数据分布状态:

SCATTERED:     [a, b, c, d]              — 每个 rank 各持一份独立数据
TP_ATTN_FULL: [ab, ab, cd, cd] — 注意力子组内数据完整复制
FULL: [abcd, abcd, abcd, abcd] — 所有 rank 持有全部数据

典型一层的数据流:SCATTERED → (gather) → TP_ATTN_FULL → Attention → (all-reduce + gather) → FULL → MLP/MoE → (scatter) → SCATTERED


DP Attention 模式和普通 DP 模式:control_message_step 的作用

回到你最初选中的代码(第 88-93 行),在 event_loop(第 287 行)中:

for worker in self.workers[:: self.control_message_step]:
worker.send_pyobj(recv_req)
  • 普通 DP (step=1):每个 worker 是独立的 scheduler,逐一发送控制消息
  • DP Attention (step=tp_size):多个 worker 共享同一个 TP group,控制消息只需发给每个 Attention TP 子组的"头节点"(attn_tp_rank == 0),再由其内部广播

DP Attention 模式和普通 DP 模式:总结

维度普通 DPDP Attention
GPU 需求tp × dptp 张(DP 复用 TP GPU)
KV Cache每个 TP rank 完整复制每个子组只存自己的请求
内存效率高(节省可达数倍)
batch size受限于 KV Cache更大
吞吐量基准最高 ~1.9x decode 提升
通信开销仅组内 TP 通信额外的层间 gather/scatter
适用场景通用KV Cache 较大的模型(如 DeepSeek MLA)

DP Attention 本质上是一种以通信换内存的优化策略,特别适合像 DeepSeek 这类 MLA 注意力机制的模型——其 MLP/MoE 参数量远大于注意力参数量,注意力部分不需要那么大的 TP 并行度,拆成独立的小 TP 子组反而能极大节省 KV Cache 内存。

DeepSeek 模型在 DP Attention 模式下各层的通信模式

以 DeepSeek 为例的完整分布式通信解读。

DeepSeek 模型 DP Attention 模式下的分布式通信全景

TP=4, DP=2 为例,GPU 0-3 共 4 张卡:

  • Attention TP Group 0 (DP0):[GPU0, GPU1],处理请求 a,b
  • Attention TP Group 1 (DP1):[GPU2, GPU3],处理请求 c,d
  • 全局 TP Group:[GPU0, GPU1, GPU2, GPU3],MLP/MoE 层全局通信

三种数据分布状态(communicator.py:44):

SCATTERED:    [a,   b,   c,   d  ]   — 每个 rank 各持独立数据
TP_ATTN_FULL: [ab, ab, cd, cd ] — 注意力子组内数据复制
FULL: [abcd,abcd,abcd,abcd] — 全局复制

一层 DecoderLayer 的完整数据流

DeepSeek 每层的 forward(deepseek_v2.py:1858)由 LayerCommunicator 编排为 5 个阶段

┌─────────────────────────────────────────────────────────────┐
│ Layer Input (TP_ATTN_FULL 或 SCATTERED) │
│ [ab, ab, cd, cd] 或 [a, b, c, d] │
└────────────────────────┬────────────────────────────────────┘

┌──────────▼──────────┐
│ ① prepare_attn() │
│ LayerNorm + 通信 │
└──────────┬──────────┘
│ → TP_ATTN_FULL: [ab, ab, cd, cd]
┌──────────▼──────────┐
│ ② MLA Attention │
│ (子组内 TP 并行) │
└──────────┬──────────┘
│ 输出仍是 TP_ATTN_FULL (partial sum)
┌──────────▼──────────┐
│ ③ prepare_mlp() │
│ AllReduce/RS + LN │
└──────────┬──────────┘
│ → FULL 或 SCATTERED
┌──────────▼──────────┐
│ ④ MLP / MoE │
│ (全局 TP 或 EP) │
└──────────┬──────────┘

┌──────────▼──────────┐
│ ⑤ postprocess() │
│ Gather/Scatter │
└──────────┬──────────┘
│ → 下一层的 input mode

① prepare_attn — 输入 → 注意力

目的:将上一层输出转换为 TP_ATTN_FULL,使注意力子组内的所有 rank 拥有该组的完整 token。

输入状态操作通信原语
TP_ATTN_FULLTP_ATTN_FULL无需通信(同模式)
SCATTEREDTP_ATTN_FULL子组内收集attn_tp_all_gather
# communicator.py:302-316
def _scattered_to_tp_attn_full(hidden_states, forward_batch, context):
attn_tp_all_gather(
list(hidden_states.tensor_split(context.attn_tp_size)),
local_hidden_states,
)

示例:[a, b, c, d] → 子组0内 all_gather → [ab, ab, ...];子组1内 all_gather → [..., cd, cd]


② MLA Attention — 注意力计算

关键设计:MLA 的所有投影矩阵(Q/K/V/O)都按 attn_tp_size(而非 tp_size)分片。

# deepseek_v2.py:777-842
self.num_local_heads = num_heads // attn_tp_size # 注意力头按子组大小分
self.q_b_proj = ColumnParallelLinear(..., tp_rank=attn_tp_rank, tp_size=attn_tp_size)
self.kv_b_proj = ColumnParallelLinear(..., tp_rank=attn_tp_rank, tp_size=attn_tp_size)
self.o_proj = RowParallelLinear(..., tp_rank=attn_tp_rank, tp_size=attn_tp_size,
reduce_results=False) # ← 关键:不做内部 all-reduce
  • o_projreduce_results=False 意味着注意力输出是部分和(partial sum),all-reduce 被推迟到下一阶段,与 LayerNorm 融合执行。
  • KV Cache 只存在于各自的 Attention TP 子组内——这就是内存节省的根源。
  • 此阶段无跨 DP 组通信,每个子组独立计算自己的请求。

③ prepare_mlp — 注意力输出 → MLP 输入

这是最复杂的通信阶段,需要完成:对 o_proj 部分和的归约 + LayerNorm + 模式转换。

根据 MLP 类型分为两种路径:

路径 A:标准 MoE(无 DeepEP)— TP_ATTN_FULL → FULL

MoE 路由需要看到所有 token 才能做全局 expert 选择:

# communicator.py:386-433
def _gather_hidden_states_and_residual(...):
if context.attn_tp_rank == 0:
hidden_states += residual # 只在子组 rank0 加残差
dp_gather_partial(hidden_states, ...) # 全局 all-reduce(跨所有 TP rank)
dp_scatter(residual, hidden_states, ...) # 提取本地残差
hidden_states = layernorm(hidden_states)

通信原语:

  1. dp_gather_partialdp_attention.py:224-263):将每个 DP rank 的数据写入全局 buffer 对应位置(通过 Triton kernel memcpy),然后对全局 TP group 执行 tensor_model_parallel_all_reduce
  2. dp_scatter:从全局 buffer 中提取本 DP rank 的局部数据作为残差

示例:DP0 的 [ab partial] 和 DP1 的 [cd partial] → all-reduce → 每个 rank 都得到 [abcd]

路径 B:DeepEP MoE — TP_ATTN_FULL → SCATTERED

DeepEP 自己处理 expert 路由和 all-to-all,所以只需要 reduce-scatter 到每个 rank 各自的 token:

# communicator.py:435-452
def _scatter_hidden_states_and_residual(...):
tensor_list = list(hidden_states.tensor_split(context.attn_tp_size))
attn_tp_reduce_scatter(hidden_states, tensor_list) # 子组内 reduce-scatter
residual = residual.tensor_split(context.attn_tp_size)[context.attn_tp_rank]
hidden_states, residual = layernorm(hidden_states, residual)

通信原语:attn_tp_reduce_scatter — 在注意力子组内执行 reduce-scatter,每个 rank 得到归约后的自己那部分 token。

示例:子组0 中 [ab, ab] → reduce-scatter → rank0 得到 a(已 reduce),rank1 得到 b(已 reduce)


④ MLP / MoE — 前馈网络计算

Dense MLP 层(前几层)
  • 使用全局 TPColumnParallelLinear + RowParallelLinear
  • RowParallelLinear 内部会做全局 all-reduce(如果 reduce_results=True
Sparse MoE 层(后续层)

无 DeepEP 时(FULL 模式)

  • 每个 rank 都有全部 token,本地做 expert routing + gate
  • 使用全局 TP 的 RowParallelLinear / ColumnParallelLinear
  • Expert 内部做 all-reduce

有 DeepEP 时(SCATTERED 模式)

  • 输入已经是 SCATTERED,每个 rank 只有自己的 token
  • DeepEP 负责 all-to-all 通信:将 token 发送到对应 expert 所在的 rank,计算完再 all-to-all 回来
  • 不需要额外的 TP all-reduce

⑤ postprocess_layer — MLP 输出 → 下一层输入

MLP 输出 → 下一层需要操作通信原语
FULL → TP_ATTN_FULL提取本 DP 组的局部数据dp_scatter
SCATTERED → TP_ATTN_FULL子组内收集 + 残差合并attn_tp_all_gather
SCATTERED → SCATTERED无需通信
# communicator.py:537-554  (SCATTERED → TP_ATTN_FULL)
def _gather(hidden_states, residual, forward_batch, context):
hidden_states += residual # 先加残差
attn_tp_all_gather( # 子组内 all-gather
list(gathered_buffer.tensor_split(context.attn_tp_size)),
hidden_states,
)
return gathered_buffer, None

通信原语总结表

原语通信范围用途
attn_tp_all_gatherAttention 子组内 (e.g. 2卡)SCATTERED → TP_ATTN_FULL
attn_tp_reduce_scatterAttention 子组内TP_ATTN_FULL → SCATTERED(融合 reduce)
dp_gather_partial (= all-reduce)全局 TP group (e.g. 4卡)TP_ATTN_FULL → FULL
dp_scatter本地提取(无通信)FULL → TP_ATTN_FULL / SCATTERED
tensor_model_parallel_all_reduce全局 TP group标准 TP 的参数归约
DeepEP all-to-all跨 expert 节点MoE expert 路由

一个完整层的通信示例(TP=4, DP=2, DeepEP MoE 层)

输入 [a, b, c, d]  (SCATTERED)

├─ attn_tp_all_gather (子组内) ──→ [ab, ab, cd, cd] (TP_ATTN_FULL)

├─ MLA Attention (各子组独立计算,无跨组通信)
│ o_proj 输出 partial sum,仍是 [ab', ab', cd', cd']

├─ attn_tp_reduce_scatter (子组内) ──→ [a'', b'', c'', d''] (SCATTERED)
│ + LayerNorm

├─ DeepEP MoE (all-to-all 路由 expert,各 rank 独立)
│ 输出 [a''', b''', c''', d'''] (SCATTERED)

└─ attn_tp_all_gather (子组内) ──→ [a'''b''', a'''b''', c'''d''', c'''d'''] (TP_ATTN_FULL)
送入下一层

核心洞察:在 DeepEP 路径下,从不需要全局 all-reducedp_gather_partial),所有通信都是子组内的 all-gather / reduce-scatter,通信量显著更小。只有非 DeepEP 的标准 MoE 路径才需要全局 all-reduce 来组装 FULL 数据。

ColumnParallelLinear vs RowParallelLinear 选择规律

核心数学原理

一个线性层 Y = X @ W 要做 TP 分片,有两种切法:

ColumnParallelLinear — 按列(输出维度)切分 W

W = [W₁ | W₂ | ... | Wₚ]   (p = tp_size)

每个 rank i 计算: Yᵢ = X @ Wᵢ
↑ 输入完整 ↑ 输出是局部分片
  • 输入 X 不需要切分,每个 rank 拿到完整输入
  • 输出 Yᵢ 是全局输出的一部分列(对应一部分注意力头或隐藏维度)
  • 无需通信gather_output=False 时)

RowParallelLinear — 按行(输入维度)切分 W

W = [W₁]      每个 rank i 计算: Yᵢ = Xᵢ @ Wᵢ
[W₂] ↑ 输入是局部分片
[...]
[Wₚ]
最终: Y = Y₁ + Y₂ + ... + Yₚ ← 需要 all-reduce
  • 输入 Xᵢ局部分片(上一层 ColumnParallel 的分片输出)
  • 输出 Yᵢ部分和(partial sum)
  • 需要 all-reduce 求和得到完整输出(除非 reduce_results=False 将 reduce 推迟)

黄金搭配规律:Column → Row 配对

ColumnParallel 和 RowParallel 总是成对出现,形成一个无通信→一次通信的流水线:

         ColumnParallel              RowParallel
输入 X ──→ 按列切分,无通信 ──→ 分片结果 ──→ 按行切分,all-reduce ──→ 完整输出 Y
(完整) 每rank算局部输出 (局部) 每rank算部分和 (完整)

关键约束:ColumnParallel 的分片输出维度 = RowParallel 的分片输入维度,这样中间不需要任何通信。


DeepSeek MLA 中的具体应用

MLA 的投影链条和选择逻辑如下(deepseek_v2.py:789-843):

                          ┌─────────────────────────────────────┐
│ MLA Attention 投影层布局 │
└─────────────────────────────────────┘

hidden_states [bs, hidden_size]


┌──────────────────────────────────────┐
│ fused_qkv_a_proj_with_mqa │ ← ReplicatedLinear (不切分)
│ hidden_size → q_lora_rank + │ 因为输出维度很小(~2112),
│ kv_lora_rank + │ 切分反而低效
│ qk_rope_head_dim │
└──────────────┬───────────────────────┘
│ split
┌────────┴────────┐
▼ ▼
q_compressed kv_compressed + k_pe
[q_lora_rank] [kv_lora_rank + rope_dim]
│ │
▼ ▼
┌───────────┐ ┌────────────┐
│ q_b_proj │ │ kv_b_proj │ ← ColumnParallelLinear (按列切)
│ Column │ │ Column │ 输出维度 = num_heads × head_dim
│ Parallel │ │ Parallel │ 按 attn_tp_size 切分注意力头
└─────┬─────┘ └─────┬──────┘ 每个 rank 得到 num_local_heads
│ │
▼ ▼
q_local k_local, v_local
[num_local_heads, [num_local_heads,
qk_head_dim] qk_nope_dim + v_dim]
│ │
└───────┬────────┘

┌───────────┐
│ Attention │ (各子组独立计算, 无通信)
└─────┬─────┘


attn_output [num_local_heads × v_head_dim]


┌───────────┐
│ o_proj │ ← RowParallelLinear (按行切)
│ Row │ 输入维度 = num_local_heads × v_head_dim (分片)
│ Parallel │ 输出维度 = hidden_size (完整维度)
│ reduce= │ reduce_results=False → 输出是 partial sum
│ False │
└─────┬─────┘


partial sum [hidden_size]
(交给 LayerCommunicator 做 reduce + LayerNorm)

每个投影层的选择理由

投影层类型原因
fused_qkv_a_proj_with_mqaReplicatedLinear压缩投影,输出维度极小(~2112),TP 切分得不偿失,且输出需要每个 rank 完整持有才能送入后续 ColumnParallel
q_a_proj / kv_a_proj (旧路径)ReplicatedLinear同上,压缩到低秩空间,维度太小不值得切
q_b_projColumnParallel解压投影,输出维度 = num_heads × head_dim(很大),按注意力头切分 — 每个 rank 只计算 num_local_heads 个头
kv_b_projColumnParallel同理,输出 K/V 的多头表示,按头切分
o_projRowParallel输入是注意力输出(已按头分片),按行对应分片输入,输出需要 reduce 求和得到完整 hidden_size

规律总结

规律一:维度扩展用 Column,维度收缩用 Row

小维度 ──ColumnParallel──→ 大维度(切分输出头/隐层)
大维度 ──RowParallel────→ 小维度(合并回 hidden_size)

Column 切输出维度,适合"扇出"(一变多头);Row 切输入维度,适合"扇入"(多头合一)。

规律二:Column-Row 总是配对,中间零通信

q_b_proj(Column) ──→ [attention] ──→ o_proj(Row)
gate_up_proj(Column) ──→ [SiLU] ──→ down_proj(Row)

Column 的输出分片正好是 Row 的输入分片,中间不需要 all-gather 或 redistribute。只在 Row 的末尾做一次 all-reduce(或 reduce-scatter),整个配对只有一次通信。

规律三:维度太小就不切 → ReplicatedLinear

MLA 的压缩投影(q_lora_rank=1536, kv_lora_rank=512)输出维度很小,TP 切分后每个 rank 只剩几百个维度,通信开销反而超过计算收益。所以用 ReplicatedLinear,每个 rank 持有完整权重。

规律四:reduce_results=False 延迟归约

o_proj 不在内部做 all-reduce,而是推迟到 LayerCommunicator.prepare_mlp() 中,与 LayerNorm 融合执行。在 DP Attention 模式下,还可以选择 reduce-scatter 而非 all-reduce,进一步节省通信。


MLP/MoE 中同样遵循此规律

# Dense MLP:
gate_up_proj = MergedColumnParallelLinear(hidden_size, [intermediate_size]*2) # 扇出
down_proj = RowParallelLinear(intermediate_size, hidden_size) # 扇入

# MoE Expert:
gate_up_proj = MergedColumnParallelLinear(hidden_size, [intermediate_size]*2)
down_proj = RowParallelLinear(intermediate_size, hidden_size)

模式完全一致:Column 负责扩展到中间维度并切分,Row 负责收缩回 hidden_size 并归约。