Merge branch 'zijie/fix-dist-3': Fix distributed port conflict

- Auto port allocation with _find_free_port() in model_runner.py
- Resource management refactor with close() + context manager in llm_engine.py
- Add tests/test_port_conflict.py and tests/run_parallel_niah.sh
- Remove docs/torch_distributed_port_issue.md (issue fixed)
- Ignore tests/data/ directory

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: Zijie Tian
Date: 2026-01-12 16:20:44 +08:00
Parent: de6f36bdb2
Commit: 64971c8e8a
10 changed files with 784 additions and 792 deletions


@@ -1,314 +1,230 @@
# Task Plan: Enable CUDA Graphs for CPU Offload Mode
## Current Status: ✅ COMPLETED
### Phase 0 Completed: Refactor Offload Decode to Use Standard Attention Path
### Phases 1-3 Completed: CUDA Graph Support for Offload Mode
**Implementation**: Added per-layer CUDA graph capture and replay for the offload decode path.
**Key Changes**:
1. `capture_offload_cudagraph()` captures one graph per transformer layer
2. Each graph uses the corresponding ring buffer slot based on `layer_id % num_buffers`
3. `run_layerwise_offload_decode()` replays graphs when `enforce_eager=False`
4. Synchronization added between graph replays to ensure correct data flow
**Test Results**:
- `test_needle.py --input-len 32768 --enable-offload --use-cuda-graph`: **PASSED**

---

# Task Plan: Fix Torch Distributed Port Conflict
## Goal
Allow multiple independent nanovllm processes to start simultaneously in a multi-GPU environment for testing, without manually managing ports.
## Problem Analysis
### Core Problem
```
Currently: all nanovllm instances default to port 2333
└── Multiple independent processes running at the same time will conflict!

CUDA_VISIBLE_DEVICES=0 python test1.py  # binds port 2333 ✓
CUDA_VISIBLE_DEVICES=1 python test2.py  # tries to bind 2333 → EADDRINUSE ❌
```
### Root Cause
- The port is a system-level resource, unrelated to the GPU
- Even when using different GPUs, the port still conflicts
- The default port `2333` is currently hard-coded
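The conflict above is plain TCP behavior, reproducible without nanovllm or torch. A minimal sketch (loopback only; the first port is OS-assigned here, where nanovllm currently hard-codes 2333):

```python
import errno
import socket

# First "process" grabs a port and listens on it.
a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
a.bind(("127.0.0.1", 0))
a.listen()
port = a.getsockname()[1]

# Second "process" tries the same port -> EADDRINUSE, regardless of GPU.
b = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    b.bind(("127.0.0.1", port))
    conflict = False
except OSError as e:
    conflict = e.errno == errno.EADDRINUSE

a.close()
b.close()
assert conflict
```

Binding to port 0 instead lets the OS hand each process a distinct free port, which is exactly the fix adopted below.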
---
## Solution: Dynamic Port Allocation

---

### Previous Work: Refactor Offload Decode to Use Standard Attention Path
**Problem solved**: The original offload decode (`run_layerwise_offload_decode`) bypassed `Attention.forward()` by manually calling attention components. This was inconsistent with the standard execution path.
**Solution implemented**: Refactored to use `layer.forward()` which goes through:
```
Qwen3DecoderLayer.forward()
→ Qwen3Attention.forward()
→ Attention.forward() ← Now properly used!
```
### Code Changes Made
**File**: `nanovllm/engine/model_runner.py`
1. **`run_layerwise_offload_decode()` (line 841-991)** - Completely refactored:
Before (bypassed Attention):
```python
qkv = layer.self_attn.qkv_proj(hidden_ln)
q, k_new, v_new = qkv.split(...)
q = layer.self_attn.q_norm(...)
k = layer.self_attn.k_norm(...)
q, k = layer.self_attn.rotary_emb(...)
attn_output = flash_attn_varlen_func(q, k_full, v_full, ...) # Direct call!
hidden_states = layer.self_attn.o_proj(attn_output)
```
After (uses standard path):
```python
# Set up Attention module's cache to ring buffer
attn_module.k_cache = offload_engine.layer_k_cache[buffer_idx:buffer_idx+1]
attn_module.v_cache = offload_engine.layer_v_cache[buffer_idx:buffer_idx+1]
# Set context for contiguous mode
set_context(is_prefill=False, slot_mapping=..., context_lens=..., block_tables=None)
# Standard layer forward - goes through Attention.forward()!
hidden_states, residual = layer(positions, hidden_states, residual)
```
2. **`ModelRunner.__init__()` (line 46-57)** - Conditional CUDA graph capture:
```python
if not self.enforce_eager:
if config.enable_cpu_offload:
# TODO: Implement capture_offload_cudagraph()
pass # Temporarily use eager execution
else:
self.capture_cudagraph()
```
### Test Results
| Test | Mode | Status |
|------|------|--------|
| `test_needle.py --input-len 4096` | GPU-only | PASSED |
| `test_needle.py --input-len 4096 --enable-offload` | CPU offload | PASSED |
## Remaining Work: Implement Offload CUDA Graph
### Why Standard `capture_cudagraph()` Cannot Be Used
The standard capture function captures the PagedAttention decode path:
```python
# capture_cudagraph() sets up:
k_cache: [num_blocks, block_size, kv_heads, head_dim]  # PagedAttention format
block_tables: [...]  # Block indices for paged indexing
```
But offload mode uses a contiguous ring buffer:
```python
# Offload decode sets up:
k_cache: [1, max_seq_len, kv_heads, head_dim]  # Contiguous format
block_tables: None  # No paging
```

### Core Approach
```python
def _find_free_port() -> int:
    """Let the OS assign a free port automatically."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('', 0))
        return s.getsockname()[1]

# Prefer the environment variable; otherwise auto-allocate
port = os.environ.get("NANOVLLM_DIST_PORT")
if port is None:
    port = _find_free_port()
else:
    port = int(port)
```
### Effect
```bash
# No need to specify ports manually; multiple tests can run at once
CUDA_VISIBLE_DEVICES=0 python test1.py &  # auto port 54321
CUDA_VISIBLE_DEVICES=1 python test2.py &  # auto port 54322
CUDA_VISIBLE_DEVICES=2 python test3.py &  # auto port 54323

# Manually specifying a port is still supported (backward compatible)
NANOVLLM_DIST_PORT=2333 python test.py
```
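The precedence rule (an explicit `NANOVLLM_DIST_PORT` wins, otherwise ask the OS) is easy to isolate and test. A sketch with the environment passed in explicitly for testability; `select_dist_port` is an illustrative name, the plan inlines this logic in `ModelRunner.__init__`:

```python
import socket

def _find_free_port() -> int:
    """Let the OS pick an unused TCP port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]

def select_dist_port(env: dict) -> int:
    """Environment variable takes precedence; otherwise auto-allocate."""
    raw = env.get("NANOVLLM_DIST_PORT")
    return int(raw) if raw is not None else _find_free_port()

# Explicit setting is honored verbatim (backward compatible)...
assert select_dist_port({"NANOVLLM_DIST_PORT": "2333"}) == 2333
# ...while an empty environment yields some OS-assigned ephemeral port.
assert 0 < select_dist_port({}) <= 65535
```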
### Implementation Plan for `capture_offload_cudagraph()`
#### Phase 1: Prepare Fixed-Address Tensors
```python
@torch.inference_mode()
def capture_offload_cudagraph(self):
    """Capture CUDA graphs for offload decode using ring buffer."""
    offload_engine = self.kvcache_manager.offload_engine
    num_buffers = offload_engine.num_kv_buffers

    # Fixed-address tensors for graph capture
    input_ids = torch.zeros(1, dtype=torch.int64, device="cuda")
    positions = torch.zeros(1, dtype=torch.int64, device="cuda")
    slot_mapping = torch.zeros(1, dtype=torch.int32, device="cuda")
    context_lens = torch.zeros(1, dtype=torch.int32, device="cuda")

    self.offload_graphs = {}
    self.offload_graph_pool = None
```
#### Phase 2: Capture Per-Buffer Graphs
Since layer processing rotates through ring buffers (`layer_id % num_buffers`), we need graphs for each buffer slot:
```python
for buffer_idx in range(num_buffers):
    graph = torch.cuda.CUDAGraph()

    # Set Attention cache to this buffer slot (fixed address)
    for layer in self.model.model.layers:
        layer.self_attn.attn.k_cache = offload_engine.layer_k_cache[buffer_idx:buffer_idx+1]
        layer.self_attn.attn.v_cache = offload_engine.layer_v_cache[buffer_idx:buffer_idx+1]

    # Set context
    set_context(is_prefill=False, slot_mapping=slot_mapping,
                context_lens=context_lens, block_tables=None)

    # Warmup
    hidden = self.model.model.embed_tokens(input_ids)
    residual = None
    for layer_id, layer in enumerate(self.model.model.layers):
        if layer_id % num_buffers == buffer_idx:
            hidden, residual = layer(positions, hidden, residual)

    # Capture
    with torch.cuda.graph(graph, self.offload_graph_pool):
        # Same operations
        ...

    self.offload_graphs[buffer_idx] = graph
```
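The rotation scheme means each captured graph serves every layer whose index maps to its buffer slot. A quick sketch of the mapping (the counts are illustrative; real values come from the model and the offload engine):

```python
# Illustrative values; real counts come from the model and offload engine.
num_layers = 8
num_buffers = 3

# Which ring-buffer slot (and hence which captured graph) each layer uses.
assignment = {layer_id: layer_id % num_buffers for layer_id in range(num_layers)}
assert assignment == {0: 0, 1: 1, 2: 2, 3: 0, 4: 1, 5: 2, 6: 0, 7: 1}

# Each buffer therefore needs exactly one graph, replayed for several layers.
layers_per_buffer = {b: [l for l, s in assignment.items() if s == b]
                     for b in range(num_buffers)}
assert layers_per_buffer[0] == [0, 3, 6]
```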
#### Phase 3: Use Graphs in Decode
Modify `run_layerwise_offload_decode()` to replay graphs:
```python
for layer_id in range(num_layers):
    current_buffer = layer_id % num_buffers

    # Wait for H2D load
    offload_engine.wait_buffer_load(current_buffer)

    # Copy decode buffer to ring buffer (same as current)
    ...

    # Update graph variables
    self.offload_graph_vars["positions"][0] = positions[0]
    self.offload_graph_vars["slot_mapping"][0] = context_len
    self.offload_graph_vars["context_lens"][0] = context_len + 1

    # Replay graph instead of eager forward
    self.offload_graphs[current_buffer].replay()

    # Copy new KV to decode buffer (same as current)
    ...
```
### Challenges and Considerations
| Challenge | Solution |
|-----------|----------|
| H2D transfers interleaved with compute | H2D happens outside graph, only compute is captured |
| Different layers use different buffers | Capture per-buffer graphs, replay correct one |
| Variable context length | Use `cache_seqlens` parameter (fixed address, variable value) |
| Per-layer buffer rotation | Graph captures single-layer forward, loop in Python |
### Alternative: Full-Decode Graph (More Complex)
Instead of per-layer graphs, capture entire decode step:
1. Complete all H2D loads before graph
2. Single graph covers all layers
3. Better kernel fusion, less CPU overhead
4. More complex to implement (need to handle buffer rotation inside graph)
---
## Implementation Phases
| Phase | Description | Status |
|-------|-------------|--------|
| Phase 0 | Refactor offload decode to use Attention.forward() | ✅ Completed |
| Phase 1 | Implement `capture_offload_cudagraph()` with per-layer graphs | ✅ Completed |
| Phase 2 | Modify `run_layerwise_offload_decode()` to use graphs | ✅ Completed |
| Phase 3 | Test and benchmark | ✅ Completed |
| Phase 4 | (Optional) Optimize to full-decode graph | ⬜ Future |
## Architecture After Refactoring
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ Offload Decode Flow (After Refactoring) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ For each layer: │
│ 1. Wait for H2D load (ring buffer has prefill KV) │
│ 2. Copy decode buffer → ring buffer (at prefill_len offset) │
│ 3. Set Attention.k_cache = ring_buffer[buffer_idx] │
│ 4. Set context (slot_mapping, context_lens, block_tables=None) │
│ 5. layer.forward() → Qwen3Attention.forward() → Attention.forward() │
│ └── store_kvcache() stores new token to ring buffer │
│ └── flash_attn_with_kvcache() computes attention │
│ 6. Copy new token KV: ring buffer → decode buffer │
│ 7. Start next layer H2D load │
│ │
│ Key insight: Now uses standard Attention path, just with ring buffer │
│ as k_cache/v_cache in contiguous format (block_tables=None) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
## Files Modified
| File | Changes |
|------|---------|
| `model_runner.py:46-50` | Conditional CUDA graph capture: calls `capture_offload_cudagraph()` for offload mode |
| `model_runner.py:69-73` | Updated `exit()` to clean up offload graph resources |
| `model_runner.py:844-1031` | Refactored `run_layerwise_offload_decode()` to use standard `layer.forward()` with optional CUDA graph |
| `model_runner.py:1075-1164` | New `capture_offload_cudagraph()` method for per-layer graph capture |
| `tests/test_needle.py` | Added `--use-cuda-graph` flag to test CUDA graph mode |
## Implementation Details
### `capture_offload_cudagraph()` (line 1075-1164)
Captures per-layer CUDA graphs for offload decode:
```python
def capture_offload_cudagraph(self):
    # Fixed-address tensors for graph capture
    hidden_states = torch.randn(1, hidden_size, ...)
    residual = torch.randn(1, hidden_size, ...)
    layer_outputs = torch.zeros(1, hidden_size, ...)
    layer_residual = torch.zeros(1, hidden_size, ...)

    for layer_id in range(num_layers):
        buffer_idx = layer_id % num_buffers

        # Set Attention cache to ring buffer
        attn_module.k_cache = ring_buffer[buffer_idx:buffer_idx+1]
        attn_module.v_cache = ring_buffer[buffer_idx:buffer_idx+1]

        # Warmup and capture
        with torch.cuda.graph(graph):
            out_h, out_r = layer(positions, hidden_states, residual)
            layer_outputs.copy_(out_h)
            layer_residual.copy_(out_r)

        # Update inputs for next layer
        hidden_states.copy_(layer_outputs)
        residual.copy_(layer_residual)
```

### Phase 1: ModelRunner Dynamic Port Allocation [pending]
**File**: `nanovllm/engine/model_runner.py`
```python
import os
import socket

def _find_free_port() -> int:
    """Find a free port for distributed communication."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(('', 0))
        return s.getsockname()[1]

class ModelRunner:
    def __init__(self, config: Config, rank: int, event: Event | list[Event]):
        # ... existing code ...
        port = os.environ.get("NANOVLLM_DIST_PORT")
        if port is None:
            port = _find_free_port()
            logger.info(f"Auto-assigned distributed port: {port}")
        else:
            port = int(port)
        dist.init_process_group("nccl", f"tcp://localhost:{port}", ...)
```
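One property the helper relies on: a port probed with `bind(('', 0))` is free again as soon as the probe socket closes, so the distributed store can bind it a moment later. There is an inherently small race window before that second bind (another process could grab the port in between); a plain-socket sketch of the happy path:

```python
import socket

def _find_free_port() -> int:
    """Probe the OS for an unused TCP port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]

port = _find_free_port()

# After the probe socket closes, the same port can be bound again --
# which is what the rendezvous server effectively does with it.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("", port))
bound_port = server.getsockname()[1]
server.close()
assert bound_port == port
```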
### `run_layerwise_offload_decode()` CUDA Graph Mode
When CUDA graphs are available:
```python
use_cuda_graph = not self.enforce_eager and hasattr(self, 'offload_graphs')

if use_cuda_graph:
    # Use fixed-address tensors
    graph_vars["positions"][0] = len(seq) - 1
    graph_vars["slot_mapping"][0] = context_len
    graph_vars["context_lens"][0] = context_len + 1
    graph_vars["hidden_states"].copy_(embedding)
    graph_vars["residual"].zero_()

    for layer_id in range(num_layers):
        # Set up ring buffer and context
        ...

        # Replay graph
        self.offload_graphs[layer_id].replay()
        torch.cuda.current_stream().synchronize()

        # Copy outputs to inputs for next layer
        if layer_id < num_layers - 1:
            graph_vars["hidden_states"].copy_(graph_vars["layer_outputs"])
            graph_vars["residual"].copy_(graph_vars["layer_residual"])
```

### Phase 2: LLMEngine Resource Cleanup [pending]
**File**: `nanovllm/engine/llm_engine.py`
Add a `close()` method and context manager support to ensure resources are released correctly:
```python
class LLMEngine:
    def __init__(self, model, **kwargs):
        # ... existing code ...
        self._closed = False
        atexit.register(self._atexit_handler)

    def _atexit_handler(self):
        if not self._closed:
            self.close()

    def close(self):
        """Explicitly close the engine and release all resources."""
        if self._closed:
            return
        self._closed = True
        try:
            atexit.unregister(self._atexit_handler)
        except Exception:
            pass
        self.model_runner.call("exit")
        del self.model_runner
        for p in self.ps:
            p.join()

    def exit(self):
        """Alias for close() - backward compatibility."""
        self.close()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
        return False
```
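The `close()`/`atexit`/context-manager combination is a standard idempotent-cleanup pattern. A framework-free sketch of the same shape (`Resource` is a stand-in for the engine, `close_count` exists only to demonstrate the call happens exactly once):

```python
import atexit

class Resource:
    """Stand-in for an engine that must release resources exactly once."""
    def __init__(self):
        self._closed = False
        self.close_count = 0
        atexit.register(self._atexit_handler)

    def _atexit_handler(self):
        # Safety net for users who forget to call close().
        if not self._closed:
            self.close()

    def close(self):
        if self._closed:          # idempotent: repeated calls are no-ops
            return
        self._closed = True
        self.close_count += 1
        try:
            atexit.unregister(self._atexit_handler)
        except Exception:
            pass

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
        return False              # do not swallow exceptions

with Resource() as r:
    pass                          # close() runs on exiting the with-block
r.close()                         # second call is a harmless no-op
assert r.close_count == 1
```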
## Test Results
| Test | Mode | CUDA Graph | Status |
|------|------|------------|--------|
| `test_needle.py --input-len 4096` | GPU-only | N/A | PASSED |
| `test_needle.py --input-len 4096 --enable-offload` | CPU offload | Disabled | PASSED |
| `test_needle.py --input-len 32768 --enable-offload` | CPU offload | Disabled | PASSED |
| `test_needle.py --input-len 32768 --enable-offload --use-cuda-graph` | CPU offload | Enabled | PASSED |

## Next Steps
1. ~~Implement `capture_offload_cudagraph()` method~~ ✅
2. ~~Modify `run_layerwise_offload_decode()` to optionally use captured graphs~~ ✅
3. ~~Test correctness with needle-in-haystack~~
4. Benchmark performance improvement from CUDA graphs (optional)
5. Consider full-decode graph optimization for maximum performance (future)

### Phase 3: Test Verification [pending]
**File**: `tests/test_multiple_processes.py` (new)
```python
"""Test multiple independent nanovllm processes."""
import os
import subprocess
import sys
import time

def test_parallel_processes():
    """Test running multiple nanovllm processes in parallel."""
    script = '''
import sys
sys.path.insert(0, ".")
from nanovllm import LLM, SamplingParams
import os

gpu = os.environ.get("CUDA_VISIBLE_DEVICES", "0")
print(f"[GPU {gpu}] Starting LLM")
llm = LLM("path/to/model", enable_cpu_offload=True)
outputs = llm.generate(["Hello"], SamplingParams(max_tokens=10))
print(f"[GPU {gpu}] Output: {outputs[0]['text'][:50]}")
llm.close()
print(f"[GPU {gpu}] Done")
'''
    # Start 2 processes on different GPUs
    procs = []
    for gpu in [0, 1]:
        env = {"CUDA_VISIBLE_DEVICES": str(gpu)}
        p = subprocess.Popen(
            [sys.executable, "-c", script],
            env={**os.environ, **env}
        )
        procs.append(p)
        time.sleep(1)  # Stagger start slightly

    # Wait for all
    for p in procs:
        assert p.wait() == 0, f"Process failed with code {p.returncode}"
    print("PASSED: test_parallel_processes")

if __name__ == "__main__":
    test_parallel_processes()
```
### Phase 4: Documentation Update [pending]
**File**: `docs/torch_distributed_port_issue.md`
Update the document to record that the issue was resolved via dynamic port allocation.
---
## Usage After Fix
### Scenario 1: Parallel multi-process testing (primary use case)
```bash
# No extra configuration needed; just run
CUDA_VISIBLE_DEVICES=0 python test_group1.py &
CUDA_VISIBLE_DEVICES=1 python test_group2.py &
CUDA_VISIBLE_DEVICES=2 python test_group3.py &
wait
```
### Scenario 2: Sequential creation within one process (also supported)
```python
for i in range(3):
    with LLM(model_path) as llm:
        outputs = llm.generate(prompts, params)
    # Automatic cleanup; the next instance can use a fresh random port
```
### Scenario 3: Manually specifying a port (backward compatible)
```bash
NANOVLLM_DIST_PORT=2333 python test.py
```
---
## Success Criteria
- [ ] Multiple independent processes can run at the same time (on different GPUs)
- [ ] No need to specify ports manually
- [ ] Backward compatible (the environment variable still works)
- [ ] Sequential creation within a single process also works
- [ ] Resources are cleaned up correctly
---
## Files to Modify
| File | Action | Status |
|------|--------|--------|
| `nanovllm/engine/model_runner.py` | Add `_find_free_port()` | pending |
| `nanovllm/engine/llm_engine.py` | Add `close()`, context manager | pending |
| `tests/test_multiple_processes.py` | Create | pending |
| `docs/torch_distributed_port_issue.md` | Update | pending |