Compare commits
5 Commits
b97b0b96a0
...
a36f8569fc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a36f8569fc | ||
|
|
d3b41b2f64 | ||
|
|
baa4be7e2e | ||
|
|
6783a45e6f | ||
|
|
16b269d897 |
9
.claude/ralph-loop.local.md
Normal file
9
.claude/ralph-loop.local.md
Normal file
@@ -0,0 +1,9 @@
|
||||
---
|
||||
active: true
|
||||
iteration: 1
|
||||
max_iterations: 0
|
||||
completion_promise: "COMPLETE"
|
||||
started_at: "2026-01-19T17:25:00Z"
|
||||
---
|
||||
|
||||
请你按照 task_plan.md的要求,进行 nanovllm 的代码重构,确保plan 中最终目标可以圆满实现,注意你仅仅只能使用 GPU 0 来进行调试,其他 GPU 一定不能使用。最终将测试结果写一个报告。 <promise>COMPLETE</promise> -max-iterations 30
|
||||
107
.claude/rules/sparse-policy.md
Normal file
107
.claude/rules/sparse-policy.md
Normal file
@@ -0,0 +1,107 @@
|
||||
# Sparse Policy 代码规范
|
||||
|
||||
## supports_prefill / supports_decode 标志
|
||||
|
||||
每个 SparsePolicy 子类必须正确设置这两个标志:
|
||||
|
||||
```python
|
||||
class MyPolicy(SparsePolicy):
|
||||
supports_prefill = True # 是否支持 prefill 阶段
|
||||
supports_decode = False # 是否支持 decode 阶段
|
||||
```
|
||||
|
||||
## 方法实现规范
|
||||
|
||||
### 规则:不支持的阶段必须 assert False
|
||||
|
||||
如果 policy 不支持某个阶段,对应的 `compute_chunked_*` 方法内部**必须** `assert False`:
|
||||
|
||||
```python
|
||||
class PrefillOnlyPolicy(SparsePolicy):
|
||||
supports_prefill = True
|
||||
supports_decode = False
|
||||
|
||||
def compute_chunked_attention(self, ...):
|
||||
# 正常实现 prefill 逻辑
|
||||
...
|
||||
|
||||
def compute_chunked_decode(self, ...):
|
||||
# 不支持 decode,必须 assert False
|
||||
assert False, "PrefillOnlyPolicy does not support decode phase"
|
||||
```
|
||||
|
||||
```python
|
||||
class DecodeOnlyPolicy(SparsePolicy):
|
||||
supports_prefill = False
|
||||
supports_decode = True
|
||||
|
||||
def compute_chunked_attention(self, ...):
|
||||
# 不支持 prefill,必须 assert False
|
||||
assert False, "DecodeOnlyPolicy does not support prefill phase"
|
||||
|
||||
def compute_chunked_decode(self, ...):
|
||||
# 正常实现 decode 逻辑
|
||||
...
|
||||
```
|
||||
|
||||
### 规则:FullPolicy 必须同时支持两个阶段
|
||||
|
||||
`FullAttentionPolicy` 作为默认策略,必须同时支持 prefill 和 decode:
|
||||
|
||||
```python
|
||||
class FullAttentionPolicy(SparsePolicy):
|
||||
supports_prefill = True
|
||||
supports_decode = True
|
||||
|
||||
def compute_chunked_attention(self, ...):
|
||||
# 完整实现
|
||||
|
||||
def compute_chunked_decode(self, ...):
|
||||
# 完整实现
|
||||
```
|
||||
|
||||
## 调用方检查
|
||||
|
||||
`attention.py` 中应在调用前检查 policy 是否支持当前阶段:
|
||||
|
||||
```python
|
||||
# Prefill 路径
|
||||
if not sparse_policy.supports_prefill:
|
||||
raise RuntimeError(f"{sparse_policy} does not support prefill")
|
||||
|
||||
# Decode 路径
|
||||
if not sparse_policy.supports_decode:
|
||||
raise RuntimeError(f"{sparse_policy} does not support decode")
|
||||
```
|
||||
|
||||
这样提供双重保护:
|
||||
1. 调用方检查 → 提供清晰的错误信息
|
||||
2. 方法内 assert → 防止绕过检查的调用
|
||||
|
||||
## CPU-GPU 通信规范
|
||||
|
||||
### 规则:所有通信必须通过 OffloadEngine
|
||||
|
||||
在 SparsePolicy 的 `compute_chunked_*` 方法中,所有 CPU-GPU 数据传输**必须**通过 `OffloadEngine` 进行,**禁止**直接使用 `torch.Tensor.copy_()` 或 `.to(device)`:
|
||||
|
||||
```python
|
||||
# ✅ 正确:使用 OffloadEngine 的方法
|
||||
offload_engine.load_to_slot_layer(slot, layer_id, cpu_block_id)
|
||||
offload_engine.wait_slot_layer(slot)
|
||||
k, v = offload_engine.get_kv_for_slot(slot)
|
||||
|
||||
# ✅ 正确:使用 cross-layer pipeline
|
||||
k, v = offload_engine.get_decode_layer_kv(layer_id, num_blocks)
|
||||
|
||||
# ❌ 错误:直接使用 torch 通信
|
||||
gpu_tensor.copy_(cpu_tensor)
|
||||
gpu_tensor = cpu_tensor.to("cuda")
|
||||
gpu_tensor = cpu_tensor.cuda()
|
||||
```
|
||||
|
||||
### 原因
|
||||
|
||||
1. **流同步**:OffloadEngine 内部管理 CUDA streams,确保正确的同步
|
||||
2. **Pipeline 优化**:OffloadEngine 实现了 ring buffer 和 cross-layer pipeline
|
||||
3. **资源管理**:OffloadEngine 管理 GPU buffer slots,避免内存碎片
|
||||
4. **一致性**:统一的接口便于调试和维护
|
||||
@@ -1,23 +1,10 @@
|
||||
{
|
||||
"disabledMcpjsonServers": [
|
||||
"claude-flow@alpha",
|
||||
"ruv-swarm",
|
||||
"flow-nexus"
|
||||
],
|
||||
"hooks": {
|
||||
"SessionStart": [
|
||||
{
|
||||
"hooks": [
|
||||
{
|
||||
"type": "command",
|
||||
"command": "npx @claude-flow/cli@latest daemon start --quiet 2>/dev/null || true",
|
||||
"timeout": 5000,
|
||||
"continueOnError": true
|
||||
},
|
||||
{
|
||||
"type": "command",
|
||||
"command": "[ -n \"$SESSION_ID\" ] && npx @claude-flow/cli@latest hooks session-restore --session-id \"$SESSION_ID\" 2>/dev/null || true",
|
||||
"timeout": 10000,
|
||||
"continueOnError": true
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"Stop": [
|
||||
{
|
||||
"hooks": [
|
||||
@@ -28,43 +15,6 @@
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"PermissionRequest": [
|
||||
{
|
||||
"matcher": "^mcp__claude-flow__.*$",
|
||||
"hooks": [
|
||||
{
|
||||
"type": "command",
|
||||
"command": "echo '{\"decision\": \"allow\", \"reason\": \"claude-flow MCP tool auto-approved\"}'",
|
||||
"timeout": 1000
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"matcher": "^Bash\\(npx @?claude-flow.*\\)$",
|
||||
"hooks": [
|
||||
{
|
||||
"type": "command",
|
||||
"command": "echo '{\"decision\": \"allow\", \"reason\": \"claude-flow CLI auto-approved\"}'",
|
||||
"timeout": 1000
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"permissions": {
|
||||
"allow": [
|
||||
"Bash(npx claude-flow*)",
|
||||
"Bash(npx @claude-flow/*)",
|
||||
"mcp__claude-flow__*"
|
||||
],
|
||||
"deny": []
|
||||
},
|
||||
"claudeFlow": {
|
||||
"version": "3.0.0",
|
||||
"enabled": true,
|
||||
"daemon": {
|
||||
"autoStart": true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,12 +5,20 @@ This serves as a baseline and default policy when sparse
|
||||
attention is not needed.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import torch
|
||||
from typing import List, Optional
|
||||
from typing import List, Optional, TYPE_CHECKING
|
||||
|
||||
from .policy import SparsePolicy, PolicyContext
|
||||
from nanovllm.utils.context import get_context
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from nanovllm.kvcache.offload_engine import OffloadEngine
|
||||
from nanovllm.kvcache.manager import KVCacheManager
|
||||
from nanovllm.engine.sequence import Sequence
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FullAttentionPolicy(SparsePolicy):
|
||||
"""
|
||||
@@ -32,30 +40,34 @@ class FullAttentionPolicy(SparsePolicy):
|
||||
def select_blocks(
|
||||
self,
|
||||
available_blocks: List[int],
|
||||
offload_engine: "OffloadEngine",
|
||||
ctx: PolicyContext,
|
||||
) -> List[int]:
|
||||
"""Return all blocks - no sparsity."""
|
||||
return available_blocks
|
||||
|
||||
def compute_prefill_attention(
|
||||
def compute_chunked_attention(
|
||||
self,
|
||||
q: torch.Tensor,
|
||||
k: torch.Tensor,
|
||||
v: torch.Tensor,
|
||||
layer_id: int,
|
||||
softmax_scale: float,
|
||||
offload_engine,
|
||||
offload_engine: "OffloadEngine",
|
||||
kvcache_manager: "KVCacheManager",
|
||||
current_chunk_idx: int,
|
||||
seq,
|
||||
seq: "Sequence",
|
||||
num_tokens: int,
|
||||
) -> torch.Tensor:
|
||||
"""
|
||||
Compute full attention for chunked prefill.
|
||||
|
||||
This method handles the complete chunked prefill flow:
|
||||
1. Load historical blocks from CPU
|
||||
2. Compute attention to historical chunks
|
||||
3. Compute attention to current chunk
|
||||
4. Merge all results
|
||||
1. Get historical blocks
|
||||
2. Select blocks via select_blocks
|
||||
3. Load and compute attention to historical chunks
|
||||
4. Compute attention to current chunk
|
||||
5. Merge all results
|
||||
|
||||
Args:
|
||||
q: Query tensor [seq_len, num_heads, head_dim]
|
||||
@@ -64,22 +76,41 @@ class FullAttentionPolicy(SparsePolicy):
|
||||
layer_id: Current layer index
|
||||
softmax_scale: Softmax scaling factor
|
||||
offload_engine: OffloadEngine for loading blocks
|
||||
kvcache_manager: KVCacheManager for block management
|
||||
current_chunk_idx: Current chunk index
|
||||
seq: ChunkedSequence
|
||||
seq: Sequence object
|
||||
num_tokens: Number of tokens in current chunk
|
||||
|
||||
Returns:
|
||||
Attention output [seq_len, num_heads, head_dim]
|
||||
"""
|
||||
from nanovllm.kvcache.chunked_attention import flash_attn_with_lse, merge_attention_outputs
|
||||
|
||||
logger.debug(f"[DEBUG] FullPolicy.compute_chunked_attention called, "
|
||||
f"layer={layer_id}, chunk={current_chunk_idx}, num_tokens={num_tokens}")
|
||||
|
||||
q_batched = q.unsqueeze(0) # [1, seq_len, num_heads, head_dim]
|
||||
num_tokens = q.shape[0]
|
||||
o_acc = None
|
||||
lse_acc = None
|
||||
compute_stream = offload_engine.compute_stream
|
||||
|
||||
# Step 1: Get and load historical blocks
|
||||
cpu_block_table = seq.kvcache_manager.get_prefilled_cpu_blocks(seq)
|
||||
# Step 1: Get historical blocks
|
||||
cpu_block_table = kvcache_manager.get_prefilled_cpu_blocks(seq)
|
||||
|
||||
# Step 2: Apply select_blocks to filter blocks
|
||||
if cpu_block_table:
|
||||
num_chunks = current_chunk_idx + 1
|
||||
policy_ctx = PolicyContext(
|
||||
query_chunk_idx=current_chunk_idx,
|
||||
num_query_chunks=num_chunks,
|
||||
layer_id=layer_id,
|
||||
query=None, # Prefill typically doesn't use query for selection
|
||||
is_prefill=True,
|
||||
block_size=kvcache_manager.block_size,
|
||||
total_kv_len=len(cpu_block_table) * kvcache_manager.block_size,
|
||||
)
|
||||
cpu_block_table = self.select_blocks(cpu_block_table, offload_engine, policy_ctx)
|
||||
logger.debug(f"[DEBUG] select_blocks: output={len(cpu_block_table)} blocks")
|
||||
|
||||
if cpu_block_table:
|
||||
load_slots = list(range(offload_engine.num_ring_slots))
|
||||
@@ -139,7 +170,7 @@ class FullAttentionPolicy(SparsePolicy):
|
||||
next_cpu_block_id = cpu_block_table[next_block_idx]
|
||||
offload_engine.load_to_slot_layer(next_slot, layer_id, next_cpu_block_id)
|
||||
|
||||
# Step 2: Compute attention to current chunk (causal mask)
|
||||
# Step 4: Compute attention to current chunk (causal mask)
|
||||
with torch.cuda.stream(compute_stream):
|
||||
k_curr, v_curr = offload_engine.get_prefill_buffer_slice(layer_id, num_tokens)
|
||||
current_o, current_lse = flash_attn_with_lse(
|
||||
@@ -148,7 +179,7 @@ class FullAttentionPolicy(SparsePolicy):
|
||||
causal=True,
|
||||
)
|
||||
|
||||
# Step 3: Merge historical and current attention
|
||||
# Step 5: Merge historical and current attention
|
||||
with torch.cuda.stream(compute_stream):
|
||||
if o_acc is None:
|
||||
final_o = current_o
|
||||
|
||||
@@ -7,12 +7,17 @@ from CPU for each query chunk during chunked attention computation.
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional, Any
|
||||
from typing import List, Optional, Any, TYPE_CHECKING
|
||||
import torch
|
||||
|
||||
# Import SparsePolicyType from config to avoid circular imports
|
||||
from nanovllm.config import SparsePolicyType
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from nanovllm.kvcache.offload_engine import OffloadEngine
|
||||
from nanovllm.kvcache.manager import KVCacheManager
|
||||
from nanovllm.engine.sequence import Sequence
|
||||
|
||||
|
||||
@dataclass
|
||||
class PolicyContext:
|
||||
@@ -107,6 +112,7 @@ class SparsePolicy(ABC):
|
||||
def select_blocks(
|
||||
self,
|
||||
available_blocks: List[int],
|
||||
offload_engine: "OffloadEngine",
|
||||
ctx: PolicyContext,
|
||||
) -> List[int]:
|
||||
"""
|
||||
@@ -120,6 +126,8 @@ class SparsePolicy(ABC):
|
||||
available_blocks: List of CPU block IDs that contain KV cache
|
||||
from previous chunks. These are ordered by
|
||||
their position in the sequence.
|
||||
offload_engine: OffloadEngine for loading KV (some policies need
|
||||
to load KV to make selection decisions).
|
||||
ctx: PolicyContext with information about the current query
|
||||
chunk, layer, phase (prefill/decode), etc.
|
||||
|
||||
@@ -183,5 +191,47 @@ class SparsePolicy(ABC):
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def compute_chunked_attention(
|
||||
self,
|
||||
q: torch.Tensor,
|
||||
k: torch.Tensor,
|
||||
v: torch.Tensor,
|
||||
layer_id: int,
|
||||
softmax_scale: float,
|
||||
offload_engine: "OffloadEngine",
|
||||
kvcache_manager: "KVCacheManager",
|
||||
current_chunk_idx: int,
|
||||
seq: "Sequence",
|
||||
num_tokens: int,
|
||||
) -> torch.Tensor:
|
||||
"""
|
||||
Compute chunked prefill attention (complete flow).
|
||||
|
||||
This is the main entry point for prefill attention computation.
|
||||
It defines the complete prefill flow:
|
||||
1. Get historical blocks
|
||||
2. Select blocks (call select_blocks)
|
||||
3. Load and compute historical blocks via offload_engine
|
||||
4. Get current chunk KV from offload_engine, compute attention
|
||||
5. Merge all results
|
||||
|
||||
Args:
|
||||
q: [seq_len, num_heads, head_dim] query for current chunk
|
||||
k: [seq_len, num_kv_heads, head_dim] key for current chunk (in prefill buffer)
|
||||
v: [seq_len, num_kv_heads, head_dim] value for current chunk (in prefill buffer)
|
||||
layer_id: transformer layer index
|
||||
softmax_scale: softmax scaling factor
|
||||
offload_engine: OffloadEngine for loading blocks
|
||||
kvcache_manager: KVCacheManager for block management
|
||||
current_chunk_idx: current chunk index
|
||||
seq: Sequence object
|
||||
num_tokens: number of tokens in current chunk
|
||||
|
||||
Returns:
|
||||
[seq_len, num_heads, head_dim] final attention output
|
||||
"""
|
||||
pass
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.__class__.__name__}()"
|
||||
|
||||
@@ -174,123 +174,45 @@ class Attention(nn.Module):
|
||||
"""
|
||||
Compute attention with per-layer prefill buffer for async offload.
|
||||
|
||||
Optimized design:
|
||||
- Current chunk's KV is written to per-layer prefill buffer (not GPU slot)
|
||||
- Previous chunks' KV are loaded from CPU using GPU slots
|
||||
- Each layer offloads from its own buffer - no waiting required!
|
||||
Simplified design:
|
||||
- All computation logic is delegated to sparse_policy.compute_chunked_attention()
|
||||
- This method only handles async offload after computation
|
||||
|
||||
For each layer:
|
||||
1. Current chunk's KV is in prefill_buffer[layer_id] (just written by model)
|
||||
2. Load previous chunks from CPU using available slots (pipeline)
|
||||
3. Compute attention against previous KV (no causal mask)
|
||||
4. Compute attention against current KV from prefill buffer (causal)
|
||||
5. Merge all results using online softmax
|
||||
6. Async offload prefill buffer to CPU (no waiting!)
|
||||
The policy handles:
|
||||
1. Loading historical blocks from CPU
|
||||
2. Computing attention against historical KV (no causal mask)
|
||||
3. Computing attention against current KV from prefill buffer (causal)
|
||||
4. Merging all results
|
||||
"""
|
||||
from nanovllm.kvcache.chunked_attention import flash_attn_with_lse, merge_attention_outputs
|
||||
|
||||
current_chunk_idx = context.current_chunk_idx
|
||||
torch.cuda.nvtx.range_push(f"ChunkedPrefill: L{self.layer_id} Chunk{current_chunk_idx}")
|
||||
|
||||
# q shape: [total_tokens, num_heads, head_dim]
|
||||
q_batched = q.unsqueeze(0) # [1, total_tokens, heads, dim]
|
||||
num_tokens = k.shape[0]
|
||||
|
||||
o_acc = None
|
||||
lse_acc = None
|
||||
|
||||
kvcache_manager = context.kvcache_manager
|
||||
seq = context.chunked_seq if hasattr(context, 'chunked_seq') else None
|
||||
offload_engine = kvcache_manager.offload_engine if kvcache_manager is not None else None
|
||||
|
||||
if kvcache_manager is not None and seq is not None and self.layer_id >= 0:
|
||||
# Get prefilled CPU blocks (blocks from previous chunks)
|
||||
cpu_block_table = kvcache_manager.get_prefilled_cpu_blocks(seq)
|
||||
# Get sparse policy - required for chunked prefill
|
||||
sparse_policy = kvcache_manager.sparse_policy
|
||||
if sparse_policy is None:
|
||||
raise RuntimeError("sparse_policy is required for chunked prefill")
|
||||
|
||||
# Apply sparse policy if enabled
|
||||
sparse_policy = kvcache_manager.sparse_policy
|
||||
# [DEBUG] Verify execution path
|
||||
logger.debug(f"[DEBUG] Calling sparse_policy.compute_chunked_attention, "
|
||||
f"policy={sparse_policy}, layer={self.layer_id}, chunk={current_chunk_idx}")
|
||||
|
||||
# === All sparse policies use select_blocks interface ===
|
||||
if cpu_block_table and sparse_policy is not None:
|
||||
num_chunks = getattr(context, 'num_chunks', current_chunk_idx + 1)
|
||||
policy_ctx = PolicyContext(
|
||||
query_chunk_idx=current_chunk_idx,
|
||||
num_query_chunks=num_chunks,
|
||||
layer_id=self.layer_id,
|
||||
query=None, # Prefill typically doesn't use query for selection
|
||||
is_prefill=True,
|
||||
block_size=kvcache_manager.block_size,
|
||||
total_kv_len=len(cpu_block_table) * kvcache_manager.block_size,
|
||||
)
|
||||
cpu_block_table = sparse_policy.select_blocks(
|
||||
cpu_block_table, policy_ctx
|
||||
)
|
||||
|
||||
if cpu_block_table:
|
||||
# Get available load slots (all slots can be used since we use prefill buffer)
|
||||
load_slots = list(range(offload_engine.num_ring_slots))
|
||||
pipeline_depth = len(load_slots)
|
||||
|
||||
if pipeline_depth == 0:
|
||||
# Only 1 slot total, cannot pipeline - use sync loading
|
||||
o_acc, lse_acc = self._sync_load_previous_chunks(
|
||||
q_batched, cpu_block_table, offload_engine
|
||||
)
|
||||
else:
|
||||
# Use ring buffer pipeline
|
||||
o_acc, lse_acc = self._ring_buffer_pipeline_load(
|
||||
q_batched, cpu_block_table, load_slots, offload_engine,
|
||||
current_chunk_idx
|
||||
)
|
||||
|
||||
# Get compute stream for all attention operations
|
||||
compute_stream = offload_engine.compute_stream if offload_engine is not None else None
|
||||
|
||||
# Compute attention against current chunk's KV from prefill buffer (with causal mask)
|
||||
needs_current_chunk_attention = True
|
||||
|
||||
if needs_current_chunk_attention:
|
||||
if compute_stream is not None:
|
||||
with torch.cuda.stream(compute_stream):
|
||||
torch.cuda.nvtx.range_push(f"FlashAttn: L{self.layer_id} CurrentChunk (causal)")
|
||||
# Get KV from per-layer prefill buffer
|
||||
k_batched, v_batched = offload_engine.get_prefill_buffer_slice(self.layer_id, num_tokens)
|
||||
current_o, current_lse = flash_attn_with_lse(
|
||||
q_batched,
|
||||
k_batched,
|
||||
v_batched,
|
||||
softmax_scale=self.scale,
|
||||
causal=True,
|
||||
)
|
||||
torch.cuda.nvtx.range_pop()
|
||||
else:
|
||||
torch.cuda.nvtx.range_push(f"FlashAttn: L{self.layer_id} CurrentChunk (causal)")
|
||||
k_batched = k.unsqueeze(0)
|
||||
v_batched = v.unsqueeze(0)
|
||||
current_o, current_lse = flash_attn_with_lse(
|
||||
q_batched,
|
||||
k_batched,
|
||||
v_batched,
|
||||
softmax_scale=self.scale,
|
||||
causal=True,
|
||||
)
|
||||
torch.cuda.nvtx.range_pop()
|
||||
|
||||
# Merge with accumulated (all on compute_stream for consistency)
|
||||
if o_acc is None:
|
||||
# No accumulated attention (no historical chunks processed)
|
||||
final_o = current_o
|
||||
else:
|
||||
# Has accumulated attention (historical chunks processed)
|
||||
if compute_stream is not None:
|
||||
with torch.cuda.stream(compute_stream):
|
||||
torch.cuda.nvtx.range_push(f"MergeAttn: L{self.layer_id}")
|
||||
final_o, _ = merge_attention_outputs(o_acc, lse_acc, current_o, current_lse)
|
||||
torch.cuda.nvtx.range_pop()
|
||||
else:
|
||||
torch.cuda.nvtx.range_push(f"MergeAttn: L{self.layer_id}")
|
||||
final_o, _ = merge_attention_outputs(o_acc, lse_acc, current_o, current_lse)
|
||||
torch.cuda.nvtx.range_pop()
|
||||
# Delegate all computation to policy (no flash_attn or merge calls here!)
|
||||
final_o = sparse_policy.compute_chunked_attention(
|
||||
q, k, v,
|
||||
self.layer_id,
|
||||
self.scale,
|
||||
offload_engine,
|
||||
kvcache_manager,
|
||||
current_chunk_idx,
|
||||
seq,
|
||||
num_tokens,
|
||||
)
|
||||
|
||||
torch.cuda.nvtx.range_pop() # ChunkedPrefill
|
||||
|
||||
@@ -305,181 +227,7 @@ class Attention(nn.Module):
|
||||
self.layer_id, cpu_block_id, num_tokens
|
||||
)
|
||||
|
||||
# Sync default stream with compute_stream before returning
|
||||
# This ensures the result is ready for the rest of the model (layernorm, MLP)
|
||||
if compute_stream is not None:
|
||||
torch.cuda.default_stream().wait_stream(compute_stream)
|
||||
|
||||
# Remove batch dimension: [1, total_tokens, heads, dim] -> [total_tokens, heads, dim]
|
||||
return final_o.squeeze(0)
|
||||
|
||||
def _sync_load_previous_chunks(
|
||||
self,
|
||||
q_batched: torch.Tensor,
|
||||
cpu_block_table: list,
|
||||
offload_engine,
|
||||
):
|
||||
"""Synchronous loading fallback when pipeline_depth=0."""
|
||||
from nanovllm.kvcache.chunked_attention import flash_attn_with_lse, merge_attention_outputs
|
||||
|
||||
o_acc, lse_acc = None, None
|
||||
compute_stream = offload_engine.compute_stream
|
||||
|
||||
for block_idx, cpu_block_id in enumerate(cpu_block_table):
|
||||
# Load to slot 0 (single slot)
|
||||
offload_engine.load_to_slot_layer(0, self.layer_id, cpu_block_id)
|
||||
offload_engine.wait_slot_layer(0)
|
||||
|
||||
# IMPORTANT: Must use compute_stream to match wait_slot_layer
|
||||
with torch.cuda.stream(compute_stream):
|
||||
prev_k, prev_v = offload_engine.get_kv_for_slot(0)
|
||||
|
||||
prev_o, prev_lse = flash_attn_with_lse(
|
||||
q_batched, prev_k, prev_v,
|
||||
softmax_scale=self.scale,
|
||||
causal=False,
|
||||
)
|
||||
|
||||
if o_acc is None:
|
||||
o_acc, lse_acc = prev_o, prev_lse
|
||||
else:
|
||||
o_acc, lse_acc = merge_attention_outputs(o_acc, lse_acc, prev_o, prev_lse)
|
||||
|
||||
return o_acc, lse_acc
|
||||
|
||||
def _ring_buffer_pipeline_load(
|
||||
self,
|
||||
q_batched: torch.Tensor,
|
||||
cpu_block_table: list,
|
||||
load_slots: list,
|
||||
offload_engine,
|
||||
current_chunk_idx: int = -1,
|
||||
):
|
||||
"""
|
||||
Ring buffer async pipeline loading with double buffering.
|
||||
|
||||
Uses compute_done events to ensure safe buffer reuse:
|
||||
- Before loading to slot X, wait for previous compute on slot X to finish
|
||||
- Before computing on slot X, wait for load to slot X to finish
|
||||
|
||||
Timeline with 2 slots (A, B):
|
||||
┌──────────────┐
|
||||
│ Load B0→A │
|
||||
└──────────────┘
|
||||
┌──────────────┐ ┌──────────────┐
|
||||
│ Load B1→B │ │ Load B2→A │ ...
|
||||
└──────────────┘ └──────────────┘
|
||||
↘ ↘
|
||||
┌──────────────┐ ┌──────────────┐
|
||||
│ Compute(A) │ │ Compute(B) │ ...
|
||||
└──────────────┘ └──────────────┘
|
||||
|
||||
The load_to_slot_layer internally waits for compute_done[slot] before
|
||||
starting the transfer, ensuring no data race.
|
||||
"""
|
||||
from nanovllm.kvcache.chunked_attention import flash_attn_with_lse, merge_attention_outputs
|
||||
|
||||
num_blocks = len(cpu_block_table)
|
||||
if num_blocks == 0:
|
||||
return None, None
|
||||
|
||||
pipeline_depth = len(load_slots)
|
||||
if pipeline_depth == 0:
|
||||
return None, None
|
||||
|
||||
o_acc, lse_acc = None, None
|
||||
|
||||
if pipeline_depth == 1:
|
||||
# Only 1 slot available, cannot pipeline - use synchronous mode
|
||||
# IMPORTANT: Must use compute_stream to match synchronization in
|
||||
# load_to_slot_layer (waits for compute_done) and wait_slot_layer
|
||||
slot = load_slots[0]
|
||||
compute_stream = offload_engine.compute_stream
|
||||
for block_idx in range(num_blocks):
|
||||
cpu_block_id = cpu_block_table[block_idx]
|
||||
offload_engine.load_to_slot_layer(slot, self.layer_id, cpu_block_id)
|
||||
offload_engine.wait_slot_layer(slot)
|
||||
|
||||
with torch.cuda.stream(compute_stream):
|
||||
# Debug: call hooks on compute_stream (synchronized with transfer)
|
||||
if offload_engine.debug_mode:
|
||||
offload_engine._call_debug_hooks(slot, self.layer_id, cpu_block_id)
|
||||
|
||||
prev_k, prev_v = offload_engine.get_kv_for_slot(slot)
|
||||
|
||||
prev_o, prev_lse = flash_attn_with_lse(
|
||||
q_batched, prev_k, prev_v,
|
||||
softmax_scale=self.scale,
|
||||
causal=False,
|
||||
)
|
||||
# Record compute done so next load can safely reuse this slot
|
||||
offload_engine.record_slot_compute_done(slot)
|
||||
if o_acc is None:
|
||||
o_acc, lse_acc = prev_o, prev_lse
|
||||
else:
|
||||
o_acc, lse_acc = merge_attention_outputs(o_acc, lse_acc, prev_o, prev_lse)
|
||||
return o_acc, lse_acc
|
||||
|
||||
# N-way pipeline: use ALL available slots for maximum overlap
|
||||
# Pipeline depth = num_slots - 1 (num_slots blocks in flight)
|
||||
num_slots = len(load_slots)
|
||||
|
||||
# Phase 1: Pre-load up to num_slots blocks to fill the pipeline
|
||||
# This starts all transfers in parallel, utilizing full PCIe bandwidth
|
||||
num_preload = min(num_slots, num_blocks)
|
||||
for i in range(num_preload):
|
||||
offload_engine.load_to_slot_layer(load_slots[i], self.layer_id, cpu_block_table[i])
|
||||
|
||||
# Phase 2: Main loop - compute and immediately reuse slot for next transfer
|
||||
# Use dedicated compute_stream (not default stream) to enable overlap with transfers
|
||||
compute_stream = offload_engine.compute_stream
|
||||
|
||||
for block_idx in range(num_blocks):
|
||||
torch.cuda.nvtx.range_push(f"PipelineBlock: L{self.layer_id} B{block_idx}")
|
||||
|
||||
# Cycle through slots: slot[block_idx % num_slots]
|
||||
current_slot = load_slots[block_idx % num_slots]
|
||||
cpu_block_id = cpu_block_table[block_idx]
|
||||
|
||||
# Wait for current slot's transfer to complete (on compute_stream)
|
||||
offload_engine.wait_slot_layer(current_slot)
|
||||
|
||||
# Compute attention on current slot's data
|
||||
# IMPORTANT: Use dedicated compute_stream to avoid implicit sync with default stream
|
||||
with torch.cuda.stream(compute_stream):
|
||||
# Debug: call hooks on compute_stream (synchronized with transfer)
|
||||
if offload_engine.debug_mode:
|
||||
offload_engine._call_debug_hooks(current_slot, self.layer_id, cpu_block_id)
|
||||
|
||||
torch.cuda.nvtx.range_push(f"FlashAttn: L{self.layer_id} PrevBlock{block_idx}")
|
||||
prev_k, prev_v = offload_engine.get_kv_for_slot(current_slot)
|
||||
|
||||
prev_o, prev_lse = flash_attn_with_lse(
|
||||
q_batched, prev_k, prev_v,
|
||||
softmax_scale=self.scale,
|
||||
causal=False,
|
||||
)
|
||||
torch.cuda.nvtx.range_pop()
|
||||
|
||||
# Record compute done - this allows the next transfer to safely overwrite this slot
|
||||
offload_engine.record_slot_compute_done(current_slot)
|
||||
|
||||
# Immediately start loading the NEXT block into this slot (if more blocks remain)
|
||||
# Key insight: reuse current_slot immediately after compute is done!
|
||||
next_block_idx = block_idx + num_slots
|
||||
if next_block_idx < num_blocks:
|
||||
offload_engine.load_to_slot_layer(current_slot, self.layer_id, cpu_block_table[next_block_idx])
|
||||
|
||||
# Merge with accumulated (also on compute_stream for consistency)
|
||||
with torch.cuda.stream(compute_stream):
|
||||
if o_acc is None:
|
||||
o_acc, lse_acc = prev_o, prev_lse
|
||||
else:
|
||||
o_acc, lse_acc = merge_attention_outputs(o_acc, lse_acc, prev_o, prev_lse)
|
||||
|
||||
torch.cuda.nvtx.range_pop() # PipelineBlock
|
||||
|
||||
return o_acc, lse_acc
|
||||
return final_o
|
||||
|
||||
def _chunked_decode_attention(
|
||||
self,
|
||||
@@ -524,6 +272,8 @@ class Attention(nn.Module):
|
||||
if last_block_valid_tokens == 0 and total_prefill_tokens > 0:
|
||||
last_block_valid_tokens = block_size # Last block was exactly full
|
||||
|
||||
offload_engine = kvcache_manager.offload_engine
|
||||
|
||||
# Apply sparse policy if enabled (Quest does Top-K selection for decode)
|
||||
sparse_policy = kvcache_manager.sparse_policy
|
||||
if sparse_policy is not None:
|
||||
@@ -537,11 +287,9 @@ class Attention(nn.Module):
|
||||
total_kv_len=len(cpu_block_table) * kvcache_manager.block_size,
|
||||
)
|
||||
cpu_block_table = sparse_policy.select_blocks(
|
||||
cpu_block_table, policy_ctx
|
||||
cpu_block_table, offload_engine, policy_ctx
|
||||
)
|
||||
|
||||
offload_engine = kvcache_manager.offload_engine
|
||||
|
||||
# Use cross-layer pipeline if active (initialized in model_runner)
|
||||
if offload_engine.is_pipeline_active():
|
||||
o_acc, lse_acc = self._decode_with_layer_pipeline(
|
||||
|
||||
550
task_plan.md
550
task_plan.md
@@ -1,39 +1,69 @@
|
||||
# Task Plan: Sparse Policy 架构重构 v3
|
||||
# Task Plan: Sparse Policy 架构重构 v4 (FullPolicy Only)
|
||||
|
||||
## Goal
|
||||
|
||||
将 chunked prefill 的 attention 计算逻辑完全从 `attention.py` 移到 `SparsePolicy` 内部。attention.py 只负责调用 policy,不包含任何计算逻辑。
|
||||
将 chunked prefill 的 attention 计算逻辑完全从 `attention.py` 移到 `SparsePolicy` 内部。
|
||||
|
||||
## 核心设计原则(强制要求)
|
||||
### 验收标准(必须全部满足)
|
||||
|
||||
1. **Policy 内部完成所有计算**:包括 attention 计算和结果合并
|
||||
2. **select_blocks 传入 offload_engine**:policy 通过 offload_engine 加载 blocks
|
||||
3. **强制实现计算函数**:所有 policy 必须实现 `compute_block_attention` 和 `merge_attention_outputs`
|
||||
| # | 标准 | 说明 |
|
||||
|---|------|------|
|
||||
| **1** | `test_needle.py --enable-offload` 通过 | 功能正确性验证 |
|
||||
| **2** | `attention.py` 中 chunked prefill 路径零计算调用 | 不直接调用 `flash_attn_*` 或 `merge_attention_outputs`,全部由 policy 完成 |
|
||||
| **3** | 所有 KV 通信由 `offload_engine` 完成 | 不直接调用 `torch.copy_` 或 `.copy()` 进行 KV 数据传输 |
|
||||
|
||||
**范围**: 仅实现 FullPolicy,暂不涉及 QuestPolicy 和 XAttentionBSAPolicy。Decode 阶段不处理。
|
||||
|
||||
## 当前代码状态(重要发现)
|
||||
|
||||
**`FullPolicy.compute_prefill_attention` 已经实现了完整的 prefill 流程!**
|
||||
|
||||
但 `attention.py` 没有调用它,而是:
|
||||
- 调用 `sparse_policy.select_blocks()` 仅做 block 筛选
|
||||
- 自己实现 `_ring_buffer_pipeline_load` 和 `_sync_load_previous_chunks`
|
||||
- 自己调用 `flash_attn_with_lse` 和 `merge_attention_outputs`
|
||||
|
||||
**结论**:当前代码有冗余,同样的逻辑在两个地方实现。
|
||||
|
||||
### 当前 attention.py 中的违规调用(需要移除)
|
||||
|
||||
```python
|
||||
# 直接计算调用(违反目标 2)
|
||||
flash_attn_with_lse(...)
|
||||
merge_attention_outputs(...)
|
||||
|
||||
# 直接通信调用(违反目标 3)
|
||||
offload_engine.prefill_k_buffer[self.layer_id, :num_tokens].copy_(k)
|
||||
offload_engine.prefill_v_buffer[self.layer_id, :num_tokens].copy_(v)
|
||||
```
|
||||
|
||||
## 核心设计原则
|
||||
|
||||
1. **Policy 内部完成所有 prefill 计算**:包括 block 加载、attention 计算和结果合并
|
||||
2. **select_blocks 传入 offload_engine**:其他策略(Quest/XAttn)可能需要加载 KV 来判断
|
||||
3. **统一方法命名**:使用 `compute_chunked_attention`(不是 `compute_prefill_attention`)
|
||||
4. **chunked_prefill 强制 policy 存在**:没有 policy 则报错
|
||||
5. **外部默认 FULL policy**:model_runner.py 默认创建 FullPolicy
|
||||
6. **attention.py 零计算逻辑**:_chunked_prefill_attention 只调用 policy,不直接调用 flashattn 或 merge
|
||||
5. **attention.py 零计算逻辑**:`_chunked_prefill_attention` 只调用 policy
|
||||
6. **所有 KV 通信通过 offload_engine**:不直接调用 torch.copy
|
||||
|
||||
## 目标架构
|
||||
|
||||
```
|
||||
model_runner.py:
|
||||
默认创建 FullPolicy(如果没有指定 sparse policy)
|
||||
|
||||
attention.py (_chunked_prefill_attention):
|
||||
检查 sparse_policy 是否存在
|
||||
↓
|
||||
调用 sparse_policy.compute_prefill_attention(q, k, v, ...)
|
||||
调用 sparse_policy.compute_chunked_attention(q, k, v, ...)
|
||||
↓
|
||||
返回最终输出(不包含任何计算逻辑)
|
||||
处理 async offload(通过 offload_engine)
|
||||
↓
|
||||
返回最终输出(不包含任何计算逻辑,不包含任何直接 copy 调用)
|
||||
|
||||
SparsePolicy.compute_prefill_attention():
|
||||
1. select_blocks(blocks, offload_engine, ctx) → 筛选 blocks
|
||||
2. 加载 blocks(通过 offload_engine)
|
||||
3. 遍历 blocks:
|
||||
- 调用 self.compute_block_attention(q, k, v, ...)
|
||||
- 调用 self.merge_attention_outputs(...)
|
||||
4. 计算当前 chunk attention
|
||||
5. 合并最终结果
|
||||
SparsePolicy.compute_chunked_attention():
|
||||
1. 获取 cpu_block_table
|
||||
2. 调用 select_blocks(blocks, offload_engine, ctx) → 筛选 blocks
|
||||
3. 通过 offload_engine 加载 blocks 并计算 attention(pipeline 或 sync)
|
||||
4. 通过 offload_engine 获取当前 chunk KV,计算 attention(causal)
|
||||
5. 合并所有结果
|
||||
6. 返回 final_output
|
||||
```
|
||||
|
||||
@@ -41,106 +71,82 @@ SparsePolicy.compute_prefill_attention():
|
||||
|
||||
| 决策 | 说明 |
|
||||
|------|------|
|
||||
| **决策 1** | `compute_block_attention` 是抽象方法,所有 policy 必须实现 |
|
||||
| **决策 2** | `merge_attention_outputs` 是抽象方法,所有 policy 必须实现 |
|
||||
| **决策 3** | `compute_prefill_attention` 是抽象方法,定义完整的 prefill 流程 |
|
||||
| **决策 4** | `select_blocks` 接收 `offload_engine` 参数(为未来准备) |
|
||||
| **决策 5** | chunked_prefill 检查 policy 是否存在,不存在则抛出错误 |
|
||||
| **决策 6** | model_runner 默认创建 FullPolicy 作为兜底 |
|
||||
| **决策 7** | attention.py 的 _chunked_prefill_attention 不包含任何 flashattn 或 merge 调用 |
|
||||
| **决策 1** | `compute_chunked_attention` 是唯一的抽象方法,定义完整 prefill 流程 |
|
||||
| **决策 2** | 不添加 `compute_block_attention` 和 `merge_attention_outputs` 抽象方法(过度设计) |
|
||||
| **决策 3** | `select_blocks` 接收 `offload_engine` 参数(其他策略需要) |
|
||||
| **决策 4** | attention.py 的 `_chunked_prefill_attention` 不包含任何 flashattn 或 merge 调用 |
|
||||
| **决策 5** | Decode 阶段不处理,保持现有逻辑 |
|
||||
| **决策 6** | async offload 逻辑保留在 attention.py(通过 offload_engine 方法调用) |
|
||||
| **决策 7** | Phase 4 需要添加 debug 输出验证执行路径 |
|
||||
| **决策 8** | 所有 KV 通信必须通过 offload_engine 方法,不直接调用 torch.copy |
|
||||
|
||||
## Phases
|
||||
|
||||
- [ ] Phase 1: 分析当前架构,理解所有计算逻辑的位置
|
||||
- [ ] Phase 2: 在 SparsePolicy 基类中添加三个抽象方法
|
||||
- [ ] Phase 3: 修改 FullPolicy,实现三个抽象方法
|
||||
- [ ] Phase 4: 修改 QuestPolicy,实现三个抽象方法
|
||||
- [ ] Phase 5: 修改 XAttentionBSAPolicy,实现三个抽象方法
|
||||
- [ ] Phase 6: 修改 model_runner.py,默认创建 FullPolicy
|
||||
- [ ] Phase 7: 修改 attention.py,移除所有计算逻辑,只调用 policy
|
||||
- [ ] Phase 8: 测试验证
|
||||
- [x] Phase 1: 分析当前架构 ✅ 已完成
|
||||
- [ ] Phase 2: 修改 SparsePolicy 基类
|
||||
- [ ] Phase 3: 修改 FullPolicy
|
||||
- [ ] Phase 4: 验证执行路径(添加 debug 输出)
|
||||
- [ ] Phase 5: 修改 attention.py
|
||||
- [ ] Phase 6: 测试验证
|
||||
|
||||
## Phase 1: 分析当前架构,理解所有计算逻辑的位置
|
||||
## Phase 1: 分析当前架构 ✅ 已完成
|
||||
|
||||
### 当前 attention.py 中包含的计算逻辑
|
||||
|
||||
1. `_ring_buffer_pipeline_load` 方法:
|
||||
- 调用 `offload_engine.load_to_slot_layer()`
|
||||
- 调用 `offload_engine.wait_slot_layer()`
|
||||
- 调用 `offload_engine.get_kv_for_slot()`
|
||||
- 调用 `flash_attn_with_lse()` ← **直接调用**
|
||||
- 调用 `merge_attention_outputs()` ← **直接调用**
|
||||
|
||||
2. `_sync_load_previous_chunks` 方法:
|
||||
- 同上,直接调用 flashattn 和 merge
|
||||
### 当前 attention.py 中包含的计算逻辑(需要移除)
|
||||
|
||||
1. `_ring_buffer_pipeline_load` 方法:直接调用 flashattn 和 merge
|
||||
2. `_sync_load_previous_chunks` 方法:直接调用 flashattn 和 merge
|
||||
3. `_chunked_prefill_attention` 方法:
|
||||
- 调用 `_ring_buffer_pipeline_load` 或 `_sync_load_previous_chunks`
|
||||
- 调用 `flash_attn_with_lse()` 计算当前 chunk
|
||||
- 调用 `merge_attention_outputs()` 合并结果
|
||||
- 调用上述两个方法
|
||||
- 计算当前 chunk(flash_attn)
|
||||
- 合并结果(merge)
|
||||
|
||||
### 需要移动的计算逻辑
|
||||
### 当前 attention.py 中的直接 copy 调用(需要移除或封装)
|
||||
|
||||
所有 `flash_attn_with_lse` 和 `merge_attention_outputs` 调用都应该在 SparsePolicy 内部。
|
||||
```python
|
||||
# attention.py:115-116 - 写入 prefill buffer
|
||||
offload_engine.prefill_k_buffer[self.layer_id, :num_tokens].copy_(k)
|
||||
offload_engine.prefill_v_buffer[self.layer_id, :num_tokens].copy_(v)
|
||||
```
|
||||
|
||||
## Phase 2: 在 SparsePolicy 基类中添加三个抽象方法
|
||||
**处理方案**:在 offload_engine 中添加封装方法,或将此逻辑移入 policy。
|
||||
|
||||
### 2.1 compute_block_attention
|
||||
### 当前 FullPolicy 已实现的功能
|
||||
|
||||
`full_policy.py:40-162` 的 `compute_prefill_attention` 已实现:
|
||||
- ring buffer pipeline 加载
|
||||
- sync 加载 fallback
|
||||
- 当前 chunk attention 计算
|
||||
- 结果合并
|
||||
|
||||
**只需重命名为 `compute_chunked_attention` 并微调接口。**
|
||||
|
||||
## Phase 2: 修改 SparsePolicy 基类
|
||||
|
||||
### 2.1 修改 select_blocks 接口
|
||||
|
||||
```python
|
||||
@abstractmethod
|
||||
def compute_block_attention(
|
||||
def select_blocks(
|
||||
self,
|
||||
q: torch.Tensor,
|
||||
k: torch.Tensor,
|
||||
v: torch.Tensor,
|
||||
layer_id: int,
|
||||
softmax_scale: float,
|
||||
causal: bool,
|
||||
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
|
||||
available_blocks: List[int],
|
||||
offload_engine: "OffloadEngine", # 新增参数
|
||||
ctx: PolicyContext,
|
||||
) -> List[int]:
|
||||
"""
|
||||
计算单个 block 的 attention。
|
||||
选择要加载的 blocks。
|
||||
|
||||
Args:
|
||||
q: [1, seq_len, num_heads, head_dim] 或 [seq_len, num_heads, head_dim]
|
||||
k, v: 同上
|
||||
layer_id: 层索引
|
||||
softmax_scale: softmax 缩放因子
|
||||
causal: 是否应用因果掩码
|
||||
available_blocks: 所有可用的 block IDs
|
||||
offload_engine: offload engine(其他策略可能需要加载 KV 来判断)
|
||||
ctx: policy context
|
||||
|
||||
Returns:
|
||||
(o, lse) - attention 输出和 LSE
|
||||
选择的 block IDs
|
||||
"""
|
||||
pass
|
||||
```
|
||||
|
||||
### 2.2 merge_attention_outputs
|
||||
|
||||
```python
|
||||
@abstractmethod
|
||||
def merge_attention_outputs(
|
||||
self,
|
||||
o_acc: torch.Tensor,
|
||||
lse_acc: Optional[torch.Tensor],
|
||||
o_new: torch.Tensor,
|
||||
lse_new: Optional[torch.Tensor],
|
||||
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
|
||||
"""
|
||||
合并两个 attention 输出。
|
||||
|
||||
Args:
|
||||
o_acc: 累积的 attention 输出 [1, seq_len, num_heads, head_dim]
|
||||
lse_acc: 累积的 LSE
|
||||
o_new: 新的 attention 输出
|
||||
lse_new: 新的 LSE
|
||||
|
||||
Returns:
|
||||
(merged_o, merged_lse)
|
||||
"""
|
||||
pass
|
||||
```
|
||||
|
||||
### 2.3 compute_chunked_attention
|
||||
### 2.2 添加 compute_chunked_attention 抽象方法
|
||||
|
||||
```python
|
||||
@abstractmethod
|
||||
@@ -151,9 +157,9 @@ def compute_chunked_attention(
|
||||
v: torch.Tensor,
|
||||
layer_id: int,
|
||||
softmax_scale: float,
|
||||
offload_engine: OffloadEngine,
|
||||
offload_engine: "OffloadEngine",
|
||||
current_chunk_idx: int,
|
||||
seq: ChunkedSequence,
|
||||
seq: "ChunkedSequence",
|
||||
num_tokens: int,
|
||||
) -> torch.Tensor:
|
||||
"""
|
||||
@@ -162,12 +168,13 @@ def compute_chunked_attention(
|
||||
这是 policy 的主入口,定义完整的 prefill 计算流程:
|
||||
1. 获取历史 blocks
|
||||
2. 筛选 blocks(调用 select_blocks)
|
||||
3. 加载和计算历史 blocks
|
||||
4. 计算当前 chunk attention
|
||||
3. 通过 offload_engine 加载和计算历史 blocks
|
||||
4. 通过 offload_engine 获取当前 chunk KV,计算 attention
|
||||
5. 合并所有结果
|
||||
|
||||
Args:
|
||||
q, k, v: 当前 chunk 的 QKV
|
||||
q: [seq_len, num_heads, head_dim] 当前 chunk 的 query
|
||||
k, v: [seq_len, num_kv_heads, head_dim] 当前 chunk 的 KV(已写入 prefill buffer)
|
||||
layer_id: 层索引
|
||||
softmax_scale: softmax 缩放因子
|
||||
offload_engine: offload engine
|
||||
@@ -176,173 +183,280 @@ def compute_chunked_attention(
|
||||
num_tokens: 当前 chunk 的 token 数
|
||||
|
||||
Returns:
|
||||
[seq_len, num_heads, head_dim] 最终 attention输出
|
||||
[seq_len, num_heads, head_dim] 最终 attention 输出
|
||||
"""
|
||||
pass
|
||||
```
|
||||
|
||||
### 2.4 修改 select_blocks 接口
|
||||
## Phase 3: 修改 FullPolicy
|
||||
|
||||
### 3.1 重命名方法
|
||||
|
||||
将 `compute_prefill_attention` 重命名为 `compute_chunked_attention`。
|
||||
|
||||
### 3.2 修改 select_blocks 签名
|
||||
|
||||
```python
|
||||
def select_blocks(
|
||||
self,
|
||||
available_blocks: List[int],
|
||||
offload_engine: OffloadEngine,
|
||||
offload_engine: "OffloadEngine", # 新增参数(不使用)
|
||||
ctx: PolicyContext,
|
||||
) -> List[int]:
|
||||
"""
|
||||
选择要加载的 blocks。
|
||||
|
||||
Args:
|
||||
available_blocks: 所有可用的 block IDs
|
||||
offload_engine: offload engine(为未来准备,当前可能不使用)
|
||||
ctx: policy context
|
||||
|
||||
Returns:
|
||||
选择的 block IDs
|
||||
"""
|
||||
pass
|
||||
"""Return all blocks - no sparsity."""
|
||||
return available_blocks
|
||||
```
|
||||
|
||||
## Phase 3: 修改 FullPolicy,实现三个抽象方法
|
||||
### 3.3 验证 compute_chunked_attention 实现
|
||||
|
||||
### 3.1 FullPolicy.compute_block_attention
|
||||
当前 `compute_prefill_attention` 已实现完整逻辑,确认:
|
||||
- [x] 获取 cpu_block_table
|
||||
- [x] ring buffer pipeline 加载(通过 offload_engine)
|
||||
- [x] sync 加载 fallback(通过 offload_engine)
|
||||
- [x] 当前 chunk attention 计算
|
||||
- [x] 结果合并
|
||||
|
||||
直接调用 `flash_attn_with_lse`,处理 3D 输入。
|
||||
**注意**:当前实现没有调用 `select_blocks`,需要添加。
|
||||
|
||||
### 3.2 FullPolicy.merge_attention_outputs
|
||||
### 3.4 确保所有 KV 通信通过 offload_engine
|
||||
|
||||
调用 `chunked_attention.merge_attention_outputs`。
|
||||
检查 `compute_chunked_attention` 内部:
|
||||
- 历史 block 加载:已通过 `offload_engine.load_to_slot_layer()` 等方法 ✅
|
||||
- 当前 chunk KV 获取:已通过 `offload_engine.get_prefill_buffer_slice()` ✅
|
||||
|
||||
### 3.3 FullPolicy.compute_prefill_attention
|
||||
## Phase 4: 验证执行路径(添加 debug 输出)
|
||||
|
||||
实现完整的 prefill 流程:
|
||||
1. 获取 `cpu_block_table = kvcache_manager.get_prefilled_cpu_blocks(seq)`
|
||||
2. 调用 `select_blocks(cpu_block_table, offload_engine, ctx)`
|
||||
3. 遍历 blocks:
|
||||
- `offload_engine.load_to_slot_layer(slot, layer_id, cpu_block_id)`
|
||||
- `offload_engine.wait_slot_layer(slot)`
|
||||
- `k, v = offload_engine.get_kv_for_slot(slot)`
|
||||
- 调用 `self.compute_block_attention(q, k, v, layer_id, scale, causal=False)`
|
||||
- 调用 `self.merge_attention_outputs(o_acc, lse_acc, prev_o, prev_lse)`
|
||||
4. 计算当前 chunk attention
|
||||
5. 合并最终结果
|
||||
### 4.1 验证目标
|
||||
|
||||
### 需要移动的代码
|
||||
确认代码修改后,执行路径正确:
|
||||
|
||||
从 `attention.py` 的 `_ring_buffer_pipeline_load` 和 `_sync_load_previous_chunks` 移动逻辑:
|
||||
- slot 遍历逻辑
|
||||
- offload_engine 调用
|
||||
- 计算和合并逻辑
|
||||
| 检查点 | 位置 | 预期行为 |
|
||||
|--------|------|----------|
|
||||
| **Policy 创建** | `kvcache/__init__.py` | FullAttentionPolicy 被创建 |
|
||||
| **Policy 调用** | `attention.py` | `_chunked_prefill_attention` 调用 `sparse_policy.compute_chunked_attention` |
|
||||
| **select_blocks 调用** | `full_policy.py` | `compute_chunked_attention` 内部调用 `select_blocks` |
|
||||
| **旧方法未调用** | `attention.py` | `_ring_buffer_pipeline_load` 和 `_sync_load_previous_chunks` 不再被调用 |
|
||||
| **无直接 copy 调用** | `attention.py` | chunked prefill 路径不直接调用 `.copy_()` |
|
||||
|
||||
从 `attention.py` 的 `_chunked_prefill_attention` 移动逻辑:
|
||||
- 当前 chunk 的 attention 计算
|
||||
- 最终合并逻辑
|
||||
|
||||
## Phase 4: 修改 QuestPolicy
|
||||
|
||||
QuestPolicy 实现与 FullPolicy 类似,区别在于:
|
||||
- `select_blocks` 返回 Top-K blocks
|
||||
- 其他计算逻辑相同
|
||||
|
||||
## Phase 5: 修改 XAttentionBSAPolicy
|
||||
|
||||
当前 XAttentionBSAPolicy 只返回所有 blocks,修改后:
|
||||
- `select_blocks` 当前返回所有 blocks
|
||||
- `compute_block_attention` 与 FullPolicy 相同
|
||||
- `merge_attention_outputs` 与 FullPolicy 相同
|
||||
- `compute_prefill_attention` 与 FullPolicy 相同
|
||||
|
||||
未来可以实现稀疏计算。
|
||||
|
||||
## Phase 6: 修改 model_runner.py,默认创建 FullPolicy
|
||||
|
||||
### 6.1 当前创建 sparse policy 的逻辑
|
||||
### 4.2 添加 debug 输出位置
|
||||
|
||||
**位置 1: `kvcache/__init__.py` - policy 创建时**
|
||||
```python
|
||||
# 当前:只有指定 sparse_policy_type 时才创建
|
||||
if sparse_policy_type is not None:
|
||||
sparse_policy = create_sparse_policy(sparse_policy_type, **kwargs)
|
||||
sparse_policy = create_sparse_policy(sparse_policy_type, **policy_kwargs)
|
||||
logger.info(f"[DEBUG] Created sparse policy: {sparse_policy}")
|
||||
```
|
||||
|
||||
### 6.2 修改后
|
||||
|
||||
**位置 2: `attention.py` - 调用 policy 时**
|
||||
```python
|
||||
# 默认创建 FullPolicy
|
||||
if sparse_policy_type is None:
|
||||
sparse_policy_type = SparsePolicyType.FULL
|
||||
|
||||
sparse_policy = create_sparse_policy(sparse_policy_type, **kwargs)
|
||||
# 在 _chunked_prefill_attention 中
|
||||
logger.debug(f"[DEBUG] Calling sparse_policy.compute_chunked_attention, "
|
||||
f"policy={sparse_policy}, layer={self.layer_id}, chunk={current_chunk_idx}")
|
||||
```
|
||||
|
||||
### 6.3 位置
|
||||
|
||||
`model_runner.py` 中的 `allocate_kv_cache` 方法。
|
||||
|
||||
## Phase 7: 修改 attention.py,移除所有计算逻辑
|
||||
|
||||
### 7.1 _chunked_prefill_attention 简化
|
||||
|
||||
**当前(伪代码)**:
|
||||
**位置 3: `full_policy.py` - compute_chunked_attention 入口**
|
||||
```python
|
||||
# 获取 cpu_block_table
|
||||
# 调用 select_blocks
|
||||
# 调用 _ring_buffer_pipeline_load(包含计算逻辑)
|
||||
# 计算当前 chunk(flash_attn)
|
||||
# 合并结果(merge)
|
||||
def compute_chunked_attention(self, ...):
|
||||
logger.debug(f"[DEBUG] FullPolicy.compute_chunked_attention called, "
|
||||
f"layer={layer_id}, chunk={current_chunk_idx}, num_tokens={num_tokens}")
|
||||
# ... 实现
|
||||
```
|
||||
|
||||
**位置 4: `full_policy.py` - select_blocks 调用**
|
||||
```python
|
||||
# 在 compute_chunked_attention 内部
|
||||
selected_blocks = self.select_blocks(cpu_block_table, offload_engine, policy_ctx)
|
||||
logger.debug(f"[DEBUG] select_blocks: input={len(cpu_block_table)} blocks, "
|
||||
f"output={len(selected_blocks)} blocks")
|
||||
```
|
||||
|
||||
### 4.3 验证方法
|
||||
|
||||
运行测试并检查日志输出:
|
||||
```bash
|
||||
PYTHONPATH=/home/zijie/Code/nano-vllm:$PYTHONPATH \
|
||||
python tests/test_needle.py --model <model_path> --enable-offload 2>&1 | grep DEBUG
|
||||
```
|
||||
|
||||
预期输出:
|
||||
```
|
||||
[DEBUG] Created sparse policy: FullAttentionPolicy()
|
||||
[DEBUG] Calling sparse_policy.compute_chunked_attention, policy=FullAttentionPolicy(), layer=0, chunk=0
|
||||
[DEBUG] FullPolicy.compute_chunked_attention called, layer=0, chunk=0, num_tokens=...
|
||||
[DEBUG] select_blocks: input=0 blocks, output=0 blocks
|
||||
[DEBUG] Calling sparse_policy.compute_chunked_attention, policy=FullAttentionPolicy(), layer=0, chunk=1
|
||||
[DEBUG] FullPolicy.compute_chunked_attention called, layer=0, chunk=1, num_tokens=...
|
||||
[DEBUG] select_blocks: input=1 blocks, output=1 blocks
|
||||
...
|
||||
```
|
||||
|
||||
### 4.4 清理 debug 输出
|
||||
|
||||
验证完成后,将 debug 级别的日志改为更低级别(如 `logger.debug`),或通过环境变量控制:
|
||||
```python
|
||||
if os.environ.get('NANOVLLM_DEBUG_POLICY'):
|
||||
logger.info(f"[DEBUG] ...")
|
||||
```
|
||||
|
||||
## Phase 5: 修改 attention.py
|
||||
|
||||
### 5.1 简化 _chunked_prefill_attention
|
||||
|
||||
**修改后**:
|
||||
```python
|
||||
sparse_policy = kvcache_manager.sparse_policy
|
||||
if sparse_policy is None:
|
||||
raise RuntimeError("sparse_policy is required for chunked prefill")
|
||||
def _chunked_prefill_attention(self, q, k, v, context):
|
||||
kvcache_manager = context.kvcache_manager
|
||||
seq = context.chunked_seq
|
||||
offload_engine = kvcache_manager.offload_engine
|
||||
current_chunk_idx = context.current_chunk_idx
|
||||
num_tokens = k.shape[0]
|
||||
|
||||
o = sparse_policy.compute_prefill_attention(
|
||||
q, k, v, self.layer_id, self.scale,
|
||||
offload_engine, current_chunk_idx, seq, num_tokens
|
||||
)
|
||||
# 获取 sparse policy
|
||||
sparse_policy = kvcache_manager.sparse_policy
|
||||
if sparse_policy is None:
|
||||
raise RuntimeError("sparse_policy is required for chunked prefill")
|
||||
|
||||
# 直接返回,不需要合并(policy 内部已完成所有计算)
|
||||
return o
|
||||
# [DEBUG] 验证执行路径
|
||||
logger.debug(f"[DEBUG] Calling sparse_policy.compute_chunked_attention, "
|
||||
f"policy={sparse_policy}, layer={self.layer_id}, chunk={current_chunk_idx}")
|
||||
|
||||
# 调用 policy 计算 attention(所有计算逻辑在 policy 内部)
|
||||
# 注意:不直接调用 flash_attn 或 merge,全部由 policy 完成
|
||||
final_o = sparse_policy.compute_chunked_attention(
|
||||
q, k, v,
|
||||
self.layer_id,
|
||||
self.scale,
|
||||
offload_engine,
|
||||
current_chunk_idx,
|
||||
seq,
|
||||
num_tokens,
|
||||
)
|
||||
|
||||
# Per-layer ASYNC offload(通过 offload_engine 方法,不直接 copy)
|
||||
if offload_engine is not None and seq is not None:
|
||||
cpu_block_ids, _ = kvcache_manager.get_all_cpu_blocks(seq)
|
||||
if current_chunk_idx < len(cpu_block_ids):
|
||||
cpu_block_id = cpu_block_ids[current_chunk_idx]
|
||||
offload_engine.offload_prefill_buffer_async(
|
||||
self.layer_id, cpu_block_id, num_tokens
|
||||
)
|
||||
|
||||
return final_o
|
||||
```
|
||||
|
||||
### 7.2 删除的方法
|
||||
### 5.2 处理 prefill buffer 写入
|
||||
|
||||
删除以下方法(逻辑移到 policy 中):
|
||||
- `_ring_buffer_pipeline_load` - 逻辑移到 FullPolicy.compute_prefill_attention
|
||||
- `_sync_load_previous_chunks` - 逻辑移到 FullPolicy.compute_prefill_attention
|
||||
当前 `forward()` 方法中有直接 copy 调用:
|
||||
```python
|
||||
# 当前代码(违反目标 3)
|
||||
offload_engine.prefill_k_buffer[self.layer_id, :num_tokens].copy_(k)
|
||||
offload_engine.prefill_v_buffer[self.layer_id, :num_tokens].copy_(v)
|
||||
```
|
||||
|
||||
### 7.3 保留的方法
|
||||
**方案 A**:在 offload_engine 中添加封装方法
|
||||
```python
|
||||
# offload_engine.py
|
||||
def write_prefill_buffer(self, layer_id: int, k: Tensor, v: Tensor, num_tokens: int):
|
||||
self.prefill_k_buffer[layer_id, :num_tokens].copy_(k)
|
||||
self.prefill_v_buffer[layer_id, :num_tokens].copy_(v)
|
||||
|
||||
- `_decode_with_layer_pipeline` - decode 逻辑保持不变
|
||||
- `_decode_ring_buffer_pipeline` - decode 逻辑保持不变
|
||||
# attention.py
|
||||
offload_engine.write_prefill_buffer(self.layer_id, k, v, num_tokens)
|
||||
```
|
||||
|
||||
## Phase 8: 测试验证
|
||||
**方案 B**:将此逻辑移入 policy(作为 compute_chunked_attention 的一部分)
|
||||
|
||||
**推荐方案 A**:保持 attention.py 调用 offload_engine 方法,但不直接操作 buffer。
|
||||
|
||||
### 5.3 删除的方法
|
||||
|
||||
删除以下方法(逻辑已移到 FullPolicy):
|
||||
- `_ring_buffer_pipeline_load`
|
||||
- `_sync_load_previous_chunks`
|
||||
|
||||
### 5.4 保留的方法
|
||||
|
||||
Decode 相关方法保持不变:
|
||||
- `_chunked_decode_attention`
|
||||
- `_decode_with_layer_pipeline`
|
||||
- `_decode_ring_buffer_pipeline`
|
||||
|
||||
## Phase 6: 测试验证
|
||||
|
||||
### 6.1 功能测试
|
||||
|
||||
- [ ] 运行 `test_needle.py --enable-offload` (FULL policy)
|
||||
- [ ] 验证输出正确 (needle value: 7492)
|
||||
- [ ] 验证性能无明显下降
|
||||
- [ ] 验证输出正确(needle value 匹配)
|
||||
- [ ] 检查 debug 日志确认执行路径正确
|
||||
|
||||
### 6.2 代码审查(验收标准检查)
|
||||
|
||||
- [ ] **标准 1**: test_needle.py 通过 ✓
|
||||
- [ ] **标准 2**: `_chunked_prefill_attention` 方法内无 `flash_attn` 或 `merge_attention_outputs` 调用
|
||||
- [ ] **标准 3**: `_chunked_prefill_attention` 方法内无直接 `.copy_()` 调用
|
||||
|
||||
**注意**:标准 2 和 3 仅适用于 chunked prefill 路径。Decode 路径和其他路径可以有 `flash_attn` 调用。
|
||||
|
||||
**验证方法**:
|
||||
|
||||
**方法 1:使用 cclsp LSP 工具验证调用链(推荐)**
|
||||
|
||||
使用 `mcp__cclsp__find_references` 查找计算函数的调用位置,确认 chunked prefill 路径无直接调用:
|
||||
|
||||
```
|
||||
# 查找 flash_attn_with_lse 的所有调用
|
||||
mcp__cclsp__find_references(file_path="nanovllm/layers/attention.py", symbol_name="flash_attn_with_lse")
|
||||
|
||||
# 查找 merge_attention_outputs 的所有调用
|
||||
mcp__cclsp__find_references(file_path="nanovllm/layers/attention.py", symbol_name="merge_attention_outputs")
|
||||
|
||||
# 查找 _chunked_prefill_attention 的实现
|
||||
mcp__cclsp__find_definition(file_path="nanovllm/layers/attention.py", symbol_name="_chunked_prefill_attention")
|
||||
```
|
||||
|
||||
验证结果应显示:
|
||||
- `flash_attn_with_lse` 调用仅出现在 decode 路径或 `full_policy.py` 中
|
||||
- `_chunked_prefill_attention` 内部只调用 `sparse_policy.compute_chunked_attention`
|
||||
|
||||
**方法 2:手动代码审查**
|
||||
|
||||
检查 `_chunked_prefill_attention` 方法实现,确认:
|
||||
1. 只调用 `sparse_policy.compute_chunked_attention(...)`
|
||||
2. 只调用 `offload_engine.offload_prefill_buffer_async(...)` 等 offload_engine 方法
|
||||
3. 不直接调用 `flash_attn_*`、`merge_attention_outputs` 或 `.copy_()`
|
||||
|
||||
```bash
|
||||
# 辅助检查:找出所有 flash_attn 调用位置
|
||||
grep -n "flash_attn\|merge_attention_outputs" nanovllm/layers/attention.py
|
||||
|
||||
# 辅助检查:找出所有 copy 调用位置
|
||||
grep -n "\.copy_\|\.copy(" nanovllm/layers/attention.py
|
||||
```
|
||||
|
||||
### 6.3 回归测试
|
||||
|
||||
- [ ] 验证 decode 阶段不受影响
|
||||
- [ ] 验证非 offload 模式不受影响(如果适用)
|
||||
|
||||
## 关键文件清单
|
||||
|
||||
| 文件 | 修改内容 |
|
||||
|------|----------|
|
||||
| `nanovllm/kvcache/sparse/policy.py` | 添加三个抽象方法,修改 select_blocks 签名 |
|
||||
| `nanovllm/kvcache/sparse/full_policy.py` | 实现三个抽象方法,移动计算逻辑 |
|
||||
| `nanovllm/kvcache/sparse/quest.py` | 实现三个抽象方法 |
|
||||
| `nanovllm/kvcache/sparse/xattn_bsa.py` | 实现三个抽象方法 |
|
||||
| `nanovllm/engine/model_runner.py` | 默认创建 FullPolicy |
|
||||
| `nanovllm/layers/attention.py` | 简化 _chunked_prefill_attention,删除计算方法 |
|
||||
| `nanovllm/kvcache/sparse/policy.py` | 添加 `compute_chunked_attention` 抽象方法,修改 `select_blocks` 签名 |
|
||||
| `nanovllm/kvcache/sparse/full_policy.py` | 重命名方法,修改 `select_blocks` 签名,添加 `select_blocks` 调用,添加 debug 输出 |
|
||||
| `nanovllm/layers/attention.py` | 简化 `_chunked_prefill_attention`,删除 `_ring_buffer_pipeline_load` 和 `_sync_load_previous_chunks`,添加 debug 输出 |
|
||||
| `nanovllm/kvcache/__init__.py` | 添加 policy 创建的 debug 输出 |
|
||||
| `nanovllm/kvcache/offload_engine.py` | (可选)添加 `write_prefill_buffer` 方法封装 |
|
||||
|
||||
## Decisions Made
|
||||
|
||||
- **决策 1**: 三个方法都是抽象方法,强制所有 policy 实现
|
||||
- **决策 2**: compute_prefill_attention 定义完整的 prefill 流程,是 policy 的主入口
|
||||
- **决策 3**: attention.py 只调用 policy.compute_prefill_attention,零计算逻辑
|
||||
- **决策 4**: chunked_prefill 检查 policy 是否存在,不存在则抛出错误
|
||||
- **决策 5**: model_runner 默认创建 FullPolicy 作为兜底
|
||||
- **决策 6**: _ring_buffer_pipeline_load 和 _sync_load_previous_chunks 删除,逻辑移到 policy
|
||||
- **决策 1**: 只添加一个抽象方法 `compute_chunked_attention`(不添加 `compute_block_attention` 和 `merge_attention_outputs`)
|
||||
- **决策 2**: `select_blocks` 接收 `offload_engine` 参数
|
||||
- **决策 3**: 统一使用 `compute_chunked_attention` 命名
|
||||
- **决策 4**: Decode 阶段不处理
|
||||
- **决策 5**: async offload 逻辑保留在 attention.py(通过 offload_engine 方法调用)
|
||||
- **决策 6**: Phase 4 添加 debug 输出验证执行路径,验证完成后可降级或移除
|
||||
- **决策 7**: prefill buffer 写入通过 offload_engine 封装方法实现(方案 A)
|
||||
- **决策 8**: 所有 KV 通信必须通过 offload_engine 方法,不直接调用 torch.copy
|
||||
|
||||
## Errors Encountered
|
||||
|
||||
@@ -350,4 +464,4 @@ return o
|
||||
|
||||
## Status
|
||||
|
||||
**Currently in Phase 1** - 分析当前架构,理解所有计算逻辑的位置
|
||||
**Planning Complete** - v4 计划已完成,包含明确的验收标准和执行路径验证步骤
|
||||
|
||||
114
test_report_sparse_policy_refactor.md
Normal file
114
test_report_sparse_policy_refactor.md
Normal file
@@ -0,0 +1,114 @@
|
||||
# SparsePolicy 重构测试报告
|
||||
|
||||
## 任务概述
|
||||
|
||||
根据 task_plan.md 的要求,对 nanovllm 的 SparsePolicy 架构进行重构(v4 版本),将 chunked prefill attention 计算逻辑从 attention.py 完全迁移到 SparsePolicy。
|
||||
|
||||
## 修改范围
|
||||
|
||||
仅针对 FullPolicy,不涉及 QuestPolicy 或 XAttentionBSAPolicy,不修改 decode 阶段逻辑。
|
||||
|
||||
## 完成的修改
|
||||
|
||||
### 1. policy.py (SparsePolicy 基类)
|
||||
|
||||
- 添加 TYPE_CHECKING imports: `OffloadEngine`, `KVCacheManager`, `Sequence`
|
||||
- 修改 `select_blocks` 签名:添加 `offload_engine` 参数
|
||||
- 添加 `compute_chunked_attention` 抽象方法,参数包括:
|
||||
- `q, k, v`: 张量
|
||||
- `layer_id`: 层索引
|
||||
- `softmax_scale`: softmax 缩放因子
|
||||
- `offload_engine`: OffloadEngine 实例
|
||||
- `kvcache_manager`: KVCacheManager 实例
|
||||
- `current_chunk_idx`: 当前 chunk 索引
|
||||
- `seq`: Sequence 对象
|
||||
- `num_tokens`: 当前 chunk 的 token 数
|
||||
|
||||
### 2. full_policy.py (FullAttentionPolicy)
|
||||
|
||||
- 更新 TYPE_CHECKING imports
|
||||
- `select_blocks` 方法签名添加 `offload_engine` 参数
|
||||
- 重命名 `compute_prefill_attention` → `compute_chunked_attention`
|
||||
- 添加 `kvcache_manager` 参数,替换所有 `seq.kvcache_manager` 引用
|
||||
- 添加 debug 日志输出
|
||||
|
||||
### 3. attention.py
|
||||
|
||||
- 简化 `_chunked_prefill_attention` 方法:
|
||||
- 删除所有 `flash_attn_*` 调用
|
||||
- 删除所有 `merge_attention_outputs` 调用
|
||||
- 仅保留委托调用 `sparse_policy.compute_chunked_attention()`
|
||||
- 删除冗余方法:`_sync_load_previous_chunks`, `_ring_buffer_pipeline_load`
|
||||
- decode 路径的 `select_blocks` 调用添加 `offload_engine` 参数
|
||||
|
||||
## 验收标准检查
|
||||
|
||||
| 标准 | 状态 | 说明 |
|
||||
|------|------|------|
|
||||
| test_needle.py --enable-offload 通过 | ✅ | 测试输出 PASSED |
|
||||
| attention.py chunked prefill path 无 flash_attn_* 调用 | ✅ | `_chunked_prefill_attention` 方法(169-230行)内无直接 flash_attn 调用 |
|
||||
| attention.py chunked prefill path 无 merge_attention_outputs 调用 | ✅ | 同上 |
|
||||
| 所有 KV 通信通过 offload_engine 方法 | ✅ | 全部通过 `offload_engine.load_to_slot_layer`, `get_kv_for_slot`, `get_prefill_buffer_slice` |
|
||||
|
||||
## 测试结果
|
||||
|
||||
```
|
||||
============================================================
|
||||
Needle-in-Haystack Test
|
||||
============================================================
|
||||
Model: /home/zijie/models/Llama-3.1-8B-Instruct
|
||||
Max model len: 131072
|
||||
Input length: 8192
|
||||
Block size: 1024
|
||||
Needle position: 50%
|
||||
Needle value: 7492
|
||||
CPU offload: True
|
||||
Sparse policy: FULL
|
||||
============================================================
|
||||
|
||||
[NeedleTest] Target: 8192, Actual: 8213 tokens (diff=21)
|
||||
Expected: 7492
|
||||
Output: 7492<|eot_id|>...
|
||||
Status: PASSED
|
||||
============================================================
|
||||
|
||||
test_needle: PASSED
|
||||
```
|
||||
|
||||
## 性能指标
|
||||
|
||||
- Prefill: 3527 tok/s
|
||||
- Decode: 11 tok/s
|
||||
- TTFT: 2329.29 ms
|
||||
- TPOT: 655.38 ms
|
||||
|
||||
## 架构变更总结
|
||||
|
||||
**重构前**:
|
||||
```
|
||||
attention.py::_chunked_prefill_attention()
|
||||
├── 获取 cpu_block_table
|
||||
├── 调用 sparse_policy.select_blocks()
|
||||
├── 直接调用 flash_attn_with_lse + merge_attention_outputs
|
||||
└── 返回结果
|
||||
```
|
||||
|
||||
**重构后**:
|
||||
```
|
||||
attention.py::_chunked_prefill_attention()
|
||||
├── 获取 context 信息
|
||||
├── 调用 sparse_policy.compute_chunked_attention() # 委托全部计算
|
||||
└── 返回结果
|
||||
|
||||
sparse_policy.compute_chunked_attention() # 在 FullPolicy 中
|
||||
├── 获取 cpu_block_table
|
||||
├── 调用 self.select_blocks()
|
||||
├── 加载并计算历史 KV attention
|
||||
├── 计算当前 chunk attention (causal)
|
||||
├── 合并所有结果
|
||||
└── 返回最终输出
|
||||
```
|
||||
|
||||
## 结论
|
||||
|
||||
SparsePolicy 架构 v4 重构成功完成。所有验收标准均已满足,测试通过。
|
||||
Reference in New Issue
Block a user