From 6783a45e6fe729cf1fee1afe4e9652f1bbefe755 Mon Sep 17 00:00:00 2001 From: Zijie Tian Date: Mon, 19 Jan 2026 23:23:16 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20wip:=20update=20sparse=20policy?= =?UTF-8?q?=20refactoring=20plan=20to=20v4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add clear acceptance criteria and verification methods: - Define 3 acceptance criteria (needle test, zero calc in attention.py, KV via offload_engine) - Document violations to fix (direct flash_attn/copy calls) - Add offload_engine.write_prefill_buffer encapsulation plan - Add LSP-based verification method using cclsp tools Co-Authored-By: Claude Opus 4.5 --- task_plan.md | 142 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 124 insertions(+), 18 deletions(-) diff --git a/task_plan.md b/task_plan.md index 6d17547..2805163 100644 --- a/task_plan.md +++ b/task_plan.md @@ -2,7 +2,15 @@ ## Goal -将 chunked prefill 的 attention 计算逻辑完全从 `attention.py` 移到 `SparsePolicy` 内部。attention.py 只负责调用 policy,不包含任何计算逻辑。 +将 chunked prefill 的 attention 计算逻辑完全从 `attention.py` 移到 `SparsePolicy` 内部。 + +### 验收标准(必须全部满足) + +| # | 标准 | 说明 | +|---|------|------| +| **1** | `test_needle.py --enable-offload` 通过 | 功能正确性验证 | +| **2** | `attention.py` 中 chunked prefill 路径零计算调用 | 不直接调用 `flash_attn_*` 或 `merge_attention_outputs`,全部由 policy 完成 | +| **3** | 所有 KV 通信由 `offload_engine` 完成 | 不直接调用 `torch.copy_` 或 `.copy()` 进行 KV 数据传输 | **范围**: 仅实现 FullPolicy,暂不涉及 QuestPolicy 和 XAttentionBSAPolicy。Decode 阶段不处理。 @@ -17,6 +25,18 @@ **结论**:当前代码有冗余,同样的逻辑在两个地方实现。 +### 当前 attention.py 中的违规调用(需要移除) + +```python +# 直接计算调用(违反目标 2) +flash_attn_with_lse(...) +merge_attention_outputs(...) + +# 直接通信调用(违反目标 3) +offload_engine.prefill_k_buffer[self.layer_id, :num_tokens].copy_(k) +offload_engine.prefill_v_buffer[self.layer_id, :num_tokens].copy_(v) +``` + ## 核心设计原则 1. **Policy 内部完成所有 prefill 计算**:包括 block 加载、attention 计算和结果合并 @@ -24,6 +44,7 @@ 3. **统一方法命名**:使用 `compute_chunked_attention`(不是 `compute_prefill_attention`) 4. **chunked_prefill 强制 policy 存在**:没有 policy 则报错 5. **attention.py 零计算逻辑**:`_chunked_prefill_attention` 只调用 policy +6. **所有 KV 通信通过 offload_engine**:不直接调用 torch.copy ## 目标架构 @@ -33,15 +54,15 @@ attention.py (_chunked_prefill_attention): ↓ 调用 sparse_policy.compute_chunked_attention(q, k, v, ...) ↓ - 处理 async offload + 处理 async offload(通过 offload_engine) ↓ - 返回最终输出(不包含任何 attention 计算逻辑) + 返回最终输出(不包含任何计算逻辑,不包含任何直接 copy 调用) SparsePolicy.compute_chunked_attention(): 1. 获取 cpu_block_table 2. 调用 select_blocks(blocks, offload_engine, ctx) → 筛选 blocks - 3. 加载 blocks 并计算 attention(pipeline 或 sync) - 4. 计算当前 chunk attention(causal) + 3. 通过 offload_engine 加载 blocks 并计算 attention(pipeline 或 sync) + 4. 通过 offload_engine 获取当前 chunk KV,计算 attention(causal) 5. 合并所有结果 6. 返回 final_output ``` @@ -55,8 +76,9 @@ SparsePolicy.compute_chunked_attention(): | **决策 3** | `select_blocks` 接收 `offload_engine` 参数(其他策略需要) | | **决策 4** | attention.py 的 `_chunked_prefill_attention` 不包含任何 flashattn 或 merge 调用 | | **决策 5** | Decode 阶段不处理,保持现有逻辑 | -| **决策 6** | async offload 逻辑保留在 attention.py(不移入 policy) | +| **决策 6** | async offload 逻辑保留在 attention.py(通过 offload_engine 方法调用) | | **决策 7** | Phase 4 需要添加 debug 输出验证执行路径 | +| **决策 8** | 所有 KV 通信必须通过 offload_engine 方法,不直接调用 torch.copy | ## Phases @@ -78,6 +100,16 @@ SparsePolicy.compute_chunked_attention(): - 计算当前 chunk(flash_attn) - 合并结果(merge) +### 当前 attention.py 中的直接 copy 调用(需要移除或封装) + +```python +# attention.py:115-116 - 写入 prefill buffer +offload_engine.prefill_k_buffer[self.layer_id, :num_tokens].copy_(k) +offload_engine.prefill_v_buffer[self.layer_id, :num_tokens].copy_(v) +``` + +**处理方案**:在 offload_engine 中添加封装方法,或将此逻辑移入 policy。 + ### 当前 FullPolicy 已实现的功能 `full_policy.py:40-162` 的 `compute_prefill_attention` 已实现: @@ -136,8 +168,8 @@ def compute_chunked_attention( 这是 policy 的主入口,定义完整的 prefill 计算流程: 1. 获取历史 blocks 2. 筛选 blocks(调用 select_blocks) - 3. 加载和计算历史 blocks - 4. 计算当前 chunk attention + 3. 通过 offload_engine 加载和计算历史 blocks + 4. 通过 offload_engine 获取当前 chunk KV,计算 attention 5. 合并所有结果 Args: @@ -179,13 +211,19 @@ def select_blocks( 当前 `compute_prefill_attention` 已实现完整逻辑,确认: - [x] 获取 cpu_block_table -- [x] ring buffer pipeline 加载 -- [x] sync 加载 fallback +- [x] ring buffer pipeline 加载(通过 offload_engine) +- [x] sync 加载 fallback(通过 offload_engine) - [x] 当前 chunk attention 计算 - [x] 结果合并 **注意**:当前实现没有调用 `select_blocks`,需要添加。 +### 3.4 确保所有 KV 通信通过 offload_engine + +检查 `compute_chunked_attention` 内部: +- 历史 block 加载:已通过 `offload_engine.load_to_slot_layer()` 等方法 ✅ +- 当前 chunk KV 获取:已通过 `offload_engine.get_prefill_buffer_slice()` ✅ + ## Phase 4: 验证执行路径(添加 debug 输出) ### 4.1 验证目标 @@ -198,6 +236,7 @@ def select_blocks( | **Policy 调用** | `attention.py` | `_chunked_prefill_attention` 调用 `sparse_policy.compute_chunked_attention` | | **select_blocks 调用** | `full_policy.py` | `compute_chunked_attention` 内部调用 `select_blocks` | | **旧方法未调用** | `attention.py` | `_ring_buffer_pipeline_load` 和 `_sync_load_previous_chunks` 不再被调用 | +| **无直接 copy 调用** | `attention.py` | chunked prefill 路径不直接调用 `.copy_()` | ### 4.2 添加 debug 输出位置 @@ -281,6 +320,7 @@ def _chunked_prefill_attention(self, q, k, v, context): f"policy={sparse_policy}, layer={self.layer_id}, chunk={current_chunk_idx}") # 调用 policy 计算 attention(所有计算逻辑在 policy 内部) + # 注意:不直接调用 flash_attn 或 merge,全部由 policy 完成 final_o = sparse_policy.compute_chunked_attention( q, k, v, self.layer_id, @@ -291,7 +331,7 @@ def _chunked_prefill_attention(self, q, k, v, context): num_tokens, ) - # Per-layer ASYNC offload(保留在 attention.py) + # Per-layer ASYNC offload(通过 offload_engine 方法,不直接 copy) if offload_engine is not None and seq is not None: cpu_block_ids, _ = kvcache_manager.get_all_cpu_blocks(seq) if current_chunk_idx < len(cpu_block_ids): @@ -303,13 +343,37 @@ def _chunked_prefill_attention(self, q, k, v, context): return final_o ``` -### 5.2 删除的方法 +### 5.2 处理 prefill buffer 写入 + +当前 `forward()` 方法中有直接 copy 调用: +```python +# 当前代码(违反目标 3) +offload_engine.prefill_k_buffer[self.layer_id, :num_tokens].copy_(k) +offload_engine.prefill_v_buffer[self.layer_id, :num_tokens].copy_(v) +``` + +**方案 A**:在 offload_engine 中添加封装方法 +```python +# offload_engine.py +def write_prefill_buffer(self, layer_id: int, k: Tensor, v: Tensor, num_tokens: int): + self.prefill_k_buffer[layer_id, :num_tokens].copy_(k) + self.prefill_v_buffer[layer_id, :num_tokens].copy_(v) + +# attention.py +offload_engine.write_prefill_buffer(self.layer_id, k, v, num_tokens) +``` + +**方案 B**:将此逻辑移入 policy(作为 compute_chunked_attention 的一部分) + +**推荐方案 A**:保持 attention.py 调用 offload_engine 方法,但不直接操作 buffer。 + +### 5.3 删除的方法 删除以下方法(逻辑已移到 FullPolicy): - `_ring_buffer_pipeline_load` - `_sync_load_previous_chunks` -### 5.3 保留的方法 +### 5.4 保留的方法 Decode 相关方法保持不变: - `_chunked_decode_attention` @@ -324,10 +388,49 @@ Decode 相关方法保持不变: - [ ] 验证输出正确(needle value 匹配) - [ ] 检查 debug 日志确认执行路径正确 -### 6.2 性能测试 +### 6.2 代码审查(验收标准检查) -- [ ] 对比重构前后的 prefill 延迟 -- [ ] 验证性能无明显下降(< 5% 回归) +- [ ] **标准 1**: test_needle.py 通过 ✓ +- [ ] **标准 2**: `_chunked_prefill_attention` 方法内无 `flash_attn` 或 `merge_attention_outputs` 调用 +- [ ] **标准 3**: `_chunked_prefill_attention` 方法内无直接 `.copy_()` 调用 + +**注意**:标准 2 和 3 仅适用于 chunked prefill 路径。Decode 路径和其他路径可以有 `flash_attn` 调用。 + +**验证方法**: + +**方法 1:使用 cclsp LSP 工具验证调用链(推荐)** + +使用 `mcp__cclsp__find_references` 查找计算函数的调用位置,确认 chunked prefill 路径无直接调用: + +``` +# 查找 flash_attn_with_lse 的所有调用 +mcp__cclsp__find_references(file_path="nanovllm/layers/attention.py", symbol_name="flash_attn_with_lse") + +# 查找 merge_attention_outputs 的所有调用 +mcp__cclsp__find_references(file_path="nanovllm/layers/attention.py", symbol_name="merge_attention_outputs") + +# 查找 _chunked_prefill_attention 的实现 +mcp__cclsp__find_definition(file_path="nanovllm/layers/attention.py", symbol_name="_chunked_prefill_attention") +``` + +验证结果应显示: +- `flash_attn_with_lse` 调用仅出现在 decode 路径或 `full_policy.py` 中 +- `_chunked_prefill_attention` 内部只调用 `sparse_policy.compute_chunked_attention` + +**方法 2:手动代码审查** + +检查 `_chunked_prefill_attention` 方法实现,确认: +1. 只调用 `sparse_policy.compute_chunked_attention(...)` +2. 只调用 `offload_engine.offload_prefill_buffer_async(...)` 等 offload_engine 方法 +3. 不直接调用 `flash_attn_*`、`merge_attention_outputs` 或 `.copy_()` + +```bash +# 辅助检查:找出所有 flash_attn 调用位置 +grep -n "flash_attn\|merge_attention_outputs" nanovllm/layers/attention.py + +# 辅助检查:找出所有 copy 调用位置 +grep -n "\.copy_\|\.copy(" nanovllm/layers/attention.py +``` ### 6.3 回归测试 @@ -342,6 +445,7 @@ Decode 相关方法保持不变: | `nanovllm/kvcache/sparse/full_policy.py` | 重命名方法,修改 `select_blocks` 签名,添加 `select_blocks` 调用,添加 debug 输出 | | `nanovllm/layers/attention.py` | 简化 `_chunked_prefill_attention`,删除 `_ring_buffer_pipeline_load` 和 `_sync_load_previous_chunks`,添加 debug 输出 | | `nanovllm/kvcache/__init__.py` | 添加 policy 创建的 debug 输出 | +| `nanovllm/kvcache/offload_engine.py` | (可选)添加 `write_prefill_buffer` 方法封装 | ## Decisions Made @@ -349,8 +453,10 @@ Decode 相关方法保持不变: - **决策 2**: `select_blocks` 接收 `offload_engine` 参数 - **决策 3**: 统一使用 `compute_chunked_attention` 命名 - **决策 4**: Decode 阶段不处理 -- **决策 5**: async offload 逻辑保留在 attention.py(不移入 policy) +- **决策 5**: async offload 逻辑保留在 attention.py(通过 offload_engine 方法调用) - **决策 6**: Phase 4 添加 debug 输出验证执行路径,验证完成后可降级或移除 +- **决策 7**: prefill buffer 写入通过 offload_engine 封装方法实现(方案 A) +- **决策 8**: 所有 KV 通信必须通过 offload_engine 方法,不直接调用 torch.copy ## Errors Encountered @@ -358,4 +464,4 @@ Decode 相关方法保持不变: ## Status -**Planning Complete** - v4 计划已完成,包含执行路径验证步骤 +**Planning Complete** - v4 计划已完成,包含明确的验收标准和执行路径验证步骤