Merge perf_opt-1 and perf_opt-2 branches
Combines two performance optimization features:

- perf_opt-1: Cross-layer pipeline for decode (double-buffered layer cache)
- perf_opt-2: Per-layer prefill buffer for async offload

Both features are complementary and improve CPU offload performance.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@@ -176,6 +176,30 @@ class OffloadEngine:
        self._pipeline_num_blocks = 0
        self._pipeline_layer_stream = torch.cuda.Stream()  # Dedicated stream for layer loading

        # ========== Per-layer prefill buffer for async offload ==========
        # During chunked prefill, all layers share the same GPU slot. This means
        # each layer must wait for offload to complete before the next layer can
        # write to the same slot. This serializes offloads and hurts performance.
        # Solution: Maintain separate per-layer buffers for prefill.
        # Each layer writes to its own buffer, enabling fully async offloads.
        # Shape: [num_layers, block_size, kv_heads, head_dim]
        self.prefill_k_buffer = torch.zeros(
            num_layers, block_size, num_kv_heads, head_dim,
            dtype=dtype, device="cuda"
        )
        self.prefill_v_buffer = torch.zeros(
            num_layers, block_size, num_kv_heads, head_dim,
            dtype=dtype, device="cuda"
        )
        prefill_buf_mb = 2 * num_layers * block_size * num_kv_heads * head_dim * dtype.itemsize / (1024 * 1024)
        logger.info(f" Per-layer prefill buffer: {prefill_buf_mb:.1f} MB")

        # Per-layer offload events for async prefill offload.
        # Each layer has its own event to track offload completion.
        self.prefill_offload_events = [torch.cuda.Event() for _ in range(num_layers)]
        # Per-layer transfer streams for parallel offloads.
        self.prefill_offload_streams = [torch.cuda.Stream() for _ in range(num_layers)]

        # ========== Fixed-address CPU KV cache (pinned memory) ==========
        self.k_cache_cpu = torch.zeros(
            num_layers, num_cpu_blocks, block_size, num_kv_heads, head_dim,
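The comment block above motivates the design: one shared GPU slot forces each layer's offload to finish before the next layer can write, while per-layer buffers plus per-layer streams let the device-to-host copies overlap. As a standalone sketch of that pattern (not part of this diff; shapes and variable names below are illustrative only):

import torch

num_layers, block_size, num_kv_heads, head_dim = 4, 256, 8, 128
dtype = torch.float16

# One GPU buffer and one pinned CPU destination per layer
gpu_buf = torch.zeros(num_layers, block_size, num_kv_heads, head_dim, dtype=dtype, device="cuda")
cpu_buf = torch.zeros_like(gpu_buf, device="cpu").pin_memory()
streams = [torch.cuda.Stream() for _ in range(num_layers)]
events = [torch.cuda.Event() for _ in range(num_layers)]

for layer in range(num_layers):
    # Stand-in for attention writing this layer's K/V on the default stream
    gpu_buf[layer].normal_()
    with torch.cuda.stream(streams[layer]):
        # Each offload stream waits only for its own layer's compute,
        # not for other layers' copies, so the D2H transfers overlap.
        streams[layer].wait_stream(torch.cuda.default_stream())
        cpu_buf[layer].copy_(gpu_buf[layer], non_blocking=True)
        events[layer].record(streams[layer])

for ev in events:
    ev.synchronize()  # all offloads drained; CPU copies are safe to read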
@@ -1213,4 +1237,92 @@ class OffloadEngine:

    def is_pipeline_active(self) -> bool:
        """Check if decode pipeline is currently active."""
        return self._pipeline_active

    # ========== Per-layer Prefill Buffer Methods ==========
    # These methods enable async offload during chunked prefill by using
    # per-layer buffers instead of shared GPU slots.

    def get_prefill_buffer(self, layer_id: int) -> Tuple[Tensor, Tensor]:
        """
        Get the prefill buffer for a layer.

        Args:
            layer_id: Layer index

        Returns:
            (k_buffer, v_buffer), each with shape [block_size, kv_heads, head_dim]
        """
        return self.prefill_k_buffer[layer_id], self.prefill_v_buffer[layer_id]

    def get_prefill_buffer_slice(
        self,
        layer_id: int,
        num_tokens: int,
    ) -> Tuple[Tensor, Tensor]:
        """
        Get a slice of the prefill buffer for attention computation.

        Args:
            layer_id: Layer index
            num_tokens: Number of valid tokens in the current chunk

        Returns:
            (k, v), each with shape [1, num_tokens, kv_heads, head_dim]
        """
        k = self.prefill_k_buffer[layer_id, :num_tokens].unsqueeze(0)
        v = self.prefill_v_buffer[layer_id, :num_tokens].unsqueeze(0)
        return k, v

    def offload_prefill_buffer_async(
        self,
        layer_id: int,
        cpu_block_id: int,
        num_valid_tokens: int = -1,
    ) -> None:
        """
        Asynchronously offload a layer's prefill buffer to CPU (no waiting required).

        This uses per-layer streams and events to enable fully async offloads.
        Each layer can offload independently without blocking other layers.

        Args:
            layer_id: Layer index
            cpu_block_id: Target CPU block ID
            num_valid_tokens: Number of valid tokens (-1 = use block_size)
        """
        valid_tokens = num_valid_tokens if num_valid_tokens > 0 else self.block_size

        # Collect sparse policy metadata before offload
        if self.sparse_policy is not None:
            k_cache = self.prefill_k_buffer[layer_id]
            self.sparse_policy.on_prefill_offload(cpu_block_id, layer_id, k_cache, valid_tokens)

        # Use per-layer stream for parallel offloads
        stream = self.prefill_offload_streams[layer_id]

        torch.cuda.nvtx.range_push(f"AsyncPrefillOffload: L{layer_id}->CPU[{cpu_block_id}]")
        with torch.cuda.stream(stream):
            # Wait for compute to finish writing to the prefill buffer
            stream.wait_stream(self.compute_stream)

            # Copy from the prefill buffer to CPU
            self.k_cache_cpu[layer_id, cpu_block_id].copy_(
                self.prefill_k_buffer[layer_id], non_blocking=True
            )
            self.v_cache_cpu[layer_id, cpu_block_id].copy_(
                self.prefill_v_buffer[layer_id], non_blocking=True
            )

            # Record completion event
            self.prefill_offload_events[layer_id].record(stream)
        torch.cuda.nvtx.range_pop()

    def wait_all_prefill_offloads(self) -> None:
        """Wait for all prefill buffer offloads to complete."""
        for stream in self.prefill_offload_streams:
            stream.synchronize()

    def wait_prefill_offload(self, layer_id: int) -> None:
        """Wait for a specific layer's prefill offload to complete."""
        self.prefill_offload_events[layer_id].synchronize()
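A minimal sketch of how a caller might drive this API during one chunked-prefill step, assuming an already-constructed OffloadEngine. The names prefill_chunk, model, chunk_hidden, and cpu_block_id are hypothetical and not part of this diff:

def prefill_chunk(engine, model, chunk_hidden, cpu_block_id, num_tokens):
    for layer_id, layer in enumerate(model.layers):
        # Each layer writes this chunk's K/V into its own private buffer
        k_buf, v_buf = engine.get_prefill_buffer(layer_id)
        chunk_hidden = layer(chunk_hidden, k_buf, v_buf, num_tokens)
        # Kick off the D2H copy; later layers do not wait on it
        engine.offload_prefill_buffer_async(layer_id, cpu_block_id, num_tokens)
    # Drain all per-layer offloads before the buffers are reused for the next chunk
    engine.wait_all_prefill_offloads()
    return chunk_hidden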