Compare commits

**4 commits** on `tzj/layer-...` → `b8c00399af`

| Author | SHA1 | Date |
|---|---|---|
| | `b8c00399af` | |
| | `13586e689b` | |
| | `e72725c12b` | |
| | `cfb188c34a` | |

**`.gitmodules`** (vendored, 6 lines changed)
```diff
@@ -1,4 +1,4 @@
-[submodule "3rdparty/Block-Sparse-Attention"]
-	path = 3rdparty/Block-Sparse-Attention
-	url = git@github.com:Zijie-Tian/Block-Sparse-Attention.git
+[submodule "3rdparty/Block-SparseAttention"]
+	path = 3rdparty/Block-SparseAttention
+	url = https://github.com/Zijie-Tian/Block-SparseAttention.git
 	branch = tzj/minference
```
**`README.md`**

```diff
@@ -64,6 +64,7 @@ PYTHONPATH=/home/zijie/Code/nano-vllm:$PYTHONPATH python tests/test_needle.py
 | [`docs/xattention_integration.md`](docs/xattention_integration.md) | XAttention integration guide: algorithm, implementation, design decisions, and testing |
 | [`docs/xattention_analysis.md`](docs/xattention_analysis.md) | XAttention algorithm analysis: chunked estimation, block sparse attention, integration design |
 | [`docs/development_notes.md`](docs/development_notes.md) | Development notes and scratchpad for ongoing work |
+| [`docs/chunked_prefill_analysis.md`](docs/chunked_prefill_analysis.md) | **NEW**: Chunked prefill for ultra-long sequences (1M+), memory analysis, MLP activation breakdown, implementation guide |

 ## Configuration
```

**`docs/chunked_prefill_analysis.md`** (new file, 1055 lines) — file diff suppressed because it is too large.
**`docs/chunked_prefill_integration_plan.md`** (new file, 354 lines)
# Chunked Prefill Integration Plan

**Goal**: Port the chunked prefill mechanism from the `tzj/minference` branch to the `tzj/vs_offload` branch.

**Created**: 2026-01-18
**Base branch**: `tzj/vs_offload`
**Source branch**: `tzj/minference`

---

## Goal

Implement chunked prefill + layerwise offload on the `tzj/vs_offload` branch, supporting arbitrary-length inference (4M, 8M, 16M+ tokens) on a 24 GB RTX 3090.

---
## Core Problem

### Limitation of the tzj/vs_offload branch

On the current `tzj/vs_offload` branch, the GPU ring buffer is allocated by `max_seq_len`, so GPU memory grows linearly with sequence length:

```python
# Current design
self.layer_k_cache = torch.zeros(
    num_kv_buffers,   # e.g., 4
    max_seq_len,      # e.g., 131072 tokens
    kv_heads,
    head_dim,
    dtype=dtype, device="cuda"
)
```

**Problems**:
- GPU memory requirement ≈ `max_seq_len × 4 × 8 × 128 × 2 bytes`
- Infeasible for ultra-long sequences:
  - 4M tokens → ~64 GB GPU memory ❌
  - 8M tokens → ~128 GB GPU memory ❌
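The numbers above follow directly from the buffer shape; a quick sanity check of the K+V footprint (a sketch assuming 4 buffers, 8 KV heads, head_dim 128, fp16, with the total doubled for the matching V cache):

```python
def kv_ring_buffer_bytes(max_seq_len, num_kv_buffers=4, kv_heads=8, head_dim=128, elem_bytes=2):
    # K and V caches each have shape [num_kv_buffers, max_seq_len, kv_heads, head_dim]
    one_cache = num_kv_buffers * max_seq_len * kv_heads * head_dim * elem_bytes
    return 2 * one_cache  # K + V

print(kv_ring_buffer_bytes(4 * 2**20) / 2**30)  # 4M tokens -> 64.0 GiB
print(kv_ring_buffer_bytes(8 * 2**20) / 2**30)  # 8M tokens -> 128.0 GiB
```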
### Solution: Block-Based Design

The `tzj/minference` branch uses a block-based design, so GPU memory is fixed:

```python
# Block-based design
self.k_cache_gpu = torch.zeros(
    num_gpu_blocks,   # e.g., 2
    block_size,       # e.g., 1024 tokens (fixed!)
    kv_heads,
    head_dim,
    dtype=dtype, device="cuda"
)
# GPU memory: ~4 MB (fixed; does not grow with sequence length)
```

**Advantages**:
- Fixed GPU memory (~1.6 GB in total); does not grow with sequence length
- A 24 GB RTX 3090 can run 4M+ tokens
- Ultra-long sequences are processed block by block via chunked prefill

---
## Memory Layout Comparison

| Component | tzj/vs_offload | tzj/minference | Notes |
|------|---------------|----------------|------|
| **GPU Ring Buffer** | `[num_kv_buffers, max_seq_len, ...]` | `[num_gpu_blocks, block_size, ...]` | no layer dimension in minference |
| **GPU memory** | ~2.15 GB (128K) → ~64 GB (4M) | ~4 MB (fixed) | minference saves substantially |
| **Prefill Buffer** | ❌ none | ✅ `[num_layers, block_size, ...]` | minference only |
| **Pipeline Buffers** | ❌ none | ✅ double buffers `[blocks, block_size, ...]` | minference only |
| **CPU Cache** | `[num_layers, num_cpu_blocks, block_size, ...]` | same | **identical** |

### Sequence Length Support Comparison

| Sequence length | vs_offload GPU memory | minference GPU memory | RTX 3090 (24 GB) |
|----------|-------------------|---------------------|-----------------|
| 128K tokens | ~2.15 GB | ~4 MB | ✅ both work |
| 1M tokens | ~16 GB | ~4 MB | ✅ both work |
| **4M tokens** | **~64 GB** ❌ | **~4 MB** ✅ | **minference only** |
| **8M tokens** | **~128 GB** ❌ | **~4 MB** ✅ | **minference only** |
| **16M+ tokens** | **~256 GB+** ❌ | **~4 MB** ✅ | **minference only** |

---
## Key Design Principles

1. **Block-based design**: organized in `block_size` (1024-token) blocks, enabling chunked prefill
2. **Fixed GPU memory**: a constant factor, independent of sequence length
3. **CPU memory scales linearly**: `num_cpu_blocks = ceil(seq_len / block_size)`
4. **Unified ring buffer**: no layer dimension; all layers share the slots
5. **Fully parallel offload**: per-layer buffers maximize PCIe bandwidth

---
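Principle 3 is plain integer arithmetic; with `block_size = 1024`, a 4M-token sequence needs exactly 4096 CPU blocks, plus one more for any partial tail block:

```python
import math

def num_cpu_blocks(seq_len, block_size=1024):
    # ceil(seq_len / block_size): a partial final block still occupies a full slot
    return math.ceil(seq_len / block_size)

print(num_cpu_blocks(4 * 2**20))      # 4M tokens -> 4096 blocks
print(num_cpu_blocks(4 * 2**20 + 1))  # one extra token -> 4097 blocks
```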
## Unified Memory Layout Design

### GPU Memory Layout

```python
class OffloadEngine:
    # 1. Unified ring buffer - block-based, no layer dimension
    self.k_cache_gpu = torch.zeros(
        num_gpu_blocks,   # e.g., 2
        block_size,       # e.g., 1024
        kv_heads,
        head_dim,
        dtype=dtype, device="cuda"
    )  # ~4 MB (fixed)

    # 2. Per-layer prefill buffer - fully parallel offload
    self.prefill_k_buffer = torch.zeros(
        num_layers, block_size, kv_heads, head_dim,
        dtype=dtype, device="cuda"
    )  # ~58 MB (fixed)

    # 3. Cross-layer pipeline buffers - double buffering
    self.layer_k_buffer_a = torch.zeros(
        max_prefill_blocks, block_size, kv_heads, head_dim,
        dtype=dtype, device="cuda"
    )  # ~512 MB (fixed)
    self.layer_k_buffer_b = torch.zeros(...)  # ~512 MB (fixed)

    # 4. Per-layer decode buffer
    self.decode_k_buffer = torch.zeros(
        num_layers, block_size, kv_heads, head_dim,
        dtype=dtype, device="cuda"
    )  # ~58 MB (fixed)

    # GPU total: ~1.6 GB (fixed; does not grow with sequence length)
```
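The per-buffer sizes quoted in the comments can be reproduced from the shapes. This is a sketch: the 28-layer / 8-KV-head / head_dim-128 / fp16 configuration comes from the Expected Results section below, while `max_prefill_blocks = 256` is an assumption chosen to match the ~512 MB figure:

```python
ELEM = 2  # fp16 bytes per element
block_size, kv_heads, head_dim, num_layers = 1024, 8, 128, 28
per_block = block_size * kv_heads * head_dim * ELEM  # 2,097,152 B per (block, layer-slice)

ring    = 2 * per_block            # num_gpu_blocks = 2     -> 4,194,304 B  (~4 MB)
prefill = num_layers * per_block   # 28 layers              -> 58,720,256 B (~58 MB)
pipe    = 256 * per_block          # max_prefill_blocks=256 -> 536,870,912 B (512 MiB)

print(ring, prefill, pipe)
```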
### CPU Memory Layout

```python
# CPU cache - with a block dimension
self.k_cache_cpu = torch.zeros(
    num_layers,
    num_cpu_blocks,   # scales with sequence length
    block_size,
    kv_heads,
    head_dim,
    dtype=dtype, device="cpu", pin_memory=True
)
# 128K tokens: ~2.9 GB
# 1M tokens:   ~5.8 GB
# 4M tokens:  ~23.3 GB
```

---
## Chunked Prefill Flow

### Prefill Phase

```
For each chunk:
├── 1. Prepare chunk input (block_size tokens)
├── 2. Get ring buffer slot: slot = chunk_idx % num_gpu_blocks
├── 3. Load previous KV chunks to ring slots[1..N-1]
├── 4. Model forward (all layers)
│     For each layer:
│     ├── Load previous KV from ring slots
│     ├── Compute attention (current chunk + previous)
│     ├── Write KV to prefill_buffer[layer_id]  ← per-layer!
│     └── Async offload to CPU (parallel across layers)
├── 5. Merge attention outputs (LSE)
└── 6. Record compute done for slot

Key: per-layer prefill buffer → Layer 0 offload || Layer 1 compute || Layer 2 load ...
```
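Step 5 combines partial attention outputs from different KV chunks using their log-sum-exp (LSE) statistics. A minimal NumPy sketch of the standard online-softmax merge (function names are illustrative, not the repo's API): merging per-chunk results with their LSE weights reproduces attention over the full KV exactly.

```python
import numpy as np

def chunk_attn(q, k, v):
    """Softmax attention over one KV chunk; also returns per-query LSE."""
    s = q @ k.T                              # [nq, nk] attention logits
    m = s.max(-1, keepdims=True)             # max for numerical stability
    p = np.exp(s - m)
    z = p.sum(-1, keepdims=True)
    lse = (m + np.log(z)).squeeze(-1)        # log-sum-exp of logits per query
    return p @ v / z, lse

def merge_partial_attention(o1, lse1, o2, lse2):
    """Merge two partial outputs computed over disjoint KV chunks."""
    lse = np.logaddexp(lse1, lse2)           # combined normalizer
    w1 = np.exp(lse1 - lse)[:, None]         # each chunk's share of the softmax mass
    w2 = np.exp(lse2 - lse)[:, None]
    return w1 * o1 + w2 * o2, lse

rng = np.random.default_rng(0)
q = rng.standard_normal((4, 16))
k = rng.standard_normal((8, 16))
v = rng.standard_normal((8, 16))

o1, lse1 = chunk_attn(q, k[:4], v[:4])       # first KV chunk
o2, lse2 = chunk_attn(q, k[4:], v[4:])       # second KV chunk
merged, _ = merge_partial_attention(o1, lse1, o2, lse2)
full, _ = chunk_attn(q, k, v)                # attention over all keys at once
assert np.allclose(merged, full)             # chunked == monolithic
```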
### Decode Phase

```
├── 1. Set up pipeline: preload Layer 0 into buffer_a
├── 2. For each layer:
│     ├── Get KV from the pipeline buffer (a or b)
│     ├── Trigger preload of the next layer into the other buffer
│     ├── Compute attention
│     └── Store to the decode buffer
└── 3. End pipeline

Key: double buffering → Layer N compute || Layer N+1 load
```

---
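The ping-pong in step 2 alternates the two buffers by layer parity. A host-side sketch with synchronous stand-ins for the async loads (names are illustrative; the real version overlaps `compute_layer` with the next `load_layer` on separate CUDA streams):

```python
def decode_pipeline(num_layers, load_layer, compute_layer):
    """Double buffering: while layer i computes from one buffer,
    layer i+1 is loaded into the other."""
    buffers = [None, None]            # buffer_a, buffer_b
    buffers[0] = load_layer(0)        # 1. preload layer 0 into buffer_a
    for layer in range(num_layers):   # 2. alternate buffers by parity
        kv = buffers[layer % 2]
        if layer + 1 < num_layers:    # kick off the next layer's load
            buffers[(layer + 1) % 2] = load_layer(layer + 1)
        compute_layer(layer, kv)      # overlaps the load in the async version

order = []
decode_pipeline(3, lambda l: f"kv{l}", lambda l, kv: order.append((l, kv)))
print(order)  # [(0, 'kv0'), (1, 'kv1'), (2, 'kv2')]
```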
## Merge Strategy

### Base branch choice: tzj/vs_offload

**Reasons**:
1. More complete documentation system
2. More complete sparse attention implementations (QUEST, XAttention, etc.)
3. Cleaner code organization and comments
4. More active development and maintenance

### Porting Strategy

**Port from tzj/minference**:
1. GPU cache memory layout (no layer dimension, block-based)
2. Per-layer prefill buffer
3. Cross-layer pipeline buffers
4. Chunked prefill flow
5. Online LSE merge mechanism

**Keep the strengths of tzj/vs_offload**:
1. Documentation system
2. Sparse policy architecture
3. Code organization and comments

---
## Sparse Policy Strategy

**Strategy**: keep the architecture; implement only FULL for now.

- **Keep** the sparse policy architecture and interfaces
- **Reserve** extension points for future policies such as QUEST
- **Implement only** the FULL policy for now, to ensure correctness and stability

### Implementation

```python
from abc import ABC
from typing import List


class SparsePolicy(ABC):
    @property
    def supports_prefill(self) -> bool:
        return False

    @property
    def supports_decode(self) -> bool:
        return True

    def on_prefill_offload(self, cpu_block_id, layer_id, k_cache, num_valid_tokens):
        """Reserved for future policies (e.g., QUEST metadata collection)."""
        pass

    def select_blocks(self, available_blocks, context) -> List[int]:
        """FULL: return all available blocks."""
        return available_blocks


class FullAttentionPolicy(SparsePolicy):
    @property
    def supports_prefill(self) -> bool:
        return True

    @property
    def supports_decode(self) -> bool:
        return True
```

---
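With only FULL implemented, `select_blocks` is the identity on the candidate block set. A condensed, self-contained sketch of the interface above (the property boilerplate is collapsed to class attributes for brevity):

```python
from abc import ABC
from typing import List

class SparsePolicy(ABC):
    def select_blocks(self, available_blocks, context) -> List[int]:
        return available_blocks  # FULL: keep every candidate block

class FullAttentionPolicy(SparsePolicy):
    supports_prefill = True
    supports_decode = True

policy = FullAttentionPolicy()
print(policy.select_blocks([0, 1, 2, 3], context=None))  # [0, 1, 2, 3]
```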
## Key APIs

### Ring Buffer Management

```python
# Prefill phase
get_write_slot_for_prefill(chunk_idx) -> slot_idx
get_load_slots_for_prefill(write_slot_idx) -> [slot_ids]

# Decode phase
get_load_slots_for_decode() -> [slot_ids]  (excludes decode_slot)
```

### Per-layer Operations

```python
# Loading
load_to_slot_layer(slot_idx, layer_id, cpu_block_id)
wait_slot_layer(slot_idx)

# Prefill buffer
get_prefill_buffer(layer_id) -> (k, v)
offload_prefill_buffer_async(layer_id, cpu_block_id, num_tokens)
wait_prefill_offload(layer_id)

# Pipeline
start_decode_pipeline(cpu_block_ids)
get_decode_layer_kv(layer_id, num_blocks) -> (k, v)
end_decode_pipeline()
```

---
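Write-slot assignment during prefill is a simple modulo over the ring, matching `slot = chunk_idx % num_gpu_blocks` from the prefill flow. An illustrative sketch (not the repo's implementation) with the example ring size of 2:

```python
NUM_GPU_BLOCKS = 2  # example ring size from the layout above

def get_write_slot_for_prefill(chunk_idx: int) -> int:
    # Successive chunks cycle through the ring: 0, 1, 0, 1, ...
    return chunk_idx % NUM_GPU_BLOCKS

print([get_write_slot_for_prefill(i) for i in range(5)])  # [0, 1, 0, 1, 0]
```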
## Implementation Phases

### Phase 1: Memory layout refactor
- Change the GPU cache to a unified ring buffer
- Add the per-layer prefill buffer
- Add the cross-layer pipeline buffers

### Phase 2: API implementation
- Implement the ring buffer slot management API
- Implement the per-layer prefill offload API
- Implement the cross-layer pipeline API

### Phase 3: Integration into the attention layer
- Modify the attention forward flow
- Integrate the per-layer prefill buffer
- Integrate the cross-layer pipeline

### Phase 4: Integration into the model runner
- Implement the chunked prefill flow
- Integrate LSE merging
- Optimize the pipeline

### Phase 5: Sparse policy integration (FULL)
- Design a unified policy interface
- Implement FullAttentionPolicy
- Reserve extension points for future policies such as QUEST

---
## Key Decisions

1. **Block-based design first**: the core enabler for arbitrary-length inference
2. **Adopt the tzj/minference memory layout**: block-based GPU cache with no layer dimension
3. **Use tzj/vs_offload as the base branch**: better documentation and code organization
4. **Merge in phases**: lowers complexity and eases verification
5. **Sparse policy - FULL first**: keep the architecture, implement only FULL for now

---
## Expected Results

### Memory usage (28-layer model, block_size=1024)

| Component | Memory |
|------|------|
| GPU unified ring buffer | ~4 MB |
| GPU per-layer prefill buffer | ~58 MB |
| GPU pipeline buffers (×2) | ~1 GB |
| GPU decode buffer | ~58 MB |
| **GPU total** | **~1.6 GB (fixed)** ✅ fits a 24 GB RTX 3090 |
| CPU cache (4M tokens, pinned host RAM) | ~23.3 GB |
| **Total (4M tokens)** | **~24.9 GB across GPU + host** |

### Performance Support

- ✅ Inference at 4M, 8M, 16M+ tokens
- ✅ Fixed GPU memory, independent of sequence length
- ✅ Fully parallel layerwise offload
- ✅ Cross-layer pipeline optimization

---

## References

- **OffloadEngine**: `nanovllm/kvcache/offload_engine.py`
- **Attention Layer**: `nanovllm/layers/attention.py`
- **Model Runner**: `nanovllm/engine/model_runner.py`
- **Sparse Policy**: `nanovllm/kvcache/sparse/policy.py`
**`tests/test_offload_unified.py`** (new file, 841 lines)
"""
|
||||||
|
OffloadedTensor 统一测试套件
|
||||||
|
|
||||||
|
本文件整合了 OffloadedTensor 的所有测试,包括:
|
||||||
|
1. 基础功能验证
|
||||||
|
2. Chunked GEMM 测试
|
||||||
|
3. 同步分析
|
||||||
|
|
||||||
|
核心组件:
|
||||||
|
- OffloadedTensor: 虚拟 GPU Tensor,支持透明 CPU/GPU 数据移动
|
||||||
|
- OffloadManager: LRU 缓存管理,支持同步/异步传输
|
||||||
|
- ChunkedOffloadLinear: 沿着 seqlen 维度分块的 Linear 层
|
||||||
|
"""
|
||||||
|
|
||||||
|
import torch
|
||||||
|
import torch.nn as nn
|
||||||
|
import weakref
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Optional, Dict, List, Tuple, Any
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
# ============================================================
# Part 1: Core components
# ============================================================

class OffloadedTensor(torch.Tensor):
    """
    Virtual GPU tensor: pretends to live on the GPU while the data may be on the CPU.

    All compute ops are intercepted via __torch_dispatch__, and the data is
    loaded onto the GPU automatically before the computation runs.
    """

    @staticmethod
    def __new__(cls, real_tensor: torch.Tensor, manager: 'OffloadManager', tensor_id: int):
        device = torch.device("cuda", torch.cuda.current_device())
        ret = torch.Tensor._make_wrapper_subclass(
            cls,
            real_tensor.size(),
            strides=real_tensor.stride(),
            dtype=real_tensor.dtype,
            device=device,
            requires_grad=real_tensor.requires_grad
        )
        ret._real_tensor = real_tensor
        ret._manager = weakref.ref(manager)
        ret._tensor_id = tensor_id
        return ret

    def __init__(self, real_tensor: torch.Tensor, manager: 'OffloadManager', tensor_id: int):
        self._real_tensor = real_tensor
        self._manager = weakref.ref(manager)
        self._tensor_id = tensor_id

    @property
    def device(self) -> torch.device:
        """Always report a CUDA device, to satisfy PyTorch's device checks."""
        return torch.device("cuda", torch.cuda.current_device())

    def to(self, *args, **kwargs):
        """Intercept .to() calls."""
        device = None
        if args and isinstance(args[0], torch.device):
            device = args[0]
        elif 'device' in kwargs:
            device = kwargs['device']

        if device and device.type == "cuda":
            return self
        return super().to(*args, **kwargs)

    def __torch_dispatch__(self, func, types, args=(), kwargs=None):
        """Intercept all PyTorch ops and load the data automatically."""
        kwargs = kwargs or {}

        manager = self._manager()
        if manager:
            manager.stats['dispatch_count'] += 1

        # Special case: detach returns self.
        # str(func) yields e.g. "aten.detach.default"; OpOverload has no plain
        # `.name` string attribute, so getattr(func, 'name', '') would never match.
        func_name = str(func)
        if 'detach' in func_name.lower():
            return self

        # Unwrap OffloadedTensors into real tensors
        def unwrap(t):
            if isinstance(t, OffloadedTensor):
                mgr = t._manager()
                if mgr:
                    return mgr.get_gpu_tensor(t._real_tensor, t._tensor_id)
                return t._real_tensor.cuda()
            return t

        new_args = torch.utils._pytree.tree_map(unwrap, args)
        new_kwargs = torch.utils._pytree.tree_map(unwrap, kwargs)

        result = func(*new_args, **new_kwargs)
        return result


class OffloadManager:
    """
    Manages tensor offload and prefetch.

    Features:
    - LRU cache of the tensors resident on the GPU
    - Synchronous and asynchronous transfer modes
    - Full performance statistics
    """

    def __init__(
        self,
        device: str = "cuda",
        offload_device: str = "cpu",
        max_gpu_tensors: int = 2,
        non_blocking: bool = False,
    ):
        self.device = torch.device(device)
        self.offload_device = torch.device(offload_device)
        self._gpu_pool: Dict[int, torch.Tensor] = {}
        self._cpu_storage: Dict[int, torch.Tensor] = {}
        self._lock = threading.Lock()
        self._tensor_id_counter = 0
        self._max_gpu_tensors = max_gpu_tensors
        self._access_order: List[int] = []
        self.non_blocking = non_blocking

        # Statistics
        self.stats = {
            'load_count': 0,
            'evict_count': 0,
            'dispatch_count': 0,
            'transfer_times_ms': [],
        }

    def _next_id(self) -> int:
        tid = self._tensor_id_counter
        self._tensor_id_counter += 1
        return tid

    def wrap(self, tensor: torch.Tensor) -> OffloadedTensor:
        """Wrap a tensor as a virtual GPU tensor."""
        if isinstance(tensor, OffloadedTensor):
            return tensor

        tensor_id = self._next_id()
        cpu_tensor = tensor.detach().to(self.offload_device)
        self._cpu_storage[tensor_id] = cpu_tensor

        return OffloadedTensor(cpu_tensor, self, tensor_id)

    def get_gpu_tensor(self, real_tensor: torch.Tensor, tensor_id: int) -> torch.Tensor:
        """Fetch the data on the GPU (LRU-cached)."""
        with self._lock:
            self.stats['load_count'] += 1

            if tensor_id in self._gpu_pool:
                # Already on the GPU; refresh its LRU position
                if tensor_id in self._access_order:
                    self._access_order.remove(tensor_id)
                self._access_order.append(tensor_id)
                return self._gpu_pool[tensor_id]

            # LRU eviction
            while len(self._gpu_pool) >= self._max_gpu_tensors:
                if self._access_order:
                    evict_id = self._access_order.pop(0)
                    if evict_id in self._gpu_pool:
                        del self._gpu_pool[evict_id]
                        self.stats['evict_count'] += 1
                else:
                    break

            # Load onto the GPU
            cpu_tensor = self._cpu_storage.get(tensor_id, real_tensor)
            gpu_tensor = cpu_tensor.to(self.device, non_blocking=self.non_blocking)
            self._gpu_pool[tensor_id] = gpu_tensor
            self._access_order.append(tensor_id)

            return gpu_tensor

    def get_stats(self) -> Dict[str, Any]:
        """Return statistics."""
        transfer_times = self.stats['transfer_times_ms']
        return {
            'load_count': self.stats['load_count'],
            'evict_count': self.stats['evict_count'],
            'dispatch_count': self.stats['dispatch_count'],
            'gpu_pool_size': len(self._gpu_pool),
            'total_tensors': len(self._cpu_storage),
            'total_transfer_time_ms': sum(transfer_times),
            'avg_transfer_time_ms': sum(transfer_times) / len(transfer_times) if transfer_times else 0,
            'transfer_times_ms': list(transfer_times),
        }


class OffloadModuleWrapper(nn.Module):
    """Wrap an nn.Module with parameter-level offloading."""

    def __init__(self, module: nn.Module, manager: OffloadManager):
        super().__init__()
        self._original_module = module
        self._manager = manager
        self._wrap_parameters(module, "")

    def _wrap_parameters(self, module: nn.Module, prefix: str):
        """Recursively wrap all parameters of the module."""
        for name, param in list(module.named_parameters(recurse=False)):
            param.requires_grad_(False)
            wrapped = self._manager.wrap(param.data)
            delattr(module, name)
            setattr(module, name, wrapped)

        for child_name, child in list(module.named_children()):
            self._wrap_parameters(child, prefix + child_name + ".")

    def forward(self, *args, **kwargs):
        return self._original_module(*args, **kwargs)


# ============================================================
# Part 2: Higher-level modules
# ============================================================

class ChunkedOffloadLinear(nn.Module):
    """
    A Linear layer chunked along the seqlen dimension.

    Splits the input [seqlen, in_features] into chunks and runs an independent
    GEMM per chunk. The weight is an OffloadedTensor, loaded onto the GPU on demand.

    Args:
        in_features: input feature dimension
        out_features: output feature dimension
        chunk_size: size of each chunk
        max_gpu_tensors: maximum number of tensors cached on the GPU
        non_blocking: whether to use asynchronous transfers
    """

    def __init__(
        self,
        in_features: int,
        out_features: int,
        chunk_size: int = 4096,
        max_gpu_tensors: int = 2,
        non_blocking: bool = False,
        bias: bool = False,
    ):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.chunk_size = chunk_size

        self.manager = OffloadManager(
            max_gpu_tensors=max_gpu_tensors,
            non_blocking=non_blocking
        )

        weight_tensor = torch.empty(out_features, in_features, dtype=torch.float16)
        nn.init.xavier_uniform_(weight_tensor)
        weight_tensor.requires_grad_(False)

        self.weight = self.manager.wrap(weight_tensor)
        self.bias = None
        if bias:
            # Zero-init rather than leaving torch.empty's uninitialized memory
            self.bias = nn.Parameter(torch.zeros(out_features))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        seqlen = x.shape[0]

        if seqlen <= self.chunk_size:
            return self._compute_chunk(x)

        outputs = []
        for start_idx in range(0, seqlen, self.chunk_size):
            end_idx = min(start_idx + self.chunk_size, seqlen)
            chunk = x[start_idx:end_idx]
            chunk_output = self._compute_chunk(chunk)
            outputs.append(chunk_output)

        return torch.cat(outputs, dim=0)

    def _compute_chunk(self, chunk: torch.Tensor) -> torch.Tensor:
        return torch.nn.functional.linear(chunk, self.weight, self.bias)


# ============================================================
# Helpers
# ============================================================

def calculate_memory(
    seqlen: int,
    in_features: int,
    out_features: int,
    dtype: torch.dtype = torch.float16,
) -> Dict[str, float]:
    """Compute the memory footprint in MB."""
    element_size = torch.finfo(dtype).bits / 8

    activation = seqlen * in_features * element_size / (1024 ** 2)
    weight = in_features * out_features * element_size / (1024 ** 2)
    output = seqlen * out_features * element_size / (1024 ** 2)

    total = activation + weight + output
    peak = max(activation, output) + weight

    return {
        'activation_mb': activation,
        'weight_mb': weight,
        'output_mb': output,
        'total_mb': total,
        'peak_mb': peak,
    }


def run_benchmark(
    layer: nn.Module,
    input_tensor: torch.Tensor,
    num_runs: int = 3,
) -> Dict[str, float]:
    """Run a performance benchmark."""
    torch.cuda.synchronize()

    # Warmup
    with torch.no_grad():
        _ = layer(input_tensor)
    torch.cuda.synchronize()

    # Benchmark
    start_time = time.time()
    for _ in range(num_runs):
        with torch.no_grad():
            output = layer(input_tensor)
    torch.cuda.synchronize()

    elapsed = time.time() - start_time
    avg_time = elapsed / num_runs

    total_elements = input_tensor.numel() + output.numel()
    throughput = total_elements / avg_time / 1e6

    return {
        'avg_time_ms': avg_time * 1000,
        'throughput_meps': throughput,
    }


# ============================================================
# Part 3: Test suite - functional tests
# ============================================================

def test_1_basic_offloaded_tensor():
    """Basic OffloadedTensor functionality."""
    print("\n=== Test 1: Basic OffloadedTensor ===")

    if not torch.cuda.is_available():
        print("CUDA not available, skipping")
        return

    manager = OffloadManager(max_gpu_tensors=2)

    t1 = torch.randn(4, 4)
    t2 = torch.randn(4, 4)
    t3 = torch.randn(4, 4)

    w1 = manager.wrap(t1)
    w2 = manager.wrap(t2)
    w3 = manager.wrap(t3)

    print(f"✓ Created OffloadedTensors")
    print(f"  w1.device: {w1.device}")
    print(f"  w2.device: {w2.device}")

    assert w1.device.type == "cuda"
    print(f"✓ is_cuda check passed")

    result = w1 + w2
    print(f"✓ Addition works: {result.shape}")

    stats = manager.get_stats()
    print(f"✓ Manager stats: {stats}")
    print("PASSED\n")


def test_2_mlp_with_offload():
    """An MLP model running on OffloadedTensor parameters."""
    print("\n=== Test 2: MLP with OffloadedTensor ===")

    if not torch.cuda.is_available():
        print("CUDA not available, skipping")
        return

    class SimpleMLP(nn.Module):
        def __init__(self, hidden_size=128, intermediate_size=256):
            super().__init__()
            self.gate_up_proj = nn.Linear(hidden_size, 2 * intermediate_size, bias=False)
            self.down_proj = nn.Linear(intermediate_size, hidden_size, bias=False)

        def forward(self, x):
            gate, up = self.gate_up_proj(x).chunk(2, dim=-1)
            return self.down_proj(nn.functional.silu(gate) * up)

    hidden_size = 128
    intermediate_size = 256
    batch_size, seq_len = 2, 4

    input_ids = torch.randn(batch_size, seq_len, hidden_size, device="cuda")

    model_original = SimpleMLP(hidden_size, intermediate_size)
    model_original.to("cuda")
    model_original.eval()

    with torch.no_grad():
        expected = model_original(input_ids)

    state_dict = model_original.state_dict()

    model = SimpleMLP(hidden_size, intermediate_size)
    model.load_state_dict(state_dict)
    model.eval()

    offloaded_model, manager = apply_offload_to_model(model, max_gpu_tensors=2)
    offloaded_model.eval()

    with torch.no_grad():
        output = offloaded_model(input_ids)

    print(f"✓ Forward pass completed: {output.shape}")

    stats = manager.get_stats()
    print(f"✓ Offload stats: {stats}")

    diff = (output - expected).abs().max().item()
    print(f"✓ Output correctness: max diff = {diff:.6f}")

    assert diff < 1e-5
    print("PASSED\n")


def apply_offload_to_model(model: nn.Module, max_gpu_tensors: int = 2):
    """Apply offloading to all parameters of a model."""
    manager = OffloadManager(max_gpu_tensors=max_gpu_tensors)
    wrapper = OffloadModuleWrapper(model, manager)
    return wrapper, manager


def test_3_lru_eviction():
    """LRU eviction behavior."""
    print("\n=== Test 3: LRU Eviction ===")

    if not torch.cuda.is_available():
        print("CUDA not available, skipping")
        return

    manager = OffloadManager(max_gpu_tensors=2)

    tensors = [torch.randn(2, 2) for _ in range(4)]
    wrapped = [manager.wrap(t) for t in tensors]

    print(f"✓ Created {len(wrapped)} OffloadedTensors")
    print(f"  GPU pool capacity: {manager._max_gpu_tensors}")

    _ = wrapped[0] + wrapped[1]
    stats = manager.get_stats()
    print(f"✓ After accessing t1, t2: GPU pool = {stats['gpu_pool_size']}")

    _ = wrapped[2] + wrapped[2]
    stats = manager.get_stats()
    print(f"✓ After accessing t3: GPU pool = {stats['gpu_pool_size']}, evicted = {stats['evict_count']}")

    _ = wrapped[3] + wrapped[3]
    stats = manager.get_stats()
    print(f"✓ After accessing t4: GPU pool = {stats['gpu_pool_size']}, evicted = {stats['evict_count']}")

    assert stats['evict_count'] >= 1
    print("PASSED\n")


def test_4_correctness():
    """Output correctness against a reference nn.Linear."""
    print("\n=== Test 4: Correctness Check ===")

    if not torch.cuda.is_available():
        print("CUDA not available, skipping")
        return

    in_features = 512
    out_features = 1024
    seqlen = 4096
    chunk_size = 1024

    x = torch.randn(seqlen, in_features, device="cuda", dtype=torch.float16)

    # Create a reference layer and keep its weights
    linear = nn.Linear(in_features, out_features, bias=False)
    linear.to("cuda", dtype=torch.float16)
    linear.eval()
    with torch.no_grad():
        expected = linear(x)

    print(f"✓ Got expected output")

    # Create a ChunkedOffloadLinear that uses the same weights
    chunked_layer = ChunkedOffloadLinear(in_features, out_features, chunk_size, max_gpu_tensors=2)

    # Copy the weights into chunked_layer
    # (tensor_id 0 is the CPU storage backing the wrapped weight)
    with torch.no_grad():
        weight_data = linear.weight.data.cpu()
        chunked_layer.manager._cpu_storage[0] = weight_data

    with torch.no_grad():
        actual = chunked_layer(x)

    print(f"✓ Got actual output")

    diff = (actual - expected).abs().max().item()
    print(f"✓ Max difference: {diff:.6f}")

    assert diff < 1e-5
    print("PASSED\n")


# ============================================================
# Part 3: Test suite - performance tests
# ============================================================

def test_5_memory_analysis():
    """Analyze memory usage."""
    print("\n=== Test 5: Memory Analysis ===")

    in_features = 4096
    out_features = 12244
    chunk_size = 4096

    seqlens = [4096, 16384, 65536, 131072]

    print(f"\nMemory Analysis (in={in_features}, out={out_features}, chunk={chunk_size}):")
    print(f"{'Seqlen':>10} | {'Activation':>12} | {'Weight':>12} | {'Output':>12} | {'Peak':>12} | {'Chunked':>12}")
    print("-" * 90)

    for seqlen in seqlens:
        full = calculate_memory(seqlen, in_features, out_features)
        chunked = calculate_memory(chunk_size, in_features, out_features)

        print(f"{seqlen:>10} | "
              f"{full['activation_mb']:>10.1f}MB | "
              f"{full['weight_mb']:>10.1f}MB | "
              f"{full['output_mb']:>10.1f}MB | "
              f"{full['peak_mb']:>10.1f}MB | "
              f"{chunked['peak_mb']:>10.1f}MB")

    print("\n✓ Chunked offload keeps peak GPU memory constant, independent of sequence length!")
    print("PASSED\n")

def test_6_long_sequence():
    """Test an ultra-long sequence."""
    print("\n=== Test 6: Long Sequence (128K tokens) ===")

    if not torch.cuda.is_available():
        print("CUDA not available, skipping")
        return

    in_features = 4096
    out_features = 12244
    seqlen = 128 * 1024
    chunk_size = 4096

    full = calculate_memory(seqlen, in_features, out_features)
    chunked = calculate_memory(chunk_size, in_features, out_features)

    print("Memory Comparison:")
    print(f"  Full:    {full['peak_mb']:.1f} MB")
    print(f"  Chunked: {chunked['peak_mb']:.1f} MB")
    print(f"  Savings: {(1 - chunked['peak_mb'] / full['peak_mb']) * 100:.1f}%")

    layer = ChunkedOffloadLinear(in_features, out_features, chunk_size, max_gpu_tensors=1)
    x = torch.randn(seqlen, in_features, device="cuda", dtype=torch.float16)

    with torch.no_grad():
        start = time.time()
        output = layer(x)
        torch.cuda.synchronize()
        elapsed = (time.time() - start) * 1000

    print(f"✓ Forward pass: {output.shape}")
    print(f"  Time: {elapsed:.1f} ms")
    # elapsed is in ms, so tokens/ms equals K tokens/sec directly
    print(f"  Throughput: {seqlen / elapsed:.1f}K tokens/sec")

    stats = layer.manager.get_stats()
    print(f"✓ Chunks processed: {seqlen // chunk_size}")
    print(f"✓ Load count: {stats['load_count']}")
    print("PASSED\n")

def test_7_performance_comparison():
    """Performance comparison test."""
    print("\n=== Test 7: Performance Comparison ===")

    if not torch.cuda.is_available():
        print("CUDA not available, skipping")
        return

    in_features = 4096
    out_features = 12244
    seqlen = 16384
    chunk_size = 4096

    x = torch.randn(seqlen, in_features, device="cuda", dtype=torch.float16)

    linear = nn.Linear(in_features, out_features, bias=False).cuda().half().eval()
    standard_stats = run_benchmark(linear, x, num_runs=5)
    print(f"✓ Standard Linear: {standard_stats['avg_time_ms']:.1f} ms")

    chunked_layer = ChunkedOffloadLinear(in_features, out_features, chunk_size, max_gpu_tensors=1)
    chunked_stats = run_benchmark(chunked_layer, x, num_runs=5)
    print(f"✓ ChunkedOffloadLinear: {chunked_stats['avg_time_ms']:.1f} ms")

    # Ratio of standard time to chunked time (> 1 means the chunked layer is faster)
    speedup = standard_stats['avg_time_ms'] / chunked_stats['avg_time_ms']
    print(f"✓ Speedup: {speedup:.2f}x")
    print("PASSED\n")

def test_8_transformers_layer():
    """Test with real transformers weights."""
    print("\n=== Test 8: Transformers Layer Test ===")

    try:
        from transformers import AutoModelForCausalLM
    except ImportError:
        print("transformers not installed, skipping")
        return

    if not torch.cuda.is_available():
        print("CUDA not available, skipping")
        return

    model_name = "Qwen/Qwen2.5-0.5B-Instruct"

    try:
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16,
            trust_remote_code=True,
        )
        model.eval()
        model.to("cuda")
    except Exception as e:
        print(f"Failed to load model: {e}")
        return

    down_proj = model.model.layers[0].mlp.down_proj
    print(f"✓ Got layer: {down_proj.in_features} -> {down_proj.out_features}")

    batch_size, seq_len = 1, 4
    test_input = torch.randn(batch_size, seq_len, down_proj.in_features, device="cuda", dtype=torch.float16)

    with torch.no_grad():
        normal_output = down_proj(test_input)

    print(f"✓ Normal inference: {normal_output.shape}")

    # Clone the layer so the offloaded copy owns independent weights
    import copy
    test_linear = nn.Linear(down_proj.in_features, down_proj.out_features, bias=False)
    test_linear.load_state_dict(copy.deepcopy(down_proj.state_dict()))
    test_linear.to("cuda", dtype=torch.float16)
    test_linear.eval()

    manager = OffloadManager(max_gpu_tensors=2)
    offloaded_layer = OffloadModuleWrapper(test_linear, manager)

    with torch.no_grad():
        offload_output = offloaded_layer(test_input)

    print(f"✓ Offload inference: {offload_output.shape}")

    stats = manager.get_stats()
    print(f"✓ Stats: {stats}")

    diff = (offload_output - normal_output).abs().max().item()
    print(f"✓ Max diff: {diff:.6f}")

    assert diff < 1e-5
    print("PASSED\n")

# ============================================================
# Part 3: Test suite - synchronization analysis
# ============================================================

def test_9_sync_behavior_analysis():
    """Compare synchronous vs. asynchronous transfers."""
    print("\n=== Test 9: Sync Behavior Analysis ===")

    if not torch.cuda.is_available():
        print("CUDA not available, skipping")
        return

    in_features = 4096
    out_features = 12244
    seqlen = 16384
    chunk_size = 4096

    print(f"Config: in={in_features}, out={out_features}, seqlen={seqlen}, chunk={chunk_size}")
    print(f"Num chunks: {seqlen // chunk_size}")

    x = torch.randn(seqlen, in_features, device="cuda", dtype=torch.float16)

    # Synchronous transfers
    print("\n--- Synchronous transfers (non_blocking=False) ---")
    layer_sync = ChunkedOffloadLinear(in_features, out_features, chunk_size, non_blocking=False)

    with torch.no_grad():
        start = time.time()
        _ = layer_sync(x)
        torch.cuda.synchronize()
        sync_time_ms = (time.time() - start) * 1000

    stats_sync = layer_sync.manager.get_stats()
    print(f"Total time:    {sync_time_ms:.2f} ms")
    print(f"Transfer time: {stats_sync['total_transfer_time_ms']:.2f} ms")
    print(f"Compute time:  {sync_time_ms - stats_sync['total_transfer_time_ms']:.2f} ms")
    print(f"Load count:    {stats_sync['load_count']}")

    # Asynchronous transfers
    print("\n--- Asynchronous transfers (non_blocking=True) ---")
    layer_async = ChunkedOffloadLinear(in_features, out_features, chunk_size, non_blocking=True)

    with torch.no_grad():
        start = time.time()
        _ = layer_async(x)
        torch.cuda.synchronize()
        async_time_ms = (time.time() - start) * 1000

    stats_async = layer_async.manager.get_stats()
    print(f"Total time:    {async_time_ms:.2f} ms")
    print(f"Transfer time: {stats_async['total_transfer_time_ms']:.2f} ms")
    print(f"Compute time:  {async_time_ms - stats_async['total_transfer_time_ms']:.2f} ms")
    print(f"Load count:    {stats_async['load_count']}")

    # Comparison
    print("\n--- Comparison ---")
    print(f"Overall speedup: {sync_time_ms / async_time_ms:.2f}x")

    if stats_async['total_transfer_time_ms'] > 0:
        print(f"Transfer speedup: {stats_sync['total_transfer_time_ms'] / stats_async['total_transfer_time_ms']:.2f}x")

    print("\nKey observations:")
    print("  1. Synchronous transfers block the CPU thread")
    print("  2. Asynchronous transfers can improve throughput")
    print("  3. The first run includes JIT compilation overhead")
    print("PASSED\n")

def test_10_profiler_analysis():
    """Analyze kernel execution with the PyTorch profiler."""
    print("\n=== Test 10: Profiler Analysis ===")

    if not torch.cuda.is_available():
        print("CUDA not available, skipping")
        return

    in_features = 4096
    out_features = 12244
    seqlen = 16384
    chunk_size = 4096

    layer = ChunkedOffloadLinear(in_features, out_features, chunk_size)
    x = torch.randn(seqlen, in_features, device="cuda", dtype=torch.float16)

    with torch.profiler.profile(activities=[torch.profiler.ProfilerActivity.CUDA]) as p:
        with torch.no_grad():
            _ = layer(x)
        torch.cuda.synchronize()

    # key_averages() returns one aggregated entry per kernel name,
    # so use event.count for the number of calls rather than counting entries
    kernel_counts = {}
    for event in p.key_averages():
        if event.device_type == torch.profiler.DeviceType.CUDA:
            kernel_counts[event.key] = kernel_counts.get(event.key, 0) + event.count

    print("Kernel call statistics:")
    print(f"{'Kernel':<50} {'Calls':<10}")
    print("-" * 60)

    for name, count in sorted(kernel_counts.items(), key=lambda x: -x[1])[:15]:
        print(f"{name[:48]:<50} {count:<10}")

    memcpy_count = sum(count for name, count in kernel_counts.items() if 'memcpy' in name.lower())
    print("\nAnalysis:")
    print(f"  - {len(kernel_counts)} distinct CUDA kernels")
    print(f"  - Total calls: {sum(kernel_counts.values())}")
    print(f"  - Memory copies: {memcpy_count}")
    print("PASSED\n")

# ============================================================
# Main test entry point
# ============================================================


def main():
    """Run all tests."""
    print("=" * 70)
    print("OffloadedTensor Unified Test Suite")
    print("=" * 70)

    # Functional tests
    print("\n" + "=" * 70)
    print("Functional tests (Tests 1-4)")
    print("=" * 70)
    test_1_basic_offloaded_tensor()
    test_2_mlp_with_offload()
    test_3_lru_eviction()
    test_4_correctness()

    # Performance tests
    print("\n" + "=" * 70)
    print("Performance tests (Tests 5-8)")
    print("=" * 70)
    test_5_memory_analysis()
    test_6_long_sequence()
    test_7_performance_comparison()
    test_8_transformers_layer()

    # Synchronization analysis
    print("\n" + "=" * 70)
    print("Synchronization analysis (Tests 9-10)")
    print("=" * 70)
    test_9_sync_behavior_analysis()
    test_10_profiler_analysis()

    print("=" * 70)
    print("All tests complete!")
    print("=" * 70)


if __name__ == "__main__":
    main()