[fix] Reset OffloadEngine state between requests to prevent request-to-request KV cache contamination.
This commit is contained in:
@@ -49,7 +49,14 @@ class LLMEngine:
|
||||
self.scheduler.add(seq)
|
||||
|
||||
def step(self):
|
||||
import os
|
||||
debug_enabled = os.environ.get('NANOVLLM_LOG_LEVEL', 'INFO').upper() == 'DEBUG'
|
||||
|
||||
seqs, is_prefill = self.scheduler.schedule()
|
||||
if debug_enabled:
|
||||
mode = "PREFILL" if is_prefill else "DECODE"
|
||||
print(f"[DEBUG LLMEngine.step] Mode={mode}, active_sequences={len(seqs)}")
|
||||
|
||||
if not is_prefill:
|
||||
# The end of the prefill mode. Get TTFT.
|
||||
if Observer.ttft_start != 0:
|
||||
@@ -62,7 +69,11 @@ class LLMEngine:
|
||||
token_ids = self.model_runner.call("run", seqs, is_prefill)
|
||||
self.scheduler.postprocess(seqs, token_ids)
|
||||
outputs = [(seq.seq_id, seq.completion_token_ids) for seq in seqs if seq.is_finished]
|
||||
|
||||
|
||||
if debug_enabled and outputs:
|
||||
for seq_id, tokens in outputs:
|
||||
print(f"[DEBUG LLMEngine.step] Sequence {seq_id} finished, {len(tokens)} tokens generated")
|
||||
|
||||
#> Calculate number of tokens processed
|
||||
num_tokens = sum(len(seq) for seq in seqs) if is_prefill else -len(seqs)
|
||||
return outputs, num_tokens
|
||||
@@ -76,6 +87,10 @@ class LLMEngine:
|
||||
sampling_params: SamplingParams | list[SamplingParams],
|
||||
use_tqdm: bool = True,
|
||||
) -> list[str]:
|
||||
import os
|
||||
log_level = os.environ.get('NANOVLLM_LOG_LEVEL', 'INFO')
|
||||
debug_enabled = log_level.upper() == 'DEBUG'
|
||||
|
||||
Observer.complete_reset()
|
||||
if use_tqdm:
|
||||
pbar = tqdm(total=len(prompts), desc="Generating", dynamic_ncols=True)
|
||||
@@ -85,7 +100,24 @@ class LLMEngine:
|
||||
self.add_request(prompt, sp)
|
||||
outputs = {}
|
||||
prefill_throughput = decode_throughput = 0.
|
||||
iteration = 0
|
||||
last_output_count = 0
|
||||
|
||||
while not self.is_finished():
|
||||
if debug_enabled and iteration % 100 == 0:
|
||||
print(f"[DEBUG LLMEngine] Iteration {iteration}, finished_sequences={len(outputs)}, total_prompts={len(prompts)}")
|
||||
|
||||
# Timeout check (32K sample should finish within 20 minutes = 1200 seconds)
|
||||
if iteration == 0:
|
||||
import time
|
||||
start_time = time.time()
|
||||
elif debug_enabled and iteration % 100 == 0:
|
||||
elapsed = time.time() - start_time
|
||||
if elapsed > 1200: # 20 minutes
|
||||
print(f"[WARNING] Test exceeded 20 minutes timeout! Iteration={iteration}, forcing exit.")
|
||||
import sys
|
||||
sys.exit(1)
|
||||
|
||||
t = perf_counter()
|
||||
output, num_tokens = self.step()
|
||||
if use_tqdm:
|
||||
|
||||
@@ -231,6 +231,11 @@ class HybridKVCacheManager(KVCacheManager):
|
||||
seq.num_cached_tokens = 0
|
||||
seq.block_table.clear()
|
||||
|
||||
# Reset OffloadEngine state to prevent request-to-request contamination
|
||||
# This clears all KV buffers and pending async events
|
||||
if self.offload_engine is not None:
|
||||
self.offload_engine.reset()
|
||||
|
||||
def can_append(self, seq: Sequence) -> bool:
|
||||
"""Check if we can append a token."""
|
||||
need_new_block = (len(seq) % self._block_size == 1)
|
||||
|
||||
@@ -278,6 +278,42 @@ class OffloadEngine:
|
||||
"""
|
||||
return self.k_cache_gpu, self.v_cache_gpu
|
||||
|
||||
def reset(self) -> None:
    """
    Zero out every KV cache buffer and drop pending transfer events.

    Reusing one OffloadEngine instance across generate() calls would
    otherwise leak KV state from one request into the next; call this
    between requests to guarantee a clean slate.

    Buffers cleared:
        - GPU ring buffer slots (k_cache_gpu, v_cache_gpu)
        - per-layer decode buffers (decode_k_buffer, decode_v_buffer)
        - cross-layer pipeline buffers (layer_k/v_buffer_a/b)
        - per-layer prefill buffers (prefill_k/v_buffer)
    All pending async transfer events are discarded as well.
    """
    kv_buffers = (
        # GPU ring buffer slots
        self.k_cache_gpu,
        self.v_cache_gpu,
        # per-layer decode buffers
        self.decode_k_buffer,
        self.decode_v_buffer,
        # cross-layer pipeline buffers
        self.layer_k_buffer_a,
        self.layer_v_buffer_a,
        self.layer_k_buffer_b,
        self.layer_v_buffer_b,
        # per-layer prefill buffers
        self.prefill_k_buffer,
        self.prefill_v_buffer,
    )
    for buf in kv_buffers:
        buf.zero_()
    # Forget any in-flight async transfer bookkeeping so stale events
    # cannot be awaited against the freshly zeroed buffers.
    self.pending_events.clear()
|
||||
|
||||
# ========== Memory info ==========
|
||||
|
||||
def gpu_memory_bytes(self) -> int:
|
||||
|
||||
@@ -195,10 +195,10 @@ def run_task_test(
|
||||
})
|
||||
|
||||
if verbose:
|
||||
status = "PASS" if passed else "FAIL"
|
||||
status = "✓ PASS" if passed else "✗ FAIL"
|
||||
exp_preview = str(expected[0])[:30] if expected else "N/A"
|
||||
out_preview = output_text[:50].replace('\n', ' ')
|
||||
print(f" [{idx}] {status} (score={score:.2f}) exp={exp_preview}... out={out_preview}...")
|
||||
print(f" [{idx:3d}] {status} (score={score:.2f}) exp={exp_preview}... | out={out_preview}...")
|
||||
|
||||
avg_score = total_score / len(samples) if samples else 0.0
|
||||
|
||||
|
||||
Reference in New Issue
Block a user