🚧 wip: update sparse policy refactoring plan to v4

Simplified scope to FullPolicy only. Added debug validation phase.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Zijie Tian
2026-01-19 23:10:49 +08:00
parent b97b0b96a0
commit 16b269d897

View File

@@ -1,39 +1,48 @@
# Task Plan: Sparse Policy 架构重构 v3
# Task Plan: Sparse Policy 架构重构 v4 (FullPolicy Only)
## Goal
将 chunked prefill 的 attention 计算逻辑完全从 `attention.py` 移到 `SparsePolicy` 内部。attention.py 只负责调用 policy不包含任何计算逻辑。
## 核心设计原则(强制要求)
**范围**: 仅实现 FullPolicy暂不涉及 QuestPolicy 和 XAttentionBSAPolicy。Decode 阶段不处理。
1. **Policy 内部完成所有计算**:包括 attention 计算和结果合并
2. **select_blocks 传入 offload_engine**policy 通过 offload_engine 加载 blocks
3. **强制实现计算函数**:所有 policy 必须实现 `compute_block_attention` `merge_attention_outputs`
## 当前代码状态(重要发现)
**`FullPolicy.compute_prefill_attention` 已经实现了完整的 prefill 流程!**
`attention.py` 没有调用它,而是:
- 调用 `sparse_policy.select_blocks()` 仅做 block 筛选
- 自己实现 `_ring_buffer_pipeline_load``_sync_load_previous_chunks`
- 自己调用 `flash_attn_with_lse``merge_attention_outputs`
**结论**:当前代码有冗余,同样的逻辑在两个地方实现。
## 核心设计原则
1. **Policy 内部完成所有 prefill 计算**:包括 block 加载、attention 计算和结果合并
2. **select_blocks 传入 offload_engine**其他策略Quest/XAttn可能需要加载 KV 来判断
3. **统一方法命名**:使用 `compute_chunked_attention`(不是 `compute_prefill_attention`
4. **chunked_prefill 强制 policy 存在**:没有 policy 则报错
5. **外部默认 FULL policy**model_runner.py 默认创建 FullPolicy
6. **attention.py 零计算逻辑**_chunked_prefill_attention 只调用 policy不直接调用 flashattn 或 merge
5. **attention.py 零计算逻辑**`_chunked_prefill_attention` 只调用 policy
## 目标架构
```
model_runner.py:
默认创建 FullPolicy如果没有指定 sparse policy
attention.py (_chunked_prefill_attention):
检查 sparse_policy 是否存在
调用 sparse_policy.compute_prefill_attention(q, k, v, ...)
调用 sparse_policy.compute_chunked_attention(q, k, v, ...)
返回最终输出(不包含任何计算逻辑)
处理 async offload
返回最终输出(不包含任何 attention 计算逻辑)
SparsePolicy.compute_prefill_attention():
1. select_blocks(blocks, offload_engine, ctx) → 筛选 blocks
2. 加载 blocks通过 offload_engine
3. 遍历 blocks
- 调用 self.compute_block_attention(q, k, v, ...)
- 调用 self.merge_attention_outputs(...)
4. 计算当前 chunk attention
5. 合并最终结果
SparsePolicy.compute_chunked_attention():
1. 获取 cpu_block_table
2. 调用 select_blocks(blocks, offload_engine, ctx) → 筛选 blocks
3. 加载 blocks 并计算 attentionpipeline 或 sync
4. 计算当前 chunk attentioncausal
5. 合并所有结果
6. 返回 final_output
```
@@ -41,106 +50,71 @@ SparsePolicy.compute_prefill_attention():
| 决策 | 说明 |
|------|------|
| **决策 1** | `compute_block_attention` 是抽象方法,所有 policy 必须实现 |
| **决策 2** | `merge_attention_outputs` 抽象方法,所有 policy 必须实现 |
| **决策 3** | `compute_prefill_attention` 是抽象方法,定义完整的 prefill 流程 |
| **决策 4** | `select_blocks` 接收 `offload_engine` 参数(为未来准备) |
| **决策 5** | chunked_prefill 检查 policy 是否存在,不存在则抛出错误 |
| **决策 6** | model_runner 默认创建 FullPolicy 作为兜底 |
| **决策 7** | attention.py 的 _chunked_prefill_attention 不包含任何 flashattn 或 merge 调用 |
| **决策 1** | `compute_chunked_attention`唯一的抽象方法,定义完整 prefill 流程 |
| **决策 2** | 不添加 `compute_block_attention` `merge_attention_outputs` 抽象方法(过度设计) |
| **决策 3** | `select_blocks` 接收 `offload_engine` 参数(其他策略需要) |
| **决策 4** | attention.py 的 `_chunked_prefill_attention` 不包含任何 flashattn 或 merge 调用 |
| **决策 5** | Decode 阶段不处理,保持现有逻辑 |
| **决策 6** | async offload 逻辑保留在 attention.py不移入 policy |
| **决策 7** | Phase 4 需要添加 debug 输出验证执行路径 |
## Phases
- [ ] Phase 1: 分析当前架构,理解所有计算逻辑的位置
- [ ] Phase 2: SparsePolicy 基类中添加三个抽象方法
- [ ] Phase 3: 修改 FullPolicy,实现三个抽象方法
- [ ] Phase 4: 修改 QuestPolicy实现三个抽象方法
- [ ] Phase 5: 修改 XAttentionBSAPolicy实现三个抽象方法
- [ ] Phase 6: 修改 model_runner.py默认创建 FullPolicy
- [ ] Phase 7: 修改 attention.py移除所有计算逻辑只调用 policy
- [ ] Phase 8: 测试验证
- [x] Phase 1: 分析当前架构 ✅ 已完成
- [ ] Phase 2: 修改 SparsePolicy 基类
- [ ] Phase 3: 修改 FullPolicy
- [ ] Phase 4: 验证执行路径(添加 debug 输出)
- [ ] Phase 5: 修改 attention.py
- [ ] Phase 6: 测试验证
## Phase 1: 分析当前架构,理解所有计算逻辑的位置
## Phase 1: 分析当前架构 ✅ 已完成
### 当前 attention.py 中包含的计算逻辑
1. `_ring_buffer_pipeline_load` 方法:
- 调用 `offload_engine.load_to_slot_layer()`
- 调用 `offload_engine.wait_slot_layer()`
- 调用 `offload_engine.get_kv_for_slot()`
- 调用 `flash_attn_with_lse()`**直接调用**
- 调用 `merge_attention_outputs()`**直接调用**
2. `_sync_load_previous_chunks` 方法:
- 同上,直接调用 flashattn 和 merge
### 当前 attention.py 中包含的计算逻辑(需要移除)
1. `_ring_buffer_pipeline_load` 方法:直接调用 flashattn 和 merge
2. `_sync_load_previous_chunks` 方法:直接调用 flashattn 和 merge
3. `_chunked_prefill_attention` 方法:
- 调用 `_ring_buffer_pipeline_load``_sync_load_previous_chunks`
- 调用 `flash_attn_with_lse()` 计算当前 chunk
- 调用 `merge_attention_outputs()` 合并结果
- 调用上述两个方法
- 计算当前 chunkflash_attn
- 合并结果merge
### 需要移动的计算逻辑
### 当前 FullPolicy 已实现的功能
所有 `flash_attn_with_lse` `merge_attention_outputs` 调用都应该在 SparsePolicy 内部。
`full_policy.py:40-162` `compute_prefill_attention` 已实现:
- ring buffer pipeline 加载
- sync 加载 fallback
- 当前 chunk attention 计算
- 结果合并
## Phase 2: 在 SparsePolicy 基类中添加三个抽象方法
**只需重命名为 `compute_chunked_attention` 并微调接口。**
### 2.1 compute_block_attention
## Phase 2: 修改 SparsePolicy 基类
### 2.1 修改 select_blocks 接口
```python
@abstractmethod
def compute_block_attention(
def select_blocks(
self,
q: torch.Tensor,
k: torch.Tensor,
v: torch.Tensor,
layer_id: int,
softmax_scale: float,
causal: bool,
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
available_blocks: List[int],
offload_engine: "OffloadEngine", # 新增参数
ctx: PolicyContext,
) -> List[int]:
"""
计算单个 block 的 attention
选择要加载的 blocks
Args:
q: [1, seq_len, num_heads, head_dim] 或 [seq_len, num_heads, head_dim]
k, v: 同上
layer_id: 层索引
softmax_scale: softmax 缩放因子
causal: 是否应用因果掩码
available_blocks: 所有可用的 block IDs
offload_engine: offload engine其他策略可能需要加载 KV 来判断)
ctx: policy context
Returns:
(o, lse) - attention 输出和 LSE
选择的 block IDs
"""
pass
```
### 2.2 merge_attention_outputs
```python
@abstractmethod
def merge_attention_outputs(
self,
o_acc: torch.Tensor,
lse_acc: Optional[torch.Tensor],
o_new: torch.Tensor,
lse_new: Optional[torch.Tensor],
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
"""
合并两个 attention 输出。
Args:
o_acc: 累积的 attention 输出 [1, seq_len, num_heads, head_dim]
lse_acc: 累积的 LSE
o_new: 新的 attention 输出
lse_new: 新的 LSE
Returns:
(merged_o, merged_lse)
"""
pass
```
### 2.3 compute_chunked_attention
### 2.2 添加 compute_chunked_attention 抽象方法
```python
@abstractmethod
@@ -151,9 +125,9 @@ def compute_chunked_attention(
v: torch.Tensor,
layer_id: int,
softmax_scale: float,
offload_engine: OffloadEngine,
offload_engine: "OffloadEngine",
current_chunk_idx: int,
seq: ChunkedSequence,
seq: "ChunkedSequence",
num_tokens: int,
) -> torch.Tensor:
"""
@@ -167,7 +141,8 @@ def compute_chunked_attention(
5. 合并所有结果
Args:
q, k, v: 当前 chunk 的 QKV
q: [seq_len, num_heads, head_dim] 当前 chunk 的 query
k, v: [seq_len, num_kv_heads, head_dim] 当前 chunk 的 KV已写入 prefill buffer
layer_id: 层索引
softmax_scale: softmax 缩放因子
offload_engine: offload engine
@@ -176,173 +151,206 @@ def compute_chunked_attention(
num_tokens: 当前 chunk 的 token 数
Returns:
[seq_len, num_heads, head_dim] 最终 attention输出
[seq_len, num_heads, head_dim] 最终 attention 输出
"""
pass
```
### 2.4 修改 select_blocks 接口
## Phase 3: 修改 FullPolicy
### 3.1 重命名方法
`compute_prefill_attention` 重命名为 `compute_chunked_attention`
### 3.2 修改 select_blocks 签名
```python
def select_blocks(
self,
available_blocks: List[int],
offload_engine: OffloadEngine,
offload_engine: "OffloadEngine", # 新增参数(不使用)
ctx: PolicyContext,
) -> List[int]:
"""
选择要加载的 blocks
Args:
available_blocks: 所有可用的 block IDs
offload_engine: offload engine为未来准备当前可能不使用
ctx: policy context
Returns:
选择的 block IDs
"""
pass
"""Return all blocks - no sparsity."""
return available_blocks
```
## Phase 3: 修改 FullPolicy实现三个抽象方法
### 3.3 验证 compute_chunked_attention 实现
### 3.1 FullPolicy.compute_block_attention
当前 `compute_prefill_attention` 已实现完整逻辑,确认:
- [x] 获取 cpu_block_table
- [x] ring buffer pipeline 加载
- [x] sync 加载 fallback
- [x] 当前 chunk attention 计算
- [x] 结果合并
直接调用 `flash_attn_with_lse`,处理 3D 输入
**注意**:当前实现没有调用 `select_blocks`,需要添加
### 3.2 FullPolicy.merge_attention_outputs
## Phase 4: 验证执行路径(添加 debug 输出)
调用 `chunked_attention.merge_attention_outputs`
### 4.1 验证目标
### 3.3 FullPolicy.compute_prefill_attention
确认代码修改后,执行路径正确:
实现完整的 prefill 流程:
1. 获取 `cpu_block_table = kvcache_manager.get_prefilled_cpu_blocks(seq)`
2. 调用 `select_blocks(cpu_block_table, offload_engine, ctx)`
3. 遍历 blocks
- `offload_engine.load_to_slot_layer(slot, layer_id, cpu_block_id)`
- `offload_engine.wait_slot_layer(slot)`
- `k, v = offload_engine.get_kv_for_slot(slot)`
- 调用 `self.compute_block_attention(q, k, v, layer_id, scale, causal=False)`
- 调用 `self.merge_attention_outputs(o_acc, lse_acc, prev_o, prev_lse)`
4. 计算当前 chunk attention
5. 合并最终结果
| 检查点 | 位置 | 预期行为 |
|--------|------|----------|
| **Policy 创建** | `kvcache/__init__.py` | FullAttentionPolicy 被创建 |
| **Policy 调用** | `attention.py` | `_chunked_prefill_attention` 调用 `sparse_policy.compute_chunked_attention` |
| **select_blocks 调用** | `full_policy.py` | `compute_chunked_attention` 内部调用 `select_blocks` |
| **旧方法未调用** | `attention.py` | `_ring_buffer_pipeline_load``_sync_load_previous_chunks` 不再被调用 |
### 需要移动的代码
`attention.py``_ring_buffer_pipeline_load``_sync_load_previous_chunks` 移动逻辑:
- slot 遍历逻辑
- offload_engine 调用
- 计算和合并逻辑
`attention.py``_chunked_prefill_attention` 移动逻辑:
- 当前 chunk 的 attention 计算
- 最终合并逻辑
## Phase 4: 修改 QuestPolicy
QuestPolicy 实现与 FullPolicy 类似,区别在于:
- `select_blocks` 返回 Top-K blocks
- 其他计算逻辑相同
## Phase 5: 修改 XAttentionBSAPolicy
当前 XAttentionBSAPolicy 只返回所有 blocks修改后
- `select_blocks` 当前返回所有 blocks
- `compute_block_attention` 与 FullPolicy 相同
- `merge_attention_outputs` 与 FullPolicy 相同
- `compute_prefill_attention` 与 FullPolicy 相同
未来可以实现稀疏计算。
## Phase 6: 修改 model_runner.py默认创建 FullPolicy
### 6.1 当前创建 sparse policy 的逻辑
### 4.2 添加 debug 输出位置
**位置 1: `kvcache/__init__.py` - policy 创建时**
```python
# 当前:只有指定 sparse_policy_type 时才创建
if sparse_policy_type is not None:
sparse_policy = create_sparse_policy(sparse_policy_type, **kwargs)
sparse_policy = create_sparse_policy(sparse_policy_type, **policy_kwargs)
logger.info(f"[DEBUG] Created sparse policy: {sparse_policy}")
```
### 6.2 修改后
**位置 2: `attention.py` - 调用 policy 时**
```python
# 默认创建 FullPolicy
if sparse_policy_type is None:
sparse_policy_type = SparsePolicyType.FULL
sparse_policy = create_sparse_policy(sparse_policy_type, **kwargs)
# 在 _chunked_prefill_attention 中
logger.debug(f"[DEBUG] Calling sparse_policy.compute_chunked_attention, "
f"policy={sparse_policy}, layer={self.layer_id}, chunk={current_chunk_idx}")
```
### 6.3 位置
`model_runner.py` 中的 `allocate_kv_cache` 方法。
## Phase 7: 修改 attention.py移除所有计算逻辑
### 7.1 _chunked_prefill_attention 简化
**当前(伪代码)**
**位置 3: `full_policy.py` - compute_chunked_attention 入口**
```python
# 获取 cpu_block_table
# 调用 select_blocks
# 调用 _ring_buffer_pipeline_load包含计算逻辑
# 计算当前 chunkflash_attn
# 合并结果merge
def compute_chunked_attention(self, ...):
logger.debug(f"[DEBUG] FullPolicy.compute_chunked_attention called, "
f"layer={layer_id}, chunk={current_chunk_idx}, num_tokens={num_tokens}")
# ... 实现
```
**位置 4: `full_policy.py` - select_blocks 调用**
```python
# 在 compute_chunked_attention 内部
selected_blocks = self.select_blocks(cpu_block_table, offload_engine, policy_ctx)
logger.debug(f"[DEBUG] select_blocks: input={len(cpu_block_table)} blocks, "
f"output={len(selected_blocks)} blocks")
```
### 4.3 验证方法
运行测试并检查日志输出:
```bash
PYTHONPATH=/home/zijie/Code/nano-vllm:$PYTHONPATH \
python tests/test_needle.py --model <model_path> --enable-offload 2>&1 | grep DEBUG
```
预期输出:
```
[DEBUG] Created sparse policy: FullAttentionPolicy()
[DEBUG] Calling sparse_policy.compute_chunked_attention, policy=FullAttentionPolicy(), layer=0, chunk=0
[DEBUG] FullPolicy.compute_chunked_attention called, layer=0, chunk=0, num_tokens=...
[DEBUG] select_blocks: input=0 blocks, output=0 blocks
[DEBUG] Calling sparse_policy.compute_chunked_attention, policy=FullAttentionPolicy(), layer=0, chunk=1
[DEBUG] FullPolicy.compute_chunked_attention called, layer=0, chunk=1, num_tokens=...
[DEBUG] select_blocks: input=1 blocks, output=1 blocks
...
```
### 4.4 清理 debug 输出
验证完成后,将 debug 级别的日志改为更低级别(如 `logger.debug`),或通过环境变量控制:
```python
if os.environ.get('NANOVLLM_DEBUG_POLICY'):
logger.info(f"[DEBUG] ...")
```
## Phase 5: 修改 attention.py
### 5.1 简化 _chunked_prefill_attention
**修改后**
```python
sparse_policy = kvcache_manager.sparse_policy
if sparse_policy is None:
raise RuntimeError("sparse_policy is required for chunked prefill")
def _chunked_prefill_attention(self, q, k, v, context):
kvcache_manager = context.kvcache_manager
seq = context.chunked_seq
offload_engine = kvcache_manager.offload_engine
current_chunk_idx = context.current_chunk_idx
num_tokens = k.shape[0]
o = sparse_policy.compute_prefill_attention(
q, k, v, self.layer_id, self.scale,
offload_engine, current_chunk_idx, seq, num_tokens
)
# 获取 sparse policy
sparse_policy = kvcache_manager.sparse_policy
if sparse_policy is None:
raise RuntimeError("sparse_policy is required for chunked prefill")
# 直接返回不需要合并policy 内部已完成所有计算)
return o
# [DEBUG] 验证执行路径
logger.debug(f"[DEBUG] Calling sparse_policy.compute_chunked_attention, "
f"policy={sparse_policy}, layer={self.layer_id}, chunk={current_chunk_idx}")
# 调用 policy 计算 attention所有计算逻辑在 policy 内部)
final_o = sparse_policy.compute_chunked_attention(
q, k, v,
self.layer_id,
self.scale,
offload_engine,
current_chunk_idx,
seq,
num_tokens,
)
# Per-layer ASYNC offload保留在 attention.py
if offload_engine is not None and seq is not None:
cpu_block_ids, _ = kvcache_manager.get_all_cpu_blocks(seq)
if current_chunk_idx < len(cpu_block_ids):
cpu_block_id = cpu_block_ids[current_chunk_idx]
offload_engine.offload_prefill_buffer_async(
self.layer_id, cpu_block_id, num_tokens
)
return final_o
```
### 7.2 删除的方法
### 5.2 删除的方法
删除以下方法(逻辑移到 policy
- `_ring_buffer_pipeline_load` - 逻辑移到 FullPolicy.compute_prefill_attention
- `_sync_load_previous_chunks` - 逻辑移到 FullPolicy.compute_prefill_attention
删除以下方法(逻辑移到 FullPolicy
- `_ring_buffer_pipeline_load`
- `_sync_load_previous_chunks`
### 7.3 保留的方法
### 5.3 保留的方法
- `_decode_with_layer_pipeline` - decode 逻辑保持不变
- `_decode_ring_buffer_pipeline` - decode 逻辑保持不变
Decode 相关方法保持不变
- `_chunked_decode_attention`
- `_decode_with_layer_pipeline`
- `_decode_ring_buffer_pipeline`
## Phase 8: 测试验证
## Phase 6: 测试验证
### 6.1 功能测试
- [ ] 运行 `test_needle.py --enable-offload` (FULL policy)
- [ ] 验证输出正确 (needle value: 7492)
- [ ] 验证性能无明显下降
- [ ] 验证输出正确needle value 匹配)
- [ ] 检查 debug 日志确认执行路径正确
### 6.2 性能测试
- [ ] 对比重构前后的 prefill 延迟
- [ ] 验证性能无明显下降(< 5% 回归)
### 6.3 回归测试
- [ ] 验证 decode 阶段不受影响
- [ ] 验证非 offload 模式不受影响(如果适用)
## 关键文件清单
| 文件 | 修改内容 |
|------|----------|
| `nanovllm/kvcache/sparse/policy.py` | 添加三个抽象方法,修改 select_blocks 签名 |
| `nanovllm/kvcache/sparse/full_policy.py` | 实现三个抽象方法,移动计算逻辑 |
| `nanovllm/kvcache/sparse/quest.py` | 实现三个抽象方法 |
| `nanovllm/kvcache/sparse/xattn_bsa.py` | 实现三个抽象方法 |
| `nanovllm/engine/model_runner.py` | 默认创建 FullPolicy |
| `nanovllm/layers/attention.py` | 简化 _chunked_prefill_attention删除计算方法 |
| `nanovllm/kvcache/sparse/policy.py` | 添加 `compute_chunked_attention` 抽象方法,修改 `select_blocks` 签名 |
| `nanovllm/kvcache/sparse/full_policy.py` | 重命名方法,修改 `select_blocks` 签名,添加 `select_blocks` 调用,添加 debug 输出 |
| `nanovllm/layers/attention.py` | 简化 `_chunked_prefill_attention`,删除 `_ring_buffer_pipeline_load``_sync_load_previous_chunks`,添加 debug 输出 |
| `nanovllm/kvcache/__init__.py` | 添加 policy 创建的 debug 输出 |
## Decisions Made
- **决策 1**: 三个方法都是抽象方法,强制所有 policy 实现
- **决策 2**: compute_prefill_attention 定义完整的 prefill 流程,是 policy 的主入口
- **决策 3**: attention.py 只调用 policy.compute_prefill_attention,零计算逻辑
- **决策 4**: chunked_prefill 检查 policy 是否存在,不存在则抛出错误
- **决策 5**: model_runner 默认创建 FullPolicy 作为兜底
- **决策 6**: _ring_buffer_pipeline_load 和 _sync_load_previous_chunks 删除,逻辑移到 policy
- **决策 1**: 只添加一个抽象方法 `compute_chunked_attention`(不添加 `compute_block_attention``merge_attention_outputs`
- **决策 2**: `select_blocks` 接收 `offload_engine` 参数
- **决策 3**: 统一使用 `compute_chunked_attention` 命名
- **决策 4**: Decode 阶段不处理
- **决策 5**: async offload 逻辑保留在 attention.py不移入 policy
- **决策 6**: Phase 4 添加 debug 输出验证执行路径,验证完成后可降级或移除
## Errors Encountered
@@ -350,4 +358,4 @@ return o
## Status
**Currently in Phase 1** - 分析当前架构,理解所有计算逻辑的位置
**Planning Complete** - v4 计划已完成,包含执行路径验证步骤