From 16b269d89722c3215781dc5ddfb4fb548ad25de6 Mon Sep 17 00:00:00 2001 From: Zijie Tian Date: Mon, 19 Jan 2026 23:10:49 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20wip:=20update=20sparse=20policy?= =?UTF-8?q?=20refactoring=20plan=20to=20v4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simplified scope to FullPolicy only. Added debug validation phase. Co-Authored-By: Claude Opus 4.5 --- task_plan.md | 442 ++++++++++++++++++++++++++------------------------- 1 file changed, 225 insertions(+), 217 deletions(-) diff --git a/task_plan.md b/task_plan.md index 23f2406..6d17547 100644 --- a/task_plan.md +++ b/task_plan.md @@ -1,39 +1,48 @@ -# Task Plan: Sparse Policy 架构重构 v3 +# Task Plan: Sparse Policy 架构重构 v4 (FullPolicy Only) ## Goal 将 chunked prefill 的 attention 计算逻辑完全从 `attention.py` 移到 `SparsePolicy` 内部。attention.py 只负责调用 policy,不包含任何计算逻辑。 -## 核心设计原则(强制要求) +**范围**: 仅实现 FullPolicy,暂不涉及 QuestPolicy 和 XAttentionBSAPolicy。Decode 阶段不处理。 -1. **Policy 内部完成所有计算**:包括 attention 计算和结果合并 -2. **select_blocks 传入 offload_engine**:policy 通过 offload_engine 加载 blocks -3. **强制实现计算函数**:所有 policy 必须实现 `compute_block_attention` 和 `merge_attention_outputs` +## 当前代码状态(重要发现) + +**`FullPolicy.compute_prefill_attention` 已经实现了完整的 prefill 流程!** + +但 `attention.py` 没有调用它,而是: +- 调用 `sparse_policy.select_blocks()` 仅做 block 筛选 +- 自己实现 `_ring_buffer_pipeline_load` 和 `_sync_load_previous_chunks` +- 自己调用 `flash_attn_with_lse` 和 `merge_attention_outputs` + +**结论**:当前代码有冗余,同样的逻辑在两个地方实现。 + +## 核心设计原则 + +1. **Policy 内部完成所有 prefill 计算**:包括 block 加载、attention 计算和结果合并 +2. **select_blocks 传入 offload_engine**:其他策略(Quest/XAttn)可能需要加载 KV 来判断 +3. **统一方法命名**:使用 `compute_chunked_attention`(不是 `compute_prefill_attention`) 4. **chunked_prefill 强制 policy 存在**:没有 policy 则报错 -5. **外部默认 FULL policy**:model_runner.py 默认创建 FullPolicy -6. **attention.py 零计算逻辑**:_chunked_prefill_attention 只调用 policy,不直接调用 flashattn 或 merge +5. **attention.py 零计算逻辑**:`_chunked_prefill_attention` 只调用 policy ## 目标架构 ``` -model_runner.py: - 默认创建 FullPolicy(如果没有指定 sparse policy) - attention.py (_chunked_prefill_attention): 检查 sparse_policy 是否存在 ↓ - 调用 sparse_policy.compute_prefill_attention(q, k, v, ...) + 调用 sparse_policy.compute_chunked_attention(q, k, v, ...) ↓ - 返回最终输出(不包含任何计算逻辑) + 处理 async offload + ↓ + 返回最终输出(不包含任何 attention 计算逻辑) -SparsePolicy.compute_prefill_attention(): - 1. select_blocks(blocks, offload_engine, ctx) → 筛选 blocks - 2. 加载 blocks(通过 offload_engine) - 3. 遍历 blocks: - - 调用 self.compute_block_attention(q, k, v, ...) - - 调用 self.merge_attention_outputs(...) - 4. 计算当前 chunk attention - 5. 合并最终结果 +SparsePolicy.compute_chunked_attention(): + 1. 获取 cpu_block_table + 2. 调用 select_blocks(blocks, offload_engine, ctx) → 筛选 blocks + 3. 加载 blocks 并计算 attention(pipeline 或 sync) + 4. 计算当前 chunk attention(causal) + 5. 合并所有结果 6. 返回 final_output ``` @@ -41,106 +50,71 @@ SparsePolicy.compute_prefill_attention(): | 决策 | 说明 | |------|------| -| **决策 1** | `compute_block_attention` 是抽象方法,所有 policy 必须实现 | -| **决策 2** | `merge_attention_outputs` 是抽象方法,所有 policy 必须实现 | -| **决策 3** | `compute_prefill_attention` 是抽象方法,定义完整的 prefill 流程 | -| **决策 4** | `select_blocks` 接收 `offload_engine` 参数(为未来准备) | -| **决策 5** | chunked_prefill 检查 policy 是否存在,不存在则抛出错误 | -| **决策 6** | model_runner 默认创建 FullPolicy 作为兜底 | -| **决策 7** | attention.py 的 _chunked_prefill_attention 不包含任何 flashattn 或 merge 调用 | +| **决策 1** | `compute_chunked_attention` 是唯一的抽象方法,定义完整 prefill 流程 | +| **决策 2** | 不添加 `compute_block_attention` 和 `merge_attention_outputs` 抽象方法(过度设计) | +| **决策 3** | `select_blocks` 接收 `offload_engine` 参数(其他策略需要) | +| **决策 4** | attention.py 的 `_chunked_prefill_attention` 不包含任何 flashattn 或 merge 调用 | +| **决策 5** | Decode 阶段不处理,保持现有逻辑 | +| **决策 6** | async offload 逻辑保留在 attention.py(不移入 policy) | +| **决策 7** | Phase 4 需要添加 debug 输出验证执行路径 | ## Phases -- [ ] Phase 1: 分析当前架构,理解所有计算逻辑的位置 -- [ ] Phase 2: 在 SparsePolicy 基类中添加三个抽象方法 -- [ ] Phase 3: 修改 FullPolicy,实现三个抽象方法 -- [ ] Phase 4: 修改 QuestPolicy,实现三个抽象方法 -- [ ] Phase 5: 修改 XAttentionBSAPolicy,实现三个抽象方法 -- [ ] Phase 6: 修改 model_runner.py,默认创建 FullPolicy -- [ ] Phase 7: 修改 attention.py,移除所有计算逻辑,只调用 policy -- [ ] Phase 8: 测试验证 +- [x] Phase 1: 分析当前架构 ✅ 已完成 +- [ ] Phase 2: 修改 SparsePolicy 基类 +- [ ] Phase 3: 修改 FullPolicy +- [ ] Phase 4: 验证执行路径(添加 debug 输出) +- [ ] Phase 5: 修改 attention.py +- [ ] Phase 6: 测试验证 -## Phase 1: 分析当前架构,理解所有计算逻辑的位置 +## Phase 1: 分析当前架构 ✅ 已完成 -### 当前 attention.py 中包含的计算逻辑 - -1. `_ring_buffer_pipeline_load` 方法: - - 调用 `offload_engine.load_to_slot_layer()` - - 调用 `offload_engine.wait_slot_layer()` - - 调用 `offload_engine.get_kv_for_slot()` - - 调用 `flash_attn_with_lse()` ← **直接调用** - - 调用 `merge_attention_outputs()` ← **直接调用** - -2. `_sync_load_previous_chunks` 方法: - - 同上,直接调用 flashattn 和 merge +### 当前 attention.py 中包含的计算逻辑(需要移除) +1. `_ring_buffer_pipeline_load` 方法:直接调用 flashattn 和 merge +2. `_sync_load_previous_chunks` 方法:直接调用 flashattn 和 merge 3. `_chunked_prefill_attention` 方法: - - 调用 `_ring_buffer_pipeline_load` 或 `_sync_load_previous_chunks` - - 调用 `flash_attn_with_lse()` 计算当前 chunk - - 调用 `merge_attention_outputs()` 合并结果 + - 调用上述两个方法 + - 计算当前 chunk(flash_attn) + - 合并结果(merge) -### 需要移动的计算逻辑 +### 当前 FullPolicy 已实现的功能 -所有 `flash_attn_with_lse` 和 `merge_attention_outputs` 调用都应该在 SparsePolicy 内部。 +`full_policy.py:40-162` 的 `compute_prefill_attention` 已实现: +- ring buffer pipeline 加载 +- sync 加载 fallback +- 当前 chunk attention 计算 +- 结果合并 -## Phase 2: 在 SparsePolicy 基类中添加三个抽象方法 +**只需重命名为 `compute_chunked_attention` 并微调接口。** -### 2.1 compute_block_attention +## Phase 2: 修改 SparsePolicy 基类 + +### 2.1 修改 select_blocks 接口 ```python @abstractmethod -def compute_block_attention( +def select_blocks( self, - q: torch.Tensor, - k: torch.Tensor, - v: torch.Tensor, - layer_id: int, - softmax_scale: float, - causal: bool, -) -> Tuple[torch.Tensor, Optional[torch.Tensor]]: + available_blocks: List[int], + offload_engine: "OffloadEngine", # 新增参数 + ctx: PolicyContext, +) -> List[int]: """ - 计算单个 block 的 attention。 + 选择要加载的 blocks。 Args: - q: [1, seq_len, num_heads, head_dim] 或 [seq_len, num_heads, head_dim] - k, v: 同上 - layer_id: 层索引 - softmax_scale: softmax 缩放因子 - causal: 是否应用因果掩码 + available_blocks: 所有可用的 block IDs + offload_engine: offload engine(其他策略可能需要加载 KV 来判断) + ctx: policy context Returns: - (o, lse) - attention 输出和 LSE + 选择的 block IDs """ pass ``` -### 2.2 merge_attention_outputs - -```python -@abstractmethod -def merge_attention_outputs( - self, - o_acc: torch.Tensor, - lse_acc: Optional[torch.Tensor], - o_new: torch.Tensor, - lse_new: Optional[torch.Tensor], -) -> Tuple[torch.Tensor, Optional[torch.Tensor]]: - """ - 合并两个 attention 输出。 - - Args: - o_acc: 累积的 attention 输出 [1, seq_len, num_heads, head_dim] - lse_acc: 累积的 LSE - o_new: 新的 attention 输出 - lse_new: 新的 LSE - - Returns: - (merged_o, merged_lse) - """ - pass -``` - -### 2.3 compute_chunked_attention +### 2.2 添加 compute_chunked_attention 抽象方法 ```python @abstractmethod @@ -151,9 +125,9 @@ def compute_chunked_attention( v: torch.Tensor, layer_id: int, softmax_scale: float, - offload_engine: OffloadEngine, + offload_engine: "OffloadEngine", current_chunk_idx: int, - seq: ChunkedSequence, + seq: "ChunkedSequence", num_tokens: int, ) -> torch.Tensor: """ @@ -167,7 +141,8 @@ def compute_chunked_attention( 5. 合并所有结果 Args: - q, k, v: 当前 chunk 的 QKV + q: [seq_len, num_heads, head_dim] 当前 chunk 的 query + k, v: [seq_len, num_kv_heads, head_dim] 当前 chunk 的 KV(已写入 prefill buffer) layer_id: 层索引 softmax_scale: softmax 缩放因子 offload_engine: offload engine @@ -176,173 +151,206 @@ def compute_chunked_attention( num_tokens: 当前 chunk 的 token 数 Returns: - [seq_len, num_heads, head_dim] 最终 attention输出 + [seq_len, num_heads, head_dim] 最终 attention 输出 """ pass ``` -### 2.4 修改 select_blocks 接口 +## Phase 3: 修改 FullPolicy + +### 3.1 重命名方法 + +将 `compute_prefill_attention` 重命名为 `compute_chunked_attention`。 + +### 3.2 修改 select_blocks 签名 ```python def select_blocks( self, available_blocks: List[int], - offload_engine: OffloadEngine, + offload_engine: "OffloadEngine", # 新增参数(不使用) ctx: PolicyContext, ) -> List[int]: - """ - 选择要加载的 blocks。 - - Args: - available_blocks: 所有可用的 block IDs - offload_engine: offload engine(为未来准备,当前可能不使用) - ctx: policy context - - Returns: - 选择的 block IDs - """ - pass + """Return all blocks - no sparsity.""" + return available_blocks ``` -## Phase 3: 修改 FullPolicy,实现三个抽象方法 +### 3.3 验证 compute_chunked_attention 实现 -### 3.1 FullPolicy.compute_block_attention +当前 `compute_prefill_attention` 已实现完整逻辑,确认: +- [x] 获取 cpu_block_table +- [x] ring buffer pipeline 加载 +- [x] sync 加载 fallback +- [x] 当前 chunk attention 计算 +- [x] 结果合并 -直接调用 `flash_attn_with_lse`,处理 3D 输入。 +**注意**:当前实现没有调用 `select_blocks`,需要添加。 -### 3.2 FullPolicy.merge_attention_outputs +## Phase 4: 验证执行路径(添加 debug 输出) -调用 `chunked_attention.merge_attention_outputs`。 +### 4.1 验证目标 -### 3.3 FullPolicy.compute_prefill_attention +确认代码修改后,执行路径正确: -实现完整的 prefill 流程: -1. 获取 `cpu_block_table = kvcache_manager.get_prefilled_cpu_blocks(seq)` -2. 调用 `select_blocks(cpu_block_table, offload_engine, ctx)` -3. 遍历 blocks: - - `offload_engine.load_to_slot_layer(slot, layer_id, cpu_block_id)` - - `offload_engine.wait_slot_layer(slot)` - - `k, v = offload_engine.get_kv_for_slot(slot)` - - 调用 `self.compute_block_attention(q, k, v, layer_id, scale, causal=False)` - - 调用 `self.merge_attention_outputs(o_acc, lse_acc, prev_o, prev_lse)` -4. 计算当前 chunk attention -5. 合并最终结果 +| 检查点 | 位置 | 预期行为 | +|--------|------|----------| +| **Policy 创建** | `kvcache/__init__.py` | FullAttentionPolicy 被创建 | +| **Policy 调用** | `attention.py` | `_chunked_prefill_attention` 调用 `sparse_policy.compute_chunked_attention` | +| **select_blocks 调用** | `full_policy.py` | `compute_chunked_attention` 内部调用 `select_blocks` | +| **旧方法未调用** | `attention.py` | `_ring_buffer_pipeline_load` 和 `_sync_load_previous_chunks` 不再被调用 | -### 需要移动的代码 - -从 `attention.py` 的 `_ring_buffer_pipeline_load` 和 `_sync_load_previous_chunks` 移动逻辑: -- slot 遍历逻辑 -- offload_engine 调用 -- 计算和合并逻辑 - -从 `attention.py` 的 `_chunked_prefill_attention` 移动逻辑: -- 当前 chunk 的 attention 计算 -- 最终合并逻辑 - -## Phase 4: 修改 QuestPolicy - -QuestPolicy 实现与 FullPolicy 类似,区别在于: -- `select_blocks` 返回 Top-K blocks -- 其他计算逻辑相同 - -## Phase 5: 修改 XAttentionBSAPolicy - -当前 XAttentionBSAPolicy 只返回所有 blocks,修改后: -- `select_blocks` 当前返回所有 blocks -- `compute_block_attention` 与 FullPolicy 相同 -- `merge_attention_outputs` 与 FullPolicy 相同 -- `compute_prefill_attention` 与 FullPolicy 相同 - -未来可以实现稀疏计算。 - -## Phase 6: 修改 model_runner.py,默认创建 FullPolicy - -### 6.1 当前创建 sparse policy 的逻辑 +### 4.2 添加 debug 输出位置 +**位置 1: `kvcache/__init__.py` - policy 创建时** ```python -# 当前:只有指定 sparse_policy_type 时才创建 -if sparse_policy_type is not None: - sparse_policy = create_sparse_policy(sparse_policy_type, **kwargs) +sparse_policy = create_sparse_policy(sparse_policy_type, **policy_kwargs) +logger.info(f"[DEBUG] Created sparse policy: {sparse_policy}") ``` -### 6.2 修改后 - +**位置 2: `attention.py` - 调用 policy 时** ```python -# 默认创建 FullPolicy -if sparse_policy_type is None: - sparse_policy_type = SparsePolicyType.FULL - -sparse_policy = create_sparse_policy(sparse_policy_type, **kwargs) +# 在 _chunked_prefill_attention 中 +logger.debug(f"[DEBUG] Calling sparse_policy.compute_chunked_attention, " + f"policy={sparse_policy}, layer={self.layer_id}, chunk={current_chunk_idx}") ``` -### 6.3 位置 - -`model_runner.py` 中的 `allocate_kv_cache` 方法。 - -## Phase 7: 修改 attention.py,移除所有计算逻辑 - -### 7.1 _chunked_prefill_attention 简化 - -**当前(伪代码)**: +**位置 3: `full_policy.py` - compute_chunked_attention 入口** ```python -# 获取 cpu_block_table -# 调用 select_blocks -# 调用 _ring_buffer_pipeline_load(包含计算逻辑) -# 计算当前 chunk(flash_attn) -# 合并结果(merge) +def compute_chunked_attention(self, ...): + logger.debug(f"[DEBUG] FullPolicy.compute_chunked_attention called, " + f"layer={layer_id}, chunk={current_chunk_idx}, num_tokens={num_tokens}") + # ... 实现 ``` +**位置 4: `full_policy.py` - select_blocks 调用** +```python +# 在 compute_chunked_attention 内部 +selected_blocks = self.select_blocks(cpu_block_table, offload_engine, policy_ctx) +logger.debug(f"[DEBUG] select_blocks: input={len(cpu_block_table)} blocks, " + f"output={len(selected_blocks)} blocks") +``` + +### 4.3 验证方法 + +运行测试并检查日志输出: +```bash +PYTHONPATH=/home/zijie/Code/nano-vllm:$PYTHONPATH \ + python tests/test_needle.py --model --enable-offload 2>&1 | grep DEBUG +``` + +预期输出: +``` +[DEBUG] Created sparse policy: FullAttentionPolicy() +[DEBUG] Calling sparse_policy.compute_chunked_attention, policy=FullAttentionPolicy(), layer=0, chunk=0 +[DEBUG] FullPolicy.compute_chunked_attention called, layer=0, chunk=0, num_tokens=... +[DEBUG] select_blocks: input=0 blocks, output=0 blocks +[DEBUG] Calling sparse_policy.compute_chunked_attention, policy=FullAttentionPolicy(), layer=0, chunk=1 +[DEBUG] FullPolicy.compute_chunked_attention called, layer=0, chunk=1, num_tokens=... +[DEBUG] select_blocks: input=1 blocks, output=1 blocks +... +``` + +### 4.4 清理 debug 输出 + +验证完成后,将 debug 级别的日志改为更低级别(如 `logger.debug`),或通过环境变量控制: +```python +if os.environ.get('NANOVLLM_DEBUG_POLICY'): + logger.info(f"[DEBUG] ...") +``` + +## Phase 5: 修改 attention.py + +### 5.1 简化 _chunked_prefill_attention + **修改后**: ```python -sparse_policy = kvcache_manager.sparse_policy -if sparse_policy is None: - raise RuntimeError("sparse_policy is required for chunked prefill") +def _chunked_prefill_attention(self, q, k, v, context): + kvcache_manager = context.kvcache_manager + seq = context.chunked_seq + offload_engine = kvcache_manager.offload_engine + current_chunk_idx = context.current_chunk_idx + num_tokens = k.shape[0] -o = sparse_policy.compute_prefill_attention( - q, k, v, self.layer_id, self.scale, - offload_engine, current_chunk_idx, seq, num_tokens -) + # 获取 sparse policy + sparse_policy = kvcache_manager.sparse_policy + if sparse_policy is None: + raise RuntimeError("sparse_policy is required for chunked prefill") -# 直接返回,不需要合并(policy 内部已完成所有计算) -return o + # [DEBUG] 验证执行路径 + logger.debug(f"[DEBUG] Calling sparse_policy.compute_chunked_attention, " + f"policy={sparse_policy}, layer={self.layer_id}, chunk={current_chunk_idx}") + + # 调用 policy 计算 attention(所有计算逻辑在 policy 内部) + final_o = sparse_policy.compute_chunked_attention( + q, k, v, + self.layer_id, + self.scale, + offload_engine, + current_chunk_idx, + seq, + num_tokens, + ) + + # Per-layer ASYNC offload(保留在 attention.py) + if offload_engine is not None and seq is not None: + cpu_block_ids, _ = kvcache_manager.get_all_cpu_blocks(seq) + if current_chunk_idx < len(cpu_block_ids): + cpu_block_id = cpu_block_ids[current_chunk_idx] + offload_engine.offload_prefill_buffer_async( + self.layer_id, cpu_block_id, num_tokens + ) + + return final_o ``` -### 7.2 删除的方法 +### 5.2 删除的方法 -删除以下方法(逻辑移到 policy 中): -- `_ring_buffer_pipeline_load` - 逻辑移到 FullPolicy.compute_prefill_attention -- `_sync_load_previous_chunks` - 逻辑移到 FullPolicy.compute_prefill_attention +删除以下方法(逻辑已移到 FullPolicy): +- `_ring_buffer_pipeline_load` +- `_sync_load_previous_chunks` -### 7.3 保留的方法 +### 5.3 保留的方法 -- `_decode_with_layer_pipeline` - decode 逻辑保持不变 -- `_decode_ring_buffer_pipeline` - decode 逻辑保持不变 +Decode 相关方法保持不变: +- `_chunked_decode_attention` +- `_decode_with_layer_pipeline` +- `_decode_ring_buffer_pipeline` -## Phase 8: 测试验证 +## Phase 6: 测试验证 + +### 6.1 功能测试 - [ ] 运行 `test_needle.py --enable-offload` (FULL policy) -- [ ] 验证输出正确 (needle value: 7492) -- [ ] 验证性能无明显下降 +- [ ] 验证输出正确(needle value 匹配) +- [ ] 检查 debug 日志确认执行路径正确 + +### 6.2 性能测试 + +- [ ] 对比重构前后的 prefill 延迟 +- [ ] 验证性能无明显下降(< 5% 回归) + +### 6.3 回归测试 + +- [ ] 验证 decode 阶段不受影响 +- [ ] 验证非 offload 模式不受影响(如果适用) ## 关键文件清单 | 文件 | 修改内容 | |------|----------| -| `nanovllm/kvcache/sparse/policy.py` | 添加三个抽象方法,修改 select_blocks 签名 | -| `nanovllm/kvcache/sparse/full_policy.py` | 实现三个抽象方法,移动计算逻辑 | -| `nanovllm/kvcache/sparse/quest.py` | 实现三个抽象方法 | -| `nanovllm/kvcache/sparse/xattn_bsa.py` | 实现三个抽象方法 | -| `nanovllm/engine/model_runner.py` | 默认创建 FullPolicy | -| `nanovllm/layers/attention.py` | 简化 _chunked_prefill_attention,删除计算方法 | +| `nanovllm/kvcache/sparse/policy.py` | 添加 `compute_chunked_attention` 抽象方法,修改 `select_blocks` 签名 | +| `nanovllm/kvcache/sparse/full_policy.py` | 重命名方法,修改 `select_blocks` 签名,添加 `select_blocks` 调用,添加 debug 输出 | +| `nanovllm/layers/attention.py` | 简化 `_chunked_prefill_attention`,删除 `_ring_buffer_pipeline_load` 和 `_sync_load_previous_chunks`,添加 debug 输出 | +| `nanovllm/kvcache/__init__.py` | 添加 policy 创建的 debug 输出 | ## Decisions Made -- **决策 1**: 三个方法都是抽象方法,强制所有 policy 实现 -- **决策 2**: compute_prefill_attention 定义完整的 prefill 流程,是 policy 的主入口 -- **决策 3**: attention.py 只调用 policy.compute_prefill_attention,零计算逻辑 -- **决策 4**: chunked_prefill 检查 policy 是否存在,不存在则抛出错误 -- **决策 5**: model_runner 默认创建 FullPolicy 作为兜底 -- **决策 6**: _ring_buffer_pipeline_load 和 _sync_load_previous_chunks 删除,逻辑移到 policy +- **决策 1**: 只添加一个抽象方法 `compute_chunked_attention`(不添加 `compute_block_attention` 和 `merge_attention_outputs`) +- **决策 2**: `select_blocks` 接收 `offload_engine` 参数 +- **决策 3**: 统一使用 `compute_chunked_attention` 命名 +- **决策 4**: Decode 阶段不处理 +- **决策 5**: async offload 逻辑保留在 attention.py(不移入 policy) +- **决策 6**: Phase 4 添加 debug 输出验证执行路径,验证完成后可降级或移除 ## Errors Encountered @@ -350,4 +358,4 @@ return o ## Status -**Currently in Phase 1** - 分析当前架构,理解所有计算逻辑的位置 +**Planning Complete** - v4 计划已完成,包含执行路径验证步骤