diff --git a/nanovllm/engine/llm_engine.py b/nanovllm/engine/llm_engine.py index f31c185..e7c4858 100644 --- a/nanovllm/engine/llm_engine.py +++ b/nanovllm/engine/llm_engine.py @@ -49,7 +49,14 @@ class LLMEngine: self.scheduler.add(seq) def step(self): + import os + debug_enabled = os.environ.get('NANOVLLM_LOG_LEVEL', 'INFO').upper() == 'DEBUG' + seqs, is_prefill = self.scheduler.schedule() + if debug_enabled: + mode = "PREFILL" if is_prefill else "DECODE" + print(f"[DEBUG LLMEngine.step] Mode={mode}, active_sequences={len(seqs)}") + if not is_prefill: # The end of the prefill mode. Get TTFT. if Observer.ttft_start != 0: @@ -62,7 +69,11 @@ class LLMEngine: token_ids = self.model_runner.call("run", seqs, is_prefill) self.scheduler.postprocess(seqs, token_ids) outputs = [(seq.seq_id, seq.completion_token_ids) for seq in seqs if seq.is_finished] - + + if debug_enabled and outputs: + for seq_id, tokens in outputs: + print(f"[DEBUG LLMEngine.step] Sequence {seq_id} finished, {len(tokens)} tokens generated") + #> Calculate number of tokens processed num_tokens = sum(len(seq) for seq in seqs) if is_prefill else -len(seqs) return outputs, num_tokens @@ -76,6 +87,10 @@ class LLMEngine: sampling_params: SamplingParams | list[SamplingParams], use_tqdm: bool = True, ) -> list[str]: + import os + log_level = os.environ.get('NANOVLLM_LOG_LEVEL', 'INFO') + debug_enabled = log_level.upper() == 'DEBUG' + Observer.complete_reset() if use_tqdm: pbar = tqdm(total=len(prompts), desc="Generating", dynamic_ncols=True) @@ -85,7 +100,24 @@ class LLMEngine: self.add_request(prompt, sp) outputs = {} prefill_throughput = decode_throughput = 0. + iteration = 0 + last_output_count = 0 + while not self.is_finished(): + if debug_enabled and iteration % 100 == 0: + print(f"[DEBUG LLMEngine] Iteration {iteration}, finished_sequences={len(outputs)}, total_prompts={len(prompts)}") + + # Timeout check (32K sample should finish within 20 minutes = 1200 seconds) + if iteration == 0: + import time + start_time = time.time() + elif debug_enabled and iteration % 100 == 0: + elapsed = time.time() - start_time + if elapsed > 1200: # 20 minutes + print(f"[WARNING] Test exceeded 20 minutes timeout! Iteration={iteration}, forcing exit.") + import sys + sys.exit(1) + t = perf_counter() output, num_tokens = self.step() if use_tqdm: diff --git a/nanovllm/kvcache/hybrid_manager.py b/nanovllm/kvcache/hybrid_manager.py index 61dd844..0fe5743 100644 --- a/nanovllm/kvcache/hybrid_manager.py +++ b/nanovllm/kvcache/hybrid_manager.py @@ -231,6 +231,11 @@ class HybridKVCacheManager(KVCacheManager): seq.num_cached_tokens = 0 seq.block_table.clear() + # Reset OffloadEngine state to prevent request-to-request contamination + # This clears all KV buffers and pending async events + if self.offload_engine is not None: + self.offload_engine.reset() + def can_append(self, seq: Sequence) -> bool: """Check if we can append a token.""" need_new_block = (len(seq) % self._block_size == 1) diff --git a/nanovllm/kvcache/offload_engine.py b/nanovllm/kvcache/offload_engine.py index ceeae44..b73e52f 100644 --- a/nanovllm/kvcache/offload_engine.py +++ b/nanovllm/kvcache/offload_engine.py @@ -278,6 +278,42 @@ class OffloadEngine: """ return self.k_cache_gpu, self.v_cache_gpu + def reset(self) -> None: + """ + Reset all KV cache buffers to zero. + + This clears all GPU and CPU-side KV cache storage, preventing + request-to-request contamination. Must be called between generate() + calls when reusing the same OffloadEngine instance. + + Clears: + - GPU ring buffer slots (k_cache_gpu, v_cache_gpu) + - Per-layer decode buffers (decode_k_buffer, decode_v_buffer) + - Cross-layer pipeline buffers (layer_k/v_buffer_a/b) + - Per-layer prefill buffers (prefill_k/v_buffer) + - All pending async transfer events + """ + # Clear GPU ring buffer slots + self.k_cache_gpu.zero_() + self.v_cache_gpu.zero_() + + # Clear per-layer decode buffers + self.decode_k_buffer.zero_() + self.decode_v_buffer.zero_() + + # Clear cross-layer pipeline buffers + self.layer_k_buffer_a.zero_() + self.layer_v_buffer_a.zero_() + self.layer_k_buffer_b.zero_() + self.layer_v_buffer_b.zero_() + + # Clear per-layer prefill buffers + self.prefill_k_buffer.zero_() + self.prefill_v_buffer.zero_() + + # Clear all pending async transfer events + self.pending_events.clear() + # ========== Memory info ========== def gpu_memory_bytes(self) -> int: diff --git a/tests/test_ruler.py b/tests/test_ruler.py index ec2a883..9c5382a 100644 --- a/tests/test_ruler.py +++ b/tests/test_ruler.py @@ -195,10 +195,10 @@ def run_task_test( }) if verbose: - status = "PASS" if passed else "FAIL" + status = "✓ PASS" if passed else "✗ FAIL" exp_preview = str(expected[0])[:30] if expected else "N/A" out_preview = output_text[:50].replace('\n', ' ') - print(f" [{idx}] {status} (score={score:.2f}) exp={exp_preview}... out={out_preview}...") + print(f" [{idx:3d}] {status} (score={score:.2f}) exp={exp_preview}... | out={out_preview}...") avg_score = total_score / len(samples) if samples else 0.0