diff --git a/.gitignore b/.gitignore index 52db0f5..4d836bd 100644 --- a/.gitignore +++ b/.gitignore @@ -227,3 +227,6 @@ hive-mind-prompt-*.txt # Test data tests/data/ + +# Serena MCP tool config +.serena/ diff --git a/findings.md b/findings.md index 1d7fbc6..508f474 100644 --- a/findings.md +++ b/findings.md @@ -1,169 +1,288 @@ -# Findings: Torch Distributed Port Conflict +# Findings: nanovllm 多请求状态污染分析 -## Problem Analysis +## 重要说明 -### Issue Summary -创建多个 LLM 实例时出现端口冲突 (EADDRINUSE),导致第二个实例无法启动。 +**nanovllm offload 模式不支持 batch**,只能单个 request 顺序执行。问题出在**请求切换**(前一个 request 完成后,开始下一个 request)时状态清理不完整。 -### Root Cause Deep Dive +--- -#### 1. 资源绑定位置 +## 1. 代码架构发现 + +### 1.1 请求生命周期 (顺序执行) + +**关键**: offload 模式下,每次只处理**一个 request**,不是 batch。 + +``` +LLMEngine.generate() [llm_engine.py:114-151] +├── Observer.complete_reset() # 重置性能统计 +├── for prompt in prompts: +│ └── add_request(prompt, sp) # 添加到 scheduler 队列 +├── while not is_finished(): +│ ├── scheduler.schedule() # 获取下一个序列 (offload 模式: 1个) +│ ├── model_runner.call("run", seqs, is_prefill) # 执行单个请求 +│ └── scheduler.postprocess(seqs, token_ids) +│ └── if seq.is_finished: +│ └── kvcache_manager.deallocate(seq) # 释放资源 ← 问题点 +│ └── [开始处理下一个请求] # ← 状态切换 +└── return outputs +``` + +**请求切换流程**: +``` +Request A (prefill) → Request A (decode × N) → Request A 完成 + ↓ +deallocate(A) ← 状态清理不完整! 
+ ↓ +Request B (prefill) → Request B 读取到 A 的残留状态 → 错误输出 +``` + +### 1.2 OffloadEngine 状态清单 + +**位置**: `nanovllm/kvcache/offload_engine.py:40-145` + +| 成员变量 | 类型 | Shape | 生命周期 | +|----------|------|-------|----------| +| `layer_k_cache` | GPU Tensor | [num_buffers, max_seq_len, kv_heads, head_dim] | 整个引擎 | +| `layer_v_cache` | GPU Tensor | [num_buffers, max_seq_len, kv_heads, head_dim] | 整个引擎 | +| `decode_k_buffer` | GPU Tensor | [num_layers, block_size, kv_heads, head_dim] | 整个引擎 | +| `decode_v_buffer` | GPU Tensor | [num_layers, block_size, kv_heads, head_dim] | 整个引擎 | +| `k_cache_cpu` | CPU Tensor (pinned) | [num_layers, num_cpu_blocks, block_size, kv_heads, head_dim] | 整个引擎 | +| `v_cache_cpu` | CPU Tensor (pinned) | [num_layers, num_cpu_blocks, block_size, kv_heads, head_dim] | 整个引擎 | +| `compute_stream` | CUDA Stream | - | 整个引擎 | +| `prefill_offload_streams` | List[CUDA Stream] | num_layers | 整个引擎 | +| `prefill_offload_events` | List[CUDA Event] | num_layers | 整个引擎 | +| `layer_load_streams` | List[CUDA Stream] | num_buffers | 整个引擎 | +| `buffer_load_events` | List[CUDA Event] | num_buffers | 整个引擎 | +| `buffer_compute_done_events` | List[CUDA Event] | num_buffers | 整个引擎 | + +**关键发现**: +- **没有 reset() 方法** +- **没有任何清理逻辑** +- 所有 tensor 在初始化时 `torch.zeros()` 后永不清零 + +### 1.3 HybridKVCacheManager 状态清单 + +**位置**: `nanovllm/kvcache/hybrid_manager.py` + +| 成员变量 | 作用 | 清理方式 | +|----------|------|----------| +| `logical_blocks` | 逻辑块列表 | `block.reset()` in deallocate | +| `free_logical_ids` | 空闲逻辑块队列 | deallocate 归还 | +| `free_cpu_blocks` | 空闲 CPU 块队列 | deallocate 归还 | +| `cpu_block_to_logical` | CPU 块→逻辑块映射 | deallocate 删除 | +| `prefilled_blocks` | 已 prefill 的块集合 | deallocate 中 discard | +| `_decode_start_pos` | 序列→decode起始位置 | `clear_decode_tracking()` | +| `_prefill_len` | 序列→prefill长度 | `clear_decode_tracking()` | + +**关键发现**: +- `deallocate()` 没有调用 `clear_decode_tracking()`! 
+- `_decode_start_pos` 和 `_prefill_len` 使用 `id(seq)` 作为 key +- Python 对象 ID 可能在不同请求间重用 + +--- + +## 2. 请求切换机制分析 + +### 2.1 offload 模式的单 request 限制 + +代码中明确限制: ```python -# nanovllm/engine/model_runner.py:30-32 -import os -port = os.environ.get("NANOVLLM_DIST_PORT", "2333") -dist.init_process_group("nccl", f"tcp://localhost:{port}", world_size=self.world_size, rank=rank) +# model_runner.py:757, 880 +assert len(seqs) == 1, "Layer-wise offload only supports single sequence" ``` -- 默认端口 **2333**,可通过 `NANOVLLM_DIST_PORT` 环境变量配置 -- `init_process_group()` 绑定 TCP 端口用于进程间通信 -- 端口绑定持续到 `destroy_process_group()` 被调用 +### 2.2 请求切换时序 -#### 2. 清理机制缺陷 +``` +时间 → +┌─────────────────────────────────────────────────────────────────┐ +│ Request A: [prefill] → [decode] → [decode] → ... → [完成] │ +└─────────────────────────────────────────────────────────────────┘ + ↓ + deallocate(seq_A) + - blocks 释放 ✓ + - tracking 字典未清理 ✗ + ↓ +┌─────────────────────────────────────────────────────────────────┐ +│ Request B: [prefill] → [decode] → ... │ +│ ↑ │ +│ 如果 id(seq_B) == id(seq_A),读到 A 的残留状态! │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### 2.3 Python 对象 ID 重用 + +Python 的内存管理会重用已释放对象的内存地址,导致: ```python -# nanovllm/engine/llm_engine.py:37 -atexit.register(self.exit) +seq_A = Sequence(...) # id(seq_A) = 0x7f1234567890 +del seq_A # 对象被释放,但字典中 key 保留 -# nanovllm/engine/llm_engine.py:39-43 -def exit(self): - self.model_runner.call("exit") - del self.model_runner - for p in self.ps: - p.join() - -# nanovllm/engine/model_runner.py:66-78 -def exit(self): - # ... cleanup code ... - dist.destroy_process_group() -``` - -**关键问题**: `atexit` 只在 **Python 解释器退出** 时触发,而非对象被删除时! - -#### 3. 问题时间线 -``` -1. 创建 LLM #1 - ├── init_process_group() 绑定端口 2333 ✓ - └── atexit.register(self.exit) 注册 - -2. LLM #1 超出作用域或被 del - ├── Python GC 回收对象内存 - ├── atexit handler 未触发(进程未退出) - ├── Worker 进程仍在运行 - └── 端口 2333 仍被占用 ❌ - -3. 
创建 LLM #2 - ├── init_process_group() 尝试绑定端口 2333 - └── EADDRINUSE 错误 ❌ - -4. 程序退出(此时 atexit 才运行) - └── 为时已晚 - 已经崩溃 +seq_B = Sequence(...) # id(seq_B) 可能 = 0x7f1234567890(相同地址) +# _decode_start_pos[id(seq_B)] 返回 seq_A 的旧值! ``` --- -## Solution Analysis +## 3. 状态污染机制分析 -### 方案对比 +### 3.1 decode buffer 污染路径 -| 方案 | 可靠性 | 向后兼容 | 实现复杂度 | 推荐度 | -|------|--------|----------|------------|--------| -| `close()` 方法 | 最高 | 是 | 低 | ★★★★★ | -| `__del__` 方法 | 中等 | 是 | 低 | ★★★☆☆ | -| 端口检测重试 | 中等 | 是 | 低 | ★★★☆☆ | -| Context Manager | 最高 | 需要代码修改 | 低 | ★★★★☆ | -| 动态端口 | 低 | 是 | 低 | ★★☆☆☆ | - -### 为什么选择三层防护 - -1. **Layer 1: close()** - 用户显式控制,最可靠 -2. **Layer 2: __del__** - 自动清理,覆盖大部分场景 -3. **Layer 3: 端口检测** - 最后防线,提供清晰错误信息 - -### `__del__` 的限制 - -Python 的 `__del__` 不保证被调用: -- 循环引用时可能不触发 -- 解释器关闭时可能无法访问依赖模块 -- 不应依赖于 `__del__` 进行关键资源清理 - -但作为**额外防护层**是有价值的,因为: -- 大多数情况下会被调用 -- 比没有好 -- 不影响其他清理机制 - ---- - -## Code Structure Analysis - -### LLMEngine 生命周期 -``` -__init__() -├── 创建 worker 进程 (self.ps) -├── 创建 ModelRunner (self.model_runner) -├── 注册 atexit handler -└── 设置 scheduler, tokenizer - -close() [新增] -├── 检查 _closed 标志(幂等) -├── 注销 atexit handler -├── 调用 model_runner.exit() -├── join worker 进程 -└── 设置 _closed = True - -__del__() [新增] -└── 调用 close()(忽略异常) - -__enter__/__exit__() [新增] -└── Context manager 支持 -``` - -### ModelRunner 资源 -``` -__init__() -├── torch.distributed 初始化(绑定端口) -├── 模型加载 -├── KV cache 分配 -├── CUDA graph 捕获(可选) -└── SharedMemory 创建(多GPU) - -exit() -├── SharedMemory 清理 -├── CUDA graph 清理 -└── dist.destroy_process_group() -``` - ---- - -## Risk Assessment - -| 风险 | 影响 | 缓解措施 | -|------|------|----------| -| `__del__` 不被调用 | 中 - 端口泄漏 | Layer 3 端口检测提供清晰错误 | -| close() 重复调用 | 低 | `_closed` 标志保证幂等 | -| atexit 双重调用 | 低 | 注销机制防止 | -| 子进程残留 | 高 | join() 确保子进程退出 | -| CUDA 资源泄漏 | 中 | ModelRunner.exit() 清理 | - ---- - -## Implementation Notes - -### atexit.unregister 兼容性 -- Python 3.7+ 支持 -- 需要传入同一个函数对象 -- 使用 `self._atexit_handler` 而非 `self.exit` 以便正确注销 - -### 端口检测方法 
+**污染写入** (`run_layerwise_offload_decode:1010-1013`): ```python -def _check_port_available(port: int, host: str = "localhost") -> bool: - """使用 socket connect_ex 检测端口是否被占用.""" - try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.settimeout(1) - result = s.connect_ex((host, port)) - return result != 0 # 0 = connected = port in use - except Exception: - return True # 假设可用 +# 每次 decode step,将当前 token 的 KV 存入 decode buffer +offload_engine.decode_k_buffer[layer_id, pos_in_block].copy_(ring_k[context_len]) +offload_engine.decode_v_buffer[layer_id, pos_in_block].copy_(ring_v[context_len]) ``` -**注意**: 这种检测存在 TOCTOU (Time-of-check to time-of-use) 竞争条件,但对于我们的用例足够了。 +**污染读取** (`run_layerwise_offload_decode:969-976`): +```python +# 如果有之前的 decode tokens,从 decode buffer 读取 +if num_prev_decode_tokens > 0: + k_decode_prev, v_decode_prev = offload_engine.get_decode_kv( + layer_id, decode_start_pos, pos_in_block + ) + ring_k[total_prefill_tokens:total_prefill_tokens + num_prev_decode_tokens].copy_(k_decode_prev) +``` + +**问题场景**: +1. 请求 A 的 decode 阶段在 `decode_k_buffer[layer, 0:N]` 写入 KV +2. 请求 A 完成,buffer 数据保留 +3. 请求 B 开始,如果其 `decode_start_pos` 被错误计算为非零 +4. 
请求 B 会读取请求 A 的旧数据
+
+### 3.2 decode_start_pos 计算逻辑
+
+**位置**: `hybrid_manager.py:485-505`
+
+```python
+def get_decode_start_pos(self, seq: Sequence) -> int:
+    seq_id = id(seq)  # Python 对象 ID
+    if seq_id not in self._decode_start_pos:
+        # 第一次调用 - 计算起始位置
+        prefill_len = len(seq) - 1  # 当前长度减去新 token
+        self._decode_start_pos[seq_id] = prefill_len % self._block_size
+    return self._decode_start_pos[seq_id]
+```
+
+**问题**:
+- 如果新请求的 `id(seq)` 恰好等于旧请求的 `id(seq)`(Python 内存重用)
+- `_decode_start_pos` 中可能存在旧的值
+- 会返回错误的 decode 起始位置
+
+### 3.3 clear_decode_tracking 未被调用
+
+**位置**: `hybrid_manager.py:538-549`
+
+```python
+def clear_decode_tracking(self, seq: Sequence) -> None:
+    seq_id = id(seq)
+    self._decode_start_pos.pop(seq_id, None)
+    self._prefill_len.pop(seq_id, None)
+```
+
+**问题**:
+- 这个方法在 `deallocate()` 中**没有被调用**!
+- 查看 `deallocate()` (218-244 行),没有 `clear_decode_tracking()` 调用
+- 这导致旧请求的 tracking 数据残留
+
+---
+
+### 3.4 失败模式分析
+
+#### 3.4.1 观察到的失败模式
+
+从测试结果:
+| Sample | Expected | Output | Status |
+|--------|----------|--------|--------|
+| 0 | 8930103 | `: 8930103.` | PASS (第一个请求) |
+| 1 | 4194548 | `: 419 multiplication of 4548.` | **FAIL** |
+| 2 | 8231838 | `:ное 8231838.` | PASS |
+
+Sample 1 的输出 "419 multiplication of 4548" 显示数字被"拆分"了。
+
+**可能原因**:
+1. 在某个 decode step,attention 计算使用了错误的 KV
+2. 模型"看到"了旧请求的部分 context
+3. 导致生成逻辑出错
+
+#### 3.4.2 为什么第一个请求总是成功?
+
+1. 第一个请求时,所有 buffer 都是零初始化
+2. `decode_start_pos` 字典为空,正确计算
+3. 没有残留数据干扰
+
+#### 3.4.3 为什么后续请求可能成功?
+
+某些请求可能成功因为:
+1. `id(seq)` 没有与之前的请求冲突
+2. `pos_in_block` 不重叠,没读到旧数据
+3. 或者旧数据恰好对结果影响不大
+
+---
+
+## 4. 修复方向
+
+### 4.1 必须修复: deallocate 时清理状态
+
+```python
+# hybrid_manager.py: deallocate()
+def deallocate(self, seq: Sequence) -> None:
+    # ... 现有逻辑 ...
+ + # 添加: 清理 decode tracking + self.clear_decode_tracking(seq) + + # 添加: 通知 offload engine 清理 + if self.offload_engine is not None: + self.offload_engine.on_sequence_finished() +``` + +### 4.2 必须修复: OffloadEngine 添加清理方法 + +```python +# offload_engine.py +def on_sequence_finished(self): + """请求完成时的清理""" + # 清零 decode buffer + self.decode_k_buffer.zero_() + self.decode_v_buffer.zero_() +``` + +### 4.3 可选: 更激进的清理 + +```python +def reset_all(self): + """完全重置状态""" + self.decode_k_buffer.zero_() + self.decode_v_buffer.zero_() + self.layer_k_cache.zero_() + self.layer_v_cache.zero_() + # 重置 CUDA events + for event in self.buffer_compute_done_events: + event.record() +``` + +--- + +## 5. 待验证假设 + +| 假设 | 验证方法 | 优先级 | +|------|----------|--------| +| decode_buffer 残留导致污染 | 在第二个请求开始时检查 buffer 是否为零 | 高 | +| _decode_start_pos 字典残留 | 打印 deallocate 前后的字典内容 | 高 | +| id(seq) 重用导致错误 | 打印每个请求的 seq id | 中 | +| ring buffer 残留 | 检查每次 decode 前 ring buffer 内容 | 低 | + +--- + +## 6. 参考代码位置 + +| 功能 | 文件 | 行号 | +|------|------|------| +| OffloadEngine 初始化 | offload_engine.py | 40-145 | +| deallocate | hybrid_manager.py | 218-244 | +| clear_decode_tracking | hybrid_manager.py | 538-549 | +| get_decode_start_pos | hybrid_manager.py | 485-505 | +| run_layerwise_offload_decode | model_runner.py | 867-1057 | +| decode buffer 写入 | model_runner.py | 1010-1013 | +| decode buffer 读取 | model_runner.py | 969-976 | diff --git a/nanovllm/engine/model_runner.py b/nanovllm/engine/model_runner.py index 1d2e8f3..3ae1a57 100644 --- a/nanovllm/engine/model_runner.py +++ b/nanovllm/engine/model_runner.py @@ -851,12 +851,33 @@ class ModelRunner: # Step 4: Compute logits for last token logits = self.model.compute_logits(hidden_states[-1:]) + # DEBUG: Check hidden_states and logits at end of prefill + hs_last = hidden_states[-1, :4].tolist() + top5_logits, top5_indices = torch.topk(logits[0], 5) + logger.debug( + f"[DEBUG] PREFILL END: hidden_states[-1, :4]={hs_last}, " + f"top5_tokens={top5_indices.tolist()}, 
top5_logits={top5_logits.tolist()}" + ) + # Note: Using sync offload, no wait needed # Mark all blocks as prefilled for logical_id in logical_ids: self.kvcache_manager.prefilled_blocks.add(logical_id) + # DEBUG: Verify CPU cache content after prefill + first_cpu_block = cpu_block_ids[0] + last_cpu_block = cpu_block_ids[-1] + last_block_valid = total_tokens % self.block_size or self.block_size + k_first = offload_engine.k_cache_cpu[0, first_cpu_block, 0, 0, :4].tolist() + k_last = offload_engine.k_cache_cpu[0, last_cpu_block, 0, 0, :4].tolist() + logger.debug( + f"[DEBUG] AFTER PREFILL: first_cpu_block={first_cpu_block}, last_cpu_block={last_cpu_block}, " + f"last_block_valid={last_block_valid}, " + f"k_cache_cpu[0, {first_cpu_block}, 0, 0, :4]={k_first}, " + f"k_cache_cpu[0, {last_cpu_block}, 0, 0, :4]={k_last}" + ) + # Step 5: Sample temperatures = self.prepare_sample(seqs) if self.rank == 0 else None token_ids = self.sampler(logits, temperatures).tolist() if self.rank == 0 else None @@ -926,6 +947,24 @@ class ModelRunner: # New token will be stored at this position context_len = total_prefill_tokens + num_prev_decode_tokens + # DEBUG: Log key values for first decode step + if num_prev_decode_tokens == 0: + first_cpu_block = cpu_block_table[0] if cpu_block_table else -1 + last_cpu_block = cpu_block_table[-1] if cpu_block_table else -1 + k_first = offload_engine.k_cache_cpu[0, first_cpu_block, 0, 0, :4].tolist() if first_cpu_block >= 0 else [] + k_last = offload_engine.k_cache_cpu[0, last_cpu_block, 0, 0, :4].tolist() if last_cpu_block >= 0 else [] + logger.debug( + f"[DEBUG] FIRST DECODE STEP: len(seq)={len(seq)}, " + f"total_prefill_tokens={total_prefill_tokens}, " + f"num_prefill_blocks={num_prefill_blocks}, " + f"valid_tokens_per_block[-1]={valid_tokens_per_block[-1] if valid_tokens_per_block else 'N/A'}, " + f"pos_in_block={pos_in_block}, decode_start_pos={decode_start_pos}, " + f"context_len={context_len}, " + f"first_cpu_block={first_cpu_block}, 
last_cpu_block={last_cpu_block}, " + f"k_cache_cpu[0, {first_cpu_block}, 0, ...]={k_first}, " + f"k_cache_cpu[0, {last_cpu_block}, 0, ...]={k_last}" + ) + # Context setup for Attention.forward() - contiguous mode (no block tables) if use_cuda_graph: graph_vars["slot_mapping"][0] = context_len @@ -943,15 +982,40 @@ class ModelRunner: i, i, cpu_block_table, valid_tokens_per_block ) + # DEBUG: Check ring buffer content after preload (first decode step only) + if num_prev_decode_tokens == 0: + # Wait for all load streams to complete + torch.cuda.synchronize() + ring_k_0 = offload_engine.layer_k_cache[0, 0, 0, :4].tolist() + # Check the actual last valid position based on valid_tokens_per_block + sum_valid = sum(valid_tokens_per_block) + ring_k_last_valid = offload_engine.layer_k_cache[0, sum_valid - 1, 0, :4].tolist() + logger.debug( + f"[DEBUG] AFTER PRELOAD L0: sum_valid={sum_valid}, " + f"ring_k[0, 0, 0, :4]={ring_k_0}, " + f"ring_k[0, {sum_valid-1}, 0, :4]={ring_k_last_valid}" + ) + # Step 1: Embedding (on compute stream) with torch.cuda.stream(compute_stream): + # DEBUG: Log input token for first decode step + if num_prev_decode_tokens == 0: + embed_weight_sample = self.model.model.embed_tokens.weight[input_ids[0], :4].tolist() + logger.debug(f"[DEBUG] EMBEDDING INPUT: input_ids={input_ids.tolist()}, positions={positions.tolist()}, weight[{input_ids[0]},:4]={embed_weight_sample}") + if use_cuda_graph: # Copy embedding output to graph's hidden_states embedded = self.model.model.embed_tokens(input_ids) + # DEBUG: Log embedding output for first decode step + if num_prev_decode_tokens == 0: + logger.debug(f"[DEBUG] EMBEDDING OUTPUT: embedded[0, :4]={embedded[0, :4].tolist()}") graph_vars["hidden_states"].copy_(embedded) graph_vars["residual"].zero_() # Reset residual for first layer else: hidden_states = self.model.model.embed_tokens(input_ids) + # DEBUG: Log embedding output for first decode step + if num_prev_decode_tokens == 0: + logger.debug(f"[DEBUG] EMBEDDING 
OUTPUT: hidden_states[0, :4]={hidden_states[0, :4].tolist()}") residual = None # Phase 2: Layer-by-layer processing with ring buffer pipeline @@ -963,6 +1027,14 @@ class ModelRunner: # 2a. Wait for current buffer's load to complete offload_engine.wait_buffer_load(current_buffer) + # DEBUG: Layer outputs (first decode step, layer 0 and last layer) + if num_prev_decode_tokens == 0 and (layer_id == 0 or layer_id == num_layers - 1): + if not use_cuda_graph: + hs_pre = hidden_states[0, :4].tolist() + else: + hs_pre = graph_vars["hidden_states"][0, :4].tolist() + logger.debug(f"[DEBUG] L{layer_id} BEFORE: hidden_states[0, :4]={hs_pre}") + # 2b. Copy previous decode KV from decode buffer to ring buffer # Ring buffer already has prefill KV at [0:total_prefill_tokens] # We need to add decode KV at [total_prefill_tokens:] @@ -1005,6 +1077,14 @@ class ModelRunner: # - Compute attention via flash_attn_with_kvcache hidden_states, residual = layer(positions, hidden_states, residual) + # DEBUG: Layer outputs (first decode step, layer 0 and last layer) + if num_prev_decode_tokens == 0 and (layer_id == 0 or layer_id == num_layers - 1): + if not use_cuda_graph: + hs_post = hidden_states[0, :4].tolist() + else: + hs_post = graph_vars["layer_outputs"][0, :4].tolist() + logger.debug(f"[DEBUG] L{layer_id} AFTER: hidden_states[0, :4]={hs_post}") + # 2f. 
Copy new token's KV from ring buffer to decode buffer (for persistence) # The new token was stored at position context_len in ring buffer ring_k = offload_engine.layer_k_cache[current_buffer] @@ -1054,6 +1134,16 @@ class ModelRunner: temperatures = self.prepare_sample(seqs) if self.rank == 0 else None token_ids = self.sampler(logits, temperatures).tolist() if self.rank == 0 else None + # DEBUG: Log first decode token + if num_prev_decode_tokens == 0 and token_ids: + # Get top-5 logits for debugging + top_logits, top_indices = torch.topk(logits[0], 5) + logger.debug( + f"[DEBUG] FIRST DECODE TOKEN: token_id={token_ids[0]}, " + f"top5_indices={top_indices.tolist()}, " + f"top5_logits={top_logits.tolist()}" + ) + return token_ids @torch.inference_mode() diff --git a/nanovllm/kvcache/hybrid_manager.py b/nanovllm/kvcache/hybrid_manager.py index 1974a92..faf9276 100644 --- a/nanovllm/kvcache/hybrid_manager.py +++ b/nanovllm/kvcache/hybrid_manager.py @@ -244,6 +244,13 @@ class HybridKVCacheManager(KVCacheManager): seq.num_cached_tokens = 0 seq.block_table.clear() + # Clear decode tracking to prevent state pollution between requests + self.clear_decode_tracking(seq) + + # Clear offload engine state (decode buffer, events) + if self.offload_engine is not None: + self.offload_engine.on_sequence_finished() + def can_append(self, seq: Sequence) -> bool: """Check if we can append a token.""" need_new_block = (len(seq) % self._block_size == 1) @@ -342,10 +349,12 @@ class HybridKVCacheManager(KVCacheManager): block = self.logical_blocks[logical_id] if block.location == BlockLocation.CPU: cpu_blocks.append(block.cpu_block_id) - # logger.debug( - # f"get_prefilled_cpu_blocks: prefilled_blocks={list(self.prefilled_blocks)}, " - # f"returned cpu_blocks={cpu_blocks}" - # ) + # DEBUG: Log on first decode call + logger.debug( + f"[DEBUG] get_prefilled_cpu_blocks: block_table={list(seq.block_table)}, " + f"prefilled_blocks={list(self.prefilled_blocks)}, " + f"returned 
cpu_blocks={cpu_blocks}" + ) return cpu_blocks # ========== CPU Block Allocation ========== @@ -383,6 +392,10 @@ class HybridKVCacheManager(KVCacheManager): self.cpu_block_to_logical[cpu_block_id] = logical_id seq.block_table.append(logical_id) + # DEBUG: Log allocated CPU blocks + cpu_blocks = [self.logical_blocks[lid].cpu_block_id for lid in seq.block_table] + logger.debug(f"[DEBUG] allocate_cpu_only: allocated cpu_blocks={cpu_blocks}") + # NOTE: Prefix cache disabled in offload mode # If enabled, would compute hash and update: # h = self.compute_hash(seq.block(i), prefix_hash) @@ -430,6 +443,8 @@ class HybridKVCacheManager(KVCacheManager): if block.location == BlockLocation.CPU: cpu_block_ids.append(block.cpu_block_id) logical_ids.append(logical_id) + # DEBUG: Log during prefill + logger.debug(f"[DEBUG] get_all_cpu_blocks: returned cpu_block_ids={cpu_block_ids}") return cpu_block_ids, logical_ids def allocate_next_cpu_block(self, seq: Sequence) -> int: @@ -502,6 +517,12 @@ class HybridKVCacheManager(KVCacheManager): # Decode starts at the next position prefill_len = len(seq) - 1 # Current len includes the new decode token self._decode_start_pos[seq_id] = prefill_len % self._block_size + # DEBUG: Log first access + logger.debug( + f"[DEBUG] get_decode_start_pos FIRST ACCESS: seq_id={seq_id}, " + f"len(seq)={len(seq)}, prefill_len={prefill_len}, " + f"stored decode_start_pos={self._decode_start_pos[seq_id]}" + ) return self._decode_start_pos[seq_id] def reset_decode_start_pos(self, seq: Sequence) -> None: @@ -534,6 +555,11 @@ class HybridKVCacheManager(KVCacheManager): # First decode step - store the prefill length # len(seq) - 1 because current len includes the first decode token self._prefill_len[seq_id] = len(seq) - 1 + # DEBUG: Log first access + logger.debug( + f"[DEBUG] get_prefill_len FIRST ACCESS: seq_id={seq_id}, " + f"len(seq)={len(seq)}, stored prefill_len={self._prefill_len[seq_id]}" + ) return self._prefill_len[seq_id] def clear_decode_tracking(self, 
seq: Sequence) -> None: @@ -546,6 +572,15 @@ class HybridKVCacheManager(KVCacheManager): seq: Sequence """ seq_id = id(seq) + # DEBUG: Log clearing and CPU blocks + cpu_blocks = [self.logical_blocks[lid].cpu_block_id for lid in seq.block_table + if self.logical_blocks[lid].location == BlockLocation.CPU] + logger.debug( + f"[DEBUG] clear_decode_tracking: seq_id={seq_id}, " + f"clearing decode_start_pos={self._decode_start_pos.get(seq_id, 'N/A')}, " + f"prefill_len={self._prefill_len.get(seq_id, 'N/A')}, " + f"cpu_blocks={cpu_blocks}" + ) self._decode_start_pos.pop(seq_id, None) self._prefill_len.pop(seq_id, None) diff --git a/nanovllm/kvcache/offload_engine.py b/nanovllm/kvcache/offload_engine.py index b5b53f3..857dbea 100644 --- a/nanovllm/kvcache/offload_engine.py +++ b/nanovllm/kvcache/offload_engine.py @@ -179,6 +179,24 @@ class OffloadEngine: f")" ) + # ========== State Reset ========== + + def on_sequence_finished(self): + """ + Clear state after sequence completion to prevent pollution between requests. + + Called by HybridKVCacheManager.deallocate() when a sequence finishes. 
+ """ + # Clear decode buffer to prevent residual KV from affecting next request + self.decode_k_buffer.zero_() + self.decode_v_buffer.zero_() + + # Re-record buffer_compute_done_events to mark all buffers as available + for event in self.buffer_compute_done_events: + event.record() + + logger.debug("OffloadEngine: state cleared for next sequence") + # ========== Prefill: Async D2H Offload API ========== def offload_layer_kv_async( diff --git a/progress.md b/progress.md index 8dcc9a1..3a02c54 100644 --- a/progress.md +++ b/progress.md @@ -1,89 +1,155 @@ -# Progress Log: Fix Torch Distributed Port Conflict - -## Status: COMPLETED & CLEANED UP +# Progress Log: nanovllm 多请求状态污染问题 ## Session: 2026-01-12 -### Task Overview -修复在同一 Python 进程中顺序创建多个 LLM 实例时的 EADDRINUSE 端口冲突问题,以及支持多卡环境下同时启动多个独立进程。 +### 资源分配 + +| 资源 | 分配 | +|------|------| +| **GPU** | **1** (严格限制,不可更改) | + +### 任务目标 +研究 nanovllm CPU offload 模式下多请求之间状态影响导致准确率下降的问题。 --- -### Phase Status +### 10:00 - 启动分析 -| Phase | Description | Status | -|-------|-------------|--------| -| Phase 1 | ModelRunner 动态端口分配 | COMPLETED | -| Phase 2 | LLMEngine close() 和 context manager | COMPLETED | -| Phase 3 | 测试验证(GPU 4,5) | COMPLETED | -| Phase 4 | 更新文档 | COMPLETED | +**完成**: +- [x] 读取 `docs/offload_accuracy_issue.md` 了解问题背景 +- [x] 激活 Serena MCP 项目 +- [x] 获取关键组件符号概览 + +**关键文件已分析**: +- `nanovllm/kvcache/offload_engine.py` - OffloadEngine 类 +- `nanovllm/kvcache/hybrid_manager.py` - HybridKVCacheManager 类 +- `nanovllm/engine/model_runner.py` - ModelRunner 类 +- `nanovllm/engine/llm_engine.py` - LLMEngine 类 +- `nanovllm/engine/scheduler.py` - Scheduler 类 --- -### Implementation Summary +### 10:15 - 深入代码分析 -#### Phase 1: Dynamic Port Allocation -**File**: `nanovllm/engine/model_runner.py` -- Added `_find_free_port()` function using socket binding -- Modified port selection logic: use env var if set, otherwise auto-assign -- Added logging for auto-assigned ports +**分析的方法**: -#### Phase 2: Resource Cleanup Enhancement -**File**: 
`nanovllm/engine/llm_engine.py` -- Added `_closed` flag for idempotent cleanup -- Added `close()` method for explicit resource release -- Added `__del__()` for GC fallback -- Added `__enter__()` and `__exit__()` for context manager support -- Modified atexit registration to use `_atexit_handler` +| 方法 | 文件 | 发现 | +|------|------|------| +| `OffloadEngine.__init__` | offload_engine.py:40-145 | 初始化所有 buffer,无 reset 方法 | +| `deallocate` | hybrid_manager.py:218-244 | 只清理逻辑块,不清理 OffloadEngine | +| `clear_decode_tracking` | hybrid_manager.py:538-549 | 清理 tracking 字典,但未被调用 | +| `run_layerwise_offload_decode` | model_runner.py:867-1057 | 包含 decode buffer 读写逻辑 | +| `generate` | llm_engine.py:114-151 | 请求循环逻辑 | +| `postprocess` | scheduler.py:93-99 | 调用 deallocate | -#### Phase 3: Testing (GPU 4,5) -**File**: `tests/test_port_conflict.py` -- Created comprehensive test script +**关键发现 #1**: OffloadEngine 没有 reset() 方法 -**Test Results**: -| Test | Status | Notes | -|------|--------|-------| -| Sequential creation (3 instances) | PASSED | Ports: 50405, 47835, 53011 | -| Context manager | PASSED | Auto-cleanup works | -| Parallel processes (GPU 4,5) | PASSED | Ports: 34631, 56097 | +**关键发现 #2**: deallocate() 没有调用 clear_decode_tracking() -#### Phase 4: Documentation -**File**: `docs/torch_distributed_port_issue.md` -- Updated status to RESOLVED -- Documented solution details -- Added usage examples +**关键发现 #3**: decode_buffer 在请求间不清理,可能导致状态污染 --- -### Files Modified +### 10:30 - 根因定位 -| File | Action | Description | -|------|--------|-------------| -| `nanovllm/engine/model_runner.py` | Modified | Added `_find_free_port()`, dynamic port logic | -| `nanovllm/engine/llm_engine.py` | Modified | Added `close()`, `__del__`, context manager | -| `tests/test_port_conflict.py` | Created | Test script for port conflict fix | -| `docs/torch_distributed_port_issue.md` | Deleted | Issue resolved, doc removed | -| `CLAUDE.md` | Modified | Removed port conflict warnings, updated doc index | 
+**确认的问题**: + +1. **decode buffer 残留** + - 位置: `offload_engine.decode_k_buffer`, `decode_v_buffer` + - 写入: `model_runner.py:1010-1013` + - 读取: `model_runner.py:969-976` + - 问题: 旧请求的 KV 数据可能被新请求读取 + +2. **tracking 字典未清理** + - 位置: `hybrid_manager._decode_start_pos`, `_prefill_len` + - 问题: 使用 `id(seq)` 作为 key,可能重用 + +3. **缺失的清理调用** + - `clear_decode_tracking()` 在 `deallocate()` 中未被调用 --- -### Key Features After Fix +### 10:45 - 创建规划文件 -1. **Multi-GPU Parallel Testing** - ```bash - CUDA_VISIBLE_DEVICES=0 python test1.py & - CUDA_VISIBLE_DEVICES=1 python test2.py & - # Both run with different auto-assigned ports - ``` +**创建的文件**: +- [x] `task_plan.md` - 完整的任务规划和阶段 +- [x] `findings.md` - 详细的代码分析发现 +- [x] `progress.md` - 本文件 -2. **Sequential LLM Creation** - ```python - for i in range(3): - with LLM(model_path) as llm: - outputs = llm.generate(prompts, params) - # Automatically cleaned up - ``` +--- -3. **Backward Compatible** - - `NANOVLLM_DIST_PORT` env var still works - - `llm.exit()` still works (alias for `close()`) +### 11:00 - Sequential Thinking 深入分析 + +**使用 sequential thinking 验证分析结果**: +- 确认 deallocate() 确实没有调用 clear_decode_tracking() +- 分析 _decode_start_pos 和 _prefill_len 字典的生命周期 +- 确定 id(seq) 重用是问题的触发条件 + +--- + +### 11:15 - 完成规划文件 + +**更新的文件**: +- [x] `task_plan.md` - 添加完整的 debug 方案和实施计划 +- [x] `findings.md` - 详细的代码分析和修复方向 +- [x] `progress.md` - 更新到当前进度 + +--- + +## 下一步 (待用户确认) + +**执行顺序**: + +1. **实施修复** - 修改 `deallocate()` 添加 `clear_decode_tracking(seq)` +2. **快速验证** - 20 样本连续执行(一次调用,不重启框架)→ 目标 20/20 +3. **完整验证** - 100 样本 → 目标 100/100 (最终验收) +4. 
**防御性修复** (可选) - 添加 `OffloadEngine.on_sequence_finished()` + +**核心修改** (一行代码): +```python +# hybrid_manager.py:deallocate() 末尾添加 +self.clear_decode_tracking(seq) +``` + +**验收标准**: +| 测试 | 样本数 | 通过要求 | +|------|--------|----------| +| 快速验证 | 20 | 20/20 (100%) | +| 完整验证 | 100 | 100/100 (100%) | + +--- + +## 错误记录 + +| 时间 | 错误 | 解决方案 | +|------|------|----------| +| 10:05 | Serena MCP 未激活 | 调用 activate_project | + +--- + +## 文件修改记录 + +| 文件 | 操作 | 状态 | +|------|------|------| +| task_plan.md | 创建+更新 | 完成 | +| findings.md | 创建 | 完成 | +| progress.md | 创建+更新 | 完成 | + +--- + +## 分析结论 + +**重要澄清**: nanovllm offload 模式**不支持 batch**,只能单个 request 顺序执行。问题出在**请求切换**时状态清理不完整。 + +**根本原因已确认**: `deallocate()` 没有调用 `clear_decode_tracking()`,导致 `_decode_start_pos` 和 `_prefill_len` 字典残留,当 Python 对象 ID 重用时,新请求会错误地使用旧请求的配置。 + +**修复方案已设计**: 在 `deallocate()` 末尾添加 `self.clear_decode_tracking(seq)` 调用。 + +--- + +## 关键理解 + +问题不是 "batch 处理",而是: +``` +Request A 完成 → deallocate(A) [状态未完全清理] → Request B 开始 → B 读到 A 的残留状态 +``` diff --git a/task_plan.md b/task_plan.md index ab41979..3c43c2d 100644 --- a/task_plan.md +++ b/task_plan.md @@ -1,230 +1,359 @@ -# Task Plan: Fix Torch Distributed Port Conflict +# Task Plan: nanovllm CPU Offload 多请求状态污染问题 -## Goal -支持多卡环境下同时启动多个独立的 nanovllm 进程进行测试,无需手动管理端口。 +## 问题概述 -## Problem Analysis +**重要说明**: nanovllm offload 模式目前**不支持 batch**,只能单个 request 顺序执行。问题出在**请求切换**时的状态清理。 -### 核心问题 -``` -当前:所有 nanovllm 实例默认使用端口 2333 - └── 多个独立进程同时运行时会冲突! 
+| 模式 | 测试方式 | 准确率 | +|------|----------|--------| +| CPU Offload | 独立进程 (每请求一个进程) | **100%** | +| CPU Offload | 同进程顺序多请求 | 66% | +| Non-Offload | 同进程顺序多请求 | 100% | -CUDA_VISIBLE_DEVICES=0 python test1.py # 绑定端口 2333 ✓ -CUDA_VISIBLE_DEVICES=1 python test2.py # 尝试绑定 2333 → EADDRINUSE ❌ -``` - -### 根本原因 -- 端口是系统级资源,与 GPU 无关 -- 即使使用不同 GPU,端口仍会冲突 -- 当前硬编码默认端口 `2333` +**结论**: 单请求推理正确,问题在于**请求切换**时状态清理不完整。 --- -## Solution: Dynamic Port Allocation +## Phase 1: 代码分析 (complete) -### 核心方案 -```python -def _find_free_port() -> int: - """让系统自动分配一个空闲端口""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - return s.getsockname()[1] +### 1.1 识别状态管理组件 + +**已分析的关键组件**: + +| 组件 | 文件 | 状态数据 | +|------|------|----------| +| `OffloadEngine` | `nanovllm/kvcache/offload_engine.py` | ring buffer, decode buffer, CUDA events | +| `HybridKVCacheManager` | `nanovllm/kvcache/hybrid_manager.py` | logical blocks, prefilled_blocks, _decode_start_pos, _prefill_len | +| `LLMEngine` | `nanovllm/engine/llm_engine.py` | generate() 循环,请求生命周期 | +| `Scheduler` | `nanovllm/engine/scheduler.py` | postprocess() 调用 deallocate() | + +### 1.2 请求生命周期分析 -# 优先使用环境变量,否则自动分配 -port = os.environ.get("NANOVLLM_DIST_PORT") -if port is None: - port = _find_free_port() -else: - port = int(port) ``` - -### 效果 -```bash -# 无需手动指定端口,可以同时运行多个测试 -CUDA_VISIBLE_DEVICES=0 python test1.py & # 自动端口 54321 -CUDA_VISIBLE_DEVICES=1 python test2.py & # 自动端口 54322 -CUDA_VISIBLE_DEVICES=2 python test3.py & # 自动端口 54323 - -# 仍然支持手动指定(向后兼容) -NANOVLLM_DIST_PORT=2333 python test.py +generate() + → 多个请求添加到 scheduler + → while not finished: + → schedule() 获取下一批 seqs + → model_runner.run() 执行推理 + → postprocess() 处理完成的请求 + → 如果完成: kvcache_manager.deallocate(seq) ``` --- -## Implementation Phases +## Phase 2: 根本原因分析 (complete) -### Phase 1: ModelRunner 动态端口 [pending] -**File**: `nanovllm/engine/model_runner.py` +### 2.1 核心问题: OffloadEngine 缺少 reset() 方法 + +**关键发现**: `OffloadEngine` 没有任何重置/清理方法! 
+ +当请求完成时,`HybridKVCacheManager.deallocate()` 被调用,但它只清理: +- 逻辑块状态 (`block.reset()`) +- 物理块引用 (`free_cpu_blocks`, `cpu_block_to_logical`) +- prefilled_blocks 集合 +- _decode_start_pos / _prefill_len 字典 + +**未被清理的状态** (存在于 OffloadEngine): + +| 状态 | Shape | 问题 | +|------|-------|------| +| `layer_k_cache` | [num_buffers, max_seq_len, kv_heads, head_dim] | 包含旧请求的 KV | +| `layer_v_cache` | [num_buffers, max_seq_len, kv_heads, head_dim] | 包含旧请求的 KV | +| `decode_k_buffer` | [num_layers, block_size, kv_heads, head_dim] | 包含旧请求的 decode KV | +| `decode_v_buffer` | [num_layers, block_size, kv_heads, head_dim] | 包含旧请求的 decode KV | + +### 2.2 具体污染场景 + +在 `run_layerwise_offload_decode()` (model_runner.py:867-1057): ```python -import socket - -def _find_free_port() -> int: - """Find a free port for distributed communication.""" - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - return s.getsockname()[1] - -class ModelRunner: - def __init__(self, config: Config, rank: int, event: Event | list[Event]): - # ... existing code ... - - import os - port = os.environ.get("NANOVLLM_DIST_PORT") - if port is None: - port = _find_free_port() - logger.info(f"Auto-assigned distributed port: {port}") - else: - port = int(port) - - dist.init_process_group("nccl", f"tcp://localhost:{port}", ...) +# 第 969-976 行: 读取之前的 decode KV +if num_prev_decode_tokens > 0: + k_decode_prev, v_decode_prev = offload_engine.get_decode_kv( + layer_id, decode_start_pos, pos_in_block + ) + ring_k[...].copy_(k_decode_prev) # 可能读取旧请求的数据! ``` -### Phase 2: LLMEngine 资源清理增强 [pending] -**File**: `nanovllm/engine/llm_engine.py` +**场景**: +1. 请求 A (32K tokens) 完成,decode_buffer 保留其 KV 数据 +2. 请求 B 开始,其 `decode_start_pos` 可能非零(如果继承了旧状态) +3. 请求 B 在第一个 decode step 时错误地读取了请求 A 的 decode buffer 数据 -添加 `close()` 方法和 context manager 支持,确保资源正确释放: +### 2.3 潜在问题点 +1. 
**decode_start_pos 计算错误**: + - `get_decode_start_pos()` 使用 `id(seq)` 作为 key + - Python 对象 ID 可能在请求之间重用 + - 如果新 seq 对象的 ID 与旧 seq 相同,可能错误继承旧的 start_pos + +2. **decode buffer 残留数据**: + - 如果 `pos_in_block` 在新请求中与旧请求重叠 + - `get_decode_kv()` 会返回旧请求的数据 + +3. **ring buffer 残留数据**: + - 虽然每次 decode 会从 CPU 加载,但 decode buffer 的数据会被复制过来 + - 如果 decode buffer 有残留,会污染 ring buffer + +--- + +## Phase 3: Debug 方案设计 (complete) + +### 3.1 确认的根本原因 + +通过代码分析,确认了两个根本原因: + +**根本原因 1 (主要)**: `deallocate()` 不调用 `clear_decode_tracking()` +- 位置: `hybrid_manager.py:218-244` +- 影响: `_decode_start_pos` 和 `_prefill_len` 字典残留 +- 后果: 如果 `id(seq)` 重用,返回错误的 decode 配置 + +**根本原因 2 (次要)**: decode_buffer 不清理 +- 位置: `offload_engine.py` +- 影响: `decode_k_buffer/v_buffer` 保留旧 KV +- 后果: 可能被根本原因 1 触发读取 + +### 3.2 Debug 方案 A: 验证字典残留 (推荐先做) + +**目标**: 验证 `_decode_start_pos` 字典是否有残留 + +**诊断代码** (添加到 `hybrid_manager.py`): ```python -class LLMEngine: - def __init__(self, model, **kwargs): - # ... existing code ... - self._closed = False - atexit.register(self._atexit_handler) - - def _atexit_handler(self): - if not self._closed: - self.close() - - def close(self): - """Explicitly close the engine and release all resources.""" - if self._closed: - return - self._closed = True - try: - atexit.unregister(self._atexit_handler) - except Exception: - pass - self.model_runner.call("exit") - del self.model_runner - for p in self.ps: - p.join() - - def exit(self): - """Alias for close() - backward compatibility.""" - self.close() - - def __del__(self): - try: - self.close() - except Exception: - pass - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() - return False +# 在 get_decode_start_pos() 开头添加 +def get_decode_start_pos(self, seq: Sequence) -> int: + seq_id = id(seq) + # DEBUG: 检查是否命中旧值 + if seq_id in self._decode_start_pos: + logger.warning(f"[DEBUG] get_decode_start_pos: CACHE HIT! 
seq_id={seq_id}, " + f"cached_value={self._decode_start_pos[seq_id]}, " + f"expected={(len(seq) - 1) % self._block_size}") + # ... 原有逻辑 ``` -### Phase 3: 测试验证 [pending] -**File**: `tests/test_multiple_processes.py` (新建) +**诊断代码** (添加到 `deallocate()` 末尾): +```python +def deallocate(self, seq: Sequence) -> None: + # ... 现有逻辑 ... + + # DEBUG: 打印未清理的状态 + seq_id = id(seq) + if seq_id in self._decode_start_pos: + logger.warning(f"[DEBUG] deallocate: _decode_start_pos NOT CLEARED! " + f"seq_id={seq_id}, value={self._decode_start_pos[seq_id]}") +``` + +### 3.3 Debug 方案 B: 最小复现测试 + +**文件**: `tests/test_multi_request_offload_debug.py` ```python -"""Test multiple independent nanovllm processes.""" -import subprocess -import sys -import time - -def test_parallel_processes(): - """Test running multiple nanovllm processes in parallel.""" - script = ''' -import sys -sys.path.insert(0, ".") -from nanovllm import LLM, SamplingParams +"""最小复现批量模式失败""" import os +import sys +sys.path.insert(0, os.getcwd()) -gpu = os.environ.get("CUDA_VISIBLE_DEVICES", "0") -print(f"[GPU {gpu}] Starting LLM") -llm = LLM("path/to/model", enable_cpu_offload=True) -outputs = llm.generate(["Hello"], SamplingParams(max_tokens=10)) -print(f"[GPU {gpu}] Output: {outputs[0]['text'][:50]}") -llm.close() -print(f"[GPU {gpu}] Done") -''' +from nanovllm import LLM +from nanovllm.sampling import SamplingParams - # Start 2 processes on different GPUs - procs = [] - for gpu in [0, 1]: - env = {"CUDA_VISIBLE_DEVICES": str(gpu)} - p = subprocess.Popen( - [sys.executable, "-c", script], - env={**os.environ, **env} - ) - procs.append(p) - time.sleep(1) # Stagger start slightly +# 使用 RULER NIAH 的两个样本 +PROMPTS = [ + # Sample 0 (通常成功) + "...", # 从 niah_single_1_32k.jsonl 加载 + # Sample 1 (通常失败) + "...", +] +EXPECTED = ["8930103", "4194548"] - # Wait for all - for p in procs: - assert p.wait() == 0, f"Process failed with code {p.returncode}" +def main(): + llm = LLM( + "~/models/Llama-3.1-8B-Instruct", + max_model_len=33792, 
+ max_num_batched_tokens=33792, + enable_cpu_offload=True, + num_gpu_blocks=4, + kvcache_block_size=1024, + enforce_eager=True, + ) - print("PASSED: test_parallel_processes") + params = SamplingParams(temperature=0.1, max_tokens=50) + + # 连续处理两个请求 + for i, (prompt, expected) in enumerate(zip(PROMPTS, EXPECTED)): + print(f"\n{'='*60}") + print(f"Sample {i}: Expected = {expected}") + + # 打印关键状态 + kvm = llm.model_runner.kvcache_manager + print(f" _decode_start_pos 字典大小: {len(kvm._decode_start_pos)}") + print(f" _prefill_len 字典大小: {len(kvm._prefill_len)}") + + outputs = llm.generate([prompt], params, use_tqdm=False) + output_text = outputs[0]["text"] + + passed = expected in output_text + print(f" Output: {output_text[:100]}...") + print(f" Status: {'PASS' if passed else 'FAIL'}") if __name__ == "__main__": - test_parallel_processes() + main() ``` -### Phase 4: 文档更新 [pending] -**File**: `docs/torch_distributed_port_issue.md` +### 3.4 Debug 方案 C: 快速修复验证 -更新文档标记问题已通过动态端口分配解决。 +**目标**: 验证修复 `deallocate()` 是否解决问题 ---- - -## Usage After Fix - -### 场景 1: 多进程并行测试(主要场景) -```bash -# 无需任何额外配置,直接运行 -CUDA_VISIBLE_DEVICES=0 python test_group1.py & -CUDA_VISIBLE_DEVICES=1 python test_group2.py & -CUDA_VISIBLE_DEVICES=2 python test_group3.py & -wait -``` - -### 场景 2: 同一进程顺序创建(也支持) +**修改** (`hybrid_manager.py:218-244`): ```python -for i in range(3): - with LLM(model_path) as llm: - outputs = llm.generate(prompts, params) - # 自动清理,下一个可以使用新的随机端口 +def deallocate(self, seq: Sequence) -> None: + """Release all blocks for a sequence.""" + for logical_id in reversed(seq.block_table): + # ... 现有逻辑 ... 
+ + seq.num_cached_tokens = 0 + seq.block_table.clear() + + # === 新增: 清理 decode tracking === + self.clear_decode_tracking(seq) ``` -### 场景 3: 手动指定端口(向后兼容) +**验证命令**: ```bash -NANOVLLM_DIST_PORT=2333 python test.py +CUDA_VISIBLE_DEVICES=0 PYTHONPATH=.:$PYTHONPATH python tests/test_ruler_niah.py \ + --model ~/models/Llama-3.1-8B-Instruct \ + --enable-offload \ + --sample-indices 0,1,2,3,4 \ + --verbose +``` + +### 3.5 Debug 方案 D: 添加 OffloadEngine 清理 (防御性) + +**目标**: 进一步隔离请求状态 + +**添加方法** (`offload_engine.py`): +```python +def on_sequence_finished(self): + """清理请求完成后的状态""" + # 清零 decode buffer (防止残留数据被读取) + self.decode_k_buffer.zero_() + self.decode_v_buffer.zero_() + logger.debug("OffloadEngine: decode buffer cleared") +``` + +**调用点** (`hybrid_manager.py:deallocate` 末尾): +```python +# 清理 OffloadEngine 状态 +if self.offload_engine is not None: + self.offload_engine.on_sequence_finished() ``` --- -## Success Criteria +## Phase 4: 实施计划 (pending) -- [ ] 多个独立进程可以同时运行(不同 GPU) -- [ ] 无需手动指定端口 -- [ ] 向后兼容(环境变量仍有效) -- [ ] 同一进程顺序创建也能工作 -- [ ] 资源正确清理 +### 推荐执行顺序 + +1. **Step 4.1**: 实施修复 + - 修改 `hybrid_manager.py:deallocate()` 添加 `clear_decode_tracking(seq)` + +2. **Step 4.2**: 快速验证 (20 样本连续执行) + - **一次调用** `test_ruler_niah.py`,连续执行 20 个样本 + - **不重启框架**,验证请求切换是否正确 + - 目标: 20/20 全部通过 + +3. **Step 4.3**: 完整验证 (100 样本) + - 运行 100 个样本的 RULER NIAH 测试 + - 目标: 100/100 全部通过 (准确率从 66% → 100%) + +4. **Step 4.4**: 防御性修复 (可选) + - 添加 `OffloadEngine.on_sequence_finished()` 方法 + - 清零 decode buffer 作为额外保险 + +### 具体修改 + +**文件 1**: `nanovllm/kvcache/hybrid_manager.py` + +位置: `deallocate()` 方法末尾 (第 244 行后) + +```python +def deallocate(self, seq: Sequence) -> None: + """Release all blocks for a sequence.""" + for logical_id in reversed(seq.block_table): + # ... 现有逻辑 (218-242 行) ... 
+ + seq.num_cached_tokens = 0 + seq.block_table.clear() + + # ============ 新增: 清理 decode tracking ============ + self.clear_decode_tracking(seq) +``` + +**文件 2** (可选): `nanovllm/kvcache/offload_engine.py` + +位置: 在类末尾添加新方法 + +```python +def on_sequence_finished(self): + """清理请求完成后的状态 (防御性清理)""" + self.decode_k_buffer.zero_() + self.decode_v_buffer.zero_() +``` --- -## Files to Modify +## 关键文件清单 -| File | Action | Status | -|------|--------|--------| -| `nanovllm/engine/model_runner.py` | Add `_find_free_port()` | pending | -| `nanovllm/engine/llm_engine.py` | Add `close()`, context manager | pending | -| `tests/test_multiple_processes.py` | Create | pending | -| `docs/torch_distributed_port_issue.md` | Update | pending | +| 文件 | 相关行号 | 说明 | +|------|----------|------| +| `nanovllm/kvcache/hybrid_manager.py` | 218-244 | `deallocate()` - **需要修改** | +| `nanovllm/kvcache/hybrid_manager.py` | 538-549 | `clear_decode_tracking()` - 已存在 | +| `nanovllm/kvcache/hybrid_manager.py` | 485-505 | `get_decode_start_pos()` - 问题读取点 | +| `nanovllm/kvcache/hybrid_manager.py` | 519-537 | `get_prefill_len()` - 问题读取点 | +| `nanovllm/kvcache/offload_engine.py` | 40-145 | `__init__` - 状态初始化 | +| `nanovllm/kvcache/offload_engine.py` | (新增) | `on_sequence_finished()` - 可选防御 | +| `nanovllm/engine/model_runner.py` | 867-1057 | `run_layerwise_offload_decode()` | +| `nanovllm/engine/model_runner.py` | 969-976 | decode buffer 读取 (污染点) | + +--- + +## 验证命令 + +**指定 GPU: 1** (严格限制,不可更改) + +```bash +# 快速验证 (20 样本连续执行,不重启框架) +# 目标: 20/20 通过 +CUDA_VISIBLE_DEVICES=1 PYTHONPATH=.:$PYTHONPATH python tests/test_ruler_niah.py \ + --model ~/models/Llama-3.1-8B-Instruct \ + --enable-offload \ + --sample-indices 0-19 \ + --verbose + +# 完整验证 (100 样本) +# 目标: 100/100 通过 (最终验收) +CUDA_VISIBLE_DEVICES=1 PYTHONPATH=.:$PYTHONPATH python tests/test_ruler_niah.py \ + --model ~/models/Llama-3.1-8B-Instruct \ + --enable-offload \ + --quiet +``` + +**验收标准**: +| 测试 | 样本数 | 通过要求 | 说明 | +|------|--------|----------|------| +| 
快速验证 | 20 | 20/20 (100%) | 一次调用,连续执行,验证请求切换 | +| 完整验证 | 100 | 100/100 (100%) | 最终验收 | + +--- + +## 当前状态 + +- [x] Phase 1: 代码分析 +- [x] Phase 2: 根本原因分析 +- [x] Phase 3: Debug 方案设计 +- [x] Phase 4: 实施计划 ✅ 100/100 PASSED + +### 验证结果 + +| 测试 | 结果 | 日期 | +|------|------|------| +| 20 样本快速验证 | ✅ 20/20 (100%) | 2026-01-13 | +| 100 样本完整验证 | ✅ 100/100 (100%) | 2026-01-13 |